use std::str::FromStr;
use std::sync::Arc;
use arrow::array::{new_null_array, Array, AsArray, ListArray, StructArray};
use arrow::datatypes::{DataType, Field, FieldRef, Fields, SchemaBuilder};
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_proto::{ProtoType, RustType, TryFromProtoError};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
/// Identifier for a schema, wrapping a `usize`.
///
/// Serde (de)serializes this type through its `String` form: the letter `h`
/// followed by the wrapped integer in decimal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Arbitrary)]
#[serde(try_from = "String", into = "String")]
pub struct SchemaId(pub usize);
impl std::fmt::Display for SchemaId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "h{}", self.0)
}
}
/// Produces the textual encoding of a [`SchemaId`] by delegating to its
/// `Display` implementation.
impl From<SchemaId> for String {
    fn from(schema_id: SchemaId) -> Self {
        ToString::to_string(&schema_id)
    }
}
/// Parses the textual encoding of a [`SchemaId`]: the letter `h` followed by
/// an unsigned decimal integer.
///
/// # Errors
///
/// Returns a human-readable message if the input does not start with `h`, or
/// if the remainder cannot be parsed as a `u64`. Both messages quote the
/// complete input that was rejected.
impl TryFrom<String> for SchemaId {
    type Error = String;

    fn try_from(encoded: String) -> Result<Self, Self::Error> {
        // Keep the stripped remainder under its own name so that error
        // messages can still report the full original input.
        let digits = encoded
            .strip_prefix('h')
            .ok_or_else(|| format!("invalid SchemaId {}: incorrect prefix", encoded))?;
        let schema_id = u64::from_str(digits)
            .map_err(|err| format!("invalid SchemaId {}: {}", encoded, err))?;
        Ok(SchemaId(usize::cast_from(schema_id)))
    }
}
impl RustType<u64> for SchemaId {
fn into_proto(&self) -> u64 {
self.0.into_proto()
}
fn from_proto(proto: u64) -> Result<Self, TryFromProtoError> {
Ok(SchemaId(proto.into_rust()?))
}
}
pub fn backward_compatible(old: &DataType, new: &DataType) -> Option<Migration> {
backward_compatible_typ(old, new).map(Migration)
}
/// A migration from one array data type to another, as computed by
/// [`backward_compatible`]. Wraps the crate-internal [`ArrayMigration`].
#[derive(Debug, PartialEq)]
pub struct Migration(ArrayMigration);
impl Migration {
pub fn contains_drop(&self) -> bool {
self.0.contains_drop()
}
pub fn preserves_order(&self) -> bool {
!self.contains_drop()
}
pub fn migrate(&self, array: Arc<dyn Array>) -> Arc<dyn Array> {
self.0.migrate(array)
}
}
/// The transformation to apply to a single array.
#[derive(Debug, PartialEq)]
pub(crate) enum ArrayMigration {
/// The array is returned unchanged.
NoOp,
/// The array is a struct (or a null array being turned into a struct); the
/// listed field-level migrations are applied in order.
Struct(Vec<StructArrayMigration>),
/// The array is list-like; its entries are migrated with the boxed
/// migration and the list is rebuilt using the given field.
List(FieldRef, Box<ArrayMigration>),
}
/// A single change applied to the fields and child arrays of a struct array.
#[derive(Debug, PartialEq)]
pub(crate) enum StructArrayMigration {
/// Append a new nullable field of type `typ`, filled with nulls.
AddFieldNullableAtEnd {
name: String,
typ: DataType,
},
/// Remove the field called `name` together with its child array.
DropField {
name: String,
},
/// Mark the (currently non-nullable) field called `name` as nullable.
AlterFieldNullable {
name: String,
},
/// Apply `migration` to the child array of the field called `name`.
Recurse {
name: String,
migration: ArrayMigration,
},
}
impl ArrayMigration {
fn contains_drop(&self) -> bool {
use ArrayMigration::*;
match self {
NoOp => false,
Struct(xs) => xs.iter().any(|x| x.contains_drop()),
List(_f, x) => x.contains_drop(),
}
}
fn migrate(&self, array: Arc<dyn Array>) -> Arc<dyn Array> {
use ArrayMigration::*;
match self {
NoOp => array,
Struct(migrations) => {
let len = array.len();
let (mut fields, mut arrays, nulls) = match array.data_type() {
DataType::Null => {
let all_add_nullable = migrations.iter().all(|action| {
matches!(action, StructArrayMigration::AddFieldNullableAtEnd { .. })
});
assert!(all_add_nullable, "invalid migrations, {migrations:?}");
(Fields::empty(), Vec::new(), None)
}
DataType::Struct(_) => {
let array = array
.as_any()
.downcast_ref::<StructArray>()
.expect("known to be StructArray")
.clone();
array.into_parts()
}
other => panic!("expected Struct or Null got {other:?}"),
};
for migration in migrations {
migration.migrate(len, &mut fields, &mut arrays);
}
Arc::new(StructArray::new(fields, arrays, nulls))
}
List(field, entry_migration) => {
let list_array: ListArray = if let Some(list_array) = array.as_list_opt() {
list_array.clone()
} else if let Some(map_array) = array.as_map_opt() {
map_array.clone().into()
} else {
panic!("expected list-like array; got {:?}", array.data_type())
};
let (_field, offsets, entries, nulls) = list_array.into_parts();
let entries = entry_migration.migrate(entries);
Arc::new(ListArray::new(Arc::clone(field), offsets, entries, nulls))
}
}
}
}
impl StructArrayMigration {
fn contains_drop(&self) -> bool {
use StructArrayMigration::*;
match self {
AddFieldNullableAtEnd { .. } => false,
DropField { .. } => true,
AlterFieldNullable { .. } => false,
Recurse { migration, .. } => migration.contains_drop(),
}
}
fn migrate(&self, len: usize, fields: &mut Fields, arrays: &mut Vec<Arc<dyn Array>>) {
use StructArrayMigration::*;
match self {
AddFieldNullableAtEnd { name, typ } => {
arrays.push(new_null_array(typ, len));
let mut f = SchemaBuilder::from(&*fields);
f.push(Arc::new(Field::new(name, typ.clone(), true)));
*fields = f.finish().fields;
}
DropField { name } => {
let (idx, _) = fields
.find(name)
.unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
arrays.remove(idx);
let mut f = SchemaBuilder::from(&*fields);
f.remove(idx);
*fields = f.finish().fields;
}
AlterFieldNullable { name } => {
let (idx, _) = fields
.find(name)
.unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
let mut f = SchemaBuilder::from(&*fields);
let field = f.field_mut(idx);
assert_eq!(field.is_nullable(), false);
*field = Arc::new(Field::new(field.name(), field.data_type().clone(), true));
*fields = f.finish().fields;
}
Recurse { name, migration } => {
let (idx, _) = fields
.find(name)
.unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
arrays[idx] = migration.migrate(Arc::clone(&arrays[idx]));
let mut f = SchemaBuilder::from(&*fields);
*f.field_mut(idx) = Arc::new(Field::new(
name,
arrays[idx].data_type().clone(),
f.field(idx).is_nullable(),
));
*fields = f.finish().fields;
}
}
}
}
fn backward_compatible_typ(old: &DataType, new: &DataType) -> Option<ArrayMigration> {
use ArrayMigration::NoOp;
use DataType::*;
match (old, new) {
(Null, Struct(fields)) if fields.iter().all(|field| field.is_nullable()) => {
let migrations = fields
.iter()
.map(|field| StructArrayMigration::AddFieldNullableAtEnd {
name: field.name().clone(),
typ: field.data_type().clone(),
})
.collect();
Some(ArrayMigration::Struct(migrations))
}
(
Null | Boolean | Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64
| Float16 | Float32 | Float64 | Binary | Utf8 | Date32 | Date64 | LargeBinary
| BinaryView | LargeUtf8 | Utf8View,
_,
) => (old == new).then_some(NoOp),
(FixedSizeBinary(o), FixedSizeBinary(n)) => (o == n).then_some(NoOp),
(FixedSizeBinary(_), _) => None,
(Struct(o), Struct(n)) => backward_compatible_struct(o, n),
(Struct(_), _) => None,
(List(o), List(n)) | (Map(o, _), List(n)) => {
if o.is_nullable() && !n.is_nullable() {
None
} else {
let nested = backward_compatible_typ(o.data_type(), n.data_type())?;
let migration =
if matches!(old, DataType::List(_)) && o == n && nested == ArrayMigration::NoOp
{
ArrayMigration::NoOp
} else {
ArrayMigration::List(Arc::clone(n), nested.into())
};
Some(migration)
}
}
(List(_), _) => None,
(Map(o, _), Map(n, _)) => (o == n).then_some(NoOp),
(Map(_, _), _) => None,
(
Timestamp(_, _)
| Time32(_)
| Time64(_)
| Duration(_)
| Interval(_)
| ListView(_)
| FixedSizeList(_, _)
| LargeList(_)
| LargeListView(_)
| Union(_, _)
| Dictionary(_, _)
| Decimal128(_, _)
| Decimal256(_, _)
| RunEndEncoded(_, _),
_,
) => unimplemented!("not used in mz: old={:?} new={:?}", old, new),
}
}
fn backward_compatible_struct(old: &Fields, new: &Fields) -> Option<ArrayMigration> {
use ArrayMigration::*;
use StructArrayMigration::*;
let mut added_field = false;
let mut field_migrations = Vec::new();
for n in new.iter() {
let o = old.find(n.name());
let o = match o {
Some(_) if added_field => return None,
Some((_, o)) => o,
None if !n.is_nullable() => return None,
None => {
added_field = true;
field_migrations.push(AddFieldNullableAtEnd {
name: n.name().to_owned(),
typ: n.data_type().clone(),
});
continue;
}
};
if o.is_nullable() && !n.is_nullable() {
return None;
}
let make_nullable = !o.is_nullable() && n.is_nullable();
match backward_compatible_typ(o.data_type(), n.data_type()) {
None => return None,
Some(NoOp) if make_nullable => {
field_migrations.push(AlterFieldNullable {
name: n.name().clone(),
});
}
Some(NoOp) => continue,
Some(_) if make_nullable => return None,
Some(migration) => field_migrations.push(Recurse {
name: n.name().clone(),
migration,
}),
}
}
for o in old.iter() {
let n = new.find(o.name());
if n.is_none() {
field_migrations.push(DropField {
name: o.name().to_owned(),
});
}
}
let same_order = new
.iter()
.flat_map(|n| old.find(n.name()))
.tuple_windows()
.all(|((i, _), (j, _))| i <= j);
if !same_order {
return None;
}
if field_migrations.is_empty() {
Some(NoOp)
} else {
Some(Struct(field_migrations))
}
}
#[cfg(test)]
mod tests {
use arrow::array::new_empty_array;
use arrow::datatypes::Field;
use super::*;
// Exercises `backward_compatible_typ` over a table of (old, new) type pairs.
#[mz_ore::test]
fn backward_compatible() {
use DataType::*;
// `expected` is `None` when the pair must be rejected, otherwise
// `Some(contains_drop)` for the computed migration. When a migration
// exists it is also applied to an empty array of the old type, and the
// resulting data type must equal the new type.
#[track_caller]
fn testcase(old: DataType, new: DataType, expected: Option<bool>) {
let migration = super::backward_compatible_typ(&old, &new);
let actual = migration.as_ref().map(|x| x.contains_drop());
assert_eq!(actual, expected);
if let Some(migration) = migration {
let (old, new) = (new_empty_array(&old), new_empty_array(&new));
let migrated = migration.migrate(old);
assert_eq!(new.data_type(), migrated.data_type());
}
}
// Builds a struct data type from (name, type, nullable) triples.
fn struct_(fields: impl IntoIterator<Item = (&'static str, DataType, bool)>) -> DataType {
let fields = fields
.into_iter()
.map(|(name, typ, nullable)| Field::new(name, typ, nullable))
.collect();
DataType::Struct(fields)
}
// Primitive types: identical is fine, different is rejected.
testcase(Boolean, Boolean, Some(false));
testcase(Utf8, Utf8, Some(false));
testcase(Boolean, Utf8, None);
testcase(Utf8, Boolean, None);
// Struct with an unchanged field, nullable or not.
testcase(
struct_([("a", Boolean, true)]),
struct_([("a", Boolean, true)]),
Some(false),
);
testcase(
struct_([("a", Boolean, false)]),
struct_([("a", Boolean, false)]),
Some(false),
);
// Nullability changes: nullable -> non-nullable is rejected, the reverse
// is accepted.
testcase(
struct_([("a", Boolean, true)]),
struct_([("a", Boolean, false)]),
None,
);
testcase(
struct_([("a", Boolean, false)]),
struct_([("a", Boolean, true)]),
Some(false),
);
// Adding a field: only accepted when the new field is nullable.
testcase(struct_([]), struct_([("a", Boolean, true)]), Some(false));
testcase(struct_([]), struct_([("a", Boolean, false)]), None);
// Dropping a field is accepted regardless of nullability.
testcase(struct_([("a", Boolean, true)]), struct_([]), Some(true));
testcase(struct_([("a", Boolean, false)]), struct_([]), Some(true));
// Replacing a field with a differently named one is an add plus a drop.
testcase(
struct_([("a", Boolean, true)]),
struct_([("b", Boolean, true)]),
Some(true),
);
// Adding several nullable fields at once.
testcase(
struct_([]),
struct_([("a", Boolean, true), ("b", Boolean, true)]),
Some(false),
);
// Nested structs: the same rules apply recursively.
testcase(
struct_([("a", struct_([("b", Boolean, false)]), false)]),
struct_([("a", struct_([("b", Boolean, false)]), false)]),
Some(false),
);
testcase(
struct_([("a", struct_([]), false)]),
struct_([("a", struct_([("b", Boolean, true)]), false)]),
Some(false),
);
testcase(
struct_([("a", struct_([]), false)]),
struct_([("a", struct_([("b", Boolean, false)]), false)]),
None,
);
testcase(
struct_([("a", struct_([("b", Boolean, true)]), false)]),
struct_([("a", struct_([]), false)]),
Some(true),
);
testcase(
struct_([("a", struct_([("b", Boolean, false)]), false)]),
struct_([("a", struct_([]), false)]),
Some(true),
);
// Adding a non-nullable nested field while also making the outer field
// nullable is rejected.
testcase(
struct_([("a", struct_([]), false)]),
struct_([("a", struct_([("b", Boolean, false)]), true)]),
None,
);
// Reordering existing fields is rejected.
testcase(
struct_([("a", Boolean, false), ("b", Utf8, false)]),
struct_([("b", Utf8, false), ("a", Boolean, false)]),
None,
);
// Dropping the first of two fields whose names are numeric strings.
testcase(
struct_([("2", Boolean, false), ("10", Utf8, false)]),
struct_([("10", Utf8, false)]),
Some(true),
);
// Adding a field in the middle (rather than at the end) is rejected.
testcase(
struct_([("a", Boolean, true), ("c", Boolean, true)]),
struct_([
("a", Boolean, true),
("b", Boolean, true),
("c", Boolean, true),
]),
None,
);
// A null type can become a struct whose fields are all nullable.
testcase(Null, struct_([("a", Boolean, true)]), Some(false));
// A map can be migrated to a list with the same entries field.
testcase(
Map(
Field::new_struct(
"map_entries",
vec![
Field::new("keys", Utf8, false),
Field::new("values", Boolean, true),
],
false,
)
.into(),
true,
),
List(
Field::new_struct(
"map_entries",
vec![
Field::new("keys", Utf8, false),
Field::new("values", Boolean, true),
],
false,
)
.into(),
),
Some(false),
);
// A list whose struct entries gain a nullable field.
testcase(
List(Field::new_struct("entries", vec![Field::new("keys", Utf8, false)], true).into()),
List(
Field::new_struct(
"entries",
vec![
Field::new("keys", Utf8, false),
Field::new("values", Boolean, true),
],
true,
)
.into(),
),
Some(false),
);
}
}