1use crate::error::Details;
19use crate::schema::{
20 Alias, Aliases, ArraySchema, DecimalMetadata, DecimalSchema, EnumSchema, FixedSchema,
21 MapSchema, Name, Names, NamespaceRef, Precision, RecordField, RecordSchema, Scale, Schema,
22 SchemaKind, UnionSchema, UuidSchema,
23};
24use crate::types;
25use crate::util::MapHelper;
26use crate::validator::validate_enum_symbol_name;
27use crate::{AvroResult, Error};
28use log::{debug, error, warn};
29use serde_json::{Map, Value};
30use std::collections::{BTreeMap, HashMap, HashSet};
31
/// State kept while turning JSON schema definitions into [`Schema`]s.
#[derive(Default)]
pub(crate) struct Parser {
    /// JSON definitions that still have to be parsed, keyed by name.
    /// An entry is removed as soon as its parsing starts.
    input_schemas: HashMap<Name, Value>,
    /// Named schemas (and their aliases) whose parsing is in progress.
    /// Every entry holds a `Schema::Ref`, which is handed out when the
    /// schema is referenced from within its own definition.
    resolving_schemas: Names,
    /// Names of the input schemas, in the order in which `parse_list`
    /// returns the parsed schemas.
    input_order: Vec<Name>,
    /// Named schemas that are completely parsed, registered under their
    /// name and under each of their aliases.
    parsed_schemas: Names,
}
42
43impl Parser {
44 pub(crate) fn new(
45 input_schemas: HashMap<Name, Value>,
46 input_order: Vec<Name>,
47 parsed_schemas: Names,
48 ) -> Self {
49 Self {
50 input_schemas,
51 resolving_schemas: HashMap::default(),
52 input_order,
53 parsed_schemas,
54 }
55 }
56
57 pub(crate) fn get_parsed_schemas(&mut self) -> &Names {
58 &self.parsed_schemas
59 }
60
61 pub(super) fn parse_str(&mut self, input: &str) -> AvroResult<Schema> {
63 let value = serde_json::from_str(input).map_err(Details::ParseSchemaJson)?;
64 self.parse(&value, None)
65 }
66
67 pub(super) fn parse_list(&mut self) -> AvroResult<Vec<Schema>> {
71 self.parse_input_schemas()?;
72
73 let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
74 for name in self.input_order.drain(0..) {
75 let parsed = self
76 .parsed_schemas
77 .remove(&name)
78 .expect("One of the input schemas was unexpectedly not parsed");
79 parsed_schemas.push(parsed);
80 }
81 Ok(parsed_schemas)
82 }
83
84 pub(super) fn parse_input_schemas(&mut self) -> Result<(), Error> {
86 while !self.input_schemas.is_empty() {
87 let next_name = self
88 .input_schemas
89 .keys()
90 .next()
91 .expect("Input schemas unexpectedly empty")
92 .to_owned();
93 let (name, value) = self
94 .input_schemas
95 .remove_entry(&next_name)
96 .expect("Key unexpectedly missing");
97 let parsed = self.parse(&value, None)?;
98 self.parsed_schemas
99 .insert(self.get_schema_type_name(name, &value)?, parsed);
100 }
101 Ok(())
102 }
103
104 pub(super) fn parse(
106 &mut self,
107 value: &Value,
108 enclosing_namespace: NamespaceRef,
109 ) -> AvroResult<Schema> {
110 match *value {
111 Value::String(ref t) => self.parse_known_schema(t.as_str(), enclosing_namespace),
112 Value::Object(ref data) => self.parse_complex(data, enclosing_namespace),
113 Value::Array(ref data) => self.parse_union(data, enclosing_namespace),
114 _ => Err(Details::ParseSchemaFromValidJson.into()),
115 }
116 }
117
118 fn parse_known_schema(
120 &mut self,
121 name: &str,
122 enclosing_namespace: NamespaceRef,
123 ) -> AvroResult<Schema> {
124 match name {
125 "null" => Ok(Schema::Null),
126 "boolean" => Ok(Schema::Boolean),
127 "int" => Ok(Schema::Int),
128 "long" => Ok(Schema::Long),
129 "double" => Ok(Schema::Double),
130 "float" => Ok(Schema::Float),
131 "bytes" => Ok(Schema::Bytes),
132 "string" => Ok(Schema::String),
133 _ => self.fetch_schema_ref(name, enclosing_namespace),
134 }
135 }
136
137 pub(super) fn fetch_schema_ref(
148 &mut self,
149 name: &str,
150 enclosing_namespace: NamespaceRef,
151 ) -> AvroResult<Schema> {
152 fn get_schema_ref(parsed: &Schema) -> Schema {
153 match parsed {
154 &Schema::Record(RecordSchema { ref name, .. })
155 | &Schema::Enum(EnumSchema { ref name, .. })
156 | &Schema::Fixed(FixedSchema { ref name, .. }) => {
157 Schema::Ref { name: name.clone() }
158 }
159 _ => parsed.clone(),
160 }
161 }
162
163 let fully_qualified_name = Name::new_with_enclosing_namespace(name, enclosing_namespace)?;
164
165 if self.parsed_schemas.contains_key(&fully_qualified_name) {
166 return Ok(Schema::Ref {
167 name: fully_qualified_name,
168 });
169 }
170 if let Some(resolving_schema) = self.resolving_schemas.get(&fully_qualified_name) {
171 return Ok(resolving_schema.clone());
172 }
173
174 match fully_qualified_name.name() {
176 "record" | "enum" | "fixed" => {
177 return Err(
178 Details::InvalidSchemaRecord(fully_qualified_name.name().to_string()).into(),
179 );
180 }
181 _ => (),
182 }
183
184 let value = self
185 .input_schemas
186 .remove(&fully_qualified_name)
187 .ok_or_else(|| {
189 let full_name = fully_qualified_name.fullname(None);
190 if full_name == "bool" {
191 Details::ParsePrimitiveSimilar(full_name, "boolean")
192 } else {
193 Details::ParsePrimitive(full_name)
194 }
195 })?;
196
197 let parsed = self.parse(&value, None)?;
199 self.parsed_schemas.insert(
200 self.get_schema_type_name(fully_qualified_name, &value)?,
201 parsed.clone(),
202 );
203
204 Ok(get_schema_ref(&parsed))
205 }
206
207 fn get_decimal_integer(
208 &self,
209 complex: &Map<String, Value>,
210 key: &'static str,
211 ) -> AvroResult<DecimalMetadata> {
212 match complex.get(key) {
213 Some(Value::Number(value)) => self.parse_json_integer_for_decimal(value),
214 None => {
215 if key == "scale" {
216 Ok(0)
217 } else {
218 Err(Details::GetDecimalMetadataFromJson(key).into())
219 }
220 }
221 Some(value) => Err(Details::GetDecimalMetadataValueFromJson {
222 key: key.into(),
223 value: value.clone(),
224 }
225 .into()),
226 }
227 }
228
229 fn parse_precision_and_scale(
230 &self,
231 complex: &Map<String, Value>,
232 ) -> AvroResult<(Precision, Scale)> {
233 let precision = self.get_decimal_integer(complex, "precision")?;
234 let scale = self.get_decimal_integer(complex, "scale")?;
235
236 if precision < 1 {
237 return Err(Details::DecimalPrecisionMuBePositive { precision }.into());
238 }
239
240 if precision < scale {
241 Err(Details::DecimalPrecisionLessThanScale { precision, scale }.into())
242 } else {
243 Ok((precision, scale))
244 }
245 }
246
247 pub(super) fn parse_complex(
252 &mut self,
253 complex: &Map<String, Value>,
254 enclosing_namespace: NamespaceRef,
255 ) -> AvroResult<Schema> {
256 fn parse_as_native_complex(
258 complex: &Map<String, Value>,
259 parser: &mut Parser,
260 enclosing_namespace: NamespaceRef,
261 ) -> AvroResult<Schema> {
262 match complex.get("type") {
263 Some(value) => match value {
264 Value::String(s) if s == "fixed" => {
265 parser.parse_fixed(complex, enclosing_namespace)
266 }
267 _ => parser.parse(value, enclosing_namespace),
268 },
269 None => Err(Details::GetLogicalTypeField.into()),
270 }
271 }
272
273 fn try_convert_to_logical_type<F>(
280 logical_type: &str,
281 schema: Schema,
282 supported_schema_kinds: &[SchemaKind],
283 convert: F,
284 ) -> AvroResult<Schema>
285 where
286 F: Fn(Schema) -> AvroResult<Schema>,
287 {
288 let kind = SchemaKind::from(schema.clone());
289 if supported_schema_kinds.contains(&kind) {
290 convert(schema)
291 } else {
292 warn!(
293 "Ignoring unknown logical type '{logical_type}' for schema of type: {schema:?}!"
294 );
295 Ok(schema)
296 }
297 }
298
299 match complex.get("logicalType") {
300 Some(Value::String(t)) => match t.as_str() {
301 "decimal" => {
302 return try_convert_to_logical_type(
303 "decimal",
304 parse_as_native_complex(complex, self, enclosing_namespace)?,
305 &[SchemaKind::Fixed, SchemaKind::Bytes],
306 |inner| -> AvroResult<Schema> {
307 match self.parse_precision_and_scale(complex) {
308 Ok((precision, scale)) => Ok(Schema::Decimal(DecimalSchema {
309 precision,
310 scale,
311 inner: inner.try_into()?,
312 })),
313 Err(err) => {
314 warn!("Ignoring invalid decimal logical type: {err}");
315 Ok(inner)
316 }
317 }
318 },
319 );
320 }
321 "big-decimal" => {
322 return try_convert_to_logical_type(
323 "big-decimal",
324 parse_as_native_complex(complex, self, enclosing_namespace)?,
325 &[SchemaKind::Bytes],
326 |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) },
327 );
328 }
329 "uuid" => {
330 return try_convert_to_logical_type(
331 "uuid",
332 parse_as_native_complex(complex, self, enclosing_namespace)?,
333 &[SchemaKind::String, SchemaKind::Fixed, SchemaKind::Bytes],
334 |schema| match schema {
335 Schema::String => Ok(Schema::Uuid(UuidSchema::String)),
336 Schema::Fixed(fixed @ FixedSchema { size: 16, .. }) => {
337 Ok(Schema::Uuid(UuidSchema::Fixed(fixed)))
338 }
339 Schema::Fixed(FixedSchema { size, .. }) => {
340 warn!(
341 "Ignoring uuid logical type for a Fixed schema because its size ({size:?}) is not 16! Schema: {schema:?}"
342 );
343 Ok(schema)
344 }
345 Schema::Bytes => Ok(Schema::Uuid(UuidSchema::Bytes)),
346 _ => {
347 warn!("Ignoring invalid uuid logical type for schema: {schema:?}");
348 Ok(schema)
349 }
350 },
351 );
352 }
353 "date" => {
354 return try_convert_to_logical_type(
355 "date",
356 parse_as_native_complex(complex, self, enclosing_namespace)?,
357 &[SchemaKind::Int],
358 |_| -> AvroResult<Schema> { Ok(Schema::Date) },
359 );
360 }
361 "time-millis" => {
362 return try_convert_to_logical_type(
363 "date",
364 parse_as_native_complex(complex, self, enclosing_namespace)?,
365 &[SchemaKind::Int],
366 |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) },
367 );
368 }
369 "time-micros" => {
370 return try_convert_to_logical_type(
371 "time-micros",
372 parse_as_native_complex(complex, self, enclosing_namespace)?,
373 &[SchemaKind::Long],
374 |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) },
375 );
376 }
377 "timestamp-millis" => {
378 return try_convert_to_logical_type(
379 "timestamp-millis",
380 parse_as_native_complex(complex, self, enclosing_namespace)?,
381 &[SchemaKind::Long],
382 |_| -> AvroResult<Schema> { Ok(Schema::TimestampMillis) },
383 );
384 }
385 "timestamp-micros" => {
386 return try_convert_to_logical_type(
387 "timestamp-micros",
388 parse_as_native_complex(complex, self, enclosing_namespace)?,
389 &[SchemaKind::Long],
390 |_| -> AvroResult<Schema> { Ok(Schema::TimestampMicros) },
391 );
392 }
393 "timestamp-nanos" => {
394 return try_convert_to_logical_type(
395 "timestamp-nanos",
396 parse_as_native_complex(complex, self, enclosing_namespace)?,
397 &[SchemaKind::Long],
398 |_| -> AvroResult<Schema> { Ok(Schema::TimestampNanos) },
399 );
400 }
401 "local-timestamp-millis" => {
402 return try_convert_to_logical_type(
403 "local-timestamp-millis",
404 parse_as_native_complex(complex, self, enclosing_namespace)?,
405 &[SchemaKind::Long],
406 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMillis) },
407 );
408 }
409 "local-timestamp-micros" => {
410 return try_convert_to_logical_type(
411 "local-timestamp-micros",
412 parse_as_native_complex(complex, self, enclosing_namespace)?,
413 &[SchemaKind::Long],
414 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMicros) },
415 );
416 }
417 "local-timestamp-nanos" => {
418 return try_convert_to_logical_type(
419 "local-timestamp-nanos",
420 parse_as_native_complex(complex, self, enclosing_namespace)?,
421 &[SchemaKind::Long],
422 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampNanos) },
423 );
424 }
425 "duration" => {
426 return try_convert_to_logical_type(
427 "duration",
428 parse_as_native_complex(complex, self, enclosing_namespace)?,
429 &[SchemaKind::Fixed],
430 |schema| -> AvroResult<Schema> {
431 match schema {
432 Schema::Fixed(fixed @ FixedSchema { size: 12, .. }) => {
433 Ok(Schema::Duration(fixed))
434 }
435 Schema::Fixed(FixedSchema { size, .. }) => {
436 warn!(
437 "Ignoring duration logical type on fixed type because size ({size}) is not 12! Schema: {schema:?}"
438 );
439 Ok(schema)
440 }
441 _ => {
442 warn!(
443 "Ignoring invalid duration logical type for schema: {schema:?}"
444 );
445 Ok(schema)
446 }
447 }
448 },
449 );
450 }
451 _ => {}
454 },
455 Some(value) => return Err(Details::GetLogicalTypeFieldType(value.clone()).into()),
459 _ => {}
460 }
461 match complex.get("type") {
462 Some(Value::String(t)) => match t.as_str() {
463 "record" => self.parse_record(complex, enclosing_namespace),
464 "enum" => self.parse_enum(complex, enclosing_namespace),
465 "array" => self.parse_array(complex, enclosing_namespace),
466 "map" => self.parse_map(complex, enclosing_namespace),
467 "fixed" => self.parse_fixed(complex, enclosing_namespace),
468 other => self.parse_known_schema(other, enclosing_namespace),
469 },
470 Some(Value::Object(data)) => self.parse_complex(data, enclosing_namespace),
471 Some(Value::Array(variants)) => self.parse_union(variants, enclosing_namespace),
472 Some(unknown) => Err(Details::GetComplexType(unknown.clone()).into()),
473 None => Err(Details::GetComplexTypeField.into()),
474 }
475 }
476
477 fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
478 let resolving_schema = Schema::Ref { name: name.clone() };
479 self.resolving_schemas
480 .insert(name.clone(), resolving_schema.clone());
481
482 let namespace = name.namespace();
483
484 if let Some(aliases) = aliases {
485 aliases.iter().for_each(|alias| {
486 let alias_fullname = alias.fully_qualified_name(namespace).into_owned();
487 self.resolving_schemas
488 .insert(alias_fullname, resolving_schema.clone());
489 });
490 }
491 }
492
493 fn register_parsed_schema(
494 &mut self,
495 fully_qualified_name: &Name,
496 schema: &Schema,
497 aliases: &Aliases,
498 ) {
499 self.parsed_schemas
502 .insert(fully_qualified_name.clone(), schema.clone());
503 self.resolving_schemas.remove(fully_qualified_name);
504
505 let namespace = fully_qualified_name.namespace();
506
507 if let Some(aliases) = aliases {
508 aliases.iter().for_each(|alias| {
509 let alias_fullname = alias.fully_qualified_name(namespace);
510 self.resolving_schemas.remove(&alias_fullname);
511 self.parsed_schemas
512 .insert(alias_fullname.into_owned(), schema.clone());
513 });
514 }
515 }
516
517 fn get_already_seen_schema(
519 &self,
520 complex: &Map<String, Value>,
521 enclosing_namespace: NamespaceRef,
522 ) -> Option<&Schema> {
523 match complex.get("type") {
524 Some(Value::String(typ)) => {
525 let name =
530 Name::new_with_enclosing_namespace(typ.as_str(), enclosing_namespace).ok()?;
531 self.resolving_schemas
532 .get(&name)
533 .or_else(|| self.parsed_schemas.get(&name))
534 }
535 _ => None,
536 }
537 }
538
539 fn parse_record(
541 &mut self,
542 complex: &Map<String, Value>,
543 enclosing_namespace: NamespaceRef,
544 ) -> AvroResult<Schema> {
545 let fields_opt = complex.get("fields");
546
547 if fields_opt.is_none()
548 && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace)
549 {
550 return Ok(seen.clone());
551 }
552
553 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
554 let aliases =
555 self.fix_aliases_namespace(complex.aliases(), fully_qualified_name.namespace())?;
556
557 let mut lookup = BTreeMap::new();
558
559 self.register_resolving_schema(&fully_qualified_name, &aliases);
560
561 debug!("Going to parse record schema: {:?}", &fully_qualified_name);
562
563 let fields: Vec<RecordField> = fields_opt
564 .and_then(|fields| fields.as_array())
565 .ok_or_else(|| Error::new(Details::GetRecordFieldsJson))
566 .and_then(|fields| {
567 fields
568 .iter()
569 .filter_map(|field| field.as_object())
570 .map(|field| RecordField::parse(field, self, &fully_qualified_name))
571 .collect::<Result<_, _>>()
572 })?;
573
574 for (position, field) in fields.iter().enumerate() {
575 if let Some(_old) = lookup.insert(field.name.clone(), position) {
576 return Err(Details::FieldNameDuplicate(field.name.clone()).into());
577 }
578
579 for alias in &field.aliases {
580 lookup.insert(alias.clone(), position);
581 }
582 }
583
584 let schema = Schema::Record(RecordSchema {
585 name: fully_qualified_name.clone(),
586 aliases: aliases.clone(),
587 doc: complex.doc(),
588 fields,
589 lookup,
590 attributes: self.get_custom_attributes(complex, &["fields"]),
591 });
592
593 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
594 Ok(schema)
595 }
596
597 fn get_custom_attributes(
598 &self,
599 complex: &Map<String, Value>,
600 excluded: &[&'static str],
601 ) -> BTreeMap<String, Value> {
602 let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
603 for (key, value) in complex {
604 match key.as_str() {
605 "type" | "name" | "namespace" | "doc" | "aliases" | "logicalType" => continue,
606 candidate if excluded.contains(&candidate) => continue,
607 _ => custom_attributes.insert(key.clone(), value.clone()),
608 };
609 }
610 custom_attributes
611 }
612
613 fn parse_enum(
615 &mut self,
616 complex: &Map<String, Value>,
617 enclosing_namespace: NamespaceRef,
618 ) -> AvroResult<Schema> {
619 let symbols_opt = complex.get("symbols");
620
621 if symbols_opt.is_none()
622 && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace)
623 {
624 return Ok(seen.clone());
625 }
626
627 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
628 let aliases =
629 self.fix_aliases_namespace(complex.aliases(), fully_qualified_name.namespace())?;
630
631 let symbols: Vec<String> = symbols_opt
632 .and_then(|v| v.as_array())
633 .ok_or_else(|| Error::from(Details::GetEnumSymbolsField))
634 .and_then(|symbols| {
635 symbols
636 .iter()
637 .map(|symbol| symbol.as_str().map(|s| s.to_string()))
638 .collect::<Option<_>>()
639 .ok_or_else(|| Error::from(Details::GetEnumSymbols))
640 })?;
641
642 let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
643 for symbol in symbols.iter() {
644 validate_enum_symbol_name(symbol)?;
645
646 if existing_symbols.contains(&symbol) {
648 return Err(Details::EnumSymbolDuplicate(symbol.to_string()).into());
649 }
650
651 existing_symbols.insert(symbol);
652 }
653
654 let mut default: Option<String> = None;
655 if let Some(value) = complex.get("default") {
656 if let Value::String(ref s) = *value {
657 default = Some(s.clone());
658 } else {
659 return Err(Details::EnumDefaultWrongType(value.clone()).into());
660 }
661 }
662
663 if let Some(ref value) = default {
664 let resolved = types::Value::from(value.clone())
665 .resolve_enum(&symbols, &Some(value.to_string()), &None)
666 .is_ok();
667 if !resolved {
668 return Err(Details::GetEnumDefault {
669 symbol: value.to_string(),
670 symbols,
671 }
672 .into());
673 }
674 }
675
676 let schema = Schema::Enum(EnumSchema {
677 name: fully_qualified_name.clone(),
678 aliases: aliases.clone(),
679 doc: complex.doc(),
680 symbols,
681 default,
682 attributes: self.get_custom_attributes(complex, &["symbols", "default"]),
683 });
684
685 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
686
687 Ok(schema)
688 }
689
690 fn parse_array(
692 &mut self,
693 complex: &Map<String, Value>,
694 enclosing_namespace: NamespaceRef,
695 ) -> AvroResult<Schema> {
696 let items = complex
697 .get("items")
698 .ok_or_else(|| Details::GetArrayItemsField.into())
699 .and_then(|items| self.parse(items, enclosing_namespace))?;
700 Ok(Schema::Array(ArraySchema {
701 items: Box::new(items),
702 attributes: self.get_custom_attributes(complex, &["items"]),
703 }))
704 }
705
706 fn parse_map(
708 &mut self,
709 complex: &Map<String, Value>,
710 enclosing_namespace: NamespaceRef,
711 ) -> AvroResult<Schema> {
712 let types = complex
713 .get("values")
714 .ok_or_else(|| Details::GetMapValuesField.into())
715 .and_then(|types| self.parse(types, enclosing_namespace))?;
716
717 Ok(Schema::Map(MapSchema {
718 types: Box::new(types),
719 attributes: self.get_custom_attributes(complex, &["values"]),
720 }))
721 }
722
723 fn parse_union(
725 &mut self,
726 items: &[Value],
727 enclosing_namespace: NamespaceRef,
728 ) -> AvroResult<Schema> {
729 items
730 .iter()
731 .map(|v| self.parse(v, enclosing_namespace))
732 .collect::<Result<Vec<_>, _>>()
733 .and_then(|schemas| {
734 if schemas.is_empty() {
735 error!(
736 "Union schemas should have at least two members! \
737 Please enable debug logging to find out which Record schema \
738 declares the union with 'RUST_LOG=apache_avro::schema=debug'."
739 );
740 } else if schemas.len() == 1 {
741 warn!(
742 "Union schema with just one member! Consider dropping the union! \
743 Please enable debug logging to find out which Record schema \
744 declares the union with 'RUST_LOG=apache_avro::schema=debug'."
745 );
746 }
747 Ok(Schema::Union(UnionSchema::new(schemas)?))
748 })
749 }
750
751 fn parse_fixed(
753 &mut self,
754 complex: &Map<String, Value>,
755 enclosing_namespace: NamespaceRef,
756 ) -> AvroResult<Schema> {
757 let size_opt = complex.get("size");
758 if size_opt.is_none()
759 && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace)
760 {
761 return Ok(seen.clone());
762 }
763
764 let doc = complex.get("doc").and_then(|v| match &v {
765 &Value::String(docstr) => Some(docstr.clone()),
766 _ => None,
767 });
768
769 let size = match size_opt {
770 Some(size) => size
771 .as_u64()
772 .ok_or_else(|| Details::GetFixedSizeFieldPositive(size.clone())),
773 None => Err(Details::GetFixedSizeField),
774 }?;
775
776 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
777 let aliases =
778 self.fix_aliases_namespace(complex.aliases(), fully_qualified_name.namespace())?;
779
780 let schema = Schema::Fixed(FixedSchema {
781 name: fully_qualified_name.clone(),
782 aliases: aliases.clone(),
783 doc,
784 size: size as usize,
785 attributes: self.get_custom_attributes(complex, &["size"]),
786 });
787
788 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
789
790 Ok(schema)
791 }
792
793 fn fix_aliases_namespace(
799 &self,
800 aliases: Option<Vec<String>>,
801 namespace: NamespaceRef,
802 ) -> AvroResult<Aliases> {
803 aliases
804 .map(|aliases| {
805 aliases
806 .iter()
807 .map(|alias| {
808 if alias.contains('.') {
812 Alias::new(alias.as_str())
813 } else {
814 match namespace {
815 Some(ns) => Alias::new(&format!("{ns}.{alias}")),
816 None => Alias::new(alias.as_str()),
817 }
818 }
819 })
820 .collect::<AvroResult<Vec<_>>>()
821 })
822 .transpose()
823 }
824
825 fn get_schema_type_name(&self, name: Name, value: &Value) -> AvroResult<Name> {
826 match value.get("type") {
827 Some(Value::Object(complex_type)) => match complex_type.name() {
828 Some(type_name) => Name::new(type_name),
831 _ => Ok(name),
832 },
833 _ => Ok(name),
834 }
835 }
836
837 fn parse_json_integer_for_decimal(
838 &self,
839 value: &serde_json::Number,
840 ) -> AvroResult<DecimalMetadata> {
841 Ok(if value.is_u64() {
842 let num = value
843 .as_u64()
844 .ok_or_else(|| Details::GetU64FromJson(value.clone()))?;
845 num.try_into()
846 .map_err(|e| Details::ConvertU64ToUsize(e, num))?
847 } else if value.is_i64() {
848 let num = value
849 .as_i64()
850 .ok_or_else(|| Details::GetI64FromJson(value.clone()))?;
851 num.try_into()
852 .map_err(|e| Details::ConvertI64ToUsize(e, num))?
853 } else {
854 return Err(Details::GetPrecisionOrScaleFromJson(value.clone()).into());
855 })
856 }
857}