1use std::collections::BTreeMap;
27use std::fmt;
28
29use chrono::NaiveDateTime;
30use enum_kinds::EnumKind;
31use serde_json::Value as JsonValue;
32
33use crate::schema::{RecordField, SchemaNode, SchemaPiece, SchemaPieceOrNamed};
34
/// A string-backed error type.
///
/// Judging by its name it reports schema-resolution failures; the code that
/// produces it is outside this section. The wrapped message is public and is
/// exactly what `Display` prints.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaResolutionError(pub String);

impl SchemaResolutionError {
    /// Builds an error from anything that converts into a `String`.
    pub fn new<S>(msg: S) -> SchemaResolutionError
    where
        S: Into<String>,
    {
        let message: String = msg.into();
        Self(message)
    }
}

impl fmt::Display for SchemaResolutionError {
    /// Forwards to the message's own `Display` impl so that formatter flags
    /// (width, alignment, ...) are applied to the message text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(message) = self;
        fmt::Display::fmt(message, f)
    }
}

impl std::error::Error for SchemaResolutionError {}
55
/// A decimal number carried as raw bytes together with its precision and scale.
#[derive(Debug, Clone, PartialEq)]
pub struct DecimalValue {
    /// Raw bytes of the unscaled value.
    // NOTE(review): presumably big-endian two's complement, as in the Avro
    // decimal logical type -- confirm against the encoder/decoder.
    pub unscaled: Vec<u8>,
    /// Precision of the value; `Value::validate` requires it to equal the
    /// precision declared by a decimal schema.
    pub precision: usize,
    /// Scale of the value; `Value::validate` requires it to equal the scale
    /// declared by a decimal schema.
    pub scale: usize,
}
63
64#[derive(Clone, Copy, Debug, PartialEq, EnumKind)] #[enum_kind(ScalarKind)]
66pub enum Scalar {
67 Null,
68 Boolean(bool),
69 Int(i32),
70 Long(i64),
71 Float(f32),
72 Double(f64),
73 Date(i32),
74 Timestamp(NaiveDateTime),
75}
76
77impl From<Scalar> for Value {
78 fn from(s: Scalar) -> Self {
79 match s {
80 Scalar::Null => Value::Null,
81 Scalar::Boolean(v) => Value::Boolean(v),
82 Scalar::Int(v) => Value::Int(v),
83 Scalar::Long(v) => Value::Long(v),
84 Scalar::Float(v) => Value::Float(v),
85 Scalar::Double(v) => Value::Double(v),
86 Scalar::Date(v) => Value::Date(v),
87 Scalar::Timestamp(v) => Value::Timestamp(v),
88 }
89 }
90}
91
92#[derive(Clone, Debug, PartialEq)]
96pub enum Value {
97 Null,
100 Boolean(bool),
102 Int(i32),
104 Long(i64),
106 Float(f32),
108 Double(f64),
110 Date(i32),
113 Timestamp(NaiveDateTime),
115
116 Decimal(DecimalValue),
122 Bytes(Vec<u8>),
124 String(String),
126 Fixed(usize, Vec<u8>),
129 Enum(usize, String),
136 Union {
138 index: usize,
140 inner: Box<Value>,
142 n_variants: usize,
145 null_variant: Option<usize>,
147 },
148 Array(Vec<Value>),
150 Map(BTreeMap<String, Value>),
152 Record(Vec<(String, Value)>),
159 Json(serde_json::Value),
164 Uuid(uuid::Uuid),
166}
167
/// Conversion of an owned value into an Avro [`Value`].
pub trait ToAvro {
    /// Consumes `self` and returns its [`Value`] representation.
    fn avro(self) -> Value;
}
174
175macro_rules! to_avro(
176 ($t:ty, $v:expr) => (
177 impl ToAvro for $t {
178 fn avro(self) -> Value {
179 $v(self)
180 }
181 }
182 );
183);
184
185to_avro!(bool, Value::Boolean);
186to_avro!(i32, Value::Int);
187to_avro!(i64, Value::Long);
188to_avro!(f32, Value::Float);
189to_avro!(f64, Value::Double);
190to_avro!(String, Value::String);
191
/// The unit value converts to [`Value::Null`].
impl ToAvro for () {
    fn avro(self) -> Value {
        Value::Null
    }
}
197
198impl ToAvro for usize {
199 fn avro(self) -> Value {
200 (self as i64).avro()
201 }
202}
203
204impl<'a> ToAvro for &'a str {
205 fn avro(self) -> Value {
206 Value::String(self.to_owned())
207 }
208}
209
210impl<'a> ToAvro for &'a [u8] {
211 fn avro(self) -> Value {
212 Value::Bytes(self.to_owned())
213 }
214}
215
216impl<T> ToAvro for BTreeMap<String, T>
217where
218 T: ToAvro,
219{
220 fn avro(self) -> Value {
221 Value::Map(
222 self.into_iter()
223 .map(|(key, value)| (key, value.avro()))
224 .collect::<_>(),
225 )
226 }
227}
228
229impl<'a, T> ToAvro for BTreeMap<&'a str, T>
230where
231 T: ToAvro,
232{
233 fn avro(self) -> Value {
234 Value::Map(
235 self.into_iter()
236 .map(|(key, value)| (key.to_owned(), value.avro()))
237 .collect::<_>(),
238 )
239 }
240}
241
/// A [`Value`] is already in Avro form, so the conversion returns it unchanged.
impl ToAvro for Value {
    fn avro(self) -> Value {
        self
    }
}
247
248impl Default for Value {
259 fn default() -> Self {
260 Value::Null
261 }
262}
263
264#[derive(Debug, Clone)]
266pub struct Record<'a> {
267 pub fields: Vec<(String, Value)>,
271 schema_lookup: &'a BTreeMap<String, usize>,
272 schema_fields: &'a Vec<RecordField>,
273}
274
275impl<'a> Record<'a> {
276 pub fn new(schema: SchemaNode<'a>) -> Option<Record<'a>> {
280 let ret = match schema.inner {
281 SchemaPiece::Record {
282 fields: schema_fields,
283 lookup: schema_lookup,
284 ..
285 } => {
286 let mut fields = Vec::with_capacity(schema_fields.len());
287 for schema_field in schema_fields.iter() {
288 fields.push((schema_field.name.clone(), Value::Null));
289 }
290
291 Some(Record {
292 fields,
293 schema_lookup,
294 schema_fields,
295 })
296 }
297 _ => None,
298 };
299 ret
300 }
301
302 pub fn put<V>(&mut self, field: &str, value: V)
308 where
309 V: ToAvro,
310 {
311 if let Some(&position) = self.schema_lookup.get(field) {
312 self.fields[position].1 = value.avro()
313 }
314 }
315
316 pub fn field_by_name(&self, name: &str) -> Option<&'a RecordField> {
318 self.schema_lookup
319 .get(name)
320 .map(|idx| &self.schema_fields[*idx])
321 }
322}
323
324impl<'a> ToAvro for Record<'a> {
325 fn avro(self) -> Value {
326 Value::Record(self.fields)
327 }
328}
329
330impl ToAvro for JsonValue {
331 fn avro(self) -> Value {
332 match self {
333 JsonValue::Null => Value::Null,
334 JsonValue::Bool(b) => Value::Boolean(b),
335 JsonValue::Number(ref n) if n.is_i64() => Value::Long(n.as_i64().unwrap()),
336 JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
337 JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64), JsonValue::String(s) => Value::String(s),
339 JsonValue::Array(items) => {
340 Value::Array(items.into_iter().map(|item| item.avro()).collect::<_>())
341 }
342 JsonValue::Object(items) => Value::Map(
343 items
344 .into_iter()
345 .map(|(key, value)| (key, value.avro()))
346 .collect::<_>(),
347 ),
348 }
349 }
350}
351
352impl Value {
353 pub fn validate(&self, schema: SchemaNode) -> bool {
358 match (self, schema.inner) {
359 (&Value::Null, SchemaPiece::Null) => true,
360 (&Value::Boolean(_), SchemaPiece::Boolean) => true,
361 (&Value::Int(_), SchemaPiece::Int) => true,
362 (&Value::Long(_), SchemaPiece::Long) => true,
363 (&Value::Float(_), SchemaPiece::Float) => true,
364 (&Value::Double(_), SchemaPiece::Double) => true,
365 (&Value::Date(_), SchemaPiece::Date) => true,
366 (&Value::Timestamp(_), SchemaPiece::TimestampMicro) => true,
367 (&Value::Timestamp(_), SchemaPiece::TimestampMilli) => true,
368 (
369 &Value::Decimal(DecimalValue {
370 precision: vp,
371 scale: vs,
372 ..
373 }),
374 SchemaPiece::Decimal {
375 precision: sp,
376 scale: ss,
377 fixed_size: _,
378 },
379 ) => vp == *sp && vs == *ss,
380 (&Value::Bytes(_), SchemaPiece::Bytes) => true,
381 (&Value::String(_), SchemaPiece::String) => true,
382 (&Value::Fixed(n, _), SchemaPiece::Fixed { size }) => n == *size,
383 (&Value::String(ref s), SchemaPiece::Enum { symbols, .. }) => symbols.contains(s),
384 (&Value::Enum(i, ref s), SchemaPiece::Enum { symbols, .. }) => {
385 symbols.get(i).map(|symbol| symbol == s).unwrap_or(false)
386 }
387 (
388 &Value::Union {
389 index,
390 ref inner,
391 n_variants,
392 null_variant,
393 },
394 SchemaPiece::Union(schema_inner),
395 ) => {
396 schema_inner.variants().len() > index
397 && n_variants == schema_inner.variants().len()
398 && inner.validate(schema.step(&schema_inner.variants()[index]))
399 && match null_variant {
400 None => !schema_inner
401 .variants()
402 .iter()
403 .any(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
404 Some(null_variant_idx) => {
405 schema_inner.variants().get(null_variant_idx)
406 == Some(&SchemaPieceOrNamed::Piece(SchemaPiece::Null))
407 }
408 }
409 }
410 (&Value::Array(ref items), SchemaPiece::Array(inner)) => {
411 let node = schema.step(&**inner);
412 items.iter().all(|item| item.validate(node))
413 }
414 (&Value::Map(ref items), SchemaPiece::Map(inner)) => {
415 let node = schema.step(&**inner);
416 items.iter().all(|(_, value)| value.validate(node))
417 }
418 (&Value::Record(ref record_fields), SchemaPiece::Record { fields, .. }) => {
419 fields.len() == record_fields.len()
420 && fields.iter().zip(record_fields.iter()).all(
421 |(field, &(ref name, ref value))| {
422 let node = schema.step(&field.schema);
423 field.name == *name && value.validate(node)
424 },
425 )
426 }
427 (Value::Json(_), SchemaPiece::Json) => true,
428 (Value::Uuid(_), SchemaPiece::Uuid) => true,
429 _ => false,
430 }
431 }
432
433 pub fn into_string(self) -> Option<String> {
435 match self {
436 Value::String(s) => Some(s),
437 _ => None,
438 }
439 }
440
441 pub fn into_nullable_bool(self) -> Option<bool> {
442 match self {
443 Value::Boolean(b) => Some(b),
444 Value::Union { inner, .. } => inner.into_nullable_bool(),
445 _ => None,
446 }
447 }
448
449 pub fn into_integral(self) -> Option<i64> {
450 match self {
451 Value::Int(i) => Some(i as i64),
452 Value::Long(l) => Some(l),
453 _ => None,
454 }
455 }
456
457 pub fn into_usize(self) -> Option<usize> {
458 self.into_integral().and_then(|i| i.try_into().ok())
459 }
460}
461
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use crate::Schema;

    use super::*;

    /// Parses `schema_json` and reports whether `value` validates against the
    /// schema's top node.
    fn is_valid(value: &Value, schema_json: &str) -> bool {
        let schema = Schema::from_str(schema_json).unwrap();
        value.validate(schema.top_node())
    }

    /// Shorthand for building a `Value::Union`.
    fn union(index: usize, inner: Value, n_variants: usize, null_variant: Option<usize>) -> Value {
        Value::Union {
            index,
            inner: Box::new(inner),
            n_variants,
            null_variant,
        }
    }

    /// Table-driven check of scalars, unions and arrays against small schemas.
    #[mz_ore::test]
    fn validate() {
        let cases = vec![
            (Value::Int(42), "\"int\"", true),
            (Value::Int(42), "\"boolean\"", false),
            (union(0, Value::Null, 2, Some(0)), r#"["null", "int"]"#, true),
            (
                union(1, Value::Int(42), 2, Some(0)),
                r#"["null", "int"]"#,
                true,
            ),
            // Claims a null variant that the schema does not have.
            (
                union(1, Value::Null, 2, Some(1)),
                r#"["double", "int"]"#,
                false,
            ),
            (
                union(3, Value::Int(42), 4, Some(0)),
                r#"["null", "double", "string", "int"]"#,
                true,
            ),
            (
                Value::Array(vec![Value::Long(42i64)]),
                r#"{"type": "array", "items": "long"}"#,
                true,
            ),
            (
                Value::Array(vec![Value::Boolean(true)]),
                r#"{"type": "array", "items": "long"}"#,
                false,
            ),
            (Value::Record(vec![]), "\"null\"", false),
        ];

        for (value, schema_str, valid) in cases {
            let schema = Schema::from_str(schema_str).unwrap();
            let outcome = value.validate(schema.top_node());
            assert_eq!(
                valid, outcome,
                "Schema failed to validate against value: {} {:#?}",
                schema_str, value
            );
        }
    }

    /// A fixed value is valid only when its size matches the schema's.
    #[mz_ore::test]
    fn validate_fixed() {
        const FIXED_SCHEMA: &str = r#"{"type": "fixed", "size": 4, "name": "some_fixed"}"#;

        assert!(is_valid(&Value::Fixed(4, vec![0, 0, 0, 0]), FIXED_SCHEMA));
        assert!(!is_valid(
            &Value::Fixed(5, vec![0, 0, 0, 0, 0]),
            FIXED_SCHEMA
        ));
    }

    /// Enum values need a matching index/symbol pair; plain strings need to be
    /// one of the symbols.
    #[mz_ore::test]
    fn validate_enum() {
        const SCHEMA: &str = r#"{"type": "enum", "name": "some_enum", "symbols": ["spades", "hearts", "diamonds", "clubs"]}"#;
        const OTHER_SCHEMA: &str = r#"{"type": "enum", "name": "some_other_enum", "symbols": ["hearts", "diamonds", "clubs", "spades"]}"#;

        let spades = |index: usize| Value::Enum(index, "spades".to_string());

        assert!(is_valid(&spades(0), SCHEMA));
        assert!(is_valid(&Value::String("spades".to_string()), SCHEMA));

        assert!(!is_valid(&spades(1), SCHEMA));
        assert!(!is_valid(&Value::String("lorem".to_string()), SCHEMA));

        // Same symbol, but it sits at a different index in the other schema.
        assert!(!is_valid(&spades(0), OTHER_SCHEMA));
    }

    /// Records must match the schema's field names, order, count and types.
    #[mz_ore::test]
    fn validate_record() {
        const RECORD_SCHEMA: &str = r#"{
            "type": "record",
            "fields": [
                {"type": "long", "name": "a"},
                {"type": "string", "name": "b"}
            ],
            "name": "some_record"
        }"#;

        let record = |fields: Vec<(&str, Value)>| {
            Value::Record(
                fields
                    .into_iter()
                    .map(|(name, value)| (name.to_string(), value))
                    .collect(),
            )
        };
        let foo = || Value::String("foo".to_string());

        // Exact match.
        assert!(is_valid(
            &record(vec![("a", Value::Long(42i64)), ("b", foo())]),
            RECORD_SCHEMA
        ));

        // Fields in the wrong order.
        assert!(!is_valid(
            &record(vec![("b", foo()), ("a", Value::Long(42i64))]),
            RECORD_SCHEMA
        ));

        // Wrong value type for `a`.
        assert!(!is_valid(
            &record(vec![("a", Value::Boolean(false)), ("b", foo())]),
            RECORD_SCHEMA
        ));

        // Unknown field name.
        assert!(!is_valid(
            &record(vec![("a", Value::Long(42i64)), ("c", foo())]),
            RECORD_SCHEMA
        ));

        // One field too many.
        assert!(!is_valid(
            &record(vec![
                ("a", Value::Long(42i64)),
                ("b", foo()),
                ("c", Value::Null),
            ]),
            RECORD_SCHEMA
        ));
    }

    /// Decimals must agree with the schema on precision (and scale).
    #[mz_ore::test]
    fn validate_decimal() {
        const DECIMAL_SCHEMA: &str = r#"
            {
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 12,
                "scale": 5
            }
        "#;

        let decimal = |precision: usize| {
            Value::Decimal(DecimalValue {
                unscaled: vec![7],
                precision,
                scale: 5,
            })
        };

        assert!(is_valid(&decimal(12), DECIMAL_SCHEMA));
        assert!(!is_valid(&decimal(13), DECIMAL_SCHEMA));
    }
}