1use crate::{
20 error::Error,
21 schema_equality, types,
22 util::MapHelper,
23 validator::{
24 validate_enum_symbol_name, validate_namespace, validate_record_field_name,
25 validate_schema_name,
26 },
27 AvroResult,
28};
29use digest::Digest;
30use log::{debug, error, warn};
31use serde::{
32 ser::{SerializeMap, SerializeSeq},
33 Deserialize, Serialize, Serializer,
34};
35use serde_json::{Map, Value};
36use std::{
37 borrow::Borrow,
38 collections::{BTreeMap, HashMap, HashSet},
39 fmt,
40 fmt::Debug,
41 hash::Hash,
42 io::Read,
43 str::FromStr,
44};
45use strum_macros::{Display, EnumDiscriminants, EnumString};
46
/// A schema fingerprint: the raw bytes of a digest.
///
/// Its [`fmt::Display`] implementation renders the bytes as lowercase hexadecimal.
pub struct SchemaFingerprint {
    /// Raw digest bytes, in digest order.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Writes every byte as two lowercase hexadecimal digits, without separators.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Stream each byte straight into the formatter instead of building one
        // `String` per byte plus a joined `String`; the produced text is the same.
        self.bytes
            .iter()
            .try_for_each(|byte| write!(f, "{byte:02x}"))
    }
}
67
/// In-memory representation of a schema.
///
/// The `EnumDiscriminants` derive additionally generates the field-less
/// `SchemaKind` enum (one variant per `Schema` variant), which also derives
/// `Hash`, `Ord` and `PartialOrd`.
#[derive(Clone, Debug, EnumDiscriminants, Display)]
#[strum_discriminants(name(SchemaKind), derive(Hash, Ord, PartialOrd))]
pub enum Schema {
    /// The `null` primitive.
    Null,
    /// The `boolean` primitive.
    Boolean,
    /// The `int` primitive.
    Int,
    /// The `long` primitive.
    Long,
    /// The `float` primitive.
    Float,
    /// The `double` primitive.
    Double,
    /// The `bytes` primitive.
    Bytes,
    /// The `string` primitive.
    String,
    /// An array; the element schema and extra attributes live in [`ArraySchema`].
    Array(ArraySchema),
    /// A map; the value schema and extra attributes live in [`MapSchema`].
    Map(MapSchema),
    /// A union of several schemata (see [`UnionSchema`]).
    Union(UnionSchema),
    /// A named record (see [`RecordSchema`]).
    Record(RecordSchema),
    /// A named enumeration (see [`EnumSchema`]).
    Enum(EnumSchema),
    /// A named fixed-size schema (see [`FixedSchema`]).
    Fixed(FixedSchema),
    /// A decimal described by precision, scale and an inner schema (see [`DecimalSchema`]).
    Decimal(DecimalSchema),
    // The following unit variants are neither primitive nor named as far as
    // `SchemaKind::is_primitive` / `SchemaKind::is_named` are concerned.
    BigDecimal,
    Uuid,
    Date,
    TimeMillis,
    TimeMicros,
    TimestampMillis,
    TimestampMicros,
    TimestampNanos,
    LocalTimestampMillis,
    LocalTimestampMicros,
    LocalTimestampNanos,
    Duration,
    /// A reference, by name, to a schema defined elsewhere.
    Ref { name: Name },
}
141
/// Payload of [`Schema::Map`].
#[derive(Clone, Debug, PartialEq)]
pub struct MapSchema {
    /// Schema stored in the map (boxed because `Schema` is recursive).
    pub types: Box<Schema>,
    /// Additional JSON attributes attached to the map schema, keyed by attribute name.
    pub attributes: BTreeMap<String, Value>,
}
147
/// Payload of [`Schema::Array`].
#[derive(Clone, Debug, PartialEq)]
pub struct ArraySchema {
    /// Schema of the array items (boxed because `Schema` is recursive).
    pub items: Box<Schema>,
    /// Additional JSON attributes attached to the array schema, keyed by attribute name.
    pub attributes: BTreeMap<String, Value>,
}
153
impl PartialEq for Schema {
    /// Compares two schemata by delegating to `schema_equality::compare_schemata`.
    fn eq(&self, other: &Self) -> bool {
        schema_equality::compare_schemata(self, other)
    }
}
163
164impl SchemaKind {
165 pub fn is_primitive(self) -> bool {
166 matches!(
167 self,
168 SchemaKind::Null
169 | SchemaKind::Boolean
170 | SchemaKind::Int
171 | SchemaKind::Long
172 | SchemaKind::Double
173 | SchemaKind::Float
174 | SchemaKind::Bytes
175 | SchemaKind::String,
176 )
177 }
178
179 pub fn is_named(self) -> bool {
180 matches!(
181 self,
182 SchemaKind::Record | SchemaKind::Enum | SchemaKind::Fixed | SchemaKind::Ref
183 )
184 }
185}
186
impl From<&types::Value> for SchemaKind {
    /// Maps every value variant to the schema kind of the same name.
    fn from(value: &types::Value) -> Self {
        // Local import: inside this function `Value` means the crate's value
        // type rather than the `Value` imported at the top of the file.
        use crate::types::Value;
        match value {
            Value::Null => Self::Null,
            Value::Boolean(_) => Self::Boolean,
            Value::Int(_) => Self::Int,
            Value::Long(_) => Self::Long,
            Value::Float(_) => Self::Float,
            Value::Double(_) => Self::Double,
            Value::Bytes(_) => Self::Bytes,
            Value::String(_) => Self::String,
            Value::Array(_) => Self::Array,
            Value::Map(_) => Self::Map,
            Value::Union(_, _) => Self::Union,
            Value::Record(_) => Self::Record,
            Value::Enum(_, _) => Self::Enum,
            Value::Fixed(_, _) => Self::Fixed,
            Value::Decimal { .. } => Self::Decimal,
            Value::BigDecimal(_) => Self::BigDecimal,
            Value::Uuid(_) => Self::Uuid,
            Value::Date(_) => Self::Date,
            Value::TimeMillis(_) => Self::TimeMillis,
            Value::TimeMicros(_) => Self::TimeMicros,
            Value::TimestampMillis(_) => Self::TimestampMillis,
            Value::TimestampMicros(_) => Self::TimestampMicros,
            Value::TimestampNanos(_) => Self::TimestampNanos,
            Value::LocalTimestampMillis(_) => Self::LocalTimestampMillis,
            Value::LocalTimestampMicros(_) => Self::LocalTimestampMicros,
            Value::LocalTimestampNanos(_) => Self::LocalTimestampNanos,
            Value::Duration { .. } => Self::Duration,
        }
    }
}
221
/// Name of a named schema: a name plus an optional namespace.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Name {
    /// The name component.
    pub name: String,
    /// The namespace component, if any.
    pub namespace: Namespace,
}
237
/// Optional documentation string.
pub type Documentation = Option<String>;
/// Optional list of aliases.
pub type Aliases = Option<Vec<Alias>>;
/// Owned lookup table from names to schemata.
pub(crate) type Names = HashMap<Name, Schema>;
/// Lookup table from names to borrowed schemata.
pub type NamesRef<'a> = HashMap<Name, &'a Schema>;
/// Optional namespace string.
pub type Namespace = Option<String>;
248
249impl Name {
250 pub fn new(name: &str) -> AvroResult<Self> {
254 let (name, namespace) = Name::get_name_and_namespace(name)?;
255 Ok(Self {
256 name,
257 namespace: namespace.filter(|ns| !ns.is_empty()),
258 })
259 }
260
261 fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> {
262 validate_schema_name(name)
263 }
264
265 pub(crate) fn parse(
267 complex: &Map<String, Value>,
268 enclosing_namespace: &Namespace,
269 ) -> AvroResult<Self> {
270 let (name, namespace_from_name) = complex
271 .name()
272 .map(|name| Name::get_name_and_namespace(name.as_str()).unwrap())
273 .ok_or(Error::GetNameField)?;
274 let type_name = match complex.get("type") {
276 Some(Value::Object(complex_type)) => complex_type.name().or(None),
277 _ => None,
278 };
279
280 let namespace = namespace_from_name
281 .or_else(|| {
282 complex
283 .string("namespace")
284 .or_else(|| enclosing_namespace.clone())
285 })
286 .filter(|ns| !ns.is_empty());
287
288 if let Some(ref ns) = namespace {
289 validate_namespace(ns)?;
290 }
291
292 Ok(Self {
293 name: type_name.unwrap_or(name),
294 namespace,
295 })
296 }
297
298 pub fn fullname(&self, default_namespace: Namespace) -> String {
303 if self.name.contains('.') {
304 self.name.clone()
305 } else {
306 let namespace = self.namespace.clone().or(default_namespace);
307
308 match namespace {
309 Some(ref namespace) if !namespace.is_empty() => {
310 format!("{}.{}", namespace, self.name)
311 }
312 _ => self.name.clone(),
313 }
314 }
315 }
316
317 pub fn fully_qualified_name(&self, enclosing_namespace: &Namespace) -> Name {
331 Name {
332 name: self.name.clone(),
333 namespace: self
334 .namespace
335 .clone()
336 .or_else(|| enclosing_namespace.clone().filter(|ns| !ns.is_empty())),
337 }
338 }
339}
340
impl From<&str> for Name {
    /// Converts a string into a [`Name`] via [`Name::new`].
    ///
    /// # Panics
    ///
    /// Panics when [`Name::new`] returns an error for `name`; call
    /// [`Name::new`] directly to handle that error.
    fn from(name: &str) -> Self {
        Name::new(name).unwrap()
    }
}
346
347impl fmt::Display for Name {
348 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
349 f.write_str(&self.fullname(None)[..])
350 }
351}
352
353impl<'de> Deserialize<'de> for Name {
354 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
355 where
356 D: serde::de::Deserializer<'de>,
357 {
358 Value::deserialize(deserializer).and_then(|value| {
359 use serde::de::Error;
360 if let Value::Object(json) = value {
361 Name::parse(&json, &None).map_err(Error::custom)
362 } else {
363 Err(Error::custom(format!("Expected a JSON object: {value:?}")))
364 }
365 })
366 }
367}
368
/// An alias: a newtype around [`Name`].
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Alias(Name);
373
374impl Alias {
375 pub fn new(name: &str) -> AvroResult<Self> {
376 Name::new(name).map(Self)
377 }
378
379 pub fn name(&self) -> String {
380 self.0.name.clone()
381 }
382
383 pub fn namespace(&self) -> Namespace {
384 self.0.namespace.clone()
385 }
386
387 pub fn fullname(&self, default_namespace: Namespace) -> String {
388 self.0.fullname(default_namespace)
389 }
390
391 pub fn fully_qualified_name(&self, default_namespace: &Namespace) -> Name {
392 self.0.fully_qualified_name(default_namespace)
393 }
394}
395
impl From<&str> for Alias {
    /// Converts a string into an [`Alias`] via [`Alias::new`].
    ///
    /// # Panics
    ///
    /// Panics when [`Alias::new`] returns an error for `name`; call
    /// [`Alias::new`] directly to handle that error.
    fn from(name: &str) -> Self {
        Alias::new(name).unwrap()
    }
}
401
402impl Serialize for Alias {
403 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
404 where
405 S: Serializer,
406 {
407 serializer.serialize_str(&self.fullname(None))
408 }
409}
410
/// A set of borrowed schemata together with a lookup table of the named
/// schemata found while resolving them.
#[derive(Debug)]
pub struct ResolvedSchema<'s> {
    /// Named schemata discovered during resolution, keyed by fully qualified name.
    names_ref: NamesRef<'s>,
    /// The schemata that were handed in for resolution.
    schemata: Vec<&'s Schema>,
}
416
417impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
418 type Error = Error;
419
420 fn try_from(schema: &'s Schema) -> AvroResult<Self> {
421 let names = HashMap::new();
422 let mut rs = ResolvedSchema {
423 names_ref: names,
424 schemata: vec![schema],
425 };
426 rs.resolve(rs.get_schemata(), &None, None)?;
427 Ok(rs)
428 }
429}
430
431impl<'s> TryFrom<Vec<&'s Schema>> for ResolvedSchema<'s> {
432 type Error = Error;
433
434 fn try_from(schemata: Vec<&'s Schema>) -> AvroResult<Self> {
435 let names = HashMap::new();
436 let mut rs = ResolvedSchema {
437 names_ref: names,
438 schemata,
439 };
440 rs.resolve(rs.get_schemata(), &None, None)?;
441 Ok(rs)
442 }
443}
444
445impl<'s> ResolvedSchema<'s> {
446 pub fn get_schemata(&self) -> Vec<&'s Schema> {
447 self.schemata.clone()
448 }
449
450 pub fn get_names(&self) -> &NamesRef<'s> {
451 &self.names_ref
452 }
453
454 pub fn new_with_known_schemata<'n>(
458 schemata_to_resolve: Vec<&'s Schema>,
459 enclosing_namespace: &Namespace,
460 known_schemata: &'n NamesRef<'n>,
461 ) -> AvroResult<Self> {
462 let names = HashMap::new();
463 let mut rs = ResolvedSchema {
464 names_ref: names,
465 schemata: schemata_to_resolve,
466 };
467 rs.resolve(rs.get_schemata(), enclosing_namespace, Some(known_schemata))?;
468 Ok(rs)
469 }
470
471 fn resolve<'n>(
472 &mut self,
473 schemata: Vec<&'s Schema>,
474 enclosing_namespace: &Namespace,
475 known_schemata: Option<&'n NamesRef<'n>>,
476 ) -> AvroResult<()> {
477 for schema in schemata {
478 match schema {
479 Schema::Array(schema) => {
480 self.resolve(vec![&schema.items], enclosing_namespace, known_schemata)?
481 }
482 Schema::Map(schema) => {
483 self.resolve(vec![&schema.types], enclosing_namespace, known_schemata)?
484 }
485 Schema::Union(UnionSchema { schemas, .. }) => {
486 for schema in schemas {
487 self.resolve(vec![schema], enclosing_namespace, known_schemata)?
488 }
489 }
490 Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
491 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
492 if self
493 .names_ref
494 .insert(fully_qualified_name.clone(), schema)
495 .is_some()
496 {
497 return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
498 }
499 }
500 Schema::Record(RecordSchema { name, fields, .. }) => {
501 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
502 if self
503 .names_ref
504 .insert(fully_qualified_name.clone(), schema)
505 .is_some()
506 {
507 return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
508 } else {
509 let record_namespace = fully_qualified_name.namespace;
510 for field in fields {
511 self.resolve(vec![&field.schema], &record_namespace, known_schemata)?
512 }
513 }
514 }
515 Schema::Ref { name } => {
516 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
517 if !self.names_ref.contains_key(&fully_qualified_name) {
519 let is_resolved_with_known_schemas = known_schemata
520 .as_ref()
521 .map(|names| names.contains_key(&fully_qualified_name))
522 .unwrap_or(false);
523 if !is_resolved_with_known_schemas {
524 return Err(Error::SchemaResolutionError(fully_qualified_name));
525 }
526 }
527 }
528 _ => (),
529 }
530 }
531 Ok(())
532 }
533}
534
/// An owned root schema together with an owned lookup table of the named
/// schemata found inside it.
pub(crate) struct ResolvedOwnedSchema {
    /// Named schemata collected from `root_schema`.
    names: Names,
    /// The schema that was resolved.
    root_schema: Schema,
}
539
540impl TryFrom<Schema> for ResolvedOwnedSchema {
541 type Error = Error;
542
543 fn try_from(schema: Schema) -> AvroResult<Self> {
544 let names = HashMap::new();
545 let mut rs = ResolvedOwnedSchema {
546 names,
547 root_schema: schema,
548 };
549 resolve_names(&rs.root_schema, &mut rs.names, &None)?;
550 Ok(rs)
551 }
552}
553
impl ResolvedOwnedSchema {
    /// Returns the schema this value was built from.
    pub(crate) fn get_root_schema(&self) -> &Schema {
        &self.root_schema
    }
    /// Returns the named schemata collected from the root schema.
    pub(crate) fn get_names(&self) -> &Names {
        &self.names
    }
}
562
563pub(crate) fn resolve_names(
564 schema: &Schema,
565 names: &mut Names,
566 enclosing_namespace: &Namespace,
567) -> AvroResult<()> {
568 match schema {
569 Schema::Array(schema) => resolve_names(&schema.items, names, enclosing_namespace),
570 Schema::Map(schema) => resolve_names(&schema.types, names, enclosing_namespace),
571 Schema::Union(UnionSchema { schemas, .. }) => {
572 for schema in schemas {
573 resolve_names(schema, names, enclosing_namespace)?
574 }
575 Ok(())
576 }
577 Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
578 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
579 if names
580 .insert(fully_qualified_name.clone(), schema.clone())
581 .is_some()
582 {
583 Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
584 } else {
585 Ok(())
586 }
587 }
588 Schema::Record(RecordSchema { name, fields, .. }) => {
589 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
590 if names
591 .insert(fully_qualified_name.clone(), schema.clone())
592 .is_some()
593 {
594 Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
595 } else {
596 let record_namespace = fully_qualified_name.namespace;
597 for field in fields {
598 resolve_names(&field.schema, names, &record_namespace)?
599 }
600 Ok(())
601 }
602 }
603 Schema::Ref { name } => {
604 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
605 names
606 .get(&fully_qualified_name)
607 .map(|_| ())
608 .ok_or(Error::SchemaResolutionError(fully_qualified_name))
609 }
610 _ => Ok(()),
611 }
612}
613
614pub(crate) fn resolve_names_with_schemata(
615 schemata: &Vec<&Schema>,
616 names: &mut Names,
617 enclosing_namespace: &Namespace,
618) -> AvroResult<()> {
619 for schema in schemata {
620 resolve_names(schema, names, enclosing_namespace)?;
621 }
622 Ok(())
623}
624
/// A single field of a record schema.
#[derive(bon::Builder, Clone, Debug, PartialEq)]
pub struct RecordField {
    /// Name of the field.
    pub name: String,
    /// Documentation of the field, if any.
    #[builder(default)]
    pub doc: Documentation,
    /// Aliases of the field, if any.
    pub aliases: Option<Vec<String>>,
    /// Default value of the field as raw JSON, if any.
    pub default: Option<Value>,
    /// Schema of the field.
    pub schema: Schema,
    /// Sort order of the field; the builder defaults it to `RecordFieldOrder::Ignore`.
    #[builder(default = RecordFieldOrder::Ignore)]
    pub order: RecordFieldOrder,
    /// Position of the field — presumably its index within the owning record; confirm against the record parser.
    #[builder(default)]
    pub position: usize,
    /// JSON attributes of the field that are not one of the recognized keys.
    #[builder(default = BTreeMap::new())]
    pub custom_attributes: BTreeMap<String, Value>,
}
653
/// Sort order of a record field.
///
/// `EnumString` provides `FromStr`; variant names are matched in kebab case.
#[derive(Clone, Debug, Eq, PartialEq, EnumString)]
#[strum(serialize_all = "kebab_case")]
pub enum RecordFieldOrder {
    Ascending,
    Descending,
    Ignore,
}
662
663impl RecordField {
664 fn parse(
666 field: &Map<String, Value>,
667 position: usize,
668 parser: &mut Parser,
669 enclosing_record: &Name,
670 ) -> AvroResult<Self> {
671 let name = field.name().ok_or(Error::GetNameFieldFromRecord)?;
672
673 validate_record_field_name(&name)?;
674
675 let schema = parser.parse_complex(
677 field,
678 &enclosing_record.namespace,
679 RecordSchemaParseLocation::FromField,
680 )?;
681
682 let default = field.get("default").cloned();
683 Self::resolve_default_value(
684 &schema,
685 &name,
686 &enclosing_record.fullname(None),
687 &parser.parsed_schemas,
688 &default,
689 )?;
690
691 let aliases = field.get("aliases").and_then(|aliases| {
692 aliases.as_array().map(|aliases| {
693 aliases
694 .iter()
695 .flat_map(|alias| alias.as_str())
696 .map(|alias| alias.to_string())
697 .collect::<Vec<String>>()
698 })
699 });
700
701 let order = field
702 .get("order")
703 .and_then(|order| order.as_str())
704 .and_then(|order| RecordFieldOrder::from_str(order).ok())
705 .unwrap_or(RecordFieldOrder::Ascending);
706
707 Ok(RecordField {
708 name,
709 doc: field.doc(),
710 default,
711 aliases,
712 order,
713 position,
714 custom_attributes: RecordField::get_field_custom_attributes(field, &schema),
715 schema,
716 })
717 }
718
719 fn resolve_default_value(
720 field_schema: &Schema,
721 field_name: &str,
722 record_name: &str,
723 names: &Names,
724 default: &Option<Value>,
725 ) -> AvroResult<()> {
726 if let Some(value) = default {
727 let avro_value = types::Value::from(value.clone());
728 match field_schema {
729 Schema::Union(union_schema) => {
730 let schemas = &union_schema.schemas;
731 let resolved = schemas.iter().any(|schema| {
732 avro_value
733 .to_owned()
734 .resolve_internal(schema, names, &schema.namespace(), &None)
735 .is_ok()
736 });
737
738 if !resolved {
739 let schema: Option<&Schema> = schemas.first();
740 return match schema {
741 Some(first_schema) => Err(Error::GetDefaultUnion(
742 SchemaKind::from(first_schema),
743 types::ValueKind::from(avro_value),
744 )),
745 None => Err(Error::EmptyUnion),
746 };
747 }
748 }
749 _ => {
750 let resolved = avro_value
751 .resolve_internal(field_schema, names, &field_schema.namespace(), &None)
752 .is_ok();
753
754 if !resolved {
755 return Err(Error::GetDefaultRecordField(
756 field_name.to_string(),
757 record_name.to_string(),
758 field_schema.canonical_form(),
759 ));
760 }
761 }
762 };
763 }
764
765 Ok(())
766 }
767
768 fn get_field_custom_attributes(
769 field: &Map<String, Value>,
770 schema: &Schema,
771 ) -> BTreeMap<String, Value> {
772 let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
773 for (key, value) in field {
774 match key.as_str() {
775 "type" | "name" | "doc" | "default" | "order" | "position" | "aliases"
776 | "logicalType" => continue,
777 key if key == "symbols" && matches!(schema, Schema::Enum(_)) => continue,
778 key if key == "size" && matches!(schema, Schema::Fixed(_)) => continue,
779 _ => custom_attributes.insert(key.clone(), value.clone()),
780 };
781 }
782 custom_attributes
783 }
784
785 pub fn is_nullable(&self) -> bool {
787 match self.schema {
788 Schema::Union(ref inner) => inner.is_nullable(),
789 _ => false,
790 }
791 }
792}
793
/// Payload of [`Schema::Record`].
#[derive(bon::Builder, Debug, Clone)]
pub struct RecordSchema {
    /// Name of the record.
    pub name: Name,
    /// Aliases of the record, if any.
    #[builder(default)]
    pub aliases: Aliases,
    /// Documentation of the record, if any.
    #[builder(default)]
    pub doc: Documentation,
    /// Fields of the record.
    pub fields: Vec<RecordField>,
    /// String-to-index lookup table — presumably field name to index in `fields`; confirm against the record parser.
    pub lookup: BTreeMap<String, usize>,
    /// Additional JSON attributes attached to the record schema.
    #[builder(default = BTreeMap::new())]
    pub attributes: BTreeMap<String, Value>,
}
814
/// Payload of [`Schema::Enum`].
#[derive(bon::Builder, Debug, Clone)]
pub struct EnumSchema {
    /// Name of the enum.
    pub name: Name,
    /// Aliases of the enum, if any.
    #[builder(default)]
    pub aliases: Aliases,
    /// Documentation of the enum, if any.
    #[builder(default)]
    pub doc: Documentation,
    /// Symbols of the enum.
    pub symbols: Vec<String>,
    /// Default of the enum, if any.
    pub default: Option<String>,
    /// Additional JSON attributes attached to the enum schema.
    #[builder(default = BTreeMap::new())]
    pub attributes: BTreeMap<String, Value>,
}
834
/// Payload of [`Schema::Fixed`].
#[derive(bon::Builder, Debug, Clone)]
pub struct FixedSchema {
    /// Name of the fixed schema.
    pub name: Name,
    /// Aliases of the fixed schema, if any.
    #[builder(default)]
    pub aliases: Aliases,
    /// Documentation of the fixed schema, if any.
    #[builder(default)]
    pub doc: Documentation,
    /// Size of the fixed schema.
    pub size: usize,
    /// Default of the fixed schema, if any.
    pub default: Option<String>,
    /// Additional JSON attributes attached to the fixed schema.
    #[builder(default = BTreeMap::new())]
    pub attributes: BTreeMap<String, Value>,
}
854
855impl FixedSchema {
856 fn serialize_to_map<S>(&self, mut map: S::SerializeMap) -> Result<S::SerializeMap, S::Error>
857 where
858 S: Serializer,
859 {
860 map.serialize_entry("type", "fixed")?;
861 if let Some(ref n) = self.name.namespace {
862 map.serialize_entry("namespace", n)?;
863 }
864 map.serialize_entry("name", &self.name.name)?;
865 if let Some(ref docstr) = self.doc {
866 map.serialize_entry("doc", docstr)?;
867 }
868 map.serialize_entry("size", &self.size)?;
869
870 if let Some(ref aliases) = self.aliases {
871 map.serialize_entry("aliases", aliases)?;
872 }
873
874 for attr in &self.attributes {
875 map.serialize_entry(attr.0, attr.1)?;
876 }
877
878 Ok(map)
879 }
880}
881
/// Payload of [`Schema::Decimal`].
#[derive(Debug, Clone)]
pub struct DecimalSchema {
    /// Precision of the decimal.
    pub precision: DecimalMetadata,
    /// Scale of the decimal.
    pub scale: DecimalMetadata,
    /// Inner schema of the decimal (boxed because `Schema` is recursive).
    pub inner: Box<Schema>,
}
895
/// Payload of [`Schema::Union`].
#[derive(Debug, Clone)]
pub struct UnionSchema {
    /// Variants of the union, in declaration order.
    pub(crate) schemas: Vec<Schema>,
    /// Index into `schemas` for each schema kind that was indexed when the union was built.
    variant_index: BTreeMap<SchemaKind, usize>,
}
907
908impl UnionSchema {
909 pub fn new(schemas: Vec<Schema>) -> AvroResult<Self> {
911 let mut vindex = BTreeMap::new();
912 for (i, schema) in schemas.iter().enumerate() {
913 if let Schema::Union(_) = schema {
914 return Err(Error::GetNestedUnion);
915 }
916 let kind = SchemaKind::from(schema);
917 if !kind.is_named() && vindex.insert(kind, i).is_some() {
918 return Err(Error::GetUnionDuplicate);
919 }
920 }
921 Ok(UnionSchema {
922 schemas,
923 variant_index: vindex,
924 })
925 }
926
927 pub fn variants(&self) -> &[Schema] {
929 &self.schemas
930 }
931
932 pub fn is_nullable(&self) -> bool {
934 self.schemas.iter().any(|x| matches!(x, Schema::Null))
935 }
936
937 #[deprecated(
940 since = "0.15.0",
941 note = "Please use `find_schema_with_known_schemata` instead"
942 )]
943 pub fn find_schema(&self, value: &types::Value) -> Option<(usize, &Schema)> {
944 self.find_schema_with_known_schemata::<Schema>(value, None, &None)
945 }
946
947 pub fn find_schema_with_known_schemata<S: Borrow<Schema> + Debug>(
953 &self,
954 value: &types::Value,
955 known_schemata: Option<&HashMap<Name, S>>,
956 enclosing_namespace: &Namespace,
957 ) -> Option<(usize, &Schema)> {
958 let schema_kind = SchemaKind::from(value);
959 if let Some(&i) = self.variant_index.get(&schema_kind) {
960 Some((i, &self.schemas[i]))
962 } else {
963 let mut collected_names: HashMap<Name, &Schema> = known_schemata
967 .map(|names| {
968 names
969 .iter()
970 .map(|(name, schema)| (name.clone(), schema.borrow()))
971 .collect()
972 })
973 .unwrap_or_default();
974
975 self.schemas.iter().enumerate().find(|(_, schema)| {
976 let resolved_schema = ResolvedSchema::new_with_known_schemata(
977 vec![*schema],
978 enclosing_namespace,
979 &collected_names,
980 )
981 .expect("Schema didn't successfully parse");
982 let resolved_names = resolved_schema.names_ref;
983
984 collected_names.extend(resolved_names);
986 let namespace = &schema.namespace().or_else(|| enclosing_namespace.clone());
987
988 value
989 .clone()
990 .resolve_internal(schema, &collected_names, namespace, &None)
991 .is_ok()
992 })
993 }
994 }
995}
996
997impl PartialEq for UnionSchema {
999 fn eq(&self, other: &UnionSchema) -> bool {
1000 self.schemas.eq(&other.schemas)
1001 }
1002}
1003
/// Integer type used for decimal precision and scale.
type DecimalMetadata = usize;
/// Precision of a decimal.
pub(crate) type Precision = DecimalMetadata;
/// Scale of a decimal.
pub(crate) type Scale = DecimalMetadata;
1007
1008fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalMetadata, Error> {
1009 Ok(if value.is_u64() {
1010 let num = value
1011 .as_u64()
1012 .ok_or_else(|| Error::GetU64FromJson(value.clone()))?;
1013 num.try_into()
1014 .map_err(|e| Error::ConvertU64ToUsize(e, num))?
1015 } else if value.is_i64() {
1016 let num = value
1017 .as_i64()
1018 .ok_or_else(|| Error::GetI64FromJson(value.clone()))?;
1019 num.try_into()
1020 .map_err(|e| Error::ConvertI64ToUsize(e, num))?
1021 } else {
1022 return Err(Error::GetPrecisionOrScaleFromJson(value.clone()));
1023 })
1024}
1025
/// Tells the parser where the JSON object being parsed comes from.
#[derive(Debug, Default)]
enum RecordSchemaParseLocation {
    /// The object is parsed as a schema in its own right (the default).
    #[default]
    Root,

