1use std::str::FromStr;
13use std::sync::Arc;
14
15use arrow::array::{Array, AsArray, ListArray, NullArray, StructArray, new_null_array};
16use arrow::datatypes::{DataType, Field, FieldRef, Fields, SchemaBuilder};
17use itertools::Itertools;
18use mz_ore::cast::CastFrom;
19use mz_proto::{ProtoType, RustType, TryFromProtoError};
20use proptest_derive::Arbitrary;
21use serde::{Deserialize, Serialize};
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Arbitrary)]
26#[serde(try_from = "String", into = "String")]
27pub struct SchemaId(pub usize);
28
29impl std::fmt::Display for SchemaId {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 write!(f, "h{}", self.0)
32 }
33}
34
35impl From<SchemaId> for String {
36 fn from(schema_id: SchemaId) -> Self {
37 schema_id.to_string()
38 }
39}
40
41impl TryFrom<String> for SchemaId {
42 type Error = String;
43 fn try_from(encoded: String) -> Result<Self, Self::Error> {
44 let encoded = match encoded.strip_prefix('h') {
45 Some(x) => x,
46 None => return Err(format!("invalid SchemaId {}: incorrect prefix", encoded)),
47 };
48 let schema_id = u64::from_str(encoded)
49 .map_err(|err| format!("invalid SchemaId {}: {}", encoded, err))?;
50 Ok(SchemaId(usize::cast_from(schema_id)))
51 }
52}
53
54impl RustType<u64> for SchemaId {
55 fn into_proto(&self) -> u64 {
56 self.0.into_proto()
57 }
58
59 fn from_proto(proto: u64) -> Result<Self, TryFromProtoError> {
60 Ok(SchemaId(proto.into_rust()?))
61 }
62}
63
64pub fn backward_compatible(old: &DataType, new: &DataType) -> Option<Migration> {
68 backward_compatible_typ(old, new).map(Migration)
69}
70
71#[derive(Debug, PartialEq)]
73pub struct Migration(ArrayMigration);
74
75impl Migration {
76 pub fn contains_drop(&self) -> bool {
79 self.0.contains_drop()
80 }
81
82 pub fn preserves_order(&self) -> bool {
84 !self.contains_drop()
87 }
88
89 pub fn migrate(&self, array: Arc<dyn Array>) -> Arc<dyn Array> {
92 self.0.migrate(array)
93 }
94}
95
96#[derive(Debug, PartialEq)]
97pub(crate) enum ArrayMigration {
98 NoOp,
99 Struct(Vec<StructArrayMigration>),
100 List(FieldRef, Box<ArrayMigration>),
101}
102
103#[derive(Debug, PartialEq)]
104pub(crate) enum StructArrayMigration {
105 AddFieldNullableAtEnd {
108 name: String,
109 typ: DataType,
110 },
111 DropField {
113 name: String,
114 },
115 MakeNull {
119 name: String,
120 },
121 AlterFieldNullable {
123 name: String,
124 },
125 Recurse {
126 name: String,
127 migration: ArrayMigration,
128 },
129}
130
131impl ArrayMigration {
132 fn contains_drop(&self) -> bool {
133 use ArrayMigration::*;
134 match self {
135 NoOp => false,
136 Struct(xs) => xs.iter().any(|x| x.contains_drop()),
137 List(_f, x) => x.contains_drop(),
138 }
139 }
140
141 fn migrate(&self, array: Arc<dyn Array>) -> Arc<dyn Array> {
142 use ArrayMigration::*;
143 match self {
144 NoOp => array,
145 Struct(migrations) => {
146 let len = array.len();
147
148 let (mut fields, mut arrays, nulls) = match array.data_type() {
149 DataType::Null => {
150 let all_add_nullable = migrations.iter().all(|action| {
151 matches!(action, StructArrayMigration::AddFieldNullableAtEnd { .. })
152 });
153 assert!(all_add_nullable, "invalid migrations, {migrations:?}");
154 (Fields::empty(), Vec::new(), None)
155 }
156 DataType::Struct(_) => {
157 let array = array
158 .as_any()
159 .downcast_ref::<StructArray>()
160 .expect("known to be StructArray")
161 .clone();
162 array.into_parts()
163 }
164 other => panic!("expected Struct or Null got {other:?}"),
165 };
166
167 for migration in migrations {
168 migration.migrate(len, &mut fields, &mut arrays);
169 }
170 assert_eq!(fields.len(), arrays.len(), "invalid migration");
171
172 let array = if arrays.is_empty() {
173 StructArray::new_empty_fields(len, nulls)
174 } else {
175 StructArray::new(fields, arrays, nulls)
176 };
177 Arc::new(array)
178 }
179 List(field, entry_migration) => {
180 let list_array: ListArray = if let Some(list_array) = array.as_list_opt() {
181 list_array.clone()
182 } else if let Some(map_array) = array.as_map_opt() {
183 map_array.clone().into()
184 } else {
185 panic!("expected list-like array; got {:?}", array.data_type())
186 };
187
188 let (_field, offsets, entries, nulls) = list_array.into_parts();
189 let entries = entry_migration.migrate(entries);
190 Arc::new(ListArray::new(Arc::clone(field), offsets, entries, nulls))
191 }
192 }
193 }
194}
195
196impl StructArrayMigration {
197 fn contains_drop(&self) -> bool {
198 use StructArrayMigration::*;
199 match self {
200 AddFieldNullableAtEnd { .. } => false,
201 DropField { .. } | MakeNull { .. } => true,
202 AlterFieldNullable { .. } => false,
203 Recurse { migration, .. } => migration.contains_drop(),
204 }
205 }
206
207 fn migrate(&self, len: usize, fields: &mut Fields, arrays: &mut Vec<Arc<dyn Array>>) {
208 use StructArrayMigration::*;
209 match self {
210 AddFieldNullableAtEnd { name, typ } => {
211 arrays.push(new_null_array(typ, len));
212 let mut f = SchemaBuilder::from(&*fields);
213 f.push(Arc::new(Field::new(name, typ.clone(), true)));
214 *fields = f.finish().fields;
215 }
216 DropField { name } => {
217 let (idx, _) = fields
218 .find(name)
219 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
220 arrays.remove(idx);
221 let mut f = SchemaBuilder::from(&*fields);
222 f.remove(idx);
223 *fields = f.finish().fields;
224 }
225 MakeNull { name } => {
226 let (idx, _) = fields
227 .find(name)
228 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
229 let array_len = arrays
230 .get(idx)
231 .expect("checked above that this exists")
232 .len();
233 arrays[idx] = Arc::new(NullArray::new(array_len));
234 let mut f = SchemaBuilder::from(&*fields);
235 let field = f.field_mut(idx);
236 *field = Arc::new(Field::new(name.clone(), DataType::Null, true));
237 *fields = f.finish().fields;
238 }
239 AlterFieldNullable { name } => {
240 let (idx, _) = fields
241 .find(name)
242 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
243 let mut f = SchemaBuilder::from(&*fields);
244 let field = f.field_mut(idx);
245 assert_eq!(field.is_nullable(), false);
247 *field = Arc::new(Field::new(field.name(), field.data_type().clone(), true));
248 *fields = f.finish().fields;
249 }
250 Recurse { name, migration } => {
251 let (idx, _) = fields
252 .find(name)
253 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
254 arrays[idx] = migration.migrate(Arc::clone(&arrays[idx]));
255 let mut f = SchemaBuilder::from(&*fields);
256 *f.field_mut(idx) = Arc::new(Field::new(
257 name,
258 arrays[idx].data_type().clone(),
259 f.field(idx).is_nullable(),
260 ));
261 *fields = f.finish().fields;
262 }
263 }
264 }
265}
266
267fn backward_compatible_typ(old: &DataType, new: &DataType) -> Option<ArrayMigration> {
268 use ArrayMigration::NoOp;
269 use DataType::*;
270 match (old, new) {
271 (Null, Struct(fields)) if fields.iter().all(|field| field.is_nullable()) => {
272 let migrations = fields
273 .iter()
274 .map(|field| StructArrayMigration::AddFieldNullableAtEnd {
275 name: field.name().clone(),
276 typ: field.data_type().clone(),
277 })
278 .collect();
279 Some(ArrayMigration::Struct(migrations))
280 }
281 (
282 Null | Boolean | Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64
283 | Float16 | Float32 | Float64 | Binary | Utf8 | Date32 | Date64 | LargeBinary
284 | BinaryView | LargeUtf8 | Utf8View,
285 _,
286 ) => (old == new).then_some(NoOp),
287 (FixedSizeBinary(o), FixedSizeBinary(n)) => (o == n).then_some(NoOp),
288 (FixedSizeBinary(_), _) => None,
289 (Struct(o), Struct(n)) => backward_compatible_struct(o, n),
290 (Struct(_), _) => None,
291 (List(o), List(n)) | (Map(o, _), List(n)) => {
292 if o.is_nullable() && !n.is_nullable() {
296 None
297 } else {
298 let nested = backward_compatible_typ(o.data_type(), n.data_type())?;
299 let migration =
300 if matches!(old, DataType::List(_)) && o == n && nested == ArrayMigration::NoOp
301 {
302 ArrayMigration::NoOp
303 } else {
304 ArrayMigration::List(Arc::clone(n), nested.into())
305 };
306 Some(migration)
307 }
308 }
309 (List(_), _) => None,
310 (Map(o, _), Map(n, _)) => (o == n).then_some(NoOp),
311 (Map(_, _), _) => None,
312 (
313 Timestamp(_, _)
314 | Time32(_)
315 | Time64(_)
316 | Duration(_)
317 | Interval(_)
318 | ListView(_)
319 | FixedSizeList(_, _)
320 | LargeList(_)
321 | LargeListView(_)
322 | Union(_, _)
323 | Dictionary(_, _)
324 | Decimal32(_, _)
325 | Decimal64(_, _)
326 | Decimal128(_, _)
327 | Decimal256(_, _)
328 | RunEndEncoded(_, _),
329 _,
330 ) => unimplemented!("not used in mz: old={:?} new={:?}", old, new),
331 }
332}
333
334fn backward_compatible_struct(old: &Fields, new: &Fields) -> Option<ArrayMigration> {
335 use ArrayMigration::*;
336 use StructArrayMigration::*;
337
338 let mut added_field = false;
339 let mut field_migrations = Vec::new();
340 for n in new.iter() {
341 let o = old.find(n.name());
346 let o = match o {
347 Some(_) if added_field => return None,
351 Some((_, o)) => o,
352 None if !n.is_nullable() => return None,
354 None => {
355 added_field = true;
356 field_migrations.push(AddFieldNullableAtEnd {
357 name: n.name().to_owned(),
358 typ: n.data_type().clone(),
359 });
360 continue;
361 }
362 };
363
364 if o.is_nullable() && !n.is_nullable() {
366 return None;
367 }
368
369 if matches!(o.data_type(), DataType::Struct(_))
375 && o.is_nullable()
376 && n.data_type().is_null()
377 {
378 field_migrations.push(MakeNull {
379 name: n.name().clone(),
380 });
381 continue;
382 }
383
384 let make_nullable = !o.is_nullable() && n.is_nullable();
386
387 match backward_compatible_typ(o.data_type(), n.data_type()) {
388 None => return None,
389 Some(NoOp) if make_nullable => {
390 field_migrations.push(AlterFieldNullable {
391 name: n.name().clone(),
392 });
393 }
394 Some(NoOp) => continue,
395 Some(migration) => {
396 fn recursively_all_nullable(migration: &ArrayMigration) -> bool {
398 match migration {
399 NoOp => true,
400 List(_field, child) => recursively_all_nullable(child),
401 Struct(children) => children.iter().all(|child| match child {
402 AddFieldNullableAtEnd { .. } | DropField { .. } | MakeNull { .. } => {
403 false
404 }
405 AlterFieldNullable { .. } => true,
406 Recurse { migration, .. } => recursively_all_nullable(migration),
407 }),
408 }
409 }
410
411 if make_nullable {
418 if recursively_all_nullable(&migration) {
419 field_migrations.extend([
420 AlterFieldNullable {
421 name: n.name().clone(),
422 },
423 Recurse {
424 name: n.name().clone(),
425 migration,
426 },
427 ]);
428 } else {
429 return None;
430 }
431 } else {
432 field_migrations.push(Recurse {
433 name: n.name().clone(),
434 migration,
435 })
436 }
437 }
438 }
439 }
440
441 for o in old.iter() {
442 let n = new.find(o.name());
443 if n.is_none() {
444 field_migrations.push(DropField {
446 name: o.name().to_owned(),
447 });
448 }
449 }
450
451 let same_order = new
453 .iter()
454 .flat_map(|n| old.find(n.name()))
455 .tuple_windows()
456 .all(|((i, _), (j, _))| i <= j);
457 if !same_order {
458 return None;
459 }
460
461 if field_migrations.is_empty() {
462 Some(NoOp)
463 } else {
464 Some(Struct(field_migrations))
465 }
466}
467
#[cfg(test)]
mod tests {
    use arrow::array::new_empty_array;
    use arrow::datatypes::Field;

