use std::collections::{BTreeMap, BTreeSet};
use std::future::Future;
use std::sync::Arc;
use mz_compute_types::dataflows::DataflowDescription;
use mz_durable_cache::{DurableCache, DurableCacheCodec};
use mz_dyncfg::ConfigSet;
use mz_expr::OptimizedMirRelationExpr;
use mz_ore::channel::trigger;
use mz_ore::soft_panic_or_log;
use mz_ore::task::spawn;
use mz_persist_client::cli::admin::{
EXPRESSION_CACHE_FORCE_COMPACTION_FUEL, EXPRESSION_CACHE_FORCE_COMPACTION_WAIT,
};
use mz_persist_client::PersistClient;
use mz_persist_types::codec_impls::VecU8Schema;
use mz_persist_types::Codec;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::GlobalId;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::OptimizerNotice;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::Container;
use tokio::sync::mpsc;
use tracing::debug;
use uuid::Uuid;
use crate::durable::expression_cache_shard_id;
/// The kind of expression stored under a [`CacheKey`].
///
/// NOTE: the variant order determines the derived `Ord` and, presumably, the bincode encoding of
/// persisted keys (bincode encodes enum variants by index) — do not reorder variants.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)]
enum ExpressionType {
    /// The entry holds bincode-serialized [`LocalExpressions`].
    Local,
    /// The entry holds bincode-serialized [`GlobalExpressions`].
    Global,
}
/// Expressions cached under [`ExpressionType::Local`].
///
/// Stored in the durable cache as bincode-serialized bytes, so the field order is part of the
/// persisted format.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LocalExpressions {
    /// The optimized MIR expression for the object.
    pub local_mir: OptimizedMirRelationExpr,
    /// Optimizer features associated with `local_mir` — presumably the features in effect when
    /// it was optimized; TODO confirm with callers.
    pub optimizer_features: OptimizerFeatures,
}
/// Expressions cached under [`ExpressionType::Global`].
///
/// Stored in the durable cache as bincode-serialized bytes, so the field order is part of the
/// persisted format. On open, an entry is discarded if any index it imports (see
/// `GlobalExpressions::index_imports`) no longer exists.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GlobalExpressions {
    /// Dataflow description over optimized MIR.
    pub global_mir: DataflowDescription<OptimizedMirRelationExpr>,
    /// Dataflow description over the physical (compute) plan.
    pub physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
    /// Metadata about the dataflow, including optimizer notices.
    pub dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
    /// Optimizer features associated with these expressions — presumably the features in effect
    /// when they were optimized; TODO confirm with callers.
    pub optimizer_features: OptimizerFeatures,
}
impl GlobalExpressions {
    /// Yields the ids of every index imported by either the MIR dataflow or the physical plan.
    ///
    /// The MIR imports come first, followed by the physical-plan imports. No deduplication is
    /// performed, so an id imported by both descriptions is yielded twice.
    fn index_imports(&self) -> impl Iterator<Item = &GlobalId> {
        let mir_imports = self.global_mir.index_imports.keys();
        let plan_imports = self.physical_plan.index_imports.keys();
        mir_imports.chain(plan_imports)
    }
}
/// Key of an entry in the expression cache.
///
/// NOTE: keys are bincode-encoded by `ExpressionCodec`, so the field order is part of the
/// persisted format — do not reorder fields.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)]
struct CacheKey {
    /// Deploy generation that wrote the entry. On open, only entries whose generation matches the
    /// cache's own generation are deserialized and returned.
    deploy_generation: u64,
    /// Object the cached expressions belong to.
    id: GlobalId,
    /// Whether the entry holds local or global expressions.
    expr_type: ExpressionType,
}
/// [`DurableCacheCodec`] for the expression cache.
///
/// Keys are bincode-encoded [`CacheKey`]s. Values are opaque, already-serialized expression bytes
/// that pass through unchanged.
#[derive(Debug, PartialEq, Eq)]
struct ExpressionCodec;
impl DurableCacheCodec for ExpressionCodec {
    type Key = CacheKey;
    type Val = Vec<u8>;
    type KeyCodec = Vec<u8>;
    type ValCodec = Vec<u8>;

