Skip to main content

apache_avro/
encode.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::schema::{InnerDecimalSchema, NamespaceRef, UuidSchema};
19use crate::{
20    AvroResult,
21    bigdecimal::serialize_big_decimal,
22    error::Details,
23    schema::{
24        DecimalSchema, EnumSchema, FixedSchema, Name, RecordSchema, ResolvedSchema, Schema,
25        SchemaKind, UnionSchema,
26    },
27    types::{Value, ValueKind},
28    util::{zig_i32, zig_i64},
29};
30use log::error;
31use std::{borrow::Borrow, collections::HashMap, io::Write};
32
33/// Encode a `Value` into avro format.
34///
35/// **NOTE** This will not perform schema validation. The value is assumed to
36/// be valid with regards to the schema. Schema are needed only to guide the
37/// encoding for complex type values.
38pub fn encode<W: Write>(value: &Value, schema: &Schema, writer: &mut W) -> AvroResult<usize> {
39    let rs = ResolvedSchema::try_from(schema)?;
40    encode_internal(value, schema, rs.get_names(), None, writer)
41}
42
43/// Encode `s` as the _bytes_ primitive type.
44///
45/// This writes the length as the _long_ primitive and then the raw bytes.
46pub(crate) fn encode_bytes<B: AsRef<[u8]> + ?Sized, W: Write>(
47    s: &B,
48    mut writer: W,
49) -> AvroResult<usize> {
50    let bytes = s.as_ref();
51    encode_long(bytes.len() as i64, &mut writer)?;
52    writer
53        .write(bytes)
54        .map_err(|e| Details::WriteBytes(e).into())
55}
56
57pub(crate) fn encode_long<W: Write>(i: i64, writer: W) -> AvroResult<usize> {
58    zig_i64(i, writer)
59}
60
61pub(crate) fn encode_int<W: Write>(i: i32, writer: W) -> AvroResult<usize> {
62    zig_i32(i, writer)
63}
64
65pub(crate) fn encode_internal<W: Write, S: Borrow<Schema>>(
66    value: &Value,
67    schema: &Schema,
68    names: &HashMap<Name, S>,
69    enclosing_namespace: NamespaceRef,
70    writer: &mut W,
71) -> AvroResult<usize> {
72    if let Schema::Ref { name } = schema {
73        let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
74        let resolved = names
75            .get(&fully_qualified_name)
76            .ok_or(Details::SchemaResolutionError(
77                fully_qualified_name.into_owned(),
78            ))?;
79        return encode_internal(value, resolved.borrow(), names, enclosing_namespace, writer);
80    }
81
82    match value {
83        Value::Null => {
84            if let Schema::Union(union) = schema {
85                match union.schemas.iter().position(|sch| *sch == Schema::Null) {
86                    None => Err(Details::EncodeValueAsSchemaError {
87                        value_kind: ValueKind::Null,
88                        supported_schema: vec![SchemaKind::Null, SchemaKind::Union],
89                    }
90                    .into()),
91                    Some(p) => encode_long(p as i64, writer),
92                }
93            } else {
94                Ok(0)
95            }
96        }
97        Value::Boolean(b) => writer
98            .write(&[u8::from(*b)])
99            .map_err(|e| Details::WriteBytes(e).into()),
100        // Pattern | Pattern here to signify that these _must_ have the same encoding.
101        Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) => encode_int(*i, writer),
102        Value::Long(i)
103        | Value::TimestampMillis(i)
104        | Value::TimestampMicros(i)
105        | Value::TimestampNanos(i)
106        | Value::LocalTimestampMillis(i)
107        | Value::LocalTimestampMicros(i)
108        | Value::LocalTimestampNanos(i)
109        | Value::TimeMicros(i) => encode_long(*i, writer),
110        Value::Float(x) => writer
111            .write(&x.to_le_bytes())
112            .map_err(|e| Details::WriteBytes(e).into()),
113        Value::Double(x) => writer
114            .write(&x.to_le_bytes())
115            .map_err(|e| Details::WriteBytes(e).into()),
116        Value::Decimal(decimal) => match schema {
117            Schema::Decimal(DecimalSchema { inner, .. }) => match inner {
118                InnerDecimalSchema::Fixed(fixed) => {
119                    let bytes = decimal.to_sign_extended_bytes_with_len(fixed.size)?;
120                    let num_bytes = bytes.len();
121                    if num_bytes != fixed.size {
122                        return Err(
123                            Details::EncodeDecimalAsFixedError(num_bytes, fixed.size).into()
124                        );
125                    }
126                    encode(
127                        &Value::Fixed(fixed.size, bytes),
128                        &Schema::Fixed(fixed.copy_only_size()),
129                        writer,
130                    )
131                }
132                InnerDecimalSchema::Bytes => {
133                    encode(&Value::Bytes(decimal.try_into()?), &Schema::Bytes, writer)
134                }
135            },
136            _ => Err(Details::EncodeValueAsSchemaError {
137                value_kind: ValueKind::Decimal,
138                supported_schema: vec![SchemaKind::Decimal],
139            }
140            .into()),
141        },
142        &Value::Duration(duration) => {
143            let slice: [u8; 12] = duration.into();
144            writer
145                .write(&slice)
146                .map_err(|e| Details::WriteBytes(e).into())
147        }
148        Value::Uuid(uuid) => match *schema {
149            Schema::Uuid(UuidSchema::String) | Schema::String => encode_bytes(
150                // we need the call .to_string() to properly convert ASCII to UTF-8
151                #[allow(clippy::unnecessary_to_owned)]
152                &uuid.to_string(),
153                writer,
154            ),
155            Schema::Uuid(UuidSchema::Bytes) | Schema::Bytes => {
156                let bytes = uuid.as_bytes();
157                encode_bytes(bytes, writer)
158            }
159            Schema::Uuid(UuidSchema::Fixed(FixedSchema { size, .. }))
160            | Schema::Fixed(FixedSchema { size, .. }) => {
161                if size != 16 {
162                    return Err(Details::ConvertFixedToUuid(size).into());
163                }
164
165                let bytes = uuid.as_bytes();
166                writer
167                    .write(bytes.as_slice())
168                    .map_err(|e| Details::WriteBytes(e).into())
169            }
170            _ => Err(Details::EncodeValueAsSchemaError {
171                value_kind: ValueKind::Uuid,
172                supported_schema: vec![
173                    SchemaKind::Uuid,
174                    SchemaKind::Fixed,
175                    SchemaKind::Bytes,
176                    SchemaKind::String,
177                ],
178            }
179            .into()),
180        },
181        Value::BigDecimal(bg) => {
182            let buf: Vec<u8> = serialize_big_decimal(bg)?;
183            writer
184                .write(buf.as_slice())
185                .map_err(|e| Details::WriteBytes(e).into())
186        }
187        Value::Bytes(bytes) => match *schema {
188            Schema::Bytes | Schema::Uuid(UuidSchema::Bytes) => encode_bytes(bytes, writer),
189            Schema::Fixed { .. } => writer
190                .write(bytes.as_slice())
191                .map_err(|e| Details::WriteBytes(e).into()),
192            _ => Err(Details::EncodeValueAsSchemaError {
193                value_kind: ValueKind::Bytes,
194                supported_schema: vec![SchemaKind::Bytes, SchemaKind::Fixed, SchemaKind::Uuid],
195            }
196            .into()),
197        },
198        Value::String(s) => match *schema {
199            Schema::String | Schema::Uuid(UuidSchema::String) => encode_bytes(s, writer),
200            Schema::Enum(EnumSchema { ref symbols, .. }) => {
201                if let Some(index) = symbols.iter().position(|item| item == s) {
202                    encode_int(index as i32, writer)
203                } else {
204                    error!("Invalid symbol string {:?}.", &s[..]);
205                    Err(Details::GetEnumSymbol(s.clone()).into())
206                }
207            }
208            _ => Err(Details::EncodeValueAsSchemaError {
209                value_kind: ValueKind::String,
210                supported_schema: vec![SchemaKind::String, SchemaKind::Enum],
211            }
212            .into()),
213        },
214        Value::Fixed(_, bytes) => writer
215            .write(bytes.as_slice())
216            .map_err(|e| Details::WriteBytes(e).into()),
217        Value::Enum(i, _) => encode_int(*i as i32, writer),
218        Value::Union(idx, item) => {
219            if let Schema::Union(ref inner) = *schema {
220                let inner_schema = inner
221                    .schemas
222                    .get(*idx as usize)
223                    .expect("Invalid Union validation occurred");
224                encode_long(*idx as i64, &mut *writer)?;
225                encode_internal(item, inner_schema, names, enclosing_namespace, &mut *writer)
226            } else {
227                error!("invalid schema type for Union: {schema:?}");
228                Err(Details::EncodeValueAsSchemaError {
229                    value_kind: ValueKind::Union,
230                    supported_schema: vec![SchemaKind::Union],
231                }
232                .into())
233            }
234        }
235        Value::Array(items) => {
236            if let Schema::Array(ref inner) = *schema {
237                if !items.is_empty() {
238                    encode_long(items.len() as i64, &mut *writer)?;
239                    for item in items.iter() {
240                        encode_internal(
241                            item,
242                            &inner.items,
243                            names,
244                            enclosing_namespace,
245                            &mut *writer,
246                        )?;
247                    }
248                }
249                writer
250                    .write(&[0u8])
251                    .map_err(|e| Details::WriteBytes(e).into())
252            } else {
253                error!("invalid schema type for Array: {schema:?}");
254                Err(Details::EncodeValueAsSchemaError {
255                    value_kind: ValueKind::Array,
256                    supported_schema: vec![SchemaKind::Array],
257                }
258                .into())
259            }
260        }
261        Value::Map(items) => {
262            if let Schema::Map(ref inner) = *schema {
263                if !items.is_empty() {
264                    encode_long(items.len() as i64, &mut *writer)?;
265                    for (key, value) in items {
266                        encode_bytes(key, &mut *writer)?;
267                        encode_internal(
268                            value,
269                            &inner.types,
270                            names,
271                            enclosing_namespace,
272                            &mut *writer,
273                        )?;
274                    }
275                }
276                writer
277                    .write(&[0u8])
278                    .map_err(|e| Details::WriteBytes(e).into())
279            } else {
280                error!("invalid schema type for Map: {schema:?}");
281                Err(Details::EncodeValueAsSchemaError {
282                    value_kind: ValueKind::Map,
283                    supported_schema: vec![SchemaKind::Map],
284                }
285                .into())
286            }
287        }
288        Value::Record(value_fields) => {
289            if let Schema::Record(RecordSchema {
290                ref name,
291                fields: ref schema_fields,
292                ..
293            }) = *schema
294            {
295                let record_namespace = name.namespace().or(enclosing_namespace);
296
297                let mut lookup = HashMap::new();
298                value_fields.iter().for_each(|(name, field)| {
299                    lookup.insert(name, field);
300                });
301
302                let mut written_bytes = 0;
303                for schema_field in schema_fields.iter() {
304                    let name = &schema_field.name;
305                    let value_opt = lookup.get(name).or_else(|| {
306                        schema_field
307                            .aliases
308                            .iter()
309                            .find_map(|alias| lookup.get(alias))
310                    });
311
312                    if let Some(value) = value_opt {
313                        written_bytes += encode_internal(
314                            value,
315                            &schema_field.schema,
316                            names,
317                            record_namespace,
318                            writer,
319                        )?;
320                    } else {
321                        return Err(Details::NoEntryInLookupTable(
322                            name.clone(),
323                            format!("{lookup:?}"),
324                        )
325                        .into());
326                    }
327                }
328                Ok(written_bytes)
329            } else if let Schema::Union(UnionSchema { schemas, .. }) = schema {
330                let mut union_buffer: Vec<u8> = Vec::new();
331                for (index, schema) in schemas.iter().enumerate() {
332                    encode_long(index as i64, &mut union_buffer)?;
333                    let encode_res = encode_internal(
334                        value,
335                        schema,
336                        names,
337                        enclosing_namespace,
338                        &mut union_buffer,
339                    );
340                    match encode_res {
341                        Ok(_) => {
342                            return writer
343                                .write(union_buffer.as_slice())
344                                .map_err(|e| Details::WriteBytes(e).into());
345                        }
346                        Err(_) => {
347                            union_buffer.clear(); //undo any partial encoding
348                        }
349                    }
350                }
351                Err(Details::EncodeValueAsSchemaError {
352                    value_kind: ValueKind::Record,
353                    supported_schema: vec![SchemaKind::Record, SchemaKind::Union],
354                }
355                .into())
356            } else {
357                error!("invalid schema type for Record: {schema:?}");
358                Err(Details::EncodeValueAsSchemaError {
359                    value_kind: ValueKind::Record,
360                    supported_schema: vec![SchemaKind::Record, SchemaKind::Union],
361                }
362                .into())
363            }
364        }
365    }
366}
367
368pub fn encode_to_vec(value: &Value, schema: &Schema) -> AvroResult<Vec<u8>> {
369    let mut buffer = Vec::new();
370    encode(value, schema, &mut buffer)?;
371    Ok(buffer)
372}
373
374#[cfg(test)]
375#[allow(clippy::expect_fun_call)]
376pub(crate) mod tests {
377    use super::*;
378    use crate::error::{Details, Error};
379    use apache_avro_test_helper::TestResult;
380    use pretty_assertions::assert_eq;
381    use uuid::Uuid;
382
383    pub(crate) fn success(value: &Value, schema: &Schema) -> String {
384        format!(
385            "Value: {:?}\n should encode with schema:\n{:?}",
386            &value, &schema
387        )
388    }
389
390    #[test]
391    fn test_encode_empty_array() {
392        let mut buf = Vec::new();
393        let empty: Vec<Value> = Vec::new();
394        encode(
395            &Value::Array(empty.clone()),
396            &Schema::array(Schema::Int).build(),
397            &mut buf,
398        )
399        .expect(&success(
400            &Value::Array(empty),
401            &Schema::array(Schema::Int).build(),
402        ));
403        assert_eq!(vec![0u8], buf);
404    }
405
406    #[test]
407    fn test_encode_empty_map() {
408        let mut buf = Vec::new();
409        let empty: HashMap<String, Value> = HashMap::new();
410        encode(
411            &Value::Map(empty.clone()),
412            &Schema::map(Schema::Int).build(),
413            &mut buf,
414        )
415        .expect(&success(
416            &Value::Map(empty),
417            &Schema::map(Schema::Int).build(),
418        ));
419        assert_eq!(vec![0u8], buf);
420    }
421
422    #[test]
423    fn test_avro_3433_recursive_definition_encode_record() {
424        let mut buf = Vec::new();
425        let schema = Schema::parse_str(
426            r#"
427            {
428                "type":"record",
429                "name":"TestStruct",
430                "fields": [
431                    {
432                        "name":"a",
433                        "type":{
434                            "type":"record",
435                            "name": "Inner",
436                            "fields": [ {
437                                "name":"z",
438                                "type":"int"
439                            }]
440                        }
441                    },
442                    {
443                        "name":"b",
444                        "type":"Inner"
445                    }
446                ]
447            }"#,
448        )
449        .unwrap();
450
451        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
452        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
453        let outer_value =
454            Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
455        encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value, &schema));
456        assert!(!buf.is_empty());
457    }
458
459    #[test]
460    fn test_avro_3433_recursive_definition_encode_array() {
461        let mut buf = Vec::new();
462        let schema = Schema::parse_str(
463            r#"
464            {
465                "type":"record",
466                "name":"TestStruct",
467                "fields": [
468                    {
469                        "name":"a",
470                        "type":{
471                            "type":"array",
472                            "items": {
473                                "type":"record",
474                                "name": "Inner",
475                                "fields": [ {
476                                    "name":"z",
477                                    "type":"int"
478                                }]
479                            }
480                        }
481                    },
482                    {
483                        "name":"b",
484                        "type": {
485                            "type":"map",
486                            "values":"Inner"
487                        }
488                    }
489                ]
490            }"#,
491        )
492        .unwrap();
493
494        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
495        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
496        let outer_value = Value::Record(vec![
497            ("a".into(), Value::Array(vec![inner_value1])),
498            (
499                "b".into(),
500                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
501            ),
502        ]);
503        encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value, &schema));
504        assert!(!buf.is_empty());
505    }
506
507    #[test]
508    fn test_avro_3433_recursive_definition_encode_map() {
509        let mut buf = Vec::new();
510        let schema = Schema::parse_str(
511            r#"
512            {
513                "type":"record",
514                "name":"TestStruct",
515                "fields": [
516                {
517                    "name":"a",
518                    "type":{
519                        "type":"record",
520                        "name": "Inner",
521                        "fields": [ {
522                            "name":"z",
523                            "type":"int"
524                        }]
525                    }
526                },
527                {
528                    "name":"b",
529                    "type": {
530                        "type":"map",
531                        "values":"Inner"
532                    }
533                }
534            ]
535        }"#,
536        )
537        .unwrap();
538
539        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
540        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
541        let outer_value = Value::Record(vec![
542            ("a".into(), inner_value1),
543            (
544                "b".into(),
545                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
546            ),
547        ]);
548        encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value, &schema));
549        assert!(!buf.is_empty());
550    }
551
552    #[test]
553    fn test_avro_3433_recursive_definition_encode_record_wrapper() {
554        let mut buf = Vec::new();
555        let schema = Schema::parse_str(
556            r#"
557        {
558            "type":"record",
559            "name":"TestStruct",
560            "fields": [
561                {
562                    "name":"a",
563                    "type":{
564                        "type":"record",
565                        "name": "Inner",
566                        "fields": [ {
567                            "name":"z",
568                            "type":"int"
569                        }]
570                    }
571                },
572                {
573                    "name":"b",
574                    "type": {
575                        "type":"record",
576                        "name": "InnerWrapper",
577                        "fields": [ {
578                            "name":"j",
579                            "type":"Inner"
580                        }]
581                    }
582                }
583            ]
584        }"#,
585        )
586        .unwrap();
587
588        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
589        let inner_value2 = Value::Record(vec![(
590            "j".into(),
591            Value::Record(vec![("z".into(), Value::Int(6))]),
592        )]);
593        let outer_value =
594            Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
595        encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value, &schema));
596        assert!(!buf.is_empty());
597    }
598
599    #[test]
600    fn test_avro_3433_recursive_definition_encode_map_and_array() {
601        let mut buf = Vec::new();
602        let schema = Schema::parse_str(
603            r#"
604        {
605            "type":"record",
606            "name":"TestStruct",
607            "fields": [
608                {
609                    "name":"a",
610                    "type":{
611                        "type":"map",
612                        "values": {
613                            "type":"record",
614                            "name": "Inner",
615                            "fields": [ {
616                                "name":"z",
617                                "type":"int"
618                            }]
619                        }
620                    }
621                },
622                {
623                    "name":"b",
624                    "type": {
625                        "type":"array",
626                        "items":"Inner"
627                    }
628                }
629            ]
630        }"#,
631        )
632        .unwrap();
633
634        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
635        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
636        let outer_value = Value::Record(vec![
637            (
638                "a".into(),
639                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
640            ),
641            ("b".into(), Value::Array(vec![inner_value1])),
642        ]);
643        encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value, &schema));
644        assert!(!buf.is_empty());
645    }
646
647    #[test]
648    fn test_avro_3433_recursive_definition_encode_union() {
649        let mut buf = Vec::new();
650        let schema = Schema::parse_str(
651            r#"
652        {
653            "type":"record",
654            "name":"TestStruct",
655            "fields": [
656                {
657                    "name":"a",
658                    "type":["null", {
659                        "type":"record",
660                        "name": "Inner",
661                        "fields": [ {
662                            "name":"z",
663                            "type":"int"
664                        }]
665                    }]
666                },
667                {
668                    "name":"b",
669                    "type":"Inner"
670                }
671            ]
672        }"#,
673        )
674        .unwrap();
675
676        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
677        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
678        let outer_value1 = Value::Record(vec![
679            ("a".into(), Value::Union(1, Box::new(inner_value1))),
680            ("b".into(), inner_value2.clone()),
681        ]);
682        encode(&outer_value1, &schema, &mut buf).expect(&success(&outer_value1, &schema));
683        assert!(!buf.is_empty());
684
685        buf.drain(..);
686        let outer_value2 = Value::Record(vec![
687            ("a".into(), Value::Union(0, Box::new(Value::Null))),
688            ("b".into(), inner_value2),
689        ]);
690        encode(&outer_value2, &schema, &mut buf).expect(&success(&outer_value1, &schema));
691        assert!(!buf.is_empty());
692    }
693
694    #[test]
695    fn test_avro_3448_proper_multi_level_encoding_outer_namespace() {
696        let schema = r#"
697        {
698          "name": "record_name",
699          "namespace": "space",
700          "type": "record",
701          "fields": [
702            {
703              "name": "outer_field_1",
704              "type": [
705                        "null",
706                        {
707                            "type": "record",
708                            "name": "middle_record_name",
709                            "fields":[
710                                {
711                                    "name":"middle_field_1",
712                                    "type":[
713                                        "null",
714                                        {
715                                            "type":"record",
716                                            "name":"inner_record_name",
717                                            "fields":[
718                                                {
719                                                    "name":"inner_field_1",
720                                                    "type":"double"
721                                                }
722                                            ]
723                                        }
724                                    ]
725                                }
726                            ]
727                        }
728                    ]
729            },
730            {
731                "name": "outer_field_2",
732                "type" : "space.inner_record_name"
733            }
734          ]
735        }
736        "#;
737        let schema = Schema::parse_str(schema).unwrap();
738        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
739        let middle_record_variation_1 = Value::Record(vec![(
740            "middle_field_1".into(),
741            Value::Union(0, Box::new(Value::Null)),
742        )]);
743        let middle_record_variation_2 = Value::Record(vec![(
744            "middle_field_1".into(),
745            Value::Union(1, Box::new(inner_record.clone())),
746        )]);
747        let outer_record_variation_1 = Value::Record(vec![
748            (
749                "outer_field_1".into(),
750                Value::Union(0, Box::new(Value::Null)),
751            ),
752            ("outer_field_2".into(), inner_record.clone()),
753        ]);
754        let outer_record_variation_2 = Value::Record(vec![
755            (
756                "outer_field_1".into(),
757                Value::Union(1, Box::new(middle_record_variation_1)),
758            ),
759            ("outer_field_2".into(), inner_record.clone()),
760        ]);
761        let outer_record_variation_3 = Value::Record(vec![
762            (
763                "outer_field_1".into(),
764                Value::Union(1, Box::new(middle_record_variation_2)),
765            ),
766            ("outer_field_2".into(), inner_record),
767        ]);
768
769        let mut buf = Vec::new();
770        encode(&outer_record_variation_1, &schema, &mut buf)
771            .expect(&success(&outer_record_variation_1, &schema));
772        assert!(!buf.is_empty());
773        buf.drain(..);
774        encode(&outer_record_variation_2, &schema, &mut buf)
775            .expect(&success(&outer_record_variation_2, &schema));
776        assert!(!buf.is_empty());
777        buf.drain(..);
778        encode(&outer_record_variation_3, &schema, &mut buf)
779            .expect(&success(&outer_record_variation_3, &schema));
780        assert!(!buf.is_empty());
781    }
782
783    #[test]
784    fn test_avro_3448_proper_multi_level_encoding_middle_namespace() {
785        let schema = r#"
786        {
787          "name": "record_name",
788          "namespace": "space",
789          "type": "record",
790          "fields": [
791            {
792              "name": "outer_field_1",
793              "type": [
794                        "null",
795                        {
796                            "type": "record",
797                            "name": "middle_record_name",
798                            "namespace":"middle_namespace",
799                            "fields":[
800                                {
801                                    "name":"middle_field_1",
802                                    "type":[
803                                        "null",
804                                        {
805                                            "type":"record",
806                                            "name":"inner_record_name",
807                                            "fields":[
808                                                {
809                                                    "name":"inner_field_1",
810                                                    "type":"double"
811                                                }
812                                            ]
813                                        }
814                                    ]
815                                }
816                            ]
817                        }
818                    ]
819            },
820            {
821                "name": "outer_field_2",
822                "type" : "middle_namespace.inner_record_name"
823            }
824          ]
825        }
826        "#;
827        let schema = Schema::parse_str(schema).unwrap();
828        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
829        let middle_record_variation_1 = Value::Record(vec![(
830            "middle_field_1".into(),
831            Value::Union(0, Box::new(Value::Null)),
832        )]);
833        let middle_record_variation_2 = Value::Record(vec![(
834            "middle_field_1".into(),
835            Value::Union(1, Box::new(inner_record.clone())),
836        )]);
837        let outer_record_variation_1 = Value::Record(vec![
838            (
839                "outer_field_1".into(),
840                Value::Union(0, Box::new(Value::Null)),
841            ),
842            ("outer_field_2".into(), inner_record.clone()),
843        ]);
844        let outer_record_variation_2 = Value::Record(vec![
845            (
846                "outer_field_1".into(),
847                Value::Union(1, Box::new(middle_record_variation_1)),
848            ),
849            ("outer_field_2".into(), inner_record.clone()),
850        ]);
851        let outer_record_variation_3 = Value::Record(vec![
852            (
853                "outer_field_1".into(),
854                Value::Union(1, Box::new(middle_record_variation_2)),
855            ),
856            ("outer_field_2".into(), inner_record),
857        ]);
858
859        let mut buf = Vec::new();
860        encode(&outer_record_variation_1, &schema, &mut buf)
861            .expect(&success(&outer_record_variation_1, &schema));
862        assert!(!buf.is_empty());
863        buf.drain(..);
864        encode(&outer_record_variation_2, &schema, &mut buf)
865            .expect(&success(&outer_record_variation_2, &schema));
866        assert!(!buf.is_empty());
867        buf.drain(..);
868        encode(&outer_record_variation_3, &schema, &mut buf)
869            .expect(&success(&outer_record_variation_3, &schema));
870        assert!(!buf.is_empty());
871    }
872
873    #[test]
874    fn test_avro_3448_proper_multi_level_encoding_inner_namespace() {
875        let schema = r#"
876        {
877          "name": "record_name",
878          "namespace": "space",
879          "type": "record",
880          "fields": [
881            {
882              "name": "outer_field_1",
883              "type": [
884                        "null",
885                        {
886                            "type": "record",
887                            "name": "middle_record_name",
888                            "namespace":"middle_namespace",
889                            "fields":[
890                                {
891                                    "name":"middle_field_1",
892                                    "type":[
893                                        "null",
894                                        {
895                                            "type":"record",
896                                            "name":"inner_record_name",
897                                            "namespace":"inner_namespace",
898                                            "fields":[
899                                                {
900                                                    "name":"inner_field_1",
901                                                    "type":"double"
902                                                }
903                                            ]
904                                        }
905                                    ]
906                                }
907                            ]
908                        }
909                    ]
910            },
911            {
912                "name": "outer_field_2",
913                "type" : "inner_namespace.inner_record_name"
914            }
915          ]
916        }
917        "#;
918        let schema = Schema::parse_str(schema).unwrap();
919        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
920        let middle_record_variation_1 = Value::Record(vec![(
921            "middle_field_1".into(),
922            Value::Union(0, Box::new(Value::Null)),
923        )]);
924        let middle_record_variation_2 = Value::Record(vec![(
925            "middle_field_1".into(),
926            Value::Union(1, Box::new(inner_record.clone())),
927        )]);
928        let outer_record_variation_1 = Value::Record(vec![
929            (
930                "outer_field_1".into(),
931                Value::Union(0, Box::new(Value::Null)),
932            ),
933            ("outer_field_2".into(), inner_record.clone()),
934        ]);
935        let outer_record_variation_2 = Value::Record(vec![
936            (
937                "outer_field_1".into(),
938                Value::Union(1, Box::new(middle_record_variation_1)),
939            ),
940            ("outer_field_2".into(), inner_record.clone()),
941        ]);
942        let outer_record_variation_3 = Value::Record(vec![
943            (
944                "outer_field_1".into(),
945                Value::Union(1, Box::new(middle_record_variation_2)),
946            ),
947            ("outer_field_2".into(), inner_record),
948        ]);
949
950        let mut buf = Vec::new();
951        encode(&outer_record_variation_1, &schema, &mut buf)
952            .expect(&success(&outer_record_variation_1, &schema));
953        assert!(!buf.is_empty());
954        buf.drain(..);
955        encode(&outer_record_variation_2, &schema, &mut buf)
956            .expect(&success(&outer_record_variation_2, &schema));
957        assert!(!buf.is_empty());
958        buf.drain(..);
959        encode(&outer_record_variation_3, &schema, &mut buf)
960            .expect(&success(&outer_record_variation_3, &schema));
961        assert!(!buf.is_empty());
962    }
963
964    #[test]
965    fn test_avro_3585_encode_uuids() {
966        let value = Value::String(String::from("00000000-0000-0000-0000-000000000000"));
967        let schema = Schema::Uuid(UuidSchema::String);
968        let mut buffer = Vec::new();
969        let encoded = encode(&value, &schema, &mut buffer);
970        assert!(encoded.is_ok());
971        assert!(!buffer.is_empty());
972    }
973
974    #[test]
975    fn avro_3926_encode_decode_uuid_to_fixed_wrong_schema_size() -> TestResult {
976        let schema = Schema::Fixed(FixedSchema {
977            size: 15,
978            name: "uuid".try_into()?,
979            aliases: None,
980            doc: None,
981            attributes: Default::default(),
982        });
983        let value = Value::Uuid(Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?);
984
985        let mut buffer = Vec::new();
986        match encode(&value, &schema, &mut buffer).map_err(Error::into_details) {
987            Err(Details::ConvertFixedToUuid(actual)) => {
988                assert_eq!(actual, 15);
989            }
990            _ => panic!("Expected Details::ConvertFixedToUuid"),
991        }
992
993        Ok(())
994    }
995}