    /// The object is a record field whose schema is being parsed.
    FromField,
}
1035
/// State carried while parsing one schema or a list of interdependent schemata.
#[derive(Default)]
struct Parser {
    /// Raw JSON of the input schemata that have not been parsed yet, by name.
    input_schemas: HashMap<Name, Value>,
    /// Schemata consulted by name when a reference is looked up — presumably
    /// those currently being parsed; confirm against the code that fills this map.
    resolving_schemas: Names,
    /// Names of the input schemata in the order they were supplied.
    input_order: Vec<Name>,
    /// Schemata that have been fully parsed, by name.
    parsed_schemas: Names,
}
1048
1049impl Schema {
1050 pub fn canonical_form(&self) -> String {
1055 let json = serde_json::to_value(self)
1056 .unwrap_or_else(|e| panic!("Cannot parse Schema from JSON: {e}"));
1057 let mut defined_names = HashSet::new();
1058 parsing_canonical_form(&json, &mut defined_names)
1059 }
1060
1061 pub fn independent_canonical_form(&self, schemata: &Vec<Schema>) -> Result<String, Error> {
1067 let mut this = self.clone();
1068 this.denormalize(schemata)?;
1069 Ok(this.canonical_form())
1070 }
1071
1072 pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
1079 let mut d = D::new();
1080 d.update(self.canonical_form());
1081 SchemaFingerprint {
1082 bytes: d.finalize().to_vec(),
1083 }
1084 }
1085
1086 pub fn parse_str(input: &str) -> Result<Schema, Error> {
1088 let mut parser = Parser::default();
1089 parser.parse_str(input)
1090 }
1091
1092 pub fn parse_list(input: &[&str]) -> AvroResult<Vec<Schema>> {
1100 let mut input_schemas: HashMap<Name, Value> = HashMap::with_capacity(input.len());
1101 let mut input_order: Vec<Name> = Vec::with_capacity(input.len());
1102 for js in input {
1103 let schema: Value = serde_json::from_str(js).map_err(Error::ParseSchemaJson)?;
1104 if let Value::Object(inner) = &schema {
1105 let name = Name::parse(inner, &None)?;
1106 let previous_value = input_schemas.insert(name.clone(), schema);
1107 if previous_value.is_some() {
1108 return Err(Error::NameCollision(name.fullname(None)));
1109 }
1110 input_order.push(name);
1111 } else {
1112 return Err(Error::GetNameField);
1113 }
1114 }
1115 let mut parser = Parser {
1116 input_schemas,
1117 resolving_schemas: HashMap::default(),
1118 input_order,
1119 parsed_schemas: HashMap::with_capacity(input.len()),
1120 };
1121 parser.parse_list()
1122 }
1123
1124 pub fn parse_str_with_list(
1137 schema: &str,
1138 schemata: &[&str],
1139 ) -> AvroResult<(Schema, Vec<Schema>)> {
1140 let mut input_schemas: HashMap<Name, Value> = HashMap::with_capacity(schemata.len());
1141 let mut input_order: Vec<Name> = Vec::with_capacity(schemata.len());
1142 for json in schemata {
1143 let schema: Value = serde_json::from_str(json).map_err(Error::ParseSchemaJson)?;
1144 if let Value::Object(inner) = &schema {
1145 let name = Name::parse(inner, &None)?;
1146 if let Some(_previous) = input_schemas.insert(name.clone(), schema) {
1147 return Err(Error::NameCollision(name.fullname(None)));
1148 }
1149 input_order.push(name);
1150 } else {
1151 return Err(Error::GetNameField);
1152 }
1153 }
1154 let mut parser = Parser {
1155 input_schemas,
1156 resolving_schemas: HashMap::default(),
1157 input_order,
1158 parsed_schemas: HashMap::with_capacity(schemata.len()),
1159 };
1160 parser.parse_input_schemas()?;
1161
1162 let value = serde_json::from_str(schema).map_err(Error::ParseSchemaJson)?;
1163 let schema = parser.parse(&value, &None)?;
1164 let schemata = parser.parse_list()?;
1165 Ok((schema, schemata))
1166 }
1167
1168 pub fn parse_reader(reader: &mut (impl Read + ?Sized)) -> AvroResult<Schema> {
1170 let mut buf = String::new();
1171 match reader.read_to_string(&mut buf) {
1172 Ok(_) => Self::parse_str(&buf),
1173 Err(e) => Err(Error::ReadSchemaFromReader(e)),
1174 }
1175 }
1176
1177 pub fn parse(value: &Value) -> AvroResult<Schema> {
1179 let mut parser = Parser::default();
1180 parser.parse(value, &None)
1181 }
1182
1183 pub(crate) fn parse_with_names(value: &Value, names: Names) -> AvroResult<Schema> {
1186 let mut parser = Parser {
1187 input_schemas: HashMap::with_capacity(1),
1188 resolving_schemas: Names::default(),
1189 input_order: Vec::with_capacity(1),
1190 parsed_schemas: names,
1191 };
1192 parser.parse(value, &None)
1193 }
1194
1195 pub fn custom_attributes(&self) -> Option<&BTreeMap<String, Value>> {
1197 match self {
1198 Schema::Record(RecordSchema { attributes, .. })
1199 | Schema::Enum(EnumSchema { attributes, .. })
1200 | Schema::Fixed(FixedSchema { attributes, .. })
1201 | Schema::Array(ArraySchema { attributes, .. })
1202 | Schema::Map(MapSchema { attributes, .. }) => Some(attributes),
1203 _ => None,
1204 }
1205 }
1206
1207 pub fn name(&self) -> Option<&Name> {
1209 match self {
1210 Schema::Ref { name, .. }
1211 | Schema::Record(RecordSchema { name, .. })
1212 | Schema::Enum(EnumSchema { name, .. })
1213 | Schema::Fixed(FixedSchema { name, .. }) => Some(name),
1214 _ => None,
1215 }
1216 }
1217
1218 pub fn namespace(&self) -> Namespace {
1220 self.name().and_then(|n| n.namespace.clone())
1221 }
1222
1223 pub fn aliases(&self) -> Option<&Vec<Alias>> {
1225 match self {
1226 Schema::Record(RecordSchema { aliases, .. })
1227 | Schema::Enum(EnumSchema { aliases, .. })
1228 | Schema::Fixed(FixedSchema { aliases, .. }) => aliases.as_ref(),
1229 _ => None,
1230 }
1231 }
1232
1233 pub fn doc(&self) -> Option<&String> {
1235 match self {
1236 Schema::Record(RecordSchema { doc, .. })
1237 | Schema::Enum(EnumSchema { doc, .. })
1238 | Schema::Fixed(FixedSchema { doc, .. }) => doc.as_ref(),
1239 _ => None,
1240 }
1241 }
1242
1243 pub fn map(types: Schema) -> Self {
1245 Schema::Map(MapSchema {
1246 types: Box::new(types),
1247 attributes: Default::default(),
1248 })
1249 }
1250
1251 pub fn map_with_attributes(types: Schema, attributes: BTreeMap<String, Value>) -> Self {
1253 Schema::Map(MapSchema {
1254 types: Box::new(types),
1255 attributes,
1256 })
1257 }
1258
1259 pub fn array(items: Schema) -> Self {
1261 Schema::Array(ArraySchema {
1262 items: Box::new(items),
1263 attributes: Default::default(),
1264 })
1265 }
1266
1267 pub fn array_with_attributes(items: Schema, attributes: BTreeMap<String, Value>) -> Self {
1269 Schema::Array(ArraySchema {
1270 items: Box::new(items),
1271 attributes,
1272 })
1273 }
1274
1275 fn denormalize(&mut self, schemata: &Vec<Schema>) -> AvroResult<()> {
1276 match self {
1277 Schema::Ref { name } => {
1278 let replacement_schema = schemata
1279 .iter()
1280 .find(|s| s.name().map(|n| *n == *name).unwrap_or(false));
1281 if let Some(schema) = replacement_schema {
1282 let mut denorm = schema.clone();
1283 denorm.denormalize(schemata)?;
1284 *self = denorm;
1285 } else {
1286 return Err(Error::SchemaResolutionError(name.clone()));
1287 }
1288 }
1289 Schema::Record(record_schema) => {
1290 for field in &mut record_schema.fields {
1291 field.schema.denormalize(schemata)?;
1292 }
1293 }
1294 Schema::Array(array_schema) => {
1295 array_schema.items.denormalize(schemata)?;
1296 }
1297 Schema::Map(map_schema) => {
1298 map_schema.types.denormalize(schemata)?;
1299 }
1300 Schema::Union(union_schema) => {
1301 for schema in &mut union_schema.schemas {
1302 schema.denormalize(schemata)?;
1303 }
1304 }
1305 _ => (),
1306 }
1307 Ok(())
1308 }
1309}
1310
1311impl Parser {
1312 fn parse_str(&mut self, input: &str) -> Result<Schema, Error> {
1314 let value = serde_json::from_str(input).map_err(Error::ParseSchemaJson)?;
1315 self.parse(&value, &None)
1316 }
1317
1318 fn parse_list(&mut self) -> Result<Vec<Schema>, Error> {
1321 self.parse_input_schemas()?;
1322
1323 let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
1324 for name in self.input_order.drain(0..) {
1325 let parsed = self
1326 .parsed_schemas
1327 .remove(&name)
1328 .expect("One of the input schemas was unexpectedly not parsed");
1329 parsed_schemas.push(parsed);
1330 }
1331 Ok(parsed_schemas)
1332 }
1333
1334 fn parse_input_schemas(&mut self) -> Result<(), Error> {
1336 while !self.input_schemas.is_empty() {
1337 let next_name = self
1338 .input_schemas
1339 .keys()
1340 .next()
1341 .expect("Input schemas unexpectedly empty")
1342 .to_owned();
1343 let (name, value) = self
1344 .input_schemas
1345 .remove_entry(&next_name)
1346 .expect("Key unexpectedly missing");
1347 let parsed = self.parse(&value, &None)?;
1348 self.parsed_schemas
1349 .insert(get_schema_type_name(name, value), parsed);
1350 }
1351 Ok(())
1352 }
1353
1354 fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) -> AvroResult<Schema> {
1357 match *value {
1358 Value::String(ref t) => self.parse_known_schema(t.as_str(), enclosing_namespace),
1359 Value::Object(ref data) => {
1360 self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root)
1361 }
1362 Value::Array(ref data) => self.parse_union(data, enclosing_namespace),
1363 _ => Err(Error::ParseSchemaFromValidJson),
1364 }
1365 }
1366
1367 fn parse_known_schema(
1371 &mut self,
1372 name: &str,
1373 enclosing_namespace: &Namespace,
1374 ) -> AvroResult<Schema> {
1375 match name {
1376 "null" => Ok(Schema::Null),
1377 "boolean" => Ok(Schema::Boolean),
1378 "int" => Ok(Schema::Int),
1379 "long" => Ok(Schema::Long),
1380 "double" => Ok(Schema::Double),
1381 "float" => Ok(Schema::Float),
1382 "bytes" => Ok(Schema::Bytes),
1383 "string" => Ok(Schema::String),
1384 _ => self.fetch_schema_ref(name, enclosing_namespace),
1385 }
1386 }
1387
/// Resolves a type name that is not a primitive into a schema.
///
/// The name is qualified with `enclosing_namespace` and looked up first in
/// the already parsed schemata, then in `resolving_schemas`. If it is in
/// neither, the matching pending input schema is parsed on the spot.
///
/// # Errors
///
/// * The validation error when `name` is not a valid name.
/// * `Error::InvalidSchemaRecord` when the unresolved name is `record`,
///   `enum` or `fixed`.
/// * `Error::ParsePrimitive` when no pending input schema has that name.
/// * Any error raised while parsing the pending input schema.
fn fetch_schema_ref(
    &mut self,
    name: &str,
    enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
    // Named schemata are handed back as a reference to their name; anything
    // else is handed back as a copy.
    fn get_schema_ref(parsed: &Schema) -> Schema {
        match &parsed {
            Schema::Record(RecordSchema { ref name, .. })
            | Schema::Enum(EnumSchema { ref name, .. })
            | Schema::Fixed(FixedSchema { ref name, .. }) => Schema::Ref { name: name.clone() },
            _ => parsed.clone(),
        }
    }

    let name = Name::new(name)?;
    let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);

    // Already parsed: refer to it by name.
    if self.parsed_schemas.contains_key(&fully_qualified_name) {
        return Ok(Schema::Ref {
            name: fully_qualified_name,
        });
    }
    // Present in `resolving_schemas`: hand back a copy of the stored schema.
    if let Some(resolving_schema) = self.resolving_schemas.get(&fully_qualified_name) {
        return Ok(resolving_schema.clone());
    }

    // These type keywords cannot be used as a bare reference.
    match name.name.as_str() {
        "record" | "enum" | "fixed" => {
            return Err(Error::InvalidSchemaRecord(name.to_string()));
        }
        _ => (),
    }

    // Take the pending input schema of that name; it is removed from the
    // pending set before being parsed.
    let value = self
        .input_schemas
        .remove(&fully_qualified_name)
        .ok_or_else(|| Error::ParsePrimitive(fully_qualified_name.fullname(None)))?;

    // Input schemata are parsed without an enclosing namespace.
    let parsed = self.parse(&value, &None)?;
    self.parsed_schemas
        .insert(get_schema_type_name(name, value), parsed.clone());

    Ok(get_schema_ref(&parsed))
}
1444
1445 fn parse_precision_and_scale(
1446 complex: &Map<String, Value>,
1447 ) -> Result<(Precision, Scale), Error> {
1448 fn get_decimal_integer(
1449 complex: &Map<String, Value>,
1450 key: &'static str,
1451 ) -> Result<DecimalMetadata, Error> {
1452 match complex.get(key) {
1453 Some(Value::Number(value)) => parse_json_integer_for_decimal(value),
1454 None => {
1455 if key == "scale" {
1456 Ok(0)
1457 } else {
1458 Err(Error::GetDecimalMetadataFromJson(key))
1459 }
1460 }
1461 Some(value) => Err(Error::GetDecimalMetadataValueFromJson {
1462 key: key.into(),
1463 value: value.clone(),
1464 }),
1465 }
1466 }
1467 let precision = get_decimal_integer(complex, "precision")?;
1468 let scale = get_decimal_integer(complex, "scale")?;
1469
1470 if precision < 1 {
1471 return Err(Error::DecimalPrecisionMuBePositive { precision });
1472 }
1473
1474 if precision < scale {
1475 Err(Error::DecimalPrecisionLessThanScale { precision, scale })
1476 } else {
1477 Ok((precision, scale))
1478 }
1479 }
1480
1481 fn parse_complex(
1487 &mut self,
1488 complex: &Map<String, Value>,
1489 enclosing_namespace: &Namespace,
1490 parse_location: RecordSchemaParseLocation,
1491 ) -> AvroResult<Schema> {
1492 fn parse_as_native_complex(
1494 complex: &Map<String, Value>,
1495 parser: &mut Parser,
1496 enclosing_namespace: &Namespace,
1497 ) -> AvroResult<Schema> {
1498 match complex.get("type") {
1499 Some(value) => match value {
1500 Value::String(s) if s == "fixed" => {
1501 parser.parse_fixed(complex, enclosing_namespace)
1502 }
1503 _ => parser.parse(value, enclosing_namespace),
1504 },
1505 None => Err(Error::GetLogicalTypeField),
1506 }
1507 }
1508
1509 fn try_convert_to_logical_type<F>(
1516 logical_type: &str,
1517 schema: Schema,
1518 supported_schema_kinds: &[SchemaKind],
1519 convert: F,
1520 ) -> AvroResult<Schema>
1521 where
1522 F: Fn(Schema) -> AvroResult<Schema>,
1523 {
1524 let kind = SchemaKind::from(schema.clone());
1525 if supported_schema_kinds.contains(&kind) {
1526 convert(schema)
1527 } else {
1528 warn!(
1529 "Ignoring unknown logical type '{}' for schema of type: {:?}!",
1530 logical_type, schema
1531 );
1532 Ok(schema)
1533 }
1534 }
1535
1536 match complex.get("logicalType") {
1537 Some(Value::String(t)) => match t.as_str() {
1538 "decimal" => {
1539 return try_convert_to_logical_type(
1540 "decimal",
1541 parse_as_native_complex(complex, self, enclosing_namespace)?,
1542 &[SchemaKind::Fixed, SchemaKind::Bytes],
1543 |inner| -> AvroResult<Schema> {
1544 match Self::parse_precision_and_scale(complex) {
1545 Ok((precision, scale)) => Ok(Schema::Decimal(DecimalSchema {
1546 precision,
1547 scale,
1548 inner: Box::new(inner),
1549 })),
1550 Err(err) => {
1551 warn!("Ignoring invalid decimal logical type: {}", err);
1552 Ok(inner)
1553 }
1554 }
1555 },
1556 );
1557 }
1558 "big-decimal" => {
1559 return try_convert_to_logical_type(
1560 "big-decimal",
1561 parse_as_native_complex(complex, self, enclosing_namespace)?,
1562 &[SchemaKind::Bytes],
1563 |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) },
1564 );
1565 }
1566 "uuid" => {
1567 return try_convert_to_logical_type(
1568 "uuid",
1569 parse_as_native_complex(complex, self, enclosing_namespace)?,
1570 &[SchemaKind::String, SchemaKind::Fixed],
1571 |schema| match schema {
1572 Schema::String => Ok(Schema::Uuid),
1573 Schema::Fixed(FixedSchema { size: 16, .. }) => Ok(Schema::Uuid),
1574 Schema::Fixed(FixedSchema { size, .. }) => {
1575 warn!("Ignoring uuid logical type for a Fixed schema because its size ({size:?}) is not 16! Schema: {:?}", schema);
1576 Ok(schema)
1577 }
1578 _ => {
1579 warn!(
1580 "Ignoring invalid uuid logical type for schema: {:?}",
1581 schema
1582 );
1583 Ok(schema)
1584 }
1585 },
1586 );
1587 }
1588 "date" => {
1589 return try_convert_to_logical_type(
1590 "date",
1591 parse_as_native_complex(complex, self, enclosing_namespace)?,
1592 &[SchemaKind::Int],
1593 |_| -> AvroResult<Schema> { Ok(Schema::Date) },
1594 );
1595 }
1596 "time-millis" => {
1597 return try_convert_to_logical_type(
1598 "date",
1599 parse_as_native_complex(complex, self, enclosing_namespace)?,
1600 &[SchemaKind::Int],
1601 |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) },
1602 );
1603 }
1604 "time-micros" => {
1605 return try_convert_to_logical_type(
1606 "time-micros",
1607 parse_as_native_complex(complex, self, enclosing_namespace)?,
1608 &[SchemaKind::Long],
1609 |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) },
1610 );
1611 }
1612 "timestamp-millis" => {
1613 return try_convert_to_logical_type(
1614 "timestamp-millis",
1615 parse_as_native_complex(complex, self, enclosing_namespace)?,
1616 &[SchemaKind::Long],
1617 |_| -> AvroResult<Schema> { Ok(Schema::TimestampMillis) },
1618 );
1619 }
1620 "timestamp-micros" => {
1621 return try_convert_to_logical_type(
1622 "timestamp-micros",
1623 parse_as_native_complex(complex, self, enclosing_namespace)?,
1624 &[SchemaKind::Long],
1625 |_| -> AvroResult<Schema> { Ok(Schema::TimestampMicros) },
1626 );
1627 }
1628 "timestamp-nanos" => {
1629 return try_convert_to_logical_type(
1630 "timestamp-nanos",
1631 parse_as_native_complex(complex, self, enclosing_namespace)?,
1632 &[SchemaKind::Long],
1633 |_| -> AvroResult<Schema> { Ok(Schema::TimestampNanos) },
1634 );
1635 }
1636 "local-timestamp-millis" => {
1637 return try_convert_to_logical_type(
1638 "local-timestamp-millis",
1639 parse_as_native_complex(complex, self, enclosing_namespace)?,
1640 &[SchemaKind::Long],
1641 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMillis) },
1642 );
1643 }
1644 "local-timestamp-micros" => {
1645 return try_convert_to_logical_type(
1646 "local-timestamp-micros",
1647 parse_as_native_complex(complex, self, enclosing_namespace)?,
1648 &[SchemaKind::Long],
1649 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMicros) },
1650 );
1651 }
1652 "local-timestamp-nanos" => {
1653 return try_convert_to_logical_type(
1654 "local-timestamp-nanos",
1655 parse_as_native_complex(complex, self, enclosing_namespace)?,
1656 &[SchemaKind::Long],
1657 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampNanos) },
1658 );
1659 }
1660 "duration" => {
1661 return try_convert_to_logical_type(
1662 "duration",
1663 parse_as_native_complex(complex, self, enclosing_namespace)?,
1664 &[SchemaKind::Fixed],
1665 |_| -> AvroResult<Schema> { Ok(Schema::Duration) },
1666 );
1667 }
1668 _ => {}
1671 },
1672 Some(value) => return Err(Error::GetLogicalTypeFieldType(value.clone())),
1676 _ => {}
1677 }
1678 match complex.get("type") {
1679 Some(Value::String(t)) => match t.as_str() {
1680 "record" => match parse_location {
1681 RecordSchemaParseLocation::Root => {
1682 self.parse_record(complex, enclosing_namespace)
1683 }
1684 RecordSchemaParseLocation::FromField => {
1685 self.fetch_schema_ref(t, enclosing_namespace)
1686 }
1687 },
1688 "enum" => self.parse_enum(complex, enclosing_namespace),
1689 "array" => self.parse_array(complex, enclosing_namespace),
1690 "map" => self.parse_map(complex, enclosing_namespace),
1691 "fixed" => self.parse_fixed(complex, enclosing_namespace),
1692 other => self.parse_known_schema(other, enclosing_namespace),
1693 },
1694 Some(Value::Object(data)) => {
1695 self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root)
1696 }
1697 Some(Value::Array(variants)) => self.parse_union(variants, enclosing_namespace),
1698 Some(unknown) => Err(Error::GetComplexType(unknown.clone())),
1699 None => Err(Error::GetComplexTypeField),
1700 }
1701 }
1702
1703 fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
1704 let resolving_schema = Schema::Ref { name: name.clone() };
1705 self.resolving_schemas
1706 .insert(name.clone(), resolving_schema.clone());
1707
1708 let namespace = &name.namespace;
1709
1710 if let Some(ref aliases) = aliases {
1711 aliases.iter().for_each(|alias| {
1712 let alias_fullname = alias.fully_qualified_name(namespace);
1713 self.resolving_schemas
1714 .insert(alias_fullname, resolving_schema.clone());
1715 });
1716 }
1717 }
1718
1719 fn register_parsed_schema(
1720 &mut self,
1721 fully_qualified_name: &Name,
1722 schema: &Schema,
1723 aliases: &Aliases,
1724 ) {
1725 self.parsed_schemas
1728 .insert(fully_qualified_name.clone(), schema.clone());
1729 self.resolving_schemas.remove(fully_qualified_name);
1730
1731 let namespace = &fully_qualified_name.namespace;
1732
1733 if let Some(ref aliases) = aliases {
1734 aliases.iter().for_each(|alias| {
1735 let alias_fullname = alias.fully_qualified_name(namespace);
1736 self.resolving_schemas.remove(&alias_fullname);
1737 self.parsed_schemas.insert(alias_fullname, schema.clone());
1738 });
1739 }
1740 }
1741
1742 fn get_already_seen_schema(
1744 &self,
1745 complex: &Map<String, Value>,
1746 enclosing_namespace: &Namespace,
1747 ) -> Option<&Schema> {
1748 match complex.get("type") {
1749 Some(Value::String(ref typ)) => {
1750 let name = Name::new(typ.as_str())
1751 .unwrap()
1752 .fully_qualified_name(enclosing_namespace);
1753 self.resolving_schemas
1754 .get(&name)
1755 .or_else(|| self.parsed_schemas.get(&name))
1756 }
1757 _ => None,
1758 }
1759 }
1760
1761 fn parse_record(
1764 &mut self,
1765 complex: &Map<String, Value>,
1766 enclosing_namespace: &Namespace,
1767 ) -> AvroResult<Schema> {
1768 let fields_opt = complex.get("fields");
1769
1770 if fields_opt.is_none() {
1771 if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
1772 return Ok(seen.clone());
1773 }
1774 }
1775
1776 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
1777 let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
1778
1779 let mut lookup = BTreeMap::new();
1780
1781 self.register_resolving_schema(&fully_qualified_name, &aliases);
1782
1783 debug!("Going to parse record schema: {:?}", &fully_qualified_name);
1784
1785 let fields: Vec<RecordField> = fields_opt
1786 .and_then(|fields| fields.as_array())
1787 .ok_or(Error::GetRecordFieldsJson)
1788 .and_then(|fields| {
1789 fields
1790 .iter()
1791 .filter_map(|field| field.as_object())
1792 .enumerate()
1793 .map(|(position, field)| {
1794 RecordField::parse(field, position, self, &fully_qualified_name)
1795 })
1796 .collect::<Result<_, _>>()
1797 })?;
1798
1799 for field in &fields {
1800 if let Some(_old) = lookup.insert(field.name.clone(), field.position) {
1801 return Err(Error::FieldNameDuplicate(field.name.clone()));
1802 }
1803
1804 if let Some(ref field_aliases) = field.aliases {
1805 for alias in field_aliases {
1806 lookup.insert(alias.clone(), field.position);
1807 }
1808 }
1809 }
1810
1811 let schema = Schema::Record(RecordSchema {
1812 name: fully_qualified_name.clone(),
1813 aliases: aliases.clone(),
1814 doc: complex.doc(),
1815 fields,
1816 lookup,
1817 attributes: self.get_custom_attributes(complex, vec!["fields"]),
1818 });
1819
1820 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
1821 Ok(schema)
1822 }
1823
1824 fn get_custom_attributes(
1825 &self,
1826 complex: &Map<String, Value>,
1827 excluded: Vec<&'static str>,
1828 ) -> BTreeMap<String, Value> {
1829 let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
1830 for (key, value) in complex {
1831 match key.as_str() {
1832 "type" | "name" | "namespace" | "doc" | "aliases" => continue,
1833 candidate if excluded.contains(&candidate) => continue,
1834 _ => custom_attributes.insert(key.clone(), value.clone()),
1835 };
1836 }
1837 custom_attributes
1838 }
1839
1840 fn parse_enum(
1843 &mut self,
1844 complex: &Map<String, Value>,
1845 enclosing_namespace: &Namespace,
1846 ) -> AvroResult<Schema> {
1847 let symbols_opt = complex.get("symbols");
1848
1849 if symbols_opt.is_none() {
1850 if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
1851 return Ok(seen.clone());
1852 }
1853 }
1854
1855 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
1856 let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
1857
1858 let symbols: Vec<String> = symbols_opt
1859 .and_then(|v| v.as_array())
1860 .ok_or(Error::GetEnumSymbolsField)
1861 .and_then(|symbols| {
1862 symbols
1863 .iter()
1864 .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1865 .collect::<Option<_>>()
1866 .ok_or(Error::GetEnumSymbols)
1867 })?;
1868
1869 let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
1870 for symbol in symbols.iter() {
1871 validate_enum_symbol_name(symbol)?;
1872
1873 if existing_symbols.contains(&symbol) {
1875 return Err(Error::EnumSymbolDuplicate(symbol.to_string()));
1876 }
1877
1878 existing_symbols.insert(symbol);
1879 }
1880
1881 let mut default: Option<String> = None;
1882 if let Some(value) = complex.get("default") {
1883 if let Value::String(ref s) = *value {
1884 default = Some(s.clone());
1885 } else {
1886 return Err(Error::EnumDefaultWrongType(value.clone()));
1887 }
1888 }
1889
1890 if let Some(ref value) = default {
1891 let resolved = types::Value::from(value.clone())
1892 .resolve_enum(&symbols, &Some(value.to_string()), &None)
1893 .is_ok();
1894 if !resolved {
1895 return Err(Error::GetEnumDefault {
1896 symbol: value.to_string(),
1897 symbols,
1898 });
1899 }
1900 }
1901
1902 let schema = Schema::Enum(EnumSchema {
1903 name: fully_qualified_name.clone(),
1904 aliases: aliases.clone(),
1905 doc: complex.doc(),
1906 symbols,
1907 default,
1908 attributes: self.get_custom_attributes(complex, vec!["symbols"]),
1909 });
1910
1911 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
1912
1913 Ok(schema)
1914 }
1915
1916 fn parse_array(
1919 &mut self,
1920 complex: &Map<String, Value>,
1921 enclosing_namespace: &Namespace,
1922 ) -> AvroResult<Schema> {
1923 complex
1924 .get("items")
1925 .ok_or(Error::GetArrayItemsField)
1926 .and_then(|items| self.parse(items, enclosing_namespace))
1927 .map(|items| {
1928 Schema::array_with_attributes(
1929 items,
1930 self.get_custom_attributes(complex, vec!["items"]),
1931 )
1932 })
1933 }
1934
1935 fn parse_map(
1938 &mut self,
1939 complex: &Map<String, Value>,
1940 enclosing_namespace: &Namespace,
1941 ) -> AvroResult<Schema> {
1942 complex
1943 .get("values")
1944 .ok_or(Error::GetMapValuesField)
1945 .and_then(|items| self.parse(items, enclosing_namespace))
1946 .map(|items| {
1947 Schema::map_with_attributes(
1948 items,
1949 self.get_custom_attributes(complex, vec!["values"]),
1950 )
1951 })
1952 }
1953
1954 fn parse_union(
1957 &mut self,
1958 items: &[Value],
1959 enclosing_namespace: &Namespace,
1960 ) -> AvroResult<Schema> {
1961 items
1962 .iter()
1963 .map(|v| self.parse(v, enclosing_namespace))
1964 .collect::<Result<Vec<_>, _>>()
1965 .and_then(|schemas| {
1966 if schemas.is_empty() {
1967 error!(
1968 "Union schemas should have at least two members! \
1969 Please enable debug logging to find out which Record schema \
1970 declares the union with 'RUST_LOG=apache_avro::schema=debug'."
1971 );
1972 } else if schemas.len() == 1 {
1973 warn!(
1974 "Union schema with just one member! Consider dropping the union! \
1975 Please enable debug logging to find out which Record schema \
1976 declares the union with 'RUST_LOG=apache_avro::schema=debug'."
1977 );
1978 }
1979 Ok(Schema::Union(UnionSchema::new(schemas)?))
1980 })
1981 }
1982
1983 fn parse_fixed(
1986 &mut self,
1987 complex: &Map<String, Value>,
1988 enclosing_namespace: &Namespace,
1989 ) -> AvroResult<Schema> {
1990 let size_opt = complex.get("size");
1991 if size_opt.is_none() {
1992 if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
1993 return Ok(seen.clone());
1994 }
1995 }
1996
1997 let doc = complex.get("doc").and_then(|v| match &v {
1998 Value::String(ref docstr) => Some(docstr.clone()),
1999 _ => None,
2000 });
2001
2002 let size = match size_opt {
2003 Some(size) => size
2004 .as_u64()
2005 .ok_or_else(|| Error::GetFixedSizeFieldPositive(size.clone())),
2006 None => Err(Error::GetFixedSizeField),
2007 }?;
2008
2009 let default = complex.get("default").and_then(|v| match &v {
2010 Value::String(ref default) => Some(default.clone()),
2011 _ => None,
2012 });
2013
2014 if default.is_some() {
2015 let len = default.clone().unwrap().len();
2016 if len != size as usize {
2017 return Err(Error::FixedDefaultLenSizeMismatch(len, size));
2018 }
2019 }
2020
2021 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
2022 let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
2023
2024 let schema = Schema::Fixed(FixedSchema {
2025 name: fully_qualified_name.clone(),
2026 aliases: aliases.clone(),
2027 doc,
2028 size: size as usize,
2029 default,
2030 attributes: self.get_custom_attributes(complex, vec!["size"]),
2031 });
2032
2033 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
2034
2035 Ok(schema)
2036 }
2037}
2038
2039fn fix_aliases_namespace(aliases: Option<Vec<String>>, namespace: &Namespace) -> Aliases {
2045 aliases.map(|aliases| {
2046 aliases
2047 .iter()
2048 .map(|alias| {
2049 if alias.find('.').is_none() {
2050 match namespace {
2051 Some(ref ns) => format!("{ns}.{alias}"),
2052 None => alias.clone(),
2053 }
2054 } else {
2055 alias.clone()
2056 }
2057 })
2058 .map(|alias| Alias::new(alias.as_str()).unwrap())
2059 .collect()
2060 })
2061}
2062
2063fn get_schema_type_name(name: Name, value: Value) -> Name {
2064 match value.get("type") {
2065 Some(Value::Object(complex_type)) => match complex_type.name() {
2066 Some(name) => Name::new(name.as_str()).unwrap(),
2067 _ => name,
2068 },
2069 _ => name,
2070 }
2071}
2072
2073impl Serialize for Schema {
2074 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2075 where
2076 S: Serializer,
2077 {
2078 match *self {
2079 Schema::Ref { ref name } => serializer.serialize_str(&name.fullname(None)),
2080 Schema::Null => serializer.serialize_str("null"),
2081 Schema::Boolean => serializer.serialize_str("boolean"),
2082 Schema::Int => serializer.serialize_str("int"),
2083 Schema::Long => serializer.serialize_str("long"),
2084 Schema::Float => serializer.serialize_str("float"),
2085 Schema::Double => serializer.serialize_str("double"),
2086 Schema::Bytes => serializer.serialize_str("bytes"),
2087 Schema::String => serializer.serialize_str("string"),
2088 Schema::Array(ref inner) => {
2089 let mut map = serializer.serialize_map(Some(2 + inner.attributes.len()))?;
2090 map.serialize_entry("type", "array")?;
2091 map.serialize_entry("items", &*inner.items.clone())?;
2092 for attr in &inner.attributes {
2093 map.serialize_entry(attr.0, attr.1)?;
2094 }
2095 map.end()
2096 }
2097 Schema::Map(ref inner) => {
2098 let mut map = serializer.serialize_map(Some(2 + inner.attributes.len()))?;
2099 map.serialize_entry("type", "map")?;
2100 map.serialize_entry("values", &*inner.types.clone())?;
2101 for attr in &inner.attributes {
2102 map.serialize_entry(attr.0, attr.1)?;
2103 }
2104 map.end()
2105 }
2106 Schema::Union(ref inner) => {
2107 let variants = inner.variants();
2108 let mut seq = serializer.serialize_seq(Some(variants.len()))?;
2109 for v in variants {
2110 seq.serialize_element(v)?;
2111 }
2112 seq.end()
2113 }
2114 Schema::Record(RecordSchema {
2115 ref name,
2116 ref aliases,
2117 ref doc,
2118 ref fields,
2119 ref attributes,
2120 ..
2121 }) => {
2122 let mut map = serializer.serialize_map(None)?;
2123 map.serialize_entry("type", "record")?;
2124 if let Some(ref n) = name.namespace {
2125 map.serialize_entry("namespace", n)?;
2126 }
2127 map.serialize_entry("name", &name.name)?;
2128 if let Some(ref docstr) = doc {
2129 map.serialize_entry("doc", docstr)?;
2130 }
2131 if let Some(ref aliases) = aliases {
2132 map.serialize_entry("aliases", aliases)?;
2133 }
2134 map.serialize_entry("fields", fields)?;
2135 for attr in attributes {
2136 map.serialize_entry(attr.0, attr.1)?;
2137 }
2138 map.end()
2139 }
2140 Schema::Enum(EnumSchema {
2141 ref name,
2142 ref symbols,
2143 ref aliases,
2144 ref attributes,
2145 ..
2146 }) => {
2147 let mut map = serializer.serialize_map(None)?;
2148 map.serialize_entry("type", "enum")?;
2149 if let Some(ref n) = name.namespace {
2150 map.serialize_entry("namespace", n)?;
2151 }
2152 map.serialize_entry("name", &name.name)?;
2153 map.serialize_entry("symbols", symbols)?;
2154
2155 if let Some(ref aliases) = aliases {
2156 map.serialize_entry("aliases", aliases)?;
2157 }
2158 for attr in attributes {
2159 map.serialize_entry(attr.0, attr.1)?;
2160 }
2161 map.end()
2162 }
2163 Schema::Fixed(ref fixed_schema) => {
2164 let mut map = serializer.serialize_map(None)?;
2165 map = fixed_schema.serialize_to_map::<S>(map)?;
2166 map.end()
2167 }
2168 Schema::Decimal(DecimalSchema {
2169 ref scale,
2170 ref precision,
2171 ref inner,
2172 }) => {
2173 let mut map = serializer.serialize_map(None)?;
2174 match inner.as_ref() {
2175 Schema::Fixed(fixed_schema) => {
2176 map = fixed_schema.serialize_to_map::<S>(map)?;
2177 }
2178 Schema::Bytes => {
2179 map.serialize_entry("type", "bytes")?;
2180 }
2181 others => {
2182 return Err(serde::ser::Error::custom(format!(
2183 "DecimalSchema inner type must be Fixed or Bytes, got {:?}",
2184 SchemaKind::from(others)
2185 )));
2186 }
2187 }
2188 map.serialize_entry("logicalType", "decimal")?;
2189 map.serialize_entry("scale", scale)?;
2190 map.serialize_entry("precision", precision)?;
2191 map.end()
2192 }
2193
2194 Schema::BigDecimal => {
2195 let mut map = serializer.serialize_map(None)?;
2196 map.serialize_entry("type", "bytes")?;
2197 map.serialize_entry("logicalType", "big-decimal")?;
2198 map.end()
2199 }
2200 Schema::Uuid => {
2201 let mut map = serializer.serialize_map(None)?;
2202 map.serialize_entry("type", "string")?;
2203 map.serialize_entry("logicalType", "uuid")?;
2204 map.end()
2205 }
2206 Schema::Date => {
2207 let mut map = serializer.serialize_map(None)?;
2208 map.serialize_entry("type", "int")?;
2209 map.serialize_entry("logicalType", "date")?;
2210 map.end()
2211 }
2212 Schema::TimeMillis => {
2213 let mut map = serializer.serialize_map(None)?;
2214 map.serialize_entry("type", "int")?;
2215 map.serialize_entry("logicalType", "time-millis")?;
2216 map.end()
2217 }
2218 Schema::TimeMicros => {
2219 let mut map = serializer.serialize_map(None)?;
2220 map.serialize_entry("type", "long")?;
2221 map.serialize_entry("logicalType", "time-micros")?;
2222 map.end()
2223 }
2224 Schema::TimestampMillis => {
2225 let mut map = serializer.serialize_map(None)?;
2226 map.serialize_entry("type", "long")?;
2227 map.serialize_entry("logicalType", "timestamp-millis")?;
2228 map.end()
2229 }
2230 Schema::TimestampMicros => {
2231 let mut map = serializer.serialize_map(None)?;
2232 map.serialize_entry("type", "long")?;
2233 map.serialize_entry("logicalType", "timestamp-micros")?;
2234 map.end()
2235 }
2236 Schema::TimestampNanos => {
2237 let mut map = serializer.serialize_map(None)?;
2238 map.serialize_entry("type", "long")?;
2239 map.serialize_entry("logicalType", "timestamp-nanos")?;
2240 map.end()
2241 }
2242 Schema::LocalTimestampMillis => {
2243 let mut map = serializer.serialize_map(None)?;
2244 map.serialize_entry("type", "long")?;
2245 map.serialize_entry("logicalType", "local-timestamp-millis")?;
2246 map.end()
2247 }
2248 Schema::LocalTimestampMicros => {
2249 let mut map = serializer.serialize_map(None)?;
2250 map.serialize_entry("type", "long")?;
2251 map.serialize_entry("logicalType", "local-timestamp-micros")?;
2252 map.end()
2253 }
2254 Schema::LocalTimestampNanos => {
2255 let mut map = serializer.serialize_map(None)?;
2256 map.serialize_entry("type", "long")?;
2257 map.serialize_entry("logicalType", "local-timestamp-nanos")?;
2258 map.end()
2259 }
2260 Schema::Duration => {
2261 let mut map = serializer.serialize_map(None)?;
2262
2263 let inner = Schema::Fixed(FixedSchema {
2266 name: Name::new("duration").unwrap(),
2267 aliases: None,
2268 doc: None,
2269 size: 12,
2270 default: None,
2271 attributes: Default::default(),
2272 });
2273 map.serialize_entry("type", &inner)?;
2274 map.serialize_entry("logicalType", "duration")?;
2275 map.end()
2276 }
2277 }
2278 }
2279}
2280
2281impl Serialize for RecordField {
2282 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2283 where
2284 S: Serializer,
2285 {
2286 let mut map = serializer.serialize_map(None)?;
2287 map.serialize_entry("name", &self.name)?;
2288 map.serialize_entry("type", &self.schema)?;
2289
2290 if let Some(ref default) = self.default {
2291 map.serialize_entry("default", default)?;
2292 }
2293
2294 if let Some(ref aliases) = self.aliases {
2295 map.serialize_entry("aliases", aliases)?;
2296 }
2297
2298 for attr in &self.custom_attributes {
2299 map.serialize_entry(attr.0, attr.1)?;
2300 }
2301
2302 map.end()
2303 }
2304}
2305
2306fn parsing_canonical_form(schema: &Value, defined_names: &mut HashSet<String>) -> String {
2309 match schema {
2310 Value::Object(map) => pcf_map(map, defined_names),
2311 Value::String(s) => pcf_string(s),
2312 Value::Array(v) => pcf_array(v, defined_names),
2313 json => panic!("got invalid JSON value for canonical form of schema: {json}"),
2314 }
2315}
2316
2317fn pcf_map(schema: &Map<String, Value>, defined_names: &mut HashSet<String>) -> String {
2318 let ns = schema.get("namespace").and_then(|v| v.as_str());
2320 let typ = schema.get("type").and_then(|v| v.as_str());
2321 let raw_name = schema.get("name").and_then(|v| v.as_str());
2322 let name = if is_named_type(typ) {
2323 Some(format!(
2324 "{}{}",
2325 ns.map_or("".to_string(), |n| { format!("{n}.") }),
2326 raw_name.unwrap_or_default()
2327 ))
2328 } else {
2329 None
2330 };
2331
2332 if let Some(ref n) = name {
2334 if defined_names.contains(n) {
2335 return pcf_string(n);
2336 } else {
2337 defined_names.insert(n.clone());
2338 }
2339 }
2340
2341 let mut fields = Vec::new();
2342 for (k, v) in schema {
2343 if schema.len() == 1 && k == "type" {
2345 if let Value::String(s) = v {
2347 return pcf_string(s);
2348 }
2349 }
2350
2351 if field_ordering_position(k).is_none()
2353 || k == "default"
2354 || k == "doc"
2355 || k == "aliases"
2356 || k == "logicalType"
2357 {
2358 continue;
2359 }
2360
2361 if k == "name" {
2363 if let Some(ref n) = name {
2364 fields.push(("name", format!("{}:{}", pcf_string(k), pcf_string(n))));
2365 continue;
2366 }
2367 }
2368
2369 if k == "size" || k == "precision" || k == "scale" {
2371 let i = match v.as_str() {
2372 Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2373 None => v.as_i64().unwrap(),
2374 };
2375 fields.push((k, format!("{}:{}", pcf_string(k), i)));
2376 continue;
2377 }
2378
2379 fields.push((
2381 k,
2382 format!(
2383 "{}:{}",
2384 pcf_string(k),
2385 parsing_canonical_form(v, defined_names)
2386 ),
2387 ));
2388 }
2389
2390 fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2392 let inter = fields
2393 .into_iter()
2394 .map(|(_, v)| v)
2395 .collect::<Vec<_>>()
2396 .join(",");
2397 format!("{{{inter}}}")
2398}
2399
/// Tells whether `typ` is one of the type names `record`, `enum`, `fixed` or `ref`.
fn is_named_type(typ: Option<&str>) -> bool {
    matches!(typ, Some("record" | "enum" | "fixed" | "ref"))
}
2406
2407fn pcf_array(arr: &[Value], defined_names: &mut HashSet<String>) -> String {
2408 let inter = arr
2409 .iter()
2410 .map(|a| parsing_canonical_form(a, defined_names))
2411 .collect::<Vec<String>>()
2412 .join(",");
2413 format!("[{inter}]")
2414}
2415
/// Wraps `s` in double quotes. The text itself is copied verbatim; nothing is escaped.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
2419
/// The schema attributes known to the canonical form rendering, in the order in which they
/// are written out.
const RESERVED_FIELDS: &[&str] = &[
    "name",
    "type",
    "fields",
    "symbols",
    "items",
    "values",
    "size",
    "logicalType",
    "order",
    "doc",
    "aliases",
    "default",
    "precision",
    "scale",
];

