apache_avro/schema/
parser.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::error::Details;
19use crate::schema::record::RecordSchemaParseLocation;
20use crate::schema::{
21    Alias, Aliases, DecimalMetadata, DecimalSchema, EnumSchema, FixedSchema, Name, Names,
22    Namespace, Precision, RecordField, RecordSchema, Scale, Schema, SchemaKind, UnionSchema,
23    UuidSchema,
24};
25use crate::types;
26use crate::util::MapHelper;
27use crate::validator::validate_enum_symbol_name;
28use crate::{AvroResult, Error};
29use log::{debug, error, warn};
30use serde_json::{Map, Value};
31use std::collections::{BTreeMap, HashMap, HashSet};
32
33#[derive(Default)]
34pub(crate) struct Parser {
35    input_schemas: HashMap<Name, Value>,
36    /// A map of name -> Schema::Ref
37    /// Used to resolve cyclic references, i.e. when a
38    /// field's type is a reference to its record's type
39    resolving_schemas: Names,
40    input_order: Vec<Name>,
41    /// A map of name -> fully parsed Schema
42    /// Used to avoid parsing the same schema twice
43    parsed_schemas: Names,
44}
45
46impl Parser {
47    pub(crate) fn new(
48        input_schemas: HashMap<Name, Value>,
49        input_order: Vec<Name>,
50        parsed_schemas: Names,
51    ) -> Self {
52        Self {
53            input_schemas,
54            resolving_schemas: HashMap::default(),
55            input_order,
56            parsed_schemas,
57        }
58    }
59
60    pub(crate) fn get_parsed_schemas(&mut self) -> &Names {
61        &self.parsed_schemas
62    }
63
64    /// Create a `Schema` from a string representing a JSON Avro schema.
65    pub(super) fn parse_str(&mut self, input: &str) -> AvroResult<Schema> {
66        let value = serde_json::from_str(input).map_err(Details::ParseSchemaJson)?;
67        self.parse(&value, &None)
68    }
69
70    /// Create an array of `Schema`'s from an iterator of JSON Avro schemas. It is allowed that
71    /// the schemas have cross-dependencies; these will be resolved during parsing.
72    pub(super) fn parse_list(&mut self) -> AvroResult<Vec<Schema>> {
73        self.parse_input_schemas()?;
74
75        let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
76        for name in self.input_order.drain(0..) {
77            let parsed = self
78                .parsed_schemas
79                .remove(&name)
80                .expect("One of the input schemas was unexpectedly not parsed");
81            parsed_schemas.push(parsed);
82        }
83        Ok(parsed_schemas)
84    }
85
86    /// Convert the input schemas to parsed_schemas
87    pub(super) fn parse_input_schemas(&mut self) -> Result<(), Error> {
88        while !self.input_schemas.is_empty() {
89            let next_name = self
90                .input_schemas
91                .keys()
92                .next()
93                .expect("Input schemas unexpectedly empty")
94                .to_owned();
95            let (name, value) = self
96                .input_schemas
97                .remove_entry(&next_name)
98                .expect("Key unexpectedly missing");
99            let parsed = self.parse(&value, &None)?;
100            self.parsed_schemas
101                .insert(self.get_schema_type_name(name, value), parsed);
102        }
103        Ok(())
104    }
105
106    /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro
107    /// schema.
108    pub(super) fn parse(
109        &mut self,
110        value: &Value,
111        enclosing_namespace: &Namespace,
112    ) -> AvroResult<Schema> {
113        match *value {
114            Value::String(ref t) => self.parse_known_schema(t.as_str(), enclosing_namespace),
115            Value::Object(ref data) => {
116                self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root)
117            }
118            Value::Array(ref data) => self.parse_union(data, enclosing_namespace),
119            _ => Err(Details::ParseSchemaFromValidJson.into()),
120        }
121    }
122
123    /// Parse a `serde_json::Value` representing an Avro type whose Schema is known into a
124    /// `Schema`. A Schema for a `serde_json::Value` is known if it is primitive or has
125    /// been parsed previously by the parsed and stored in its map of parsed_schemas.
126    fn parse_known_schema(
127        &mut self,
128        name: &str,
129        enclosing_namespace: &Namespace,
130    ) -> AvroResult<Schema> {
131        match name {
132            "null" => Ok(Schema::Null),
133            "boolean" => Ok(Schema::Boolean),
134            "int" => Ok(Schema::Int),
135            "long" => Ok(Schema::Long),
136            "double" => Ok(Schema::Double),
137            "float" => Ok(Schema::Float),
138            "bytes" => Ok(Schema::Bytes),
139            "string" => Ok(Schema::String),
140            _ => self.fetch_schema_ref(name, enclosing_namespace),
141        }
142    }
143
144    /// Given a name, tries to retrieve the parsed schema from `parsed_schemas`.
145    /// If a parsed schema is not found, it checks if a currently resolving
146    /// schema with that name exists.
147    /// If a resolving schema is not found, it checks if a json with that name exists
148    /// in `input_schemas` and then parses it (removing it from `input_schemas`)
149    /// and adds the parsed schema to `parsed_schemas`.
150    ///
151    /// This method allows schemas definitions that depend on other types to
152    /// parse their dependencies (or look them up if already parsed).
153    pub(super) fn fetch_schema_ref(
154        &mut self,
155        name: &str,
156        enclosing_namespace: &Namespace,
157    ) -> AvroResult<Schema> {
158        fn get_schema_ref(parsed: &Schema) -> Schema {
159            match parsed {
160                &Schema::Record(RecordSchema { ref name, .. })
161                | &Schema::Enum(EnumSchema { ref name, .. })
162                | &Schema::Fixed(FixedSchema { ref name, .. }) => {
163                    Schema::Ref { name: name.clone() }
164                }
165                _ => parsed.clone(),
166            }
167        }
168
169        let name = Name::new(name)?;
170        let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
171
172        if self.parsed_schemas.contains_key(&fully_qualified_name) {
173            return Ok(Schema::Ref {
174                name: fully_qualified_name,
175            });
176        }
177        if let Some(resolving_schema) = self.resolving_schemas.get(&fully_qualified_name) {
178            return Ok(resolving_schema.clone());
179        }
180
181        // For good error reporting we add this check
182        match name.name.as_str() {
183            "record" | "enum" | "fixed" => {
184                return Err(Details::InvalidSchemaRecord(name.to_string()).into());
185            }
186            _ => (),
187        }
188
189        let value = self
190            .input_schemas
191            .remove(&fully_qualified_name)
192            // TODO make a better descriptive error message here that conveys that a named schema cannot be found
193            .ok_or_else(|| Details::ParsePrimitive(fully_qualified_name.fullname(None)))?;
194
195        // parsing a full schema from inside another schema. Other full schema will not inherit namespace
196        let parsed = self.parse(&value, &None)?;
197        self.parsed_schemas
198            .insert(self.get_schema_type_name(name, value), parsed.clone());
199
200        Ok(get_schema_ref(&parsed))
201    }
202
203    fn get_decimal_integer(
204        &self,
205        complex: &Map<String, Value>,
206        key: &'static str,
207    ) -> AvroResult<DecimalMetadata> {
208        match complex.get(key) {
209            Some(Value::Number(value)) => self.parse_json_integer_for_decimal(value),
210            None => {
211                if key == "scale" {
212                    Ok(0)
213                } else {
214                    Err(Details::GetDecimalMetadataFromJson(key).into())
215                }
216            }
217            Some(value) => Err(Details::GetDecimalMetadataValueFromJson {
218                key: key.into(),
219                value: value.clone(),
220            }
221            .into()),
222        }
223    }
224
225    fn parse_precision_and_scale(
226        &self,
227        complex: &Map<String, Value>,
228    ) -> AvroResult<(Precision, Scale)> {
229        let precision = self.get_decimal_integer(complex, "precision")?;
230        let scale = self.get_decimal_integer(complex, "scale")?;
231
232        if precision < 1 {
233            return Err(Details::DecimalPrecisionMuBePositive { precision }.into());
234        }
235
236        if precision < scale {
237            Err(Details::DecimalPrecisionLessThanScale { precision, scale }.into())
238        } else {
239            Ok((precision, scale))
240        }
241    }
242
243    /// Parse a `serde_json::Value` representing a complex Avro type into a
244    /// `Schema`.
245    ///
246    /// Avro supports "recursive" definition of types.
247    /// e.g: {"type": {"type": "string"}}
248    pub(super) fn parse_complex(
249        &mut self,
250        complex: &Map<String, Value>,
251        enclosing_namespace: &Namespace,
252        parse_location: RecordSchemaParseLocation,
253    ) -> AvroResult<Schema> {
254        // Try to parse this as a native complex type.
255        fn parse_as_native_complex(
256            complex: &Map<String, Value>,
257            parser: &mut Parser,
258            enclosing_namespace: &Namespace,
259        ) -> AvroResult<Schema> {
260            match complex.get("type") {
261                Some(value) => match value {
262                    Value::String(s) if s == "fixed" => {
263                        parser.parse_fixed(complex, enclosing_namespace)
264                    }
265                    _ => parser.parse(value, enclosing_namespace),
266                },
267                None => Err(Details::GetLogicalTypeField.into()),
268            }
269        }
270
271        // This crate support some logical types natively, and this function tries to convert
272        // a native complex type with a logical type attribute to these logical types.
273        // This function:
274        // 1. Checks whether the native complex type is in the supported kinds.
275        // 2. If it is, using the convert function to convert the native complex type to
276        // a logical type.
277        fn try_convert_to_logical_type<F>(
278            logical_type: &str,
279            schema: Schema,
280            supported_schema_kinds: &[SchemaKind],
281            convert: F,
282        ) -> AvroResult<Schema>
283        where
284            F: Fn(Schema) -> AvroResult<Schema>,
285        {
286            let kind = SchemaKind::from(schema.clone());
287            if supported_schema_kinds.contains(&kind) {
288                convert(schema)
289            } else {
290                warn!(
291                    "Ignoring unknown logical type '{logical_type}' for schema of type: {schema:?}!"
292                );
293                Ok(schema)
294            }
295        }
296
297        match complex.get("logicalType") {
298            Some(Value::String(t)) => match t.as_str() {
299                "decimal" => {
300                    return try_convert_to_logical_type(
301                        "decimal",
302                        parse_as_native_complex(complex, self, enclosing_namespace)?,
303                        &[SchemaKind::Fixed, SchemaKind::Bytes],
304                        |inner| -> AvroResult<Schema> {
305                            match self.parse_precision_and_scale(complex) {
306                                Ok((precision, scale)) => Ok(Schema::Decimal(DecimalSchema {
307                                    precision,
308                                    scale,
309                                    inner: inner.try_into()?,
310                                })),
311                                Err(err) => {
312                                    warn!("Ignoring invalid decimal logical type: {err}");
313                                    Ok(inner)
314                                }
315                            }
316                        },
317                    );
318                }
319                "big-decimal" => {
320                    return try_convert_to_logical_type(
321                        "big-decimal",
322                        parse_as_native_complex(complex, self, enclosing_namespace)?,
323                        &[SchemaKind::Bytes],
324                        |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) },
325                    );
326                }
327                "uuid" => {
328                    return try_convert_to_logical_type(
329                        "uuid",
330                        parse_as_native_complex(complex, self, enclosing_namespace)?,
331                        &[SchemaKind::String, SchemaKind::Fixed, SchemaKind::Bytes],
332                        |schema| match schema {
333                            Schema::String => Ok(Schema::Uuid(UuidSchema::String)),
334                            Schema::Fixed(fixed @ FixedSchema { size: 16, .. }) => {
335                                Ok(Schema::Uuid(UuidSchema::Fixed(fixed)))
336                            }
337                            Schema::Fixed(FixedSchema { size, .. }) => {
338                                warn!(
339                                    "Ignoring uuid logical type for a Fixed schema because its size ({size:?}) is not 16! Schema: {schema:?}"
340                                );
341                                Ok(schema)
342                            }
343                            Schema::Bytes => Ok(Schema::Uuid(UuidSchema::Bytes)),
344                            _ => {
345                                warn!("Ignoring invalid uuid logical type for schema: {schema:?}");
346                                Ok(schema)
347                            }
348                        },
349                    );
350                }
351                "date" => {
352                    return try_convert_to_logical_type(
353                        "date",
354                        parse_as_native_complex(complex, self, enclosing_namespace)?,
355                        &[SchemaKind::Int],
356                        |_| -> AvroResult<Schema> { Ok(Schema::Date) },
357                    );
358                }
359                "time-millis" => {
360                    return try_convert_to_logical_type(
361                        "date",
362                        parse_as_native_complex(complex, self, enclosing_namespace)?,
363                        &[SchemaKind::Int],
364                        |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) },
365                    );
366                }
367                "time-micros" => {
368                    return try_convert_to_logical_type(
369                        "time-micros",
370                        parse_as_native_complex(complex, self, enclosing_namespace)?,
371                        &[SchemaKind::Long],
372                        |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) },
373                    );
374                }
375                "timestamp-millis" => {
376                    return try_convert_to_logical_type(
377                        "timestamp-millis",
378                        parse_as_native_complex(complex, self, enclosing_namespace)?,
379                        &[SchemaKind::Long],
380                        |_| -> AvroResult<Schema> { Ok(Schema::TimestampMillis) },
381                    );
382                }
383                "timestamp-micros" => {
384                    return try_convert_to_logical_type(
385                        "timestamp-micros",
386                        parse_as_native_complex(complex, self, enclosing_namespace)?,
387                        &[SchemaKind::Long],
388                        |_| -> AvroResult<Schema> { Ok(Schema::TimestampMicros) },
389                    );
390                }
391                "timestamp-nanos" => {
392                    return try_convert_to_logical_type(
393                        "timestamp-nanos",
394                        parse_as_native_complex(complex, self, enclosing_namespace)?,
395                        &[SchemaKind::Long],
396                        |_| -> AvroResult<Schema> { Ok(Schema::TimestampNanos) },
397                    );
398                }
399                "local-timestamp-millis" => {
400                    return try_convert_to_logical_type(
401                        "local-timestamp-millis",
402                        parse_as_native_complex(complex, self, enclosing_namespace)?,
403                        &[SchemaKind::Long],
404                        |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMillis) },
405                    );
406                }
407                "local-timestamp-micros" => {
408                    return try_convert_to_logical_type(
409                        "local-timestamp-micros",
410                        parse_as_native_complex(complex, self, enclosing_namespace)?,
411                        &[SchemaKind::Long],
412                        |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMicros) },
413                    );
414                }
415                "local-timestamp-nanos" => {
416                    return try_convert_to_logical_type(
417                        "local-timestamp-nanos",
418                        parse_as_native_complex(complex, self, enclosing_namespace)?,
419                        &[SchemaKind::Long],
420                        |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampNanos) },
421                    );
422                }
423                "duration" => {
424                    return try_convert_to_logical_type(
425                        "duration",
426                        parse_as_native_complex(complex, self, enclosing_namespace)?,
427                        &[SchemaKind::Fixed],
428                        |schema| -> AvroResult<Schema> {
429                            match schema {
430                                Schema::Fixed(fixed @ FixedSchema { size: 12, .. }) => {
431                                    Ok(Schema::Duration(fixed))
432                                }
433                                Schema::Fixed(FixedSchema { size, .. }) => {
434                                    warn!(
435                                        "Ignoring duration logical type on fixed type because size ({size}) is not 12! Schema: {schema:?}"
436                                    );
437                                    Ok(schema)
438                                }
439                                _ => {
440                                    warn!(
441                                        "Ignoring invalid duration logical type for schema: {schema:?}"
442                                    );
443                                    Ok(schema)
444                                }
445                            }
446                        },
447                    );
448                }
449                // In this case, of an unknown logical type, we just pass through the underlying
450                // type.
451                _ => {}
452            },
453            // The spec says to ignore invalid logical types and just pass through the
454            // underlying type. It is unclear whether that applies to this case or not, where the
455            // `logicalType` is not a string.
456            Some(value) => return Err(Details::GetLogicalTypeFieldType(value.clone()).into()),
457            _ => {}
458        }
459        match complex.get("type") {
460            Some(Value::String(t)) => match t.as_str() {
461                "record" => match parse_location {
462                    RecordSchemaParseLocation::Root => {
463                        self.parse_record(complex, enclosing_namespace)
464                    }
465                    RecordSchemaParseLocation::FromField => {
466                        self.fetch_schema_ref(t, enclosing_namespace)
467                    }
468                },
469                "enum" => self.parse_enum(complex, enclosing_namespace),
470                "array" => self.parse_array(complex, enclosing_namespace),
471                "map" => self.parse_map(complex, enclosing_namespace),
472                "fixed" => self.parse_fixed(complex, enclosing_namespace),
473                other => self.parse_known_schema(other, enclosing_namespace),
474            },
475            Some(Value::Object(data)) => {
476                self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root)
477            }
478            Some(Value::Array(variants)) => self.parse_union(variants, enclosing_namespace),
479            Some(unknown) => Err(Details::GetComplexType(unknown.clone()).into()),
480            None => Err(Details::GetComplexTypeField.into()),
481        }
482    }
483
484    fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
485        let resolving_schema = Schema::Ref { name: name.clone() };
486        self.resolving_schemas
487            .insert(name.clone(), resolving_schema.clone());
488
489        let namespace = &name.namespace;
490
491        if let Some(aliases) = aliases {
492            aliases.iter().for_each(|alias| {
493                let alias_fullname = alias.fully_qualified_name(namespace);
494                self.resolving_schemas
495                    .insert(alias_fullname, resolving_schema.clone());
496            });
497        }
498    }
499
500    fn register_parsed_schema(
501        &mut self,
502        fully_qualified_name: &Name,
503        schema: &Schema,
504        aliases: &Aliases,
505    ) {
506        // FIXME, this should be globally aware, so if there is something overwriting something
507        // else then there is an ambiguous schema definition. An appropriate error should be thrown
508        self.parsed_schemas
509            .insert(fully_qualified_name.clone(), schema.clone());
510        self.resolving_schemas.remove(fully_qualified_name);
511
512        let namespace = &fully_qualified_name.namespace;
513
514        if let Some(aliases) = aliases {
515            aliases.iter().for_each(|alias| {
516                let alias_fullname = alias.fully_qualified_name(namespace);
517                self.resolving_schemas.remove(&alias_fullname);
518                self.parsed_schemas.insert(alias_fullname, schema.clone());
519            });
520        }
521    }
522
523    /// Returns already parsed schema or a schema that is currently being resolved.
524    fn get_already_seen_schema(
525        &self,
526        complex: &Map<String, Value>,
527        enclosing_namespace: &Namespace,
528    ) -> Option<&Schema> {
529        match complex.get("type") {
530            Some(Value::String(typ)) => {
531                let name = Name::new(typ.as_str())
532                    .unwrap()
533                    .fully_qualified_name(enclosing_namespace);
534                self.resolving_schemas
535                    .get(&name)
536                    .or_else(|| self.parsed_schemas.get(&name))
537            }
538            _ => None,
539        }
540    }
541
542    /// Parse a `serde_json::Value` representing a Avro record type into a
543    /// `Schema`.
544    fn parse_record(
545        &mut self,
546        complex: &Map<String, Value>,
547        enclosing_namespace: &Namespace,
548    ) -> AvroResult<Schema> {
549        let fields_opt = complex.get("fields");
550
551        if fields_opt.is_none()
552            && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace)
553        {
554            return Ok(seen.clone());
555        }
556
557        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
558        let aliases =
559            self.fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
560
561        let mut lookup = BTreeMap::new();
562
563        self.register_resolving_schema(&fully_qualified_name, &aliases);
564
565        debug!("Going to parse record schema: {:?}", &fully_qualified_name);
566
567        let fields: Vec<RecordField> = fields_opt
568            .and_then(|fields| fields.as_array())
569            .ok_or_else(|| Error::new(Details::GetRecordFieldsJson))
570            .and_then(|fields| {
571                fields
572                    .iter()
573                    .filter_map(|field| field.as_object())
574                    .enumerate()
575                    .map(|(position, field)| {
576                        RecordField::parse(field, position, self, &fully_qualified_name)
577                    })
578                    .collect::<Result<_, _>>()
579            })?;
580
581        for field in &fields {
582            if let Some(_old) = lookup.insert(field.name.clone(), field.position) {
583                return Err(Details::FieldNameDuplicate(field.name.clone()).into());
584            }
585
586            if let Some(ref field_aliases) = field.aliases {
587                for alias in field_aliases {
588                    lookup.insert(alias.clone(), field.position);
589                }
590            }
591        }
592
593        let schema = Schema::Record(RecordSchema {
594            name: fully_qualified_name.clone(),
595            aliases: aliases.clone(),
596            doc: complex.doc(),
597            fields,
598            lookup,
599            attributes: self.get_custom_attributes(complex, vec!["fields"]),
600        });
601
602        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
603        Ok(schema)
604    }
605
606    fn get_custom_attributes(
607        &self,
608        complex: &Map<String, Value>,
609        excluded: Vec<&'static str>,
610    ) -> BTreeMap<String, Value> {
611        let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
612        for (key, value) in complex {
613            match key.as_str() {
614                "type" | "name" | "namespace" | "doc" | "aliases" | "logicalType" => continue,
615                candidate if excluded.contains(&candidate) => continue,
616                _ => custom_attributes.insert(key.clone(), value.clone()),
617            };
618        }
619        custom_attributes
620    }
621
622    /// Parse a `serde_json::Value` representing a Avro enum type into a
623    /// `Schema`.
624    fn parse_enum(
625        &mut self,
626        complex: &Map<String, Value>,
627        enclosing_namespace: &Namespace,
628    ) -> AvroResult<Schema> {
629        let symbols_opt = complex.get("symbols");
630
631        if symbols_opt.is_none()
632            && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace)
633        {
634            return Ok(seen.clone());
635        }
636
637        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
638        let aliases =
639            self.fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
640
641        let symbols: Vec<String> = symbols_opt
642            .and_then(|v| v.as_array())
643            .ok_or_else(|| Error::from(Details::GetEnumSymbolsField))
644            .and_then(|symbols| {
645                symbols
646                    .iter()
647                    .map(|symbol| symbol.as_str().map(|s| s.to_string()))
648                    .collect::<Option<_>>()
649                    .ok_or_else(|| Error::from(Details::GetEnumSymbols))
650            })?;
651
652        let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
653        for symbol in symbols.iter() {
654            validate_enum_symbol_name(symbol)?;
655
656            // Ensure there are no duplicate symbols
657            if existing_symbols.contains(&symbol) {
658                return Err(Details::EnumSymbolDuplicate(symbol.to_string()).into());
659            }
660
661            existing_symbols.insert(symbol);
662        }
663
664        let mut default: Option<String> = None;
665        if let Some(value) = complex.get("default") {
666            if let Value::String(ref s) = *value {
667                default = Some(s.clone());
668            } else {
669                return Err(Details::EnumDefaultWrongType(value.clone()).into());
670            }
671        }
672
673        if let Some(ref value) = default {
674            let resolved = types::Value::from(value.clone())
675                .resolve_enum(&symbols, &Some(value.to_string()), &None)
676                .is_ok();
677            if !resolved {
678                return Err(Details::GetEnumDefault {
679                    symbol: value.to_string(),
680                    symbols,
681                }
682                .into());
683            }
684        }
685
686        let schema = Schema::Enum(EnumSchema {
687            name: fully_qualified_name.clone(),
688            aliases: aliases.clone(),
689            doc: complex.doc(),
690            symbols,
691            default,
692            attributes: self.get_custom_attributes(complex, vec!["symbols"]),
693        });
694
695        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
696
697        Ok(schema)
698    }
699
700    /// Parse a `serde_json::Value` representing a Avro array type into a
701    /// `Schema`.
702    fn parse_array(
703        &mut self,
704        complex: &Map<String, Value>,
705        enclosing_namespace: &Namespace,
706    ) -> AvroResult<Schema> {
707        complex
708            .get("items")
709            .ok_or_else(|| Details::GetArrayItemsField.into())
710            .and_then(|items| self.parse(items, enclosing_namespace))
711            .map(|items| {
712                Schema::array_with_attributes(
713                    items,
714                    self.get_custom_attributes(complex, vec!["items"]),
715                )
716            })
717    }
718
719    /// Parse a `serde_json::Value` representing a Avro map type into a
720    /// `Schema`.
721    fn parse_map(
722        &mut self,
723        complex: &Map<String, Value>,
724        enclosing_namespace: &Namespace,
725    ) -> AvroResult<Schema> {
726        complex
727            .get("values")
728            .ok_or_else(|| Details::GetMapValuesField.into())
729            .and_then(|items| self.parse(items, enclosing_namespace))
730            .map(|items| {
731                Schema::map_with_attributes(
732                    items,
733                    self.get_custom_attributes(complex, vec!["values"]),
734                )
735            })
736    }
737
738    /// Parse a `serde_json::Value` representing a Avro union type into a
739    /// `Schema`.
740    fn parse_union(
741        &mut self,
742        items: &[Value],
743        enclosing_namespace: &Namespace,
744    ) -> AvroResult<Schema> {
745        items
746            .iter()
747            .map(|v| self.parse(v, enclosing_namespace))
748            .collect::<Result<Vec<_>, _>>()
749            .and_then(|schemas| {
750                if schemas.is_empty() {
751                    error!(
752                        "Union schemas should have at least two members! \
753                    Please enable debug logging to find out which Record schema \
754                    declares the union with 'RUST_LOG=apache_avro::schema=debug'."
755                    );
756                } else if schemas.len() == 1 {
757                    warn!(
758                        "Union schema with just one member! Consider dropping the union! \
759                    Please enable debug logging to find out which Record schema \
760                    declares the union with 'RUST_LOG=apache_avro::schema=debug'."
761                    );
762                }
763                Ok(Schema::Union(UnionSchema::new(schemas)?))
764            })
765    }
766
767    /// Parse a `serde_json::Value` representing a Avro fixed type into a
768    /// `Schema`.
769    fn parse_fixed(
770        &mut self,
771        complex: &Map<String, Value>,
772        enclosing_namespace: &Namespace,
773    ) -> AvroResult<Schema> {
774        let size_opt = complex.get("size");
775        if size_opt.is_none()
776            && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace)
777        {
778            return Ok(seen.clone());
779        }
780
781        let doc = complex.get("doc").and_then(|v| match &v {
782            &Value::String(docstr) => Some(docstr.clone()),
783            _ => None,
784        });
785
786        let size = match size_opt {
787            Some(size) => size
788                .as_u64()
789                .ok_or_else(|| Details::GetFixedSizeFieldPositive(size.clone())),
790            None => Err(Details::GetFixedSizeField),
791        }?;
792
793        let default = complex.get("default").and_then(|v| match &v {
794            &Value::String(default) => Some(default.clone()),
795            _ => None,
796        });
797
798        if default.is_some() {
799            let len = default.clone().unwrap().len();
800            if len != size as usize {
801                return Err(Details::FixedDefaultLenSizeMismatch(len, size).into());
802            }
803        }
804
805        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
806        let aliases =
807            self.fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
808
809        let schema = Schema::Fixed(FixedSchema {
810            name: fully_qualified_name.clone(),
811            aliases: aliases.clone(),
812            doc,
813            size: size as usize,
814            default,
815            attributes: self.get_custom_attributes(complex, vec!["size"]),
816        });
817
818        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
819
820        Ok(schema)
821    }
822
823    // A type alias may be specified either as a fully namespace-qualified, or relative
824    // to the namespace of the name it is an alias for. For example, if a type named "a.b"
825    // has aliases of "c" and "x.y", then the fully qualified names of its aliases are "a.c"
826    // and "x.y".
827    // https://avro.apache.org/docs/current/specification/#aliases
828    fn fix_aliases_namespace(
829        &self,
830        aliases: Option<Vec<String>>,
831        namespace: &Namespace,
832    ) -> Aliases {
833        aliases.map(|aliases| {
834            aliases
835                .iter()
836                .map(|alias| {
837                    if alias.find('.').is_none() {
838                        match namespace {
839                            Some(ns) => format!("{ns}.{alias}"),
840                            None => alias.clone(),
841                        }
842                    } else {
843                        alias.clone()
844                    }
845                })
846                .map(|alias| Alias::new(alias.as_str()).unwrap())
847                .collect()
848        })
849    }
850
851    fn get_schema_type_name(&self, name: Name, value: Value) -> Name {
852        match value.get("type") {
853            Some(Value::Object(complex_type)) => match complex_type.name() {
854                Some(name) => Name::new(name.as_str()).unwrap(),
855                _ => name,
856            },
857            _ => name,
858        }
859    }
860
861    fn parse_json_integer_for_decimal(
862        &self,
863        value: &serde_json::Number,
864    ) -> AvroResult<DecimalMetadata> {
865        Ok(if value.is_u64() {
866            let num = value
867                .as_u64()
868                .ok_or_else(|| Details::GetU64FromJson(value.clone()))?;
869            num.try_into()
870                .map_err(|e| Details::ConvertU64ToUsize(e, num))?
871        } else if value.is_i64() {
872            let num = value
873                .as_i64()
874                .ok_or_else(|| Details::GetI64FromJson(value.clone()))?;
875            num.try_into()
876                .map_err(|e| Details::ConvertI64ToUsize(e, num))?
877        } else {
878            return Err(Details::GetPrecisionOrScaleFromJson(value.clone()).into());
879        })
880    }
881}