1use std::str::FromStr;
13use std::sync::Arc;
14
15use arrow::array::{Array, AsArray, ListArray, NullArray, StructArray, new_null_array};
16use arrow::datatypes::{DataType, Field, FieldRef, Fields, SchemaBuilder};
17use itertools::Itertools;
18use mz_ore::cast::CastFrom;
19use mz_proto::{ProtoType, RustType, TryFromProtoError};
20use proptest_derive::Arbitrary;
21use serde::{Deserialize, Serialize};
22
/// An identifier for a schema, wrapping a plain `usize`.
///
/// The serde attribute below routes (de)serialization through `String`, using
/// the `From<SchemaId> for String` and `TryFrom<String> for SchemaId`
/// conversions. The textual form is the letter `h` followed by the wrapped
/// number in decimal (e.g. `h3`).
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Arbitrary
)]
#[serde(try_from = "String", into = "String")]
pub struct SchemaId(pub usize);
39
40impl std::fmt::Display for SchemaId {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 write!(f, "h{}", self.0)
43 }
44}
45
46impl From<SchemaId> for String {
47 fn from(schema_id: SchemaId) -> Self {
48 schema_id.to_string()
49 }
50}
51
52impl TryFrom<String> for SchemaId {
53 type Error = String;
54 fn try_from(encoded: String) -> Result<Self, Self::Error> {
55 let encoded = match encoded.strip_prefix('h') {
56 Some(x) => x,
57 None => return Err(format!("invalid SchemaId {}: incorrect prefix", encoded)),
58 };
59 let schema_id = u64::from_str(encoded)
60 .map_err(|err| format!("invalid SchemaId {}: {}", encoded, err))?;
61 Ok(SchemaId(usize::cast_from(schema_id)))
62 }
63}
64
65impl RustType<u64> for SchemaId {
66 fn into_proto(&self) -> u64 {
67 self.0.into_proto()
68 }
69
70 fn from_proto(proto: u64) -> Result<Self, TryFromProtoError> {
71 Ok(SchemaId(proto.into_rust()?))
72 }
73}
74
75pub fn backward_compatible(old: &DataType, new: &DataType) -> Option<Migration> {
79 backward_compatible_typ(old, new).map(Migration)
80}
81
/// A public, opaque wrapper around the crate-internal [`ArrayMigration`] that
/// [`backward_compatible`] returns.
#[derive(Debug, PartialEq)]
pub struct Migration(ArrayMigration);
85
86impl Migration {
87 pub fn contains_drop(&self) -> bool {
90 self.0.contains_drop()
91 }
92
93 pub fn preserves_order(&self) -> bool {
95 !self.contains_drop()
98 }
99
100 pub fn migrate(&self, array: Arc<dyn Array>) -> Arc<dyn Array> {
103 self.0.migrate(array)
104 }
105}
106
/// The migration to apply to a single Arrow array.
#[derive(Debug, PartialEq)]
pub(crate) enum ArrayMigration {
    /// Return the array unchanged.
    NoOp,
    /// Apply each step, in order, to the fields of a struct array. An
    /// all-null (`DataType::Null`) input is also accepted, provided every
    /// step only adds a nullable field.
    Struct(Vec<StructArrayMigration>),
    /// Migrate the entries of a list array (or of a map array, which is first
    /// converted to a list) with the boxed migration, then rebuild the list
    /// with the given field as its element field.
    List(FieldRef, Box<ArrayMigration>),
}
113
/// A single step of a struct migration, identified by the name of the field
/// it acts on.
#[derive(Debug, PartialEq)]
pub(crate) enum StructArrayMigration {
    /// Append a new nullable field of type `typ`, filled entirely with nulls,
    /// after all existing fields.
    AddFieldNullableAtEnd {
        name: String,
        typ: DataType,
    },
    /// Remove the named field and its column.
    DropField {
        name: String,
    },
    /// Replace the named field's column with a `NullArray` of the same length
    /// and retype the field as nullable `DataType::Null`.
    MakeNull {
        name: String,
    },
    /// Mark the named (currently non-nullable) field as nullable, leaving its
    /// data untouched.
    AlterFieldNullable {
        name: String,
    },
    /// Apply `migration` to the named field's column and update the field's
    /// data type to match the result.
    Recurse {
        name: String,
        migration: ArrayMigration,
    },
}
141
142impl ArrayMigration {
143 fn contains_drop(&self) -> bool {
144 use ArrayMigration::*;
145 match self {
146 NoOp => false,
147 Struct(xs) => xs.iter().any(|x| x.contains_drop()),
148 List(_f, x) => x.contains_drop(),
149 }
150 }
151
152 fn migrate(&self, array: Arc<dyn Array>) -> Arc<dyn Array> {
153 use ArrayMigration::*;
154 match self {
155 NoOp => array,
156 Struct(migrations) => {
157 let len = array.len();
158
159 let (mut fields, mut arrays, nulls) = match array.data_type() {
160 DataType::Null => {
161 let all_add_nullable = migrations.iter().all(|action| {
162 matches!(action, StructArrayMigration::AddFieldNullableAtEnd { .. })
163 });
164 assert!(all_add_nullable, "invalid migrations, {migrations:?}");
165 (Fields::empty(), Vec::new(), None)
166 }
167 DataType::Struct(_) => {
168 let array = array
169 .as_any()
170 .downcast_ref::<StructArray>()
171 .expect("known to be StructArray")
172 .clone();
173 array.into_parts()
174 }
175 other => panic!("expected Struct or Null got {other:?}"),
176 };
177
178 for migration in migrations {
179 migration.migrate(len, &mut fields, &mut arrays);
180 }
181 assert_eq!(fields.len(), arrays.len(), "invalid migration");
182
183 let array = if arrays.is_empty() {
184 StructArray::new_empty_fields(len, nulls)
185 } else {
186 StructArray::new(fields, arrays, nulls)
187 };
188 Arc::new(array)
189 }
190 List(field, entry_migration) => {
191 let list_array: ListArray = if let Some(list_array) = array.as_list_opt() {
192 list_array.clone()
193 } else if let Some(map_array) = array.as_map_opt() {
194 map_array.clone().into()
195 } else {
196 panic!("expected list-like array; got {:?}", array.data_type())
197 };
198
199 let (_field, offsets, entries, nulls) = list_array.into_parts();
200 let entries = entry_migration.migrate(entries);
201 Arc::new(ListArray::new(Arc::clone(field), offsets, entries, nulls))
202 }
203 }
204 }
205}
206
207impl StructArrayMigration {
208 fn contains_drop(&self) -> bool {
209 use StructArrayMigration::*;
210 match self {
211 AddFieldNullableAtEnd { .. } => false,
212 DropField { .. } | MakeNull { .. } => true,
213 AlterFieldNullable { .. } => false,
214 Recurse { migration, .. } => migration.contains_drop(),
215 }
216 }
217
218 fn migrate(&self, len: usize, fields: &mut Fields, arrays: &mut Vec<Arc<dyn Array>>) {
219 use StructArrayMigration::*;
220 match self {
221 AddFieldNullableAtEnd { name, typ } => {
222 arrays.push(new_null_array(typ, len));
223 let mut f = SchemaBuilder::from(&*fields);
224 f.push(Arc::new(Field::new(name, typ.clone(), true)));
225 *fields = f.finish().fields;
226 }
227 DropField { name } => {
228 let (idx, _) = fields
229 .find(name)
230 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
231 arrays.remove(idx);
232 let mut f = SchemaBuilder::from(&*fields);
233 f.remove(idx);
234 *fields = f.finish().fields;
235 }
236 MakeNull { name } => {
237 let (idx, _) = fields
238 .find(name)
239 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
240 let array_len = arrays
241 .get(idx)
242 .expect("checked above that this exists")
243 .len();
244 arrays[idx] = Arc::new(NullArray::new(array_len));
245 let mut f = SchemaBuilder::from(&*fields);
246 let field = f.field_mut(idx);
247 *field = Arc::new(Field::new(name.clone(), DataType::Null, true));
248 *fields = f.finish().fields;
249 }
250 AlterFieldNullable { name } => {
251 let (idx, _) = fields
252 .find(name)
253 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
254 let mut f = SchemaBuilder::from(&*fields);
255 let field = f.field_mut(idx);
256 assert_eq!(field.is_nullable(), false);
258 *field = Arc::new(Field::new(field.name(), field.data_type().clone(), true));
259 *fields = f.finish().fields;
260 }
261 Recurse { name, migration } => {
262 let (idx, _) = fields
263 .find(name)
264 .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
265 arrays[idx] = migration.migrate(Arc::clone(&arrays[idx]));
266 let mut f = SchemaBuilder::from(&*fields);
267 *f.field_mut(idx) = Arc::new(Field::new(
268 name,
269 arrays[idx].data_type().clone(),
270 f.field(idx).is_nullable(),
271 ));
272 *fields = f.finish().fields;
273 }
274 }
275 }
276}
277
278fn backward_compatible_typ(old: &DataType, new: &DataType) -> Option<ArrayMigration> {
279 use ArrayMigration::NoOp;
280 use DataType::*;
281 match (old, new) {
282 (Null, Struct(fields)) if fields.iter().all(|field| field.is_nullable()) => {
283 let migrations = fields
284 .iter()
285 .map(|field| StructArrayMigration::AddFieldNullableAtEnd {
286 name: field.name().clone(),
287 typ: field.data_type().clone(),
288 })
289 .collect();
290 Some(ArrayMigration::Struct(migrations))
291 }
292 (
293 Null | Boolean | Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64
294 | Float16 | Float32 | Float64 | Binary | Utf8 | Date32 | Date64 | LargeBinary
295 | BinaryView | LargeUtf8 | Utf8View,
296 _,
297 ) => (old == new).then_some(NoOp),
298 (FixedSizeBinary(o), FixedSizeBinary(n)) => (o == n).then_some(NoOp),
299 (FixedSizeBinary(_), _) => None,
300 (Struct(o), Struct(n)) => backward_compatible_struct(o, n),
301 (Struct(_), _) => None,
302 (List(o), List(n)) | (Map(o, _), List(n)) => {
303 if o.is_nullable() && !n.is_nullable() {
307 None
308 } else {
309 let nested = backward_compatible_typ(o.data_type(), n.data_type())?;
310 let migration =
311 if matches!(old, DataType::List(_)) && o == n && nested == ArrayMigration::NoOp
312 {
313 ArrayMigration::NoOp
314 } else {
315 ArrayMigration::List(Arc::clone(n), nested.into())
316 };
317 Some(migration)
318 }
319 }
320 (List(_), _) => None,
321 (Map(o, _), Map(n, _)) => (o == n).then_some(NoOp),
322 (Map(_, _), _) => None,
323 (
324 Timestamp(_, _)
325 | Time32(_)
326 | Time64(_)
327 | Duration(_)
328 | Interval(_)
329 | ListView(_)
330 | FixedSizeList(_, _)
331 | LargeList(_)
332 | LargeListView(_)
333 | Union(_, _)
334 | Dictionary(_, _)
335 | Decimal32(_, _)
336 | Decimal64(_, _)
337 | Decimal128(_, _)
338 | Decimal256(_, _)
339 | RunEndEncoded(_, _),
340 _,
341 ) => unimplemented!("not used in mz: old={:?} new={:?}", old, new),
342 }
343}
344
345fn backward_compatible_struct(old: &Fields, new: &Fields) -> Option<ArrayMigration> {
346 use ArrayMigration::*;
347 use StructArrayMigration::*;
348
349 let mut added_field = false;
350 let mut field_migrations = Vec::new();
351 for n in new.iter() {
352 let o = old.find(n.name());
357 let o = match o {
358 Some(_) if added_field => return None,
362 Some((_, o)) => o,
363 None if !n.is_nullable() => return None,
365 None => {
366 added_field = true;
367 field_migrations.push(AddFieldNullableAtEnd {
368 name: n.name().to_owned(),
369 typ: n.data_type().clone(),
370 });
371 continue;
372 }
373 };
374
375 if o.is_nullable() && !n.is_nullable() {
377 return None;
378 }
379
380 if matches!(o.data_type(), DataType::Struct(_))
386 && o.is_nullable()
387 && n.data_type().is_null()
388 {
389 field_migrations.push(MakeNull {
390 name: n.name().clone(),
391 });
392 continue;
393 }
394
395 let make_nullable = !o.is_nullable() && n.is_nullable();
397
398 match backward_compatible_typ(o.data_type(), n.data_type()) {
399 None => return None,
400 Some(NoOp) if make_nullable => {
401 field_migrations.push(AlterFieldNullable {
402 name: n.name().clone(),
403 });
404 }
405 Some(NoOp) => continue,
406 Some(migration) => {
407 fn recursively_all_nullable(migration: &ArrayMigration) -> bool {
409 match migration {
410 NoOp => true,
411 List(_field, child) => recursively_all_nullable(child),
412 Struct(children) => children.iter().all(|child| match child {
413 AddFieldNullableAtEnd { .. } | DropField { .. } | MakeNull { .. } => {
414 false
415 }
416 AlterFieldNullable { .. } => true,
417 Recurse { migration, .. } => recursively_all_nullable(migration),
418 }),
419 }
420 }
421
422 if make_nullable {
429 if recursively_all_nullable(&migration) {
430 field_migrations.extend([
431 AlterFieldNullable {
432 name: n.name().clone(),
433 },
434 Recurse {
435 name: n.name().clone(),
436 migration,
437 },
438 ]);
439 } else {
440 return None;
441 }
442 } else {
443 field_migrations.push(Recurse {
444 name: n.name().clone(),
445 migration,
446 })
447 }
448 }
449 }
450 }
451
452 for o in old.iter() {
453 let n = new.find(o.name());
454 if n.is_none() {
455 field_migrations.push(DropField {
457 name: o.name().to_owned(),
458 });
459 }
460 }
461
462 let same_order = new
464 .iter()
465 .flat_map(|n| old.find(n.name()))
466 .tuple_windows()
467 .all(|((i, _), (j, _))| i <= j);
468 if !same_order {
469 return None;
470 }
471
472 if field_migrations.is_empty() {
473 Some(NoOp)
474 } else {
475 Some(Struct(field_migrations))
476 }
477}
478
#[cfg(test)]
mod tests {
    use arrow::array::new_empty_array;
    use arrow::datatypes::Field;

