1use std::str::FromStr;
13use std::sync::Arc;
14
15use arrow::array::{Array, AsArray, ListArray, NullArray, StructArray, new_null_array};
16use arrow::datatypes::{DataType, Field, FieldRef, Fields, SchemaBuilder};
17use itertools::Itertools;
18use mz_ore::cast::CastFrom;
19use mz_proto::{ProtoType, RustType, TryFromProtoError};
20use proptest_derive::Arbitrary;
21use serde::{Deserialize, Serialize};
22
/// An identifier for a schema, wrapping a `usize`.
///
/// Through serde it is (de)serialized as a string of the form `h{id}` (e.g. `"h3"`), via the
/// `Display`/`From<SchemaId> for String` and `TryFrom<String>` impls below. The derived `Ord`
/// compares the wrapped numbers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Arbitrary)]
#[serde(try_from = "String", into = "String")]
pub struct SchemaId(pub usize);
28
29impl std::fmt::Display for SchemaId {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 write!(f, "h{}", self.0)
32 }
33}
34
35impl From<SchemaId> for String {
36 fn from(schema_id: SchemaId) -> Self {
37 schema_id.to_string()
38 }
39}
40
41impl TryFrom<String> for SchemaId {
42 type Error = String;
43 fn try_from(encoded: String) -> Result<Self, Self::Error> {
44 let encoded = match encoded.strip_prefix('h') {
45 Some(x) => x,
46 None => return Err(format!("invalid SchemaId {}: incorrect prefix", encoded)),
47 };
48 let schema_id = u64::from_str(encoded)
49 .map_err(|err| format!("invalid SchemaId {}: {}", encoded, err))?;
50 Ok(SchemaId(usize::cast_from(schema_id)))
51 }
52}
53
54impl RustType<u64> for SchemaId {
55 fn into_proto(&self) -> u64 {
56 self.0.into_proto()
57 }
58
59 fn from_proto(proto: u64) -> Result<Self, TryFromProtoError> {
60 Ok(SchemaId(proto.into_rust()?))
61 }
62}
63
64pub fn backward_compatible(old: &DataType, new: &DataType) -> Option<Migration> {
68 backward_compatible_typ(old, new).map(Migration)
69}
70
/// A conversion from one arrow [`DataType`] to a backward-compatible one, as computed by
/// [`backward_compatible`].
#[derive(Debug, PartialEq)]
pub struct Migration(ArrayMigration);
74
75impl Migration {
76 pub fn contains_drop(&self) -> bool {
79 self.0.contains_drop()
80 }
81
82 pub fn preserves_order(&self) -> bool {
84 !self.contains_drop()
87 }
88
89 pub fn migrate(&self, array: Arc<dyn Array>) -> Arc<dyn Array> {
92 self.0.migrate(array)
93 }
94}
95
/// How to convert an array of one arrow type into an array of a backward-compatible type.
#[derive(Debug, PartialEq)]
pub(crate) enum ArrayMigration {
    /// The array is returned unchanged.
    NoOp,
    /// Apply the field-level steps, in order, to a struct array (or to a null array, when every
    /// step appends a field).
    Struct(Vec<StructArrayMigration>),
    /// Migrate the entries of a list (or map) array with the nested migration, then rebuild it
    /// as a list whose entries are described by the given field.
    List(FieldRef, Box<ArrayMigration>),
}
102
/// A single step applied to the field definitions and child columns of a struct array.
#[derive(Debug, PartialEq)]
pub(crate) enum StructArrayMigration {
    /// Append a nullable field named `name` of type `typ`, backed by an all-null column.
    AddFieldNullableAtEnd {
        name: String,
        typ: DataType,
    },
    /// Remove the field named `name` together with its column.
    DropField {
        name: String,
    },
    /// Replace the field named `name` with a nullable `Null`-typed field and its column with a
    /// `NullArray` of the same length, discarding the old contents.
    MakeNull {
        name: String,
    },
    /// Mark the currently non-nullable field named `name` as nullable; its column is untouched.
    AlterFieldNullable {
        name: String,
    },
    /// Apply `migration` to the column of the field named `name`, and update the field's type to
    /// the migrated column's type.
    Recurse {
        name: String,
        migration: ArrayMigration,
    },
}
130
131impl ArrayMigration {
132 fn contains_drop(&self) -> bool {
133 use ArrayMigration::*;
134 match self {
135 NoOp => false,
136 Struct(xs) => xs.iter().any(|x| x.contains_drop()),
137 List(_f, x) => x.contains_drop(),
138 }
139 }
140
141 fn migrate(&self, array: Arc<dyn Array>) -> Arc<dyn Array> {
142 use ArrayMigration::*;
143 match self {
144 NoOp => array,
145 Struct(migrations) => {
146 let len = array.len();
147
148 let (mut fields, mut arrays, nulls) = match array.data_type() {
149 DataType::Null => {
150 let all_add_nullable = migrations.iter().all(|action| {
151 matches!(action, StructArrayMigration::AddFieldNullableAtEnd { .. })
152 });
153 assert!(all_add_nullable, "invalid migrations, {migrations:?}");
154 (Fields::empty(), Vec::new(), None)
155 }
156 DataType::Struct(_) => {
157 let array = array
158 .as_any()
159 .downcast_ref::<StructArray>()
160 .expect("known to be StructArray")
161 .clone();
162 array.into_parts()
163 }
164 other => panic!("expected Struct or Null got {other:?}"),
165 };
166
167 for migration in migrations {
168 migration.migrate(len, &mut fields, &mut arrays);
169 }
170 assert_eq!(fields.len(), arrays.len(), "invalid migration");
171
172 let array = if arrays.is_empty() {
173 StructArray::new_empty_fields(len, nulls)
174 } else {
175 StructArray::new(fields, arrays, nulls)
176 };
177 Arc::new(array)
178 }
179 List(field, entry_migration) => {
180 let list_array: ListArray = if let Some(list_array) = array.as_list_opt() {
181 list_array.clone()
182 } else if let Some(map_array) = array.as_map_opt() {
183 map_array.clone().into()
184 } else {
185 panic!("expected list-like array; got {:?}", array.data_type())
186 };
187
188 let (_field, offsets, entries, nulls) = list_array.into_parts();
189 let entries = entry_migration.migrate(entries);
190 Arc::new(ListArray::new(Arc::clone(field), offsets, entries, nulls))
191 }
192 }
193 }
194}
195
196impl StructArrayMigration {
197 fn contains_drop(&self) -> bool {
198 use StructArrayMigration::*;
199 match self {
200 AddFieldNullableAtEnd { .. } => false,
201 DropField { .. } | MakeNull { .. } => true,
202 AlterFieldNullable { .. } => false,
203 Recurse { migration, .. } => migration.contains_drop(),
204 }
205 }
206
207 fn migrate(&self, len: usize, fields: &mut Fields, arrays: &mut Vec<Arc<dyn Array>>) {
208 use StructArrayMigration::*;
209 match self {
210 AddFieldNullableAtEnd { name, typ } => {
211 arrays.push(new_null_array(typ, len));
212 let mut f = SchemaBuilder::from(&*fields);
213 f.push(Arc::new(Field::new(name, typ.clone(), true)));
214 *fields = f.finish().fields;
215 }
216 DropField { name } => {
217 let (idx, _) = fields
218 .find(name)
219 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
220 arrays.remove(idx);
221 let mut f = SchemaBuilder::from(&*fields);
222 f.remove(idx);
223 *fields = f.finish().fields;
224 }
225 MakeNull { name } => {
226 let (idx, _) = fields
227 .find(name)
228 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
229 let array_len = arrays
230 .get(idx)
231 .expect("checked above that this exists")
232 .len();
233 arrays[idx] = Arc::new(NullArray::new(array_len));
234 let mut f = SchemaBuilder::from(&*fields);
235 let field = f.field_mut(idx);
236 *field = Arc::new(Field::new(name.clone(), DataType::Null, true));
237 *fields = f.finish().fields;
238 }
239 AlterFieldNullable { name } => {
240 let (idx, _) = fields
241 .find(name)
242 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
243 let mut f = SchemaBuilder::from(&*fields);
244 let field = f.field_mut(idx);
245 assert_eq!(field.is_nullable(), false);
247 *field = Arc::new(Field::new(field.name(), field.data_type().clone(), true));
248 *fields = f.finish().fields;
249 }
250 Recurse { name, migration } => {
251 let (idx, _) = fields
252 .find(name)
253 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
254 arrays[idx] = migration.migrate(Arc::clone(&arrays[idx]));
255 let mut f = SchemaBuilder::from(&*fields);
256 *f.field_mut(idx) = Arc::new(Field::new(
257 name,
258 arrays[idx].data_type().clone(),
259 f.field(idx).is_nullable(),
260 ));
261 *fields = f.finish().fields;
262 }
263 }
264 }
265}
266
267fn backward_compatible_typ(old: &DataType, new: &DataType) -> Option<ArrayMigration> {
268 use ArrayMigration::NoOp;
269 use DataType::*;
270 match (old, new) {
271 (Null, Struct(fields)) if fields.iter().all(|field| field.is_nullable()) => {
272 let migrations = fields
273 .iter()
274 .map(|field| StructArrayMigration::AddFieldNullableAtEnd {
275 name: field.name().clone(),
276 typ: field.data_type().clone(),
277 })
278 .collect();
279 Some(ArrayMigration::Struct(migrations))
280 }
281 (
282 Null | Boolean | Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64
283 | Float16 | Float32 | Float64 | Binary | Utf8 | Date32 | Date64 | LargeBinary
284 | BinaryView | LargeUtf8 | Utf8View,
285 _,
286 ) => (old == new).then_some(NoOp),
287 (FixedSizeBinary(o), FixedSizeBinary(n)) => (o == n).then_some(NoOp),
288 (FixedSizeBinary(_), _) => None,
289 (Struct(o), Struct(n)) => backward_compatible_struct(o, n),
290 (Struct(_), _) => None,
291 (List(o), List(n)) | (Map(o, _), List(n)) => {
292 if o.is_nullable() && !n.is_nullable() {
296 None
297 } else {
298 let nested = backward_compatible_typ(o.data_type(), n.data_type())?;
299 let migration =
300 if matches!(old, DataType::List(_)) && o == n && nested == ArrayMigration::NoOp
301 {
302 ArrayMigration::NoOp
303 } else {
304 ArrayMigration::List(Arc::clone(n), nested.into())
305 };
306 Some(migration)
307 }
308 }
309 (List(_), _) => None,
310 (Map(o, _), Map(n, _)) => (o == n).then_some(NoOp),
311 (Map(_, _), _) => None,
312 (
313 Timestamp(_, _)
314 | Time32(_)
315 | Time64(_)
316 | Duration(_)
317 | Interval(_)
318 | ListView(_)
319 | FixedSizeList(_, _)
320 | LargeList(_)
321 | LargeListView(_)
322 | Union(_, _)
323 | Dictionary(_, _)
324 | Decimal128(_, _)
325 | Decimal256(_, _)
326 | RunEndEncoded(_, _),
327 _,
328 ) => unimplemented!("not used in mz: old={:?} new={:?}", old, new),
329 }
330}
331
/// Computes the migration from a struct with fields `old` to one with fields `new`.
///
/// Returns `None` if `new` is not a backward-compatible evolution of `old`. The rules enforced
/// below:
/// - A field present only in `new` must be nullable and must come after every field that `new`
///   shares with `old`; it is appended as an all-null column.
/// - A shared field must not go from nullable to non-nullable.
/// - A nullable struct-typed field may be replaced by a `Null`-typed field, discarding its
///   contents.
/// - Otherwise a shared field's type must itself be backward compatible. If the field also goes
///   from non-nullable to nullable, any nested migration may contain no struct steps other than
///   `AlterFieldNullable`.
/// - A field present only in `old` is dropped.
/// - Shared fields must keep their relative order.
///
/// Returns `Some(NoOp)` when no step is needed.
fn backward_compatible_struct(old: &Fields, new: &Fields) -> Option<ArrayMigration> {
    use ArrayMigration::*;
    use StructArrayMigration::*;

    // Set once a field has been added; after that no shared field may follow.
    let mut added_field = false;
    let mut field_migrations = Vec::new();
    for n in new.iter() {
        let o = old.find(n.name());
        let o = match o {
            // New fields may only be appended after all existing ones.
            Some(_) if added_field => return None,
            Some((_, o)) => o,
            // A newly added field has no data, so it must be able to hold nulls.
            None if !n.is_nullable() => return None,
            None => {
                added_field = true;
                field_migrations.push(AddFieldNullableAtEnd {
                    name: n.name().to_owned(),
                    typ: n.data_type().clone(),
                });
                continue;
            }
        };

        // Existing data may contain nulls that a non-nullable field couldn't hold.
        if o.is_nullable() && !n.is_nullable() {
            return None;
        }

        // A nullable struct field may be discarded by turning it into an all-null `Null` field.
        if matches!(o.data_type(), DataType::Struct(_))
            && o.is_nullable()
            && n.data_type().is_null()
        {
            field_migrations.push(MakeNull {
                name: n.name().clone(),
            });
            continue;
        }

        let make_nullable = !o.is_nullable() && n.is_nullable();

        match backward_compatible_typ(o.data_type(), n.data_type()) {
            None => return None,
            Some(NoOp) if make_nullable => {
                field_migrations.push(AlterFieldNullable {
                    name: n.name().clone(),
                });
            }
            Some(NoOp) => continue,
            Some(migration) => {
                // Returns true if `migration` contains no struct steps other than
                // `AlterFieldNullable`, at any depth (list re-wrapping is allowed).
                fn recursively_all_nullable(migration: &ArrayMigration) -> bool {
                    match migration {
                        NoOp => true,
                        List(_field, child) => recursively_all_nullable(child),
                        Struct(children) => children.iter().all(|child| match child {
                            AddFieldNullableAtEnd { .. } | DropField { .. } | MakeNull { .. } => {
                                false
                            }
                            AlterFieldNullable { .. } => true,
                            Recurse { migration, .. } => recursively_all_nullable(migration),
                        }),
                    }
                }

                if make_nullable {
                    // Making the field nullable is only combined with nested changes that relax
                    // nullability. NOTE(review): the reason for this restriction isn't visible
                    // here — confirm before loosening it.
                    if recursively_all_nullable(&migration) {
                        // Order matters: the field is marked nullable first, and `Recurse` then
                        // keeps that nullability when it rebuilds the field.
                        field_migrations.extend([
                            AlterFieldNullable {
                                name: n.name().clone(),
                            },
                            Recurse {
                                name: n.name().clone(),
                                migration,
                            },
                        ]);
                    } else {
                        return None;
                    }
                } else {
                    field_migrations.push(Recurse {
                        name: n.name().clone(),
                        migration,
                    })
                }
            }
        }
    }

    // Fields that no longer exist are dropped, after all other steps.
    for o in old.iter() {
        let n = new.find(o.name());
        if n.is_none() {
            field_migrations.push(DropField {
                name: o.name().to_owned(),
            });
        }
    }

    // The positions (in `old`) of the shared fields, taken in `new` order, must never decrease,
    // i.e. shared fields must not be reordered.
    let same_order = new
        .iter()
        .flat_map(|n| old.find(n.name()))
        .tuple_windows()
        .all(|((i, _), (j, _))| i <= j);
    if !same_order {
        return None;
    }

    if field_migrations.is_empty() {
        Some(NoOp)
    } else {
        Some(Struct(field_migrations))
    }
}
465
#[cfg(test)]
mod tests {
    use arrow::array::new_empty_array;
    use arrow::datatypes::Field;

