1use std::collections::{BTreeMap, BTreeSet};
13use std::future::Future;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use mz_compute_types::dataflows::DataflowDescription;
18use mz_durable_cache::{DurableCache, DurableCacheCodec};
19use mz_dyncfg::ConfigSet;
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::channel::trigger;
22use mz_ore::soft_panic_or_log;
23use mz_ore::task::spawn;
24use mz_persist_client::PersistClient;
25use mz_persist_client::cli::admin::{
26 EXPRESSION_CACHE_FORCE_COMPACTION_FUEL, EXPRESSION_CACHE_FORCE_COMPACTION_WAIT,
27};
28use mz_persist_types::codec_impls::VecU8Schema;
29use mz_persist_types::{Codec, ShardId};
30use mz_repr::GlobalId;
31use mz_repr::optimize::OptimizerFeatures;
32use mz_transform::dataflow::DataflowMetainfo;
33use mz_transform::notice::OptimizerNotice;
34use semver::Version;
35use serde::{Deserialize, Serialize};
36use tokio::sync::mpsc;
37use tracing::{debug, warn};
38
39#[derive(
40 Debug,
41 Clone,
42 PartialEq,
43 Eq,
44 PartialOrd,
45 Ord,
46 Hash,
47 Serialize,
48 Deserialize
49)]
50enum ExpressionType {
51 Local,
52 Global,
53}
54
/// Cached payload stored for an object under an [`ExpressionType::Local`] key.
///
/// Values are bincode-serialized before being written to the cache and
/// deserialized again when the cache is opened.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LocalExpressions {
    /// The optimized MIR expression for the object.
    pub local_mir: OptimizedMirRelationExpr,
    /// NOTE(review): presumably the optimizer features that were in effect
    /// when `local_mir` was produced — confirm against the callers.
    pub optimizer_features: OptimizerFeatures,
}
61
/// Cached payload stored for an object under an [`ExpressionType::Global`] key.
///
/// Values are bincode-serialized before being written to the cache and
/// deserialized again when the cache is opened.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GlobalExpressions {
    /// The dataflow description expressed in optimized MIR.
    pub global_mir: DataflowDescription<OptimizedMirRelationExpr>,
    /// The dataflow description expressed as a compute plan.
    pub physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
    /// Dataflow metainfo carrying shared optimizer notices.
    pub dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
    /// NOTE(review): presumably the optimizer features that were in effect
    /// when these expressions were produced — confirm against the callers.
    pub optimizer_features: OptimizerFeatures,
}
70
71impl GlobalExpressions {
72 fn index_imports(&self) -> impl Iterator<Item = &GlobalId> {
73 self.global_mir
74 .index_imports
75 .keys()
76 .chain(self.physical_plan.index_imports.keys())
77 }
78}
79
80#[derive(
81 Debug,
82 Clone,
83 PartialEq,
84 Eq,
85 PartialOrd,
86 Ord,
87 Hash,
88 Serialize,
89 Deserialize
90)]
91struct CacheKey {
92 build_version: String,
93 id: GlobalId,
94 expr_type: ExpressionType,
95}
96
/// Marker type that implements [`DurableCacheCodec`] for the expression cache.
///
/// It carries no data; it only selects the key/value types and their encoding.
#[derive(Debug, PartialEq, Eq)]
struct ExpressionCodec;
99
100impl DurableCacheCodec for ExpressionCodec {
101 type Key = CacheKey;
102 type Val = Bytes;
105 type KeyCodec = Bytes;
106 type ValCodec = Bytes;
107
108 fn schemas() -> (
109 <Self::KeyCodec as Codec>::Schema,
110 <Self::ValCodec as Codec>::Schema,
111 ) {
112 (VecU8Schema::default(), VecU8Schema::default())
113 }
114
115 fn encode(key: &Self::Key, val: &Self::Val) -> (Self::KeyCodec, Self::ValCodec) {
116 let key = bincode::serialize(key).expect("must serialize");
117 (Bytes::from(key), val.clone())
118 }
119
120 fn decode(key: &Self::KeyCodec, val: &Self::ValCodec) -> (Self::Key, Self::Val) {
121 let key = bincode::deserialize(key).expect("must deserialize");
122 (key, val.clone())
123 }
124}
125
/// Configuration consumed by [`ExpressionCache::open`].
#[derive(Debug, Clone)]
pub struct ExpressionCacheConfig {
    /// Build version of the opener. Only entries written under this exact
    /// version are returned when the cache is opened.
    pub build_version: Version,
    /// Persist client used to create the underlying durable cache.
    pub persist: PersistClient,
    /// Shard in which the durable cache is stored.
    pub shard_id: ShardId,
    /// IDs that currently exist. Entries of the current build version whose
    /// ID (or, for global expressions, any imported index ID) is not in this
    /// set are removed when the cache is opened.
    pub current_ids: BTreeSet<GlobalId>,
    /// When `true`, entries written under a different build version are
    /// removed on open and the durable cache's version is upgraded.
    pub remove_prior_versions: bool,
    /// When `true`, a compaction of the shard is forced at the end of opening.
    pub compact_shard: bool,
    /// Dynamic configuration from which the forced-compaction fuel and wait
    /// settings are read.
    pub dyncfgs: ConfigSet,
}
137
/// A durable cache of optimized expressions, keyed by build version, object
/// ID and expression type.
pub struct ExpressionCache {
    /// Build version used both to filter entries on open and to key entries
    /// written through `update`.
    build_version: Version,
    /// The backing durable cache that stores the encoded entries.
    durable_cache: DurableCache<ExpressionCodec>,
}
143
144impl ExpressionCache {
145 pub async fn open(
157 ExpressionCacheConfig {
158 build_version,
159 persist,
160 shard_id,
161 current_ids,
162 remove_prior_versions,
163 compact_shard,
164 dyncfgs,
165 }: ExpressionCacheConfig,
166 ) -> (
167 Self,
168 BTreeMap<GlobalId, LocalExpressions>,
169 BTreeMap<GlobalId, GlobalExpressions>,
170 ) {
171 let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await;
172 let mut cache = Self {
173 build_version,
174 durable_cache,
175 };
176
177 const RETRIES: usize = 100;
178 for _ in 0..RETRIES {
179 match cache
180 .try_open(¤t_ids, remove_prior_versions, compact_shard, &dyncfgs)
181 .await
182 {
183 Ok((local_expressions, global_expressions)) => {
184 return (cache, local_expressions, global_expressions);
185 }
186 Err(err) => debug!("failed to open cache: {err} ... retrying"),
187 }
188 }
189
190 panic!("Unable to open expression cache after {RETRIES} retries");
191 }
192
193 async fn try_open(
194 &mut self,
195 current_ids: &BTreeSet<GlobalId>,
196 remove_prior_versions: bool,
197 compact_shard: bool,
198 dyncfgs: &ConfigSet,
199 ) -> Result<
200 (
201 BTreeMap<GlobalId, LocalExpressions>,
202 BTreeMap<GlobalId, GlobalExpressions>,
203 ),
204 mz_durable_cache::Error,
205 > {
206 let mut keys_to_remove = Vec::new();
207 let mut local_expressions = BTreeMap::new();
208 let mut global_expressions = BTreeMap::new();
209
210 for (key, expressions) in self.durable_cache.entries_local() {
211 let build_version = match key.build_version.parse::<Version>() {
212 Ok(build_version) => build_version,
213 Err(err) => {
214 warn!("unable to parse build version: {key:?}: {err:?}");
215 keys_to_remove.push((key.clone(), None));
216 continue;
217 }
218 };
219 if build_version == self.build_version {
220 match key.expr_type {
222 ExpressionType::Local => {
223 let expressions: LocalExpressions = match bincode::deserialize(expressions)
224 {
225 Ok(expressions) => expressions,
226 Err(err) => {
227 soft_panic_or_log!(
228 "unable to deserialize local expressions: ({key:?}, {expressions:?}): {err:?}"
229 );
230 continue;
231 }
232 };
233 if !current_ids.contains(&key.id) {
235 keys_to_remove.push((key.clone(), None));
236 } else {
237 local_expressions.insert(key.id, expressions);
238 }
239 }
240 ExpressionType::Global => {
241 let expressions: GlobalExpressions = match bincode::deserialize(expressions)
242 {
243 Ok(expressions) => expressions,
244 Err(err) => {
245 soft_panic_or_log!(
246 "unable to deserialize global expressions: ({key:?}, {expressions:?}): {err:?}"
247 );
248 continue;
249 }
250 };
251 let index_dependencies: BTreeSet<_> =
253 expressions.index_imports().cloned().collect();
254 if !current_ids.contains(&key.id)
255 || !index_dependencies.is_subset(current_ids)
256 {
257 keys_to_remove.push((key.clone(), None));
258 } else {
259 global_expressions.insert(key.id, expressions);
260 }
261 }
262 }
263 } else if remove_prior_versions {
264 keys_to_remove.push((key.clone(), None));
266 }
267 }
268
269 let keys_to_remove: Vec<_> = keys_to_remove
270 .iter()
271 .map(|(key, expressions)| (key, expressions.as_ref()))
272 .collect();
273 self.durable_cache.try_set_many(&keys_to_remove).await?;
274
275 if remove_prior_versions {
276 self.durable_cache.upgrade_version().await;
278 }
279
280 if compact_shard {
281 let fuel = EXPRESSION_CACHE_FORCE_COMPACTION_FUEL.handle(dyncfgs);
282 let wait = EXPRESSION_CACHE_FORCE_COMPACTION_WAIT.handle(dyncfgs);
283 self.durable_cache
284 .dangerous_compact_shard(move || fuel.get(), move || wait.get())
285 .await;
286 }
287
288 Ok((local_expressions, global_expressions))
289 }
290
291 async fn update(
297 &mut self,
298 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
299 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
300 invalidate_ids: BTreeSet<GlobalId>,
301 ) {
302 let mut entries = BTreeMap::new();
303 let build_version = self.build_version.to_string();
304 for id in invalidate_ids {
307 entries.insert(
308 CacheKey {
309 id,
310 build_version: build_version.clone(),
311 expr_type: ExpressionType::Local,
312 },
313 None,
314 );
315 entries.insert(
316 CacheKey {
317 id,
318 build_version: build_version.clone(),
319 expr_type: ExpressionType::Global,
320 },
321 None,
322 );
323 }
324 for (id, expressions) in new_local_expressions {
325 let expressions = match bincode::serialize(&expressions) {
326 Ok(expressions) => Bytes::from(expressions),
327 Err(err) => {
328 soft_panic_or_log!(
329 "unable to serialize local expressions: {expressions:?}: {err:?}"
330 );
331 continue;
332 }
333 };
334 entries.insert(
335 CacheKey {
336 id,
337 build_version: build_version.clone(),
338 expr_type: ExpressionType::Local,
339 },
340 Some(expressions),
341 );
342 }
343 for (id, expressions) in new_global_expressions {
344 let expressions = match bincode::serialize(&expressions) {
345 Ok(expressions) => Bytes::from(expressions),
346 Err(err) => {
347 soft_panic_or_log!(
348 "unable to serialize global expressions: {expressions:?}: {err:?}"
349 );
350 continue;
351 }
352 };
353 entries.insert(
354 CacheKey {
355 id,
356 build_version: build_version.clone(),
357 expr_type: ExpressionType::Global,
358 },
359 Some(expressions),
360 );
361 }
362 let entries: Vec<_> = entries
363 .iter()
364 .map(|(key, expressions)| (key, expressions.as_ref()))
365 .collect();
366 self.durable_cache.set_many(&entries).await
367 }
368}
369
/// A request sent from an [`ExpressionCacheHandle`] to the background task
/// that owns the [`ExpressionCache`].
enum CacheOperation {
    /// Apply a batch of insertions and invalidations to the cache.
    Update {
        /// Local expressions to insert, paired with the ID they belong to.
        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
        /// Global expressions to insert, paired with the ID they belong to.
        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
        /// IDs whose local and global entries should be removed.
        invalidate_ids: BTreeSet<GlobalId>,
        /// Held by the background task while the update is applied; the
        /// matching receiver is what `ExpressionCacheHandle::update` returns.
        trigger: trigger::Trigger,
    },
}
380
/// A cloneable handle for submitting updates to an [`ExpressionCache`] that
/// runs in a background task.
#[derive(Debug, Clone)]
pub struct ExpressionCacheHandle {
    /// Sending half of the unbounded channel read by the background task.
    tx: mpsc::UnboundedSender<CacheOperation>,
}
385
386impl ExpressionCacheHandle {
387 pub async fn spawn_expression_cache(
391 config: ExpressionCacheConfig,
392 ) -> (
393 Self,
394 BTreeMap<GlobalId, LocalExpressions>,
395 BTreeMap<GlobalId, GlobalExpressions>,
396 ) {
397 let (mut cache, local_expressions, global_expressions) =
398 ExpressionCache::open(config).await;
399 let (tx, mut rx) = mpsc::unbounded_channel();
400 spawn(|| "expression-cache-task", async move {
401 loop {
402 while let Some(op) = rx.recv().await {
403 match op {
404 CacheOperation::Update {
405 new_local_expressions,
406 new_global_expressions,
407 invalidate_ids,
408 trigger: _trigger,
409 } => {
410 cache
411 .update(
412 new_local_expressions,
413 new_global_expressions,
414 invalidate_ids,
415 )
416 .await
417 }
418 }
419 }
420 }
421 });
422
423 (Self { tx }, local_expressions, global_expressions)
424 }
425
426 pub fn update(
427 &self,
428 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
429 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
430 invalidate_ids: BTreeSet<GlobalId>,
431 ) -> impl Future<Output = ()> + use<> {
432 let (trigger, trigger_rx) = trigger::channel();
433 let op = CacheOperation::Update {
434 new_local_expressions,
435 new_global_expressions,
436 invalidate_ids,
437 trigger,
438 };
439 let _ = self.tx.send(op);
441 trigger_rx
442 }
443}
444
445#[cfg(test)]
446mod tests {
447 use std::collections::{BTreeMap, BTreeSet};
448
449 use bytes::Bytes;
450 use mz_compute_types::dataflows::{DataflowDescription, IndexDesc, IndexImport};
451 use mz_durable_cache::DurableCacheCodec;
452 use mz_dyncfg::ConfigSet;
453 use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
454 use mz_persist_client::PersistClient;
455 use mz_persist_types::ShardId;
456 use mz_repr::{Datum, GlobalId, SqlRelationType, SqlScalarType};
457 use semver::Version;
458
459 use super::*;
460
461 #[mz_ore::test(tokio::test)]
462 #[cfg_attr(miri, ignore)] async fn expression_cache() {
464 let first_version = Version::new(0, 1, 0);
465 let second_version = Version::new(0, 2, 0);
466 let persist = PersistClient::new_for_tests().await;
467 let shard_id = ShardId::new();
468
469 let mut current_ids = BTreeSet::new();
470 let mut remove_prior_versions = false;
471 let compact_shard = false;
473 let dyncfgs = &mz_persist_client::cfg::all_dyncfgs(ConfigSet::default());
474
475 let mut next_id = 0;
476
477 let (mut local_exps, mut global_exps) = {
478 let (cache, local_exprs, global_exprs) =
480 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
481 build_version: first_version.clone(),
482 persist: persist.clone(),
483 shard_id,
484 current_ids: current_ids.clone(),
485 remove_prior_versions,
486 compact_shard,
487 dyncfgs: dyncfgs.clone(),
488 })
489 .await;
490 assert_eq!(local_exprs, BTreeMap::new(), "new cache should be empty");
491 assert_eq!(global_exprs, BTreeMap::new(), "new cache should be empty");
492
493 let mut local_exps = BTreeMap::new();
495 let mut global_exps = BTreeMap::new();
496 for _ in 0..4 {
497 let id = GlobalId::User(next_id);
498 let local_exp = gen_local_expressions();
499 let global_exp = gen_global_expressions();
500
501 cache
502 .update(
503 vec![(id, local_exp.clone())],
504 vec![(id, global_exp.clone())],
505 BTreeSet::new(),
506 )
507 .await;
508
509 current_ids.insert(id);
510 current_ids.extend(global_exp.index_imports());
511 local_exps.insert(id, local_exp);
512 global_exps.insert(id, global_exp);
513
514 next_id += 1;
515 }
516 (local_exps, global_exps)
517 };
518
519 {
520 let (_cache, local_entries, global_entries) =
522 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
523 build_version: first_version.clone(),
524 persist: persist.clone(),
525 shard_id,
526 current_ids: current_ids.clone(),
527 remove_prior_versions,
528 compact_shard,
529 dyncfgs: dyncfgs.clone(),
530 })
531 .await;
532 assert_eq!(
533 local_entries, local_exps,
534 "local expression with non-matching optimizer features should be removed during reconciliation"
535 );
536 assert_eq!(
537 global_entries, global_exps,
538 "global expression with non-matching optimizer features should be removed during reconciliation"
539 );
540 }
541
542 {
543 let id_to_remove = local_exps.keys().next().expect("not empty").clone();
545 current_ids.remove(&id_to_remove);
546 let _removed_local_exp = local_exps.remove(&id_to_remove);
547 let _removed_global_exp = global_exps.remove(&id_to_remove);
548
549 let (_cache, local_entries, global_entries) =
551 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
552 build_version: first_version.clone(),
553 persist: persist.clone(),
554 shard_id,
555 current_ids: current_ids.clone(),
556 remove_prior_versions,
557 compact_shard,
558 dyncfgs: dyncfgs.clone(),
559 })
560 .await;
561 assert_eq!(
562 local_entries, local_exps,
563 "dropped local objects should be removed during reconciliation"
564 );
565 assert_eq!(
566 global_entries, global_exps,
567 "dropped global objects should be removed during reconciliation"
568 );
569 }
570
571 {
572 let global_exp_to_remove = global_exps.keys().next().expect("not empty").clone();
574 let removed_global_exp = global_exps
575 .remove(&global_exp_to_remove)
576 .expect("known to exist");
577 let dependency_to_remove = removed_global_exp
578 .index_imports()
579 .next()
580 .expect("generator always makes non-empty index imports");
581 current_ids.remove(dependency_to_remove);
582
583 let _removed_local_exp = local_exps.remove(dependency_to_remove);
585 let _removed_global_exp = global_exps.remove(dependency_to_remove);
586 global_exps.retain(|_, exp| {
588 let index_imports: BTreeSet<_> = exp.index_imports().collect();
589 !index_imports.contains(&dependency_to_remove)
590 });
591
592 let (_cache, local_entries, global_entries) =
594 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
595 build_version: first_version.clone(),
596 persist: persist.clone(),
597 shard_id,
598 current_ids: current_ids.clone(),
599 remove_prior_versions,
600 compact_shard,
601 dyncfgs: dyncfgs.clone(),
602 })
603 .await;
604 assert_eq!(
605 local_entries, local_exps,
606 "dropped object dependencies should NOT remove local expressions"
607 );
608 assert_eq!(
609 global_entries, global_exps,
610 "dropped object dependencies should remove global expressions"
611 );
612 }
613
614 let (new_gen_local_exps, new_gen_global_exps) = {
615 let (cache, local_entries, global_entries) =
617 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
618 build_version: second_version.clone(),
619 persist: persist.clone(),
620 shard_id,
621 current_ids: current_ids.clone(),
622 remove_prior_versions,
623 compact_shard,
624 dyncfgs: dyncfgs.clone(),
625 })
626 .await;
627 assert_eq!(
628 local_entries,
629 BTreeMap::new(),
630 "new version should be empty"
631 );
632 assert_eq!(
633 global_entries,
634 BTreeMap::new(),
635 "new version should be empty"
636 );
637
638 let mut local_exps = BTreeMap::new();
640 let mut global_exps = BTreeMap::new();
641 for _ in 0..2 {
642 let id = GlobalId::User(next_id);
643 let local_exp = gen_local_expressions();
644 let global_exp = gen_global_expressions();
645
646 cache
647 .update(
648 vec![(id, local_exp.clone())],
649 vec![(id, global_exp.clone())],
650 BTreeSet::new(),
651 )
652 .await;
653
654 current_ids.insert(id);
655 current_ids.extend(global_exp.index_imports());
656 local_exps.insert(id, local_exp);
657 global_exps.insert(id, global_exp);
658
659 next_id += 1;
660 }
661 (local_exps, global_exps)
662 };
663
664 {
665 let (_cache, local_entries, global_entries) =
667 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
668 build_version: first_version.clone(),
669 persist: persist.clone(),
670 shard_id,
671 current_ids: current_ids.clone(),
672 remove_prior_versions,
673 compact_shard,
674 dyncfgs: dyncfgs.clone(),
675 })
676 .await;
677 assert_eq!(
678 local_entries, local_exps,
679 "Previous version local expressions should still exist"
680 );
681 assert_eq!(
682 global_entries, global_exps,
683 "Previous version global expressions should still exist"
684 );
685 }
686
687 {
688 remove_prior_versions = true;
690 let (_cache, local_entries, global_entries) =
691 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
692 build_version: second_version.clone(),
693 persist: persist.clone(),
694 shard_id,
695 current_ids: current_ids.clone(),
696 remove_prior_versions,
697 compact_shard,
698 dyncfgs: dyncfgs.clone(),
699 })
700 .await;
701 assert_eq!(
702 local_entries, new_gen_local_exps,
703 "new version local expressions should be persisted"
704 );
705 assert_eq!(
706 global_entries, new_gen_global_exps,
707 "new version global expressions should be persisted"
708 );
709 }
710
711 {
712 let (_cache, local_entries, global_entries) =
714 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
715 build_version: first_version.clone(),
716 persist: persist.clone(),
717 shard_id,
718 current_ids: current_ids.clone(),
719 remove_prior_versions,
720 compact_shard,
721 dyncfgs: dyncfgs.clone(),
722 })
723 .await;
724 assert_eq!(
725 local_entries,
726 BTreeMap::new(),
727 "Previous version local expressions should be cleared"
728 );
729 assert_eq!(
730 global_entries,
731 BTreeMap::new(),
732 "Previous version global expressions should be cleared"
733 );
734 }
735 }
736
737 #[mz_ore::test]
738 fn local_expr_cache_roundtrip() {
739 let key = CacheKey {
740 id: GlobalId::User(1),
741 build_version: "1.2.3".into(),
742 expr_type: ExpressionType::Local,
743 };
744 let val = gen_local_expressions();
745
746 let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
747 let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
748 let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
749 let decoded_val: LocalExpressions =
750 bincode::deserialize(&decoded_val).expect("local expressions should roundtrip");
751
752 assert_eq!(key, decoded_key);
753 assert_eq!(val, decoded_val);
754 }
755
756 #[mz_ore::test]
757 fn global_expr_cache_roundtrip() {
758 let key = CacheKey {
759 id: GlobalId::User(1),
760 build_version: "1.2.3".into(),
761 expr_type: ExpressionType::Global,
762 };
763 let val = gen_global_expressions();
764
765 let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
766 let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
767 let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
768 let decoded_val: GlobalExpressions =
769 bincode::deserialize(&decoded_val).expect("global expressions should roundtrip");
770
771 assert_eq!(key, decoded_key);
772 assert_eq!(val, decoded_val);
773 }
774
775 fn gen_local_expressions() -> LocalExpressions {
781 let datum = Datum::UInt64(rand::random());
782
783 LocalExpressions {
784 local_mir: OptimizedMirRelationExpr(MirRelationExpr::constant(
785 vec![vec![datum]],
786 SqlRelationType::new(vec![SqlScalarType::UInt64.nullable(false)]),
787 )),
788 optimizer_features: Default::default(),
789 }
790 }
791
792 fn gen_global_expressions() -> GlobalExpressions {
798 let name = format!("test-{}", rand::random::<u64>());
799
800 let mut global_mir = DataflowDescription::new(name.clone());
801 let mut physical_plan = DataflowDescription::new(name);
802
803 let index_imports = BTreeMap::from_iter([(
805 GlobalId::User(2),
806 IndexImport {
807 desc: IndexDesc {
808 on_id: GlobalId::User(1),
809 key: Default::default(),
810 },
811 typ: SqlRelationType::empty(),
812 monotonic: false,
813 with_snapshot: true,
814 },
815 )]);
816 global_mir.index_imports = index_imports.clone();
817 physical_plan.index_imports = index_imports;
818
819 GlobalExpressions {
820 global_mir,
821 physical_plan,
822 dataflow_metainfos: Default::default(),
823 optimizer_features: Default::default(),
824 }
825 }
826}