1use std::collections::{BTreeMap, BTreeSet};
13use std::future::Future;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use mz_compute_types::dataflows::DataflowDescription;
18use mz_durable_cache::{DurableCache, DurableCacheCodec};
19use mz_dyncfg::ConfigSet;
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::channel::trigger;
22use mz_ore::soft_panic_or_log;
23use mz_ore::task::spawn;
24use mz_persist_client::PersistClient;
25use mz_persist_client::cli::admin::{
26 EXPRESSION_CACHE_FORCE_COMPACTION_FUEL, EXPRESSION_CACHE_FORCE_COMPACTION_WAIT,
27};
28use mz_persist_types::codec_impls::VecU8Schema;
29use mz_persist_types::{Codec, ShardId};
30use mz_repr::GlobalId;
31use mz_repr::optimize::OptimizerFeatures;
32use mz_transform::dataflow::DataflowMetainfo;
33use mz_transform::notice::OptimizerNotice;
34use semver::Version;
35use serde::{Deserialize, Serialize};
36use tokio::sync::mpsc;
37use tracing::{debug, warn};
38
39#[derive(
40 Debug,
41 Clone,
42 PartialEq,
43 Eq,
44 PartialOrd,
45 Ord,
46 Hash,
47 Serialize,
48 Deserialize
49)]
50enum ExpressionType {
51 Local,
52 Global,
53}
54
55#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
57pub struct LocalExpressions {
58 pub local_mir: OptimizedMirRelationExpr,
59 pub optimizer_features: OptimizerFeatures,
60}
61
62#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
64pub struct GlobalExpressions {
65 pub global_mir: DataflowDescription<OptimizedMirRelationExpr>,
66 pub physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
67 pub dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
68 pub optimizer_features: OptimizerFeatures,
69}
70
71impl GlobalExpressions {
72 fn index_imports(&self) -> impl Iterator<Item = &GlobalId> {
73 self.global_mir
74 .index_imports
75 .keys()
76 .chain(self.physical_plan.index_imports.keys())
77 }
78}
79
80#[derive(
81 Debug,
82 Clone,
83 PartialEq,
84 Eq,
85 PartialOrd,
86 Ord,
87 Hash,
88 Serialize,
89 Deserialize
90)]
91struct CacheKey {
92 build_version: String,
93 id: GlobalId,
94 expr_type: ExpressionType,
95}
96
/// [`DurableCacheCodec`] for the expression cache.
///
/// Keys are stored as bincode-encoded [`CacheKey`]s. Values are opaque byte blobs that are
/// stored unchanged.
#[derive(Debug, PartialEq, Eq)]
struct ExpressionCodec;
99
100impl DurableCacheCodec for ExpressionCodec {
101 type Key = CacheKey;
102 type Val = Bytes;
105 type KeyCodec = Bytes;
106 type ValCodec = Bytes;
107
108 fn schemas() -> (
109 <Self::KeyCodec as Codec>::Schema,
110 <Self::ValCodec as Codec>::Schema,
111 ) {
112 (VecU8Schema::default(), VecU8Schema::default())
113 }
114
115 fn encode(key: &Self::Key, val: &Self::Val) -> (Self::KeyCodec, Self::ValCodec) {
116 let key = bincode::serialize(key).expect("must serialize");
117 (Bytes::from(key), val.clone())
118 }
119
120 fn decode(key: &Self::KeyCodec, val: &Self::ValCodec) -> (Self::Key, Self::Val) {
121 let key = bincode::deserialize(key).expect("must deserialize");
122 (key, val.clone())
123 }
124}
125
126#[derive(Debug, Clone)]
128pub struct ExpressionCacheConfig {
129 pub build_version: Version,
130 pub persist: PersistClient,
131 pub shard_id: ShardId,
132 pub current_ids: BTreeSet<GlobalId>,
133 pub remove_prior_versions: bool,
134 pub compact_shard: bool,
135 pub dyncfgs: ConfigSet,
136}
137
138pub struct ExpressionCache {
140 build_version: Version,
141 durable_cache: DurableCache<ExpressionCodec>,
142}
143
144impl ExpressionCache {
145 pub async fn open(
157 ExpressionCacheConfig {
158 build_version,
159 persist,
160 shard_id,
161 current_ids,
162 remove_prior_versions,
163 compact_shard,
164 dyncfgs,
165 }: ExpressionCacheConfig,
166 ) -> (
167 Self,
168 BTreeMap<GlobalId, LocalExpressions>,
169 BTreeMap<GlobalId, GlobalExpressions>,
170 ) {
171 let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await;
172 let mut cache = Self {
173 build_version,
174 durable_cache,
175 };
176
177 const RETRIES: usize = 100;
178 for _ in 0..RETRIES {
179 match cache
180 .try_open(¤t_ids, remove_prior_versions, compact_shard, &dyncfgs)
181 .await
182 {
183 Ok((local_expressions, global_expressions)) => {
184 return (cache, local_expressions, global_expressions);
185 }
186 Err(err) => debug!("failed to open cache: {err} ... retrying"),
187 }
188 }
189
190 panic!("Unable to open expression cache after {RETRIES} retries");
191 }
192
193 async fn try_open(
194 &mut self,
195 current_ids: &BTreeSet<GlobalId>,
196 remove_prior_versions: bool,
197 compact_shard: bool,
198 dyncfgs: &ConfigSet,
199 ) -> Result<
200 (
201 BTreeMap<GlobalId, LocalExpressions>,
202 BTreeMap<GlobalId, GlobalExpressions>,
203 ),
204 mz_durable_cache::Error,
205 > {
206 let mut keys_to_remove = Vec::new();
207 let mut local_expressions = BTreeMap::new();
208 let mut global_expressions = BTreeMap::new();
209
210 for (key, expressions) in self.durable_cache.entries_local() {
211 let build_version = match key.build_version.parse::<Version>() {
212 Ok(build_version) => build_version,
213 Err(err) => {
214 warn!("unable to parse build version: {key:?}: {err:?}");
215 keys_to_remove.push((key.clone(), None));
216 continue;
217 }
218 };
219 if build_version == self.build_version {
220 match key.expr_type {
222 ExpressionType::Local => {
223 let expressions: LocalExpressions = match bincode::deserialize(expressions)
224 {
225 Ok(expressions) => expressions,
226 Err(err) => {
227 soft_panic_or_log!(
228 "unable to deserialize local expressions: ({key:?}, {expressions:?}): {err:?}"
229 );
230 continue;
231 }
232 };
233 if !current_ids.contains(&key.id) {
235 keys_to_remove.push((key.clone(), None));
236 } else {
237 local_expressions.insert(key.id, expressions);
238 }
239 }
240 ExpressionType::Global => {
241 let expressions: GlobalExpressions = match bincode::deserialize(expressions)
242 {
243 Ok(expressions) => expressions,
244 Err(err) => {
245 soft_panic_or_log!(
246 "unable to deserialize global expressions: ({key:?}, {expressions:?}): {err:?}"
247 );
248 continue;
249 }
250 };
251 let index_dependencies: BTreeSet<_> =
253 expressions.index_imports().cloned().collect();
254 if !current_ids.contains(&key.id)
255 || !index_dependencies.is_subset(current_ids)
256 {
257 keys_to_remove.push((key.clone(), None));
258 } else {
259 global_expressions.insert(key.id, expressions);
260 }
261 }
262 }
263 } else if remove_prior_versions {
264 keys_to_remove.push((key.clone(), None));
266 }
267 }
268
269 let keys_to_remove: Vec<_> = keys_to_remove
270 .iter()
271 .map(|(key, expressions)| (key, expressions.as_ref()))
272 .collect();
273 self.durable_cache.try_set_many(&keys_to_remove).await?;
274
275 if remove_prior_versions {
276 self.durable_cache.upgrade_version().await;
278 }
279
280 if compact_shard {
281 let fuel = EXPRESSION_CACHE_FORCE_COMPACTION_FUEL.handle(dyncfgs);
282 let wait = EXPRESSION_CACHE_FORCE_COMPACTION_WAIT.handle(dyncfgs);
283 self.durable_cache
284 .dangerous_compact_shard(move || fuel.get(), move || wait.get())
285 .await;
286 }
287
288 Ok((local_expressions, global_expressions))
289 }
290
291 async fn update(
297 &mut self,
298 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
299 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
300 invalidate_ids: BTreeSet<GlobalId>,
301 ) {
302 let mut entries = BTreeMap::new();
303 let build_version = self.build_version.to_string();
304 for id in invalidate_ids {
307 entries.insert(
308 CacheKey {
309 id,
310 build_version: build_version.clone(),
311 expr_type: ExpressionType::Local,
312 },
313 None,
314 );
315 entries.insert(
316 CacheKey {
317 id,
318 build_version: build_version.clone(),
319 expr_type: ExpressionType::Global,
320 },
321 None,
322 );
323 }
324 for (id, expressions) in new_local_expressions {
325 let expressions = match bincode::serialize(&expressions) {
326 Ok(expressions) => Bytes::from(expressions),
327 Err(err) => {
328 soft_panic_or_log!(
329 "unable to serialize local expressions: {expressions:?}: {err:?}"
330 );
331 continue;
332 }
333 };
334 entries.insert(
335 CacheKey {
336 id,
337 build_version: build_version.clone(),
338 expr_type: ExpressionType::Local,
339 },
340 Some(expressions),
341 );
342 }
343 for (id, expressions) in new_global_expressions {
344 let expressions = match bincode::serialize(&expressions) {
345 Ok(expressions) => Bytes::from(expressions),
346 Err(err) => {
347 soft_panic_or_log!(
348 "unable to serialize global expressions: {expressions:?}: {err:?}"
349 );
350 continue;
351 }
352 };
353 entries.insert(
354 CacheKey {
355 id,
356 build_version: build_version.clone(),
357 expr_type: ExpressionType::Global,
358 },
359 Some(expressions),
360 );
361 }
362 let entries: Vec<_> = entries
363 .iter()
364 .map(|(key, expressions)| (key, expressions.as_ref()))
365 .collect();
366 self.durable_cache.set_many(&entries).await
367 }
368}
369
370enum CacheOperation {
372 Update {
374 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
375 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
376 invalidate_ids: BTreeSet<GlobalId>,
377 trigger: trigger::Trigger,
378 },
379}
380
381#[derive(Debug, Clone)]
382pub struct ExpressionCacheHandle {
383 tx: mpsc::UnboundedSender<CacheOperation>,
384}
385
386impl ExpressionCacheHandle {
387 pub async fn spawn_expression_cache(
391 config: ExpressionCacheConfig,
392 ) -> (
393 Self,
394 BTreeMap<GlobalId, LocalExpressions>,
395 BTreeMap<GlobalId, GlobalExpressions>,
396 ) {
397 let (mut cache, local_expressions, global_expressions) =
398 ExpressionCache::open(config).await;
399 let (tx, mut rx) = mpsc::unbounded_channel();
400 spawn(|| "expression-cache-task", async move {
401 while let Some(op) = rx.recv().await {
402 match op {
403 CacheOperation::Update {
404 new_local_expressions,
405 new_global_expressions,
406 invalidate_ids,
407 trigger: _trigger,
408 } => {
409 cache
410 .update(
411 new_local_expressions,
412 new_global_expressions,
413 invalidate_ids,
414 )
415 .await
416 }
417 }
418 }
419 });
420
421 (Self { tx }, local_expressions, global_expressions)
422 }
423
424 pub fn update(
425 &self,
426 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
427 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
428 invalidate_ids: BTreeSet<GlobalId>,
429 ) -> impl Future<Output = ()> + use<> {
430 let (trigger, trigger_rx) = trigger::channel();
431 let op = CacheOperation::Update {
432 new_local_expressions,
433 new_global_expressions,
434 invalidate_ids,
435 trigger,
436 };
437 let _ = self.tx.send(op);
439 trigger_rx
440 }
441}
442
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};

    use bytes::Bytes;
    use mz_compute_types::dataflows::{DataflowDescription, IndexDesc, IndexImport};
    use mz_durable_cache::DurableCacheCodec;
    use mz_dyncfg::ConfigSet;
    use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
    use mz_persist_client::PersistClient;
    use mz_persist_types::ShardId;
    use mz_repr::{Datum, GlobalId, ReprRelationType, ReprScalarType};
    use semver::Version;

    use super::*;

    /// Drives the cache through a sequence of simulated restarts that all share one persist
    /// shard. It checks persistence, removal of dropped objects, removal of global expressions
    /// whose index dependencies were dropped, isolation between build versions, and removal
    /// of other versions.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn expression_cache() {
        let first_version = Version::new(0, 1, 0);
        let second_version = Version::new(0, 2, 0);
        let persist = PersistClient::new_for_tests().await;
        let shard_id = ShardId::new();

        let mut current_ids = BTreeSet::new();
        let mut remove_prior_versions = false;
        let compact_shard = false;
        let dyncfgs = &mz_persist_client::cfg::all_dyncfgs(ConfigSet::default());

        // Every simulated restart shares the persist client, shard, compaction setting, and
        // dyncfgs. Only the build version, the live ids, and the prior-version policy vary.
        let config = |build_version: &Version,
                      current_ids: &BTreeSet<GlobalId>,
                      remove_prior_versions: bool| ExpressionCacheConfig {
            build_version: build_version.clone(),
            persist: persist.clone(),
            shard_id,
            current_ids: current_ids.clone(),
            remove_prior_versions,
            compact_shard,
            dyncfgs: dyncfgs.clone(),
        };

        let mut next_id = 0;

        // Populate a brand-new cache at the first version.
        let (mut local_exps, mut global_exps) = {
            let (cache, local_exprs, global_exprs) =
                ExpressionCacheHandle::spawn_expression_cache(config(
                    &first_version,
                    &current_ids,
                    remove_prior_versions,
                ))
                .await;
            assert_eq!(local_exprs, BTreeMap::new(), "new cache should be empty");
            assert_eq!(global_exprs, BTreeMap::new(), "new cache should be empty");

            let mut local_exps = BTreeMap::new();
            let mut global_exps = BTreeMap::new();
            for _ in 0..4 {
                let id = GlobalId::User(next_id);
                let local_exp = gen_local_expressions();
                let global_exp = gen_global_expressions();

                cache
                    .update(
                        vec![(id, local_exp.clone())],
                        vec![(id, global_exp.clone())],
                        BTreeSet::new(),
                    )
                    .await;

                // Mark the object and its index dependencies as live so reconciliation keeps
                // them.
                current_ids.insert(id);
                current_ids.extend(global_exp.index_imports());
                local_exps.insert(id, local_exp);
                global_exps.insert(id, global_exp);

                next_id += 1;
            }
            (local_exps, global_exps)
        };

        // Restarting at the same version with the same live ids keeps everything.
        {
            let (_cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(config(
                    &first_version,
                    &current_ids,
                    remove_prior_versions,
                ))
                .await;
            assert_eq!(
                local_entries, local_exps,
                "local expressions should be persisted across restarts"
            );
            assert_eq!(
                global_entries, global_exps,
                "global expressions should be persisted across restarts"
            );
        }

        // Dropping an object removes both of its entries.
        {
            let id_to_remove = *local_exps.keys().next().expect("not empty");
            current_ids.remove(&id_to_remove);
            let _removed_local_exp = local_exps.remove(&id_to_remove);
            let _removed_global_exp = global_exps.remove(&id_to_remove);

            let (_cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(config(
                    &first_version,
                    &current_ids,
                    remove_prior_versions,
                ))
                .await;
            assert_eq!(
                local_entries, local_exps,
                "dropped local objects should be removed during reconciliation"
            );
            assert_eq!(
                global_entries, global_exps,
                "dropped global objects should be removed during reconciliation"
            );
        }

        // Dropping an index dependency removes every global expression that imports it, but
        // leaves local expressions of other objects alone.
        {
            let global_exp_to_remove = *global_exps.keys().next().expect("not empty");
            let removed_global_exp = global_exps
                .remove(&global_exp_to_remove)
                .expect("known to exist");
            let dependency_to_remove = removed_global_exp
                .index_imports()
                .next()
                .expect("generator always makes non-empty index imports");
            current_ids.remove(dependency_to_remove);

            // The dependency is itself a tracked object, so its own entries go away as well.
            let _removed_local_exp = local_exps.remove(dependency_to_remove);
            let _removed_global_exp = global_exps.remove(dependency_to_remove);
            global_exps.retain(|_, exp| {
                !exp.index_imports()
                    .any(|imported| imported == dependency_to_remove)
            });

            let (_cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(config(
                    &first_version,
                    &current_ids,
                    remove_prior_versions,
                ))
                .await;
            assert_eq!(
                local_entries, local_exps,
                "dropped object dependencies should NOT remove local expressions"
            );
            assert_eq!(
                global_entries, global_exps,
                "dropped object dependencies should remove global expressions"
            );
        }

        // A new build version does not see the first version's entries.
        let (new_gen_local_exps, new_gen_global_exps) = {
            let (cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(config(
                    &second_version,
                    &current_ids,
                    remove_prior_versions,
                ))
                .await;
            assert_eq!(
                local_entries,
                BTreeMap::new(),
                "new version should be empty"
            );
            assert_eq!(
                global_entries,
                BTreeMap::new(),
                "new version should be empty"
            );

            let mut local_exps = BTreeMap::new();
            let mut global_exps = BTreeMap::new();
            for _ in 0..2 {
                let id = GlobalId::User(next_id);
                let local_exp = gen_local_expressions();
                let global_exp = gen_global_expressions();

                cache
                    .update(
                        vec![(id, local_exp.clone())],
                        vec![(id, global_exp.clone())],
                        BTreeSet::new(),
                    )
                    .await;

                current_ids.insert(id);
                current_ids.extend(global_exp.index_imports());
                local_exps.insert(id, local_exp);
                global_exps.insert(id, global_exp);

                next_id += 1;
            }
            (local_exps, global_exps)
        };

        // Without `remove_prior_versions`, the first version's entries survive the second
        // version's writes.
        {
            let (_cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(config(
                    &first_version,
                    &current_ids,
                    remove_prior_versions,
                ))
                .await;
            assert_eq!(
                local_entries, local_exps,
                "Previous version local expressions should still exist"
            );
            assert_eq!(
                global_entries, global_exps,
                "Previous version global expressions should still exist"
            );
        }

        // Opening the second version with `remove_prior_versions` keeps only its own entries.
        {
            remove_prior_versions = true;
            let (_cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(config(
                    &second_version,
                    &current_ids,
                    remove_prior_versions,
                ))
                .await;
            assert_eq!(
                local_entries, new_gen_local_exps,
                "new version local expressions should be persisted"
            );
            assert_eq!(
                global_entries, new_gen_global_exps,
                "new version global expressions should be persisted"
            );
        }

        // ...and the first version's entries are gone.
        {
            let (_cache, local_entries, global_entries) =
                ExpressionCacheHandle::spawn_expression_cache(config(
                    &first_version,
                    &current_ids,
                    remove_prior_versions,
                ))
                .await;
            assert_eq!(
                local_entries,
                BTreeMap::new(),
                "Previous version local expressions should be cleared"
            );
            assert_eq!(
                global_entries,
                BTreeMap::new(),
                "Previous version global expressions should be cleared"
            );
        }
    }

    /// A local entry survives key encoding plus value bincode encoding unchanged.
    #[mz_ore::test]
    fn local_expr_cache_roundtrip() {
        let key = CacheKey {
            id: GlobalId::User(1),
            build_version: "1.2.3".into(),
            expr_type: ExpressionType::Local,
        };
        let val = gen_local_expressions();

        let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
        let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
        let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
        let decoded_val: LocalExpressions =
            bincode::deserialize(&decoded_val).expect("local expressions should roundtrip");

        assert_eq!(key, decoded_key);
        assert_eq!(val, decoded_val);
    }

    /// A global entry survives key encoding plus value bincode encoding unchanged.
    #[mz_ore::test]
    fn global_expr_cache_roundtrip() {
        let key = CacheKey {
            id: GlobalId::User(1),
            build_version: "1.2.3".into(),
            expr_type: ExpressionType::Global,
        };
        let val = gen_global_expressions();

        let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
        let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
        let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
        let decoded_val: GlobalExpressions =
            bincode::deserialize(&decoded_val).expect("global expressions should roundtrip");

        assert_eq!(key, decoded_key);
        assert_eq!(val, decoded_val);
    }

    /// Builds a random single-row constant so that distinct calls produce distinct values.
    fn gen_local_expressions() -> LocalExpressions {
        let datum = Datum::UInt64(rand::random());

        LocalExpressions {
            local_mir: OptimizedMirRelationExpr(MirRelationExpr::constant(
                vec![vec![datum]],
                ReprRelationType::new(vec![ReprScalarType::UInt64.nullable(false)]),
            )),
            optimizer_features: Default::default(),
        }
    }

    /// Builds a randomly named dataflow that always imports index `User(2)` (on `User(1)`).
    fn gen_global_expressions() -> GlobalExpressions {
        let name = format!("test-{}", rand::random::<u64>());

        let mut global_mir = DataflowDescription::new(name.clone());
        let mut physical_plan = DataflowDescription::new(name);

        let index_imports = BTreeMap::from_iter([(
            GlobalId::User(2),
            IndexImport {
                desc: IndexDesc {
                    on_id: GlobalId::User(1),
                    key: Default::default(),
                },
                typ: ReprRelationType::empty(),
                monotonic: false,
                with_snapshot: true,
            },
        )]);
        global_mir.index_imports = index_imports.clone();
        physical_plan.index_imports = index_imports;

        GlobalExpressions {
            global_mir,
            physical_plan,
            dataflow_metainfos: Default::default(),
            optimizer_features: Default::default(),
        }
    }
}