1use crate::schema::{InnerDecimalSchema, UuidSchema};
20use crate::{
21 AvroResult, Error,
22 bigdecimal::{deserialize_big_decimal, serialize_big_decimal},
23 decimal::Decimal,
24 duration::Duration,
25 error::Details,
26 schema::{
27 DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, Precision, RecordField,
28 RecordSchema, ResolvedSchema, Scale, Schema, SchemaKind, UnionSchema,
29 },
30};
31use bigdecimal::BigDecimal;
32use log::{debug, error};
33use serde_json::{Number, Value as JsonValue};
34use std::{
35 borrow::Borrow,
36 collections::{BTreeMap, HashMap},
37 fmt::Debug,
38 hash::BuildHasher,
39 str::FromStr,
40};
41use uuid::Uuid;
42
43fn max_prec_for_len(len: usize) -> Result<usize, Error> {
45 let len = i32::try_from(len).map_err(|e| Details::ConvertLengthToI32(e, len))?;
46 Ok((2.0_f64.powi(8 * len - 1) - 1.0).log10().floor() as usize)
47}
48
49#[derive(Clone, Debug, PartialEq, strum_macros::EnumDiscriminants)]
54#[strum_discriminants(name(ValueKind))]
55pub enum Value {
56 Null,
58 Boolean(bool),
60 Int(i32),
62 Long(i64),
64 Float(f32),
66 Double(f64),
68 Bytes(Vec<u8>),
70 String(String),
72 Fixed(usize, Vec<u8>),
75 Enum(u32, String),
82 Union(u32, Box<Value>),
89 Array(Vec<Value>),
91 Map(HashMap<String, Value>),
93 Record(Vec<(String, Value)>),
100 Date(i32),
105 Decimal(Decimal),
107 BigDecimal(BigDecimal),
109 TimeMillis(i32),
111 TimeMicros(i64),
113 TimestampMillis(i64),
115 TimestampMicros(i64),
117 TimestampNanos(i64),
119 LocalTimestampMillis(i64),
121 LocalTimestampMicros(i64),
123 LocalTimestampNanos(i64),
125 Duration(Duration),
127 Uuid(Uuid),
129}
130
131macro_rules! to_value(
132 ($type:ty, $variant_constructor:expr) => (
133 impl From<$type> for Value {
134 fn from(value: $type) -> Self {
135 $variant_constructor(value)
136 }
137 }
138 );
139);
140
141to_value!(bool, Value::Boolean);
142to_value!(i32, Value::Int);
143to_value!(i64, Value::Long);
144to_value!(f32, Value::Float);
145to_value!(f64, Value::Double);
146to_value!(String, Value::String);
147to_value!(Vec<u8>, Value::Bytes);
148to_value!(uuid::Uuid, Value::Uuid);
149to_value!(Decimal, Value::Decimal);
150to_value!(BigDecimal, Value::BigDecimal);
151to_value!(Duration, Value::Duration);
152
153impl From<()> for Value {
154 fn from(_: ()) -> Self {
155 Self::Null
156 }
157}
158
159impl From<usize> for Value {
160 fn from(value: usize) -> Self {
161 i64::try_from(value)
162 .expect("cannot convert usize to i64")
163 .into()
164 }
165}
166
167impl From<&str> for Value {
168 fn from(value: &str) -> Self {
169 Self::String(value.to_owned())
170 }
171}
172
173impl From<&[u8]> for Value {
174 fn from(value: &[u8]) -> Self {
175 Self::Bytes(value.to_owned())
176 }
177}
178
179impl<T> From<Option<T>> for Value
180where
181 T: Into<Self>,
182{
183 fn from(value: Option<T>) -> Self {
184 Self::Union(
186 value.is_some() as u32,
187 Box::new(value.map_or_else(|| Self::Null, Into::into)),
188 )
189 }
190}
191
192impl<K, V, S> From<HashMap<K, V, S>> for Value
193where
194 K: Into<String>,
195 V: Into<Self>,
196 S: BuildHasher,
197{
198 fn from(value: HashMap<K, V, S>) -> Self {
199 Self::Map(
200 value
201 .into_iter()
202 .map(|(key, value)| (key.into(), value.into()))
203 .collect(),
204 )
205 }
206}
207
208#[derive(Debug, Clone)]
210pub struct Record<'a> {
211 pub fields: Vec<(String, Value)>,
215 schema_lookup: &'a BTreeMap<String, usize>,
216}
217
218impl Record<'_> {
219 pub fn new(schema: &Schema) -> Option<Record<'_>> {
223 match *schema {
224 Schema::Record(RecordSchema {
225 fields: ref schema_fields,
226 lookup: ref schema_lookup,
227 ..
228 }) => {
229 let mut fields = Vec::with_capacity(schema_fields.len());
230 for schema_field in schema_fields.iter() {
231 fields.push((schema_field.name.clone(), Value::Null));
232 }
233
234 Some(Record {
235 fields,
236 schema_lookup,
237 })
238 }
239 _ => None,
240 }
241 }
242
243 pub fn put<V>(&mut self, field: &str, value: V)
249 where
250 V: Into<Value>,
251 {
252 if let Some(&position) = self.schema_lookup.get(field) {
253 self.fields[position].1 = value.into()
254 }
255 }
256
257 pub fn get(&self, field: &str) -> Option<&Value> {
260 self.schema_lookup
261 .get(field)
262 .map(|&position| &self.fields[position].1)
263 }
264}
265
266impl<'a> From<Record<'a>> for Value {
267 fn from(value: Record<'a>) -> Self {
268 Self::Record(value.fields)
269 }
270}
271
272impl From<JsonValue> for Value {
273 fn from(value: JsonValue) -> Self {
274 match value {
275 JsonValue::Null => Self::Null,
276 JsonValue::Bool(b) => b.into(),
277 JsonValue::Number(ref n) if n.is_i64() => {
278 let n = n.as_i64().unwrap();
279 if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
280 Value::Int(n as i32)
281 } else {
282 Value::Long(n)
283 }
284 }
285 JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
286 JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64), JsonValue::String(s) => s.into(),
288 JsonValue::Array(items) => Value::Array(items.into_iter().map(Value::from).collect()),
289 JsonValue::Object(items) => Value::Map(
290 items
291 .into_iter()
292 .map(|(key, value)| (key, value.into()))
293 .collect(),
294 ),
295 }
296 }
297}
298
299impl TryFrom<Value> for JsonValue {
301 type Error = crate::error::Error;
302 fn try_from(value: Value) -> AvroResult<Self> {
303 match value {
304 Value::Null => Ok(Self::Null),
305 Value::Boolean(b) => Ok(Self::Bool(b)),
306 Value::Int(i) => Ok(Self::Number(i.into())),
307 Value::Long(l) => Ok(Self::Number(l.into())),
308 Value::Float(f) => Number::from_f64(f.into())
309 .map(Self::Number)
310 .ok_or_else(|| Details::ConvertF64ToJson(f.into()).into()),
311 Value::Double(d) => Number::from_f64(d)
312 .map(Self::Number)
313 .ok_or_else(|| Details::ConvertF64ToJson(d).into()),
314 Value::Bytes(bytes) => Ok(Self::Array(bytes.into_iter().map(|b| b.into()).collect())),
315 Value::String(s) => Ok(Self::String(s)),
316 Value::Fixed(_size, items) => {
317 Ok(Self::Array(items.into_iter().map(|v| v.into()).collect()))
318 }
319 Value::Enum(_i, s) => Ok(Self::String(s)),
320 Value::Union(_i, b) => Self::try_from(*b),
321 Value::Array(items) => items
322 .into_iter()
323 .map(Self::try_from)
324 .collect::<Result<Vec<_>, _>>()
325 .map(Self::Array),
326 Value::Map(items) => items
327 .into_iter()
328 .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
329 .collect::<Result<Vec<_>, _>>()
330 .map(|v| Self::Object(v.into_iter().collect())),
331 Value::Record(items) => items
332 .into_iter()
333 .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
334 .collect::<Result<Vec<_>, _>>()
335 .map(|v| Self::Object(v.into_iter().collect())),
336 Value::Date(d) => Ok(Self::Number(d.into())),
337 Value::Decimal(ref d) => <Vec<u8>>::try_from(d)
338 .map(|vec| Self::Array(vec.into_iter().map(|v| v.into()).collect())),
339 Value::BigDecimal(ref bg) => {
340 let vec1: Vec<u8> = serialize_big_decimal(bg)?;
341 Ok(Self::Array(vec1.into_iter().map(|b| b.into()).collect()))
342 }
343 Value::TimeMillis(t) => Ok(Self::Number(t.into())),
344 Value::TimeMicros(t) => Ok(Self::Number(t.into())),
345 Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
346 Value::TimestampMicros(t) => Ok(Self::Number(t.into())),
347 Value::TimestampNanos(t) => Ok(Self::Number(t.into())),
348 Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())),
349 Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())),
350 Value::LocalTimestampNanos(t) => Ok(Self::Number(t.into())),
351 Value::Duration(d) => Ok(Self::Array(
352 <[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(),
353 )),
354 Value::Uuid(uuid) => Ok(Self::String(uuid.as_hyphenated().to_string())),
355 }
356 }
357}
358
359impl Value {
360 pub fn validate(&self, schema: &Schema) -> bool {
365 self.validate_schemata(vec![schema])
366 }
367
368 pub fn validate_schemata(&self, schemata: Vec<&Schema>) -> bool {
369 let rs = ResolvedSchema::try_from(schemata.clone())
370 .expect("Schemata didn't successfully resolve");
371 let schemata_len = schemata.len();
372 schemata.iter().any(|schema| {
373 let enclosing_namespace = schema.namespace();
374
375 match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
376 Some(reason) => {
377 let log_message =
378 format!("Invalid value: {self:?} for schema: {schema:?}. Reason: {reason}");
379 if schemata_len == 1 {
380 error!("{log_message}");
381 } else {
382 debug!("{log_message}");
383 };
384 false
385 }
386 None => true,
387 }
388 })
389 }
390
391 fn accumulate(accumulator: Option<String>, other: Option<String>) -> Option<String> {
392 match (accumulator, other) {
393 (None, None) => None,
394 (None, s @ Some(_)) => s,
395 (s @ Some(_), None) => s,
396 (Some(reason1), Some(reason2)) => Some(format!("{reason1}\n{reason2}")),
397 }
398 }
399
400 pub(crate) fn validate_internal<S: std::borrow::Borrow<Schema> + Debug>(
402 &self,
403 schema: &Schema,
404 names: &HashMap<Name, S>,
405 enclosing_namespace: &Namespace,
406 ) -> Option<String> {
407 match (self, schema) {
408 (_, Schema::Ref { name }) => {
409 let name = name.fully_qualified_name(enclosing_namespace);
410 names.get(&name).map_or_else(
411 || {
412 Some(format!(
413 "Unresolved schema reference: '{:?}'. Parsed names: {:?}",
414 name,
415 names.keys()
416 ))
417 },
418 |s| self.validate_internal(s.borrow(), names, &name.namespace),
419 )
420 }
421 (&Value::Null, &Schema::Null) => None,
422 (&Value::Boolean(_), &Schema::Boolean) => None,
423 (&Value::Int(_), &Schema::Int) => None,
424 (&Value::Int(_), &Schema::Date) => None,
425 (&Value::Int(_), &Schema::TimeMillis) => None,
426 (&Value::Int(_), &Schema::Long) => None,
427 (&Value::Long(_), &Schema::Long) => None,
428 (&Value::Long(_), &Schema::TimeMicros) => None,
429 (&Value::Long(_), &Schema::TimestampMillis) => None,
430 (&Value::Long(_), &Schema::TimestampMicros) => None,
431 (&Value::Long(_), &Schema::LocalTimestampMillis) => None,
432 (&Value::Long(_), &Schema::LocalTimestampMicros) => None,
433 (&Value::TimestampMicros(_), &Schema::TimestampMicros) => None,
434 (&Value::TimestampMillis(_), &Schema::TimestampMillis) => None,
435 (&Value::TimestampNanos(_), &Schema::TimestampNanos) => None,
436 (&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros) => None,
437 (&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis) => None,
438 (&Value::LocalTimestampNanos(_), &Schema::LocalTimestampNanos) => None,
439 (&Value::TimeMicros(_), &Schema::TimeMicros) => None,
440 (&Value::TimeMillis(_), &Schema::TimeMillis) => None,
441 (&Value::Date(_), &Schema::Date) => None,
442 (&Value::Decimal(_), &Schema::Decimal { .. }) => None,
443 (&Value::BigDecimal(_), &Schema::BigDecimal) => None,
444 (&Value::Duration(_), &Schema::Duration(_)) => None,
445 (&Value::Uuid(_), &Schema::Uuid(_)) => None,
446 (&Value::Float(_), &Schema::Float) => None,
447 (&Value::Float(_), &Schema::Double) => None,
448 (&Value::Double(_), &Schema::Double) => None,
449 (&Value::Bytes(_), &Schema::Bytes) => None,
450 (&Value::Bytes(_), &Schema::Decimal { .. }) => None,
451 (Value::Bytes(bytes), &Schema::Uuid(UuidSchema::Bytes)) => {
452 if bytes.len() != 16 {
453 Some(format!(
454 "The value's size ({}) is not the right length for a bytes UUID (16)",
455 bytes.len()
456 ))
457 } else {
458 None
459 }
460 }
461 (&Value::String(_), &Schema::String) => None,
462 (Value::String(string), &Schema::Uuid(UuidSchema::String)) => {
463 if string.len() < 32 {
465 Some(format!(
466 "The value's size ({}) is not the right length for a string UUID (>=32)",
467 string.len()
468 ))
469 } else {
470 None
471 }
472 }
473 (&Value::Fixed(n, _), &Schema::Fixed(FixedSchema { size, .. })) => {
474 if n != size {
475 Some(format!(
476 "The value's size ({n}) is different than the schema's size ({size})"
477 ))
478 } else {
479 None
480 }
481 }
482 (Value::Bytes(b), &Schema::Fixed(FixedSchema { size, .. })) => {
483 if b.len() != size {
484 Some(format!(
485 "The bytes' length ({}) is different than the schema's size ({})",
486 b.len(),
487 size
488 ))
489 } else {
490 None
491 }
492 }
493 (&Value::Fixed(n, _), &Schema::Duration(_)) => {
494 if n != 12 {
495 Some(format!(
496 "The value's size ('{n}') must be exactly 12 to be a Duration"
497 ))
498 } else {
499 None
500 }
501 }
502 (&Value::Fixed(n, _), Schema::Uuid(UuidSchema::Fixed(size, ..))) => {
503 if size.size != 16 {
504 Some(format!(
505 "The schema's size ('{}') must be exactly 16 to be a Uuid",
506 size.size
507 ))
508 } else if n != 16 {
509 Some(format!(
510 "The value's size ('{n}') must be exactly 16 to be a Uuid"
511 ))
512 } else {
513 None
514 }
515 }
516 (&Value::Fixed(_n, _), &Schema::Decimal { .. }) => None,
518 (Value::String(s), Schema::Enum(EnumSchema { symbols, .. })) => {
519 if !symbols.contains(s) {
520 Some(format!("'{s}' is not a member of the possible symbols"))
521 } else {
522 None
523 }
524 }
525 (
526 &Value::Enum(i, ref s),
527 Schema::Enum(EnumSchema {
528 symbols, default, ..
529 }),
530 ) => symbols
531 .get(i as usize)
532 .map(|ref symbol| {
533 if symbol != &s {
534 Some(format!("Symbol '{s}' is not at position '{i}'"))
535 } else {
536 None
537 }
538 })
539 .unwrap_or_else(|| match default {
540 Some(_) => None,
541 None => Some(format!("No symbol at position '{i}'")),
542 }),
543 (&Value::Union(i, ref value), Schema::Union(inner)) => inner
545 .variants()
546 .get(i as usize)
547 .map(|schema| value.validate_internal(schema, names, enclosing_namespace))
548 .unwrap_or_else(|| Some(format!("No schema in the union at position '{i}'"))),
549 (v, Schema::Union(inner)) => {
550 match inner.find_schema_with_known_schemata(v, Some(names), enclosing_namespace) {
551 Some(_) => None,
552 None => Some("Could not find matching type in union".to_string()),
553 }
554 }
555 (Value::Array(items), Schema::Array(inner)) => items.iter().fold(None, |acc, item| {
556 Value::accumulate(
557 acc,
558 item.validate_internal(&inner.items, names, enclosing_namespace),
559 )
560 }),
561 (Value::Map(items), Schema::Map(inner)) => {
562 items.iter().fold(None, |acc, (_, value)| {
563 Value::accumulate(
564 acc,
565 value.validate_internal(&inner.types, names, enclosing_namespace),
566 )
567 })
568 }
569 (
570 Value::Record(record_fields),
571 Schema::Record(RecordSchema {
572 fields,
573 lookup,
574 name,
575 ..
576 }),
577 ) => {
578 let non_nullable_fields_count =
579 fields.iter().filter(|&rf| !rf.is_nullable()).count();
580
581 if record_fields.len() < non_nullable_fields_count {
583 return Some(format!(
584 "The value's records length ({}) doesn't match the schema ({} non-nullable fields)",
585 record_fields.len(),
586 non_nullable_fields_count
587 ));
588 } else if record_fields.len() > fields.len() {
589 return Some(format!(
590 "The value's records length ({}) is greater than the schema's ({} fields)",
591 record_fields.len(),
592 fields.len(),
593 ));
594 }
595
596 record_fields
597 .iter()
598 .fold(None, |acc, (field_name, record_field)| {
599 let record_namespace = if name.namespace.is_none() {
600 enclosing_namespace
601 } else {
602 &name.namespace
603 };
604 match lookup.get(field_name) {
605 Some(idx) => {
606 let field = &fields[*idx];
607 Value::accumulate(
608 acc,
609 record_field.validate_internal(
610 &field.schema,
611 names,
612 record_namespace,
613 ),
614 )
615 }
616 None => Value::accumulate(
617 acc,
618 Some(format!("There is no schema field for field '{field_name}'")),
619 ),
620 }
621 })
622 }
623 (Value::Map(items), Schema::Record(RecordSchema { fields, .. })) => {
624 fields.iter().fold(None, |acc, field| {
625 if let Some(item) = items.get(&field.name) {
626 let res = item.validate_internal(&field.schema, names, enclosing_namespace);
627 Value::accumulate(acc, res)
628 } else if !field.is_nullable() {
629 Value::accumulate(
630 acc,
631 Some(format!(
632 "Field with name '{:?}' is not a member of the map items",
633 field.name
634 )),
635 )
636 } else {
637 acc
638 }
639 })
640 }
641 (v, s) => Some(format!(
642 "Unsupported value-schema combination! Value: {v:?}, schema: {s:?}"
643 )),
644 }
645 }
646
647 pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
654 self.resolve_schemata(schema, Vec::with_capacity(0))
655 }
656
657 pub fn resolve_schemata(self, schema: &Schema, schemata: Vec<&Schema>) -> AvroResult<Self> {
664 let enclosing_namespace = schema.namespace();
665 let rs = if schemata.is_empty() {
666 ResolvedSchema::try_from(schema)?
667 } else {
668 ResolvedSchema::try_from(schemata)?
669 };
670 self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
671 }
672
673 pub(crate) fn resolve_internal<S: Borrow<Schema> + Debug>(
674 mut self,
675 schema: &Schema,
676 names: &HashMap<Name, S>,
677 enclosing_namespace: &Namespace,
678 field_default: &Option<JsonValue>,
679 ) -> AvroResult<Self> {
680 if SchemaKind::from(&self) == SchemaKind::Union
682 && SchemaKind::from(schema) != SchemaKind::Union
683 {
684 let v = match self {
686 Value::Union(_i, b) => *b,
687 _ => unreachable!(),
688 };
689 self = v;
690 }
691 match schema {
692 Schema::Ref { name } => {
693 let name = name.fully_qualified_name(enclosing_namespace);
694
695 if let Some(resolved) = names.get(&name) {
696 debug!("Resolved {name:?}");
697 self.resolve_internal(resolved.borrow(), names, &name.namespace, field_default)
698 } else {
699 error!("Failed to resolve schema {name:?}");
700 Err(Details::SchemaResolutionError(name.clone()).into())
701 }
702 }
703 Schema::Null => self.resolve_null(),
704 Schema::Boolean => self.resolve_boolean(),
705 Schema::Int => self.resolve_int(),
706 Schema::Long => self.resolve_long(),
707 Schema::Float => self.resolve_float(),
708 Schema::Double => self.resolve_double(),
709 Schema::Bytes => self.resolve_bytes(),
710 Schema::String => self.resolve_string(),
711 Schema::Fixed(FixedSchema { size, .. }) => self.resolve_fixed(*size),
712 Schema::Union(inner) => {
713 self.resolve_union(inner, names, enclosing_namespace, field_default)
714 }
715 Schema::Enum(EnumSchema {
716 symbols, default, ..
717 }) => self.resolve_enum(symbols, default, field_default),
718 Schema::Array(inner) => self.resolve_array(&inner.items, names, enclosing_namespace),
719 Schema::Map(inner) => self.resolve_map(&inner.types, names, enclosing_namespace),
720 Schema::Record(RecordSchema { fields, .. }) => {
721 self.resolve_record(fields, names, enclosing_namespace)
722 }
723 Schema::Decimal(DecimalSchema {
724 scale,
725 precision,
726 inner,
727 }) => self.resolve_decimal(*precision, *scale, inner),
728 Schema::BigDecimal => self.resolve_bigdecimal(),
729 Schema::Date => self.resolve_date(),
730 Schema::TimeMillis => self.resolve_time_millis(),
731 Schema::TimeMicros => self.resolve_time_micros(),
732 Schema::TimestampMillis => self.resolve_timestamp_millis(),
733 Schema::TimestampMicros => self.resolve_timestamp_micros(),
734 Schema::TimestampNanos => self.resolve_timestamp_nanos(),
735 Schema::LocalTimestampMillis => self.resolve_local_timestamp_millis(),
736 Schema::LocalTimestampMicros => self.resolve_local_timestamp_micros(),
737 Schema::LocalTimestampNanos => self.resolve_local_timestamp_nanos(),
738 Schema::Duration(_) => self.resolve_duration(),
739 Schema::Uuid(inner) => self.resolve_uuid(inner),
740 }
741 }
742
743 fn resolve_uuid(self, inner: &UuidSchema) -> Result<Self, Error> {
744 let value = match (self, inner) {
745 (uuid @ Value::Uuid(_), _) => uuid,
746 (Value::String(ref string), UuidSchema::String) => {
747 Value::Uuid(Uuid::from_str(string).map_err(Details::ConvertStrToUuid)?)
748 }
749 (Value::Bytes(ref bytes), UuidSchema::Bytes) => {
750 Value::Uuid(Uuid::from_slice(bytes).map_err(Details::ConvertSliceToUuid)?)
751 }
752 (Value::Fixed(n, ref bytes), UuidSchema::Fixed(_)) => {
753 if n != 16 {
754 return Err(Details::ConvertFixedToUuid(n).into());
755 }
756 Value::Uuid(Uuid::from_slice(bytes).map_err(Details::ConvertSliceToUuid)?)
757 }
758 (other, _) => return Err(Details::GetUuid(other).into()),
759 };
760 Ok(value)
761 }
762
763 fn resolve_bigdecimal(self) -> Result<Self, Error> {
764 Ok(match self {
765 bg @ Value::BigDecimal(_) => bg,
766 Value::Bytes(b) => Value::BigDecimal(deserialize_big_decimal(&b).unwrap()),
767 other => return Err(Details::GetBigDecimal(other).into()),
768 })
769 }
770
771 fn resolve_duration(self) -> Result<Self, Error> {
772 Ok(match self {
773 duration @ Value::Duration { .. } => duration,
774 Value::Fixed(size, bytes) => {
775 if size != 12 {
776 return Err(Details::GetDurationFixedBytes(size).into());
777 }
778 Value::Duration(Duration::from([
779 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
780 bytes[8], bytes[9], bytes[10], bytes[11],
781 ]))
782 }
783 other => return Err(Details::ResolveDuration(other).into()),
784 })
785 }
786
787 fn resolve_decimal(
788 self,
789 precision: Precision,
790 scale: Scale,
791 inner: &InnerDecimalSchema,
792 ) -> Result<Self, Error> {
793 if scale > precision {
794 return Err(Details::GetScaleAndPrecision { scale, precision }.into());
795 }
796 match inner {
797 &InnerDecimalSchema::Fixed(FixedSchema { size, .. }) => {
798 if max_prec_for_len(size)? < precision {
799 return Err(Details::GetScaleWithFixedSize { size, precision }.into());
800 }
801 }
802 InnerDecimalSchema::Bytes => (),
803 };
804 match self {
805 Value::Decimal(num) => {
806 let num_bytes = num.len();
807 if max_prec_for_len(num_bytes)? < precision {
808 Err(Details::ComparePrecisionAndSize {
809 precision,
810 num_bytes,
811 }
812 .into())
813 } else {
814 Ok(Value::Decimal(num))
815 }
816 }
818 Value::Fixed(_, bytes) | Value::Bytes(bytes) => {
819 if max_prec_for_len(bytes.len())? < precision {
820 Err(Details::ComparePrecisionAndSize {
821 precision,
822 num_bytes: bytes.len(),
823 }
824 .into())
825 } else {
826 Ok(Value::Decimal(Decimal::from(bytes)))
828 }
829 }
830 other => Err(Details::ResolveDecimal(other).into()),
831 }
832 }
833
834 fn resolve_date(self) -> Result<Self, Error> {
835 match self {
836 Value::Date(d) | Value::Int(d) => Ok(Value::Date(d)),
837 other => Err(Details::GetDate(other).into()),
838 }
839 }
840
841 fn resolve_time_millis(self) -> Result<Self, Error> {
842 match self {
843 Value::TimeMillis(t) | Value::Int(t) => Ok(Value::TimeMillis(t)),
844 other => Err(Details::GetTimeMillis(other).into()),
845 }
846 }
847
848 fn resolve_time_micros(self) -> Result<Self, Error> {
849 match self {
850 Value::TimeMicros(t) | Value::Long(t) => Ok(Value::TimeMicros(t)),
851 Value::Int(t) => Ok(Value::TimeMicros(i64::from(t))),
852 other => Err(Details::GetTimeMicros(other).into()),
853 }
854 }
855
856 fn resolve_timestamp_millis(self) -> Result<Self, Error> {
857 match self {
858 Value::TimestampMillis(ts) | Value::Long(ts) => Ok(Value::TimestampMillis(ts)),
859 Value::Int(ts) => Ok(Value::TimestampMillis(i64::from(ts))),
860 other => Err(Details::GetTimestampMillis(other).into()),
861 }
862 }
863
864 fn resolve_timestamp_micros(self) -> Result<Self, Error> {
865 match self {
866 Value::TimestampMicros(ts) | Value::Long(ts) => Ok(Value::TimestampMicros(ts)),
867 Value::Int(ts) => Ok(Value::TimestampMicros(i64::from(ts))),
868 other => Err(Details::GetTimestampMicros(other).into()),
869 }
870 }
871
872 fn resolve_timestamp_nanos(self) -> Result<Self, Error> {
873 match self {
874 Value::TimestampNanos(ts) | Value::Long(ts) => Ok(Value::TimestampNanos(ts)),
875 Value::Int(ts) => Ok(Value::TimestampNanos(i64::from(ts))),
876 other => Err(Details::GetTimestampNanos(other).into()),
877 }
878 }
879
880 fn resolve_local_timestamp_millis(self) -> Result<Self, Error> {
881 match self {
882 Value::LocalTimestampMillis(ts) | Value::Long(ts) => {
883 Ok(Value::LocalTimestampMillis(ts))
884 }
885 Value::Int(ts) => Ok(Value::LocalTimestampMillis(i64::from(ts))),
886 other => Err(Details::GetLocalTimestampMillis(other).into()),
887 }
888 }
889
890 fn resolve_local_timestamp_micros(self) -> Result<Self, Error> {
891 match self {
892 Value::LocalTimestampMicros(ts) | Value::Long(ts) => {
893 Ok(Value::LocalTimestampMicros(ts))
894 }
895 Value::Int(ts) => Ok(Value::LocalTimestampMicros(i64::from(ts))),
896 other => Err(Details::GetLocalTimestampMicros(other).into()),
897 }
898 }
899
900 fn resolve_local_timestamp_nanos(self) -> Result<Self, Error> {
901 match self {
902 Value::LocalTimestampNanos(ts) | Value::Long(ts) => Ok(Value::LocalTimestampNanos(ts)),
903 Value::Int(ts) => Ok(Value::LocalTimestampNanos(i64::from(ts))),
904 other => Err(Details::GetLocalTimestampNanos(other).into()),
905 }
906 }
907
908 fn resolve_null(self) -> Result<Self, Error> {
909 match self {
910 Value::Null => Ok(Value::Null),
911 other => Err(Details::GetNull(other).into()),
912 }
913 }
914
915 fn resolve_boolean(self) -> Result<Self, Error> {
916 match self {
917 Value::Boolean(b) => Ok(Value::Boolean(b)),
918 other => Err(Details::GetBoolean(other).into()),
919 }
920 }
921
922 fn resolve_int(self) -> Result<Self, Error> {
923 match self {
924 Value::Int(n) => Ok(Value::Int(n)),
925 Value::Long(n) => {
926 let n = i32::try_from(n).map_err(|e| Details::ZagI32(e, n))?;
927 Ok(Value::Int(n))
928 }
929 other => Err(Details::GetInt(other).into()),
930 }
931 }
932
933 fn resolve_long(self) -> Result<Self, Error> {
934 match self {
935 Value::Int(n) => Ok(Value::Long(i64::from(n))),
936 Value::Long(n) => Ok(Value::Long(n)),
937 other => Err(Details::GetLong(other).into()),
938 }
939 }
940
941 fn resolve_float(self) -> Result<Self, Error> {
942 match self {
943 Value::Int(n) => Ok(Value::Float(n as f32)),
944 Value::Long(n) => Ok(Value::Float(n as f32)),
945 Value::Float(x) => Ok(Value::Float(x)),
946 Value::Double(x) => Ok(Value::Float(x as f32)),
947 Value::String(ref x) => match Self::parse_special_float(x) {
948 Some(f) => Ok(Value::Float(f)),
949 None => Err(Details::GetFloat(self).into()),
950 },
951 other => Err(Details::GetFloat(other).into()),
952 }
953 }
954
955 fn resolve_double(self) -> Result<Self, Error> {
956 match self {
957 Value::Int(n) => Ok(Value::Double(f64::from(n))),
958 Value::Long(n) => Ok(Value::Double(n as f64)),
959 Value::Float(x) => Ok(Value::Double(f64::from(x))),
960 Value::Double(x) => Ok(Value::Double(x)),
961 Value::String(ref x) => match Self::parse_special_float(x) {
962 Some(f) => Ok(Value::Double(f64::from(f))),
963 None => Err(Details::GetDouble(self).into()),
964 },
965 other => Err(Details::GetDouble(other).into()),
966 }
967 }
968
969 fn parse_special_float(value: &str) -> Option<f32> {
972 match value {
973 "NaN" => Some(f32::NAN),
974 "INF" | "Infinity" => Some(f32::INFINITY),
975 "-INF" | "-Infinity" => Some(f32::NEG_INFINITY),
976 _ => None,
977 }
978 }
979
980 fn resolve_bytes(self) -> Result<Self, Error> {
981 match self {
982 Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
983 Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
984 Value::Array(items) => Ok(Value::Bytes(
985 items
986 .into_iter()
987 .map(Value::try_u8)
988 .collect::<Result<Vec<_>, _>>()?,
989 )),
990 other => Err(Details::GetBytes(other).into()),
991 }
992 }
993
994 fn resolve_string(self) -> Result<Self, Error> {
995 match self {
996 Value::String(s) => Ok(Value::String(s)),
997 Value::Bytes(bytes) | Value::Fixed(_, bytes) => Ok(Value::String(
998 String::from_utf8(bytes).map_err(Details::ConvertToUtf8)?,
999 )),
1000 other => Err(Details::GetString(other).into()),
1001 }
1002 }
1003
1004 fn resolve_fixed(self, size: usize) -> Result<Self, Error> {
1005 match self {
1006 Value::Fixed(n, bytes) => {
1007 if n == size {
1008 Ok(Value::Fixed(n, bytes))
1009 } else {
1010 Err(Details::CompareFixedSizes { size, n }.into())
1011 }
1012 }
1013 Value::String(s) => Ok(Value::Fixed(s.len(), s.into_bytes())),
1014 Value::Bytes(s) => {
1015 if s.len() == size {
1016 Ok(Value::Fixed(size, s))
1017 } else {
1018 Err(Details::CompareFixedSizes { size, n: s.len() }.into())
1019 }
1020 }
1021 other => Err(Details::GetStringForFixed(other).into()),
1022 }
1023 }
1024
1025 pub(crate) fn resolve_enum(
1026 self,
1027 symbols: &[String],
1028 enum_default: &Option<String>,
1029 _field_default: &Option<JsonValue>,
1030 ) -> Result<Self, Error> {
1031 let validate_symbol = |symbol: String, symbols: &[String]| {
1032 if let Some(index) = symbols.iter().position(|item| item == &symbol) {
1033 Ok(Value::Enum(index as u32, symbol))
1034 } else {
1035 match enum_default {
1036 Some(default) => {
1037 if let Some(index) = symbols.iter().position(|item| item == default) {
1038 Ok(Value::Enum(index as u32, default.clone()))
1039 } else {
1040 Err(Details::GetEnumDefault {
1041 symbol,
1042 symbols: symbols.into(),
1043 }
1044 .into())
1045 }
1046 }
1047 _ => Err(Details::GetEnumDefault {
1048 symbol,
1049 symbols: symbols.into(),
1050 }
1051 .into()),
1052 }
1053 }
1054 };
1055
1056 match self {
1057 Value::Enum(_raw_index, s) => validate_symbol(s, symbols),
1058 Value::String(s) => validate_symbol(s, symbols),
1059 other => Err(Details::GetEnum(other).into()),
1060 }
1061 }
1062
1063 fn resolve_union<S: Borrow<Schema> + Debug>(
1064 self,
1065 schema: &UnionSchema,
1066 names: &HashMap<Name, S>,
1067 enclosing_namespace: &Namespace,
1068 field_default: &Option<JsonValue>,
1069 ) -> Result<Self, Error> {
1070 let v = match self {
1071 Value::Union(_i, v) => *v,
1073 v => v,
1075 };
1076 let (i, inner) = schema
1077 .find_schema_with_known_schemata(&v, Some(names), enclosing_namespace)
1078 .ok_or_else(|| Details::FindUnionVariant {
1079 schema: schema.clone(),
1080 value: v.clone(),
1081 })?;
1082
1083 Ok(Value::Union(
1084 i as u32,
1085 Box::new(v.resolve_internal(inner, names, enclosing_namespace, field_default)?),
1086 ))
1087 }
1088
1089 fn resolve_array<S: Borrow<Schema> + Debug>(
1090 self,
1091 schema: &Schema,
1092 names: &HashMap<Name, S>,
1093 enclosing_namespace: &Namespace,
1094 ) -> Result<Self, Error> {
1095 match self {
1096 Value::Array(items) => Ok(Value::Array(
1097 items
1098 .into_iter()
1099 .map(|item| item.resolve_internal(schema, names, enclosing_namespace, &None))
1100 .collect::<Result<_, _>>()?,
1101 )),
1102 other => Err(Details::GetArray {
1103 expected: schema.into(),
1104 other,
1105 }
1106 .into()),
1107 }
1108 }
1109
1110 fn resolve_map<S: Borrow<Schema> + Debug>(
1111 self,
1112 schema: &Schema,
1113 names: &HashMap<Name, S>,
1114 enclosing_namespace: &Namespace,
1115 ) -> Result<Self, Error> {
1116 match self {
1117 Value::Map(items) => Ok(Value::Map(
1118 items
1119 .into_iter()
1120 .map(|(key, value)| {
1121 value
1122 .resolve_internal(schema, names, enclosing_namespace, &None)
1123 .map(|value| (key, value))
1124 })
1125 .collect::<Result<_, _>>()?,
1126 )),
1127 other => Err(Details::GetMap {
1128 expected: schema.into(),
1129 other,
1130 }
1131 .into()),
1132 }
1133 }
1134
1135 fn resolve_record<S: Borrow<Schema> + Debug>(
1136 self,
1137 fields: &[RecordField],
1138 names: &HashMap<Name, S>,
1139 enclosing_namespace: &Namespace,
1140 ) -> Result<Self, Error> {
1141 let mut items = match self {
1142 Value::Map(items) => Ok(items),
1143 Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
1144 other => Err(Error::new(Details::GetRecord {
1145 expected: fields
1146 .iter()
1147 .map(|field| (field.name.clone(), field.schema.clone().into()))
1148 .collect(),
1149 other,
1150 })),
1151 }?;
1152
1153 let new_fields = fields
1154 .iter()
1155 .map(|field| {
1156 let value = match items.remove(&field.name) {
1157 Some(value) => value,
1158 None => match field.default {
1159 Some(ref value) => match field.schema {
1160 Schema::Enum(EnumSchema {
1161 ref symbols,
1162 ref default,
1163 ..
1164 }) => Value::from(value.clone()).resolve_enum(
1165 symbols,
1166 default,
1167 &field.default.clone(),
1168 )?,
1169 Schema::Union(ref union_schema) => {
1170 let first = &union_schema.variants()[0];
1171 match first {
1174 Schema::Null => Value::Union(0, Box::new(Value::Null)),
1175 _ => Value::Union(
1176 0,
1177 Box::new(Value::from(value.clone()).resolve_internal(
1178 first,
1179 names,
1180 enclosing_namespace,
1181 &field.default,
1182 )?),
1183 ),
1184 }
1185 }
1186 _ => Value::from(value.clone()),
1187 },
1188 None => {
1189 return Err(Details::GetField(field.name.clone()).into());
1190 }
1191 },
1192 };
1193 value
1194 .resolve_internal(&field.schema, names, enclosing_namespace, &field.default)
1195 .map(|value| (field.name.clone(), value))
1196 })
1197 .collect::<Result<Vec<_>, _>>()?;
1198
1199 Ok(Value::Record(new_fields))
1200 }
1201
1202 fn try_u8(self) -> AvroResult<u8> {
1203 let int = self.resolve(&Schema::Int)?;
1204 if let Value::Int(n) = int
1205 && n >= 0
1206 && n <= i32::from(u8::MAX)
1207 {
1208 return Ok(n as u8);
1209 }
1210
1211 Err(Details::GetU8(int).into())
1212 }
1213}
1214
1215#[cfg(test)]
1216mod tests {
1217 use super::*;
1218 use crate::{
1219 duration::{Days, Millis, Months},
1220 error::Details,
1221 schema::RecordFieldOrder,
1222 };
1223 use apache_avro_test_helper::{
1224 TestResult,
1225 logger::{assert_logged, assert_not_logged},
1226 };
1227 use num_bigint::BigInt;
1228 use pretty_assertions::assert_eq;
1229 use serde_json::json;
1230
1231 #[test]
1232 fn avro_3809_validate_nested_records_with_implicit_namespace() -> TestResult {
1233 let schema = Schema::parse_str(
1234 r#"{
1235 "name": "record_name",
1236 "namespace": "space",
1237 "type": "record",
1238 "fields": [
1239 {
1240 "name": "outer_field_1",
1241 "type": {
1242 "type": "record",
1243 "name": "middle_record_name",
1244 "namespace": "middle_namespace",
1245 "fields": [
1246 {
1247 "name": "middle_field_1",
1248 "type": {
1249 "type": "record",
1250 "name": "inner_record_name",
1251 "fields": [
1252 { "name": "inner_field_1", "type": "double" }
1253 ]
1254 }
1255 },
1256 { "name": "middle_field_2", "type": "inner_record_name" }
1257 ]
1258 }
1259 }
1260 ]
1261 }"#,
1262 )?;
1263 let value = Value::Record(vec![(
1264 "outer_field_1".into(),
1265 Value::Record(vec![
1266 (
1267 "middle_field_1".into(),
1268 Value::Record(vec![("inner_field_1".into(), Value::Double(1.2f64))]),
1269 ),
1270 (
1271 "middle_field_2".into(),
1272 Value::Record(vec![("inner_field_1".into(), Value::Double(1.6f64))]),
1273 ),
1274 ]),
1275 )]);
1276
1277 assert!(value.validate(&schema));
1278 Ok(())
1279 }
1280
1281 #[test]
1282 fn validate() -> TestResult {
1283 let value_schema_valid = vec![
1284 (Value::Int(42), Schema::Int, true, ""),
1285 (Value::Int(43), Schema::Long, true, ""),
1286 (Value::Float(43.2), Schema::Float, true, ""),
1287 (Value::Float(45.9), Schema::Double, true, ""),
1288 (
1289 Value::Int(42),
1290 Schema::Boolean,
1291 false,
1292 "Invalid value: Int(42) for schema: Boolean. Reason: Unsupported value-schema combination! Value: Int(42), schema: Boolean",
1293 ),
1294 (
1295 Value::Union(0, Box::new(Value::Null)),
1296 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1297 true,
1298 "",
1299 ),
1300 (
1301 Value::Union(1, Box::new(Value::Int(42))),
1302 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1303 true,
1304 "",
1305 ),
1306 (
1307 Value::Union(0, Box::new(Value::Null)),
1308 Schema::Union(UnionSchema::new(vec![Schema::Double, Schema::Int])?),
1309 false,
1310 "Invalid value: Union(0, Null) for schema: Union(UnionSchema { schemas: [Double, Int], variant_index: {Int: 1, Double: 0} }). Reason: Unsupported value-schema combination! Value: Null, schema: Double",
1311 ),
1312 (
1313 Value::Union(3, Box::new(Value::Int(42))),
1314 Schema::Union(UnionSchema::new(vec![
1315 Schema::Null,
1316 Schema::Double,
1317 Schema::String,
1318 Schema::Int,
1319 ])?),
1320 true,
1321 "",
1322 ),
1323 (
1324 Value::Union(1, Box::new(Value::Long(42i64))),
1325 Schema::Union(UnionSchema::new(vec![
1326 Schema::Null,
1327 Schema::TimestampMillis,
1328 ])?),
1329 true,
1330 "",
1331 ),
1332 (
1333 Value::Union(2, Box::new(Value::Long(1_i64))),
1334 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1335 false,
1336 "Invalid value: Union(2, Long(1)) for schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }). Reason: No schema in the union at position '2'",
1337 ),
1338 (
1339 Value::Array(vec![Value::Long(42i64)]),
1340 Schema::array(Schema::Long),
1341 true,
1342 "",
1343 ),
1344 (
1345 Value::Array(vec![Value::Boolean(true)]),
1346 Schema::array(Schema::Long),
1347 false,
1348 "Invalid value: Array([Boolean(true)]) for schema: Array(ArraySchema { items: Long, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(true), schema: Long",
1349 ),
1350 (
1351 Value::Record(vec![]),
1352 Schema::Null,
1353 false,
1354 "Invalid value: Record([]) for schema: Null. Reason: Unsupported value-schema combination! Value: Record([]), schema: Null",
1355 ),
1356 (
1357 Value::Fixed(12, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]),
1358 Schema::Duration(FixedSchema {
1359 name: Name::from("TestName"),
1360 aliases: None,
1361 doc: None,
1362 size: 12,
1363 default: None,
1364 attributes: BTreeMap::new(),
1365 }),
1366 true,
1367 "",
1368 ),
1369 (
1370 Value::Fixed(11, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1371 Schema::Duration(FixedSchema {
1372 name: Name::from("TestName"),
1373 aliases: None,
1374 doc: None,
1375 size: 12,
1376 default: None,
1377 attributes: BTreeMap::new(),
1378 }),
1379 false,
1380 "Invalid value: Fixed(11, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) for schema: Duration(FixedSchema { name: Name { name: \"TestName\", namespace: None }, aliases: None, doc: None, size: 12, default: None, attributes: {} }). Reason: The value's size ('11') must be exactly 12 to be a Duration",
1381 ),
1382 (
1383 Value::Record(vec![("unknown_field_name".to_string(), Value::Null)]),
1384 Schema::Record(RecordSchema {
1385 name: Name::new("record_name").unwrap(),
1386 aliases: None,
1387 doc: None,
1388 fields: vec![RecordField {
1389 name: "field_name".to_string(),
1390 doc: None,
1391 default: None,
1392 aliases: None,
1393 schema: Schema::Int,
1394 order: RecordFieldOrder::Ignore,
1395 position: 0,
1396 custom_attributes: Default::default(),
1397 }],
1398 lookup: Default::default(),
1399 attributes: Default::default(),
1400 }),
1401 false,
1402 r#"Invalid value: Record([("unknown_field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Int, order: Ignore, position: 0, custom_attributes: {} }], lookup: {}, attributes: {} }). Reason: There is no schema field for field 'unknown_field_name'"#,
1403 ),
1404 (
1405 Value::Record(vec![("field_name".to_string(), Value::Null)]),
1406 Schema::Record(RecordSchema {
1407 name: Name::new("record_name").unwrap(),
1408 aliases: None,
1409 doc: None,
1410 fields: vec![RecordField {
1411 name: "field_name".to_string(),
1412 doc: None,
1413 default: None,
1414 aliases: None,
1415 schema: Schema::Ref {
1416 name: Name::new("missing").unwrap(),
1417 },
1418 order: RecordFieldOrder::Ignore,
1419 position: 0,
1420 custom_attributes: Default::default(),
1421 }],
1422 lookup: [("field_name".to_string(), 0)].iter().cloned().collect(),
1423 attributes: Default::default(),
1424 }),
1425 false,
1426 r#"Invalid value: Record([("field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Ref { name: Name { name: "missing", namespace: None } }, order: Ignore, position: 0, custom_attributes: {} }], lookup: {"field_name": 0}, attributes: {} }). Reason: Unresolved schema reference: 'Name { name: "missing", namespace: None }'. Parsed names: []"#,
1427 ),
1428 ];
1429
1430 for (value, schema, valid, expected_err_message) in value_schema_valid.into_iter() {
1431 let err_message =
1432 value.validate_internal::<Schema>(&schema, &HashMap::default(), &None);
1433 assert_eq!(valid, err_message.is_none());
1434 if !valid {
1435 let full_err_message = format!(
1436 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1437 value,
1438 schema,
1439 err_message.unwrap()
1440 );
1441 assert_eq!(expected_err_message, full_err_message);
1442 }
1443 }
1444
1445 Ok(())
1446 }
1447
1448 #[test]
1449 fn validate_fixed() -> TestResult {
1450 let schema = Schema::Fixed(FixedSchema {
1451 size: 4,
1452 name: Name::new("some_fixed").unwrap(),
1453 aliases: None,
1454 doc: None,
1455 default: None,
1456 attributes: Default::default(),
1457 });
1458
1459 assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(&schema));
1460 let value = Value::Fixed(5, vec![0, 0, 0, 0, 0]);
1461 assert!(!value.validate(&schema));
1462 assert_logged(
1463 format!(
1464 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1465 value, schema, "The value's size (5) is different than the schema's size (4)"
1466 )
1467 .as_str(),
1468 );
1469
1470 assert!(Value::Bytes(vec![0, 0, 0, 0]).validate(&schema));
1471 let value = Value::Bytes(vec![0, 0, 0, 0, 0]);
1472 assert!(!value.validate(&schema));
1473 assert_logged(
1474 format!(
1475 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1476 value, schema, "The bytes' length (5) is different than the schema's size (4)"
1477 )
1478 .as_str(),
1479 );
1480
1481 Ok(())
1482 }
1483
1484 #[test]
1485 fn validate_enum() -> TestResult {
1486 let schema = Schema::Enum(EnumSchema {
1487 name: Name::new("some_enum").unwrap(),
1488 aliases: None,
1489 doc: None,
1490 symbols: vec![
1491 "spades".to_string(),
1492 "hearts".to_string(),
1493 "diamonds".to_string(),
1494 "clubs".to_string(),
1495 ],
1496 default: None,
1497 attributes: Default::default(),
1498 });
1499
1500 assert!(Value::Enum(0, "spades".to_string()).validate(&schema));
1501 assert!(Value::String("spades".to_string()).validate(&schema));
1502
1503 let value = Value::Enum(1, "spades".to_string());
1504 assert!(!value.validate(&schema));
1505 assert_logged(
1506 format!(
1507 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1508 value, schema, "Symbol 'spades' is not at position '1'"
1509 )
1510 .as_str(),
1511 );
1512
1513 let value = Value::Enum(1000, "spades".to_string());
1514 assert!(!value.validate(&schema));
1515 assert_logged(
1516 format!(
1517 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1518 value, schema, "No symbol at position '1000'"
1519 )
1520 .as_str(),
1521 );
1522
1523 let value = Value::String("lorem".to_string());
1524 assert!(!value.validate(&schema));
1525 assert_logged(
1526 format!(
1527 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1528 value, schema, "'lorem' is not a member of the possible symbols"
1529 )
1530 .as_str(),
1531 );
1532
1533 let other_schema = Schema::Enum(EnumSchema {
1534 name: Name::new("some_other_enum").unwrap(),
1535 aliases: None,
1536 doc: None,
1537 symbols: vec![
1538 "hearts".to_string(),
1539 "diamonds".to_string(),
1540 "clubs".to_string(),
1541 "spades".to_string(),
1542 ],
1543 default: None,
1544 attributes: Default::default(),
1545 });
1546
1547 let value = Value::Enum(0, "spades".to_string());
1548 assert!(!value.validate(&other_schema));
1549 assert_logged(
1550 format!(
1551 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1552 value, other_schema, "Symbol 'spades' is not at position '0'"
1553 )
1554 .as_str(),
1555 );
1556
1557 Ok(())
1558 }
1559
1560 #[test]
1561 fn validate_record() -> TestResult {
1562 let schema = Schema::Record(RecordSchema {
1575 name: Name::new("some_record").unwrap(),
1576 aliases: None,
1577 doc: None,
1578 fields: vec![
1579 RecordField {
1580 name: "a".to_string(),
1581 doc: None,
1582 default: None,
1583 aliases: None,
1584 schema: Schema::Long,
1585 order: RecordFieldOrder::Ascending,
1586 position: 0,
1587 custom_attributes: Default::default(),
1588 },
1589 RecordField {
1590 name: "b".to_string(),
1591 doc: None,
1592 default: None,
1593 aliases: None,
1594 schema: Schema::String,
1595 order: RecordFieldOrder::Ascending,
1596 position: 1,
1597 custom_attributes: Default::default(),
1598 },
1599 RecordField {
1600 name: "c".to_string(),
1601 doc: None,
1602 default: Some(JsonValue::Null),
1603 aliases: None,
1604 schema: Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
1605 order: RecordFieldOrder::Ascending,
1606 position: 2,
1607 custom_attributes: Default::default(),
1608 },
1609 ],
1610 lookup: [
1611 ("a".to_string(), 0),
1612 ("b".to_string(), 1),
1613 ("c".to_string(), 2),
1614 ]
1615 .iter()
1616 .cloned()
1617 .collect(),
1618 attributes: Default::default(),
1619 });
1620
1621 assert!(
1622 Value::Record(vec![
1623 ("a".to_string(), Value::Long(42i64)),
1624 ("b".to_string(), Value::String("foo".to_string())),
1625 ])
1626 .validate(&schema)
1627 );
1628
1629 let value = Value::Record(vec![
1630 ("b".to_string(), Value::String("foo".to_string())),
1631 ("a".to_string(), Value::Long(42i64)),
1632 ]);
1633 assert!(value.validate(&schema));
1634
1635 let value = Value::Record(vec![
1636 ("a".to_string(), Value::Boolean(false)),
1637 ("b".to_string(), Value::String("foo".to_string())),
1638 ]);
1639 assert!(!value.validate(&schema));
1640 assert_logged(
1641 r#"Invalid value: Record([("a", Boolean(false)), ("b", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(false), schema: Long"#,
1642 );
1643
1644 let value = Value::Record(vec![
1645 ("a".to_string(), Value::Long(42i64)),
1646 ("c".to_string(), Value::String("foo".to_string())),
1647 ]);
1648 assert!(!value.validate(&schema));
1649 assert_logged(
1650 r#"Invalid value: Record([("a", Long(42)), ("c", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Could not find matching type in union"#,
1651 );
1652 assert_not_logged(
1653 r#"Invalid value: String("foo") for schema: Int. Reason: Unsupported value-schema combination"#,
1654 );
1655
1656 let value = Value::Record(vec![
1657 ("a".to_string(), Value::Long(42i64)),
1658 ("d".to_string(), Value::String("foo".to_string())),
1659 ]);
1660 assert!(!value.validate(&schema));
1661 assert_logged(
1662 r#"Invalid value: Record([("a", Long(42)), ("d", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: There is no schema field for field 'd'"#,
1663 );
1664
1665 let value = Value::Record(vec![
1666 ("a".to_string(), Value::Long(42i64)),
1667 ("b".to_string(), Value::String("foo".to_string())),
1668 ("c".to_string(), Value::Null),
1669 ("d".to_string(), Value::Null),
1670 ]);
1671 assert!(!value.validate(&schema));
1672 assert_logged(
1673 r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null), ("d", Null)]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: The value's records length (4) is greater than the schema's (3 fields)"#,
1674 );
1675
1676 assert!(
1677 Value::Map(
1678 vec![
1679 ("a".to_string(), Value::Long(42i64)),
1680 ("b".to_string(), Value::String("foo".to_string())),
1681 ]
1682 .into_iter()
1683 .collect()
1684 )
1685 .validate(&schema)
1686 );
1687
1688 assert!(
1689 !Value::Map(
1690 vec![("d".to_string(), Value::Long(123_i64)),]
1691 .into_iter()
1692 .collect()
1693 )
1694 .validate(&schema)
1695 );
1696 assert_logged(
1697 r#"Invalid value: Map({"d": Long(123)}) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Field with name '"a"' is not a member of the map items
1698Field with name '"b"' is not a member of the map items"#,
1699 );
1700
1701 let union_schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema])?);
1702
1703 assert!(
1704 Value::Union(
1705 1,
1706 Box::new(Value::Record(vec![
1707 ("a".to_string(), Value::Long(42i64)),
1708 ("b".to_string(), Value::String("foo".to_string())),
1709 ]))
1710 )
1711 .validate(&union_schema)
1712 );
1713
1714 assert!(
1715 Value::Union(
1716 1,
1717 Box::new(Value::Map(
1718 vec![
1719 ("a".to_string(), Value::Long(42i64)),
1720 ("b".to_string(), Value::String("foo".to_string())),
1721 ]
1722 .into_iter()
1723 .collect()
1724 ))
1725 )
1726 .validate(&union_schema)
1727 );
1728
1729 Ok(())
1730 }
1731
1732 #[test]
1733 fn resolve_bytes_ok() -> TestResult {
1734 let value = Value::Array(vec![Value::Int(0), Value::Int(42)]);
1735 assert_eq!(
1736 value.resolve(&Schema::Bytes)?,
1737 Value::Bytes(vec![0u8, 42u8])
1738 );
1739
1740 Ok(())
1741 }
1742
1743 #[test]
1744 fn resolve_string_from_bytes() -> TestResult {
1745 let value = Value::Bytes(vec![97, 98, 99]);
1746 assert_eq!(
1747 value.resolve(&Schema::String)?,
1748 Value::String("abc".to_string())
1749 );
1750
1751 Ok(())
1752 }
1753
1754 #[test]
1755 fn resolve_string_from_fixed() -> TestResult {
1756 let value = Value::Fixed(3, vec![97, 98, 99]);
1757 assert_eq!(
1758 value.resolve(&Schema::String)?,
1759 Value::String("abc".to_string())
1760 );
1761
1762 Ok(())
1763 }
1764
1765 #[test]
1766 fn resolve_bytes_failure() {
1767 let value = Value::Array(vec![Value::Int(2000), Value::Int(-42)]);
1768 assert!(value.resolve(&Schema::Bytes).is_err());
1769 }
1770
1771 #[test]
1772 fn resolve_decimal_bytes() -> TestResult {
1773 let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1774 value.clone().resolve(&Schema::Decimal(DecimalSchema {
1775 precision: 10,
1776 scale: 4,
1777 inner: InnerDecimalSchema::Bytes,
1778 }))?;
1779 assert!(value.resolve(&Schema::String).is_err());
1780
1781 Ok(())
1782 }
1783
1784 #[test]
1785 fn resolve_decimal_invalid_scale() {
1786 let value = Value::Decimal(Decimal::from(vec![1, 2]));
1787 assert!(
1788 value
1789 .resolve(&Schema::Decimal(DecimalSchema {
1790 precision: 2,
1791 scale: 3,
1792 inner: InnerDecimalSchema::Bytes,
1793 }))
1794 .is_err()
1795 );
1796 }
1797
1798 #[test]
1799 fn resolve_decimal_invalid_precision_for_length() {
1800 let value = Value::Decimal(Decimal::from((1u8..=8u8).rev().collect::<Vec<_>>()));
1801 assert!(
1802 value
1803 .resolve(&Schema::Decimal(DecimalSchema {
1804 precision: 1,
1805 scale: 0,
1806 inner: InnerDecimalSchema::Bytes,
1807 }))
1808 .is_ok()
1809 );
1810 }
1811
1812 #[test]
1813 fn resolve_decimal_fixed() {
1814 let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1815 assert!(
1816 value
1817 .clone()
1818 .resolve(&Schema::Decimal(DecimalSchema {
1819 precision: 10,
1820 scale: 1,
1821 inner: InnerDecimalSchema::Fixed(FixedSchema {
1822 name: Name::new("decimal").unwrap(),
1823 aliases: None,
1824 size: 20,
1825 doc: None,
1826 default: None,
1827 attributes: Default::default(),
1828 })
1829 }))
1830 .is_ok()
1831 );
1832 assert!(value.resolve(&Schema::String).is_err());
1833 }
1834
1835 #[test]
1836 fn resolve_date() {
1837 let value = Value::Date(2345);
1838 assert!(value.clone().resolve(&Schema::Date).is_ok());
1839 assert!(value.resolve(&Schema::String).is_err());
1840 }
1841
1842 #[test]
1843 fn resolve_time_millis() {
1844 let value = Value::TimeMillis(10);
1845 assert!(value.clone().resolve(&Schema::TimeMillis).is_ok());
1846 assert!(value.resolve(&Schema::TimeMicros).is_err());
1847 }
1848
1849 #[test]
1850 fn resolve_time_micros() {
1851 let value = Value::TimeMicros(10);
1852 assert!(value.clone().resolve(&Schema::TimeMicros).is_ok());
1853 assert!(value.resolve(&Schema::TimeMillis).is_err());
1854 }
1855
1856 #[test]
1857 fn resolve_timestamp_millis() {
1858 let value = Value::TimestampMillis(10);
1859 assert!(value.clone().resolve(&Schema::TimestampMillis).is_ok());
1860 assert!(value.resolve(&Schema::Float).is_err());
1861
1862 let value = Value::Float(10.0f32);
1863 assert!(value.resolve(&Schema::TimestampMillis).is_err());
1864 }
1865
1866 #[test]
1867 fn resolve_timestamp_micros() {
1868 let value = Value::TimestampMicros(10);
1869 assert!(value.clone().resolve(&Schema::TimestampMicros).is_ok());
1870 assert!(value.resolve(&Schema::Int).is_err());
1871
1872 let value = Value::Double(10.0);
1873 assert!(value.resolve(&Schema::TimestampMicros).is_err());
1874 }
1875
1876 #[test]
1877 fn test_avro_3914_resolve_timestamp_nanos() {
1878 let value = Value::TimestampNanos(10);
1879 assert!(value.clone().resolve(&Schema::TimestampNanos).is_ok());
1880 assert!(value.resolve(&Schema::Int).is_err());
1881
1882 let value = Value::Double(10.0);
1883 assert!(value.resolve(&Schema::TimestampNanos).is_err());
1884 }
1885
1886 #[test]
1887 fn test_avro_3853_resolve_timestamp_millis() {
1888 let value = Value::LocalTimestampMillis(10);
1889 assert!(value.clone().resolve(&Schema::LocalTimestampMillis).is_ok());
1890 assert!(value.resolve(&Schema::Float).is_err());
1891
1892 let value = Value::Float(10.0f32);
1893 assert!(value.resolve(&Schema::LocalTimestampMillis).is_err());
1894 }
1895
1896 #[test]
1897 fn test_avro_3853_resolve_timestamp_micros() {
1898 let value = Value::LocalTimestampMicros(10);
1899 assert!(value.clone().resolve(&Schema::LocalTimestampMicros).is_ok());
1900 assert!(value.resolve(&Schema::Int).is_err());
1901
1902 let value = Value::Double(10.0);
1903 assert!(value.resolve(&Schema::LocalTimestampMicros).is_err());
1904 }
1905
1906 #[test]
1907 fn test_avro_3916_resolve_timestamp_nanos() {
1908 let value = Value::LocalTimestampNanos(10);
1909 assert!(value.clone().resolve(&Schema::LocalTimestampNanos).is_ok());
1910 assert!(value.resolve(&Schema::Int).is_err());
1911
1912 let value = Value::Double(10.0);
1913 assert!(value.resolve(&Schema::LocalTimestampNanos).is_err());
1914 }
1915
1916 #[test]
1917 fn resolve_duration() {
1918 let value = Value::Duration(Duration::new(
1919 Months::new(10),
1920 Days::new(5),
1921 Millis::new(3000),
1922 ));
1923 assert!(
1924 value
1925 .clone()
1926 .resolve(&Schema::Duration(FixedSchema {
1927 name: Name::from("TestName"),
1928 aliases: None,
1929 doc: None,
1930 size: 12,
1931 default: None,
1932 attributes: BTreeMap::new()
1933 }))
1934 .is_ok()
1935 );
1936 assert!(value.resolve(&Schema::TimestampMicros).is_err());
1937 assert!(
1938 Value::Long(1i64)
1939 .resolve(&Schema::Duration(FixedSchema {
1940 name: Name::from("TestName"),
1941 aliases: None,
1942 doc: None,
1943 size: 12,
1944 default: None,
1945 attributes: BTreeMap::new()
1946 }))
1947 .is_err()
1948 );
1949 }
1950
1951 #[test]
1952 fn resolve_uuid() -> TestResult {
1953 let value = Value::Uuid(Uuid::parse_str("1481531d-ccc9-46d9-a56f-5b67459c0537")?);
1954 assert!(
1955 value
1956 .clone()
1957 .resolve(&Schema::Uuid(UuidSchema::String))
1958 .is_ok()
1959 );
1960 assert!(
1961 value
1962 .clone()
1963 .resolve(&Schema::Uuid(UuidSchema::Bytes))
1964 .is_ok()
1965 );
1966 assert!(
1967 value
1968 .clone()
1969 .resolve(&Schema::Uuid(UuidSchema::Fixed(FixedSchema {
1970 name: Name {
1971 name: "some_name".to_string(),
1972 namespace: None
1973 },
1974 aliases: None,
1975 doc: None,
1976 size: 16,
1977 default: None,
1978 attributes: Default::default(),
1979 })))
1980 .is_ok()
1981 );
1982 assert!(value.resolve(&Schema::TimestampMicros).is_err());
1983
1984 Ok(())
1985 }
1986
1987 #[test]
1988 fn avro_3678_resolve_float_to_double() {
1989 let value = Value::Float(2345.1);
1990 assert!(value.resolve(&Schema::Double).is_ok());
1991 }
1992
1993 #[test]
1994 fn test_avro_3621_resolve_to_nullable_union() -> TestResult {
1995 let schema = Schema::parse_str(
1996 r#"{
1997 "type": "record",
1998 "name": "root",
1999 "fields": [
2000 {
2001 "name": "event",
2002 "type": [
2003 "null",
2004 {
2005 "type": "record",
2006 "name": "event",
2007 "fields": [
2008 {
2009 "name": "amount",
2010 "type": "int"
2011 },
2012 {
2013 "name": "size",
2014 "type": [
2015 "null",
2016 "int"
2017 ],
2018 "default": null
2019 }
2020 ]
2021 }
2022 ],
2023 "default": null
2024 }
2025 ]
2026 }"#,
2027 )?;
2028
2029 let value = Value::Record(vec![(
2030 "event".to_string(),
2031 Value::Record(vec![("amount".to_string(), Value::Int(200))]),
2032 )]);
2033 assert!(value.resolve(&schema).is_ok());
2034
2035 let value = Value::Record(vec![(
2036 "event".to_string(),
2037 Value::Record(vec![("size".to_string(), Value::Int(1))]),
2038 )]);
2039 assert!(value.resolve(&schema).is_err());
2040
2041 Ok(())
2042 }
2043
2044 #[test]
2045 fn json_from_avro() -> TestResult {
2046 assert_eq!(JsonValue::try_from(Value::Null)?, JsonValue::Null);
2047 assert_eq!(
2048 JsonValue::try_from(Value::Boolean(true))?,
2049 JsonValue::Bool(true)
2050 );
2051 assert_eq!(
2052 JsonValue::try_from(Value::Int(1))?,
2053 JsonValue::Number(1.into())
2054 );
2055 assert_eq!(
2056 JsonValue::try_from(Value::Long(1))?,
2057 JsonValue::Number(1.into())
2058 );
2059 assert_eq!(
2060 JsonValue::try_from(Value::Float(1.0))?,
2061 JsonValue::Number(Number::from_f64(1.0).unwrap())
2062 );
2063 assert_eq!(
2064 JsonValue::try_from(Value::Double(1.0))?,
2065 JsonValue::Number(Number::from_f64(1.0).unwrap())
2066 );
2067 assert_eq!(
2068 JsonValue::try_from(Value::Bytes(vec![1, 2, 3]))?,
2069 JsonValue::Array(vec![
2070 JsonValue::Number(1.into()),
2071 JsonValue::Number(2.into()),
2072 JsonValue::Number(3.into())
2073 ])
2074 );
2075 assert_eq!(
2076 JsonValue::try_from(Value::String("test".into()))?,
2077 JsonValue::String("test".into())
2078 );
2079 assert_eq!(
2080 JsonValue::try_from(Value::Fixed(3, vec![1, 2, 3]))?,
2081 JsonValue::Array(vec![
2082 JsonValue::Number(1.into()),
2083 JsonValue::Number(2.into()),
2084 JsonValue::Number(3.into())
2085 ])
2086 );
2087 assert_eq!(
2088 JsonValue::try_from(Value::Enum(1, "test_enum".into()))?,
2089 JsonValue::String("test_enum".into())
2090 );
2091 assert_eq!(
2092 JsonValue::try_from(Value::Union(1, Box::new(Value::String("test_enum".into()))))?,
2093 JsonValue::String("test_enum".into())
2094 );
2095 assert_eq!(
2096 JsonValue::try_from(Value::Array(vec![
2097 Value::Int(1),
2098 Value::Int(2),
2099 Value::Int(3)
2100 ]))?,
2101 JsonValue::Array(vec![
2102 JsonValue::Number(1.into()),
2103 JsonValue::Number(2.into()),
2104 JsonValue::Number(3.into())
2105 ])
2106 );
2107 assert_eq!(
2108 JsonValue::try_from(Value::Map(
2109 vec![
2110 ("v1".to_string(), Value::Int(1)),
2111 ("v2".to_string(), Value::Int(2)),
2112 ("v3".to_string(), Value::Int(3))
2113 ]
2114 .into_iter()
2115 .collect()
2116 ))?,
2117 JsonValue::Object(
2118 vec![
2119 ("v1".to_string(), JsonValue::Number(1.into())),
2120 ("v2".to_string(), JsonValue::Number(2.into())),
2121 ("v3".to_string(), JsonValue::Number(3.into()))
2122 ]
2123 .into_iter()
2124 .collect()
2125 )
2126 );
2127 assert_eq!(
2128 JsonValue::try_from(Value::Record(vec![
2129 ("v1".to_string(), Value::Int(1)),
2130 ("v2".to_string(), Value::Int(2)),
2131 ("v3".to_string(), Value::Int(3))
2132 ]))?,
2133 JsonValue::Object(
2134 vec![
2135 ("v1".to_string(), JsonValue::Number(1.into())),
2136 ("v2".to_string(), JsonValue::Number(2.into())),
2137 ("v3".to_string(), JsonValue::Number(3.into()))
2138 ]
2139 .into_iter()
2140 .collect()
2141 )
2142 );
2143 assert_eq!(
2144 JsonValue::try_from(Value::Date(1))?,
2145 JsonValue::Number(1.into())
2146 );
2147 assert_eq!(
2148 JsonValue::try_from(Value::Decimal(vec![1, 2, 3].into()))?,
2149 JsonValue::Array(vec![
2150 JsonValue::Number(1.into()),
2151 JsonValue::Number(2.into()),
2152 JsonValue::Number(3.into())
2153 ])
2154 );
2155 assert_eq!(
2156 JsonValue::try_from(Value::TimeMillis(1))?,
2157 JsonValue::Number(1.into())
2158 );
2159 assert_eq!(
2160 JsonValue::try_from(Value::TimeMicros(1))?,
2161 JsonValue::Number(1.into())
2162 );
2163 assert_eq!(
2164 JsonValue::try_from(Value::TimestampMillis(1))?,
2165 JsonValue::Number(1.into())
2166 );
2167 assert_eq!(
2168 JsonValue::try_from(Value::TimestampMicros(1))?,
2169 JsonValue::Number(1.into())
2170 );
2171 assert_eq!(
2172 JsonValue::try_from(Value::TimestampNanos(1))?,
2173 JsonValue::Number(1.into())
2174 );
2175 assert_eq!(
2176 JsonValue::try_from(Value::LocalTimestampMillis(1))?,
2177 JsonValue::Number(1.into())
2178 );
2179 assert_eq!(
2180 JsonValue::try_from(Value::LocalTimestampMicros(1))?,
2181 JsonValue::Number(1.into())
2182 );
2183 assert_eq!(
2184 JsonValue::try_from(Value::LocalTimestampNanos(1))?,
2185 JsonValue::Number(1.into())
2186 );
2187 assert_eq!(
2188 JsonValue::try_from(Value::Duration(
2189 [
2190 1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8
2191 ]
2192 .into()
2193 ))?,
2194 JsonValue::Array(vec![
2195 JsonValue::Number(1.into()),
2196 JsonValue::Number(2.into()),
2197 JsonValue::Number(3.into()),
2198 JsonValue::Number(4.into()),
2199 JsonValue::Number(5.into()),
2200 JsonValue::Number(6.into()),
2201 JsonValue::Number(7.into()),
2202 JsonValue::Number(8.into()),
2203 JsonValue::Number(9.into()),
2204 JsonValue::Number(10.into()),
2205 JsonValue::Number(11.into()),
2206 JsonValue::Number(12.into()),
2207 ])
2208 );
2209 assert_eq!(
2210 JsonValue::try_from(Value::Uuid(Uuid::parse_str(
2211 "936DA01F-9ABD-4D9D-80C7-02AF85C822A8"
2212 )?))?,
2213 JsonValue::String("936da01f-9abd-4d9d-80c7-02af85c822a8".into())
2214 );
2215
2216 Ok(())
2217 }
2218
2219 #[test]
2220 fn test_avro_3433_recursive_resolves_record() -> TestResult {
2221 let schema = Schema::parse_str(
2222 r#"
2223 {
2224 "type":"record",
2225 "name":"TestStruct",
2226 "fields": [
2227 {
2228 "name":"a",
2229 "type":{
2230 "type":"record",
2231 "name": "Inner",
2232 "fields": [ {
2233 "name":"z",
2234 "type":"int"
2235 }]
2236 }
2237 },
2238 {
2239 "name":"b",
2240 "type":"Inner"
2241 }
2242 ]
2243 }"#,
2244 )?;
2245
2246 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2247 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2248 let outer = Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
2249 outer
2250 .resolve(&schema)
2251 .expect("Record definition defined in one field must be available in other field");
2252
2253 Ok(())
2254 }
2255
2256 #[test]
2257 fn test_avro_3433_recursive_resolves_array() -> TestResult {
2258 let schema = Schema::parse_str(
2259 r#"
2260 {
2261 "type":"record",
2262 "name":"TestStruct",
2263 "fields": [
2264 {
2265 "name":"a",
2266 "type":{
2267 "type":"array",
2268 "items": {
2269 "type":"record",
2270 "name": "Inner",
2271 "fields": [ {
2272 "name":"z",
2273 "type":"int"
2274 }]
2275 }
2276 }
2277 },
2278 {
2279 "name":"b",
2280 "type": {
2281 "type":"map",
2282 "values":"Inner"
2283 }
2284 }
2285 ]
2286 }"#,
2287 )?;
2288
2289 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2290 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2291 let outer_value = Value::Record(vec![
2292 ("a".into(), Value::Array(vec![inner_value1])),
2293 (
2294 "b".into(),
2295 Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2296 ),
2297 ]);
2298 outer_value
2299 .resolve(&schema)
2300 .expect("Record defined in array definition must be resolvable from map");
2301
2302 Ok(())
2303 }
2304
2305 #[test]
2306 fn test_avro_3433_recursive_resolves_map() -> TestResult {
2307 let schema = Schema::parse_str(
2308 r#"
2309 {
2310 "type":"record",
2311 "name":"TestStruct",
2312 "fields": [
2313 {
2314 "name":"a",
2315 "type":{
2316 "type":"record",
2317 "name": "Inner",
2318 "fields": [ {
2319 "name":"z",
2320 "type":"int"
2321 }]
2322 }
2323 },
2324 {
2325 "name":"b",
2326 "type": {
2327 "type":"map",
2328 "values":"Inner"
2329 }
2330 }
2331 ]
2332 }"#,
2333 )?;
2334
2335 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2336 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2337 let outer_value = Value::Record(vec![
2338 ("a".into(), inner_value1),
2339 (
2340 "b".into(),
2341 Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2342 ),
2343 ]);
2344 outer_value
2345 .resolve(&schema)
2346 .expect("Record defined in record field must be resolvable from map field");
2347
2348 Ok(())
2349 }
2350
2351 #[test]
2352 fn test_avro_3433_recursive_resolves_record_wrapper() -> TestResult {
2353 let schema = Schema::parse_str(
2354 r#"
2355 {
2356 "type":"record",
2357 "name":"TestStruct",
2358 "fields": [
2359 {
2360 "name":"a",
2361 "type":{
2362 "type":"record",
2363 "name": "Inner",
2364 "fields": [ {
2365 "name":"z",
2366 "type":"int"
2367 }]
2368 }
2369 },
2370 {
2371 "name":"b",
2372 "type": {
2373 "type":"record",
2374 "name": "InnerWrapper",
2375 "fields": [ {
2376 "name":"j",
2377 "type":"Inner"
2378 }]
2379 }
2380 }
2381 ]
2382 }"#,
2383 )?;
2384
2385 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2386 let inner_value2 = Value::Record(vec![(
2387 "j".into(),
2388 Value::Record(vec![("z".into(), Value::Int(6))]),
2389 )]);
2390 let outer_value =
2391 Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
2392 outer_value.resolve(&schema).expect("Record schema defined in field must be resolvable in Record schema defined in other field");
2393
2394 Ok(())
2395 }
2396
2397 #[test]
2398 fn test_avro_3433_recursive_resolves_map_and_array() -> TestResult {
2399 let schema = Schema::parse_str(
2400 r#"
2401 {
2402 "type":"record",
2403 "name":"TestStruct",
2404 "fields": [
2405 {
2406 "name":"a",
2407 "type":{
2408 "type":"map",
2409 "values": {
2410 "type":"record",
2411 "name": "Inner",
2412 "fields": [ {
2413 "name":"z",
2414 "type":"int"
2415 }]
2416 }
2417 }
2418 },
2419 {
2420 "name":"b",
2421 "type": {
2422 "type":"array",
2423 "items":"Inner"
2424 }
2425 }
2426 ]
2427 }"#,
2428 )?;
2429
2430 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2431 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2432 let outer_value = Value::Record(vec![
2433 (
2434 "a".into(),
2435 Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2436 ),
2437 ("b".into(), Value::Array(vec![inner_value1])),
2438 ]);
2439 outer_value
2440 .resolve(&schema)
2441 .expect("Record defined in map definition must be resolvable from array");
2442
2443 Ok(())
2444 }
2445
2446 #[test]
2447 fn test_avro_3433_recursive_resolves_union() -> TestResult {
2448 let schema = Schema::parse_str(
2449 r#"
2450 {
2451 "type":"record",
2452 "name":"TestStruct",
2453 "fields": [
2454 {
2455 "name":"a",
2456 "type":["null", {
2457 "type":"record",
2458 "name": "Inner",
2459 "fields": [ {
2460 "name":"z",
2461 "type":"int"
2462 }]
2463 }]
2464 },
2465 {
2466 "name":"b",
2467 "type":"Inner"
2468 }
2469 ]
2470 }"#,
2471 )?;
2472
2473 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2474 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2475 let outer1 = Value::Record(vec![
2476 ("a".into(), inner_value1),
2477 ("b".into(), inner_value2.clone()),
2478 ]);
2479 outer1
2480 .resolve(&schema)
2481 .expect("Record definition defined in union must be resolved in other field");
2482 let outer2 = Value::Record(vec![("a".into(), Value::Null), ("b".into(), inner_value2)]);
2483 outer2
2484 .resolve(&schema)
2485 .expect("Record definition defined in union must be resolved in other field");
2486
2487 Ok(())
2488 }
2489
2490 #[test]
2491 fn test_avro_3461_test_multi_level_resolve_outer_namespace() -> TestResult {
2492 let schema = r#"
2493 {
2494 "name": "record_name",
2495 "namespace": "space",
2496 "type": "record",
2497 "fields": [
2498 {
2499 "name": "outer_field_1",
2500 "type": [
2501 "null",
2502 {
2503 "type": "record",
2504 "name": "middle_record_name",
2505 "fields":[
2506 {
2507 "name":"middle_field_1",
2508 "type":[
2509 "null",
2510 {
2511 "type":"record",
2512 "name":"inner_record_name",
2513 "fields":[
2514 {
2515 "name":"inner_field_1",
2516 "type":"double"
2517 }
2518 ]
2519 }
2520 ]
2521 }
2522 ]
2523 }
2524 ]
2525 },
2526 {
2527 "name": "outer_field_2",
2528 "type" : "space.inner_record_name"
2529 }
2530 ]
2531 }
2532 "#;
2533 let schema = Schema::parse_str(schema)?;
2534 let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2535 let middle_record_variation_1 = Value::Record(vec![(
2536 "middle_field_1".into(),
2537 Value::Union(0, Box::new(Value::Null)),
2538 )]);
2539 let middle_record_variation_2 = Value::Record(vec![(
2540 "middle_field_1".into(),
2541 Value::Union(1, Box::new(inner_record.clone())),
2542 )]);
2543 let outer_record_variation_1 = Value::Record(vec![
2544 (
2545 "outer_field_1".into(),
2546 Value::Union(0, Box::new(Value::Null)),
2547 ),
2548 ("outer_field_2".into(), inner_record.clone()),
2549 ]);
2550 let outer_record_variation_2 = Value::Record(vec![
2551 (
2552 "outer_field_1".into(),
2553 Value::Union(1, Box::new(middle_record_variation_1)),
2554 ),
2555 ("outer_field_2".into(), inner_record.clone()),
2556 ]);
2557 let outer_record_variation_3 = Value::Record(vec![
2558 (
2559 "outer_field_1".into(),
2560 Value::Union(1, Box::new(middle_record_variation_2)),
2561 ),
2562 ("outer_field_2".into(), inner_record),
2563 ]);
2564
2565 outer_record_variation_1
2566 .resolve(&schema)
2567 .expect("Should be able to resolve value to the schema that is it's definition");
2568 outer_record_variation_2
2569 .resolve(&schema)
2570 .expect("Should be able to resolve value to the schema that is it's definition");
2571 outer_record_variation_3
2572 .resolve(&schema)
2573 .expect("Should be able to resolve value to the schema that is it's definition");
2574
2575 Ok(())
2576 }
2577
2578 #[test]
2579 fn test_avro_3461_test_multi_level_resolve_middle_namespace() -> TestResult {
2580 let schema = r#"
2581 {
2582 "name": "record_name",
2583 "namespace": "space",
2584 "type": "record",
2585 "fields": [
2586 {
2587 "name": "outer_field_1",
2588 "type": [
2589 "null",
2590 {
2591 "type": "record",
2592 "name": "middle_record_name",
2593 "namespace":"middle_namespace",
2594 "fields":[
2595 {
2596 "name":"middle_field_1",
2597 "type":[
2598 "null",
2599 {
2600 "type":"record",
2601 "name":"inner_record_name",
2602 "fields":[
2603 {
2604 "name":"inner_field_1",
2605 "type":"double"
2606 }
2607 ]
2608 }
2609 ]
2610 }
2611 ]
2612 }
2613 ]
2614 },
2615 {
2616 "name": "outer_field_2",
2617 "type" : "middle_namespace.inner_record_name"
2618 }
2619 ]
2620 }
2621 "#;
2622 let schema = Schema::parse_str(schema)?;
2623 let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2624 let middle_record_variation_1 = Value::Record(vec![(
2625 "middle_field_1".into(),
2626 Value::Union(0, Box::new(Value::Null)),
2627 )]);
2628 let middle_record_variation_2 = Value::Record(vec![(
2629 "middle_field_1".into(),
2630 Value::Union(1, Box::new(inner_record.clone())),
2631 )]);
2632 let outer_record_variation_1 = Value::Record(vec![
2633 (
2634 "outer_field_1".into(),
2635 Value::Union(0, Box::new(Value::Null)),
2636 ),
2637 ("outer_field_2".into(), inner_record.clone()),
2638 ]);
2639 let outer_record_variation_2 = Value::Record(vec![
2640 (
2641 "outer_field_1".into(),
2642 Value::Union(1, Box::new(middle_record_variation_1)),
2643 ),
2644 ("outer_field_2".into(), inner_record.clone()),
2645 ]);
2646 let outer_record_variation_3 = Value::Record(vec![
2647 (
2648 "outer_field_1".into(),
2649 Value::Union(1, Box::new(middle_record_variation_2)),
2650 ),
2651 ("outer_field_2".into(), inner_record),
2652 ]);
2653
2654 outer_record_variation_1
2655 .resolve(&schema)
2656 .expect("Should be able to resolve value to the schema that is it's definition");
2657 outer_record_variation_2
2658 .resolve(&schema)
2659 .expect("Should be able to resolve value to the schema that is it's definition");
2660 outer_record_variation_3
2661 .resolve(&schema)
2662 .expect("Should be able to resolve value to the schema that is it's definition");
2663
2664 Ok(())
2665 }
2666
2667 #[test]
2668 fn test_avro_3461_test_multi_level_resolve_inner_namespace() -> TestResult {
2669 let schema = r#"
2670 {
2671 "name": "record_name",
2672 "namespace": "space",
2673 "type": "record",
2674 "fields": [
2675 {
2676 "name": "outer_field_1",
2677 "type": [
2678 "null",
2679 {
2680 "type": "record",
2681 "name": "middle_record_name",
2682 "namespace":"middle_namespace",
2683 "fields":[
2684 {
2685 "name":"middle_field_1",
2686 "type":[
2687 "null",
2688 {
2689 "type":"record",
2690 "name":"inner_record_name",
2691 "namespace":"inner_namespace",
2692 "fields":[
2693 {
2694 "name":"inner_field_1",
2695 "type":"double"
2696 }
2697 ]
2698 }
2699 ]
2700 }
2701 ]
2702 }
2703 ]
2704 },
2705 {
2706 "name": "outer_field_2",
2707 "type" : "inner_namespace.inner_record_name"
2708 }
2709 ]
2710 }
2711 "#;
2712 let schema = Schema::parse_str(schema)?;
2713
2714 let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2715 let middle_record_variation_1 = Value::Record(vec![(
2716 "middle_field_1".into(),
2717 Value::Union(0, Box::new(Value::Null)),
2718 )]);
2719 let middle_record_variation_2 = Value::Record(vec![(
2720 "middle_field_1".into(),
2721 Value::Union(1, Box::new(inner_record.clone())),
2722 )]);
2723 let outer_record_variation_1 = Value::Record(vec![
2724 (
2725 "outer_field_1".into(),
2726 Value::Union(0, Box::new(Value::Null)),
2727 ),
2728 ("outer_field_2".into(), inner_record.clone()),
2729 ]);
2730 let outer_record_variation_2 = Value::Record(vec![
2731 (
2732 "outer_field_1".into(),
2733 Value::Union(1, Box::new(middle_record_variation_1)),
2734 ),
2735 ("outer_field_2".into(), inner_record.clone()),
2736 ]);
2737 let outer_record_variation_3 = Value::Record(vec![
2738 (
2739 "outer_field_1".into(),
2740 Value::Union(1, Box::new(middle_record_variation_2)),
2741 ),
2742 ("outer_field_2".into(), inner_record),
2743 ]);
2744
2745 outer_record_variation_1
2746 .resolve(&schema)
2747 .expect("Should be able to resolve value to the schema that is it's definition");
2748 outer_record_variation_2
2749 .resolve(&schema)
2750 .expect("Should be able to resolve value to the schema that is it's definition");
2751 outer_record_variation_3
2752 .resolve(&schema)
2753 .expect("Should be able to resolve value to the schema that is it's definition");
2754
2755 Ok(())
2756 }
2757
2758 #[test]
2759 fn test_avro_3460_validation_with_refs() -> TestResult {
2760 let schema = Schema::parse_str(
2761 r#"
2762 {
2763 "type":"record",
2764 "name":"TestStruct",
2765 "fields": [
2766 {
2767 "name":"a",
2768 "type":{
2769 "type":"record",
2770 "name": "Inner",
2771 "fields": [ {
2772 "name":"z",
2773 "type":"int"
2774 }]
2775 }
2776 },
2777 {
2778 "name":"b",
2779 "type":"Inner"
2780 }
2781 ]
2782 }"#,
2783 )?;
2784
2785 let inner_value_right = Value::Record(vec![("z".into(), Value::Int(3))]);
2786 let inner_value_wrong1 = Value::Record(vec![("z".into(), Value::Null)]);
2787 let inner_value_wrong2 = Value::Record(vec![("a".into(), Value::String("testing".into()))]);
2788 let outer1 = Value::Record(vec![
2789 ("a".into(), inner_value_right.clone()),
2790 ("b".into(), inner_value_wrong1),
2791 ]);
2792
2793 let outer2 = Value::Record(vec![
2794 ("a".into(), inner_value_right),
2795 ("b".into(), inner_value_wrong2),
2796 ]);
2797
2798 assert!(
2799 !outer1.validate(&schema),
2800 "field b record is invalid against the schema"
2801 ); assert!(
2803 !outer2.validate(&schema),
2804 "field b record is invalid against the schema"
2805 ); Ok(())
2808 }
2809
2810 #[test]
2811 fn test_avro_3460_validation_with_refs_real_struct() -> TestResult {
2812 use crate::serde::ser::Serializer;
2813 use serde::Serialize;
2814
2815 #[derive(Serialize, Clone)]
2816 struct TestInner {
2817 z: i32,
2818 }
2819
2820 #[derive(Serialize)]
2821 struct TestRefSchemaStruct1 {
2822 a: TestInner,
2823 b: String, }
2825
2826 #[derive(Serialize)]
2827 struct TestRefSchemaStruct2 {
2828 a: TestInner,
2829 b: i32, }
2831
2832 #[derive(Serialize)]
2833 struct TestRefSchemaStruct3 {
2834 a: TestInner,
2835 b: Option<TestInner>, }
2837
2838 let schema = Schema::parse_str(
2839 r#"
2840 {
2841 "type":"record",
2842 "name":"TestStruct",
2843 "fields": [
2844 {
2845 "name":"a",
2846 "type":{
2847 "type":"record",
2848 "name": "Inner",
2849 "fields": [ {
2850 "name":"z",
2851 "type":"int"
2852 }]
2853 }
2854 },
2855 {
2856 "name":"b",
2857 "type":"Inner"
2858 }
2859 ]
2860 }"#,
2861 )?;
2862
2863 let test_inner = TestInner { z: 3 };
2864 let test_outer1 = TestRefSchemaStruct1 {
2865 a: test_inner.clone(),
2866 b: "testing".into(),
2867 };
2868 let test_outer2 = TestRefSchemaStruct2 {
2869 a: test_inner.clone(),
2870 b: 24,
2871 };
2872 let test_outer3 = TestRefSchemaStruct3 {
2873 a: test_inner,
2874 b: None,
2875 };
2876
2877 let mut ser = Serializer::default();
2878 let test_outer1: Value = test_outer1.serialize(&mut ser)?;
2879 let mut ser = Serializer::default();
2880 let test_outer2: Value = test_outer2.serialize(&mut ser)?;
2881 let mut ser = Serializer::default();
2882 let test_outer3: Value = test_outer3.serialize(&mut ser)?;
2883
2884 assert!(
2885 !test_outer1.validate(&schema),
2886 "field b record is invalid against the schema"
2887 );
2888 assert!(
2889 !test_outer2.validate(&schema),
2890 "field b record is invalid against the schema"
2891 );
2892 assert!(
2893 !test_outer3.validate(&schema),
2894 "field b record is invalid against the schema"
2895 );
2896
2897 Ok(())
2898 }
2899
2900 fn avro_3674_with_or_without_namespace(with_namespace: bool) -> TestResult {
2901 use crate::serde::ser::Serializer;
2902 use serde::Serialize;
2903
2904 let schema_str = r#"
2905 {
2906 "type": "record",
2907 "name": "NamespacedMessage",
2908 [NAMESPACE]
2909 "fields": [
2910 {
2911 "name": "field_a",
2912 "type": {
2913 "type": "record",
2914 "name": "NestedMessage",
2915 "fields": [
2916 {
2917 "name": "enum_a",
2918 "type": {
2919 "type": "enum",
2920 "name": "EnumType",
2921 "symbols": ["SYMBOL_1", "SYMBOL_2"],
2922 "default": "SYMBOL_1"
2923 }
2924 },
2925 {
2926 "name": "enum_b",
2927 "type": "EnumType"
2928 }
2929 ]
2930 }
2931 }
2932 ]
2933 }
2934 "#;
2935 let schema_str = schema_str.replace(
2936 "[NAMESPACE]",
2937 if with_namespace {
2938 r#""namespace": "com.domain","#
2939 } else {
2940 ""
2941 },
2942 );
2943
2944 let schema = Schema::parse_str(&schema_str)?;
2945
2946 #[derive(Serialize)]
2947 enum EnumType {
2948 #[serde(rename = "SYMBOL_1")]
2949 Symbol1,
2950 #[serde(rename = "SYMBOL_2")]
2951 Symbol2,
2952 }
2953
2954 #[derive(Serialize)]
2955 struct FieldA {
2956 enum_a: EnumType,
2957 enum_b: EnumType,
2958 }
2959
2960 #[derive(Serialize)]
2961 struct NamespacedMessage {
2962 field_a: FieldA,
2963 }
2964
2965 let msg = NamespacedMessage {
2966 field_a: FieldA {
2967 enum_a: EnumType::Symbol2,
2968 enum_b: EnumType::Symbol1,
2969 },
2970 };
2971
2972 let mut ser = Serializer::default();
2973 let test_value: Value = msg.serialize(&mut ser)?;
2974 assert!(test_value.validate(&schema), "test_value should validate");
2975 assert!(
2976 test_value.resolve(&schema).is_ok(),
2977 "test_value should resolve"
2978 );
2979
2980 Ok(())
2981 }
2982
2983 #[test]
2984 fn test_avro_3674_validate_no_namespace_resolution() -> TestResult {
2985 avro_3674_with_or_without_namespace(false)
2986 }
2987
2988 #[test]
2989 fn test_avro_3674_validate_with_namespace_resolution() -> TestResult {
2990 avro_3674_with_or_without_namespace(true)
2991 }
2992
2993 fn avro_3688_schema_resolution_panic(set_field_b: bool) -> TestResult {
2994 use crate::serde::ser::Serializer;
2995 use serde::{Deserialize, Serialize};
2996
2997 let schema_str = r#"{
2998 "type": "record",
2999 "name": "Message",
3000 "fields": [
3001 {
3002 "name": "field_a",
3003 "type": [
3004 "null",
3005 {
3006 "name": "Inner",
3007 "type": "record",
3008 "fields": [
3009 {
3010 "name": "inner_a",
3011 "type": "string"
3012 }
3013 ]
3014 }
3015 ],
3016 "default": null
3017 },
3018 {
3019 "name": "field_b",
3020 "type": [
3021 "null",
3022 "Inner"
3023 ],
3024 "default": null
3025 }
3026 ]
3027 }"#;
3028
3029 #[derive(Serialize, Deserialize)]
3030 struct Inner {
3031 inner_a: String,
3032 }
3033
3034 #[derive(Serialize, Deserialize)]
3035 struct Message {
3036 field_a: Option<Inner>,
3037 field_b: Option<Inner>,
3038 }
3039
3040 let schema = Schema::parse_str(schema_str)?;
3041
3042 let msg = Message {
3043 field_a: Some(Inner {
3044 inner_a: "foo".to_string(),
3045 }),
3046 field_b: if set_field_b {
3047 Some(Inner {
3048 inner_a: "bar".to_string(),
3049 })
3050 } else {
3051 None
3052 },
3053 };
3054
3055 let mut ser = Serializer::default();
3056 let test_value: Value = msg.serialize(&mut ser)?;
3057 assert!(test_value.validate(&schema), "test_value should validate");
3058 assert!(
3059 test_value.resolve(&schema).is_ok(),
3060 "test_value should resolve"
3061 );
3062
3063 Ok(())
3064 }
3065
3066 #[test]
3067 fn test_avro_3688_field_b_not_set() -> TestResult {
3068 avro_3688_schema_resolution_panic(false)
3069 }
3070
3071 #[test]
3072 fn test_avro_3688_field_b_set() -> TestResult {
3073 avro_3688_schema_resolution_panic(true)
3074 }
3075
3076 #[test]
3077 fn test_avro_3764_use_resolve_schemata() -> TestResult {
3078 let referenced_schema =
3079 r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
3080 let main_schema = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": "enumForReference"}]}"#;
3081
3082 let value: serde_json::Value = serde_json::from_str(
3083 r#"
3084 {
3085 "reference": "A"
3086 }
3087 "#,
3088 )?;
3089
3090 let avro_value = Value::from(value);
3091
3092 let schemas = Schema::parse_list([main_schema, referenced_schema])?;
3093
3094 let main_schema = schemas.first().unwrap();
3095 let schemata: Vec<_> = schemas.iter().skip(1).collect();
3096
3097 let resolve_result = avro_value.clone().resolve_schemata(main_schema, schemata);
3098
3099 assert!(
3100 resolve_result.is_ok(),
3101 "result of resolving with schemata should be ok, got: {resolve_result:?}"
3102 );
3103
3104 let resolve_result = avro_value.resolve(main_schema);
3105 assert!(
3106 resolve_result.is_err(),
3107 "result of resolving without schemata should be err, got: {resolve_result:?}"
3108 );
3109
3110 Ok(())
3111 }
3112
3113 #[test]
3114 fn test_avro_3767_union_resolve_complex_refs() -> TestResult {
3115 let referenced_enum =
3116 r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
3117 let referenced_record = r#"{"name": "recordForReference", "type": "record", "fields": [{"name": "refInRecord", "type": "enumForReference"}]}"#;
3118 let main_schema = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": ["null", "recordForReference"]}]}"#;
3119
3120 let value: serde_json::Value = serde_json::from_str(
3121 r#"
3122 {
3123 "reference": {
3124 "refInRecord": "A"
3125 }
3126 }
3127 "#,
3128 )?;
3129
3130 let avro_value = Value::from(value);
3131
3132 let schemata = Schema::parse_list([referenced_enum, referenced_record, main_schema])?;
3133
3134 let main_schema = schemata.last().unwrap();
3135 let other_schemata: Vec<&Schema> = schemata.iter().take(2).collect();
3136
3137 let resolve_result = avro_value.resolve_schemata(main_schema, other_schemata);
3138
3139 assert!(
3140 resolve_result.is_ok(),
3141 "result of resolving with schemata should be ok, got: {resolve_result:?}"
3142 );
3143
3144 assert!(
3145 resolve_result?.validate_schemata(schemata.iter().collect()),
3146 "result of validation with schemata should be true"
3147 );
3148
3149 Ok(())
3150 }
3151
3152 #[test]
3153 fn test_avro_3782_incorrect_decimal_resolving() -> TestResult {
3154 let schema = r#"{"name": "decimalSchema", "logicalType": "decimal", "type": "fixed", "precision": 8, "scale": 0, "size": 8}"#;
3155
3156 let avro_value = Value::Decimal(Decimal::from(
3157 BigInt::from(12345678u32).to_signed_bytes_be(),
3158 ));
3159 let schema = Schema::parse_str(schema)?;
3160 let resolve_result = avro_value.resolve(&schema);
3161 assert!(
3162 resolve_result.is_ok(),
3163 "resolve result must be ok, got: {resolve_result:?}"
3164 );
3165
3166 Ok(())
3167 }
3168
3169 #[test]
3170 fn test_avro_3779_bigdecimal_resolving() -> TestResult {
3171 let schema =
3172 r#"{"name": "bigDecimalSchema", "logicalType": "big-decimal", "type": "bytes" }"#;
3173
3174 let avro_value = Value::BigDecimal(BigDecimal::from(12345678u32));
3175 let schema = Schema::parse_str(schema)?;
3176 let resolve_result: AvroResult<Value> = avro_value.resolve(&schema);
3177 assert!(
3178 resolve_result.is_ok(),
3179 "resolve result must be ok, got: {resolve_result:?}"
3180 );
3181
3182 Ok(())
3183 }
3184
3185 #[test]
3186 fn test_avro_3892_resolve_fixed_from_bytes() -> TestResult {
3187 let value = Value::Bytes(vec![97, 98, 99]);
3188 assert_eq!(
3189 value.resolve(&Schema::Fixed(FixedSchema {
3190 name: "test".into(),
3191 aliases: None,
3192 doc: None,
3193 size: 3,
3194 default: None,
3195 attributes: Default::default()
3196 }))?,
3197 Value::Fixed(3, vec![97, 98, 99])
3198 );
3199
3200 let value = Value::Bytes(vec![97, 99]);
3201 assert!(
3202 value
3203 .resolve(&Schema::Fixed(FixedSchema {
3204 name: "test".into(),
3205 aliases: None,
3206 doc: None,
3207 size: 3,
3208 default: None,
3209 attributes: Default::default()
3210 }))
3211 .is_err(),
3212 );
3213
3214 let value = Value::Bytes(vec![97, 98, 99, 100]);
3215 assert!(
3216 value
3217 .resolve(&Schema::Fixed(FixedSchema {
3218 name: "test".into(),
3219 aliases: None,
3220 doc: None,
3221 size: 3,
3222 default: None,
3223 attributes: Default::default()
3224 }))
3225 .is_err(),
3226 );
3227
3228 Ok(())
3229 }
3230
3231 #[test]
3232 fn avro_3928_from_serde_value_to_types_value() {
3233 assert_eq!(Value::from(serde_json::Value::Null), Value::Null);
3234 assert_eq!(Value::from(json!(true)), Value::Boolean(true));
3235 assert_eq!(Value::from(json!(false)), Value::Boolean(false));
3236 assert_eq!(Value::from(json!(0)), Value::Int(0));
3237 assert_eq!(Value::from(json!(i32::MIN)), Value::Int(i32::MIN));
3238 assert_eq!(Value::from(json!(i32::MAX)), Value::Int(i32::MAX));
3239 assert_eq!(
3240 Value::from(json!(i32::MIN as i64 - 1)),
3241 Value::Long(i32::MIN as i64 - 1)
3242 );
3243 assert_eq!(
3244 Value::from(json!(i32::MAX as i64 + 1)),
3245 Value::Long(i32::MAX as i64 + 1)
3246 );
3247 assert_eq!(Value::from(json!(1.23)), Value::Double(1.23));
3248 assert_eq!(Value::from(json!(-1.23)), Value::Double(-1.23));
3249 assert_eq!(Value::from(json!(u64::MIN)), Value::Int(u64::MIN as i32));
3250 assert_eq!(Value::from(json!(u64::MAX)), Value::Long(u64::MAX as i64));
3251 assert_eq!(
3252 Value::from(json!("some text")),
3253 Value::String("some text".into())
3254 );
3255 assert_eq!(
3256 Value::from(json!(["text1", "text2", "text3"])),
3257 Value::Array(vec![
3258 Value::String("text1".into()),
3259 Value::String("text2".into()),
3260 Value::String("text3".into())
3261 ])
3262 );
3263 assert_eq!(
3264 Value::from(json!({"key1": "value1", "key2": "value2"})),
3265 Value::Map(
3266 vec![
3267 ("key1".into(), Value::String("value1".into())),
3268 ("key2".into(), Value::String("value2".into()))
3269 ]
3270 .into_iter()
3271 .collect()
3272 )
3273 );
3274 }
3275
3276 #[test]
3277 fn avro_4024_resolve_double_from_unknown_string_err() -> TestResult {
3278 let schema = Schema::parse_str(r#"{"type": "double"}"#)?;
3279 let value = Value::String("unknown".to_owned());
3280 match value.resolve(&schema).map_err(Error::into_details) {
3281 Err(err @ Details::GetDouble(_)) => {
3282 assert_eq!(
3283 format!("{err:?}"),
3284 r#"Expected Value::Double, Value::Float, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: String("unknown")"#
3285 );
3286 }
3287 other => {
3288 panic!("Expected Details::GetDouble, got {other:?}");
3289 }
3290 }
3291 Ok(())
3292 }
3293
3294 #[test]
3295 fn avro_4024_resolve_float_from_unknown_string_err() -> TestResult {
3296 let schema = Schema::parse_str(r#"{"type": "float"}"#)?;
3297 let value = Value::String("unknown".to_owned());
3298 match value.resolve(&schema).map_err(Error::into_details) {
3299 Err(err @ Details::GetFloat(_)) => {
3300 assert_eq!(
3301 format!("{err:?}"),
3302 r#"Expected Value::Float, Value::Double, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: String("unknown")"#
3303 );
3304 }
3305 other => {
3306 panic!("Expected Details::GetFloat, got {other:?}");
3307 }
3308 }
3309 Ok(())
3310 }
3311
3312 #[test]
3313 fn avro_4029_resolve_from_unsupported_err() -> TestResult {
3314 let data: Vec<(&str, Value, &str)> = vec![
3315 (
3316 r#"{ "name": "NAME", "type": "int" }"#,
3317 Value::Float(123_f32),
3318 "Expected Value::Int, got: Float(123.0)",
3319 ),
3320 (
3321 r#"{ "name": "NAME", "type": "fixed", "size": 3 }"#,
3322 Value::Float(123_f32),
3323 "String expected for fixed, got: Float(123.0)",
3324 ),
3325 (
3326 r#"{ "name": "NAME", "type": "bytes" }"#,
3327 Value::Float(123_f32),
3328 "Expected Value::Bytes, got: Float(123.0)",
3329 ),
3330 (
3331 r#"{ "name": "NAME", "type": "string", "logicalType": "uuid" }"#,
3332 Value::String("abc-1234".into()),
3333 "Failed to convert &str to UUID: invalid group count: expected 5, found 2",
3334 ),
3335 (
3336 r#"{ "name": "NAME", "type": "string", "logicalType": "uuid" }"#,
3337 Value::Float(123_f32),
3338 "Expected Value::Uuid, got: Float(123.0)",
3339 ),
3340 (
3341 r#"{ "name": "NAME", "type": "bytes", "logicalType": "big-decimal" }"#,
3342 Value::Float(123_f32),
3343 "Expected Value::BigDecimal, got: Float(123.0)",
3344 ),
3345 (
3346 r#"{ "name": "NAME", "type": "fixed", "size": 12, "logicalType": "duration" }"#,
3347 Value::Float(123_f32),
3348 "Expected Value::Duration or Value::Fixed(12), got: Float(123.0)",
3349 ),
3350 (
3351 r#"{ "name": "NAME", "type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3 }"#,
3352 Value::Float(123_f32),
3353 "Expected Value::Decimal, Value::Bytes or Value::Fixed, got: Float(123.0)",
3354 ),
3355 (
3356 r#"{ "name": "NAME", "type": "bytes" }"#,
3357 Value::Array(vec![Value::Long(256_i64)]),
3358 "Unable to convert to u8, got Int(256)",
3359 ),
3360 (
3361 r#"{ "name": "NAME", "type": "int", "logicalType": "date" }"#,
3362 Value::Float(123_f32),
3363 "Expected Value::Date or Value::Int, got: Float(123.0)",
3364 ),
3365 (
3366 r#"{ "name": "NAME", "type": "int", "logicalType": "time-millis" }"#,
3367 Value::Float(123_f32),
3368 "Expected Value::TimeMillis or Value::Int, got: Float(123.0)",
3369 ),
3370 (
3371 r#"{ "name": "NAME", "type": "long", "logicalType": "time-micros" }"#,
3372 Value::Float(123_f32),
3373 "Expected Value::TimeMicros, Value::Long or Value::Int, got: Float(123.0)",
3374 ),
3375 (
3376 r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-millis" }"#,
3377 Value::Float(123_f32),
3378 "Expected Value::TimestampMillis, Value::Long or Value::Int, got: Float(123.0)",
3379 ),
3380 (
3381 r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-micros" }"#,
3382 Value::Float(123_f32),
3383 "Expected Value::TimestampMicros, Value::Long or Value::Int, got: Float(123.0)",
3384 ),
3385 (
3386 r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-nanos" }"#,
3387 Value::Float(123_f32),
3388 "Expected Value::TimestampNanos, Value::Long or Value::Int, got: Float(123.0)",
3389 ),
3390 (
3391 r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-millis" }"#,
3392 Value::Float(123_f32),
3393 "Expected Value::LocalTimestampMillis, Value::Long or Value::Int, got: Float(123.0)",
3394 ),
3395 (
3396 r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-micros" }"#,
3397 Value::Float(123_f32),
3398 "Expected Value::LocalTimestampMicros, Value::Long or Value::Int, got: Float(123.0)",
3399 ),
3400 (
3401 r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-nanos" }"#,
3402 Value::Float(123_f32),
3403 "Expected Value::LocalTimestampNanos, Value::Long or Value::Int, got: Float(123.0)",
3404 ),
3405 (
3406 r#"{ "name": "NAME", "type": "null" }"#,
3407 Value::Float(123_f32),
3408 "Expected Value::Null, got: Float(123.0)",
3409 ),
3410 (
3411 r#"{ "name": "NAME", "type": "boolean" }"#,
3412 Value::Float(123_f32),
3413 "Expected Value::Boolean, got: Float(123.0)",
3414 ),
3415 (
3416 r#"{ "name": "NAME", "type": "int" }"#,
3417 Value::Float(123_f32),
3418 "Expected Value::Int, got: Float(123.0)",
3419 ),
3420 (
3421 r#"{ "name": "NAME", "type": "long" }"#,
3422 Value::Float(123_f32),
3423 "Expected Value::Long or Value::Int, got: Float(123.0)",
3424 ),
3425 (
3426 r#"{ "name": "NAME", "type": "float" }"#,
3427 Value::Boolean(false),
3428 r#"Expected Value::Float, Value::Double, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: Boolean(false)"#,
3429 ),
3430 (
3431 r#"{ "name": "NAME", "type": "double" }"#,
3432 Value::Boolean(false),
3433 r#"Expected Value::Double, Value::Float, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: Boolean(false)"#,
3434 ),
3435 (
3436 r#"{ "name": "NAME", "type": "string" }"#,
3437 Value::Boolean(false),
3438 "Expected Value::String, Value::Bytes or Value::Fixed, got: Boolean(false)",
3439 ),
3440 (
3441 r#"{ "name": "NAME", "type": "enum", "symbols": ["one", "two"] }"#,
3442 Value::Boolean(false),
3443 "Expected Value::Enum, got: Boolean(false)",
3444 ),
3445 ];
3446
3447 for (schema_str, value, expected_error) in data {
3448 let schema = Schema::parse_str(schema_str)?;
3449 match value.resolve(&schema) {
3450 Err(error) => {
3451 assert_eq!(format!("{error}"), expected_error);
3452 }
3453 other => {
3454 panic!("Expected '{expected_error}', got {other:?}");
3455 }
3456 }
3457 }
3458 Ok(())
3459 }
3460
3461 #[test]
3462 fn avro_rs_130_get_from_record() -> TestResult {
3463 let schema = r#"
3464 {
3465 "type": "record",
3466 "name": "NamespacedMessage",
3467 "namespace": "space",
3468 "fields": [
3469 {
3470 "name": "foo",
3471 "type": "string"
3472 },
3473 {
3474 "name": "bar",
3475 "type": "long"
3476 }
3477 ]
3478 }
3479 "#;
3480
3481 let schema = Schema::parse_str(schema)?;
3482 let mut record = Record::new(&schema).unwrap();
3483 record.put("foo", "hello");
3484 record.put("bar", 123_i64);
3485
3486 assert_eq!(
3487 record.get("foo").unwrap(),
3488 &Value::String("hello".to_string())
3489 );
3490 assert_eq!(record.get("bar").unwrap(), &Value::Long(123));
3491
3492 assert_eq!(record.get("baz"), None);
3494
3495 Ok(())
3496 }
3497
3498 #[test]
3499 fn avro_rs_392_resolve_long_to_int() {
3500 let value = Value::Long(0);
3502 value.resolve(&Schema::Int).unwrap();
3503 let value = Value::Long(i64::MAX);
3505 assert!(matches!(
3506 value.resolve(&Schema::Int).unwrap_err().details(),
3507 Details::ZagI32(_, _)
3508 ));
3509 }
3510}