Skip to main content

apache_avro/schema/
parser.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::error::Details;
19use crate::schema::{
20    Alias, Aliases, ArraySchema, DecimalMetadata, DecimalSchema, EnumSchema, FixedSchema,
21    MapSchema, Name, Names, NamespaceRef, Precision, RecordField, RecordSchema, Scale, Schema,
22    SchemaKind, UnionSchema, UuidSchema,
23};
24use crate::types;
25use crate::util::MapHelper;
26use crate::validator::validate_enum_symbol_name;
27use crate::{AvroResult, Error};
28use log::{debug, error, warn};
29use serde_json::{Map, Value};
30use std::collections::{BTreeMap, HashMap, HashSet};
31
/// Stateful parser that turns JSON Avro schema definitions into `Schema` values.
#[derive(Default)]
pub(crate) struct Parser {
    /// Raw JSON definitions that have not been parsed yet, keyed by schema name.
    /// Entries are removed from this map as they get parsed.
    input_schemas: HashMap<Name, Value>,
    /// Used to resolve cyclic references, i.e. when a
    /// field's type is a reference to its record's type
    resolving_schemas: Names,
    /// Names of the input schemas, in the order `parse_list` returns them.
    input_order: Vec<Name>,
    /// Used to avoid parsing the same schema twice
    parsed_schemas: Names,
}
42
43impl Parser {
44    pub(crate) fn new(
45        input_schemas: HashMap<Name, Value>,
46        input_order: Vec<Name>,
47        parsed_schemas: Names,
48    ) -> Self {
49        Self {
50            input_schemas,
51            resolving_schemas: HashMap::default(),
52            input_order,
53            parsed_schemas,
54        }
55    }
56
57    pub(crate) fn get_parsed_schemas(&mut self) -> &Names {
58        &self.parsed_schemas
59    }
60
61    /// Create a `Schema` from a string representing a JSON Avro schema.
62    pub(super) fn parse_str(&mut self, input: &str) -> AvroResult<Schema> {
63        let value = serde_json::from_str(input).map_err(Details::ParseSchemaJson)?;
64        self.parse(&value, None)
65    }
66
67    /// Create an array of `Schema`s from an iterator of JSON Avro schemas.
68    ///
69    /// It is allowed that the schemas have cross-dependencies; these will be resolved during parsing.
70    pub(super) fn parse_list(&mut self) -> AvroResult<Vec<Schema>> {
71        self.parse_input_schemas()?;
72
73        let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
74        for name in self.input_order.drain(0..) {
75            let parsed = self
76                .parsed_schemas
77                .remove(&name)
78                .expect("One of the input schemas was unexpectedly not parsed");
79            parsed_schemas.push(parsed);
80        }
81        Ok(parsed_schemas)
82    }
83
84    /// Convert the input schemas to `parsed_schemas`.
85    pub(super) fn parse_input_schemas(&mut self) -> Result<(), Error> {
86        while !self.input_schemas.is_empty() {
87            let next_name = self
88                .input_schemas
89                .keys()
90                .next()
91                .expect("Input schemas unexpectedly empty")
92                .to_owned();
93            let (name, value) = self
94                .input_schemas
95                .remove_entry(&next_name)
96                .expect("Key unexpectedly missing");
97            let parsed = self.parse(&value, None)?;
98            self.parsed_schemas
99                .insert(self.get_schema_type_name(name, &value)?, parsed);
100        }
101        Ok(())
102    }
103
104    /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro schema.
105    pub(super) fn parse(
106        &mut self,
107        value: &Value,
108        enclosing_namespace: NamespaceRef,
109    ) -> AvroResult<Schema> {
110        match *value {
111            Value::String(ref t) => self.parse_known_schema(t.as_str(), enclosing_namespace),
112            Value::Object(ref data) => self.parse_complex(data, enclosing_namespace),
113            Value::Array(ref data) => self.parse_union(data, enclosing_namespace),
114            _ => Err(Details::ParseSchemaFromValidJson.into()),
115        }
116    }
117
118    /// Parse a string as a primitive type or reference to `parsed_schemas`.
119    fn parse_known_schema(
120        &mut self,
121        name: &str,
122        enclosing_namespace: NamespaceRef,
123    ) -> AvroResult<Schema> {
124        match name {
125            "null" => Ok(Schema::Null),
126            "boolean" => Ok(Schema::Boolean),
127            "int" => Ok(Schema::Int),
128            "long" => Ok(Schema::Long),
129            "double" => Ok(Schema::Double),
130            "float" => Ok(Schema::Float),
131            "bytes" => Ok(Schema::Bytes),
132            "string" => Ok(Schema::String),
133            _ => self.fetch_schema_ref(name, enclosing_namespace),
134        }
135    }
136
137    /// Given a name, tries to retrieve the parsed schema from `parsed_schemas`.
138    ///
139    /// If a parsed schema is not found, it checks if a currently resolving
140    /// schema with that name exists.
141    /// If a resolving schema is not found, it checks if a JSON with that name exists
142    /// in `input_schemas` and then parses it (removing it from `input_schemas`)
143    /// and adds the parsed schema to `parsed_schemas`.
144    ///
145    /// This method allows schemas definitions that depend on other types to
146    /// parse their dependencies (or look them up if already parsed).
147    pub(super) fn fetch_schema_ref(
148        &mut self,
149        name: &str,
150        enclosing_namespace: NamespaceRef,
151    ) -> AvroResult<Schema> {
152        fn get_schema_ref(parsed: &Schema) -> Schema {
153            match parsed {
154                &Schema::Record(RecordSchema { ref name, .. })
155                | &Schema::Enum(EnumSchema { ref name, .. })
156                | &Schema::Fixed(FixedSchema { ref name, .. }) => {
157                    Schema::Ref { name: name.clone() }
158                }
159                _ => parsed.clone(),
160            }
161        }
162
163        let fully_qualified_name = Name::new_with_enclosing_namespace(name, enclosing_namespace)?;
164
165        if self.parsed_schemas.contains_key(&fully_qualified_name) {
166            return Ok(Schema::Ref {
167                name: fully_qualified_name,
168            });
169        }
170        if let Some(resolving_schema) = self.resolving_schemas.get(&fully_qualified_name) {
171            return Ok(resolving_schema.clone());
172        }
173
174        // For good error reporting we add this check
175        match fully_qualified_name.name() {
176            "record" | "enum" | "fixed" => {
177                return Err(
178                    Details::InvalidSchemaRecord(fully_qualified_name.name().to_string()).into(),
179                );
180            }
181            _ => (),
182        }
183
184        let value = self
185            .input_schemas
186            .remove(&fully_qualified_name)
187            // TODO make a better descriptive error message here that conveys that a named schema cannot be found
188            .ok_or_else(|| {
189                let full_name = fully_qualified_name.fullname(None);
190                if full_name == "bool" {
191                    Details::ParsePrimitiveSimilar(full_name, "boolean")
192                } else {
193                    Details::ParsePrimitive(full_name)
194                }
195            })?;
196
197        // parsing a full schema from inside another schema. Other full schema will not inherit namespace
198        let parsed = self.parse(&value, None)?;
199        self.parsed_schemas.insert(
200            self.get_schema_type_name(fully_qualified_name, &value)?,
201            parsed.clone(),
202        );
203
204        Ok(get_schema_ref(&parsed))
205    }
206
207    fn get_decimal_integer(
208        &self,
209        complex: &Map<String, Value>,
210        key: &'static str,
211    ) -> AvroResult<DecimalMetadata> {
212        match complex.get(key) {
213            Some(Value::Number(value)) => self.parse_json_integer_for_decimal(value),
214            None => {
215                if key == "scale" {
216                    Ok(0)
217                } else {
218                    Err(Details::GetDecimalMetadataFromJson(key).into())
219                }
220            }
221            Some(value) => Err(Details::GetDecimalMetadataValueFromJson {
222                key: key.into(),
223                value: value.clone(),
224            }
225            .into()),
226        }
227    }
228
229    fn parse_precision_and_scale(
230        &self,
231        complex: &Map<String, Value>,
232    ) -> AvroResult<(Precision, Scale)> {
233        let precision = self.get_decimal_integer(complex, "precision")?;
234        let scale = self.get_decimal_integer(complex, "scale")?;
235
236        if precision < 1 {
237            return Err(Details::DecimalPrecisionMuBePositive { precision }.into());
238        }
239
240        if precision < scale {
241            Err(Details::DecimalPrecisionLessThanScale { precision, scale }.into())
242        } else {
243            Ok((precision, scale))
244        }
245    }
246
247    /// Parse a `serde_json::Value` representing a complex Avro type into a `Schema`.
248    ///
249    /// Avro supports "recursive" definition of types.
250    /// e.g: `{"type": {"type": "string"}}`
251    pub(super) fn parse_complex(
252        &mut self,
253        complex: &Map<String, Value>,
254        enclosing_namespace: NamespaceRef,
255    ) -> AvroResult<Schema> {
256        // Try to parse this as a native complex type.
257        fn parse_as_native_complex(
258            complex: &Map<String, Value>,
259            parser: &mut Parser,
260            enclosing_namespace: NamespaceRef,
261        ) -> AvroResult<Schema> {
262            match complex.get("type") {
263                Some(value) => match value {
264                    Value::String(s) if s == "fixed" => {
265                        parser.parse_fixed(complex, enclosing_namespace)
266                    }
267                    _ => parser.parse(value, enclosing_namespace),
268                },
269                None => Err(Details::GetLogicalTypeField.into()),
270            }
271        }
272
273        // This crate supports some logical types natively, and this function tries to convert
274        // a native complex type with a logical type attribute to these logical types.
275        // This function:
276        // 1. Checks whether the native complex type is in the supported kinds.
277        // 2. If it is, using the convert function to convert the native complex type to
278        // a logical type.
279        fn try_convert_to_logical_type<F>(
280            logical_type: &str,
281            schema: Schema,
282            supported_schema_kinds: &[SchemaKind],
283            convert: F,
284        ) -> AvroResult<Schema>
285        where
286            F: Fn(Schema) -> AvroResult<Schema>,
287        {
288            let kind = SchemaKind::from(schema.clone());
289            if supported_schema_kinds.contains(&kind) {
290                convert(schema)
291            } else {
292                warn!(
293                    "Ignoring unknown logical type '{logical_type}' for schema of type: {schema:?}!"
294                );
295                Ok(schema)
296            }
297        }
298
299        match complex.get("logicalType") {
300            Some(Value::String(t)) => match t.as_str() {
301                "decimal" => {
302                    return try_convert_to_logical_type(
303                        "decimal",
304                        parse_as_native_complex(complex, self, enclosing_namespace)?,
305                        &[SchemaKind::Fixed, SchemaKind::Bytes],
306                        |inner| -> AvroResult<Schema> {
307                            match self.parse_precision_and_scale(complex) {
308                                Ok((precision, scale)) => Ok(Schema::Decimal(DecimalSchema {
309                                    precision,
310                                    scale,
311                                    inner: inner.try_into()?,
312                                })),
313                                Err(err) => {
314                                    warn!("Ignoring invalid decimal logical type: {err}");
315                                    Ok(inner)
316                                }
317                            }
318                        },
319                    );
320                }
321                "big-decimal" => {
322                    return try_convert_to_logical_type(
323                        "big-decimal",
324                        parse_as_native_complex(complex, self, enclosing_namespace)?,
325                        &[SchemaKind::Bytes],
326                        |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) },
327                    );
328                }
329                "uuid" => {
330                    return try_convert_to_logical_type(
331                        "uuid",
332                        parse_as_native_complex(complex, self, enclosing_namespace)?,
333                        &[SchemaKind::String, SchemaKind::Fixed, SchemaKind::Bytes],
334                        |schema| match schema {
335                            Schema::String => Ok(Schema::Uuid(UuidSchema::String)),
336                            Schema::Fixed(fixed @ FixedSchema { size: 16, .. }) => {
337                                Ok(Schema::Uuid(UuidSchema::Fixed(fixed)))
338                            }
339                            Schema::Fixed(FixedSchema { size, .. }) => {
340                                warn!(
341                                    "Ignoring uuid logical type for a Fixed schema because its size ({size:?}) is not 16! Schema: {schema:?}"
342                                );
343                                Ok(schema)
344                            }
345                            Schema::Bytes => Ok(Schema::Uuid(UuidSchema::Bytes)),
346                            _ => {
347                                warn!("Ignoring invalid uuid logical type for schema: {schema:?}");
348                                Ok(schema)
349                            }
350                        },
351                    );
352                }
353                "date" => {
354                    return try_convert_to_logical_type(
355                        "date",
356                        parse_as_native_complex(complex, self, enclosing_namespace)?,
357                        &[SchemaKind::Int],
358                        |_| -> AvroResult<Schema> { Ok(Schema::Date) },
359                    );
360                }
361                "time-millis" => {
362                    return try_convert_to_logical_type(
363                        "date",
364                        parse_as_native_complex(complex, self, enclosing_namespace)?,
365                        &[SchemaKind::Int],
366                        |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) },
367                    );
368                }
369                "time-micros" => {
370                    return try_convert_to_logical_type(
371                        "time-micros",
372                        parse_as_native_complex(complex, self, enclosing_namespace)?,
373                        &[SchemaKind::Long],
374                        |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) },
375                    );
376                }
377                "timestamp-millis" => {
378                    return try_convert_to_logical_type(
379                        "timestamp-millis",
380                        parse_as_native_complex(complex, self, enclosing_namespace)?,
381                        &[SchemaKind::Long],
382                        |_| -> AvroResult<Schema> { Ok(Schema::TimestampMillis) },
383                    );
384                }
385                "timestamp-micros" => {
386                    return try_convert_to_logical_type(
387                        "timestamp-micros",
388                        parse_as_native_complex(complex, self, enclosing_namespace)?,
389                        &[SchemaKind::Long],
390                        |_| -> AvroResult<Schema> { Ok(Schema::TimestampMicros) },
391                    );
392                }
393                "timestamp-nanos" => {
394                    return try_convert_to_logical_type(
395                        "timestamp-nanos",
396                        parse_as_native_complex(complex, self, enclosing_namespace)?,
397                        &[SchemaKind::Long],
398                        |_| -> AvroResult<Schema> { Ok(Schema::TimestampNanos) },
399                    );
400                }
401                "local-timestamp-millis" => {
402                    return try_convert_to_logical_type(
403                        "local-timestamp-millis",
404                        parse_as_native_complex(complex, self, enclosing_namespace)?,
405                        &[SchemaKind::Long],
406                        |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMillis) },
407                    );
408                }
409                "local-timestamp-micros" => {
410                    return try_convert_to_logical_type(
411                        "local-timestamp-micros",
412                        parse_as_native_complex(complex, self, enclosing_namespace)?,
413                        &[SchemaKind::Long],
414                        |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMicros) },
415                    );
416                }
417                "local-timestamp-nanos" => {
418                    return try_convert_to_logical_type(
419                        "local-timestamp-nanos",
420                        parse_as_native_complex(complex, self, enclosing_namespace)?,
421                        &[SchemaKind::Long],
422                        |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampNanos) },
423                    );
424                }
425                "duration" => {
426                    return try_convert_to_logical_type(
427                        "duration",
428                        parse_as_native_complex(complex, self, enclosing_namespace)?,
429                        &[SchemaKind::Fixed],
430                        |schema| -> AvroResult<Schema> {
431                            match schema {
432                                Schema::Fixed(fixed @ FixedSchema { size: 12, .. }) => {
433                                    Ok(Schema::Duration(fixed))
434                                }
435                                Schema::Fixed(FixedSchema { size, .. }) => {
436                                    warn!(
437                                        "Ignoring duration logical type on fixed type because size ({size}) is not 12! Schema: {schema:?}"
438                                    );
439                                    Ok(schema)
440                                }
441                                _ => {
442                                    warn!(
443                                        "Ignoring invalid duration logical type for schema: {schema:?}"
444                                    );
445                                    Ok(schema)
446                                }
447                            }
448                        },
449                    );
450                }
451                // In this case, of an unknown logical type, we just pass through the underlying
452                // type.
453                _ => {}
454            },
455            // The spec says to ignore invalid logical types and just pass through the
456            // underlying type. It is unclear whether that applies to this case or not, where the
457            // `logicalType` is not a string.
458            Some(value) => return Err(Details::GetLogicalTypeFieldType(value.clone()).into()),
459            _ => {}
460        }
461        match complex.get("type") {
462            Some(Value::String(t)) => match t.as_str() {
463                "record" => self.parse_record(complex, enclosing_namespace),
464                "enum" => self.parse_enum(complex, enclosing_namespace),
465                "array" => self.parse_array(complex, enclosing_namespace),
466                "map" => self.parse_map(complex, enclosing_namespace),
467                "fixed" => self.parse_fixed(complex, enclosing_namespace),
468                other => self.parse_known_schema(other, enclosing_namespace),
469            },
470            Some(Value::Object(data)) => self.parse_complex(data, enclosing_namespace),
471            Some(Value::Array(variants)) => self.parse_union(variants, enclosing_namespace),
472            Some(unknown) => Err(Details::GetComplexType(unknown.clone()).into()),
473            None => Err(Details::GetComplexTypeField.into()),
474        }
475    }
476
477    fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
478        let resolving_schema = Schema::Ref { name: name.clone() };
479        self.resolving_schemas
480            .insert(name.clone(), resolving_schema.clone());
481
482        let namespace = name.namespace();
483
484        if let Some(aliases) = aliases {
485            aliases.iter().for_each(|alias| {
486                let alias_fullname = alias.fully_qualified_name(namespace).into_owned();
487                self.resolving_schemas
488                    .insert(alias_fullname, resolving_schema.clone());
489            });
490        }
491    }
492
493    fn register_parsed_schema(
494        &mut self,
495        fully_qualified_name: &Name,
496        schema: &Schema,
497        aliases: &Aliases,
498    ) {
499        // FIXME, this should be globally aware, so if there is something overwriting something
500        // else then there is an ambiguous schema definition. An appropriate error should be thrown
501        self.parsed_schemas
502            .insert(fully_qualified_name.clone(), schema.clone());
503        self.resolving_schemas.remove(fully_qualified_name);
504
505        let namespace = fully_qualified_name.namespace();
506
507        if let Some(aliases) = aliases {
508            aliases.iter().for_each(|alias| {
509                let alias_fullname = alias.fully_qualified_name(namespace);
510                self.resolving_schemas.remove(&alias_fullname);
511                self.parsed_schemas
512                    .insert(alias_fullname.into_owned(), schema.clone());
513            });
514        }
515    }
516
517    /// Returns already parsed schema or a schema that is currently being resolved.
518    fn get_already_seen_schema(
519        &self,
520        complex: &Map<String, Value>,
521        enclosing_namespace: NamespaceRef,
522    ) -> Option<&Schema> {
523        match complex.get("type") {
524            Some(Value::String(typ)) => {
525                // A `type` that is not a valid Avro name cannot match any
526                // already-seen schema, so treat it as "not seen" rather than
527                // panicking. The real validation error is surfaced later by
528                // the parse path that calls `Name::parse`.
529                let name =
530                    Name::new_with_enclosing_namespace(typ.as_str(), enclosing_namespace).ok()?;
531                self.resolving_schemas
532                    .get(&name)
533                    .or_else(|| self.parsed_schemas.get(&name))
534            }
535            _ => None,
536        }
537    }
538
539    /// Parse a `serde_json::Value` representing an Avro record type into a `Schema`.
540    fn parse_record(
541        &mut self,
542        complex: &Map<String, Value>,
543        enclosing_namespace: NamespaceRef,
544    ) -> AvroResult<Schema> {
545        let fields_opt = complex.get("fields");
546
547        if fields_opt.is_none()
548            && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace)
549        {
550            return Ok(seen.clone());
551        }
552
553        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
554        let aliases =
555            self.fix_aliases_namespace(complex.aliases(), fully_qualified_name.namespace())?;
556
557        let mut lookup = BTreeMap::new();
558
559        self.register_resolving_schema(&fully_qualified_name, &aliases);
560
561        debug!("Going to parse record schema: {:?}", &fully_qualified_name);
562
563        let fields: Vec<RecordField> = fields_opt
564            .and_then(|fields| fields.as_array())
565            .ok_or_else(|| Error::new(Details::GetRecordFieldsJson))
566            .and_then(|fields| {
567                fields
568                    .iter()
569                    .filter_map(|field| field.as_object())
570                    .map(|field| RecordField::parse(field, self, &fully_qualified_name))
571                    .collect::<Result<_, _>>()
572            })?;
573
574        for (position, field) in fields.iter().enumerate() {
575            if let Some(_old) = lookup.insert(field.name.clone(), position) {
576                return Err(Details::FieldNameDuplicate(field.name.clone()).into());
577            }
578
579            for alias in &field.aliases {
580                lookup.insert(alias.clone(), position);
581            }
582        }
583
584        let schema = Schema::Record(RecordSchema {
585            name: fully_qualified_name.clone(),
586            aliases: aliases.clone(),
587            doc: complex.doc(),
588            fields,
589            lookup,
590            attributes: self.get_custom_attributes(complex, &["fields"]),
591        });
592
593        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
594        Ok(schema)
595    }
596
597    fn get_custom_attributes(
598        &self,
599        complex: &Map<String, Value>,
600        excluded: &[&'static str],
601    ) -> BTreeMap<String, Value> {
602        let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
603        for (key, value) in complex {
604            match key.as_str() {
605                "type" | "name" | "namespace" | "doc" | "aliases" | "logicalType" => continue,
606                candidate if excluded.contains(&candidate) => continue,
607                _ => custom_attributes.insert(key.clone(), value.clone()),
608            };
609        }
610        custom_attributes
611    }
612
613    /// Parse a `serde_json::Value` representing a Avro enum type into a `Schema`.
614    fn parse_enum(
615        &mut self,
616        complex: &Map<String, Value>,
617        enclosing_namespace: NamespaceRef,
618    ) -> AvroResult<Schema> {
619        let symbols_opt = complex.get("symbols");
620
621        if symbols_opt.is_none()
622            && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace)
623        {
624            return Ok(seen.clone());
625        }
626
627        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
628        let aliases =
629            self.fix_aliases_namespace(complex.aliases(), fully_qualified_name.namespace())?;
630
631        let symbols: Vec<String> = symbols_opt
632            .and_then(|v| v.as_array())
633            .ok_or_else(|| Error::from(Details::GetEnumSymbolsField))
634            .and_then(|symbols| {
635                symbols
636                    .iter()
637                    .map(|symbol| symbol.as_str().map(|s| s.to_string()))
638                    .collect::<Option<_>>()
639                    .ok_or_else(|| Error::from(Details::GetEnumSymbols))
640            })?;
641
642        let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
643        for symbol in symbols.iter() {
644            validate_enum_symbol_name(symbol)?;
645
646            // Ensure there are no duplicate symbols
647            if existing_symbols.contains(&symbol) {
648                return Err(Details::EnumSymbolDuplicate(symbol.to_string()).into());
649            }
650
651            existing_symbols.insert(symbol);
652        }
653
654        let mut default: Option<String> = None;
655        if let Some(value) = complex.get("default") {
656            if let Value::String(ref s) = *value {
657                default = Some(s.clone());
658            } else {
659                return Err(Details::EnumDefaultWrongType(value.clone()).into());
660            }
661        }
662
663        if let Some(ref value) = default {
664            let resolved = types::Value::from(value.clone())
665                .resolve_enum(&symbols, &Some(value.to_string()), &None)
666                .is_ok();
667            if !resolved {
668                return Err(Details::GetEnumDefault {
669                    symbol: value.to_string(),
670                    symbols,
671                }
672                .into());
673            }
674        }
675
676        let schema = Schema::Enum(EnumSchema {
677            name: fully_qualified_name.clone(),
678            aliases: aliases.clone(),
679            doc: complex.doc(),
680            symbols,
681            default,
682            attributes: self.get_custom_attributes(complex, &["symbols", "default"]),
683        });
684
685        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
686
687        Ok(schema)
688    }
689
690    /// Parse a `serde_json::Value` representing a Avro array type into a `Schema`.
691    fn parse_array(
692        &mut self,
693        complex: &Map<String, Value>,
694        enclosing_namespace: NamespaceRef,
695    ) -> AvroResult<Schema> {
696        let items = complex
697            .get("items")
698            .ok_or_else(|| Details::GetArrayItemsField.into())
699            .and_then(|items| self.parse(items, enclosing_namespace))?;
700        Ok(Schema::Array(ArraySchema {
701            items: Box::new(items),
702            attributes: self.get_custom_attributes(complex, &["items"]),
703        }))
704    }
705
706    /// Parse a `serde_json::Value` representing a Avro map type into a `Schema`.
707    fn parse_map(
708        &mut self,
709        complex: &Map<String, Value>,
710        enclosing_namespace: NamespaceRef,
711    ) -> AvroResult<Schema> {
712        let types = complex
713            .get("values")
714            .ok_or_else(|| Details::GetMapValuesField.into())
715            .and_then(|types| self.parse(types, enclosing_namespace))?;
716
717        Ok(Schema::Map(MapSchema {
718            types: Box::new(types),
719            attributes: self.get_custom_attributes(complex, &["values"]),
720        }))
721    }
722
723    /// Parse a `serde_json::Value` representing a Avro union type into a `Schema`.
724    fn parse_union(
725        &mut self,
726        items: &[Value],
727        enclosing_namespace: NamespaceRef,
728    ) -> AvroResult<Schema> {
729        items
730            .iter()
731            .map(|v| self.parse(v, enclosing_namespace))
732            .collect::<Result<Vec<_>, _>>()
733            .and_then(|schemas| {
734                if schemas.is_empty() {
735                    error!(
736                        "Union schemas should have at least two members! \
737                    Please enable debug logging to find out which Record schema \
738                    declares the union with 'RUST_LOG=apache_avro::schema=debug'."
739                    );
740                } else if schemas.len() == 1 {
741                    warn!(
742                        "Union schema with just one member! Consider dropping the union! \
743                    Please enable debug logging to find out which Record schema \
744                    declares the union with 'RUST_LOG=apache_avro::schema=debug'."
745                    );
746                }
747                Ok(Schema::Union(UnionSchema::new(schemas)?))
748            })
749    }
750
751    /// Parse a `serde_json::Value` representing a Avro fixed type into a `Schema`.
752    fn parse_fixed(
753        &mut self,
754        complex: &Map<String, Value>,
755        enclosing_namespace: NamespaceRef,
756    ) -> AvroResult<Schema> {
757        let size_opt = complex.get("size");
758        if size_opt.is_none()
759            && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace)
760        {
761            return Ok(seen.clone());
762        }
763
764        let doc = complex.get("doc").and_then(|v| match &v {
765            &Value::String(docstr) => Some(docstr.clone()),
766            _ => None,
767        });
768
769        let size = match size_opt {
770            Some(size) => size
771                .as_u64()
772                .ok_or_else(|| Details::GetFixedSizeFieldPositive(size.clone())),
773            None => Err(Details::GetFixedSizeField),
774        }?;
775
776        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
777        let aliases =
778            self.fix_aliases_namespace(complex.aliases(), fully_qualified_name.namespace())?;
779
780        let schema = Schema::Fixed(FixedSchema {
781            name: fully_qualified_name.clone(),
782            aliases: aliases.clone(),
783            doc,
784            size: size as usize,
785            attributes: self.get_custom_attributes(complex, &["size"]),
786        });
787
788        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
789
790        Ok(schema)
791    }
792
793    // A type alias may be specified either as a fully namespace-qualified, or relative
794    // to the namespace of the name it is an alias for. For example, if a type named "a.b"
795    // has aliases of "c" and "x.y", then the fully qualified names of its aliases are "a.c"
796    // and "x.y".
797    // https://avro.apache.org/docs/++version++/specification/#aliases
798    fn fix_aliases_namespace(
799        &self,
800        aliases: Option<Vec<String>>,
801        namespace: NamespaceRef,
802    ) -> AvroResult<Aliases> {
803        aliases
804            .map(|aliases| {
805                aliases
806                    .iter()
807                    .map(|alias| {
808                        // An alias that is not a valid Avro name is a schema
809                        // error — propagate it instead of unwrapping (which
810                        // would panic on attacker-controlled schema JSON).
811                        if alias.contains('.') {
812                            Alias::new(alias.as_str())
813                        } else {
814                            match namespace {
815                                Some(ns) => Alias::new(&format!("{ns}.{alias}")),
816                                None => Alias::new(alias.as_str()),
817                            }
818                        }
819                    })
820                    .collect::<AvroResult<Vec<_>>>()
821            })
822            .transpose()
823    }
824
825    fn get_schema_type_name(&self, name: Name, value: &Value) -> AvroResult<Name> {
826        match value.get("type") {
827            Some(Value::Object(complex_type)) => match complex_type.name() {
828                // Propagate the validation error if the nested `type` name is
829                // not a valid Avro name, rather than panicking on `unwrap()`.
830                Some(type_name) => Name::new(type_name),
831                _ => Ok(name),
832            },
833            _ => Ok(name),
834        }
835    }
836
837    fn parse_json_integer_for_decimal(
838        &self,
839        value: &serde_json::Number,
840    ) -> AvroResult<DecimalMetadata> {
841        Ok(if value.is_u64() {
842            let num = value
843                .as_u64()
844                .ok_or_else(|| Details::GetU64FromJson(value.clone()))?;
845            num.try_into()
846                .map_err(|e| Details::ConvertU64ToUsize(e, num))?
847        } else if value.is_i64() {
848            let num = value
849                .as_i64()
850                .ok_or_else(|| Details::GetI64FromJson(value.clone()))?;
851            num.try_into()
852                .map_err(|e| Details::ConvertI64ToUsize(e, num))?
853        } else {
854            return Err(Details::GetPrecisionOrScaleFromJson(value.clone()).into());
855        })
856    }
857}