    use super::*;

    /// Checks `backward_compatible_typ(old, new)`.
    ///
    /// `expected` is `None` if the change must be rejected, or
    /// `Some(contains_drop)` if it must be accepted. When accepted, the
    /// migration is also applied to an empty array of type `old`, and the
    /// result must have exactly the data type `new`.
    #[track_caller]
    fn testcase(old: DataType, new: DataType, expected: Option<bool>) {
        let migration = super::backward_compatible_typ(&old, &new);
        let actual = migration.as_ref().map(|x| x.contains_drop());
        assert_eq!(actual, expected);
        if let Some(migration) = migration {
            // Only the resulting data type is compared, so empty arrays suffice.
            let (old, new) = (new_empty_array(&old), new_empty_array(&new));
            let migrated = migration.migrate(old);
            assert_eq!(new.data_type(), migrated.data_type());
        }
    }

    /// Builds a struct `DataType` from `(name, type, nullable)` triples.
    fn struct_(fields: impl IntoIterator<Item = (&'static str, DataType, bool)>) -> DataType {
        let fields = fields
            .into_iter()
            .map(|(name, typ, nullable)| Field::new(name, typ, nullable))
            .collect();
        DataType::Struct(fields)
    }

    #[mz_ore::test]
    fn backward_compatible() {
        use DataType::*;

        // Identical primitive types need no migration.
        testcase(Boolean, Boolean, Some(false));
        testcase(Utf8, Utf8, Some(false));

        // Changing a primitive type is rejected.
        testcase(Boolean, Utf8, None);
        testcase(Utf8, Boolean, None);

        // Identical structs, nullable and non-nullable fields.
        testcase(
            struct_([("a", Boolean, true)]),
            struct_([("a", Boolean, true)]),
            Some(false),
        );
        testcase(
            struct_([("a", Boolean, false)]),
            struct_([("a", Boolean, false)]),
            Some(false),
        );

        // Narrowing a field to non-nullable is rejected; relaxing it is allowed.
        testcase(
            struct_([("a", Boolean, true)]),
            struct_([("a", Boolean, false)]),
            None,
        );
        testcase(
            struct_([("a", Boolean, false)]),
            struct_([("a", Boolean, true)]),
            Some(false),
        );

        // Adding needs a nullable field; removing any field is a drop.
        testcase(struct_([]), struct_([("a", Boolean, true)]), Some(false));
        testcase(struct_([]), struct_([("a", Boolean, false)]), None);
        testcase(struct_([("a", Boolean, true)]), struct_([]), Some(true));
        testcase(struct_([("a", Boolean, false)]), struct_([]), Some(true));

        // A rename is seen as drop `a` + add `b`, hence a drop.
        testcase(
            struct_([("a", Boolean, true)]),
            struct_([("b", Boolean, true)]),
            Some(true),
        );

        // Several nullable fields can be added at once.
        testcase(
            struct_([]),
            struct_([("a", Boolean, true), ("b", Boolean, true)]),
            Some(false),
        );

        // The same rules apply to structs nested inside structs.
        testcase(
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            Some(false),
        );
        testcase(
            struct_([("a", struct_([]), false)]),
            struct_([("a", struct_([("b", Boolean, true)]), false)]),
            Some(false),
        );
        testcase(
            struct_([("a", struct_([]), false)]),
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            None,
        );
        testcase(
            struct_([("a", struct_([("b", Boolean, true)]), false)]),
            struct_([("a", struct_([]), false)]),
            Some(true),
        );
        testcase(
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            struct_([("a", struct_([]), false)]),
            Some(true),
        );

        // Relaxing the outer field's nullability does not permit adding a
        // non-nullable nested field.
        testcase(
            struct_([("a", struct_([]), false)]),
            struct_([("a", struct_([("b", Boolean, false)]), true)]),
            None,
        );

        // Reordering existing fields is rejected.
        testcase(
            struct_([("a", Boolean, false), ("b", Utf8, false)]),
            struct_([("b", Utf8, false), ("a", Boolean, false)]),
            None,
        );

        // Order is checked by position, not by comparing names: "10" sorts
        // before "2" as a string, yet dropping "2" is still a plain drop.
        testcase(
            struct_([("2", Boolean, false), ("10", Utf8, false)]),
            struct_([("10", Utf8, false)]),
            Some(true),
        );

        // New fields may only be appended, not inserted between existing ones.
        testcase(
            struct_([("a", Boolean, true), ("c", Boolean, true)]),
            struct_([
                ("a", Boolean, true),
                ("b", Boolean, true),
                ("c", Boolean, true),
            ]),
            None,
        );

        // A Null column can become a struct whose fields are all nullable.
        testcase(Null, struct_([("a", Boolean, true)]), Some(false));

        // A map can be migrated to a list of its entries.
        testcase(
            Map(
                Field::new_struct(
                    "map_entries",
                    vec![
                        Field::new("keys", Utf8, false),
                        Field::new("values", Boolean, true),
                    ],
                    false,
                )
                .into(),
                true,
            ),
            List(
                Field::new_struct(
                    "map_entries",
                    vec![
                        Field::new("keys", Utf8, false),
                        Field::new("values", Boolean, true),
                    ],
                    false,
                )
                .into(),
            ),
            Some(false),
        );

        // Adding a nullable field to the struct elements of a list.
        testcase(
            List(Field::new_struct("entries", vec![Field::new("keys", Utf8, false)], true).into()),
            List(
                Field::new_struct(
                    "entries",
                    vec![
                        Field::new("keys", Utf8, false),
                        Field::new("values", Boolean, true),
                    ],
                    true,
                )
                .into(),
            ),
            Some(false),
        );

        // Relaxing nullability of a struct field and of its child together.
        testcase(
            struct_([("0", struct_([("foo", Utf8, false)]), false)]),
            struct_([("0", struct_([("foo", Utf8, true)]), true)]),
            Some(false),
        )
    }

    /// Relaxes nullability of several nested fields at once (lists of structs,
    /// lists of lists, structs), keeping all other fields unchanged.
    #[mz_ore::test]
    fn backwards_compatible_nested_types() {
        use DataType::*;

        testcase(
            struct_([
                (
                    "ok",
                    struct_([
                        (
                            "0",
                            List(
                                Field::new_struct(
                                    "map_entries",
                                    vec![
                                        Field::new("key", Utf8, false),
                                        Field::new("val", Int32, true),
                                    ],
                                    false,
                                )
                                .into(),
                            ),
                            true,
                        ),
                        (
                            "1",
                            List(
                                Field::new_struct(
                                    "map_entries",
                                    vec![
                                        Field::new("key", Utf8, false),
                                        Field::new("val", Int32, true),
                                    ],
                                    false,
                                )
                                .into(),
                            ),
                            false,
                        ),
                        (
                            "2",
                            List(
                                Field::new_list("item", Field::new_list_field(Int32, true), true)
                                    .into(),
                            ),
                            true,
                        ),
                        (
                            "3",
                            List(
                                Field::new_list("item", Field::new_list_field(Int32, true), true)
                                    .into(),
                            ),
                            false,
                        ),
                        ("4", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
                        (
                            "5",
                            struct_([("0", Int32, false), ("1", Utf8, false)]),
                            false,
                        ),
                        ("6", Utf8, true),
                        (
                            "7",
                            struct_([
                                (
                                    "dims",
                                    List(Field::new_list_field(FixedSizeBinary(16), true).into()),
                                    true,
                                ),
                                ("vals", List(Field::new_list_field(Utf8, true).into()), true),
                            ]),
                            false,
                        ),
                    ]),
                    true,
                ),
                ("err", Binary, true),
            ]),
            // Same shape, but "1", "3", "5" (and "5"'s children) and "7" are
            // now nullable.
            struct_([
                (
                    "ok",
                    struct_([
                        (
                            "0",
                            List(
                                Field::new_struct(
                                    "map_entries",
                                    vec![
                                        Field::new("key", Utf8, false),
                                        Field::new("val", Int32, true),
                                    ],
                                    false,
                                )
                                .into(),
                            ),
                            true,
                        ),
                        (
                            "1",
                            List(
                                Field::new_struct(
                                    "map_entries",
                                    vec![
                                        Field::new("key", Utf8, false),
                                        Field::new("val", Int32, true),
                                    ],
                                    false,
                                )
                                .into(),
                            ),
                            true,
                        ),
                        (
                            "2",
                            List(
                                Field::new_list("item", Field::new_list_field(Int32, true), true)
                                    .into(),
                            ),
                            true,
                        ),
                        (
                            "3",
                            List(
                                Field::new_list("item", Field::new_list_field(Int32, true), true)
                                    .into(),
                            ),
                            true,
                        ),
                        ("4", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
                        ("5", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
                        ("6", Utf8, true),
                        (
                            "7",
                            struct_([
                                (
                                    "dims",
                                    List(Field::new_list_field(FixedSizeBinary(16), true).into()),
                                    true,
                                ),
                                ("vals", List(Field::new_list_field(Utf8, true).into()), true),
                            ]),
                            true,
                        ),
                    ]),
                    true,
                ),
                ("err", Binary, true),
            ]),
            Some(false),
        )
    }
}