1use std::str::FromStr;
13use std::sync::Arc;
14
15use arrow::array::{Array, AsArray, ListArray, NullArray, StructArray, new_null_array};
16use arrow::datatypes::{DataType, Field, FieldRef, Fields, SchemaBuilder};
17use itertools::Itertools;
18use mz_ore::cast::CastFrom;
19use mz_proto::{ProtoType, RustType, TryFromProtoError};
20use proptest_derive::Arbitrary;
21use serde::{Deserialize, Serialize};
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Arbitrary)]
26#[serde(try_from = "String", into = "String")]
27pub struct SchemaId(pub usize);
28
29impl std::fmt::Display for SchemaId {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 write!(f, "h{}", self.0)
32 }
33}
34
35impl From<SchemaId> for String {
36 fn from(schema_id: SchemaId) -> Self {
37 schema_id.to_string()
38 }
39}
40
41impl TryFrom<String> for SchemaId {
42 type Error = String;
43 fn try_from(encoded: String) -> Result<Self, Self::Error> {
44 let encoded = match encoded.strip_prefix('h') {
45 Some(x) => x,
46 None => return Err(format!("invalid SchemaId {}: incorrect prefix", encoded)),
47 };
48 let schema_id = u64::from_str(encoded)
49 .map_err(|err| format!("invalid SchemaId {}: {}", encoded, err))?;
50 Ok(SchemaId(usize::cast_from(schema_id)))
51 }
52}
53
54impl RustType<u64> for SchemaId {
55 fn into_proto(&self) -> u64 {
56 self.0.into_proto()
57 }
58
59 fn from_proto(proto: u64) -> Result<Self, TryFromProtoError> {
60 Ok(SchemaId(proto.into_rust()?))
61 }
62}
63
64pub fn backward_compatible(old: &DataType, new: &DataType) -> Option<Migration> {
68 backward_compatible_typ(old, new).map(Migration)
69}
70
71#[derive(Debug, PartialEq)]
73pub struct Migration(ArrayMigration);
74
75impl Migration {
76 pub fn contains_drop(&self) -> bool {
79 self.0.contains_drop()
80 }
81
82 pub fn preserves_order(&self) -> bool {
84 !self.contains_drop()
87 }
88
89 pub fn migrate(&self, array: Arc<dyn Array>) -> Arc<dyn Array> {
92 self.0.migrate(array)
93 }
94}
95
96#[derive(Debug, PartialEq)]
97pub(crate) enum ArrayMigration {
98 NoOp,
99 Struct(Vec<StructArrayMigration>),
100 List(FieldRef, Box<ArrayMigration>),
101}
102
103#[derive(Debug, PartialEq)]
104pub(crate) enum StructArrayMigration {
105 AddFieldNullableAtEnd {
108 name: String,
109 typ: DataType,
110 },
111 DropField {
113 name: String,
114 },
115 MakeNull {
119 name: String,
120 },
121 AlterFieldNullable {
123 name: String,
124 },
125 Recurse {
126 name: String,
127 migration: ArrayMigration,
128 },
129}
130
131impl ArrayMigration {
132 fn contains_drop(&self) -> bool {
133 use ArrayMigration::*;
134 match self {
135 NoOp => false,
136 Struct(xs) => xs.iter().any(|x| x.contains_drop()),
137 List(_f, x) => x.contains_drop(),
138 }
139 }
140
141 fn migrate(&self, array: Arc<dyn Array>) -> Arc<dyn Array> {
142 use ArrayMigration::*;
143 match self {
144 NoOp => array,
145 Struct(migrations) => {
146 let len = array.len();
147
148 let (mut fields, mut arrays, nulls) = match array.data_type() {
149 DataType::Null => {
150 let all_add_nullable = migrations.iter().all(|action| {
151 matches!(action, StructArrayMigration::AddFieldNullableAtEnd { .. })
152 });
153 assert!(all_add_nullable, "invalid migrations, {migrations:?}");
154 (Fields::empty(), Vec::new(), None)
155 }
156 DataType::Struct(_) => {
157 let array = array
158 .as_any()
159 .downcast_ref::<StructArray>()
160 .expect("known to be StructArray")
161 .clone();
162 array.into_parts()
163 }
164 other => panic!("expected Struct or Null got {other:?}"),
165 };
166
167 for migration in migrations {
168 migration.migrate(len, &mut fields, &mut arrays);
169 }
170 assert_eq!(fields.len(), arrays.len(), "invalid migration");
171
172 Arc::new(StructArray::new(fields, arrays, nulls))
173 }
174 List(field, entry_migration) => {
175 let list_array: ListArray = if let Some(list_array) = array.as_list_opt() {
176 list_array.clone()
177 } else if let Some(map_array) = array.as_map_opt() {
178 map_array.clone().into()
179 } else {
180 panic!("expected list-like array; got {:?}", array.data_type())
181 };
182
183 let (_field, offsets, entries, nulls) = list_array.into_parts();
184 let entries = entry_migration.migrate(entries);
185 Arc::new(ListArray::new(Arc::clone(field), offsets, entries, nulls))
186 }
187 }
188 }
189}
190
191impl StructArrayMigration {
192 fn contains_drop(&self) -> bool {
193 use StructArrayMigration::*;
194 match self {
195 AddFieldNullableAtEnd { .. } => false,
196 DropField { .. } | MakeNull { .. } => true,
197 AlterFieldNullable { .. } => false,
198 Recurse { migration, .. } => migration.contains_drop(),
199 }
200 }
201
202 fn migrate(&self, len: usize, fields: &mut Fields, arrays: &mut Vec<Arc<dyn Array>>) {
203 use StructArrayMigration::*;
204 match self {
205 AddFieldNullableAtEnd { name, typ } => {
206 arrays.push(new_null_array(typ, len));
207 let mut f = SchemaBuilder::from(&*fields);
208 f.push(Arc::new(Field::new(name, typ.clone(), true)));
209 *fields = f.finish().fields;
210 }
211 DropField { name } => {
212 let (idx, _) = fields
213 .find(name)
214 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
215 arrays.remove(idx);
216 let mut f = SchemaBuilder::from(&*fields);
217 f.remove(idx);
218 *fields = f.finish().fields;
219 }
220 MakeNull { name } => {
221 let (idx, _) = fields
222 .find(name)
223 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
224 let array_len = arrays
225 .get(idx)
226 .expect("checked above that this exists")
227 .len();
228 arrays[idx] = Arc::new(NullArray::new(array_len));
229 let mut f = SchemaBuilder::from(&*fields);
230 let field = f.field_mut(idx);
231 *field = Arc::new(Field::new(name.clone(), DataType::Null, true));
232 *fields = f.finish().fields;
233 }
234 AlterFieldNullable { name } => {
235 let (idx, _) = fields
236 .find(name)
237 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
238 let mut f = SchemaBuilder::from(&*fields);
239 let field = f.field_mut(idx);
240 assert_eq!(field.is_nullable(), false);
242 *field = Arc::new(Field::new(field.name(), field.data_type().clone(), true));
243 *fields = f.finish().fields;
244 }
245 Recurse { name, migration } => {
246 let (idx, _) = fields
247 .find(name)
248 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
249 arrays[idx] = migration.migrate(Arc::clone(&arrays[idx]));
250 let mut f = SchemaBuilder::from(&*fields);
251 *f.field_mut(idx) = Arc::new(Field::new(
252 name,
253 arrays[idx].data_type().clone(),
254 f.field(idx).is_nullable(),
255 ));
256 *fields = f.finish().fields;
257 }
258 }
259 }
260}
261
262fn backward_compatible_typ(old: &DataType, new: &DataType) -> Option<ArrayMigration> {
263 use ArrayMigration::NoOp;
264 use DataType::*;
265 match (old, new) {
266 (Null, Struct(fields)) if fields.iter().all(|field| field.is_nullable()) => {
267 let migrations = fields
268 .iter()
269 .map(|field| StructArrayMigration::AddFieldNullableAtEnd {
270 name: field.name().clone(),
271 typ: field.data_type().clone(),
272 })
273 .collect();
274 Some(ArrayMigration::Struct(migrations))
275 }
276 (
277 Null | Boolean | Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64
278 | Float16 | Float32 | Float64 | Binary | Utf8 | Date32 | Date64 | LargeBinary
279 | BinaryView | LargeUtf8 | Utf8View,
280 _,
281 ) => (old == new).then_some(NoOp),
282 (FixedSizeBinary(o), FixedSizeBinary(n)) => (o == n).then_some(NoOp),
283 (FixedSizeBinary(_), _) => None,
284 (Struct(o), Struct(n)) => backward_compatible_struct(o, n),
285 (Struct(_), _) => None,
286 (List(o), List(n)) | (Map(o, _), List(n)) => {
287 if o.is_nullable() && !n.is_nullable() {
291 None
292 } else {
293 let nested = backward_compatible_typ(o.data_type(), n.data_type())?;
294 let migration =
295 if matches!(old, DataType::List(_)) && o == n && nested == ArrayMigration::NoOp
296 {
297 ArrayMigration::NoOp
298 } else {
299 ArrayMigration::List(Arc::clone(n), nested.into())
300 };
301 Some(migration)
302 }
303 }
304 (List(_), _) => None,
305 (Map(o, _), Map(n, _)) => (o == n).then_some(NoOp),
306 (Map(_, _), _) => None,
307 (
308 Timestamp(_, _)
309 | Time32(_)
310 | Time64(_)
311 | Duration(_)
312 | Interval(_)
313 | ListView(_)
314 | FixedSizeList(_, _)
315 | LargeList(_)
316 | LargeListView(_)
317 | Union(_, _)
318 | Dictionary(_, _)
319 | Decimal128(_, _)
320 | Decimal256(_, _)
321 | RunEndEncoded(_, _),
322 _,
323 ) => unimplemented!("not used in mz: old={:?} new={:?}", old, new),
324 }
325}
326
/// Computes the migration from a struct with fields `old` to a struct with fields `new`, or
/// `None` if the change is not backward compatible.
///
/// Rules, checked for each field of `new` in order:
/// - a field missing from `old` may be added only if it is nullable and every later field of
///   `new` is also missing from `old` (added fields are appended, filled with nulls);
/// - an existing field may stay as-is or become nullable, but never become non-nullable;
/// - a nullable `Struct` field may become `Null` (its contents are replaced with nulls);
/// - an existing field's type may change when [`backward_compatible_typ`] allows it; if the
///   field also becomes nullable, the nested migration may only relax nullability.
///
/// Fields of `old` missing from `new` are dropped, and the retained fields must keep their
/// relative order. The returned steps are the per-field steps in `new` order, followed by
/// the drops.
fn backward_compatible_struct(old: &Fields, new: &Fields) -> Option<ArrayMigration> {
    use ArrayMigration::*;
    use StructArrayMigration::*;

    // Set once a field of `new` has been found to be missing from `old`.
    let mut added_field = false;
    let mut field_migrations = Vec::new();
    for n in new.iter() {
        let o = old.find(n.name());
        let o = match o {
            // Added fields are appended at the end, so an existing field may not follow one.
            Some(_) if added_field => return None,
            Some((_, o)) => o,
            // Added fields are backfilled with nulls, so they must be nullable.
            None if !n.is_nullable() => return None,
            None => {
                added_field = true;
                field_migrations.push(AddFieldNullableAtEnd {
                    name: n.name().to_owned(),
                    typ: n.data_type().clone(),
                });
                continue;
            }
        };

        // Existing data may contain nulls that a non-nullable field could not hold.
        if o.is_nullable() && !n.is_nullable() {
            return None;
        }

        // A nullable struct field may be replaced wholesale by a `Null` field.
        if matches!(o.data_type(), DataType::Struct(_))
            && o.is_nullable()
            && n.data_type().is_null()
        {
            field_migrations.push(MakeNull {
                name: n.name().clone(),
            });
            continue;
        }

        let make_nullable = !o.is_nullable() && n.is_nullable();

        match backward_compatible_typ(o.data_type(), n.data_type()) {
            None => return None,
            Some(NoOp) if make_nullable => {
                field_migrations.push(AlterFieldNullable {
                    name: n.name().clone(),
                });
            }
            Some(NoOp) => continue,
            Some(migration) => {
                /// Returns true if `migration`, at every depth, only makes fields nullable and
                /// otherwise leaves the data untouched.
                fn recursively_all_nullable(migration: &ArrayMigration) -> bool {
                    match migration {
                        NoOp => true,
                        List(_field, child) => recursively_all_nullable(child),
                        Struct(children) => children.iter().all(|child| match child {
                            AddFieldNullableAtEnd { .. } | DropField { .. } | MakeNull { .. } => {
                                false
                            }
                            AlterFieldNullable { .. } => true,
                            Recurse { migration, .. } => recursively_all_nullable(migration),
                        }),
                    }
                }

                if make_nullable {
                    // A field that becomes nullable may only be combined with nested changes
                    // that relax nullability; anything else is rejected.
                    if recursively_all_nullable(&migration) {
                        // Relax the field first, then migrate its contents; `Recurse` keeps
                        // the field's (now nullable) flag when it rewrites the field type.
                        field_migrations.extend([
                            AlterFieldNullable {
                                name: n.name().clone(),
                            },
                            Recurse {
                                name: n.name().clone(),
                                migration,
                            },
                        ]);
                    } else {
                        return None;
                    }
                } else {
                    field_migrations.push(Recurse {
                        name: n.name().clone(),
                        migration,
                    })
                }
            }
        }
    }

    // Every field of `old` that is absent from `new` is dropped.
    for o in old.iter() {
        let n = new.find(o.name());
        if n.is_none() {
            field_migrations.push(DropField {
                name: o.name().to_owned(),
            });
        }
    }

    // Retained fields must appear in `new` in the same relative order as in `old`; `find`
    // yields each retained field's index in `old`, which must be non-decreasing.
    let same_order = new
        .iter()
        .flat_map(|n| old.find(n.name()))
        .tuple_windows()
        .all(|((i, _), (j, _))| i <= j);
    if !same_order {
        return None;
    }

    if field_migrations.is_empty() {
        Some(NoOp)
    } else {
        Some(Struct(field_migrations))
    }
}
460
#[cfg(test)]
mod tests {
    use arrow::array::new_empty_array;
    use arrow::datatypes::Field;