    use super::*;

    /// Checks `backward_compatible_typ(old, new)`.
    ///
    /// `expected` is `None` if no migration should exist, or `Some(contains_drop)` with the
    /// expected drop flag. When a migration exists, it is also applied to an empty array of type
    /// `old`, and the result must have type `new`.
    #[track_caller]
    fn testcase(old: DataType, new: DataType, expected: Option<bool>) {
        let migration = super::backward_compatible_typ(&old, &new);
        let actual = migration.as_ref().map(|x| x.contains_drop());
        assert_eq!(actual, expected);
        if let Some(migration) = migration {
            // Only the resulting type is checked, so an empty array is enough.
            let (old, new) = (new_empty_array(&old), new_empty_array(&new));
            let migrated = migration.migrate(old);
            assert_eq!(new.data_type(), migrated.data_type());
        }
    }

    /// Builds a struct type from `(name, type, nullable)` triples.
    fn struct_(fields: impl IntoIterator<Item = (&'static str, DataType, bool)>) -> DataType {
        let fields = fields
            .into_iter()
            .map(|(name, typ, nullable)| Field::new(name, typ, nullable))
            .collect();
        DataType::Struct(fields)
    }

    #[mz_ore::test]
    fn backward_compatible() {
        use DataType::*;

        // Primitive types are only compatible with themselves.
        testcase(Boolean, Boolean, Some(false));
        testcase(Utf8, Utf8, Some(false));

        testcase(Boolean, Utf8, None);
        testcase(Utf8, Boolean, None);

        // Unchanged structs.
        testcase(
            struct_([("a", Boolean, true)]),
            struct_([("a", Boolean, true)]),
            Some(false),
        );
        testcase(
            struct_([("a", Boolean, false)]),
            struct_([("a", Boolean, false)]),
            Some(false),
        );

        // A field may become nullable, but not non-nullable.
        testcase(
            struct_([("a", Boolean, true)]),
            struct_([("a", Boolean, false)]),
            None,
        );
        testcase(
            struct_([("a", Boolean, false)]),
            struct_([("a", Boolean, true)]),
            Some(false),
        );

        // Added fields must be nullable; removing any field counts as a drop.
        testcase(struct_([]), struct_([("a", Boolean, true)]), Some(false));
        testcase(struct_([]), struct_([("a", Boolean, false)]), None);
        testcase(struct_([("a", Boolean, true)]), struct_([]), Some(true));
        testcase(struct_([("a", Boolean, false)]), struct_([]), Some(true));

        // A rename is a drop plus an add.
        testcase(
            struct_([("a", Boolean, true)]),
            struct_([("b", Boolean, true)]),
            Some(true),
        );

        // Several fields can be added at once.
        testcase(
            struct_([]),
            struct_([("a", Boolean, true), ("b", Boolean, true)]),
            Some(false),
        );

        // Nested structs follow the same rules.
        testcase(
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            Some(false),
        );
        testcase(
            struct_([("a", struct_([]), false)]),
            struct_([("a", struct_([("b", Boolean, true)]), false)]),
            Some(false),
        );
        testcase(
            struct_([("a", struct_([]), false)]),
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            None,
        );
        testcase(
            struct_([("a", struct_([("b", Boolean, true)]), false)]),
            struct_([("a", struct_([]), false)]),
            Some(true),
        );
        testcase(
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            struct_([("a", struct_([]), false)]),
            Some(true),
        );

        // Making the outer field nullable doesn't allow adding a non-nullable nested field.
        testcase(
            struct_([("a", struct_([]), false)]),
            struct_([("a", struct_([("b", Boolean, false)]), true)]),
            None,
        );

        // Shared fields can't be reordered.
        testcase(
            struct_([("a", Boolean, false), ("b", Utf8, false)]),
            struct_([("b", Utf8, false), ("a", Boolean, false)]),
            None,
        );

        // Order is checked by position, independent of how the names sort as strings.
        testcase(
            struct_([("2", Boolean, false), ("10", Utf8, false)]),
            struct_([("10", Utf8, false)]),
            Some(true),
        );

        // New fields may only be appended at the end, not inserted between existing ones.
        testcase(
            struct_([("a", Boolean, true), ("c", Boolean, true)]),
            struct_([
                ("a", Boolean, true),
                ("b", Boolean, true),
                ("c", Boolean, true),
            ]),
            None,
        );

        // `Null` can become a struct of nullable fields.
        testcase(Null, struct_([("a", Boolean, true)]), Some(false));

        // A map can become a list of its entries.
        testcase(
            Map(
                Field::new_struct(
                    "map_entries",
                    vec![
                        Field::new("keys", Utf8, false),
                        Field::new("values", Boolean, true),
                    ],
                    false,
                )
                .into(),
                true,
            ),
            List(
                Field::new_struct(
                    "map_entries",
                    vec![
                        Field::new("keys", Utf8, false),
                        Field::new("values", Boolean, true),
                    ],
                    false,
                )
                .into(),
            ),
            Some(false),
        );

        // A list's entry struct can gain a nullable field.
        testcase(
            List(Field::new_struct("entries", vec![Field::new("keys", Utf8, false)], true).into()),
            List(
                Field::new_struct(
                    "entries",
                    vec![
                        Field::new("keys", Utf8, false),
                        Field::new("values", Boolean, true),
                    ],
                    true,
                )
                .into(),
            ),
            Some(false),
        );

        // A field can become nullable while its nested field also becomes nullable.
        testcase(
            struct_([("0", struct_([("foo", Utf8, false)]), false)]),
            struct_([("0", struct_([("foo", Utf8, true)]), true)]),
            Some(false),
        )
    }

    /// Fields "1", "3", "5" and "7" of the nested "ok" struct only become nullable; the inner
    /// fields of "5" do too. Everything else, including map-entry lists, nested lists and
    /// fixed-size binaries, is unchanged.
    #[mz_ore::test]
    fn backwards_compatible_nested_types() {
        use DataType::*;

        testcase(
            struct_([
                (
                    "ok",
                    struct_([
                        (
                            "0",
                            List(
                                Field::new_struct(
                                    "map_entries",
                                    vec![
                                        Field::new("key", Utf8, false),
                                        Field::new("val", Int32, true),
                                    ],
                                    false,
                                )
                                .into(),
                            ),
                            true,
                        ),
                        (
                            "1",
                            List(
                                Field::new_struct(
                                    "map_entries",
                                    vec![
                                        Field::new("key", Utf8, false),
                                        Field::new("val", Int32, true),
                                    ],
                                    false,
                                )
                                .into(),
                            ),
                            false,
                        ),
                        (
                            "2",
                            List(
                                Field::new_list("item", Field::new_list_field(Int32, true), true)
                                    .into(),
                            ),
                            true,
                        ),
                        (
                            "3",
                            List(
                                Field::new_list("item", Field::new_list_field(Int32, true), true)
                                    .into(),
                            ),
                            false,
                        ),
                        ("4", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
                        (
                            "5",
                            struct_([("0", Int32, false), ("1", Utf8, false)]),
                            false,
                        ),
                        ("6", Utf8, true),
                        (
                            "7",
                            struct_([
                                (
                                    "dims",
                                    List(Field::new_list_field(FixedSizeBinary(16), true).into()),
                                    true,
                                ),
                                ("vals", List(Field::new_list_field(Utf8, true).into()), true),
                            ]),
                            false,
                        ),
                    ]),
                    true,
                ),
                ("err", Binary, true),
            ]),
            struct_([
                (
                    "ok",
                    struct_([
                        (
                            "0",
                            List(
                                Field::new_struct(
                                    "map_entries",
                                    vec![
                                        Field::new("key", Utf8, false),
                                        Field::new("val", Int32, true),
                                    ],
                                    false,
                                )
                                .into(),
                            ),
                            true,
                        ),
                        (
                            "1",
                            List(
                                Field::new_struct(
                                    "map_entries",
                                    vec![
                                        Field::new("key", Utf8, false),
                                        Field::new("val", Int32, true),
                                    ],
                                    false,
                                )
                                .into(),
                            ),
                            true,
                        ),
                        (
                            "2",
                            List(
                                Field::new_list("item", Field::new_list_field(Int32, true), true)
                                    .into(),
                            ),
                            true,
                        ),
                        (
                            "3",
                            List(
                                Field::new_list("item", Field::new_list_field(Int32, true), true)
                                    .into(),
                            ),
                            true,
                        ),
                        ("4", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
                        ("5", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
                        ("6", Utf8, true),
                        (
                            "7",
                            struct_([
                                (
                                    "dims",
                                    List(Field::new_list_field(FixedSizeBinary(16), true).into()),
                                    true,
                                ),
                                ("vals", List(Field::new_list_field(Utf8, true).into()), true),
                            ]),
                            true,
                        ),
                    ]),
                    true,
                ),
                ("err", Binary, true),
            ]),
            Some(false),
        )
    }
}