    /// Keys and values are both persisted as raw byte vectors.
    fn schemas() -> (
        <Self::KeyCodec as Codec>::Schema,
        <Self::ValCodec as Codec>::Schema,
    ) {
        let key_schema = VecU8Schema::default();
        let val_schema = VecU8Schema::default();
        (key_schema, val_schema)
    }

    /// Encodes the key with `bincode`. The value is already serialized and is copied as-is.
    ///
    /// # Panics
    ///
    /// Panics if the key cannot be serialized.
    fn encode(key: &Self::Key, val: &Self::Val) -> (Self::KeyCodec, Self::ValCodec) {
        let encoded_key = bincode::serialize(key).expect("must serialize");
        let encoded_val = val.clone();
        (encoded_key, encoded_val)
    }

    /// Inverse of [`ExpressionCodec::encode`].
    ///
    /// # Panics
    ///
    /// Panics if the persisted key bytes are not a valid bincode-encoded [`CacheKey`].
    fn decode(key: &Self::KeyCodec, val: &Self::ValCodec) -> (Self::Key, Self::Val) {
        let decoded_key: Self::Key = bincode::deserialize(key).expect("must deserialize");
        let decoded_val = val.clone();
        (decoded_key, decoded_val)
    }
}
/// Configuration for opening the expression cache via [`ExpressionCache::open`] or
/// [`ExpressionCacheHandle::spawn_expression_cache`].
#[derive(Debug, Clone)]
pub struct ExpressionCacheConfig {
    /// Deploy generation whose entries are read and written. Entries of other generations are
    /// not returned; they are either left alone or removed, depending on `remove_prior_gens`.
    pub deploy_generation: u64,
    /// Persist client used to open the durable cache shard.
    pub persist: PersistClient,
    /// Organization whose expression cache shard is opened. The shard id is derived from it.
    pub organization_id: Uuid,
    /// Ids of objects that currently exist. On open, the following current-generation entries
    /// are removed:
    /// - entries whose id is not in this set;
    /// - global entries that import an index not in this set.
    pub current_ids: BTreeSet<GlobalId>,
    /// If set, opening removes every entry whose deploy generation differs from
    /// `deploy_generation`. This includes newer generations, not only older ones.
    pub remove_prior_gens: bool,
    /// If set, the cache shard is force-compacted after reconciliation on open.
    pub compact_shard: bool,
    /// Dynamic configs that supply the forced-compaction fuel and wait settings.
    pub dyncfgs: ConfigSet,
}
/// Durable cache of optimized expressions, scoped to a single deploy generation.
///
/// `update` is private to this module. Updates are applied through [`ExpressionCacheHandle`],
/// which runs an `ExpressionCache` on a background task.
pub struct ExpressionCache {
    /// Generation that all reads and writes are scoped to.
    deploy_generation: u64,
    /// Persist-backed key/value store holding the encoded entries.
    durable_cache: DurableCache<ExpressionCodec>,
}
impl ExpressionCache {
pub async fn open(
ExpressionCacheConfig {
deploy_generation,
persist,
organization_id,
current_ids,
remove_prior_gens,
compact_shard,
dyncfgs,
}: ExpressionCacheConfig,
) -> (
Self,
BTreeMap<GlobalId, LocalExpressions>,
BTreeMap<GlobalId, GlobalExpressions>,
) {
let shard_id = expression_cache_shard_id(organization_id);
let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await;
let mut cache = Self {
deploy_generation,
durable_cache,
};
const RETRIES: usize = 100;
for _ in 0..RETRIES {
match cache
.try_open(¤t_ids, remove_prior_gens, compact_shard, &dyncfgs)
.await
{
Ok((local_expressions, global_expressions)) => {
return (cache, local_expressions, global_expressions)
}
Err(err) => debug!("failed to open cache: {err} ... retrying"),
}
}
panic!("Unable to open expression cache after {RETRIES} retries");
}
async fn try_open(
&mut self,
current_ids: &BTreeSet<GlobalId>,
remove_prior_gens: bool,
compact_shard: bool,
dyncfgs: &ConfigSet,
) -> Result<
(
BTreeMap<GlobalId, LocalExpressions>,
BTreeMap<GlobalId, GlobalExpressions>,
),
mz_durable_cache::Error,
> {
let mut keys_to_remove = Vec::new();
let mut local_expressions = BTreeMap::new();
let mut global_expressions = BTreeMap::new();
for (key, expressions) in self.durable_cache.entries_local() {
if key.deploy_generation == self.deploy_generation {
match key.expr_type {
ExpressionType::Local => {
let expressions: LocalExpressions = match bincode::deserialize(expressions)
{
Ok(expressions) => expressions,
Err(err) => {
soft_panic_or_log!(
"unable to deserialize local expressions: {expressions:?}: {err:?}"
);
continue;
}
};
if !current_ids.contains(&key.id) {
keys_to_remove.push((key.clone(), None));
} else {
local_expressions.insert(key.id, expressions);
}
}
ExpressionType::Global => {
let expressions: GlobalExpressions = match bincode::deserialize(expressions)
{
Ok(expressions) => expressions,
Err(err) => {
soft_panic_or_log!(
"unable to deserialize global expressions: {expressions:?}: {err:?}"
);
continue;
}
};
let index_dependencies: BTreeSet<_> =
expressions.index_imports().cloned().collect();
if !current_ids.contains(&key.id)
|| !index_dependencies.is_subset(current_ids)
{
keys_to_remove.push((key.clone(), None));
} else {
global_expressions.insert(key.id, expressions);
}
}
}
} else if remove_prior_gens {
keys_to_remove.push((key.clone(), None));
}
}
let keys_to_remove: Vec<_> = keys_to_remove
.iter()
.map(|(key, expressions)| (key, expressions.as_ref()))
.collect();
self.durable_cache.try_set_many(&keys_to_remove).await?;
if compact_shard {
let fuel = EXPRESSION_CACHE_FORCE_COMPACTION_FUEL.handle(dyncfgs);
let wait = EXPRESSION_CACHE_FORCE_COMPACTION_WAIT.handle(dyncfgs);
self.durable_cache
.dangerous_compact_shard(move || fuel.get(), move || wait.get())
.await;
}
Ok((local_expressions, global_expressions))
}
async fn update(
&mut self,
new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
invalidate_ids: BTreeSet<GlobalId>,
) {
let mut entries = BTreeMap::new();
for id in invalidate_ids {
entries.insert(
CacheKey {
id,
deploy_generation: self.deploy_generation,
expr_type: ExpressionType::Local,
},
None,
);
entries.insert(
CacheKey {
id,
deploy_generation: self.deploy_generation,
expr_type: ExpressionType::Global,
},
None,
);
}
for (id, expressions) in new_local_expressions {
let expressions = match bincode::serialize(&expressions) {
Ok(expressions) => expressions,
Err(err) => {
soft_panic_or_log!(
"unable to serialize local expressions: {expressions:?}: {err:?}"
);
continue;
}
};
entries.insert(
CacheKey {
id,
deploy_generation: self.deploy_generation,
expr_type: ExpressionType::Local,
},
Some(expressions),
);
}
for (id, expressions) in new_global_expressions {
let expressions = match bincode::serialize(&expressions) {
Ok(expressions) => expressions,
Err(err) => {
soft_panic_or_log!(
"unable to serialize global expressions: {expressions:?}: {err:?}"
);
continue;
}
};
entries.insert(
CacheKey {
id,
deploy_generation: self.deploy_generation,
expr_type: ExpressionType::Global,
},
Some(expressions),
);
}
let entries: Vec<_> = entries
.iter()
.map(|(key, expressions)| (key, expressions.as_ref()))
.collect();
self.durable_cache.set_many(&entries).await
}
}
/// Requests sent from an [`ExpressionCacheHandle`] to its background cache task.
enum CacheOperation {
    /// Apply a batch of updates via `ExpressionCache::update`.
    Update {
        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
        invalidate_ids: BTreeSet<GlobalId>,
        /// Held by the background task until the update has been applied, then dropped. The
        /// matching receiver is the future returned by `ExpressionCacheHandle::update`.
        trigger: trigger::Trigger,
    },
}
/// Clonable handle to a background task that owns an [`ExpressionCache`].
///
/// Operations are forwarded to the task over an unbounded channel.
#[derive(Debug, Clone)]
pub struct ExpressionCacheHandle {
    /// Sending half of the channel consumed by the background task.
    tx: mpsc::UnboundedSender<CacheOperation>,
}
impl ExpressionCacheHandle {
    /// Opens the expression cache described by `config` (see [`ExpressionCache::open`]) and
    /// spawns a background task that applies operations sent through the returned handle.
    ///
    /// Returns the handle together with the local and global expressions that survived
    /// reconciliation.
    ///
    /// The background task exits once every clone of the returned handle has been dropped.
    pub async fn spawn_expression_cache(
        config: ExpressionCacheConfig,
    ) -> (
        Self,
        BTreeMap<GlobalId, LocalExpressions>,
        BTreeMap<GlobalId, GlobalExpressions>,
    ) {
        let (mut cache, local_expressions, global_expressions) =
            ExpressionCache::open(config).await;
        let (tx, mut rx) = mpsc::unbounded_channel();

        spawn(|| "expression-cache-task", async move {
            // `recv` returns `None` once all senders (i.e. all handles) are gone, and keeps
            // returning `None` from then on. No further operations can arrive, so the task must
            // exit here; wrapping this in an outer `loop` would spin on the closed channel forever.
            while let Some(op) = rx.recv().await {
                match op {
                    CacheOperation::Update {
                        new_local_expressions,
                        new_global_expressions,
                        invalidate_ids,
                        // Bound to a name so the trigger is held until the update below has been
                        // written; it is dropped at the end of this arm.
                        trigger: _trigger,
                    } => {
                        cache
                            .update(new_local_expressions, new_global_expressions, invalidate_ids)
                            .await;
                    }
                }
            }
        });

        (Self { tx }, local_expressions, global_expressions)
    }