/// Returns the 1-based position of `field` in `RESERVED_FIELDS`, or `None` if `field` is not
/// listed there.
fn field_ordering_position(field: &str) -> Option<usize> {
    RESERVED_FIELDS
        .iter()
        .enumerate()
        .find(|(_, reserved)| **reserved == field)
        .map(|(index, _)| index + 1)
}
2444
/// Trait for types that can provide the Avro [`Schema`] describing them.
pub trait AvroSchema {
    /// Returns the Avro schema of the implementing type.
    fn get_schema() -> Schema;
}
2452
#[cfg(feature = "derive")]
pub mod derive {
    use super::*;
    use std::borrow::Cow;

    /// Building block for [`AvroSchema`]: a type that can produce its schema in the context
    /// of a surrounding schema.
    ///
    /// Every type implementing this trait gets [`AvroSchema`] for free through the blanket
    /// implementation below.
    pub trait AvroSchemaComponent {
        /// Returns the schema of the implementing type.
        ///
        /// NOTE(review): `named_schemas` presumably collects the named schemas produced so
        /// far so that they can be referred to by name, and `enclosing_namespace` is
        /// presumably the namespace of the surrounding schema - confirm against the
        /// implementors; the implementations in this module only pass both along.
        fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace)
            -> Schema;
    }

    impl<T> AvroSchema for T
    where
        T: AvroSchemaComponent,
    {
        /// Produces the schema starting from an empty set of named schemas and no namespace.
        fn get_schema() -> Schema {
            T::get_schema_in_ctxt(&mut HashMap::default(), &None)
        }
    }

    /// Implements [`AvroSchemaComponent`] for a type whose schema is a constant expression.
    macro_rules! impl_schema (
        ($type:ty, $variant_constructor:expr) => (
            impl AvroSchemaComponent for $type {
                fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
                    $variant_constructor
                }
            }
        );
    );

    impl_schema!(bool, Schema::Boolean);
    impl_schema!(i8, Schema::Int);
    impl_schema!(i16, Schema::Int);
    impl_schema!(i32, Schema::Int);
    impl_schema!(i64, Schema::Long);
    impl_schema!(u8, Schema::Int);
    impl_schema!(u16, Schema::Int);
    impl_schema!(u32, Schema::Long);
    impl_schema!(f32, Schema::Float);
    impl_schema!(f64, Schema::Double);
    impl_schema!(String, Schema::String);
    impl_schema!(uuid::Uuid, Schema::Uuid);
    impl_schema!(core::time::Duration, Schema::Duration);

    /// A `Vec<T>` is described as an array of the schema of `T`.
    impl<T> AvroSchemaComponent for Vec<T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
        }
    }

    /// An `Option<T>` is described as the union of `null` and the schema of `T`.
    impl<T> AvroSchemaComponent for Option<T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            let inner_schema = T::get_schema_in_ctxt(named_schemas, enclosing_namespace);
            let schemas = vec![Schema::Null, inner_schema];
            // Index the variants straight from the final list, so the inner schema is
            // neither cloned nor collected into a second, temporary vector.
            let variant_index = schemas
                .iter()
                .enumerate()
                .map(|(idx, s)| (SchemaKind::from(s), idx))
                .collect();
            Schema::Union(UnionSchema {
                schemas,
                variant_index,
            })
        }
    }

    /// A `Map<String, T>` is described as a map with values of the schema of `T`.
    impl<T> AvroSchemaComponent for Map<String, T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
        }
    }

    /// A `HashMap<String, T>` is described as a map with values of the schema of `T`.
    impl<T> AvroSchemaComponent for HashMap<String, T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
        }
    }

    /// A `Box<T>` has the same schema as `T`.
    impl<T> AvroSchemaComponent for Box<T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
        }
    }

    /// A `Mutex<T>` has the same schema as `T`.
    impl<T> AvroSchemaComponent for std::sync::Mutex<T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
        }
    }

    /// A `Cow<'_, T>` has the same schema as `T`.
    impl<T> AvroSchemaComponent for Cow<'_, T>
    where
        T: AvroSchemaComponent + Clone,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
        }
    }
}
2636
2637#[cfg(test)]
2638mod tests {
2639 use super::*;
2640 use crate::{rabin::Rabin, SpecificSingleObjectWriter};
2641 use apache_avro_test_helper::{
2642 logger::{assert_logged, assert_not_logged},
2643 TestResult,
2644 };
2645 use serde_json::json;
2646 use serial_test::serial;
2647 use std::sync::atomic::Ordering;
2648
2649 #[test]
2650 fn test_invalid_schema() {
2651 assert!(Schema::parse_str("invalid").is_err());
2652 }
2653
2654 #[test]
2655 fn test_primitive_schema() -> TestResult {
2656 assert_eq!(Schema::Null, Schema::parse_str("\"null\"")?);
2657 assert_eq!(Schema::Int, Schema::parse_str("\"int\"")?);
2658 assert_eq!(Schema::Double, Schema::parse_str("\"double\"")?);
2659 Ok(())
2660 }
2661
2662 #[test]
2663 fn test_array_schema() -> TestResult {
2664 let schema = Schema::parse_str(r#"{"type": "array", "items": "string"}"#)?;
2665 assert_eq!(Schema::array(Schema::String), schema);
2666 Ok(())
2667 }
2668
2669 #[test]
2670 fn test_map_schema() -> TestResult {
2671 let schema = Schema::parse_str(r#"{"type": "map", "values": "double"}"#)?;
2672 assert_eq!(Schema::map(Schema::Double), schema);
2673 Ok(())
2674 }
2675
2676 #[test]
2677 fn test_union_schema() -> TestResult {
2678 let schema = Schema::parse_str(r#"["null", "int"]"#)?;
2679 assert_eq!(
2680 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
2681 schema
2682 );
2683 Ok(())
2684 }
2685
2686 #[test]
2687 fn test_union_unsupported_schema() {
2688 let schema = Schema::parse_str(r#"["null", ["null", "int"], "string"]"#);
2689 assert!(schema.is_err());
2690 }
2691
2692 #[test]
2693 fn test_multi_union_schema() -> TestResult {
2694 let schema = Schema::parse_str(r#"["null", "int", "float", "string", "bytes"]"#);
2695 assert!(schema.is_ok());
2696 let schema = schema?;
2697 assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2698 let union_schema = match schema {
2699 Schema::Union(u) => u,
2700 _ => unreachable!(),
2701 };
2702 assert_eq!(union_schema.variants().len(), 5);
2703 let mut variants = union_schema.variants().iter();
2704 assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Null);
2705 assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Int);
2706 assert_eq!(
2707 SchemaKind::from(variants.next().unwrap()),
2708 SchemaKind::Float
2709 );
2710 assert_eq!(
2711 SchemaKind::from(variants.next().unwrap()),
2712 SchemaKind::String
2713 );
2714 assert_eq!(
2715 SchemaKind::from(variants.next().unwrap()),
2716 SchemaKind::Bytes
2717 );
2718 assert_eq!(variants.next(), None);
2719
2720 Ok(())
2721 }
2722
2723 #[test]
2724 fn test_avro_3621_nullable_record_field() -> TestResult {
2725 let nullable_record_field = RecordField::builder()
2726 .name("next".to_string())
2727 .schema(Schema::Union(UnionSchema::new(vec![
2728 Schema::Null,
2729 Schema::Ref {
2730 name: Name {
2731 name: "LongList".to_owned(),
2732 namespace: None,
2733 },
2734 },
2735 ])?))
2736 .order(RecordFieldOrder::Ascending)
2737 .position(1)
2738 .build();
2739
2740 assert!(nullable_record_field.is_nullable());
2741
2742 let non_nullable_record_field = RecordField::builder()
2743 .name("next".to_string())
2744 .default(json!(2))
2745 .schema(Schema::Long)
2746 .order(RecordFieldOrder::Ascending)
2747 .position(1)
2748 .build();
2749
2750 assert!(!non_nullable_record_field.is_nullable());
2751 Ok(())
2752 }
2753
2754 #[test]
2756 fn test_union_of_records() -> TestResult {
2757 use std::iter::FromIterator;
2758
2759 let schema_str_a = r#"{
2761 "name": "A",
2762 "type": "record",
2763 "fields": [
2764 {"name": "field_one", "type": "float"}
2765 ]
2766 }"#;
2767
2768 let schema_str_b = r#"{
2769 "name": "B",
2770 "type": "record",
2771 "fields": [
2772 {"name": "field_one", "type": "float"}
2773 ]
2774 }"#;
2775
2776 let schema_str_c = r#"{
2778 "name": "C",
2779 "type": "record",
2780 "fields": [
2781 {"name": "field_one", "type": ["A", "B"]}
2782 ]
2783 }"#;
2784
2785 let schema_c = Schema::parse_list(&[schema_str_a, schema_str_b, schema_str_c])?
2786 .last()
2787 .unwrap()
2788 .clone();
2789
2790 let schema_c_expected = Schema::Record(
2791 RecordSchema::builder()
2792 .name(Name::new("C")?)
2793 .fields(vec![RecordField::builder()
2794 .name("field_one".to_string())
2795 .schema(Schema::Union(UnionSchema::new(vec![
2796 Schema::Ref {
2797 name: Name::new("A")?,
2798 },
2799 Schema::Ref {
2800 name: Name::new("B")?,
2801 },
2802 ])?))
2803 .build()])
2804 .lookup(BTreeMap::from_iter(vec![("field_one".to_string(), 0)]))
2805 .build(),
2806 );
2807
2808 assert_eq!(schema_c, schema_c_expected);
2809 Ok(())
2810 }
2811
2812 #[test]
2813 fn avro_rs_104_test_root_union_of_records() -> TestResult {
2814 let schema_str_a = r#"{
2816 "name": "A",
2817 "type": "record",
2818 "fields": [
2819 {"name": "field_one", "type": "float"}
2820 ]
2821 }"#;
2822
2823 let schema_str_b = r#"{
2824 "name": "B",
2825 "type": "record",
2826 "fields": [
2827 {"name": "field_one", "type": "float"}
2828 ]
2829 }"#;
2830
2831 let schema_str_c = r#"["A", "B"]"#;
2832
2833 let (schema_c, schemata) =
2834 Schema::parse_str_with_list(schema_str_c, &[schema_str_a, schema_str_b])?;
2835
2836 let schema_a_expected = Schema::Record(RecordSchema {
2837 name: Name::new("A")?,
2838 aliases: None,
2839 doc: None,
2840 fields: vec![RecordField {
2841 name: "field_one".to_string(),
2842 doc: None,
2843 default: None,
2844 aliases: None,
2845 schema: Schema::Float,
2846 order: RecordFieldOrder::Ignore,
2847 position: 0,
2848 custom_attributes: Default::default(),
2849 }],
2850 lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
2851 attributes: Default::default(),
2852 });
2853
2854 let schema_b_expected = Schema::Record(RecordSchema {
2855 name: Name::new("B")?,
2856 aliases: None,
2857 doc: None,
2858 fields: vec![RecordField {
2859 name: "field_one".to_string(),
2860 doc: None,
2861 default: None,
2862 aliases: None,
2863 schema: Schema::Float,
2864 order: RecordFieldOrder::Ignore,
2865 position: 0,
2866 custom_attributes: Default::default(),
2867 }],
2868 lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
2869 attributes: Default::default(),
2870 });
2871
2872 let schema_c_expected = Schema::Union(UnionSchema::new(vec![
2873 Schema::Ref {
2874 name: Name::new("A")?,
2875 },
2876 Schema::Ref {
2877 name: Name::new("B")?,
2878 },
2879 ])?);
2880
2881 assert_eq!(schema_c, schema_c_expected);
2882 assert_eq!(schemata[0], schema_a_expected);
2883 assert_eq!(schemata[1], schema_b_expected);
2884
2885 Ok(())
2886 }
2887
2888 #[test]
2889 fn avro_rs_104_test_root_union_of_records_name_collision() -> TestResult {
2890 let schema_str_a1 = r#"{
2892 "name": "A",
2893 "type": "record",
2894 "fields": [
2895 {"name": "field_one", "type": "float"}
2896 ]
2897 }"#;
2898
2899 let schema_str_a2 = r#"{
2900 "name": "A",
2901 "type": "record",
2902 "fields": [
2903 {"name": "field_one", "type": "float"}
2904 ]
2905 }"#;
2906
2907 let schema_str_c = r#"["A", "A"]"#;
2908
2909 match Schema::parse_str_with_list(schema_str_c, &[schema_str_a1, schema_str_a2]) {
2910 Ok(_) => unreachable!("Expected an error that the name is already defined"),
2911 Err(e) => assert_eq!(
2912 e.to_string(),
2913 "Two schemas with the same fullname were given: \"A\""
2914 ),
2915 }
2916
2917 Ok(())
2918 }
2919
2920 #[test]
2921 fn avro_rs_104_test_root_union_of_records_no_name() -> TestResult {
2922 let schema_str_a = r#"{
2923 "name": "A",
2924 "type": "record",
2925 "fields": [
2926 {"name": "field_one", "type": "float"}
2927 ]
2928 }"#;
2929
2930 let schema_str_b = r#"{
2932 "type": "record",
2933 "fields": [
2934 {"name": "field_one", "type": "float"}
2935 ]
2936 }"#;
2937
2938 let schema_str_c = r#"["A", "A"]"#;
2939
2940 match Schema::parse_str_with_list(schema_str_c, &[schema_str_a, schema_str_b]) {
2941 Ok(_) => unreachable!("Expected an error that schema_str_b is missing a name field"),
2942 Err(e) => assert_eq!(e.to_string(), "No `name` field"),
2943 }
2944
2945 Ok(())
2946 }
2947
2948 #[test]
2949 fn avro_3584_test_recursion_records() -> TestResult {
2950 let schema_str_a = r#"{
2952 "name": "A",
2953 "type": "record",
2954 "fields": [ {"name": "field_one", "type": "B"} ]
2955 }"#;
2956
2957 let schema_str_b = r#"{
2958 "name": "B",
2959 "type": "record",
2960 "fields": [ {"name": "field_one", "type": "A"} ]
2961 }"#;
2962
2963 let list = Schema::parse_list(&[schema_str_a, schema_str_b])?;
2964
2965 let schema_a = list.first().unwrap().clone();
2966
2967 match schema_a {
2968 Schema::Record(RecordSchema { fields, .. }) => {
2969 let f1 = fields.first();
2970
2971 let ref_schema = Schema::Ref {
2972 name: Name::new("B")?,
2973 };
2974 assert_eq!(ref_schema, f1.unwrap().schema);
2975 }
2976 _ => panic!("Expected a record schema!"),
2977 }
2978
2979 Ok(())
2980 }
2981
2982 #[test]
2983 fn test_avro_3248_nullable_record() -> TestResult {
2984 use std::iter::FromIterator;
2985
2986 let schema_str_a = r#"{
2987 "name": "A",
2988 "type": "record",
2989 "fields": [
2990 {"name": "field_one", "type": "float"}
2991 ]
2992 }"#;
2993
2994 let schema_str_option_a = r#"{
2996 "name": "OptionA",
2997 "type": "record",
2998 "fields": [
2999 {"name": "field_one", "type": ["null", "A"], "default": null}
3000 ]
3001 }"#;
3002
3003 let schema_option_a = Schema::parse_list(&[schema_str_a, schema_str_option_a])?
3004 .last()
3005 .unwrap()
3006 .clone();
3007
3008 let schema_option_a_expected = Schema::Record(RecordSchema {
3009 name: Name::new("OptionA")?,
3010 aliases: None,
3011 doc: None,
3012 fields: vec![RecordField {
3013 name: "field_one".to_string(),
3014 doc: None,
3015 default: Some(Value::Null),
3016 aliases: None,
3017 schema: Schema::Union(UnionSchema::new(vec![
3018 Schema::Null,
3019 Schema::Ref {
3020 name: Name::new("A")?,
3021 },
3022 ])?),
3023 order: RecordFieldOrder::Ignore,
3024 position: 0,
3025 custom_attributes: Default::default(),
3026 }],
3027 lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
3028 attributes: Default::default(),
3029 });
3030
3031 assert_eq!(schema_option_a, schema_option_a_expected);
3032
3033 Ok(())
3034 }
3035
3036 #[test]
3037 fn test_record_schema() -> TestResult {
3038 let parsed = Schema::parse_str(
3039 r#"
3040 {
3041 "type": "record",
3042 "name": "test",
3043 "fields": [
3044 {"name": "a", "type": "long", "default": 42},
3045 {"name": "b", "type": "string"}
3046 ]
3047 }
3048 "#,
3049 )?;
3050
3051 let mut lookup = BTreeMap::new();
3052 lookup.insert("a".to_owned(), 0);
3053 lookup.insert("b".to_owned(), 1);
3054
3055 let expected = Schema::Record(RecordSchema {
3056 name: Name::new("test")?,
3057 aliases: None,
3058 doc: None,
3059 fields: vec![
3060 RecordField {
3061 name: "a".to_string(),
3062 doc: None,
3063 default: Some(Value::Number(42i64.into())),
3064 aliases: None,
3065 schema: Schema::Long,
3066 order: RecordFieldOrder::Ascending,
3067 position: 0,
3068 custom_attributes: Default::default(),
3069 },
3070 RecordField {
3071 name: "b".to_string(),
3072 doc: None,
3073 default: None,
3074 aliases: None,
3075 schema: Schema::String,
3076 order: RecordFieldOrder::Ascending,
3077 position: 1,
3078 custom_attributes: Default::default(),
3079 },
3080 ],
3081 lookup,
3082 attributes: Default::default(),
3083 });
3084
3085 assert_eq!(parsed, expected);
3086
3087 Ok(())
3088 }
3089
3090 #[test]
3091 fn test_avro_3302_record_schema_with_currently_parsing_schema() -> TestResult {
3092 let schema = Schema::parse_str(
3093 r#"
3094 {
3095 "type": "record",
3096 "name": "test",
3097 "fields": [{
3098 "name": "recordField",
3099 "type": {
3100 "type": "record",
3101 "name": "Node",
3102 "fields": [
3103 {"name": "label", "type": "string"},
3104 {"name": "children", "type": {"type": "array", "items": "Node"}}
3105 ]
3106 }
3107 }]
3108 }
3109 "#,
3110 )?;
3111
3112 let mut lookup = BTreeMap::new();
3113 lookup.insert("recordField".to_owned(), 0);
3114
3115 let mut node_lookup = BTreeMap::new();
3116 node_lookup.insert("children".to_owned(), 1);
3117 node_lookup.insert("label".to_owned(), 0);
3118
3119 let expected = Schema::Record(RecordSchema {
3120 name: Name::new("test")?,
3121 aliases: None,
3122 doc: None,
3123 fields: vec![RecordField {
3124 name: "recordField".to_string(),
3125 doc: None,
3126 default: None,
3127 aliases: None,
3128 schema: Schema::Record(RecordSchema {
3129 name: Name::new("Node")?,
3130 aliases: None,
3131 doc: None,
3132 fields: vec![
3133 RecordField {
3134 name: "label".to_string(),
3135 doc: None,
3136 default: None,
3137 aliases: None,
3138 schema: Schema::String,
3139 order: RecordFieldOrder::Ascending,
3140 position: 0,
3141 custom_attributes: Default::default(),
3142 },
3143 RecordField {
3144 name: "children".to_string(),
3145 doc: None,
3146 default: None,
3147 aliases: None,
3148 schema: Schema::array(Schema::Ref {
3149 name: Name::new("Node")?,
3150 }),
3151 order: RecordFieldOrder::Ascending,
3152 position: 1,
3153 custom_attributes: Default::default(),
3154 },
3155 ],
3156 lookup: node_lookup,
3157 attributes: Default::default(),
3158 }),
3159 order: RecordFieldOrder::Ascending,
3160 position: 0,
3161 custom_attributes: Default::default(),
3162 }],
3163 lookup,
3164 attributes: Default::default(),
3165 });
3166 assert_eq!(schema, expected);
3167
3168 let canonical_form = &schema.canonical_form();
3169 let expected = r#"{"name":"test","type":"record","fields":[{"name":"recordField","type":{"name":"Node","type":"record","fields":[{"name":"label","type":"string"},{"name":"children","type":{"type":"array","items":"Node"}}]}}]}"#;
3170 assert_eq!(canonical_form, &expected);
3171
3172 Ok(())
3173 }
3174
3175 #[test]
3177 fn test_parsing_of_recursive_type_enum() -> TestResult {
3178 let schema = r#"
3179 {
3180 "type": "record",
3181 "name": "User",
3182 "namespace": "office",
3183 "fields": [
3184 {
3185 "name": "details",
3186 "type": [
3187 {
3188 "type": "record",
3189 "name": "Employee",
3190 "fields": [
3191 {
3192 "name": "gender",
3193 "type": {
3194 "type": "enum",
3195 "name": "Gender",
3196 "symbols": [
3197 "male",
3198 "female"
3199 ]
3200 },
3201 "default": "female"
3202 }
3203 ]
3204 },
3205 {
3206 "type": "record",
3207 "name": "Manager",
3208 "fields": [
3209 {
3210 "name": "gender",
3211 "type": "Gender"
3212 }
3213 ]
3214 }
3215 ]
3216 }
3217 ]
3218 }
3219 "#;
3220
3221 let schema = Schema::parse_str(schema)?;
3222 let schema_str = schema.canonical_form();
3223 let expected = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"office.Employee","type":"record","fields":[{"name":"gender","type":{"name":"office.Gender","type":"enum","symbols":["male","female"]}}]},{"name":"office.Manager","type":"record","fields":[{"name":"gender","type":"office.Gender"}]}]}]}"#;
3224 assert_eq!(schema_str, expected);
3225
3226 Ok(())
3227 }
3228
3229 #[test]
3230 fn test_parsing_of_recursive_type_fixed() -> TestResult {
3231 let schema = r#"
3232 {
3233 "type": "record",
3234 "name": "User",
3235 "namespace": "office",
3236 "fields": [
3237 {
3238 "name": "details",
3239 "type": [
3240 {
3241 "type": "record",
3242 "name": "Employee",
3243 "fields": [
3244 {
3245 "name": "id",
3246 "type": {
3247 "type": "fixed",
3248 "name": "EmployeeId",
3249 "size": 16
3250 },
3251 "default": "female"
3252 }
3253 ]
3254 },
3255 {
3256 "type": "record",
3257 "name": "Manager",
3258 "fields": [
3259 {
3260 "name": "id",
3261 "type": "EmployeeId"
3262 }
3263 ]
3264 }
3265 ]
3266 }
3267 ]
3268 }
3269 "#;
3270
3271 let schema = Schema::parse_str(schema)?;
3272 let schema_str = schema.canonical_form();
3273 let expected = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"office.Employee","type":"record","fields":[{"name":"id","type":{"name":"office.EmployeeId","type":"fixed","size":16}}]},{"name":"office.Manager","type":"record","fields":[{"name":"id","type":"office.EmployeeId"}]}]}]}"#;
3274 assert_eq!(schema_str, expected);
3275
3276 Ok(())
3277 }
3278
3279 #[test]
3280 fn test_avro_3302_record_schema_with_currently_parsing_schema_aliases() -> TestResult {
3281 let schema = Schema::parse_str(
3282 r#"
3283 {
3284 "type": "record",
3285 "name": "LongList",
3286 "aliases": ["LinkedLongs"],
3287 "fields" : [
3288 {"name": "value", "type": "long"},
3289 {"name": "next", "type": ["null", "LinkedLongs"]}
3290 ]
3291 }
3292 "#,
3293 )?;
3294
3295 let mut lookup = BTreeMap::new();
3296 lookup.insert("value".to_owned(), 0);
3297 lookup.insert("next".to_owned(), 1);
3298
3299 let expected = Schema::Record(RecordSchema {
3300 name: Name {
3301 name: "LongList".to_owned(),
3302 namespace: None,
3303 },
3304 aliases: Some(vec![Alias::new("LinkedLongs").unwrap()]),
3305 doc: None,
3306 fields: vec![
3307 RecordField {
3308 name: "value".to_string(),
3309 doc: None,
3310 default: None,
3311 aliases: None,
3312 schema: Schema::Long,
3313 order: RecordFieldOrder::Ascending,
3314 position: 0,
3315 custom_attributes: Default::default(),
3316 },
3317 RecordField {
3318 name: "next".to_string(),
3319 doc: None,
3320 default: None,
3321 aliases: None,
3322 schema: Schema::Union(UnionSchema::new(vec![
3323 Schema::Null,
3324 Schema::Ref {
3325 name: Name {
3326 name: "LongList".to_owned(),
3327 namespace: None,
3328 },
3329 },
3330 ])?),
3331 order: RecordFieldOrder::Ascending,
3332 position: 1,
3333 custom_attributes: Default::default(),
3334 },
3335 ],
3336 lookup,
3337 attributes: Default::default(),
3338 });
3339 assert_eq!(schema, expected);
3340
3341 let canonical_form = &schema.canonical_form();
3342 let expected = r#"{"name":"LongList","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":["null","LongList"]}]}"#;
3343 assert_eq!(canonical_form, &expected);
3344
3345 Ok(())
3346 }
3347
3348 #[test]
3349 fn test_avro_3370_record_schema_with_currently_parsing_schema_named_record() -> TestResult {
3350 let schema = Schema::parse_str(
3351 r#"
3352 {
3353 "type" : "record",
3354 "name" : "record",
3355 "fields" : [
3356 { "name" : "value", "type" : "long" },
3357 { "name" : "next", "type" : "record" }
3358 ]
3359 }
3360 "#,
3361 )?;
3362
3363 let mut lookup = BTreeMap::new();
3364 lookup.insert("value".to_owned(), 0);
3365 lookup.insert("next".to_owned(), 1);
3366
3367 let expected = Schema::Record(RecordSchema {
3368 name: Name {
3369 name: "record".to_owned(),
3370 namespace: None,
3371 },
3372 aliases: None,
3373 doc: None,
3374 fields: vec![
3375 RecordField {
3376 name: "value".to_string(),
3377 doc: None,
3378 default: None,
3379 aliases: None,
3380 schema: Schema::Long,
3381 order: RecordFieldOrder::Ascending,
3382 position: 0,
3383 custom_attributes: Default::default(),
3384 },
3385 RecordField {
3386 name: "next".to_string(),
3387 doc: None,
3388 default: None,
3389 aliases: None,
3390 schema: Schema::Ref {
3391 name: Name {
3392 name: "record".to_owned(),
3393 namespace: None,
3394 },
3395 },
3396 order: RecordFieldOrder::Ascending,
3397 position: 1,
3398 custom_attributes: Default::default(),
3399 },
3400 ],
3401 lookup,
3402 attributes: Default::default(),
3403 });
3404 assert_eq!(schema, expected);
3405
3406 let canonical_form = &schema.canonical_form();
3407 let expected = r#"{"name":"record","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":"record"}]}"#;
3408 assert_eq!(canonical_form, &expected);
3409
3410 Ok(())
3411 }
3412
3413 #[test]
3414 fn test_avro_3370_record_schema_with_currently_parsing_schema_named_enum() -> TestResult {
3415 let schema = Schema::parse_str(
3416 r#"
3417 {
3418 "type" : "record",
3419 "name" : "record",
3420 "fields" : [
3421 {
3422 "type" : "enum",
3423 "name" : "enum",
3424 "symbols": ["one", "two", "three"]
3425 },
3426 { "name" : "next", "type" : "enum" }
3427 ]
3428 }
3429 "#,
3430 )?;
3431
3432 let mut lookup = BTreeMap::new();
3433 lookup.insert("enum".to_owned(), 0);
3434 lookup.insert("next".to_owned(), 1);
3435
3436 let expected = Schema::Record(RecordSchema {
3437 name: Name {
3438 name: "record".to_owned(),
3439 namespace: None,
3440 },
3441 aliases: None,
3442 doc: None,
3443 fields: vec![
3444 RecordField {
3445 name: "enum".to_string(),
3446 doc: None,
3447 default: None,
3448 aliases: None,
3449 schema: Schema::Enum(
3450 EnumSchema::builder()
3451 .name(Name::new("enum")?)
3452 .symbols(vec![
3453 "one".to_string(),
3454 "two".to_string(),
3455 "three".to_string(),
3456 ])
3457 .build(),
3458 ),
3459 order: RecordFieldOrder::Ascending,
3460 position: 0,
3461 custom_attributes: Default::default(),
3462 },
3463 RecordField {
3464 name: "next".to_string(),
3465 doc: None,
3466 default: None,
3467 aliases: None,
3468 schema: Schema::Enum(EnumSchema {
3469 name: Name {
3470 name: "enum".to_owned(),
3471 namespace: None,
3472 },
3473 aliases: None,
3474 doc: None,
3475 symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()],
3476 default: None,
3477 attributes: Default::default(),
3478 }),
3479 order: RecordFieldOrder::Ascending,
3480 position: 1,
3481 custom_attributes: Default::default(),
3482 },
3483 ],
3484 lookup,
3485 attributes: Default::default(),
3486 });
3487 assert_eq!(schema, expected);
3488
3489 let canonical_form = &schema.canonical_form();
3490 let expected = r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":"enum"}]}"#;
3491 assert_eq!(canonical_form, &expected);
3492
3493 Ok(())
3494 }
3495
3496 #[test]
3497 fn test_avro_3370_record_schema_with_currently_parsing_schema_named_fixed() -> TestResult {
3498 let schema = Schema::parse_str(
3499 r#"
3500 {
3501 "type" : "record",
3502 "name" : "record",
3503 "fields" : [
3504 {
3505 "type" : "fixed",
3506 "name" : "fixed",
3507 "size": 456
3508 },
3509 { "name" : "next", "type" : "fixed" }
3510 ]
3511 }
3512 "#,
3513 )?;
3514
3515 let mut lookup = BTreeMap::new();
3516 lookup.insert("fixed".to_owned(), 0);
3517 lookup.insert("next".to_owned(), 1);
3518
3519 let expected = Schema::Record(RecordSchema {
3520 name: Name {
3521 name: "record".to_owned(),
3522 namespace: None,
3523 },
3524 aliases: None,
3525 doc: None,
3526 fields: vec![
3527 RecordField {
3528 name: "fixed".to_string(),
3529 doc: None,
3530 default: None,
3531 aliases: None,
3532 schema: Schema::Fixed(FixedSchema {
3533 name: Name {
3534 name: "fixed".to_owned(),
3535 namespace: None,
3536 },
3537 aliases: None,
3538 doc: None,
3539 size: 456,
3540 default: None,
3541 attributes: Default::default(),
3542 }),
3543 order: RecordFieldOrder::Ascending,
3544 position: 0,
3545 custom_attributes: Default::default(),
3546 },
3547 RecordField {
3548 name: "next".to_string(),
3549 doc: None,
3550 default: None,
3551 aliases: None,
3552 schema: Schema::Fixed(FixedSchema {
3553 name: Name {
3554 name: "fixed".to_owned(),
3555 namespace: None,
3556 },
3557 aliases: None,
3558 doc: None,
3559 size: 456,
3560 default: None,
3561 attributes: Default::default(),
3562 }),
3563 order: RecordFieldOrder::Ascending,
3564 position: 1,
3565 custom_attributes: Default::default(),
3566 },
3567 ],
3568 lookup,
3569 attributes: Default::default(),
3570 });
3571 assert_eq!(schema, expected);
3572
3573 let canonical_form = &schema.canonical_form();
3574 let expected = r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":"fixed"}]}"#;
3575 assert_eq!(canonical_form, &expected);
3576
3577 Ok(())
3578 }
3579
3580 #[test]
3581 fn test_enum_schema() -> TestResult {
3582 let schema = Schema::parse_str(
3583 r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "hearts"]}"#,
3584 )?;
3585
3586 let expected = Schema::Enum(EnumSchema {
3587 name: Name::new("Suit")?,
3588 aliases: None,
3589 doc: None,
3590 symbols: vec![
3591 "diamonds".to_owned(),
3592 "spades".to_owned(),
3593 "clubs".to_owned(),
3594 "hearts".to_owned(),
3595 ],
3596 default: None,
3597 attributes: Default::default(),
3598 });
3599
3600 assert_eq!(expected, schema);
3601
3602 Ok(())
3603 }
3604
3605 #[test]
3606 fn test_enum_schema_duplicate() -> TestResult {
3607 let schema = Schema::parse_str(
3609 r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "diamonds"]}"#,
3610 );
3611 assert!(schema.is_err());
3612
3613 Ok(())
3614 }
3615
3616 #[test]
3617 fn test_enum_schema_name() -> TestResult {
3618 let schema = Schema::parse_str(
3620 r#"{"type": "enum", "name": "Enum", "symbols": ["0000", "variant"]}"#,
3621 );
3622 assert!(schema.is_err());
3623
3624 Ok(())
3625 }
3626
3627 #[test]
3628 fn test_fixed_schema() -> TestResult {
3629 let schema = Schema::parse_str(r#"{"type": "fixed", "name": "test", "size": 16}"#)?;
3630
3631 let expected = Schema::Fixed(FixedSchema {
3632 name: Name::new("test")?,
3633 aliases: None,
3634 doc: None,
3635 size: 16_usize,
3636 default: None,
3637 attributes: Default::default(),
3638 });
3639
3640 assert_eq!(expected, schema);
3641
3642 Ok(())
3643 }
3644
3645 #[test]
3646 fn test_fixed_schema_with_documentation() -> TestResult {
3647 let schema = Schema::parse_str(
3648 r#"{"type": "fixed", "name": "test", "size": 16, "doc": "FixedSchema documentation"}"#,
3649 )?;
3650
3651 let expected = Schema::Fixed(FixedSchema {
3652 name: Name::new("test")?,
3653 aliases: None,
3654 doc: Some(String::from("FixedSchema documentation")),
3655 size: 16_usize,
3656 default: None,
3657 attributes: Default::default(),
3658 });
3659
3660 assert_eq!(expected, schema);
3661
3662 Ok(())
3663 }
3664
3665 #[test]
3666 fn test_no_documentation() -> TestResult {
3667 let schema = Schema::parse_str(
3668 r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#,
3669 )?;
3670
3671 let doc = match schema {
3672 Schema::Enum(EnumSchema { doc, .. }) => doc,
3673 _ => unreachable!(),
3674 };
3675
3676 assert!(doc.is_none());
3677
3678 Ok(())
3679 }
3680
3681 #[test]
3682 fn test_documentation() -> TestResult {
3683 let schema = Schema::parse_str(
3684 r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#,
3685 )?;
3686
3687 let doc = match schema {
3688 Schema::Enum(EnumSchema { doc, .. }) => doc,
3689 _ => None,
3690 };
3691
3692 assert_eq!("Some documentation".to_owned(), doc.unwrap());
3693
3694 Ok(())
3695 }
3696
3697 #[test]
3700 fn test_schema_is_send() {
3701 fn send<S: Send>(_s: S) {}
3702
3703 let schema = Schema::Null;
3704 send(schema);
3705 }
3706
3707 #[test]
3708 fn test_schema_is_sync() {
3709 fn sync<S: Sync>(_s: S) {}
3710
3711 let schema = Schema::Null;
3712 sync(&schema);
3713 sync(schema);
3714 }
3715
3716 #[test]
3717 fn test_schema_fingerprint() -> TestResult {
3718 use crate::rabin::Rabin;
3719 use md5::Md5;
3720 use sha2::Sha256;
3721
3722 let raw_schema = r#"
3723 {
3724 "type": "record",
3725 "name": "test",
3726 "fields": [
3727 {"name": "a", "type": "long", "default": 42},
3728 {"name": "b", "type": "string"},
3729 {"name": "c", "type": "long", "logicalType": "timestamp-micros"}
3730 ]
3731 }
3732"#;
3733
3734 let schema = Schema::parse_str(raw_schema)?;
3735 assert_eq!(
3736 "7eb3b28d73dfc99bdd9af1848298b40804a2f8ad5d2642be2ecc2ad34842b987",
3737 format!("{}", schema.fingerprint::<Sha256>())
3738 );
3739
3740 assert_eq!(
3741 "cb11615e412ee5d872620d8df78ff6ae",
3742 format!("{}", schema.fingerprint::<Md5>())
3743 );
3744 assert_eq!(
3745 "92f2ccef718c6754",
3746 format!("{}", schema.fingerprint::<Rabin>())
3747 );
3748
3749 Ok(())
3750 }
3751
3752 #[test]
3753 fn test_logical_types() -> TestResult {
3754 let schema = Schema::parse_str(r#"{"type": "int", "logicalType": "date"}"#)?;
3755 assert_eq!(schema, Schema::Date);
3756
3757 let schema = Schema::parse_str(r#"{"type": "long", "logicalType": "timestamp-micros"}"#)?;
3758 assert_eq!(schema, Schema::TimestampMicros);
3759
3760 Ok(())
3761 }
3762
3763 #[test]
3764 fn test_nullable_logical_type() -> TestResult {
3765 let schema = Schema::parse_str(
3766 r#"{"type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}"#,
3767 )?;
3768 assert_eq!(
3769 schema,
3770 Schema::Union(UnionSchema::new(vec![
3771 Schema::Null,
3772 Schema::TimestampMicros,
3773 ])?)
3774 );
3775
3776 Ok(())
3777 }
3778
3779 #[test]
3780 fn record_field_order_from_str() -> TestResult {
3781 use std::str::FromStr;
3782
3783 assert_eq!(
3784 RecordFieldOrder::from_str("ascending").unwrap(),
3785 RecordFieldOrder::Ascending
3786 );
3787 assert_eq!(
3788 RecordFieldOrder::from_str("descending").unwrap(),
3789 RecordFieldOrder::Descending
3790 );
3791 assert_eq!(
3792 RecordFieldOrder::from_str("ignore").unwrap(),
3793 RecordFieldOrder::Ignore
3794 );
3795 assert!(RecordFieldOrder::from_str("not an ordering").is_err());
3796
3797 Ok(())
3798 }
3799
3800 #[test]
3801 fn test_avro_3374_preserve_namespace_for_primitive() -> TestResult {
3802 let schema = Schema::parse_str(
3803 r#"
3804 {
3805 "type" : "record",
3806 "name" : "ns.int",
3807 "fields" : [
3808 {"name" : "value", "type" : "int"},
3809 {"name" : "next", "type" : [ "null", "ns.int" ]}
3810 ]
3811 }
3812 "#,
3813 )?;
3814
3815 let json = schema.canonical_form();
3816 assert_eq!(
3817 json,
3818 r#"{"name":"ns.int","type":"record","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","ns.int"]}]}"#
3819 );
3820
3821 Ok(())
3822 }
3823
3824 #[test]
3825 fn test_avro_3433_preserve_schema_refs_in_json() -> TestResult {
3826 let schema = r#"
3827 {
3828 "name": "test.test",
3829 "type": "record",
3830 "fields": [
3831 {
3832 "name": "bar",
3833 "type": { "name": "test.foo", "type": "record", "fields": [{ "name": "id", "type": "long" }] }
3834 },
3835 { "name": "baz", "type": "test.foo" }
3836 ]
3837 }
3838 "#;
3839
3840 let schema = Schema::parse_str(schema)?;
3841
3842 let expected = r#"{"name":"test.test","type":"record","fields":[{"name":"bar","type":{"name":"test.foo","type":"record","fields":[{"name":"id","type":"long"}]}},{"name":"baz","type":"test.foo"}]}"#;
3843 assert_eq!(schema.canonical_form(), expected);
3844
3845 Ok(())
3846 }
3847
3848 #[test]
3849 fn test_read_namespace_from_name() -> TestResult {
3850 let schema = r#"
3851 {
3852 "name": "space.name",
3853 "type": "record",
3854 "fields": [
3855 {
3856 "name": "num",
3857 "type": "int"
3858 }
3859 ]
3860 }
3861 "#;
3862
3863 let schema = Schema::parse_str(schema)?;
3864 if let Schema::Record(RecordSchema { name, .. }) = schema {
3865 assert_eq!(name.name, "name");
3866 assert_eq!(name.namespace, Some("space".to_string()));
3867 } else {
3868 panic!("Expected a record schema!");
3869 }
3870
3871 Ok(())
3872 }
3873
3874 #[test]
3875 fn test_namespace_from_name_has_priority_over_from_field() -> TestResult {
3876 let schema = r#"
3877 {
3878 "name": "space1.name",
3879 "namespace": "space2",
3880 "type": "record",
3881 "fields": [
3882 {
3883 "name": "num",
3884 "type": "int"
3885 }
3886 ]
3887 }
3888 "#;
3889
3890 let schema = Schema::parse_str(schema)?;
3891 if let Schema::Record(RecordSchema { name, .. }) = schema {
3892 assert_eq!(name.namespace, Some("space1".to_string()));
3893 } else {
3894 panic!("Expected a record schema!");
3895 }
3896
3897 Ok(())
3898 }
3899
3900 #[test]
3901 fn test_namespace_from_field() -> TestResult {
3902 let schema = r#"
3903 {
3904 "name": "name",
3905 "namespace": "space2",
3906 "type": "record",
3907 "fields": [
3908 {
3909 "name": "num",
3910 "type": "int"
3911 }
3912 ]
3913 }
3914 "#;
3915
3916 let schema = Schema::parse_str(schema)?;
3917 if let Schema::Record(RecordSchema { name, .. }) = schema {
3918 assert_eq!(name.namespace, Some("space2".to_string()));
3919 } else {
3920 panic!("Expected a record schema!");
3921 }
3922
3923 Ok(())
3924 }
3925
/// A name with a leading dot (`.name`) yields the simple name and no namespace.
#[test]
fn test_namespace_from_name_with_empty_value() -> TestResult {
    let parsed = Name::new(".name")?;

    assert_eq!(parsed.name, "name");
    assert_eq!(parsed.namespace, None);

    Ok(())
}
3935
/// A name consisting only of whitespace is rejected with `Error::InvalidSchemaName`.
#[test]
fn test_name_with_whitespace_value() {
    let outcome = Name::new(" ");

    assert!(
        matches!(outcome, Err(Error::InvalidSchemaName(_, _))),
        "Expected an Error::InvalidSchemaName!"
    );
}
3944
/// A name that has a namespace but nothing after the final dot (`space.`) is rejected
/// with `Error::InvalidSchemaName`.
#[test]
fn test_name_with_no_name_part() {
    let outcome = Name::new("space.");

    assert!(
        matches!(outcome, Err(Error::InvalidSchemaName(_, _))),
        "Expected an Error::InvalidSchemaName!"
    );
}
3953
/// An inner record declared without a namespace inherits `space` from the enclosing
/// record, and an unqualified reference to it (`inner_record_name`) resolves.
#[test]
fn avro_3448_test_proper_resolution_inner_record_inherited_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"record",
                        "name":"inner_record_name",
                        "fields":[
                            {
                                "name":"inner_field_1",
                                "type":"double"
                            }
                        ]
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_record_name"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // Exactly the outer and the inner record must be registered, both under `space`.
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    let expected_names = ["space.record_name", "space.inner_record_name"];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
3994
/// An inner record that inherits namespace `space` can also be referenced by its
/// fully qualified name (`space.inner_record_name`).
#[test]
fn avro_3448_test_proper_resolution_inner_record_qualified_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"record",
                        "name":"inner_record_name",
                        "fields":[
                            {
                                "name":"inner_field_1",
                                "type":"double"
                            }
                        ]
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "space.inner_record_name"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    let expected_names = ["space.record_name", "space.inner_record_name"];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
4035
/// An inner enum declared without a namespace inherits `space` from the enclosing
/// record, and an unqualified reference to it (`inner_enum_name`) resolves.
#[test]
fn avro_3448_test_proper_resolution_inner_enum_inherited_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"enum",
                        "name":"inner_enum_name",
                        "symbols":["Extensive","Testing"]
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_enum_name"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    let expected_names = ["space.record_name", "space.inner_enum_name"];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
4071
/// An inner enum that inherits namespace `space` can also be referenced by its
/// fully qualified name (`space.inner_enum_name`).
#[test]
fn avro_3448_test_proper_resolution_inner_enum_qualified_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"enum",
                        "name":"inner_enum_name",
                        "symbols":["Extensive","Testing"]
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "space.inner_enum_name"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    let expected_names = ["space.record_name", "space.inner_enum_name"];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
4107
/// An inner fixed declared without a namespace inherits `space` from the enclosing
/// record, and an unqualified reference to it (`inner_fixed_name`) resolves.
#[test]
fn avro_3448_test_proper_resolution_inner_fixed_inherited_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"fixed",
                        "name":"inner_fixed_name",
                        "size": 16
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_fixed_name"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    let expected_names = ["space.record_name", "space.inner_fixed_name"];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
4143
/// An inner fixed that inherits namespace `space` can also be referenced by its
/// fully qualified name (`space.inner_fixed_name`).
#[test]
fn avro_3448_test_proper_resolution_inner_fixed_qualified_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"fixed",
                        "name":"inner_fixed_name",
                        "size": 16
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "space.inner_fixed_name"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    let expected_names = ["space.record_name", "space.inner_fixed_name"];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
4179
/// An inner record that declares its own namespace (`inner_space`) is registered under
/// that namespace rather than the enclosing record's `space`.
#[test]
fn avro_3448_test_proper_resolution_inner_record_inner_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"record",
                        "name":"inner_record_name",
                        "namespace":"inner_space",
                        "fields":[
                            {
                                "name":"inner_field_1",
                                "type":"double"
                            }
                        ]
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_space.inner_record_name"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    let expected_names = ["space.record_name", "inner_space.inner_record_name"];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
4221
/// An inner enum that declares its own namespace (`inner_space`) is registered under
/// that namespace rather than the enclosing record's `space`.
#[test]
fn avro_3448_test_proper_resolution_inner_enum_inner_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"enum",
                        "name":"inner_enum_name",
                        "namespace": "inner_space",
                        "symbols":["Extensive","Testing"]
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_space.inner_enum_name"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    let expected_names = ["space.record_name", "inner_space.inner_enum_name"];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
4258
/// An inner fixed that declares its own namespace (`inner_space`) is registered under
/// that namespace rather than the enclosing record's `space`.
#[test]
fn avro_3448_test_proper_resolution_inner_fixed_inner_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"fixed",
                        "name":"inner_fixed_name",
                        "namespace": "inner_space",
                        "size": 16
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_space.inner_fixed_name"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    let expected_names = ["space.record_name", "inner_space.inner_fixed_name"];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
4295
/// With three nesting levels and only the outermost record declaring a namespace,
/// both the middle and the innermost record end up in `space`.
#[test]
fn avro_3448_test_proper_multi_level_resolution_inner_record_outer_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"record",
                        "name":"middle_record_name",
                        "fields":[
                            {
                                "name":"middle_field_1",
                                "type":[
                                    "null",
                                    {
                                        "type":"record",
                                        "name":"inner_record_name",
                                        "fields":[
                                            {
                                                "name":"inner_field_1",
                                                "type":"double"
                                            }
                                        ]
                                    }
                                ]
                            }
                        ]
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "space.inner_record_name"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    let names = resolved.get_names();
    assert_eq!(names.len(), 3);
    let expected_names = [
        "space.record_name",
        "space.middle_record_name",
        "space.inner_record_name",
    ];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
4352
/// With three nesting levels where the middle record declares `middle_namespace`,
/// the innermost record inherits `middle_namespace` (not the outermost `space`).
#[test]
fn avro_3448_test_proper_multi_level_resolution_inner_record_middle_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"record",
                        "name":"middle_record_name",
                        "namespace":"middle_namespace",
                        "fields":[
                            {
                                "name":"middle_field_1",
                                "type":[
                                    "null",
                                    {
                                        "type":"record",
                                        "name":"inner_record_name",
                                        "fields":[
                                            {
                                                "name":"inner_field_1",
                                                "type":"double"
                                            }
                                        ]
                                    }
                                ]
                            }
                        ]
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "middle_namespace.inner_record_name"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    let names = resolved.get_names();
    assert_eq!(names.len(), 3);
    let expected_names = [
        "space.record_name",
        "middle_namespace.middle_record_name",
        "middle_namespace.inner_record_name",
    ];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
4410
/// With three nesting levels where every record declares its own namespace, each
/// record is registered under the namespace it declared.
#[test]
fn avro_3448_test_proper_multi_level_resolution_inner_record_inner_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"record",
                        "name":"middle_record_name",
                        "namespace":"middle_namespace",
                        "fields":[
                            {
                                "name":"middle_field_1",
                                "type":[
                                    "null",
                                    {
                                        "type":"record",
                                        "name":"inner_record_name",
                                        "namespace":"inner_namespace",
                                        "fields":[
                                            {
                                                "name":"inner_field_1",
                                                "type":"double"
                                            }
                                        ]
                                    }
                                ]
                            }
                        ]
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_namespace.inner_record_name"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    let names = resolved.get_names();
    assert_eq!(names.len(), 3);
    let expected_names = [
        "space.record_name",
        "middle_namespace.middle_record_name",
        "inner_namespace.inner_record_name",
    ];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
4469
/// A record declared as the item type of an array inherits the enclosing record's
/// namespace and can be referenced afterwards by its unqualified name.
#[test]
fn avro_3448_test_proper_in_array_resolution_inherited_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": {
                    "type":"array",
                    "items":{
                        "type":"record",
                        "name":"in_array_record",
                        "fields": [
                            {
                                "name":"array_record_field",
                                "type":"string"
                            }
                        ]
                    }
                }
            },
            {
                "name":"outer_field_2",
                "type":"in_array_record"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    let expected_names = ["space.record_name", "space.in_array_record"];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
4510
/// A record declared as the value type of a map inherits the enclosing record's
/// namespace and can be referenced afterwards by its unqualified name.
#[test]
fn avro_3448_test_proper_in_map_resolution_inherited_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": {
                    "type":"map",
                    "values":{
                        "type":"record",
                        "name":"in_map_record",
                        "fields": [
                            {
                                "name":"map_record_field",
                                "type":"string"
                            }
                        ]
                    }
                }
            },
            {
                "name":"outer_field_2",
                "type":"in_map_record"
            }
        ]
    }
    "#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    let expected_names = ["space.record_name", "space.in_map_record"];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    Ok(())
}
4551
/// A schema containing an inner enum with its own namespace survives a JSON round trip:
/// serializing the parsed schema and parsing the output again yields an equal schema.
#[test]
fn avro_3466_test_to_json_inner_enum_inner_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"enum",
                        "name":"inner_enum_name",
                        "namespace": "inner_space",
                        "symbols":["Extensive","Testing"]
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_space.inner_enum_name"
            }
        ]
    }
    "#;
    let schema = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");

    // Sanity check: both named types are registered under their own namespaces.
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    let expected_names = ["space.record_name", "inner_space.inner_enum_name"];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    // Round trip: schema -> JSON text -> schema.
    let schema_json = serde_json::to_string(&schema).expect("test failed");
    let round_tripped = Schema::parse_str(&schema_json).expect("test failed");
    assert_eq!(schema, round_tripped);

    Ok(())
}
4595
/// A schema containing an inner fixed with its own namespace survives a JSON round trip:
/// serializing the parsed schema and parsing the output again yields an equal schema.
#[test]
fn avro_3466_test_to_json_inner_fixed_inner_namespace() -> TestResult {
    let raw_schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"fixed",
                        "name":"inner_fixed_name",
                        "namespace": "inner_space",
                        "size":54
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_space.inner_fixed_name"
            }
        ]
    }
    "#;
    let schema = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");

    // Sanity check: both named types are registered under their own namespaces.
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    let expected_names = ["space.record_name", "inner_space.inner_fixed_name"];
    for fullname in &expected_names {
        assert!(names.contains_key(&Name::new(fullname)?));
    }

    // Round trip: schema -> JSON text -> schema.
    let schema_json = serde_json::to_string(&schema).expect("test failed");
    let round_tripped = Schema::parse_str(&schema_json).expect("test failed");
    assert_eq!(schema, round_tripped);

    Ok(())
}
4639
4640 fn assert_avro_3512_aliases(aliases: &Aliases) {
4641 match aliases {
4642 Some(aliases) => {
4643 assert_eq!(aliases.len(), 3);
4644 assert_eq!(aliases[0], Alias::new("space.b").unwrap());
4645 assert_eq!(aliases[1], Alias::new("x.y").unwrap());
4646 assert_eq!(aliases[2], Alias::new(".c").unwrap());
4647 }
4648 None => {
4649 panic!("'aliases' must be Some");
4650 }
4651 }
4652 }
4653
/// Aliases of a namespaced record are parsed as checked by `assert_avro_3512_aliases`.
#[test]
fn avro_3512_alias_with_null_namespace_record() -> TestResult {
    let raw_schema = r#"
    {
        "type": "record",
        "name": "a",
        "namespace": "space",
        "aliases": ["b", "x.y", ".c"],
        "fields" : [
            {"name": "time", "type": "long"}
        ]
    }
    "#;
    let schema = Schema::parse_str(raw_schema)?;

    match &schema {
        Schema::Record(RecordSchema { aliases, .. }) => assert_avro_3512_aliases(aliases),
        _ => panic!("The Schema should be a record: {schema:?}"),
    }

    Ok(())
}
4678
/// Aliases of a namespaced enum are parsed as checked by `assert_avro_3512_aliases`.
#[test]
fn avro_3512_alias_with_null_namespace_enum() -> TestResult {
    let raw_schema = r#"
    {
        "type": "enum",
        "name": "a",
        "namespace": "space",
        "aliases": ["b", "x.y", ".c"],
        "symbols" : [
            "symbol1", "symbol2"
        ]
    }
    "#;
    let schema = Schema::parse_str(raw_schema)?;

    match &schema {
        Schema::Enum(EnumSchema { aliases, .. }) => assert_avro_3512_aliases(aliases),
        _ => panic!("The Schema should be an enum: {schema:?}"),
    }

    Ok(())
}
4703
/// Aliases of a namespaced fixed are parsed as checked by `assert_avro_3512_aliases`.
#[test]
fn avro_3512_alias_with_null_namespace_fixed() -> TestResult {
    let raw_schema = r#"
    {
        "type": "fixed",
        "name": "a",
        "namespace": "space",
        "aliases": ["b", "x.y", ".c"],
        "size" : 12
    }
    "#;
    let schema = Schema::parse_str(raw_schema)?;

    match &schema {
        Schema::Fixed(FixedSchema { aliases, .. }) => assert_avro_3512_aliases(aliases),
        _ => panic!("The Schema should be a fixed: {schema:?}"),
    }

    Ok(())
}
4726
/// Serializing a record schema emits its aliases (record-level and field-level) and
/// omits the field `doc`; the serialized text parses back to an equal schema.
#[test]
fn avro_3518_serialize_aliases_record() -> TestResult {
    let raw_schema = r#"
    {
        "type": "record",
        "name": "a",
        "namespace": "space",
        "aliases": ["b", "x.y", ".c"],
        "fields" : [
            {
                "name": "time",
                "type": "long",
                "doc": "The documentation is not serialized",
                "default": 123,
                "aliases": ["time1", "ns.time2"]
            }
        ]
    }
    "#;
    let schema = Schema::parse_str(raw_schema)?;

    // Go through `serde_json::Value` first, then render that value as text.
    let as_value = serde_json::to_value(&schema)?;
    let serialized = serde_json::to_string(&as_value)?;

    let expected = r#"{"aliases":["space.b","x.y","c"],"fields":[{"aliases":["time1","ns.time2"],"default":123,"name":"time","type":"long"}],"name":"a","namespace":"space","type":"record"}"#;
    assert_eq!(expected, &serialized);
    assert_eq!(schema, Schema::parse_str(&serialized)?);

    Ok(())
}
4759
/// Serializing an enum schema emits its aliases; the serialized text parses back to
/// an equal schema.
#[test]
fn avro_3518_serialize_aliases_enum() -> TestResult {
    let raw_schema = r#"
    {
        "type": "enum",
        "name": "a",
        "namespace": "space",
        "aliases": ["b", "x.y", ".c"],
        "symbols" : [
            "symbol1", "symbol2"
        ]
    }
    "#;
    let schema = Schema::parse_str(raw_schema)?;

    // Go through `serde_json::Value` first, then render that value as text.
    let as_value = serde_json::to_value(&schema)?;
    let serialized = serde_json::to_string(&as_value)?;

    let expected = r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","symbols":["symbol1","symbol2"],"type":"enum"}"#;
    assert_eq!(expected, &serialized);
    assert_eq!(schema, Schema::parse_str(&serialized)?);

    Ok(())
}
4786
/// Serializing a fixed schema emits its aliases; the serialized text parses back to
/// an equal schema.
#[test]
fn avro_3518_serialize_aliases_fixed() -> TestResult {
    let raw_schema = r#"
    {
        "type": "fixed",
        "name": "a",
        "namespace": "space",
        "aliases": ["b", "x.y", ".c"],
        "size" : 12
    }
    "#;
    let schema = Schema::parse_str(raw_schema)?;

    // Go through `serde_json::Value` first, then render that value as text.
    let as_value = serde_json::to_value(&schema)?;
    let serialized = serde_json::to_string(&as_value)?;

    let expected = r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","size":12,"type":"fixed"}"#;
    assert_eq!(expected, &serialized);
    assert_eq!(schema, Schema::parse_str(&serialized)?);

    Ok(())
}
4811
/// A union branch written as an object wrapping an array type (with a stray `name`
/// attribute) parses to `Schema::Array` with `long` items.
#[test]
fn avro_3130_parse_anonymous_union_type() -> TestResult {
    let schema_str = r#"
    {
        "type": "record",
        "name": "AccountEvent",
        "fields": [
            {"type":
                ["null",
                    { "name": "accountList",
                        "type": {
                            "type": "array",
                            "items": "long"
                        }
                    }
                ],
                "name":"NullableLongArray"
            }
        ]
    }
    "#;

    // Peel the schema one layer at a time, failing with the same messages as before.
    let (name, fields) = match Schema::parse_str(schema_str)? {
        Schema::Record(RecordSchema { name, fields, .. }) => (name, fields),
        _ => panic!("Expected Schema::Record"),
    };
    assert_eq!(name, Name::new("AccountEvent")?);

    let field = &fields[0];
    assert_eq!(&field.name, "NullableLongArray");

    let union = match field.schema {
        Schema::Union(ref union) => union,
        _ => panic!("Expected Schema::Union"),
    };
    assert_eq!(union.schemas[0], Schema::Null);

    let array_schema = match union.schemas[1] {
        Schema::Array(ref array_schema) => array_schema,
        _ => panic!("Expected Schema::Array"),
    };
    assert!(
        matches!(*array_schema.items, Schema::Long),
        "Expected a Schema::Array of type Long"
    );

    Ok(())
}
4862
/// Record, enum and fixed schemas that declare no custom attributes report an empty
/// (but present) attribute map from `custom_attributes()`.
#[test]
fn avro_custom_attributes_schema_without_attributes() -> TestResult {
    let schemata_str = [
        r#"
        {
            "type": "record",
            "name": "Rec",
            "doc": "A Record schema without custom attributes",
            "fields": []
        }
        "#,
        r#"
        {
            "type": "enum",
            "name": "Enum",
            "doc": "An Enum schema without custom attributes",
            "symbols": []
        }
        "#,
        r#"
        {
            "type": "fixed",
            "name": "Fixed",
            "doc": "A Fixed schema without custom attributes",
            "size": 0
        }
        "#,
    ];

    let no_attributes: BTreeMap<String, Value> = BTreeMap::new();
    for raw_schema in &schemata_str {
        let parsed = Schema::parse_str(raw_schema)?;
        assert_eq!(parsed.custom_attributes(), Some(&no_attributes));
    }

    Ok(())
}
4898
/// JSON fragment holding five custom attributes (string, number, null, array, object).
///
/// The tests in this file splice it into schema templates by replacing the `{{{}}}`
/// placeholder; `expected_custom_attributes()` builds the matching expected map.
const CUSTOM_ATTRS_SUFFIX: &str = r#"
"string_key": "value",
"number_key": 1.23,
"null_key": null,
"array_key": [1, 2, 3],
"object_key": {
"key": "value"
}
"#;
4908
/// Record, enum and fixed schemas expose unknown top-level JSON attributes through
/// `custom_attributes()`.
///
/// Each template contains a `{{{}}}` placeholder that is replaced with
/// `CUSTOM_ATTRS_SUFFIX` before parsing.
#[test]
fn avro_3609_custom_attributes_schema_with_attributes() -> TestResult {
    let schemata_str = [
        r#"
        {
            "type": "record",
            "name": "Rec",
            "namespace": "ns",
            "doc": "A Record schema with custom attributes",
            "fields": [],
            {{{}}}
        }
        "#,
        r#"
        {
            "type": "enum",
            "name": "Enum",
            "namespace": "ns",
            "doc": "An Enum schema with custom attributes",
            "symbols": [],
            {{{}}}
        }
        "#,
        r#"
        {
            "type": "fixed",
            "name": "Fixed",
            "namespace": "ns",
            "doc": "A Fixed schema with custom attributes",
            "size": 2,
            {{{}}}
        }
        "#,
    ];

    let expected = expected_custom_attributes();
    for template in &schemata_str {
        let schema_json = template.replace("{{{}}}", CUSTOM_ATTRS_SUFFIX);
        let parsed = Schema::parse_str(&schema_json)?;

        assert_eq!(parsed.custom_attributes(), Some(&expected));
    }

    Ok(())
}
4960
4961 fn expected_custom_attributes() -> BTreeMap<String, Value> {
4962 let mut expected_attributes: BTreeMap<String, Value> = Default::default();
4963 expected_attributes.insert("string_key".to_string(), Value::String("value".to_string()));
4964 expected_attributes.insert("number_key".to_string(), json!(1.23));
4965 expected_attributes.insert("null_key".to_string(), Value::Null);
4966 expected_attributes.insert(
4967 "array_key".to_string(),
4968 Value::Array(vec![json!(1), json!(2), json!(3)]),
4969 );
4970 let mut object_value: HashMap<String, Value> = HashMap::new();
4971 object_value.insert("key".to_string(), Value::String("value".to_string()));
4972 expected_attributes.insert("object_key".to_string(), json!(object_value));
4973 expected_attributes
4974 }
4975
/// Unknown JSON attributes on a record field are exposed via the field's
/// `custom_attributes`.
///
/// NOTE(review): despite the `without_attributes` suffix in the test name (and the
/// schema's `doc` text), the field here does carry custom attributes: the `{{{}}}`
/// placeholder is replaced with `CUSTOM_ATTRS_SUFFIX` — confirm the intended name.
#[test]
fn avro_3609_custom_attributes_record_field_without_attributes() -> TestResult {
    let template = r#"
    {
        "type": "record",
        "name": "Rec",
        "doc": "A Record schema without custom attributes",
        "fields": [
            {
                "name": "field_one",
                "type": "float",
                {{{}}}
            }
        ]
    }
    "#;
    let schema_json = template.replace("{{{}}}", CUSTOM_ATTRS_SUFFIX);

    let (name, fields) = match Schema::parse_str(&schema_json)? {
        Schema::Record(RecordSchema { name, fields, .. }) => (name, fields),
        _ => panic!("Expected Schema::Record"),
    };
    assert_eq!(name, Name::new("Rec")?);
    assert_eq!(fields.len(), 1);

    let field = &fields[0];
    assert_eq!(&field.name, "field_one");
    assert_eq!(field.custom_attributes, expected_custom_attributes());

    Ok(())
}
5010
/// A `["null", "long"]` union field with a `null` default parses, keeps the variant
/// order and is reported as nullable.
#[test]
fn avro_3625_null_is_first() -> TestResult {
    let schema_str = r#"
    {
        "type": "record",
        "name": "union_schema_test",
        "fields": [
            {"name": "a", "type": ["null", "long"], "default": null}
        ]
    }
    "#;

    let (name, fields) = match Schema::parse_str(schema_str)? {
        Schema::Record(RecordSchema { name, fields, .. }) => (name, fields),
        _ => panic!("Expected Schema::Record"),
    };
    assert_eq!(name, Name::new("union_schema_test")?);
    assert_eq!(fields.len(), 1);

    let field = &fields[0];
    assert_eq!(&field.name, "a");
    assert_eq!(&field.default, &Some(Value::Null));

    let union = match &field.schema {
        Schema::Union(union) => union,
        _ => panic!("Expected Schema::Union"),
    };
    assert_eq!(union.variants().len(), 2);
    assert!(union.is_nullable());
    assert_eq!(union.variants()[0], Schema::Null);
    assert_eq!(union.variants()[1], Schema::Long);

    Ok(())
}
5049
/// A `["long", "null"]` union field with default `123` parses and keeps the variant
/// order as written.
#[test]
fn avro_3625_null_is_last() -> TestResult {
    let schema_str = r#"
    {
        "type": "record",
        "name": "union_schema_test",
        "fields": [
            {"name": "a", "type": ["long","null"], "default": 123}
        ]
    }
    "#;

    let (name, fields) = match Schema::parse_str(schema_str)? {
        Schema::Record(RecordSchema { name, fields, .. }) => (name, fields),
        _ => panic!("Expected Schema::Record"),
    };
    assert_eq!(name, Name::new("union_schema_test")?);
    assert_eq!(fields.len(), 1);

    let field = &fields[0];
    assert_eq!(&field.name, "a");
    assert_eq!(&field.default, &Some(json!(123)));

    let union = match &field.schema {
        Schema::Union(union) => union,
        _ => panic!("Expected Schema::Union"),
    };
    assert_eq!(union.variants().len(), 2);
    assert_eq!(union.variants()[0], Schema::Long);
    assert_eq!(union.variants()[1], Schema::Null);

    Ok(())
}
5087
/// A `["long", "null", "int"]` union field with default `123` parses and keeps the
/// variant order as written.
#[test]
fn avro_3625_null_is_the_middle() -> TestResult {
    let schema_str = r#"
    {
        "type": "record",
        "name": "union_schema_test",
        "fields": [
            {"name": "a", "type": ["long","null","int"], "default": 123}
        ]
    }
    "#;

    let (name, fields) = match Schema::parse_str(schema_str)? {
        Schema::Record(RecordSchema { name, fields, .. }) => (name, fields),
        _ => panic!("Expected Schema::Record"),
    };
    assert_eq!(name, Name::new("union_schema_test")?);
    assert_eq!(fields.len(), 1);

    let field = &fields[0];
    assert_eq!(&field.name, "a");
    assert_eq!(&field.default, &Some(json!(123)));

    let union = match &field.schema {
        Schema::Union(union) => union,
        _ => panic!("Expected Schema::Union"),
    };
    assert_eq!(union.variants().len(), 3);
    assert_eq!(union.variants()[0], Schema::Long);
    assert_eq!(union.variants()[1], Schema::Null);
    assert_eq!(union.variants()[2], Schema::Int);

    Ok(())
}
5126
/// A `["string", "int"]` union field whose default (`123`) matches the second variant
/// rather than the first still parses, with the variant order preserved.
#[test]
fn avro_3649_default_notintfirst() -> TestResult {
    let schema_str = r#"
    {
        "type": "record",
        "name": "union_schema_test",
        "fields": [
            {"name": "a", "type": ["string", "int"], "default": 123}
        ]
    }
    "#;

    let (name, fields) = match Schema::parse_str(schema_str)? {
        Schema::Record(RecordSchema { name, fields, .. }) => (name, fields),
        _ => panic!("Expected Schema::Record"),
    };
    assert_eq!(name, Name::new("union_schema_test")?);
    assert_eq!(fields.len(), 1);

    let field = &fields[0];
    assert_eq!(&field.name, "a");
    assert_eq!(&field.default, &Some(json!(123)));

    let union = match &field.schema {
        Schema::Union(union) => union,
        _ => panic!("Expected Schema::Union"),
    };
    assert_eq!(union.variants().len(), 2);
    assert_eq!(union.variants()[0], Schema::String);
    assert_eq!(union.variants()[1], Schema::Int);

    Ok(())
}
5164
/// The `aliases` attribute of a record field is parsed into the field's `aliases`.
#[test]
fn avro_3709_parsing_of_record_field_aliases() -> TestResult {
    let raw_schema = r#"
    {
        "name": "rec",
        "type": "record",
        "fields": [
            {
                "name": "num",
                "type": "int",
                "aliases": ["num1", "num2"]
            }
        ]
    }
    "#;

    match Schema::parse_str(raw_schema)? {
        Schema::Record(RecordSchema { fields, .. }) => {
            let num_field = &fields[0];
            assert_eq!(num_field.name, "num");
            assert_eq!(num_field.aliases, Some(vec!["num1".into(), "num2".into()]));
        }
        _ => panic!("Expected a record schema!"),
    }

    Ok(())
}
5192
/// An enum declared inside a namespaced record can be referenced later by its
/// unqualified name; a serde-derived value validates against the schema and can be
/// appended to a `Writer`.
#[test]
fn avro_3735_parse_enum_namespace() -> TestResult {
    #[derive(
        Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize,
    )]
    pub enum Bar {
        #[serde(rename = "bar0")]
        Bar0,
        #[serde(rename = "bar1")]
        Bar1,
    }

    #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
    pub struct Foo {
        #[serde(rename = "barInit")]
        pub bar_init: Bar,
        #[serde(rename = "barUse")]
        pub bar_use: Bar,
    }

    let raw_schema = r#"
    {
        "type": "record",
        "name": "Foo",
        "namespace": "name.space",
        "fields":
        [
            {
                "name": "barInit",
                "type":
                {
                    "type": "enum",
                    "name": "Bar",
                    "symbols":
                    [
                        "bar0",
                        "bar1"
                    ]
                }
            },
            {
                "name": "barUse",
                "type": "Bar"
            }
        ]
    }
    "#;
    let schema = Schema::parse_str(raw_schema)?;

    let sample = Foo {
        bar_init: Bar::Bar0,
        bar_use: Bar::Bar1,
    };
    let avro_value = crate::to_value(sample)?;
    assert!(avro_value.validate(&schema));

    // Appending must succeed as well, not just validation.
    let mut writer = crate::Writer::new(&schema, Vec::new());
    writer.append(avro_value)?;

    Ok(())
}
5258
/// A datum written with a namespace-less writer schema can be read back with a reader
/// schema that adds a namespace and an extra enum symbol.
///
/// The decoded value must be a record whose two fields hold the enum values written.
#[test]
fn avro_3755_deserialize() -> TestResult {
    #[derive(
        Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize,
    )]
    pub enum Bar {
        #[serde(rename = "bar0")]
        Bar0,
        #[serde(rename = "bar1")]
        Bar1,
        #[serde(rename = "bar2")]
        Bar2,
    }

    #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
    pub struct Foo {
        #[serde(rename = "barInit")]
        pub bar_init: Bar,
        #[serde(rename = "barUse")]
        pub bar_use: Bar,
    }

    let writer_schema_json = r#"{
        "type": "record",
        "name": "Foo",
        "fields":
        [
            {
                "name": "barInit",
                "type":
                {
                    "type": "enum",
                    "name": "Bar",
                    "symbols":
                    [
                        "bar0",
                        "bar1"
                    ]
                }
            },
            {
                "name": "barUse",
                "type": "Bar"
            }
        ]
    }"#;

    let reader_schema_json = r#"{
        "type": "record",
        "name": "Foo",
        "namespace": "name.space",
        "fields":
        [
            {
                "name": "barInit",
                "type":
                {
                    "type": "enum",
                    "name": "Bar",
                    "symbols":
                    [
                        "bar0",
                        "bar1",
                        "bar2"
                    ]
                }
            },
            {
                "name": "barUse",
                "type": "Bar"
            }
        ]
    }"#;

    // Encode a value with the writer schema.
    let writer_schema = Schema::parse_str(writer_schema_json)?;
    let sample = Foo {
        bar_init: Bar::Bar0,
        bar_use: Bar::Bar1,
    };
    let avro_value = crate::to_value(sample)?;
    assert!(
        avro_value.validate(&writer_schema),
        "value is valid for schema",
    );
    let datum = crate::to_avro_datum(&writer_schema, avro_value)?;

    // Decode it again, resolving against the reader schema.
    let mut encoded = &datum[..];
    let reader_schema = Schema::parse_str(reader_schema_json)?;
    let decoded = crate::from_avro_datum(&writer_schema, &mut encoded, Some(&reader_schema))?;

    let fields = match decoded {
        types::Value::Record(fields) => fields,
        _ => panic!("Expected Value::Record"),
    };
    assert_eq!(fields.len(), 2);
    assert_eq!(fields[0].0, "barInit");
    assert_eq!(fields[0].1, types::Value::Enum(0, "bar0".to_string()));
    assert_eq!(fields[1].0, "barUse");
    assert_eq!(fields[1].1, types::Value::Enum(1, "bar1".to_string()));

    Ok(())
}
5360
5361 #[test]
5362 fn test_avro_3780_decimal_schema_type_with_fixed() -> TestResult {
5363 let schema = json!(
5364 {
5365 "type": "record",
5366 "name": "recordWithDecimal",
5367 "fields": [
5368 {
5369 "name": "decimal",
5370 "type": "fixed",
5371 "name": "nestedFixed",
5372 "size": 8,
5373 "logicalType": "decimal",
5374 "precision": 4
5375 }
5376 ]
5377 });
5378
5379 let parse_result = Schema::parse(&schema);
5380 assert!(
5381 parse_result.is_ok(),
5382 "parse result must be ok, got: {:?}",
5383 parse_result
5384 );
5385
5386 Ok(())
5387 }
5388
5389 #[test]
5390 fn test_avro_3772_enum_default_wrong_type() -> TestResult {
5391 let schema = r#"
5392 {
5393 "type": "record",
5394 "name": "test",
5395 "fields": [
5396 {"name": "a", "type": "long", "default": 42},
5397 {"name": "b", "type": "string"},
5398 {
5399 "name": "c",
5400 "type": {
5401 "type": "enum",
5402 "name": "suit",
5403 "symbols": ["diamonds", "spades", "clubs", "hearts"],
5404 "default": 123
5405 }
5406 }
5407 ]
5408 }
5409 "#;
5410
5411 match Schema::parse_str(schema) {
5412 Err(err) => {
5413 assert_eq!(
5414 err.to_string(),
5415 "Default value for enum must be a string! Got: 123"
5416 );
5417 }
5418 _ => panic!("Expected an error"),
5419 }
5420 Ok(())
5421 }
5422
5423 #[test]
5424 fn test_avro_3812_handle_null_namespace_properly() -> TestResult {
5425 let schema_str = r#"
5426 {
5427 "namespace": "",
5428 "type": "record",
5429 "name": "my_schema",
5430 "fields": [
5431 {
5432 "name": "a",
5433 "type": {
5434 "type": "enum",
5435 "name": "my_enum",
5436 "namespace": "",
5437 "symbols": ["a", "b"]
5438 }
5439 }, {
5440 "name": "b",
5441 "type": {
5442 "type": "fixed",
5443 "name": "my_fixed",
5444 "namespace": "",
5445 "size": 10
5446 }
5447 }
5448 ]
5449 }
5450 "#;
5451
5452 let expected = r#"{"name":"my_schema","type":"record","fields":[{"name":"a","type":{"name":"my_enum","type":"enum","symbols":["a","b"]}},{"name":"b","type":{"name":"my_fixed","type":"fixed","size":10}}]}"#;
5453 let schema = Schema::parse_str(schema_str)?;
5454 let canonical_form = schema.canonical_form();
5455 assert_eq!(canonical_form, expected);
5456
5457 let name = Name::new("my_name")?;
5458 let fullname = name.fullname(Some("".to_string()));
5459 assert_eq!(fullname, "my_name");
5460 let qname = name.fully_qualified_name(&Some("".to_string())).to_string();
5461 assert_eq!(qname, "my_name");
5462
5463 Ok(())
5464 }
5465
5466 #[test]
5467 fn test_avro_3818_inherit_enclosing_namespace() -> TestResult {
5468 let schema_str = r#"
5470 {
5471 "namespace": "my_ns",
5472 "type": "record",
5473 "name": "my_schema",
5474 "fields": [
5475 {
5476 "name": "f1",
5477 "type": {
5478 "name": "enum1",
5479 "type": "enum",
5480 "symbols": ["a"]
5481 }
5482 }, {
5483 "name": "f2",
5484 "type": {
5485 "name": "fixed1",
5486 "type": "fixed",
5487 "size": 1
5488 }
5489 }
5490 ]
5491 }
5492 "#;
5493
5494 let expected = r#"{"name":"my_ns.my_schema","type":"record","fields":[{"name":"f1","type":{"name":"my_ns.enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"my_ns.fixed1","type":"fixed","size":1}}]}"#;
5495 let schema = Schema::parse_str(schema_str)?;
5496 let canonical_form = schema.canonical_form();
5497 assert_eq!(canonical_form, expected);
5498
5499 let schema_str = r#"
5502 {
5503 "namespace": "my_ns",
5504 "type": "record",
5505 "name": "my_schema",
5506 "fields": [
5507 {
5508 "name": "f1",
5509 "type": {
5510 "name": "enum1",
5511 "type": "enum",
5512 "namespace": "",
5513 "symbols": ["a"]
5514 }
5515 }, {
5516 "name": "f2",
5517 "type": {
5518 "name": "fixed1",
5519 "type": "fixed",
5520 "namespace": "",
5521 "size": 1
5522 }
5523 }
5524 ]
5525 }
5526 "#;
5527
5528 let expected = r#"{"name":"my_ns.my_schema","type":"record","fields":[{"name":"f1","type":{"name":"enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"fixed1","type":"fixed","size":1}}]}"#;
5529 let schema = Schema::parse_str(schema_str)?;
5530 let canonical_form = schema.canonical_form();
5531 assert_eq!(canonical_form, expected);
5532
5533 let schema_str = r#"
5535 {
5536 "namespace": "",
5537 "type": "record",
5538 "name": "my_schema",
5539 "fields": [
5540 {
5541 "name": "f1",
5542 "type": {
5543 "name": "enum1",
5544 "type": "enum",
5545 "namespace": "f1.ns",
5546 "symbols": ["a"]
5547 }
5548 }, {
5549 "name": "f2",
5550 "type": {
5551 "name": "f2.ns.fixed1",
5552 "type": "fixed",
5553 "size": 1
5554 }
5555 }
5556 ]
5557 }
5558 "#;
5559
5560 let expected = r#"{"name":"my_schema","type":"record","fields":[{"name":"f1","type":{"name":"f1.ns.enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"f2.ns.fixed1","type":"fixed","size":1}}]}"#;
5561 let schema = Schema::parse_str(schema_str)?;
5562 let canonical_form = schema.canonical_form();
5563 assert_eq!(canonical_form, expected);
5564
5565 let schema_str = r#"
5567 {
5568 "type": "record",
5569 "name": "my_ns.my_schema",
5570 "fields": [
5571 {
5572 "name": "f1",
5573 "type": {
5574 "name": "inner_record1",
5575 "type": "record",
5576 "fields": [
5577 {
5578 "name": "f1_1",
5579 "type": {
5580 "name": "enum1",
5581 "type": "enum",
5582 "symbols": ["a"]
5583 }
5584 }
5585 ]
5586 }
5587 }, {
5588 "name": "f2",
5589 "type": {
5590 "name": "inner_record2",
5591 "type": "record",
5592 "namespace": "inner_ns",
5593 "fields": [
5594 {
5595 "name": "f2_1",
5596 "type": {
5597 "name": "enum2",
5598 "type": "enum",
5599 "symbols": ["a"]
5600 }
5601 }
5602 ]
5603 }
5604 }
5605 ]
5606 }
5607 "#;
5608
5609 let expected = r#"{"name":"my_ns.my_schema","type":"record","fields":[{"name":"f1","type":{"name":"my_ns.inner_record1","type":"record","fields":[{"name":"f1_1","type":{"name":"my_ns.enum1","type":"enum","symbols":["a"]}}]}},{"name":"f2","type":{"name":"inner_ns.inner_record2","type":"record","fields":[{"name":"f2_1","type":{"name":"inner_ns.enum2","type":"enum","symbols":["a"]}}]}}]}"#;
5610 let schema = Schema::parse_str(schema_str)?;
5611 let canonical_form = schema.canonical_form();
5612 assert_eq!(canonical_form, expected);
5613
5614 Ok(())
5615 }
5616
5617 #[test]
5618 fn test_avro_3779_bigdecimal_schema() -> TestResult {
5619 let schema = json!(
5620 {
5621 "name": "decimal",
5622 "type": "bytes",
5623 "logicalType": "big-decimal"
5624 }
5625 );
5626
5627 let parse_result = Schema::parse(&schema);
5628 assert!(
5629 parse_result.is_ok(),
5630 "parse result must be ok, got: {:?}",
5631 parse_result
5632 );
5633 match parse_result? {
5634 Schema::BigDecimal => (),
5635 other => panic!("Expected Schema::BigDecimal but got: {other:?}"),
5636 }
5637
5638 Ok(())
5639 }
5640
5641 #[test]
5642 fn test_avro_3820_deny_invalid_field_names() -> TestResult {
5643 let schema_str = r#"
5644 {
5645 "name": "my_record",
5646 "type": "record",
5647 "fields": [
5648 {
5649 "name": "f1.x",
5650 "type": {
5651 "name": "my_enum",
5652 "type": "enum",
5653 "symbols": ["a"]
5654 }
5655 }, {
5656 "name": "f2",
5657 "type": {
5658 "name": "my_fixed",
5659 "type": "fixed",
5660 "size": 1
5661 }
5662 }
5663 ]
5664 }
5665 "#;
5666
5667 match Schema::parse_str(schema_str) {
5668 Err(Error::FieldName(x)) if x == "f1.x" => Ok(()),
5669 other => Err(format!("Expected Error::FieldName, got {other:?}").into()),
5670 }
5671 }
5672
5673 #[test]
5674 fn test_avro_3827_disallow_duplicate_field_names() -> TestResult {
5675 let schema_str = r#"
5676 {
5677 "name": "my_schema",
5678 "type": "record",
5679 "fields": [
5680 {
5681 "name": "f1",
5682 "type": {
5683 "name": "a",
5684 "type": "record",
5685 "fields": []
5686 }
5687 }, {
5688 "name": "f1",
5689 "type": {
5690 "name": "b",
5691 "type": "record",
5692 "fields": []
5693 }
5694 }
5695 ]
5696 }
5697 "#;
5698
5699 match Schema::parse_str(schema_str) {
5700 Err(Error::FieldNameDuplicate(_)) => (),
5701 other => {
5702 return Err(format!("Expected Error::FieldNameDuplicate, got {other:?}").into());
5703 }
5704 };
5705
5706 let schema_str = r#"
5707 {
5708 "name": "my_schema",
5709 "type": "record",
5710 "fields": [
5711 {
5712 "name": "f1",
5713 "type": {
5714 "name": "a",
5715 "type": "record",
5716 "fields": [
5717 {
5718 "name": "f1",
5719 "type": {
5720 "name": "b",
5721 "type": "record",
5722 "fields": []
5723 }
5724 }
5725 ]
5726 }
5727 }
5728 ]
5729 }
5730 "#;
5731
5732 let expected = r#"{"name":"my_schema","type":"record","fields":[{"name":"f1","type":{"name":"a","type":"record","fields":[{"name":"f1","type":{"name":"b","type":"record","fields":[]}}]}}]}"#;
5733 let schema = Schema::parse_str(schema_str)?;
5734 let canonical_form = schema.canonical_form();
5735 assert_eq!(canonical_form, expected);
5736
5737 Ok(())
5738 }
5739
5740 #[test]
5741 fn test_avro_3830_null_namespace_in_fully_qualified_names() -> TestResult {
5742 let schema_str = r#"
5745 {
5746 "name": ".record1",
5747 "namespace": "ns1",
5748 "type": "record",
5749 "fields": [
5750 {
5751 "name": "f1",
5752 "type": {
5753 "name": ".enum1",
5754 "namespace": "ns2",
5755 "type": "enum",
5756 "symbols": ["a"]
5757 }
5758 }, {
5759 "name": "f2",
5760 "type": {
5761 "name": ".fxed1",
5762 "namespace": "ns3",
5763 "type": "fixed",
5764 "size": 1
5765 }
5766 }
5767 ]
5768 }
5769 "#;
5770
5771 let expected = r#"{"name":"record1","type":"record","fields":[{"name":"f1","type":{"name":"enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"fxed1","type":"fixed","size":1}}]}"#;
5772 let schema = Schema::parse_str(schema_str)?;
5773 let canonical_form = schema.canonical_form();
5774 assert_eq!(canonical_form, expected);
5775
5776 let schema_str = r#"
5778 {
5779 "name": ".record1",
5780 "namespace": "ns1",
5781 "type": "record",
5782 "fields": [
5783 {
5784 "name": "f1",
5785 "type": {
5786 "name": "enum1",
5787 "type": "enum",
5788 "symbols": ["a"]
5789 }
5790 }, {
5791 "name": "f2",
5792 "type": {
5793 "name": "fxed1",
5794 "type": "fixed",
5795 "size": 1
5796 }
5797 }
5798 ]
5799 }
5800 "#;
5801
5802 let expected = r#"{"name":"record1","type":"record","fields":[{"name":"f1","type":{"name":"enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"fxed1","type":"fixed","size":1}}]}"#;
5803 let schema = Schema::parse_str(schema_str)?;
5804 let canonical_form = schema.canonical_form();
5805 assert_eq!(canonical_form, expected);
5806
5807 let name = Name::new(".my_name")?;
5808 let fullname = name.fullname(None);
5809 assert_eq!(fullname, "my_name");
5810 let qname = name.fully_qualified_name(&None).to_string();
5811 assert_eq!(qname, "my_name");
5812
5813 Ok(())
5814 }
5815
5816 #[test]
5817 fn test_avro_3814_schema_resolution_failure() -> TestResult {
5818 let reader_schema = json!(
5820 {
5821 "type": "record",
5822 "name": "MyOuterRecord",
5823 "fields": [
5824 {
5825 "name": "inner_record",
5826 "type": [
5827 "null",
5828 {
5829 "type": "record",
5830 "name": "MyRecord",
5831 "fields": [
5832 {"name": "a", "type": "string"}
5833 ]
5834 }
5835 ],
5836 "default": null
5837 }
5838 ]
5839 }
5840 );
5841
5842 let writer_schema = json!(
5845 {
5846 "type": "record",
5847 "name": "MyOuterRecord",
5848 "fields": [
5849 {
5850 "name": "inner_record",
5851 "type": [
5852 "null",
5853 {
5854 "type": "record",
5855 "name": "MyRecord",
5856 "fields": [
5857 {"name": "a", "type": "string"},
5858 {
5859 "name": "b",
5860 "type": [
5861 "null",
5862 {
5863 "type": "enum",
5864 "name": "MyEnum",
5865 "symbols": ["A", "B", "C"],
5866 "default": "C"
5867 }
5868 ],
5869 "default": null
5870 },
5871 ]
5872 }
5873 ]
5874 }
5875 ],
5876 "default": null
5877 }
5878 );
5879
5880 #[derive(Serialize, Deserialize, Debug)]
5883 struct MyInnerRecordReader {
5884 a: String,
5885 }
5886
5887 #[derive(Serialize, Deserialize, Debug)]
5888 struct MyRecordReader {
5889 inner_record: Option<MyInnerRecordReader>,
5890 }
5891
5892 #[derive(Serialize, Deserialize, Debug)]
5893 enum MyEnum {
5894 A,
5895 B,
5896 C,
5897 }
5898
5899 #[derive(Serialize, Deserialize, Debug)]
5900 struct MyInnerRecordWriter {
5901 a: String,
5902 b: Option<MyEnum>,
5903 }
5904
5905 #[derive(Serialize, Deserialize, Debug)]
5906 struct MyRecordWriter {
5907 inner_record: Option<MyInnerRecordWriter>,
5908 }
5909
5910 let s = MyRecordWriter {
5911 inner_record: Some(MyInnerRecordWriter {
5912 a: "foo".to_string(),
5913 b: None,
5914 }),
5915 };
5916
5917 let writer_schema = Schema::parse(&writer_schema)?;
5919 let avro_value = crate::to_value(s)?;
5920 assert!(
5921 avro_value.validate(&writer_schema),
5922 "value is valid for schema",
5923 );
5924 let datum = crate::to_avro_datum(&writer_schema, avro_value)?;
5925
5926 let reader_schema = Schema::parse(&reader_schema)?;
5928 let mut x = &datum[..];
5929
5930 let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?;
5932 assert!(deser_value.validate(&reader_schema));
5933
5934 let d: MyRecordReader = crate::from_value(&deser_value)?;
5936 assert_eq!(d.inner_record.unwrap().a, "foo".to_string());
5937 Ok(())
5938 }
5939
5940 #[test]
5941 fn test_avro_3837_disallow_invalid_namespace() -> TestResult {
5942 let schema_str = r#"
5944 {
5945 "name": "record1",
5946 "namespace": "ns1",
5947 "type": "record",
5948 "fields": []
5949 }
5950 "#;
5951
5952 let expected = r#"{"name":"ns1.record1","type":"record","fields":[]}"#;
5953 let schema = Schema::parse_str(schema_str)?;
5954 let canonical_form = schema.canonical_form();
5955 assert_eq!(canonical_form, expected);
5956
5957 let schema_str = r#"
5959 {
5960 "name": "enum1",
5961 "namespace": "ns1.foo.bar",
5962 "type": "enum",
5963 "symbols": ["a"]
5964 }
5965 "#;
5966
5967 let expected = r#"{"name":"ns1.foo.bar.enum1","type":"enum","symbols":["a"]}"#;
5968 let schema = Schema::parse_str(schema_str)?;
5969 let canonical_form = schema.canonical_form();
5970 assert_eq!(canonical_form, expected);
5971
5972 let schema_str = r#"
5974 {
5975 "name": "fixed1",
5976 "namespace": ".ns1.a.b",
5977 "type": "fixed",
5978 "size": 1
5979 }
5980 "#;
5981
5982 match Schema::parse_str(schema_str) {
5983 Err(Error::InvalidNamespace(_, _)) => (),
5984 other => return Err(format!("Expected Error::InvalidNamespace, got {other:?}").into()),
5985 };
5986
5987 let schema_str = r#"
5989 {
5990 "name": "record1",
5991 "namespace": "ns1.a*b.c",
5992 "type": "record",
5993 "fields": []
5994 }
5995 "#;
5996
5997 match Schema::parse_str(schema_str) {
5998 Err(Error::InvalidNamespace(_, _)) => (),
5999 other => return Err(format!("Expected Error::InvalidNamespace, got {other:?}").into()),
6000 };
6001
6002 let schema_str = r#"
6004 {
6005 "name": "fixed1",
6006 "namespace": "ns1.1a.b",
6007 "type": "fixed",
6008 "size": 1
6009 }
6010 "#;
6011
6012 match Schema::parse_str(schema_str) {
6013 Err(Error::InvalidNamespace(_, _)) => (),
6014 other => return Err(format!("Expected Error::InvalidNamespace, got {other:?}").into()),
6015 };
6016
6017 let schema_str = r#"
6019 {
6020 "name": "fixed1",
6021 "namespace": "ns1..a",
6022 "type": "fixed",
6023 "size": 1
6024 }
6025 "#;
6026
6027 match Schema::parse_str(schema_str) {
6028 Err(Error::InvalidNamespace(_, _)) => (),
6029 other => return Err(format!("Expected Error::InvalidNamespace, got {other:?}").into()),
6030 };
6031
6032 let schema_str = r#"
6034 {
6035 "name": "fixed1",
6036 "namespace": "ns1.a.",
6037 "type": "fixed",
6038 "size": 1
6039 }
6040 "#;
6041
6042 match Schema::parse_str(schema_str) {
6043 Err(Error::InvalidNamespace(_, _)) => (),
6044 other => return Err(format!("Expected Error::InvalidNamespace, got {other:?}").into()),
6045 };
6046
6047 Ok(())
6048 }
6049
6050 #[test]
6051 fn test_avro_3851_validate_default_value_of_simple_record_field() -> TestResult {
6052 let schema_str = r#"
6053 {
6054 "name": "record1",
6055 "namespace": "ns",
6056 "type": "record",
6057 "fields": [
6058 {
6059 "name": "f1",
6060 "type": "int",
6061 "default": "invalid"
6062 }
6063 ]
6064 }
6065 "#;
6066 let expected = Error::GetDefaultRecordField(
6067 "f1".to_string(),
6068 "ns.record1".to_string(),
6069 r#""int""#.to_string(),
6070 )
6071 .to_string();
6072 let result = Schema::parse_str(schema_str);
6073 assert!(result.is_err());
6074 let err = result
6075 .map_err(|e| e.to_string())
6076 .err()
6077 .unwrap_or_else(|| "unexpected".to_string());
6078 assert_eq!(expected, err);
6079
6080 Ok(())
6081 }
6082
6083 #[test]
6084 fn test_avro_3851_validate_default_value_of_nested_record_field() -> TestResult {
6085 let schema_str = r#"
6086 {
6087 "name": "record1",
6088 "namespace": "ns",
6089 "type": "record",
6090 "fields": [
6091 {
6092 "name": "f1",
6093 "type": {
6094 "name": "record2",
6095 "type": "record",
6096 "fields": [
6097 {
6098 "name": "f1_1",
6099 "type": "int"
6100 }
6101 ]
6102 },
6103 "default": "invalid"
6104 }
6105 ]
6106 }
6107 "#;
6108 let expected = Error::GetDefaultRecordField(
6109 "f1".to_string(),
6110 "ns.record1".to_string(),
6111 r#"{"name":"ns.record2","type":"record","fields":[{"name":"f1_1","type":"int"}]}"#
6112 .to_string(),
6113 )
6114 .to_string();
6115 let result = Schema::parse_str(schema_str);
6116 assert!(result.is_err());
6117 let err = result
6118 .map_err(|e| e.to_string())
6119 .err()
6120 .unwrap_or_else(|| "unexpected".to_string());
6121 assert_eq!(expected, err);
6122
6123 Ok(())
6124 }
6125
6126 #[test]
6127 fn test_avro_3851_validate_default_value_of_enum_record_field() -> TestResult {
6128 let schema_str = r#"
6129 {
6130 "name": "record1",
6131 "namespace": "ns",
6132 "type": "record",
6133 "fields": [
6134 {
6135 "name": "f1",
6136 "type": {
6137 "name": "enum1",
6138 "type": "enum",
6139 "symbols": ["a", "b", "c"]
6140 },
6141 "default": "invalid"
6142 }
6143 ]
6144 }
6145 "#;
6146 let expected = Error::GetDefaultRecordField(
6147 "f1".to_string(),
6148 "ns.record1".to_string(),
6149 r#"{"name":"ns.enum1","type":"enum","symbols":["a","b","c"]}"#.to_string(),
6150 )
6151 .to_string();
6152 let result = Schema::parse_str(schema_str);
6153 assert!(result.is_err());
6154 let err = result
6155 .map_err(|e| e.to_string())
6156 .err()
6157 .unwrap_or_else(|| "unexpected".to_string());
6158 assert_eq!(expected, err);
6159
6160 Ok(())
6161 }
6162
6163 #[test]
6164 fn test_avro_3851_validate_default_value_of_fixed_record_field() -> TestResult {
6165 let schema_str = r#"
6166 {
6167 "name": "record1",
6168 "namespace": "ns",
6169 "type": "record",
6170 "fields": [
6171 {
6172 "name": "f1",
6173 "type": {
6174 "name": "fixed1",
6175 "type": "fixed",
6176 "size": 3
6177 },
6178 "default": 100
6179 }
6180 ]
6181 }
6182 "#;
6183 let expected = Error::GetDefaultRecordField(
6184 "f1".to_string(),
6185 "ns.record1".to_string(),
6186 r#"{"name":"ns.fixed1","type":"fixed","size":3}"#.to_string(),
6187 )
6188 .to_string();
6189 let result = Schema::parse_str(schema_str);
6190 assert!(result.is_err());
6191 let err = result
6192 .map_err(|e| e.to_string())
6193 .err()
6194 .unwrap_or_else(|| "unexpected".to_string());
6195 assert_eq!(expected, err);
6196
6197 Ok(())
6198 }
6199
6200 #[test]
6201 fn test_avro_3851_validate_default_value_of_array_record_field() -> TestResult {
6202 let schema_str = r#"
6203 {
6204 "name": "record1",
6205 "namespace": "ns",
6206 "type": "record",
6207 "fields": [
6208 {
6209 "name": "f1",
6210 "type": "array",
6211 "items": "int",
6212 "default": "invalid"
6213 }
6214 ]
6215 }
6216 "#;
6217 let expected = Error::GetDefaultRecordField(
6218 "f1".to_string(),
6219 "ns.record1".to_string(),
6220 r#"{"type":"array","items":"int"}"#.to_string(),
6221 )
6222 .to_string();
6223 let result = Schema::parse_str(schema_str);
6224 assert!(result.is_err());
6225 let err = result
6226 .map_err(|e| e.to_string())
6227 .err()
6228 .unwrap_or_else(|| "unexpected".to_string());
6229 assert_eq!(expected, err);
6230
6231 Ok(())
6232 }
6233
6234 #[test]
6235 fn test_avro_3851_validate_default_value_of_map_record_field() -> TestResult {
6236 let schema_str = r#"
6237 {
6238 "name": "record1",
6239 "namespace": "ns",
6240 "type": "record",
6241 "fields": [
6242 {
6243 "name": "f1",
6244 "type": "map",
6245 "values": "string",
6246 "default": "invalid"
6247 }
6248 ]
6249 }
6250 "#;
6251 let expected = Error::GetDefaultRecordField(
6252 "f1".to_string(),
6253 "ns.record1".to_string(),
6254 r#"{"type":"map","values":"string"}"#.to_string(),
6255 )
6256 .to_string();
6257 let result = Schema::parse_str(schema_str);
6258 assert!(result.is_err());
6259 let err = result
6260 .map_err(|e| e.to_string())
6261 .err()
6262 .unwrap_or_else(|| "unexpected".to_string());
6263 assert_eq!(expected, err);
6264
6265 Ok(())
6266 }
6267
6268 #[test]
6269 fn test_avro_3851_validate_default_value_of_ref_record_field() -> TestResult {
6270 let schema_str = r#"
6271 {
6272 "name": "record1",
6273 "namespace": "ns",
6274 "type": "record",
6275 "fields": [
6276 {
6277 "name": "f1",
6278 "type": {
6279 "name": "record2",
6280 "type": "record",
6281 "fields": [
6282 {
6283 "name": "f1_1",
6284 "type": "int"
6285 }
6286 ]
6287 }
6288 }, {
6289 "name": "f2",
6290 "type": "ns.record2",
6291 "default": { "f1_1": true }
6292 }
6293 ]
6294 }
6295 "#;
6296 let expected = Error::GetDefaultRecordField(
6297 "f2".to_string(),
6298 "ns.record1".to_string(),
6299 r#""ns.record2""#.to_string(),
6300 )
6301 .to_string();
6302 let result = Schema::parse_str(schema_str);
6303 assert!(result.is_err());
6304 let err = result
6305 .map_err(|e| e.to_string())
6306 .err()
6307 .unwrap_or_else(|| "unexpected".to_string());
6308 assert_eq!(expected, err);
6309
6310 Ok(())
6311 }
6312
6313 #[test]
6314 fn test_avro_3851_validate_default_value_of_enum() -> TestResult {
6315 let schema_str = r#"
6316 {
6317 "name": "enum1",
6318 "namespace": "ns",
6319 "type": "enum",
6320 "symbols": ["a", "b", "c"],
6321 "default": 100
6322 }
6323 "#;
6324 let expected = Error::EnumDefaultWrongType(100.into()).to_string();
6325 let result = Schema::parse_str(schema_str);
6326 assert!(result.is_err());
6327 let err = result
6328 .map_err(|e| e.to_string())
6329 .err()
6330 .unwrap_or_else(|| "unexpected".to_string());
6331 assert_eq!(expected, err);
6332
6333 let schema_str = r#"
6334 {
6335 "name": "enum1",
6336 "namespace": "ns",
6337 "type": "enum",
6338 "symbols": ["a", "b", "c"],
6339 "default": "d"
6340 }
6341 "#;
6342 let expected = Error::GetEnumDefault {
6343 symbol: "d".to_string(),
6344 symbols: vec!["a".to_string(), "b".to_string(), "c".to_string()],
6345 }
6346 .to_string();
6347 let result = Schema::parse_str(schema_str);
6348 assert!(result.is_err());
6349 let err = result
6350 .map_err(|e| e.to_string())
6351 .err()
6352 .unwrap_or_else(|| "unexpected".to_string());
6353 assert_eq!(expected, err);
6354
6355 Ok(())
6356 }
6357
6358 #[test]
6359 fn test_avro_3862_get_aliases() -> TestResult {
6360 let schema_str = r#"
6362 {
6363 "name": "record1",
6364 "namespace": "ns1",
6365 "type": "record",
6366 "aliases": ["r1", "ns2.r2"],
6367 "fields": [
6368 { "name": "f1", "type": "int" },
6369 { "name": "f2", "type": "string" }
6370 ]
6371 }
6372 "#;
6373 let schema = Schema::parse_str(schema_str)?;
6374 let expected = vec![Alias::new("ns1.r1")?, Alias::new("ns2.r2")?];
6375 match schema.aliases() {
6376 Some(aliases) => assert_eq!(aliases, &expected),
6377 None => panic!("Expected Some({:?}), got None", expected),
6378 }
6379
6380 let schema_str = r#"
6381 {
6382 "name": "record1",
6383 "namespace": "ns1",
6384 "type": "record",
6385 "fields": [
6386 { "name": "f1", "type": "int" },
6387 { "name": "f2", "type": "string" }
6388 ]
6389 }
6390 "#;
6391 let schema = Schema::parse_str(schema_str)?;
6392 match schema.aliases() {
6393 None => (),
6394 some => panic!("Expected None, got {some:?}"),
6395 }
6396
6397 let schema_str = r#"
6399 {
6400 "name": "enum1",
6401 "namespace": "ns1",
6402 "type": "enum",
6403 "aliases": ["en1", "ns2.en2"],
6404 "symbols": ["a", "b", "c"]
6405 }
6406 "#;
6407 let schema = Schema::parse_str(schema_str)?;
6408 let expected = vec![Alias::new("ns1.en1")?, Alias::new("ns2.en2")?];
6409 match schema.aliases() {
6410 Some(aliases) => assert_eq!(aliases, &expected),
6411 None => panic!("Expected Some({:?}), got None", expected),
6412 }
6413
6414 let schema_str = r#"
6415 {
6416 "name": "enum1",
6417 "namespace": "ns1",
6418 "type": "enum",
6419 "symbols": ["a", "b", "c"]
6420 }
6421 "#;
6422 let schema = Schema::parse_str(schema_str)?;
6423 match schema.aliases() {
6424 None => (),
6425 some => panic!("Expected None, got {some:?}"),
6426 }
6427
6428 let schema_str = r#"
6430 {
6431 "name": "fixed1",
6432 "namespace": "ns1",
6433 "type": "fixed",
6434 "aliases": ["fx1", "ns2.fx2"],
6435 "size": 10
6436 }
6437 "#;
6438 let schema = Schema::parse_str(schema_str)?;
6439 let expected = vec![Alias::new("ns1.fx1")?, Alias::new("ns2.fx2")?];
6440 match schema.aliases() {
6441 Some(aliases) => assert_eq!(aliases, &expected),
6442 None => panic!("Expected Some({:?}), got None", expected),
6443 }
6444
6445 let schema_str = r#"
6446 {
6447 "name": "fixed1",
6448 "namespace": "ns1",
6449 "type": "fixed",
6450 "size": 10
6451 }
6452 "#;
6453 let schema = Schema::parse_str(schema_str)?;
6454 match schema.aliases() {
6455 None => (),
6456 some => panic!("Expected None, got {some:?}"),
6457 }
6458
6459 let schema = Schema::Int;
6461 match schema.aliases() {
6462 None => (),
6463 some => panic!("Expected None, got {some:?}"),
6464 }
6465
6466 Ok(())
6467 }
6468
6469 #[test]
6470 fn test_avro_3862_get_doc() -> TestResult {
6471 let schema_str = r#"
6473 {
6474 "name": "record1",
6475 "type": "record",
6476 "doc": "Record Document",
6477 "fields": [
6478 { "name": "f1", "type": "int" },
6479 { "name": "f2", "type": "string" }
6480 ]
6481 }
6482 "#;
6483 let schema = Schema::parse_str(schema_str)?;
6484 let expected = "Record Document";
6485 match schema.doc() {
6486 Some(doc) => assert_eq!(doc, expected),
6487 None => panic!("Expected Some({:?}), got None", expected),
6488 }
6489
6490 let schema_str = r#"
6491 {
6492 "name": "record1",
6493 "type": "record",
6494 "fields": [
6495 { "name": "f1", "type": "int" },
6496 { "name": "f2", "type": "string" }
6497 ]
6498 }
6499 "#;
6500 let schema = Schema::parse_str(schema_str)?;
6501 match schema.doc() {
6502 None => (),
6503 some => panic!("Expected None, got {some:?}"),
6504 }
6505
6506 let schema_str = r#"
6508 {
6509 "name": "enum1",
6510 "type": "enum",
6511 "doc": "Enum Document",
6512 "symbols": ["a", "b", "c"]
6513 }
6514 "#;
6515 let schema = Schema::parse_str(schema_str)?;
6516 let expected = "Enum Document";
6517 match schema.doc() {
6518 Some(doc) => assert_eq!(doc, expected),
6519 None => panic!("Expected Some({:?}), got None", expected),
6520 }
6521
6522 let schema_str = r#"
6523 {
6524 "name": "enum1",
6525 "type": "enum",
6526 "symbols": ["a", "b", "c"]
6527 }
6528 "#;
6529 let schema = Schema::parse_str(schema_str)?;
6530 match schema.doc() {
6531 None => (),
6532 some => panic!("Expected None, got {some:?}"),
6533 }
6534
6535 let schema_str = r#"
6537 {
6538 "name": "fixed1",
6539 "type": "fixed",
6540 "doc": "Fixed Document",
6541 "size": 10
6542 }
6543 "#;
6544 let schema = Schema::parse_str(schema_str)?;
6545 let expected = "Fixed Document";
6546 match schema.doc() {
6547 Some(doc) => assert_eq!(doc, expected),
6548 None => panic!("Expected Some({:?}), got None", expected),
6549 }
6550
6551 let schema_str = r#"
6552 {
6553 "name": "fixed1",
6554 "type": "fixed",
6555 "size": 10
6556 }
6557 "#;
6558 let schema = Schema::parse_str(schema_str)?;
6559 match schema.doc() {
6560 None => (),
6561 some => panic!("Expected None, got {some:?}"),
6562 }
6563
6564 let schema = Schema::Int;
6566 match schema.doc() {
6567 None => (),
6568 some => panic!("Expected None, got {some:?}"),
6569 }
6570
6571 Ok(())
6572 }
6573
6574 #[test]
6575 fn avro_3886_serialize_attributes() -> TestResult {
6576 let attributes = BTreeMap::from([
6577 ("string_key".into(), "value".into()),
6578 ("number_key".into(), 1.23.into()),
6579 ("null_key".into(), Value::Null),
6580 (
6581 "array_key".into(),
6582 Value::Array(vec![1.into(), 2.into(), 3.into()]),
6583 ),
6584 ("object_key".into(), Value::Object(Map::default())),
6585 ]);
6586
6587 let schema = Schema::Enum(EnumSchema {
6589 name: Name::new("a")?,
6590 aliases: None,
6591 doc: None,
6592 symbols: vec![],
6593 default: None,
6594 attributes: attributes.clone(),
6595 });
6596 let serialized = serde_json::to_string(&schema)?;
6597 assert_eq!(
6598 r#"{"type":"enum","name":"a","symbols":[],"array_key":[1,2,3],"null_key":null,"number_key":1.23,"object_key":{},"string_key":"value"}"#,
6599 &serialized
6600 );
6601
6602 let schema = Schema::Fixed(FixedSchema {
6604 name: Name::new("a")?,
6605 aliases: None,
6606 doc: None,
6607 size: 1,
6608 default: None,
6609 attributes: attributes.clone(),
6610 });
6611 let serialized = serde_json::to_string(&schema)?;
6612 assert_eq!(
6613 r#"{"type":"fixed","name":"a","size":1,"array_key":[1,2,3],"null_key":null,"number_key":1.23,"object_key":{},"string_key":"value"}"#,
6614 &serialized
6615 );
6616
6617 let schema = Schema::Record(RecordSchema {
6619 name: Name::new("a")?,
6620 aliases: None,
6621 doc: None,
6622 fields: vec![],
6623 lookup: BTreeMap::new(),
6624 attributes,
6625 });
6626 let serialized = serde_json::to_string(&schema)?;
6627 assert_eq!(
6628 r#"{"type":"record","name":"a","fields":[],"array_key":[1,2,3],"null_key":null,"number_key":1.23,"object_key":{},"string_key":"value"}"#,
6629 &serialized
6630 );
6631
6632 Ok(())
6633 }
6634
6635 #[test]
6638 fn test_avro_3897_funny_valid_names_and_namespaces() -> TestResult {
6639 for funny_name in ["_", "_._", "__._", "_.__", "_._._"] {
6640 let name = Name::new(funny_name);
6641 assert!(name.is_ok());
6642 }
6643 Ok(())
6644 }
6645
6646 #[test]
6647 fn test_avro_3896_decimal_schema() -> TestResult {
6648 let schema = json!(
6650 {
6651 "type": "bytes",
6652 "name": "BytesDecimal",
6653 "logicalType": "decimal",
6654 "size": 38,
6655 "precision": 9,
6656 "scale": 2
6657 });
6658 let parse_result = Schema::parse(&schema)?;
6659 assert!(matches!(
6660 parse_result,
6661 Schema::Decimal(DecimalSchema {
6662 precision: 9,
6663 scale: 2,
6664 ..
6665 })
6666 ));
6667
6668 let schema = json!(
6670 {
6671 "type": "long",
6672 "name": "LongDecimal",
6673 "logicalType": "decimal"
6674 });
6675 let parse_result = Schema::parse(&schema)?;
6676 assert_eq!(parse_result, Schema::Long);
6678
6679 Ok(())
6680 }
6681
6682 #[test]
6683 fn avro_3896_uuid_schema_for_string() -> TestResult {
6684 let schema = json!(
6686 {
6687 "type": "string",
6688 "name": "StringUUID",
6689 "logicalType": "uuid"
6690 });
6691 let parse_result = Schema::parse(&schema)?;
6692 assert_eq!(parse_result, Schema::Uuid);
6693
6694 Ok(())
6695 }
6696
/// AVRO-RS-53: writes a record whose `id` field is a `fixed(16)` schema
/// annotated with the `uuid` logical type, once with the human-readable flag
/// set and once with it cleared, and checks the value `write_ref` returns each
/// time.
#[test]
#[serial(serde_is_human_readable)]
fn avro_rs_53_uuid_with_fixed() -> TestResult {
    #[derive(Debug, Serialize, Deserialize)]
    struct Comment {
        id: crate::Uuid,
    }

    impl AvroSchema for Comment {
        fn get_schema() -> Schema {
            let raw_schema = r#"{
                "type" : "record",
                "name" : "Comment",
                "fields" : [ {
                    "name" : "id",
                    "type" : {
                        "type" : "fixed",
                        "size" : 16,
                        "logicalType" : "uuid",
                        "name": "FixedUUID"
                    }
                } ]
            }"#;
            Schema::parse_str(raw_schema).expect("Invalid Comment Avro schema")
        }
    }

    let comment = Comment {
        id: "de2df598-9948-4988-b00a-a41c0e287398".parse()?,
    };
    // One sink is shared by both writes and is never cleared in between; the
    // assertions only inspect the value returned by `write_ref`.
    let mut sink = Vec::new();
    let human_readable = &crate::util::SERDE_HUMAN_READABLE;

    // Flag set: 47 is expected (presumably the textual UUID form -- confirm).
    human_readable.store(true, Ordering::Release);
    let written_readable =
        SpecificSingleObjectWriter::<Comment>::with_capacity(64)?.write_ref(&comment, &mut sink)?;
    assert_eq!(written_readable, 47);

    // Flag cleared: 27 is expected (presumably the 16-byte fixed form -- confirm).
    human_readable.store(false, Ordering::Release);
    let written_compact =
        SpecificSingleObjectWriter::<Comment>::with_capacity(64)?.write_ref(&comment, &mut sink)?;
    assert_eq!(written_compact, 27);

    Ok(())
}
6745
/// AVRO-3926: a `fixed` schema of size 16 annotated with `logicalType: uuid`
/// parses to `Schema::Uuid`, and the "Ignoring uuid logical type" warning is
/// not reported for it.
#[test]
fn avro_3926_uuid_schema_for_fixed_with_size_16() -> TestResult {
    let schema = json!({
        "type": "fixed",
        "name": "FixedUUID",
        "size": 16,
        "logicalType": "uuid"
    });
    let parse_result = Schema::parse(&schema)?;
    assert_eq!(parse_result, Schema::Uuid);
    // Kept identical to the warning text the size-mismatch test expects to be
    // logged (it includes the `default: None` field of `FixedSchema`). A text
    // without that field cannot correspond to a real log entry, which would
    // leave this negative check with nothing to match.
    assert_not_logged(
        r#"Ignoring uuid logical type for a Fixed schema because its size (6) is not 16! Schema: Fixed(FixedSchema { name: Name { name: "FixedUUID", namespace: None }, aliases: None, doc: None, size: 6, default: None, attributes: {"logicalType": String("uuid")} })"#,
    );

    Ok(())
}
6763
/// AVRO-3926: a `fixed` schema of size 6 annotated with `logicalType: uuid`
/// stays a plain `Schema::Fixed` (the annotation is kept as an attribute) and
/// a warning is logged.
#[test]
fn avro_3926_uuid_schema_for_fixed_with_size_different_than_16() -> TestResult {
    let undersized_uuid = json!({
        "type": "fixed",
        "name": "FixedUUID",
        "size": 6,
        "logicalType": "uuid"
    });
    let parsed = Schema::parse(&undersized_uuid)?;

    let expected_fixed = FixedSchema {
        name: Name::new("FixedUUID")?,
        aliases: None,
        doc: None,
        size: 6,
        default: None,
        attributes: BTreeMap::from([("logicalType".to_string(), "uuid".into())]),
    };
    assert_eq!(parsed, Schema::Fixed(expected_fixed));

    assert_logged(
        r#"Ignoring uuid logical type for a Fixed schema because its size (6) is not 16! Schema: Fixed(FixedSchema { name: Name { name: "FixedUUID", namespace: None }, aliases: None, doc: None, size: 6, default: None, attributes: {"logicalType": String("uuid")} })"#,
    );

    Ok(())
}
6792
/// AVRO-3896: `timestamp-millis` on a `long` schema parses to
/// `Schema::TimestampMillis`; on an `int` schema the result is plain
/// `Schema::Int`.
#[test]
fn test_avro_3896_timestamp_millis_schema() -> TestResult {
    let on_long = json!({
        "type": "long",
        "name": "LongTimestampMillis",
        "logicalType": "timestamp-millis"
    });
    let parsed_long = Schema::parse(&on_long)?;
    assert_eq!(parsed_long, Schema::TimestampMillis);

    let on_int = json!({
        "type": "int",
        "name": "IntTimestampMillis",
        "logicalType": "timestamp-millis"
    });
    let parsed_int = Schema::parse(&on_int)?;
    assert_eq!(parsed_int, Schema::Int);

    Ok(())
}
6817
/// AVRO-3896: a `bytes` schema with the logical type `custom` parses to
/// `Schema::Bytes` and reports no custom attributes.
#[test]
fn test_avro_3896_custom_bytes_schema() -> TestResult {
    let custom_bytes = json!({
        "type": "bytes",
        "name": "BytesLog",
        "logicalType": "custom"
    });

    let parsed = Schema::parse(&custom_bytes)?;
    assert_eq!(parsed, Schema::Bytes);
    assert_eq!(parsed.custom_attributes(), None);

    Ok(())
}
6833
/// AVRO-3899: a decimal whose scale exceeds its precision is kept as the
/// underlying `fixed` schema with a logged warning, whereas a valid decimal
/// over `bytes` parses to `Schema::Decimal` without that warning.
#[test]
fn test_avro_3899_parse_decimal_type() -> TestResult {
    // precision (2) < scale (3): the decimal annotation is expected to be ignored.
    let invalid_decimal = Schema::parse_str(
        r#"{
            "name": "InvalidDecimal",
            "type": "fixed",
            "size": 16,
            "logicalType": "decimal",
            "precision": 2,
            "scale": 3
        }"#,
    )?;
    match &invalid_decimal {
        Schema::Fixed(fixed_schema) => {
            let attributes = &fixed_schema.attributes;
            let precision = attributes
                .get("precision")
                .expect("The 'precision' attribute is missing");
            let scale = attributes
                .get("scale")
                .expect("The 'scale' attribute is missing");
            // The warning text is built from the attributes kept on the schema.
            let expected_warning = format!("Ignoring invalid decimal logical type: The decimal precision ({}) must be bigger or equal to the scale ({})", precision, scale);
            assert_logged(&expected_warning);
        }
        other => unreachable!("Expected Schema::Fixed, got {:?}", other),
    }

    // precision (3) >= scale (2): a proper decimal.
    let valid_decimal = Schema::parse_str(
        r#"{
            "name": "ValidDecimal",
            "type": "bytes",
            "logicalType": "decimal",
            "precision": 3,
            "scale": 2
        }"#,
    )?;
    match &valid_decimal {
        Schema::Decimal(_) => {
            assert_not_logged("Ignoring invalid decimal logical type: The decimal precision (2) must be bigger or equal to the scale (3)");
        }
        other => unreachable!("Expected Schema::Decimal, got {:?}", other),
    }

    Ok(())
}
6878
/// AVRO-3920: custom attributes on a record and on its field are written out
/// on serialization, and parsing the serialized text yields an equal schema.
#[test]
fn avro_3920_serialize_record_with_custom_attributes() -> TestResult {
    let value_field = RecordField {
        name: "value".to_string(),
        doc: None,
        default: None,
        aliases: None,
        schema: Schema::Long,
        order: RecordFieldOrder::Ascending,
        position: 0,
        custom_attributes: BTreeMap::from([("field-id".to_string(), 1.into())]),
    };
    let expected = Schema::Record(RecordSchema {
        name: Name {
            name: "LongList".to_owned(),
            namespace: None,
        },
        aliases: Some(vec![Alias::new("LinkedLongs").unwrap()]),
        doc: None,
        fields: vec![value_field],
        // Field name -> index into `fields`.
        lookup: BTreeMap::from([("value".to_owned(), 0)]),
        attributes: BTreeMap::from([("custom-attribute".to_string(), "value".into())]),
    });

    // Serialization goes through `serde_json::Value` first; the expected text
    // lists its keys alphabetically (presumably a result of that detour --
    // confirm against the serde_json features in use).
    let as_value = serde_json::to_value(&expected)?;
    let serialized = serde_json::to_string(&as_value)?;
    assert_eq!(
        r#"{"aliases":["LinkedLongs"],"custom-attribute":"value","fields":[{"field-id":1,"name":"value","type":"long"}],"name":"LongList","type":"record"}"#,
        &serialized
    );

    // Round trip back into a schema.
    assert_eq!(expected, Schema::parse_str(&serialized)?);

    Ok(())
}
6916
/// AVRO-3925: a decimal backed by a `fixed` schema serializes as that fixed
/// schema followed by `logicalType`, `scale` and `precision`.
#[test]
fn test_avro_3925_serialize_decimal_inner_fixed() -> TestResult {
    let backing_fixed = FixedSchema {
        name: Name::new("decimal_36_10").unwrap(),
        aliases: None,
        doc: None,
        size: 16,
        default: None,
        attributes: Default::default(),
    };
    let decimal = Schema::Decimal(DecimalSchema {
        precision: 36,
        scale: 10,
        inner: Box::new(Schema::Fixed(backing_fixed)),
    });

    let serialized_json = serde_json::to_string_pretty(&decimal)?;

    // Compared verbatim, so the layout must match the pretty printer's output
    // (two-space indentation) exactly.
    let expected_json = r#"{
  "type": "fixed",
  "name": "decimal_36_10",
  "size": 16,
  "logicalType": "decimal",
  "scale": 10,
  "precision": 36
}"#;

    assert_eq!(serialized_json, expected_json);

    Ok(())
}
6947
/// AVRO-3925: a decimal backed by `bytes` serializes as a `bytes` schema
/// followed by `logicalType`, `scale` and `precision`.
#[test]
fn test_avro_3925_serialize_decimal_inner_bytes() -> TestResult {
    let decimal = Schema::Decimal(DecimalSchema {
        precision: 36,
        scale: 10,
        inner: Box::new(Schema::Bytes),
    });

    let serialized_json = serde_json::to_string_pretty(&decimal)?;

    // Compared verbatim, so the layout must match the pretty printer's output
    // (two-space indentation) exactly.
    let expected_json = r#"{
  "type": "bytes",
  "logicalType": "decimal",
  "scale": 10,
  "precision": 36
}"#;

    assert_eq!(serialized_json, expected_json);

    Ok(())
}
6969
/// AVRO-3925: serializing a decimal whose inner schema is `string` must
/// produce an error.
#[test]
fn test_avro_3925_serialize_decimal_inner_invalid() -> TestResult {
    let string_backed_decimal = Schema::Decimal(DecimalSchema {
        precision: 36,
        scale: 10,
        inner: Box::new(Schema::String),
    });

    // The result is deliberately not propagated with `?`: an error is the
    // expected outcome here.
    let outcome = serde_json::to_string_pretty(&string_backed_decimal);
    assert!(outcome.is_err());

    Ok(())
}
6984
/// AVRO-3927: custom attributes of an array schema are serialized and are
/// still present after parsing the serialized text back.
#[test]
fn test_avro_3927_serialize_array_with_custom_attributes() -> TestResult {
    let attributes = BTreeMap::from([("field-id".to_string(), "1".into())]);
    let expected = Schema::array_with_attributes(Schema::Long, attributes);

    let as_value = serde_json::to_value(&expected)?;
    let serialized = serde_json::to_string(&as_value)?;
    assert_eq!(
        r#"{"field-id":"1","items":"long","type":"array"}"#,
        &serialized
    );

    // Round trip: both the schema and its custom attributes must match.
    let reparsed = Schema::parse_str(&serialized)?;
    assert_eq!(expected, reparsed);
    assert_eq!(expected.custom_attributes(), reparsed.custom_attributes());

    Ok(())
}
7007
/// AVRO-3927: custom attributes of a map schema are serialized and are still
/// present after parsing the serialized text back.
#[test]
fn test_avro_3927_serialize_map_with_custom_attributes() -> TestResult {
    let attributes = BTreeMap::from([("field-id".to_string(), "1".into())]);
    let expected = Schema::map_with_attributes(Schema::Long, attributes);

    let as_value = serde_json::to_value(&expected)?;
    let serialized = serde_json::to_string(&as_value)?;
    assert_eq!(
        r#"{"field-id":"1","type":"map","values":"long"}"#,
        &serialized
    );

    // Round trip: both the schema and its custom attributes must match.
    let reparsed = Schema::parse_str(&serialized)?;
    assert_eq!(expected, reparsed);
    assert_eq!(expected.custom_attributes(), reparsed.custom_attributes());

    Ok(())
}
7030
/// AVRO-3928: a record field of logical type `date` (over `int`) keeps its
/// integer default after parsing.
#[test]
fn avro_3928_parse_int_based_schema_with_default() -> TestResult {
    let raw_schema = r#"
        {
            "type": "record",
            "name": "DateLogicalType",
            "fields": [ {
                "name": "birthday",
                "type": {"type": "int", "logicalType": "date"},
                "default": 1681601653
            } ]
        }"#;

    if let Schema::Record(record_schema) = Schema::parse_str(raw_schema)? {
        assert_eq!(record_schema.fields.len(), 1);

        // Exactly one field exists at this point, so index 0 is present.
        let birthday = &record_schema.fields[0];
        assert_eq!(birthday.name, "birthday");
        assert_eq!(birthday.schema, Schema::Date);

        // The default is converted into an Avro value before comparing.
        assert_eq!(
            types::Value::from(birthday.default.clone().unwrap()),
            types::Value::Int(1681601653)
        );
    } else {
        unreachable!("Expected Schema::Record");
    }

    Ok(())
}
7060
/// AVRO-3946: a union with a single member still parses, but a warning
/// suggesting to drop the union is logged.
#[test]
fn avro_3946_union_with_single_type() -> TestResult {
    let single_member_union = r#"
        {
            "type": "record",
            "name": "Issue",
            "namespace": "invalid.example",
            "fields": [
                {
                    "name": "myField",
                    "type": ["long"]
                }
            ]
        }"#;

    // Only the log output is checked; the parsed schema itself is discarded.
    Schema::parse_str(single_member_union)?;

    let expected_warning = "Union schema with just one member! Consider dropping the union! \
                            Please enable debug logging to find out which Record schema \
                            declares the union with 'RUST_LOG=apache_avro::schema=debug'.";
    assert_logged(expected_warning);

    Ok(())
}
7086
/// AVRO-3946: an empty union still parses, but a warning that unions should
/// have at least two members is logged.
#[test]
fn avro_3946_union_without_any_types() -> TestResult {
    let empty_union = r#"
        {
            "type": "record",
            "name": "Issue",
            "namespace": "invalid.example",
            "fields": [
                {
                    "name": "myField",
                    "type": []
                }
            ]
        }"#;

    // Only the log output is checked; the parsed schema itself is discarded.
    Schema::parse_str(empty_union)?;

    let expected_warning = "Union schemas should have at least two members! \
                            Please enable debug logging to find out which Record schema \
                            declares the union with 'RUST_LOG=apache_avro::schema=debug'.";
    assert_logged(expected_warning);

    Ok(())
}
7112
/// AVRO-3965: a `fixed` schema whose default value is longer than its declared
/// size must be rejected with a descriptive error.
#[test]
fn avro_3965_fixed_schema_with_default_bigger_than_size() -> TestResult {
    let oversized_default = r#"{
        "type": "fixed",
        "name": "test",
        "size": 1,
        "default": "123456789"
    }"#;

    if let Err(err) = Schema::parse_str(oversized_default) {
        assert_eq!(
            err.to_string(),
            "Fixed schema's default value length (9) does not match its size (1)"
        );
    } else {
        panic!("Must fail!");
    }

    Ok(())
}
7134
/// AVRO-4004: checks the canonical form and the Rabin fingerprint of a record
/// whose fields carry `default`, `doc`, `namespace` and `logicalType`
/// attributes; none of these appear in the expected canonical text.
#[test]
fn avro_4004_canonical_form_strip_logical_types() -> TestResult {
    let record_json = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42, "doc": "The field a"},
                {"name": "b", "type": "string", "namespace": "test.a"},
                {"name": "c", "type": "long", "logicalType": "timestamp-micros"}
            ]
        }"#;

    let parsed = Schema::parse_str(record_json)?;
    let canonical = parsed.canonical_form();
    let rabin_fingerprint = parsed.fingerprint::<Rabin>();

    assert_eq!(
        r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"},{"name":"c","type":{"type":"long"}}]}"#,
        canonical
    );
    assert_eq!("92f2ccef718c6754", rabin_fingerprint.to_string());

    Ok(())
}
7158
/// AVRO-4055: a record field that declares a nested record directly (with
/// `"type": "record"` and `fields` placed on the field itself) must be
/// rejected with a descriptive error, while the same structure nested inside
/// the field's `type` attribute must parse.
#[test]
fn avro_4055_should_fail_to_parse_invalid_schema() -> TestResult {
    // Invalid: the inner record is spelled on the field, not inside `type`.
    let invalid_schema_str = r#"
        {
            "type": "record",
            "name": "SampleSchema",
            "fields": [
                {
                    "name": "order",
                    "type": "record",
                    "fields": [
                        {
                            "name": "order_number",
                            "type": ["null", "string"],
                            "default": null
                        },
                        { "name": "order_date", "type": "string" }
                    ]
                }
            ]
        }"#;

    match Schema::parse_str(invalid_schema_str) {
        // Show what was wrongly accepted instead of a bare "assertion failed".
        Ok(schema) => panic!("Parsing must fail, but it produced: {schema:?}"),
        Err(err) => assert_eq!(
            err.to_string(),
            "Invalid schema: There is no type called 'record', if you meant to define a non-primitive schema, it should be defined inside `type` attribute. Please review the specification"
        ),
    }

    // Valid: the inner record is a named schema inside the `type` attribute.
    let valid_schema = r#"
        {
            "type": "record",
            "name": "SampleSchema",
            "fields": [
                {
                    "name": "order",
                    "type": {
                        "type": "record",
                        "name": "Order",
                        "fields": [
                            {
                                "name": "order_number",
                                "type": ["null", "string"],
                                "default": null
                            },
                            { "name": "order_date", "type": "string" }
                        ]
                    }
                }
            ]
        }"#;
    // Propagate the error so a regression reports why parsing failed.
    Schema::parse_str(valid_schema)?;

    Ok(())
}
7216}