1use std::collections::{BTreeMap, BTreeSet};
13use std::future::Future;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use mz_compute_types::dataflows::DataflowDescription;
18use mz_durable_cache::{DurableCache, DurableCacheCodec};
19use mz_dyncfg::ConfigSet;
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::channel::trigger;
22use mz_ore::soft_panic_or_log;
23use mz_ore::task::spawn;
24use mz_persist_client::PersistClient;
25use mz_persist_client::cli::admin::{
26 EXPRESSION_CACHE_FORCE_COMPACTION_FUEL, EXPRESSION_CACHE_FORCE_COMPACTION_WAIT,
27};
28use mz_persist_types::codec_impls::VecU8Schema;
29use mz_persist_types::{Codec, ShardId};
30use mz_repr::GlobalId;
31use mz_repr::optimize::OptimizerFeatures;
32use mz_transform::dataflow::DataflowMetainfo;
33use mz_transform::notice::OptimizerNotice;
34use semver::Version;
35use serde::{Deserialize, Serialize};
36use tokio::sync::mpsc;
37use tracing::{debug, warn};
38
39#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
40enum ExpressionType {
41 Local,
42 Global,
43}
44
45#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
47pub struct LocalExpressions {
48 pub local_mir: OptimizedMirRelationExpr,
49 pub optimizer_features: OptimizerFeatures,
50}
51
52#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54pub struct GlobalExpressions {
55 pub global_mir: DataflowDescription<OptimizedMirRelationExpr>,
56 pub physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
57 pub dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
58 pub optimizer_features: OptimizerFeatures,
59}
60
61impl GlobalExpressions {
62 fn index_imports(&self) -> impl Iterator<Item = &GlobalId> {
63 self.global_mir
64 .index_imports
65 .keys()
66 .chain(self.physical_plan.index_imports.keys())
67 }
68}
69
70#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
71struct CacheKey {
72 build_version: String,
73 id: GlobalId,
74 expr_type: ExpressionType,
75}
76
/// Marker type selecting how expression cache entries are encoded in the
/// durable cache; see its [`DurableCacheCodec`] implementation.
#[derive(Debug, PartialEq, Eq)]
struct ExpressionCodec;
79
80impl DurableCacheCodec for ExpressionCodec {
81 type Key = CacheKey;
82 type Val = Bytes;
85 type KeyCodec = Bytes;
86 type ValCodec = Bytes;
87
88 fn schemas() -> (
89 <Self::KeyCodec as Codec>::Schema,
90 <Self::ValCodec as Codec>::Schema,
91 ) {
92 (VecU8Schema::default(), VecU8Schema::default())
93 }
94
95 fn encode(key: &Self::Key, val: &Self::Val) -> (Self::KeyCodec, Self::ValCodec) {
96 let key = bincode::serialize(key).expect("must serialize");
97 (Bytes::from(key), val.clone())
98 }
99
100 fn decode(key: &Self::KeyCodec, val: &Self::ValCodec) -> (Self::Key, Self::Val) {
101 let key = bincode::deserialize(key).expect("must deserialize");
102 (key, val.clone())
103 }
104}
105
106#[derive(Debug, Clone)]
108pub struct ExpressionCacheConfig {
109 pub build_version: Version,
110 pub persist: PersistClient,
111 pub shard_id: ShardId,
112 pub current_ids: BTreeSet<GlobalId>,
113 pub remove_prior_versions: bool,
114 pub compact_shard: bool,
115 pub dyncfgs: ConfigSet,
116}
117
118pub struct ExpressionCache {
120 build_version: Version,
121 durable_cache: DurableCache<ExpressionCodec>,
122}
123
124impl ExpressionCache {
125 pub async fn open(
137 ExpressionCacheConfig {
138 build_version,
139 persist,
140 shard_id,
141 current_ids,
142 remove_prior_versions,
143 compact_shard,
144 dyncfgs,
145 }: ExpressionCacheConfig,
146 ) -> (
147 Self,
148 BTreeMap<GlobalId, LocalExpressions>,
149 BTreeMap<GlobalId, GlobalExpressions>,
150 ) {
151 let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await;
152 let mut cache = Self {
153 build_version,
154 durable_cache,
155 };
156
157 const RETRIES: usize = 100;
158 for _ in 0..RETRIES {
159 match cache
160 .try_open(¤t_ids, remove_prior_versions, compact_shard, &dyncfgs)
161 .await
162 {
163 Ok((local_expressions, global_expressions)) => {
164 return (cache, local_expressions, global_expressions);
165 }
166 Err(err) => debug!("failed to open cache: {err} ... retrying"),
167 }
168 }
169
170 panic!("Unable to open expression cache after {RETRIES} retries");
171 }
172
173 async fn try_open(
174 &mut self,
175 current_ids: &BTreeSet<GlobalId>,
176 remove_prior_versions: bool,
177 compact_shard: bool,
178 dyncfgs: &ConfigSet,
179 ) -> Result<
180 (
181 BTreeMap<GlobalId, LocalExpressions>,
182 BTreeMap<GlobalId, GlobalExpressions>,
183 ),
184 mz_durable_cache::Error,
185 > {
186 let mut keys_to_remove = Vec::new();
187 let mut local_expressions = BTreeMap::new();
188 let mut global_expressions = BTreeMap::new();
189
190 for (key, expressions) in self.durable_cache.entries_local() {
191 let build_version = match key.build_version.parse::<Version>() {
192 Ok(build_version) => build_version,
193 Err(err) => {
194 warn!("unable to parse build version: {key:?}: {err:?}");
195 keys_to_remove.push((key.clone(), None));
196 continue;
197 }
198 };
199 if build_version == self.build_version {
200 match key.expr_type {
202 ExpressionType::Local => {
203 let expressions: LocalExpressions = match bincode::deserialize(expressions)
204 {
205 Ok(expressions) => expressions,
206 Err(err) => {
207 soft_panic_or_log!(
208 "unable to deserialize local expressions: ({key:?}, {expressions:?}): {err:?}"
209 );
210 continue;
211 }
212 };
213 if !current_ids.contains(&key.id) {
215 keys_to_remove.push((key.clone(), None));
216 } else {
217 local_expressions.insert(key.id, expressions);
218 }
219 }
220 ExpressionType::Global => {
221 let expressions: GlobalExpressions = match bincode::deserialize(expressions)
222 {
223 Ok(expressions) => expressions,
224 Err(err) => {
225 soft_panic_or_log!(
226 "unable to deserialize global expressions: ({key:?}, {expressions:?}): {err:?}"
227 );
228 continue;
229 }
230 };
231 let index_dependencies: BTreeSet<_> =
233 expressions.index_imports().cloned().collect();
234 if !current_ids.contains(&key.id)
235 || !index_dependencies.is_subset(current_ids)
236 {
237 keys_to_remove.push((key.clone(), None));
238 } else {
239 global_expressions.insert(key.id, expressions);
240 }
241 }
242 }
243 } else if remove_prior_versions {
244 keys_to_remove.push((key.clone(), None));
246 }
247 }
248
249 let keys_to_remove: Vec<_> = keys_to_remove
250 .iter()
251 .map(|(key, expressions)| (key, expressions.as_ref()))
252 .collect();
253 self.durable_cache.try_set_many(&keys_to_remove).await?;
254
255 if compact_shard {
256 let fuel = EXPRESSION_CACHE_FORCE_COMPACTION_FUEL.handle(dyncfgs);
257 let wait = EXPRESSION_CACHE_FORCE_COMPACTION_WAIT.handle(dyncfgs);
258 self.durable_cache
259 .dangerous_compact_shard(move || fuel.get(), move || wait.get())
260 .await;
261 }
262
263 Ok((local_expressions, global_expressions))
264 }
265
266 async fn update(
272 &mut self,
273 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
274 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
275 invalidate_ids: BTreeSet<GlobalId>,
276 ) {
277 let mut entries = BTreeMap::new();
278 let build_version = self.build_version.to_string();
279 for id in invalidate_ids {
282 entries.insert(
283 CacheKey {
284 id,
285 build_version: build_version.clone(),
286 expr_type: ExpressionType::Local,
287 },
288 None,
289 );
290 entries.insert(
291 CacheKey {
292 id,
293 build_version: build_version.clone(),
294 expr_type: ExpressionType::Global,
295 },
296 None,
297 );
298 }
299 for (id, expressions) in new_local_expressions {
300 let expressions = match bincode::serialize(&expressions) {
301 Ok(expressions) => Bytes::from(expressions),
302 Err(err) => {
303 soft_panic_or_log!(
304 "unable to serialize local expressions: {expressions:?}: {err:?}"
305 );
306 continue;
307 }
308 };
309 entries.insert(
310 CacheKey {
311 id,
312 build_version: build_version.clone(),
313 expr_type: ExpressionType::Local,
314 },
315 Some(expressions),
316 );
317 }
318 for (id, expressions) in new_global_expressions {
319 let expressions = match bincode::serialize(&expressions) {
320 Ok(expressions) => Bytes::from(expressions),
321 Err(err) => {
322 soft_panic_or_log!(
323 "unable to serialize global expressions: {expressions:?}: {err:?}"
324 );
325 continue;
326 }
327 };
328 entries.insert(
329 CacheKey {
330 id,
331 build_version: build_version.clone(),
332 expr_type: ExpressionType::Global,
333 },
334 Some(expressions),
335 );
336 }
337 let entries: Vec<_> = entries
338 .iter()
339 .map(|(key, expressions)| (key, expressions.as_ref()))
340 .collect();
341 self.durable_cache.set_many(&entries).await
342 }
343}
344
345enum CacheOperation {
347 Update {
349 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
350 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
351 invalidate_ids: BTreeSet<GlobalId>,
352 trigger: trigger::Trigger,
353 },
354}
355
356#[derive(Debug, Clone)]
357pub struct ExpressionCacheHandle {
358 tx: mpsc::UnboundedSender<CacheOperation>,
359}
360
361impl ExpressionCacheHandle {
362 pub async fn spawn_expression_cache(
366 config: ExpressionCacheConfig,
367 ) -> (
368 Self,
369 BTreeMap<GlobalId, LocalExpressions>,
370 BTreeMap<GlobalId, GlobalExpressions>,
371 ) {
372 let (mut cache, local_expressions, global_expressions) =
373 ExpressionCache::open(config).await;
374 let (tx, mut rx) = mpsc::unbounded_channel();
375 spawn(|| "expression-cache-task", async move {
376 loop {
377 while let Some(op) = rx.recv().await {
378 match op {
379 CacheOperation::Update {
380 new_local_expressions,
381 new_global_expressions,
382 invalidate_ids,
383 trigger: _trigger,
384 } => {
385 cache
386 .update(
387 new_local_expressions,
388 new_global_expressions,
389 invalidate_ids,
390 )
391 .await
392 }
393 }
394 }
395 }
396 });
397
398 (Self { tx }, local_expressions, global_expressions)
399 }
400
401 pub fn update(
402 &self,
403 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
404 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
405 invalidate_ids: BTreeSet<GlobalId>,
406 ) -> impl Future<Output = ()> + use<> {
407 let (trigger, trigger_rx) = trigger::channel();
408 let op = CacheOperation::Update {
409 new_local_expressions,
410 new_global_expressions,
411 invalidate_ids,
412 trigger,
413 };
414 let _ = self.tx.send(op);
416 trigger_rx
417 }
418}
419
420#[cfg(test)]
421mod tests {
422 use std::collections::{BTreeMap, BTreeSet};
423
424 use bytes::Bytes;
425 use mz_compute_types::dataflows::{DataflowDescription, IndexDesc, IndexImport};
426 use mz_durable_cache::DurableCacheCodec;
427 use mz_dyncfg::ConfigSet;
428 use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
429 use mz_persist_client::PersistClient;
430 use mz_persist_types::ShardId;
431 use mz_repr::{Datum, GlobalId, SqlRelationType, SqlScalarType};
432 use semver::Version;
433
434 use super::*;
435
436 #[mz_ore::test(tokio::test)]
437 #[cfg_attr(miri, ignore)] async fn expression_cache() {
439 let first_version = Version::new(0, 1, 0);
440 let second_version = Version::new(0, 2, 0);
441 let persist = PersistClient::new_for_tests().await;
442 let shard_id = ShardId::new();
443
444 let mut current_ids = BTreeSet::new();
445 let mut remove_prior_versions = false;
446 let compact_shard = false;
448 let dyncfgs = &mz_persist_client::cfg::all_dyncfgs(ConfigSet::default());
449
450 let mut next_id = 0;
451
452 let (mut local_exps, mut global_exps) = {
453 let (cache, local_exprs, global_exprs) =
455 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
456 build_version: first_version.clone(),
457 persist: persist.clone(),
458 shard_id,
459 current_ids: current_ids.clone(),
460 remove_prior_versions,
461 compact_shard,
462 dyncfgs: dyncfgs.clone(),
463 })
464 .await;
465 assert_eq!(local_exprs, BTreeMap::new(), "new cache should be empty");
466 assert_eq!(global_exprs, BTreeMap::new(), "new cache should be empty");
467
468 let mut local_exps = BTreeMap::new();
470 let mut global_exps = BTreeMap::new();
471 for _ in 0..4 {
472 let id = GlobalId::User(next_id);
473 let local_exp = gen_local_expressions();
474 let global_exp = gen_global_expressions();
475
476 cache
477 .update(
478 vec![(id, local_exp.clone())],
479 vec![(id, global_exp.clone())],
480 BTreeSet::new(),
481 )
482 .await;
483
484 current_ids.insert(id);
485 current_ids.extend(global_exp.index_imports());
486 local_exps.insert(id, local_exp);
487 global_exps.insert(id, global_exp);
488
489 next_id += 1;
490 }
491 (local_exps, global_exps)
492 };
493
494 {
495 let (_cache, local_entries, global_entries) =
497 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
498 build_version: first_version.clone(),
499 persist: persist.clone(),
500 shard_id,
501 current_ids: current_ids.clone(),
502 remove_prior_versions,
503 compact_shard,
504 dyncfgs: dyncfgs.clone(),
505 })
506 .await;
507 assert_eq!(
508 local_entries, local_exps,
509 "local expression with non-matching optimizer features should be removed during reconciliation"
510 );
511 assert_eq!(
512 global_entries, global_exps,
513 "global expression with non-matching optimizer features should be removed during reconciliation"
514 );
515 }
516
517 {
518 let id_to_remove = local_exps.keys().next().expect("not empty").clone();
520 current_ids.remove(&id_to_remove);
521 let _removed_local_exp = local_exps.remove(&id_to_remove);
522 let _removed_global_exp = global_exps.remove(&id_to_remove);
523
524 let (_cache, local_entries, global_entries) =
526 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
527 build_version: first_version.clone(),
528 persist: persist.clone(),
529 shard_id,
530 current_ids: current_ids.clone(),
531 remove_prior_versions,
532 compact_shard,
533 dyncfgs: dyncfgs.clone(),
534 })
535 .await;
536 assert_eq!(
537 local_entries, local_exps,
538 "dropped local objects should be removed during reconciliation"
539 );
540 assert_eq!(
541 global_entries, global_exps,
542 "dropped global objects should be removed during reconciliation"
543 );
544 }
545
546 {
547 let global_exp_to_remove = global_exps.keys().next().expect("not empty").clone();
549 let removed_global_exp = global_exps
550 .remove(&global_exp_to_remove)
551 .expect("known to exist");
552 let dependency_to_remove = removed_global_exp
553 .index_imports()
554 .next()
555 .expect("generator always makes non-empty index imports");
556 current_ids.remove(dependency_to_remove);
557
558 let _removed_local_exp = local_exps.remove(dependency_to_remove);
560 let _removed_global_exp = global_exps.remove(dependency_to_remove);
561 global_exps.retain(|_, exp| {
563 let index_imports: BTreeSet<_> = exp.index_imports().collect();
564 !index_imports.contains(&dependency_to_remove)
565 });
566
567 let (_cache, local_entries, global_entries) =
569 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
570 build_version: first_version.clone(),
571 persist: persist.clone(),
572 shard_id,
573 current_ids: current_ids.clone(),
574 remove_prior_versions,
575 compact_shard,
576 dyncfgs: dyncfgs.clone(),
577 })
578 .await;
579 assert_eq!(
580 local_entries, local_exps,
581 "dropped object dependencies should NOT remove local expressions"
582 );
583 assert_eq!(
584 global_entries, global_exps,
585 "dropped object dependencies should remove global expressions"
586 );
587 }
588
589 let (new_gen_local_exps, new_gen_global_exps) = {
590 let (cache, local_entries, global_entries) =
592 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
593 build_version: second_version.clone(),
594 persist: persist.clone(),
595 shard_id,
596 current_ids: current_ids.clone(),
597 remove_prior_versions,
598 compact_shard,
599 dyncfgs: dyncfgs.clone(),
600 })
601 .await;
602 assert_eq!(
603 local_entries,
604 BTreeMap::new(),
605 "new version should be empty"
606 );
607 assert_eq!(
608 global_entries,
609 BTreeMap::new(),
610 "new version should be empty"
611 );
612
613 let mut local_exps = BTreeMap::new();
615 let mut global_exps = BTreeMap::new();
616 for _ in 0..2 {
617 let id = GlobalId::User(next_id);
618 let local_exp = gen_local_expressions();
619 let global_exp = gen_global_expressions();
620
621 cache
622 .update(
623 vec![(id, local_exp.clone())],
624 vec![(id, global_exp.clone())],
625 BTreeSet::new(),
626 )
627 .await;
628
629 current_ids.insert(id);
630 current_ids.extend(global_exp.index_imports());
631 local_exps.insert(id, local_exp);
632 global_exps.insert(id, global_exp);
633
634 next_id += 1;
635 }
636 (local_exps, global_exps)
637 };
638
639 {
640 let (_cache, local_entries, global_entries) =
642 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
643 build_version: first_version.clone(),
644 persist: persist.clone(),
645 shard_id,
646 current_ids: current_ids.clone(),
647 remove_prior_versions,
648 compact_shard,
649 dyncfgs: dyncfgs.clone(),
650 })
651 .await;
652 assert_eq!(
653 local_entries, local_exps,
654 "Previous version local expressions should still exist"
655 );
656 assert_eq!(
657 global_entries, global_exps,
658 "Previous version global expressions should still exist"
659 );
660 }
661
662 {
663 remove_prior_versions = true;
665 let (_cache, local_entries, global_entries) =
666 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
667 build_version: second_version.clone(),
668 persist: persist.clone(),
669 shard_id,
670 current_ids: current_ids.clone(),
671 remove_prior_versions,
672 compact_shard,
673 dyncfgs: dyncfgs.clone(),
674 })
675 .await;
676 assert_eq!(
677 local_entries, new_gen_local_exps,
678 "new version local expressions should be persisted"
679 );
680 assert_eq!(
681 global_entries, new_gen_global_exps,
682 "new version global expressions should be persisted"
683 );
684 }
685
686 {
687 let (_cache, local_entries, global_entries) =
689 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
690 build_version: first_version.clone(),
691 persist: persist.clone(),
692 shard_id,
693 current_ids: current_ids.clone(),
694 remove_prior_versions,
695 compact_shard,
696 dyncfgs: dyncfgs.clone(),
697 })
698 .await;
699 assert_eq!(
700 local_entries,
701 BTreeMap::new(),
702 "Previous version local expressions should be cleared"
703 );
704 assert_eq!(
705 global_entries,
706 BTreeMap::new(),
707 "Previous version global expressions should be cleared"
708 );
709 }
710 }
711
712 #[mz_ore::test]
713 fn local_expr_cache_roundtrip() {
714 let key = CacheKey {
715 id: GlobalId::User(1),
716 build_version: "1.2.3".into(),
717 expr_type: ExpressionType::Local,
718 };
719 let val = gen_local_expressions();
720
721 let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
722 let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
723 let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
724 let decoded_val: LocalExpressions =
725 bincode::deserialize(&decoded_val).expect("local expressions should roundtrip");
726
727 assert_eq!(key, decoded_key);
728 assert_eq!(val, decoded_val);
729 }
730
731 #[mz_ore::test]
732 fn global_expr_cache_roundtrip() {
733 let key = CacheKey {
734 id: GlobalId::User(1),
735 build_version: "1.2.3".into(),
736 expr_type: ExpressionType::Global,
737 };
738 let val = gen_global_expressions();
739
740 let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
741 let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
742 let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
743 let decoded_val: GlobalExpressions =
744 bincode::deserialize(&decoded_val).expect("global expressions should roundtrip");
745
746 assert_eq!(key, decoded_key);
747 assert_eq!(val, decoded_val);
748 }
749
750 fn gen_local_expressions() -> LocalExpressions {
756 let datum = Datum::UInt64(rand::random());
757
758 LocalExpressions {
759 local_mir: OptimizedMirRelationExpr(MirRelationExpr::constant(
760 vec![vec![datum]],
761 SqlRelationType::new(vec![SqlScalarType::UInt64.nullable(false)]),
762 )),
763 optimizer_features: Default::default(),
764 }
765 }
766
767 fn gen_global_expressions() -> GlobalExpressions {
773 let name = format!("test-{}", rand::random::<u64>());
774
775 let mut global_mir = DataflowDescription::new(name.clone());
776 let mut physical_plan = DataflowDescription::new(name);
777
778 let index_imports = BTreeMap::from_iter([(
780 GlobalId::User(2),
781 IndexImport {
782 desc: IndexDesc {
783 on_id: GlobalId::User(1),
784 key: Default::default(),
785 },
786 typ: SqlRelationType::empty(),
787 monotonic: false,
788 },
789 )]);
790 global_mir.index_imports = index_imports.clone();
791 physical_plan.index_imports = index_imports;
792
793 GlobalExpressions {
794 global_mir,
795 physical_plan,
796 dataflow_metainfos: Default::default(),
797 optimizer_features: Default::default(),
798 }
799 }
800}