    /// Sends a batch of updates to the background cache task.
    ///
    /// The returned future is the receiving end of a trigger that the background task holds
    /// until the update has been applied.
    pub fn update(
        &self,
        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
        invalidate_ids: BTreeSet<GlobalId>,
    ) -> impl Future<Output = ()> {
        let (trigger, trigger_rx) = trigger::channel();
        let op = CacheOperation::Update {
            new_local_expressions,
            new_global_expressions,
            invalidate_ids,
            trigger,
        };
        // A send error means the background task is gone. The error owns `op`, including the
        // trigger, and is dropped right here. This relies on the trigger resolving its receiver
        // when dropped, as the background task does too, so callers are not left waiting.
        let _ = self.tx.send(op);
        trigger_rx
    }
}
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::marker::PhantomData;
    use std::sync::Arc;
    use std::time::{Duration, Instant};

    use mz_compute_types::dataflows::DataflowDescription;
    use mz_durable_cache::DurableCacheCodec;
    use mz_dyncfg::ConfigSet;
    use mz_expr::OptimizedMirRelationExpr;
    use mz_ore::test::timeout;
    use mz_persist_client::PersistClient;
    use mz_repr::optimize::OptimizerFeatures;
    use mz_repr::GlobalId;
    use mz_transform::dataflow::DataflowMetainfo;
    use mz_transform::notice::OptimizerNotice;
    use proptest::arbitrary::{any, Arbitrary};
    use proptest::prelude::{BoxedStrategy, ProptestConfig};
    use proptest::proptest;
    use proptest::strategy::{Strategy, ValueTree};
    use proptest::test_runner::{RngAlgorithm, TestRng, TestRunner};
    use tracing::info;
    use uuid::Uuid;

    use crate::expr_cache::{
        CacheKey, ExpressionCacheConfig, ExpressionCacheHandle, ExpressionCodec, GlobalExpressions,
        LocalExpressions,
    };

    impl Arbitrary for LocalExpressions {
        type Parameters = ();

        fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
            (
                any::<OptimizedMirRelationExpr>(),
                any::<OptimizerFeatures>(),
            )
                .prop_map(|(local_mir, optimizer_features)| LocalExpressions {
                    local_mir,
                    optimizer_features,
                })
                .boxed()
        }

        type Strategy = BoxedStrategy<Self>;
    }

    impl Arbitrary for GlobalExpressions {
        type Parameters = ();

        fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
            (
                any::<DataflowDescription<OptimizedMirRelationExpr>>(),
                any::<DataflowDescription<mz_compute_types::plan::Plan>>(),
                any::<DataflowMetainfo<Arc<OptimizerNotice>>>(),
                any::<OptimizerFeatures>(),
            )
                .prop_map(
                    |(global_mir, physical_plan, dataflow_metainfos, optimizer_features)| {
                        GlobalExpressions {
                            global_mir,
                            physical_plan,
                            dataflow_metainfos,
                            optimizer_features,
                        }
                    },
                )
                .boxed()
        }

        type Strategy = BoxedStrategy<Self>;
    }

    /// Generates arbitrary values of `T` from a freshly seeded RNG. Each attempt is bounded by
    /// `TIMEOUT_SECS`, and a timed-out attempt is retried with a new seed.
    struct ArbitraryTimeout<T: Arbitrary + Send + 'static> {
        _phantom: PhantomData<T>,
    }

    impl<T: Arbitrary + Send> ArbitraryTimeout<T> {
        /// Maximum number of generation attempts before giving up.
        const GENERATE_ATTEMPTS: u64 = 6;
        /// Time limit for a single generation attempt.
        const TIMEOUT_SECS: u64 = 5;

        fn new() -> Self {
            Self {
                _phantom: Default::default(),
            }
        }

        /// Builds a value tree for `T` from a `TestRunner` whose RNG is reseeded with random
        /// bytes, so every call produces an independent value.
        fn new_tree() -> Box<dyn ValueTree<Value = T>>
        where
            T: 'static,
        {
            let seed: [u8; 32] = rand::random();
            let mut test_runner = TestRunner::deterministic();
            let rng = test_runner.rng();
            *rng = TestRng::from_seed(RngAlgorithm::ChaCha, &seed);
            Box::new(T::arbitrary().new_tree(&mut test_runner).expect("valid"))
        }

        /// Generates a value, panicking if every attempt times out.
        fn generate(&self) -> T {
            for _ in 0..Self::GENERATE_ATTEMPTS {
                if let Ok(val) = self.try_generate() {
                    return val;
                }
            }
            panic!("timed out generating a value");
        }

        /// Makes one time-limited attempt at generating a value.
        fn try_generate(&self) -> Result<T, ()> {
            match timeout(Duration::from_secs(Self::TIMEOUT_SECS), || {
                Ok(Self::new_tree().current())
            }) {
                Ok(val) => Ok(val),
                Err(_) => {
                    info!("timed out generating a value");
                    Err(())
                }
            }
        }
    }

    /// Exercises reconciliation on open:
    /// - persistence across reopen;
    /// - removal of dropped objects;
    /// - removal of global expressions whose index dependency was dropped;
    /// - isolation between deploy generations;
    /// - `remove_prior_gens`.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn expression_cache() {
        let local_tree: ArbitraryTimeout<LocalExpressions> = ArbitraryTimeout::new();
        let global_tree: ArbitraryTimeout<GlobalExpressions> = ArbitraryTimeout::new();

        let first_deploy_generation = 0;
        let second_deploy_generation = 1;
        let persist = PersistClient::new_for_tests().await;
        let organization_id = Uuid::new_v4();

        let mut current_ids = BTreeSet::new();
        let mut remove_prior_gens = false;
        let compact_shard = false;
        let dyncfgs = &mz_persist_client::cfg::all_dyncfgs(ConfigSet::default());

        let mut next_id = 0;

        // Populate an empty cache in the first generation.
        let (mut local_exps, mut global_exps) = {
            let (cache, local_exprs, global_exprs) =
                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
                    deploy_generation: first_deploy_generation,
                    persist: persist.clone(),
                    organization_id,
                    current_ids: current_ids.clone(),
                    remove_prior_gens,
                    compact_shard,
                    dyncfgs: dyncfgs.clone(),
                })
                .await;
            assert_eq!(local_exprs, BTreeMap::new(), "new cache should be empty");
            assert_eq!(global_exprs, BTreeMap::new(), "new cache should be empty");

            let mut local_exps = BTreeMap::new();
            let mut global_exps = BTreeMap::new();
            for _ in 0..4 {
                let id = GlobalId::User(next_id);
                let start = Instant::now();
                let local_exp = local_tree.generate();
                let global_exp = global_tree.generate();
                info!("Generating exps took: {:?}", start.elapsed());

                cache
                    .update(
                        vec![(id, local_exp.clone())],
                        vec![(id, global_exp.clone())],
                        BTreeSet::new(),
                    )
                    .await;

                // The object and every index it imports must exist for the entries to survive
                // reconciliation.
                current_ids.insert(id);
                current_ids.extend(global_exp.index_imports());
                local_exps.insert(id, local_exp);
                global_exps.insert(id, global_exp);

                next_id += 1;
            }
            (local_exps, global_exps)
        };

        // Reopening the same generation with the same config returns everything written.
        {
            let (_cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
                    deploy_generation: first_deploy_generation,
                    persist: persist.clone(),
                    organization_id,
                    current_ids: current_ids.clone(),
                    remove_prior_gens,
                    compact_shard,
                    dyncfgs: dyncfgs.clone(),
                })
                .await;
            assert_eq!(
                local_entries, local_exps,
                "local expressions should be returned after reopening the cache"
            );
            assert_eq!(
                global_entries, global_exps,
                "global expressions should be returned after reopening the cache"
            );
        }

        // Dropping an object removes both of its entries.
        {
            let id_to_remove = *local_exps.keys().next().expect("not empty");
            current_ids.remove(&id_to_remove);
            let _removed_local_exp = local_exps.remove(&id_to_remove);
            let _removed_global_exp = global_exps.remove(&id_to_remove);

            let (_cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
                    deploy_generation: first_deploy_generation,
                    persist: persist.clone(),
                    organization_id,
                    current_ids: current_ids.clone(),
                    remove_prior_gens,
                    compact_shard,
                    dyncfgs: dyncfgs.clone(),
                })
                .await;
            assert_eq!(
                local_entries, local_exps,
                "dropped local objects should be removed during reconciliation"
            );
            assert_eq!(
                global_entries, global_exps,
                "dropped global objects should be removed during reconciliation"
            );
        }

        // Dropping an index dependency removes every global expression that imports it, but
        // leaves local expressions alone.
        {
            let global_exp_to_remove = *global_exps.keys().next().expect("not empty");
            let removed_global_exp = global_exps
                .remove(&global_exp_to_remove)
                .expect("known to exist");
            let dependency_to_remove = removed_global_exp
                .index_imports()
                .next()
                .expect("arbitrary impl always makes non-empty vecs");
            current_ids.remove(dependency_to_remove);
            let _removed_local_exp = local_exps.remove(dependency_to_remove);
            let _removed_global_exp = global_exps.remove(dependency_to_remove);
            global_exps.retain(|_, exp| {
                let index_imports: BTreeSet<_> = exp.index_imports().collect();
                !index_imports.contains(&dependency_to_remove)
            });

            let (_cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
                    deploy_generation: first_deploy_generation,
                    persist: persist.clone(),
                    organization_id,
                    current_ids: current_ids.clone(),
                    remove_prior_gens,
                    compact_shard,
                    dyncfgs: dyncfgs.clone(),
                })
                .await;
            assert_eq!(
                local_entries, local_exps,
                "dropped object dependencies should NOT remove local expressions"
            );
            assert_eq!(
                global_entries, global_exps,
                "dropped object dependencies should remove global expressions"
            );
        }

        // A new generation starts empty and is populated independently.
        let (new_gen_local_exps, new_gen_global_exps) = {
            let (cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
                    deploy_generation: second_deploy_generation,
                    persist: persist.clone(),
                    organization_id,
                    current_ids: current_ids.clone(),
                    remove_prior_gens,
                    compact_shard,
                    dyncfgs: dyncfgs.clone(),
                })
                .await;
            assert_eq!(
                local_entries,
                BTreeMap::new(),
                "new generation should be empty"
            );
            assert_eq!(
                global_entries,
                BTreeMap::new(),
                "new generation should be empty"
            );

            let mut local_exps = BTreeMap::new();
            let mut global_exps = BTreeMap::new();
            for _ in 0..2 {
                let id = GlobalId::User(next_id);
                let start = Instant::now();
                let local_exp = local_tree.generate();
                let global_exp = global_tree.generate();
                info!("Generating exps took: {:?}", start.elapsed());

                cache
                    .update(
                        vec![(id, local_exp.clone())],
                        vec![(id, global_exp.clone())],
                        BTreeSet::new(),
                    )
                    .await;

                current_ids.insert(id);
                current_ids.extend(global_exp.index_imports());
                local_exps.insert(id, local_exp);
                global_exps.insert(id, global_exp);

                next_id += 1;
            }
            (local_exps, global_exps)
        };

        // Without `remove_prior_gens`, the first generation's entries are untouched.
        {
            let (_cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
                    deploy_generation: first_deploy_generation,
                    persist: persist.clone(),
                    organization_id,
                    current_ids: current_ids.clone(),
                    remove_prior_gens,
                    compact_shard,
                    dyncfgs: dyncfgs.clone(),
                })
                .await;
            assert_eq!(
                local_entries, local_exps,
                "Previous generation local expressions should still exist"
            );
            assert_eq!(
                global_entries, global_exps,
                "Previous generation global expressions should still exist"
            );
        }

        // With `remove_prior_gens`, opening the second generation keeps its own entries and
        // deletes the first generation's.
        {
            remove_prior_gens = true;
            let (_cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
                    deploy_generation: second_deploy_generation,
                    persist: persist.clone(),
                    organization_id,
                    current_ids: current_ids.clone(),
                    remove_prior_gens,
                    compact_shard,
                    dyncfgs: dyncfgs.clone(),
                })
                .await;
            assert_eq!(
                local_entries, new_gen_local_exps,
                "new generation local expressions should be persisted"
            );
            assert_eq!(
                global_entries, new_gen_global_exps,
                "new generation global expressions should be persisted"
            );
        }

        // The first generation's entries are gone.
        {
            let (_cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
                    deploy_generation: first_deploy_generation,
                    persist: persist.clone(),
                    organization_id,
                    current_ids: current_ids.clone(),
                    remove_prior_gens,
                    compact_shard,
                    dyncfgs: dyncfgs.clone(),
                })
                .await;
            assert_eq!(
                local_entries,
                BTreeMap::new(),
                "Previous generation local expressions should be cleared"
            );
            assert_eq!(
                global_entries,
                BTreeMap::new(),
                "Previous generation global expressions should be cleared"
            );
        }
    }

    proptest! {
        // Generating expressions is slow, so run a single case per property.
        #![proptest_config(ProptestConfig::with_cases(1))]

        /// Keys and bincode-serialized local expressions round-trip through `ExpressionCodec`.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn local_expr_cache_roundtrip(key in any::<CacheKey>()) {
            let local_tree: ArbitraryTimeout<LocalExpressions> = ArbitraryTimeout::new();
            let val = local_tree.generate();

            let bincode_val = bincode::serialize(&val).expect("must serialize");
            let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
            let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
            let decoded_val: LocalExpressions = bincode::deserialize(&decoded_val).expect("local expressions should roundtrip");

            assert_eq!(key, decoded_key);
            assert_eq!(val, decoded_val);
        }

        /// Keys and bincode-serialized global expressions round-trip through `ExpressionCodec`.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn global_expr_cache_roundtrip(key in any::<CacheKey>()) {
            let global_tree: ArbitraryTimeout<GlobalExpressions> = ArbitraryTimeout::new();
            let val = global_tree.generate();

            let bincode_val = bincode::serialize(&val).expect("must serialize");
            let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
            let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
            let decoded_val: GlobalExpressions = bincode::deserialize(&decoded_val).expect("global expressions should roundtrip");

            assert_eq!(key, decoded_key);
            assert_eq!(val, decoded_val);
        }
    }
}