use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::{Arc, RwLock};
use std::time::Instant;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_ore::cast::CastFrom;
use mz_persist_types::columnar::data_type;
use mz_persist_types::schema::{backward_compatible, Migration, SchemaId};
use mz_persist_types::{Codec, Codec64};
use timely::progress::Timestamp;
use crate::internal::apply::Applier;
use crate::internal::encoding::Schemas;
use crate::internal::metrics::{SchemaCacheMetrics, SchemaMetrics};
use crate::internal::state::EncodedSchemas;
/// Result of a compare-and-evolve schema operation.
///
/// The operation is given the schema id the caller expects to currently be
/// registered, plus a proposed new key and value schema.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CaESchema<K: Codec, V: Codec> {
    /// The proposed schema was accepted; carries the id it is registered under.
    Ok(SchemaId),
    /// The proposed schema was rejected as incompatible with the current one.
    Incompatible,
    /// The expected schema id did not match; carries the id and the key/value
    /// schemas that the shard actually has.
    ExpectedMismatch {
        schema_id: SchemaId,
        key: K::Schema,
        val: V::Schema,
    },
}
/// A cache of decoded schemas and of the migrations between them.
///
/// The decoded-schema maps are shared (through an `Arc`) by every clone of a
/// `SchemaCache`. The migration caches are owned by each instance and are
/// copied when the instance is cloned.
#[derive(Debug)]
pub(crate) struct SchemaCache<K: Codec, V: Codec, T, D> {
    // Decoded key/value schemas by `SchemaId`, shared between clones.
    maps: Arc<SchemaCacheMaps<K, V>>,
    // Source of the registered (encoded) schemas and of metrics; also used to
    // refresh state when a schema id is not found.
    applier: Applier<K, V, T, D>,
    // Key migrations keyed by (write schema id, read schema id).
    key_migration_by_ids: MigrationCacheMap,
    // Value migrations keyed by (write schema id, read schema id).
    val_migration_by_ids: MigrationCacheMap,
}
impl<K: Codec, V: Codec, T: Clone, D> Clone for SchemaCache<K, V, T, D> {
fn clone(&self) -> Self {
Self {
maps: Arc::clone(&self.maps),
applier: self.applier.clone(),
key_migration_by_ids: self.key_migration_by_ids.clone(),
val_migration_by_ids: self.val_migration_by_ids.clone(),
}
}
}
impl<K: Codec, V: Codec, T, D> Drop for SchemaCache<K, V, T, D> {
    /// Adds the number of migrations still held in this instance's key and
    /// value migration caches to the migration cache's `dropped_count` metric.
    fn drop(&mut self) {
        let still_cached: usize = [&self.key_migration_by_ids, &self.val_migration_by_ids]
            .iter()
            .map(|cache| cache.by_ids.len())
            .sum();
        let migration_metrics = &self.applier.metrics.schema.cache_migration;
        migration_metrics
            .dropped_count
            .inc_by(u64::cast_from(still_cached));
    }
}
impl<K, V, T, D> SchemaCache<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
    /// Creates a cache over the shared decoded-schema `maps`, with empty key
    /// and value migration caches that report to the applier's migration
    /// cache metrics.
    pub fn new(maps: Arc<SchemaCacheMaps<K, V>>, applier: Applier<K, V, T, D>) -> Self {
        let empty_migration_cache = || MigrationCacheMap {
            metrics: applier.metrics.schema.cache_migration.clone(),
            by_ids: BTreeMap::new(),
        };
        let key_migration_by_ids = empty_migration_cache();
        let val_migration_by_ids = empty_migration_cache();
        SchemaCache {
            maps,
            applier,
            key_migration_by_ids,
            val_migration_by_ids,
        }
    }

    /// Returns the decoded key and value schemas registered under `id`.
    ///
    /// Each half is looked up in the shared decoded-schema maps first. On a
    /// miss it is decoded from the applier's registered schemas, refreshing the
    /// applier's state at most once. Returns `None` if either half still
    /// cannot be found.
    async fn schemas(&self, id: &SchemaId) -> Option<Schemas<K, V>> {
        let key = self
            .get_or_try_init(&self.maps.key_by_id, id, |registered| {
                self.maps.key_by_id.metrics.computed_count.inc();
                let encoded = registered.get(id)?;
                Some(K::decode_schema(&encoded.key))
            })
            .await?;
        let val = self
            .get_or_try_init(&self.maps.val_by_id, id, |registered| {
                self.maps.val_by_id.metrics.computed_count.inc();
                let encoded = registered.get(id)?;
                Some(V::decode_schema(&encoded.val))
            })
            .await?;
        let schemas = Schemas {
            id: Some(*id),
            key,
            val,
        };
        Some(schemas)
    }

    /// Returns the migration from `write`'s key schema to `read`'s key schema,
    /// or `None` if they are not backward compatible.
    ///
    /// The result is cached by id pair only when both schemas carry an id.
    fn key_migration(
        &mut self,
        write: &Schemas<K, V>,
        read: &Schemas<K, V>,
    ) -> Option<Arc<Migration>> {
        let compute = || Self::migration::<K>(&write.key, &read.key);
        match (write.id, read.id) {
            (Some(write_id), Some(read_id)) => {
                self.key_migration_by_ids
                    .get_or_try_insert(write_id, read_id, compute)
            }
            _ => {
                self.key_migration_by_ids.metrics.computed_count.inc();
                compute().map(Arc::new)
            }
        }
    }

    /// Returns the migration from `write`'s value schema to `read`'s value
    /// schema, or `None` if they are not backward compatible.
    ///
    /// The result is cached by id pair only when both schemas carry an id.
    fn val_migration(
        &mut self,
        write: &Schemas<K, V>,
        read: &Schemas<K, V>,
    ) -> Option<Arc<Migration>> {
        let compute = || Self::migration::<V>(&write.val, &read.val);
        match (write.id, read.id) {
            (Some(write_id), Some(read_id)) => {
                self.val_migration_by_ids
                    .get_or_try_insert(write_id, read_id, compute)
            }
            _ => {
                self.val_migration_by_ids.metrics.computed_count.inc();
                compute().map(Arc::new)
            }
        }
    }

    /// Computes the migration between the data types of two schemas of codec
    /// `C`.
    ///
    /// # Panics
    ///
    /// Panics if either schema has no data type.
    fn migration<C: Codec>(write: &C::Schema, read: &C::Schema) -> Option<Migration> {
        let to_data_type = |schema: &C::Schema| data_type::<C>(schema).expect("valid schema");
        backward_compatible(&to_data_type(write), &to_data_type(read))
    }

    /// Returns the entry for `key` in `map`, filling it with `f` applied to the
    /// applier's registered schemas on a miss.
    ///
    /// If `f` finds nothing, the applier's state is fetched and updated once
    /// (passing the seqno at which the lookup missed), and the lookup is
    /// retried a single time.
    async fn get_or_try_init<MK: Clone + Ord, MV: PartialEq + Debug>(
        &self,
        map: &SchemaCacheMap<MK, MV>,
        key: &MK,
        f: impl Fn(&BTreeMap<SchemaId, EncodedSchemas>) -> Option<MV>,
    ) -> Option<Arc<MV>> {
        let first_attempt = map.get_or_try_init(key, || {
            self.applier
                .schemas(|seqno, registered| f(registered).ok_or(seqno))
        });
        let missed_at = match first_attempt {
            Ok(found) => return Some(found),
            Err(seqno) => seqno,
        };
        // Not in the state we had: refresh it, then try exactly once more.
        self.applier.metrics.schema.cache_fetch_state_count.inc();
        self.applier.fetch_and_update_state(Some(missed_at)).await;
        let second_attempt = map.get_or_try_init(key, || {
            self.applier
                .schemas(|seqno, registered| f(registered).ok_or(seqno))
        });
        second_attempt.ok()
    }
}
/// Decoded key and value schemas, each indexed by the `SchemaId` they are
/// registered under.
#[derive(Debug)]
pub(crate) struct SchemaCacheMaps<K: Codec, V: Codec> {
    // Decoded key schemas by id.
    key_by_id: SchemaCacheMap<SchemaId, K::Schema>,
    // Decoded value schemas by id.
    val_by_id: SchemaCacheMap<SchemaId, V::Schema>,
}
impl<K: Codec, V: Codec> SchemaCacheMaps<K, V> {
pub(crate) fn new(metrics: &SchemaMetrics) -> Self {
Self {
key_by_id: SchemaCacheMap {
metrics: metrics.cache_schema.clone(),
map: RwLock::new(BTreeMap::new()),
},
val_by_id: SchemaCacheMap {
metrics: metrics.cache_schema.clone(),
map: RwLock::new(BTreeMap::new()),
},
}
}
}
/// A lock-protected map from an id `I` to a shared, decoded value `S`.
#[derive(Debug)]
struct SchemaCacheMap<I, S> {
    // Counters for hits, additions, unavailable lookups, and entries dropped.
    metrics: SchemaCacheMetrics,
    // Entries are only ever inserted through `get_or_try_init`.
    map: RwLock<BTreeMap<I, Arc<S>>>,
}
impl<I: Clone + Ord, S: PartialEq + Debug> SchemaCacheMap<I, S> {
    /// Returns the value cached for `id`, or computes it with `state_fn` and
    /// caches it.
    ///
    /// An `Err` from `state_fn` is returned unchanged and nothing is cached.
    /// If another caller inserted `id` concurrently, the freshly computed value
    /// replaces it; debug builds assert that the two are equal.
    fn get_or_try_init<E>(
        &self,
        id: &I,
        state_fn: impl FnOnce() -> Result<S, E>,
    ) -> Result<Arc<S>, E> {
        // Fast path: only a shared lock is needed for a hit.
        let hit = self.map.read().expect("lock").get(id).cloned();
        if let Some(hit) = hit {
            self.metrics.cached_count.inc();
            return Ok(hit);
        }
        match state_fn() {
            Err(err) => {
                self.metrics.unavailable_count.inc();
                Err(err)
            }
            Ok(schema) => {
                let schema = Arc::new(schema);
                let mut map = self.map.write().expect("lock");
                match map.insert(id.clone(), Arc::clone(&schema)) {
                    None => self.metrics.added_count.inc(),
                    Some(prev) => debug_assert_eq!(schema, prev),
                }
                Ok(schema)
            }
        }
    }
}
impl<I, K> Drop for SchemaCacheMap<I, K> {
    /// Adds the number of entries still cached to the `dropped_count` metric.
    ///
    /// `drop` has exclusive access, so the map is read through
    /// `RwLock::get_mut` without locking. A poisoned lock (e.g. a panic while
    /// the write guard was held in `get_or_try_init`) is tolerated rather than
    /// re-panicking: a panic inside `drop` during unwinding aborts the process.
    fn drop(&mut self) {
        let len = match self.map.get_mut() {
            Ok(map) => map.len(),
            Err(poisoned) => poisoned.into_inner().len(),
        };
        self.metrics.dropped_count.inc_by(u64::cast_from(len));
    }
}
/// An unlocked cache of migrations keyed by (write schema id, read schema id).
///
/// It is mutated through `&mut self`. Cloning copies the map; the cached
/// migrations themselves are shared through their `Arc`s.
#[derive(Debug, Clone)]
struct MigrationCacheMap {
    // Counters for hits, computations, additions, unavailable and dropped entries.
    metrics: SchemaCacheMetrics,
    // Only successfully computed migrations are stored.
    by_ids: BTreeMap<(SchemaId, SchemaId), Arc<Migration>>,
}
impl MigrationCacheMap {
    /// Returns the migration cached for `(write_id, read_id)`, or computes it
    /// with `migration_fn` and caches it.
    ///
    /// A `None` from `migration_fn` is not cached, so a later call for the same
    /// id pair computes it again.
    fn get_or_try_insert(
        &mut self,
        write_id: SchemaId,
        read_id: SchemaId,
        migration_fn: impl FnOnce() -> Option<Migration>,
    ) -> Option<Arc<Migration>> {
        let ids = (write_id, read_id);
        if let Some(cached) = self.by_ids.get(&ids) {
            self.metrics.cached_count.inc();
            return Some(Arc::clone(cached));
        }
        self.metrics.computed_count.inc();
        let Some(computed) = migration_fn() else {
            self.metrics.unavailable_count.inc();
            return None;
        };
        let computed = Arc::new(computed);
        self.metrics.added_count.inc();
        self.by_ids.insert(ids, Arc::clone(&computed));
        Some(computed)
    }
}
/// How a part written with one schema is to be read with another, as chosen by
/// `PartMigration::new`.
#[derive(Debug)]
pub(crate) enum PartMigration<K: Codec, V: Codec> {
    /// The part's write schema is the read schema: either the ids match, or the
    /// key and value schemas compare equal.
    SameSchema { both: Schemas<K, V> },
    /// The part records no write schema id, so only the read schemas are known.
    /// NOTE(review): presumably read through the `Codec` path; confirm at callers.
    Codec { read: Schemas<K, V> },
    /// The write and read schemas differ, and migrations exist for both the key
    /// and the value.
    Either {
        // Kept for completeness; the leading underscore marks it as unused.
        _write: Schemas<K, V>,
        read: Schemas<K, V>,
        key_migration: Arc<Migration>,
        val_migration: Arc<Migration>,
    },
}
impl<K: Codec, V: Codec> Clone for PartMigration<K, V> {
fn clone(&self) -> Self {
match self {
Self::SameSchema { both } => Self::SameSchema { both: both.clone() },
Self::Codec { read } => Self::Codec { read: read.clone() },
Self::Either {
_write,
read,
key_migration,
val_migration,
} => Self::Either {
_write: _write.clone(),
read: read.clone(),
key_migration: Arc::clone(key_migration),
val_migration: Arc::clone(val_migration),
},
}
}
}
impl<K, V> PartMigration<K, V>
where
    K: Debug + Codec,
    V: Debug + Codec,
{
    /// Decides how to read a part written with schema id `write` using the
    /// `read` schemas.
    ///
    /// Returns `Err(read)` when the write schema's key or value cannot be
    /// migrated to `read`.
    ///
    /// # Panics
    ///
    /// Panics if `write` names a schema id that cannot be found.
    pub(crate) async fn new<T, D>(
        write: Option<SchemaId>,
        read: Schemas<K, V>,
        schema_cache: &mut SchemaCache<K, V, T, D>,
    ) -> Result<Self, Schemas<K, V>>
    where
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64,
    {
        // No recorded write schema: nothing to compare against.
        let Some(write_id) = write else {
            return Ok(PartMigration::Codec { read });
        };
        // Written with exactly the schema id we're reading with.
        if read.id == Some(write_id) {
            return Ok(PartMigration::SameSchema { both: read });
        }
        let write_schemas = schema_cache
            .schemas(&write_id)
            .await
            .expect("appended part should reference registered schema");
        // Different ids, but identical key and value schemas.
        if write_schemas.key == read.key && write_schemas.val == read.val {
            return Ok(PartMigration::SameSchema { both: read });
        }

        let start = Instant::now();
        let key_migration = schema_cache
            .key_migration(&write_schemas, &read)
            .ok_or_else(|| read.clone())?;
        let val_migration = schema_cache
            .val_migration(&write_schemas, &read)
            .ok_or_else(|| read.clone())?;
        let schema_metrics = &schema_cache.applier.metrics.schema;
        schema_metrics.migration_new_count.inc();
        schema_metrics
            .migration_new_seconds
            .inc_by(start.elapsed().as_secs_f64());

        Ok(PartMigration::Either {
            _write: write_schemas,
            read,
            key_migration,
            val_migration,
        })
    }
}
impl<K: Codec, V: Codec> PartMigration<K, V> {
    /// Returns the schemas the part is read with, whichever variant this is.
    pub(crate) fn codec_read(&self) -> &Schemas<K, V> {
        match self {
            PartMigration::SameSchema { both: read }
            | PartMigration::Codec { read }
            | PartMigration::Either { read, .. } => read,
        }
    }
}
/// Tests for schema ids, compare-and-evolve, and reading parts across a
/// schema evolution.
#[cfg(test)]
mod tests {
    use arrow::array::{
        as_string_array, Array, ArrayBuilder, StringArray, StringBuilder, StructArray,
    };
    use arrow::datatypes::{DataType, Field};
    use bytes::BufMut;
    use futures::StreamExt;
    use mz_dyncfg::ConfigUpdates;
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema2};
    use mz_persist_types::stats::{NoneStats, StructStats};
    use mz_persist_types::ShardId;
    use timely::progress::Antichain;
    use crate::cli::admin::info_log_non_zero_metrics;
    use crate::read::ReadHandle;
    use crate::tests::new_test_client;
    use crate::{Diagnostics, DANGEROUS_ENABLE_SCHEMA_EVOLUTION};
    use super::*;

    /// `SchemaId` round-trips through its `h<n>` string form, and an
    /// unparseable string is rejected.
    #[mz_ore::test]
    fn schema_id() {
        assert_eq!(SchemaId(1).to_string(), "h1");
        assert_eq!(SchemaId::try_from("h1".to_owned()), Ok(SchemaId(1)));
        assert!(SchemaId::try_from("nope".to_owned()).is_err());
    }

    /// Test key type: a row of string columns.
    #[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
    struct Strings(Vec<String>);

    impl Codec for Strings {
        type Schema = StringsSchema;
        type Storage = ();
        fn codec_name() -> String {
            "Strings".into()
        }
        // Values are joined with `,` (so a value containing `,` would not
        // round-trip; the tests never use one).
        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put_slice(self.0.join(",").as_bytes());
        }
        // Splits on `,`, then pads with empty strings or drops trailing values
        // so the result has exactly one value per column of `schema`.
        fn decode<'a>(buf: &'a [u8], schema: &Self::Schema) -> Result<Self, String> {
            let buf = std::str::from_utf8(buf).map_err(|err| err.to_string())?;
            let mut ret = buf.split(",").map(|x| x.to_owned()).collect::<Vec<_>>();
            while schema.0.len() > ret.len() {
                ret.push("".into());
            }
            while schema.0.len() < ret.len() {
                ret.pop();
            }
            Ok(Strings(ret))
        }
        // One character per column: `n` when the flag is `true` (nullable),
        // a space otherwise.
        fn encode_schema(schema: &Self::Schema) -> bytes::Bytes {
            schema
                .0
                .iter()
                .map(|x| x.then_some('n').unwrap_or(' '))
                .collect::<String>()
                .into_bytes()
                .into()
        }
        // Inverse of `encode_schema`; any other character is a bug.
        fn decode_schema(buf: &bytes::Bytes) -> Self::Schema {
            let buf = std::str::from_utf8(buf).expect("valid schema");
            StringsSchema(
                buf.chars()
                    .map(|x| match x {
                        'n' => true,
                        ' ' => false,
                        _ => unreachable!(),
                    })
                    .collect(),
            )
        }
    }

    /// Schema for `Strings`: one entry per column, `true` meaning the column
    /// is nullable.
    #[derive(Debug, Clone, Default, PartialEq)]
    struct StringsSchema(Vec<bool>);

    impl Schema2<Strings> for StringsSchema {
        type ArrowColumn = StructArray;
        type Statistics = NoneStats;
        type Decoder = StringsDecoder;
        type Encoder = StringsEncoder;
        // Reads one string column per schema entry; columns are named by index.
        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
            let mut cols = Vec::new();
            for (idx, _) in self.0.iter().enumerate() {
                cols.push(as_string_array(col.column_by_name(&idx.to_string()).unwrap()).clone());
            }
            Ok(StringsDecoder(cols))
        }
        // One Utf8 field per schema entry, named by index, with its nullability
        // taken from the schema flag.
        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for (idx, nullable) in self.0.iter().enumerate() {
                fields.push(Field::new(idx.to_string(), DataType::Utf8, *nullable));
                arrays.push(StringBuilder::new());
            }
            Ok(StringsEncoder { fields, arrays })
        }
    }

    /// Columnar decoder for `Strings`: one string array per column.
    #[derive(Debug)]
    struct StringsDecoder(Vec<StringArray>);

    impl ColumnDecoder<Strings> for StringsDecoder {
        // Null entries decode as the empty string.
        fn decode(&self, idx: usize, val: &mut Strings) {
            val.0.clear();
            for col in self.0.iter() {
                if col.is_valid(idx) {
                    val.0.push(col.value(idx).into());
                } else {
                    val.0.push("".into());
                }
            }
        }
        fn is_null(&self, _: usize) -> bool {
            false
        }
        // Only reports the length, taken from the first column.
        // NOTE(review): `self.0[0]` panics for a zero-column schema.
        fn stats(&self) -> StructStats {
            StructStats {
                len: self.0[0].len(),
                cols: Default::default(),
            }
        }
    }

    /// Columnar encoder for `Strings`: one string builder per column.
    #[derive(Debug)]
    struct StringsEncoder {
        fields: Vec<Field>,
        arrays: Vec<StringBuilder>,
    }

    impl ColumnEncoder<Strings> for StringsEncoder {
        type FinishedColumn = StructArray;
        fn goodbytes(&self) -> usize {
            self.arrays.iter().map(|a| a.values_slice().len()).sum()
        }
        // Empty strings are stored as nulls. Assumes `val` has at most one
        // value per column (indexing panics otherwise).
        fn append(&mut self, val: &Strings) {
            for (idx, val) in val.0.iter().enumerate() {
                if val.is_empty() {
                    self.arrays[idx].append_null();
                } else {
                    self.arrays[idx].append_value(val);
                }
            }
        }
        // `Strings` rows are never null as a whole.
        fn append_null(&mut self) {
            unreachable!()
        }
        fn finish(self) -> Self::FinishedColumn {
            let arrays = self
                .arrays
                .into_iter()
                .map(|mut x| ArrayBuilder::finish(&mut x))
                .collect();
            StructArray::new(self.fields.into(), arrays, None)
        }
    }

    /// Exercises each `CaESchema` outcome of `compare_and_evolve_schema`.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_evolve_schema(mut dyncfgs: ConfigUpdates) {
        dyncfgs.add(&DANGEROUS_ENABLE_SCHEMA_EVOLUTION, true);
        let client = new_test_client(&dyncfgs).await;
        let d = Diagnostics::for_tests();
        let shard_id = ShardId::new();
        let schema0 = StringsSchema(vec![false]);
        let schema1 = StringsSchema(vec![false, true]);

        // The first writer registers schema0 as id 0.
        let write0 = client
            .open_writer::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema0.clone()),
                Arc::new(UnitSchema),
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(write0.write_schemas.id.unwrap(), SchemaId(0));

        // A schema with no columns is rejected as incompatible.
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(0),
                &StringsSchema(vec![]),
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(res, CaESchema::Incompatible);

        // Expecting the wrong current id reports the actual id and schemas.
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(1),
                &schema1,
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(
            res,
            CaESchema::ExpectedMismatch {
                schema_id: SchemaId(0),
                key: schema0,
                val: UnitSchema
            }
        );

        // With the right expected id, adding a nullable column is accepted.
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(0),
                &schema1,
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(res, CaESchema::Ok(SchemaId(1)));

        // A writer opened with the evolved schema picks up its id.
        let write1 = client
            .open_writer::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema1),
                Arc::new(UnitSchema),
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(write1.write_schemas.id.unwrap(), SchemaId(1));
    }

    /// Extracts each update's key values, panicking if any key failed to decode.
    fn strings(xs: &[((Result<Strings, String>, Result<(), String>), u64, i64)]) -> Vec<Vec<&str>> {
        xs.iter()
            .map(|((k, _), _, _)| k.as_ref().unwrap().0.iter().map(|x| x.as_str()).collect())
            .collect()
    }

    /// Data written before and after an evolution is readable by readers of
    /// both the old and the new schema.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn schema_evolution(mut dyncfgs: ConfigUpdates) {
        dyncfgs.add(&DANGEROUS_ENABLE_SCHEMA_EVOLUTION, true);

        // Snapshots `read` at `as_of` via `snapshot_and_stream`, sorted.
        async fn snap_streaming(
            as_of: u64,
            read: &mut ReadHandle<Strings, (), u64, i64>,
        ) -> Vec<((Result<Strings, String>, Result<(), String>), u64, i64)> {
            let mut ret = read
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .unwrap()
                .collect::<Vec<_>>()
                .await;
            ret.sort();
            ret
        }

        let client = new_test_client(&dyncfgs).await;
        let d = Diagnostics::for_tests();
        let shard_id = ShardId::new();
        let schema0 = StringsSchema(vec![false]);
        let schema1 = StringsSchema(vec![false, true]);

        // Write and read one row with the original single-column schema.
        let (mut write0, mut read0) = client
            .open::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema0.clone()),
                Arc::new(UnitSchema),
                d.clone(),
                true,
            )
            .await
            .unwrap();
        write0
            .expect_compare_and_append(&[((Strings(vec!["0 before".into()]), ()), 0, 1)], 0, 1)
            .await;
        let expected = vec![vec!["0 before"]];
        assert_eq!(strings(&snap_streaming(0, &mut read0).await), expected);
        assert_eq!(strings(&read0.expect_snapshot_and_fetch(0).await), expected);

        // Evolve to a schema with a second, nullable column.
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(0),
                &schema1,
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(res, CaESchema::Ok(SchemaId(1)));
        let (mut write1, mut read1) = client
            .open::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema1.clone()),
                Arc::new(UnitSchema),
                d.clone(),
                true,
            )
            .await
            .unwrap();

        // Write with the new schema (one null, one non-null second column),
        // then again with the old schema.
        write1
            .expect_compare_and_append(
                &[
                    ((Strings(vec!["1 null".into(), "".into()]), ()), 1, 1),
                    ((Strings(vec!["1 not".into(), "x".into()]), ()), 1, 1),
                ],
                1,
                2,
            )
            .await;
        write0
            .expect_compare_and_append(&[((Strings(vec!["0 after".into()]), ()), 2, 1)], 2, 3)
            .await;

        // The old-schema reader sees only the first column of every row.
        let expected = vec![
            vec!["0 after"],
            vec!["0 before"],
            vec!["1 not"],
            vec!["1 null"],
        ];
        assert_eq!(strings(&snap_streaming(2, &mut read0).await), expected);
        assert_eq!(strings(&read0.expect_snapshot_and_fetch(2).await), expected);

        // The new-schema reader sees two columns; rows written with the old
        // schema and null values both read back as "".
        let expected = vec![
            vec!["0 after", ""],
            vec!["0 before", ""],
            vec!["1 not", "x"],
            vec!["1 null", ""],
        ];
        assert_eq!(strings(&snap_streaming(2, &mut read1).await), expected);
        assert_eq!(strings(&read1.expect_snapshot_and_fetch(2).await), expected);

        // Flip to `true` locally to log non-zero metrics while debugging.
        if false {
            info_log_non_zero_metrics(&client.metrics.registry.gather());
        }
    }
}