    use super::*;

    /// Asserts the outcome of `backward_compatible_typ(old, new)`.
    ///
    /// `expected` is `None` when the change must be rejected, and otherwise
    /// `Some(contains_drop)`. When a migration is produced, it is also applied to an empty
    /// array of type `old`, and the result must have exactly the type `new`.
    #[track_caller]
    fn testcase(old: DataType, new: DataType, expected: Option<bool>) {
        let migration = super::backward_compatible_typ(&old, &new);
        assert_eq!(
            migration.as_ref().map(ArrayMigration::contains_drop),
            expected
        );
        let Some(migration) = migration else {
            return;
        };
        let migrated = migration.migrate(new_empty_array(&old));
        let target = new_empty_array(&new);
        assert_eq!(target.data_type(), migrated.data_type());
    }

    /// Builds a struct type from `(name, type, nullable)` triples.
    fn struct_(fields: impl IntoIterator<Item = (&'static str, DataType, bool)>) -> DataType {
        DataType::Struct(
            fields
                .into_iter()
                .map(|(name, typ, nullable)| Field::new(name, typ, nullable))
                .collect(),
        )
    }

    /// A non-nullable `map_entries` struct field with a non-nullable Utf8 key field `key` and a
    /// nullable value field `val` of type `val_typ`.
    fn map_entries(key: &str, val: &str, val_typ: DataType) -> FieldRef {
        let entries = vec![
            Field::new(key, DataType::Utf8, false),
            Field::new(val, val_typ, true),
        ];
        Field::new_struct("map_entries", entries, false).into()
    }

    /// The `ok` column of the nested-types test. With `relaxed == false`, columns `1`, `3`,
    /// `5` (including its children) and `7` are non-nullable; with `relaxed == true`,
    /// everything is nullable.
    fn ok_columns(relaxed: bool) -> DataType {
        use DataType::*;

        let key_val_list = || List(map_entries("key", "val", Int32));
        let nested_int_list =
            || List(Field::new_list("item", Field::new_list_field(Int32, true), true).into());
        let dims_and_vals = struct_([
            (
                "dims",
                List(Field::new_list_field(FixedSizeBinary(16), true).into()),
                true,
            ),
            ("vals", List(Field::new_list_field(Utf8, true).into()), true),
        ]);

        struct_([
            ("0", key_val_list(), true),
            ("1", key_val_list(), relaxed),
            ("2", nested_int_list(), true),
            ("3", nested_int_list(), relaxed),
            ("4", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
            (
                "5",
                struct_([("0", Int32, relaxed), ("1", Utf8, relaxed)]),
                relaxed,
            ),
            ("6", Utf8, true),
            ("7", dims_and_vals, relaxed),
        ])
    }

    #[mz_ore::test]
    fn backward_compatible() {
        use DataType::*;

        // Leaf types are only compatible with themselves.
        testcase(Boolean, Boolean, Some(false));
        testcase(Utf8, Utf8, Some(false));
        testcase(Boolean, Utf8, None);
        testcase(Utf8, Boolean, None);

        // Unchanged structs, nullable or not.
        testcase(
            struct_([("a", Boolean, true)]),
            struct_([("a", Boolean, true)]),
            Some(false),
        );
        testcase(
            struct_([("a", Boolean, false)]),
            struct_([("a", Boolean, false)]),
            Some(false),
        );

        // A field may become nullable, but not non-nullable.
        testcase(
            struct_([("a", Boolean, true)]),
            struct_([("a", Boolean, false)]),
            None,
        );
        testcase(
            struct_([("a", Boolean, false)]),
            struct_([("a", Boolean, true)]),
            Some(false),
        );

        // Only nullable fields can be added; any field can be dropped.
        testcase(struct_([]), struct_([("a", Boolean, true)]), Some(false));
        testcase(struct_([]), struct_([("a", Boolean, false)]), None);
        testcase(struct_([("a", Boolean, true)]), struct_([]), Some(true));
        testcase(struct_([("a", Boolean, false)]), struct_([]), Some(true));

        // A rename is a drop plus an add.
        testcase(
            struct_([("a", Boolean, true)]),
            struct_([("b", Boolean, true)]),
            Some(true),
        );

        // Several fields can be added at once.
        testcase(
            struct_([]),
            struct_([("a", Boolean, true), ("b", Boolean, true)]),
            Some(false),
        );

        // The same rules apply inside nested structs.
        testcase(
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            Some(false),
        );
        testcase(
            struct_([("a", struct_([]), false)]),
            struct_([("a", struct_([("b", Boolean, true)]), false)]),
            Some(false),
        );
        testcase(
            struct_([("a", struct_([]), false)]),
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            None,
        );
        testcase(
            struct_([("a", struct_([("b", Boolean, true)]), false)]),
            struct_([("a", struct_([]), false)]),
            Some(true),
        );
        testcase(
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            struct_([("a", struct_([]), false)]),
            Some(true),
        );

        // An invalid nested change is rejected even when the parent also becomes nullable.
        testcase(
            struct_([("a", struct_([]), false)]),
            struct_([("a", struct_([("b", Boolean, false)]), true)]),
            None,
        );

        // Reordering fields is not allowed.
        testcase(
            struct_([("a", Boolean, false), ("b", Utf8, false)]),
            struct_([("b", Utf8, false), ("a", Boolean, false)]),
            None,
        );

        // Names whose lexicographic order ("10" < "2") differs from their positions.
        testcase(
            struct_([("2", Boolean, false), ("10", Utf8, false)]),
            struct_([("10", Utf8, false)]),
            Some(true),
        );

        // New fields can only be added at the end.
        testcase(
            struct_([("a", Boolean, true), ("c", Boolean, true)]),
            struct_([
                ("a", Boolean, true),
                ("b", Boolean, true),
                ("c", Boolean, true),
            ]),
            None,
        );

        // A `Null` column can become a struct whose fields are all nullable.
        testcase(Null, struct_([("a", Boolean, true)]), Some(false));

        // A map can be read as the list of its entries.
        testcase(
            Map(map_entries("keys", "values", Boolean), true),
            List(map_entries("keys", "values", Boolean)),
            Some(false),
        );

        // List entries evolve like any other type.
        testcase(
            List(Field::new_struct("entries", vec![Field::new("keys", Utf8, false)], true).into()),
            List(
                Field::new_struct(
                    "entries",
                    vec![
                        Field::new("keys", Utf8, false),
                        Field::new("values", Boolean, true),
                    ],
                    true,
                )
                .into(),
            ),
            Some(false),
        );

        // A field can become nullable together with its nested fields.
        testcase(
            struct_([("0", struct_([("foo", Utf8, false)]), false)]),
            struct_([("0", struct_([("foo", Utf8, true)]), true)]),
            Some(false),
        );
    }

    #[mz_ore::test]
    fn backwards_compatible_nested_types() {
        let schema = |relaxed| {
            struct_([
                ("ok", ok_columns(relaxed), true),
                ("err", DataType::Binary, true),
            ])
        };
        testcase(schema(false), schema(true), Some(false));
    }
}