    use super::*;

    /// Checks the compatibility verdict for migrating `old` to `new`.
    ///
    /// `expected` is `None` when the types must be incompatible, and
    /// `Some(contains_drop)` when a migration must exist. When one exists it
    /// is also applied to an empty array of the old type, and the resulting
    /// data type must equal `new`.
    #[track_caller]
    fn testcase(old: DataType, new: DataType, expected: Option<bool>) {
        let migration = super::backward_compatible_typ(&old, &new);
        let actual = migration.as_ref().map(|x| x.contains_drop());
        assert_eq!(actual, expected);
        if let Some(migration) = migration {
            // Run the migration on empty arrays to check the resulting type.
            let (old, new) = (new_empty_array(&old), new_empty_array(&new));
            let migrated = migration.migrate(old);
            assert_eq!(new.data_type(), migrated.data_type());
        }
    }

    /// Builds a struct `DataType` from `(name, type, nullable)` triples.
    fn struct_(fields: impl IntoIterator<Item = (&'static str, DataType, bool)>) -> DataType {
        let fields = fields
            .into_iter()
            .map(|(name, typ, nullable)| Field::new(name, typ, nullable))
            .collect();
        DataType::Struct(fields)
    }

    #[mz_ore::test]
    fn backward_compatible() {
        use DataType::*;

        // Identical primitive types.
        testcase(Boolean, Boolean, Some(false));
        testcase(Utf8, Utf8, Some(false));

        // Different primitive types.
        testcase(Boolean, Utf8, None);
        testcase(Utf8, Boolean, None);

        // Identical structs.
        testcase(
            struct_([("a", Boolean, true)]),
            struct_([("a", Boolean, true)]),
            Some(false),
        );
        testcase(
            struct_([("a", Boolean, false)]),
            struct_([("a", Boolean, false)]),
            Some(false),
        );

        // Changing nullability: nullable -> non-nullable is rejected, the
        // reverse is accepted.
        testcase(
            struct_([("a", Boolean, true)]),
            struct_([("a", Boolean, false)]),
            None,
        );
        testcase(
            struct_([("a", Boolean, false)]),
            struct_([("a", Boolean, true)]),
            Some(false),
        );

        // Adding and dropping a single field.
        testcase(struct_([]), struct_([("a", Boolean, true)]), Some(false));
        testcase(struct_([]), struct_([("a", Boolean, false)]), None);
        testcase(struct_([("a", Boolean, true)]), struct_([]), Some(true));
        testcase(struct_([("a", Boolean, false)]), struct_([]), Some(true));

        // Replacing a field with a differently named one: an add plus a drop.
        testcase(
            struct_([("a", Boolean, true)]),
            struct_([("b", Boolean, true)]),
            Some(true),
        );

        // Adding several fields at once.
        testcase(
            struct_([]),
            struct_([("a", Boolean, true), ("b", Boolean, true)]),
            Some(false),
        );

        // The same add/drop rules applied to a nested struct.
        testcase(
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            Some(false),
        );
        testcase(
            struct_([("a", struct_([]), false)]),
            struct_([("a", struct_([("b", Boolean, true)]), false)]),
            Some(false),
        );
        testcase(
            struct_([("a", struct_([]), false)]),
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            None,
        );
        testcase(
            struct_([("a", struct_([("b", Boolean, true)]), false)]),
            struct_([("a", struct_([]), false)]),
            Some(true),
        );
        testcase(
            struct_([("a", struct_([("b", Boolean, false)]), false)]),
            struct_([("a", struct_([]), false)]),
            Some(true),
        );

        // Making the outer field nullable does not permit adding a
        // non-nullable field to the nested struct.
        testcase(
            struct_([("a", struct_([]), false)]),
            struct_([("a", struct_([("b", Boolean, false)]), true)]),
            None,
        );

        // Reordering existing fields is rejected.
        testcase(
            struct_([("a", Boolean, false), ("b", Utf8, false)]),
            struct_([("b", Utf8, false), ("a", Boolean, false)]),
            None,
        );

        // Dropping the first of two fields; the survivor keeps its position
        // relative to itself, so this is accepted.
        testcase(
            struct_([("2", Boolean, false), ("10", Utf8, false)]),
            struct_([("10", Utf8, false)]),
            Some(true),
        );

        // Adding a field in the middle (rather than at the end) is rejected.
        testcase(
            struct_([("a", Boolean, true), ("c", Boolean, true)]),
            struct_([
                ("a", Boolean, true),
                ("b", Boolean, true),
                ("c", Boolean, true),
            ]),
            None,
        );

        // An all-null column can become a struct of nullable fields.
        testcase(Null, struct_([("a", Boolean, true)]), Some(false));

        // A map can be migrated to a list with the same entries field.
        testcase(
            Map(
                Field::new_struct(
                    "map_entries",
                    vec![
                        Field::new("keys", Utf8, false),
                        Field::new("values", Boolean, true),
                    ],
                    false,
                )
                .into(),
                true,
            ),
            List(
                Field::new_struct(
                    "map_entries",
                    vec![
                        Field::new("keys", Utf8, false),
                        Field::new("values", Boolean, true),
                    ],
                    false,
                )
                .into(),
            ),
            Some(false),
        );

        // Adding a nullable field to the struct inside a list.
        testcase(
            List(Field::new_struct("entries", vec![Field::new("keys", Utf8, false)], true).into()),
            List(
                Field::new_struct(
                    "entries",
                    vec![
                        Field::new("keys", Utf8, false),
                        Field::new("values", Boolean, true),
                    ],
                    true,
                )
                .into(),
            ),
            Some(false),
        );

        // Making both a struct field and its nested field nullable at once.
        testcase(
            struct_([("0", struct_([("foo", Utf8, false)]), false)]),
            struct_([("0", struct_([("foo", Utf8, true)]), true)]),
            Some(false),
        )
    }

    /// Makes several nested fields of different kinds (lists of structs,
    /// lists of lists, structs) nullable in a single migration.
    #[mz_ore::test]
    fn backwards_compatible_nested_types() {
        use DataType::*;

        testcase(
            // Old schema: fields "1", "3", "5" and "7" of "ok" are
            // non-nullable, as are the fields inside "5".
            struct_([
                (
                    "ok",
                    struct_([
                        (
                            "0",
                            List(
                                Field::new_struct(
                                    "map_entries",
                                    vec![
                                        Field::new("key", Utf8, false),
                                        Field::new("val", Int32, true),
                                    ],
                                    false,
                                )
                                .into(),
                            ),
                            true,
                        ),
                        (
                            "1",
                            List(
                                Field::new_struct(
                                    "map_entries",
                                    vec![
                                        Field::new("key", Utf8, false),
                                        Field::new("val", Int32, true),
                                    ],
                                    false,
                                )
                                .into(),
                            ),
                            false,
                        ),
                        (
                            "2",
                            List(
                                Field::new_list("item", Field::new_list_field(Int32, true), true)
                                    .into(),
                            ),
                            true,
                        ),
                        (
                            "3",
                            List(
                                Field::new_list("item", Field::new_list_field(Int32, true), true)
                                    .into(),
                            ),
                            false,
                        ),
                        ("4", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
                        (
                            "5",
                            struct_([("0", Int32, false), ("1", Utf8, false)]),
                            false,
                        ),
                        ("6", Utf8, true),
                        (
                            "7",
                            struct_([
                                (
                                    "dims",
                                    List(Field::new_list_field(FixedSizeBinary(16), true).into()),
                                    true,
                                ),
                                ("vals", List(Field::new_list_field(Utf8, true).into()), true),
                            ]),
                            false,
                        ),
                    ]),
                    true,
                ),
                ("err", Binary, true),
            ]),
            // New schema: the same shape with every field of "ok" (and the
            // fields inside "5") nullable.
            struct_([
                (
                    "ok",
                    struct_([
                        (
                            "0",
                            List(
                                Field::new_struct(
                                    "map_entries",
                                    vec![
                                        Field::new("key", Utf8, false),
                                        Field::new("val", Int32, true),
                                    ],
                                    false,
                                )
                                .into(),
                            ),
                            true,
                        ),
                        (
                            "1",
                            List(
                                Field::new_struct(
                                    "map_entries",
                                    vec![
                                        Field::new("key", Utf8, false),
                                        Field::new("val", Int32, true),
                                    ],
                                    false,
                                )
                                .into(),
                            ),
                            true,
                        ),
                        (
                            "2",
                            List(
                                Field::new_list("item", Field::new_list_field(Int32, true), true)
                                    .into(),
                            ),
                            true,
                        ),
                        (
                            "3",
                            List(
                                Field::new_list("item", Field::new_list_field(Int32, true), true)
                                    .into(),
                            ),
                            true,
                        ),
                        ("4", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
                        ("5", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
                        ("6", Utf8, true),
                        (
                            "7",
                            struct_([
                                (
                                    "dims",
                                    List(Field::new_list_field(FixedSizeBinary(16), true).into()),
                                    true,
                                ),
                                ("vals", List(Field::new_list_field(Utf8, true).into()), true),
                            ]),
                            true,
                        ),
                    ]),
                    true,
                ),
                ("err", Binary, true),
            ]),
            // Compatible, and nothing is dropped.
            Some(false),
        )
    }
}