1use std::collections::{BTreeMap, BTreeSet};
13use std::future::Future;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use mz_compute_types::dataflows::DataflowDescription;
18use mz_durable_cache::{DurableCache, DurableCacheCodec};
19use mz_dyncfg::ConfigSet;
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::channel::trigger;
22use mz_ore::soft_panic_or_log;
23use mz_ore::task::spawn;
24use mz_persist_client::PersistClient;
25use mz_persist_client::cli::admin::{
26 EXPRESSION_CACHE_FORCE_COMPACTION_FUEL, EXPRESSION_CACHE_FORCE_COMPACTION_WAIT,
27};
28use mz_persist_types::codec_impls::VecU8Schema;
29use mz_persist_types::{Codec, ShardId};
30use mz_repr::GlobalId;
31use mz_repr::optimize::OptimizerFeatures;
32use mz_transform::dataflow::DataflowMetainfo;
33use mz_transform::notice::OptimizerNotice;
34use semver::Version;
35use serde::{Deserialize, Serialize};
36use tokio::sync::mpsc;
37use tracing::{debug, warn};
38
/// Distinguishes the two kinds of value stored in the expression cache.
///
/// The variant is part of [`CacheKey`], so a single `GlobalId` can have one local and one global
/// entry side by side. Variant order matters: it determines the derived `Ord` and the serialized
/// form of the key.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
enum ExpressionType {
    /// The entry's value is a serialized [`LocalExpressions`].
    Local,
    /// The entry's value is a serialized [`GlobalExpressions`].
    Global,
}
44
/// The value cached for an object under `ExpressionType::Local`.
///
/// Stored in the cache in its bincode-serialized form, so field order is part of the encoding.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LocalExpressions {
    /// The optimized MIR expression of the object.
    pub local_mir: OptimizedMirRelationExpr,
    /// Optimizer features stored alongside the expression.
    // NOTE(review): presumably the features that were active when `local_mir` was produced;
    // confirm against the code that populates the cache.
    pub optimizer_features: OptimizerFeatures,
}
51
/// The value cached for an object under `ExpressionType::Global`.
///
/// Stored in the cache in its bincode-serialized form, so field order is part of the encoding.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GlobalExpressions {
    /// The dataflow description expressed in optimized MIR.
    pub global_mir: DataflowDescription<OptimizedMirRelationExpr>,
    /// The dataflow description expressed as a compute `Plan`.
    pub physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
    /// Metadata (including optimizer notices) stored alongside the dataflow.
    pub dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
    /// Optimizer features stored alongside the expressions.
    // NOTE(review): presumably the features that were active during optimization; confirm
    // against the code that populates the cache.
    pub optimizer_features: OptimizerFeatures,
}
60
61impl GlobalExpressions {
62 fn index_imports(&self) -> impl Iterator<Item = &GlobalId> {
63 self.global_mir
64 .index_imports
65 .keys()
66 .chain(self.physical_plan.index_imports.keys())
67 }
68}
69
/// Key of a single expression cache entry.
///
/// Keys are written to the cache in bincode-serialized form (see `ExpressionCodec::encode`), so
/// the field order below is part of the stored encoding as well as of the derived `Ord`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
struct CacheKey {
    /// String form of the build version that wrote the entry; parsed back into a
    /// `semver::Version` when the cache is opened.
    build_version: String,
    /// ID of the object the cached expressions belong to.
    id: GlobalId,
    /// Whether the value is a `LocalExpressions` or a `GlobalExpressions`.
    expr_type: ExpressionType,
}
76
/// Marker type carrying the [`DurableCacheCodec`] implementation used by the expression cache.
#[derive(Debug, PartialEq, Eq)]
struct ExpressionCodec;
79
80impl DurableCacheCodec for ExpressionCodec {
81 type Key = CacheKey;
82 type Val = Bytes;
85 type KeyCodec = Bytes;
86 type ValCodec = Bytes;
87
88 fn schemas() -> (
89 <Self::KeyCodec as Codec>::Schema,
90 <Self::ValCodec as Codec>::Schema,
91 ) {
92 (VecU8Schema::default(), VecU8Schema::default())
93 }
94
95 fn encode(key: &Self::Key, val: &Self::Val) -> (Self::KeyCodec, Self::ValCodec) {
96 let key = bincode::serialize(key).expect("must serialize");
97 (Bytes::from(key), val.clone())
98 }
99
100 fn decode(key: &Self::KeyCodec, val: &Self::ValCodec) -> (Self::Key, Self::Val) {
101 let key = bincode::deserialize(key).expect("must deserialize");
102 (key, val.clone())
103 }
104}
105
/// Configuration needed to open an [`ExpressionCache`].
#[derive(Debug, Clone)]
pub struct ExpressionCacheConfig {
    /// Build version whose entries are loaded; entries written by any other version are never
    /// returned.
    pub build_version: Version,
    /// Persist client used to create the underlying durable cache.
    pub persist: PersistClient,
    /// Shard that backs the durable cache.
    pub shard_id: ShardId,
    /// IDs considered live. While opening, entries of the current build version whose ID is not
    /// in this set are removed, as are global expressions that import an index not in this set.
    pub current_ids: BTreeSet<GlobalId>,
    /// When `true`, entries written by other build versions are removed while opening and the
    /// durable cache's `upgrade_version` is invoked.
    pub remove_prior_versions: bool,
    /// When `true`, the durable cache's `dangerous_compact_shard` is invoked at the end of
    /// opening.
    pub compact_shard: bool,
    /// Dynamic configuration from which the forced-compaction fuel and wait settings are read.
    pub dyncfgs: ConfigSet,
}
117
/// A cache of local and global expressions, keyed by build version and object ID and stored in
/// a [`DurableCache`].
pub struct ExpressionCache {
    /// Build version used both to filter entries while opening and to key new entries.
    build_version: Version,
    /// The underlying storage for the serialized expressions.
    durable_cache: DurableCache<ExpressionCodec>,
}
123
124impl ExpressionCache {
125 pub async fn open(
137 ExpressionCacheConfig {
138 build_version,
139 persist,
140 shard_id,
141 current_ids,
142 remove_prior_versions,
143 compact_shard,
144 dyncfgs,
145 }: ExpressionCacheConfig,
146 ) -> (
147 Self,
148 BTreeMap<GlobalId, LocalExpressions>,
149 BTreeMap<GlobalId, GlobalExpressions>,
150 ) {
151 let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await;
152 let mut cache = Self {
153 build_version,
154 durable_cache,
155 };
156
157 const RETRIES: usize = 100;
158 for _ in 0..RETRIES {
159 match cache
160 .try_open(¤t_ids, remove_prior_versions, compact_shard, &dyncfgs)
161 .await
162 {
163 Ok((local_expressions, global_expressions)) => {
164 return (cache, local_expressions, global_expressions);
165 }
166 Err(err) => debug!("failed to open cache: {err} ... retrying"),
167 }
168 }
169
170 panic!("Unable to open expression cache after {RETRIES} retries");
171 }
172
173 async fn try_open(
174 &mut self,
175 current_ids: &BTreeSet<GlobalId>,
176 remove_prior_versions: bool,
177 compact_shard: bool,
178 dyncfgs: &ConfigSet,
179 ) -> Result<
180 (
181 BTreeMap<GlobalId, LocalExpressions>,
182 BTreeMap<GlobalId, GlobalExpressions>,
183 ),
184 mz_durable_cache::Error,
185 > {
186 let mut keys_to_remove = Vec::new();
187 let mut local_expressions = BTreeMap::new();
188 let mut global_expressions = BTreeMap::new();
189
190 for (key, expressions) in self.durable_cache.entries_local() {
191 let build_version = match key.build_version.parse::<Version>() {
192 Ok(build_version) => build_version,
193 Err(err) => {
194 warn!("unable to parse build version: {key:?}: {err:?}");
195 keys_to_remove.push((key.clone(), None));
196 continue;
197 }
198 };
199 if build_version == self.build_version {
200 match key.expr_type {
202 ExpressionType::Local => {
203 let expressions: LocalExpressions = match bincode::deserialize(expressions)
204 {
205 Ok(expressions) => expressions,
206 Err(err) => {
207 soft_panic_or_log!(
208 "unable to deserialize local expressions: ({key:?}, {expressions:?}): {err:?}"
209 );
210 continue;
211 }
212 };
213 if !current_ids.contains(&key.id) {
215 keys_to_remove.push((key.clone(), None));
216 } else {
217 local_expressions.insert(key.id, expressions);
218 }
219 }
220 ExpressionType::Global => {
221 let expressions: GlobalExpressions = match bincode::deserialize(expressions)
222 {
223 Ok(expressions) => expressions,
224 Err(err) => {
225 soft_panic_or_log!(
226 "unable to deserialize global expressions: ({key:?}, {expressions:?}): {err:?}"
227 );
228 continue;
229 }
230 };
231 let index_dependencies: BTreeSet<_> =
233 expressions.index_imports().cloned().collect();
234 if !current_ids.contains(&key.id)
235 || !index_dependencies.is_subset(current_ids)
236 {
237 keys_to_remove.push((key.clone(), None));
238 } else {
239 global_expressions.insert(key.id, expressions);
240 }
241 }
242 }
243 } else if remove_prior_versions {
244 keys_to_remove.push((key.clone(), None));
246 }
247 }
248
249 let keys_to_remove: Vec<_> = keys_to_remove
250 .iter()
251 .map(|(key, expressions)| (key, expressions.as_ref()))
252 .collect();
253 self.durable_cache.try_set_many(&keys_to_remove).await?;
254
255 if remove_prior_versions {
256 self.durable_cache.upgrade_version().await;
258 }
259
260 if compact_shard {
261 let fuel = EXPRESSION_CACHE_FORCE_COMPACTION_FUEL.handle(dyncfgs);
262 let wait = EXPRESSION_CACHE_FORCE_COMPACTION_WAIT.handle(dyncfgs);
263 self.durable_cache
264 .dangerous_compact_shard(move || fuel.get(), move || wait.get())
265 .await;
266 }
267
268 Ok((local_expressions, global_expressions))
269 }
270
271 async fn update(
277 &mut self,
278 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
279 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
280 invalidate_ids: BTreeSet<GlobalId>,
281 ) {
282 let mut entries = BTreeMap::new();
283 let build_version = self.build_version.to_string();
284 for id in invalidate_ids {
287 entries.insert(
288 CacheKey {
289 id,
290 build_version: build_version.clone(),
291 expr_type: ExpressionType::Local,
292 },
293 None,
294 );
295 entries.insert(
296 CacheKey {
297 id,
298 build_version: build_version.clone(),
299 expr_type: ExpressionType::Global,
300 },
301 None,
302 );
303 }
304 for (id, expressions) in new_local_expressions {
305 let expressions = match bincode::serialize(&expressions) {
306 Ok(expressions) => Bytes::from(expressions),
307 Err(err) => {
308 soft_panic_or_log!(
309 "unable to serialize local expressions: {expressions:?}: {err:?}"
310 );
311 continue;
312 }
313 };
314 entries.insert(
315 CacheKey {
316 id,
317 build_version: build_version.clone(),
318 expr_type: ExpressionType::Local,
319 },
320 Some(expressions),
321 );
322 }
323 for (id, expressions) in new_global_expressions {
324 let expressions = match bincode::serialize(&expressions) {
325 Ok(expressions) => Bytes::from(expressions),
326 Err(err) => {
327 soft_panic_or_log!(
328 "unable to serialize global expressions: {expressions:?}: {err:?}"
329 );
330 continue;
331 }
332 };
333 entries.insert(
334 CacheKey {
335 id,
336 build_version: build_version.clone(),
337 expr_type: ExpressionType::Global,
338 },
339 Some(expressions),
340 );
341 }
342 let entries: Vec<_> = entries
343 .iter()
344 .map(|(key, expressions)| (key, expressions.as_ref()))
345 .collect();
346 self.durable_cache.set_many(&entries).await
347 }
348}
349
/// Operations sent from an [`ExpressionCacheHandle`] to the background expression cache task.
enum CacheOperation {
    /// Remove and insert expressions; the fields are forwarded to `ExpressionCache::update`.
    Update {
        /// Local expressions to insert, keyed by object ID.
        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
        /// Global expressions to insert, keyed by object ID.
        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
        /// IDs whose local and global entries are removed.
        invalidate_ids: BTreeSet<GlobalId>,
        /// Kept alive by the task until the update has been applied; its receiving half is the
        /// future handed back to the caller of `ExpressionCacheHandle::update`.
        trigger: trigger::Trigger,
    },
}
360
/// Cloneable handle used to send operations to the background expression cache task.
#[derive(Debug, Clone)]
pub struct ExpressionCacheHandle {
    /// Sending half of the channel the background task receives operations on.
    tx: mpsc::UnboundedSender<CacheOperation>,
}
365
366impl ExpressionCacheHandle {
367 pub async fn spawn_expression_cache(
371 config: ExpressionCacheConfig,
372 ) -> (
373 Self,
374 BTreeMap<GlobalId, LocalExpressions>,
375 BTreeMap<GlobalId, GlobalExpressions>,
376 ) {
377 let (mut cache, local_expressions, global_expressions) =
378 ExpressionCache::open(config).await;
379 let (tx, mut rx) = mpsc::unbounded_channel();
380 spawn(|| "expression-cache-task", async move {
381 loop {
382 while let Some(op) = rx.recv().await {
383 match op {
384 CacheOperation::Update {
385 new_local_expressions,
386 new_global_expressions,
387 invalidate_ids,
388 trigger: _trigger,
389 } => {
390 cache
391 .update(
392 new_local_expressions,
393 new_global_expressions,
394 invalidate_ids,
395 )
396 .await
397 }
398 }
399 }
400 }
401 });
402
403 (Self { tx }, local_expressions, global_expressions)
404 }
405
406 pub fn update(
407 &self,
408 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
409 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
410 invalidate_ids: BTreeSet<GlobalId>,
411 ) -> impl Future<Output = ()> + use<> {
412 let (trigger, trigger_rx) = trigger::channel();
413 let op = CacheOperation::Update {
414 new_local_expressions,
415 new_global_expressions,
416 invalidate_ids,
417 trigger,
418 };
419 let _ = self.tx.send(op);
421 trigger_rx
422 }
423}
424
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};

    use bytes::Bytes;
    use mz_compute_types::dataflows::{DataflowDescription, IndexDesc, IndexImport};
    use mz_durable_cache::DurableCacheCodec;
    use mz_dyncfg::ConfigSet;
    use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
    use mz_persist_client::PersistClient;
    use mz_persist_types::ShardId;
    use mz_repr::{Datum, GlobalId, SqlRelationType, SqlScalarType};
    use semver::Version;

    use super::*;

    /// Opens (or re-opens) the expression cache on `shard_id` for `build_version` and returns
    /// the handle together with the expressions the cache reported on open.
    ///
    /// Shard compaction is always disabled; these tests do not exercise it.
    async fn open_cache(
        build_version: &Version,
        persist: &PersistClient,
        shard_id: ShardId,
        current_ids: &BTreeSet<GlobalId>,
        remove_prior_versions: bool,
        dyncfgs: &ConfigSet,
    ) -> (
        ExpressionCacheHandle,
        BTreeMap<GlobalId, LocalExpressions>,
        BTreeMap<GlobalId, GlobalExpressions>,
    ) {
        ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
            build_version: build_version.clone(),
            persist: persist.clone(),
            shard_id,
            current_ids: current_ids.clone(),
            remove_prior_versions,
            compact_shard: false,
            dyncfgs: dyncfgs.clone(),
        })
        .await
    }

    /// Walks the cache through its lifecycle: initial population, re-opening, dropped objects,
    /// dropped dependencies, a version upgrade, and removal of prior versions.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn expression_cache() {
        let first_version = Version::new(0, 1, 0);
        let second_version = Version::new(0, 2, 0);
        let persist = PersistClient::new_for_tests().await;
        let shard_id = ShardId::new();

        let mut current_ids = BTreeSet::new();
        let mut remove_prior_versions = false;
        let dyncfgs = &mz_persist_client::cfg::all_dyncfgs(ConfigSet::default());

        let mut next_id = 0;

        // A freshly created cache is empty; populate it with four objects.
        let (mut local_exps, mut global_exps) = {
            let (cache, local_exprs, global_exprs) = open_cache(
                &first_version,
                &persist,
                shard_id,
                &current_ids,
                remove_prior_versions,
                dyncfgs,
            )
            .await;
            assert_eq!(local_exprs, BTreeMap::new(), "new cache should be empty");
            assert_eq!(global_exprs, BTreeMap::new(), "new cache should be empty");

            let mut local_exps = BTreeMap::new();
            let mut global_exps = BTreeMap::new();
            for _ in 0..4 {
                let id = GlobalId::User(next_id);
                let local_exp = gen_local_expressions();
                let global_exp = gen_global_expressions();

                cache
                    .update(
                        vec![(id, local_exp.clone())],
                        vec![(id, global_exp.clone())],
                        BTreeSet::new(),
                    )
                    .await;

                current_ids.insert(id);
                current_ids.extend(global_exp.index_imports());
                local_exps.insert(id, local_exp);
                global_exps.insert(id, global_exp);

                next_id += 1;
            }
            (local_exps, global_exps)
        };

        // Re-opening with unchanged IDs returns exactly what was written.
        {
            let (_cache, local_entries, global_entries) = open_cache(
                &first_version,
                &persist,
                shard_id,
                &current_ids,
                remove_prior_versions,
                dyncfgs,
            )
            .await;
            assert_eq!(
                local_entries, local_exps,
                "local expressions should survive re-opening the cache"
            );
            assert_eq!(
                global_entries, global_exps,
                "global expressions should survive re-opening the cache"
            );
        }

        // Dropping an object removes its entries on the next open.
        {
            let id_to_remove = *local_exps.keys().next().expect("not empty");
            current_ids.remove(&id_to_remove);
            let _removed_local_exp = local_exps.remove(&id_to_remove);
            let _removed_global_exp = global_exps.remove(&id_to_remove);

            let (_cache, local_entries, global_entries) = open_cache(
                &first_version,
                &persist,
                shard_id,
                &current_ids,
                remove_prior_versions,
                dyncfgs,
            )
            .await;
            assert_eq!(
                local_entries, local_exps,
                "dropped local objects should be removed during reconciliation"
            );
            assert_eq!(
                global_entries, global_exps,
                "dropped global objects should be removed during reconciliation"
            );
        }

        // Dropping an index that a global expression imports removes that global expression.
        {
            let global_exp_to_remove = *global_exps.keys().next().expect("not empty");
            let removed_global_exp = global_exps
                .remove(&global_exp_to_remove)
                .expect("known to exist");
            let dependency_to_remove = removed_global_exp
                .index_imports()
                .next()
                .expect("generator always makes non-empty index imports");
            current_ids.remove(dependency_to_remove);

            // The dependency may itself be an object tracked by the cache.
            let _removed_local_exp = local_exps.remove(dependency_to_remove);
            let _removed_global_exp = global_exps.remove(dependency_to_remove);
            // Every other global expression importing the dependency goes away as well.
            global_exps.retain(|_, exp| !exp.index_imports().any(|id| id == dependency_to_remove));

            let (_cache, local_entries, global_entries) = open_cache(
                &first_version,
                &persist,
                shard_id,
                &current_ids,
                remove_prior_versions,
                dyncfgs,
            )
            .await;
            assert_eq!(
                local_entries, local_exps,
                "dropped object dependencies should NOT remove local expressions"
            );
            assert_eq!(
                global_entries, global_exps,
                "dropped object dependencies should remove global expressions"
            );
        }

        // A new build version starts out empty; populate it with two objects.
        let (new_gen_local_exps, new_gen_global_exps) = {
            let (cache, local_entries, global_entries) = open_cache(
                &second_version,
                &persist,
                shard_id,
                &current_ids,
                remove_prior_versions,
                dyncfgs,
            )
            .await;
            assert_eq!(
                local_entries,
                BTreeMap::new(),
                "new version should be empty"
            );
            assert_eq!(
                global_entries,
                BTreeMap::new(),
                "new version should be empty"
            );

            let mut local_exps = BTreeMap::new();
            let mut global_exps = BTreeMap::new();
            for _ in 0..2 {
                let id = GlobalId::User(next_id);
                let local_exp = gen_local_expressions();
                let global_exp = gen_global_expressions();

                cache
                    .update(
                        vec![(id, local_exp.clone())],
                        vec![(id, global_exp.clone())],
                        BTreeSet::new(),
                    )
                    .await;

                current_ids.insert(id);
                current_ids.extend(global_exp.index_imports());
                local_exps.insert(id, local_exp);
                global_exps.insert(id, global_exp);

                next_id += 1;
            }
            (local_exps, global_exps)
        };

        // Without `remove_prior_versions`, the first version's entries are left in place.
        {
            let (_cache, local_entries, global_entries) = open_cache(
                &first_version,
                &persist,
                shard_id,
                &current_ids,
                remove_prior_versions,
                dyncfgs,
            )
            .await;
            assert_eq!(
                local_entries, local_exps,
                "Previous version local expressions should still exist"
            );
            assert_eq!(
                global_entries, global_exps,
                "Previous version global expressions should still exist"
            );
        }

        // Opening the second version with `remove_prior_versions` keeps its own entries.
        {
            remove_prior_versions = true;
            let (_cache, local_entries, global_entries) = open_cache(
                &second_version,
                &persist,
                shard_id,
                &current_ids,
                remove_prior_versions,
                dyncfgs,
            )
            .await;
            assert_eq!(
                local_entries, new_gen_local_exps,
                "new version local expressions should be persisted"
            );
            assert_eq!(
                global_entries, new_gen_global_exps,
                "new version global expressions should be persisted"
            );
        }

        // ...and has removed the first version's entries.
        {
            let (_cache, local_entries, global_entries) = open_cache(
                &first_version,
                &persist,
                shard_id,
                &current_ids,
                remove_prior_versions,
                dyncfgs,
            )
            .await;
            assert_eq!(
                local_entries,
                BTreeMap::new(),
                "Previous version local expressions should be cleared"
            );
            assert_eq!(
                global_entries,
                BTreeMap::new(),
                "Previous version global expressions should be cleared"
            );
        }
    }

    /// A local cache entry survives an encode/decode round trip through `ExpressionCodec`.
    #[mz_ore::test]
    fn local_expr_cache_roundtrip() {
        let key = CacheKey {
            id: GlobalId::User(1),
            build_version: "1.2.3".into(),
            expr_type: ExpressionType::Local,
        };
        let val = gen_local_expressions();

        let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
        let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
        let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
        let decoded_val: LocalExpressions =
            bincode::deserialize(&decoded_val).expect("local expressions should roundtrip");

        assert_eq!(key, decoded_key);
        assert_eq!(val, decoded_val);
    }

    /// A global cache entry survives an encode/decode round trip through `ExpressionCodec`.
    #[mz_ore::test]
    fn global_expr_cache_roundtrip() {
        let key = CacheKey {
            id: GlobalId::User(1),
            build_version: "1.2.3".into(),
            expr_type: ExpressionType::Global,
        };
        let val = gen_global_expressions();

        let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
        let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
        let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
        let decoded_val: GlobalExpressions =
            bincode::deserialize(&decoded_val).expect("global expressions should roundtrip");

        assert_eq!(key, decoded_key);
        assert_eq!(val, decoded_val);
    }

    /// Builds a `LocalExpressions` whose MIR is a one-row constant holding a random `UInt64`,
    /// so that separately generated values are distinguishable.
    fn gen_local_expressions() -> LocalExpressions {
        let datum = Datum::UInt64(rand::random());

        LocalExpressions {
            local_mir: OptimizedMirRelationExpr(MirRelationExpr::constant(
                vec![vec![datum]],
                SqlRelationType::new(vec![SqlScalarType::UInt64.nullable(false)]),
            )),
            optimizer_features: Default::default(),
        }
    }

    /// Builds a `GlobalExpressions` with a randomly named dataflow that always imports the
    /// index `GlobalId::User(2)` (on `GlobalId::User(1)`) in both the MIR and the physical plan.
    fn gen_global_expressions() -> GlobalExpressions {
        let name = format!("test-{}", rand::random::<u64>());

        let mut global_mir = DataflowDescription::new(name.clone());
        let mut physical_plan = DataflowDescription::new(name);

        let index_imports = BTreeMap::from_iter([(
            GlobalId::User(2),
            IndexImport {
                desc: IndexDesc {
                    on_id: GlobalId::User(1),
                    key: Default::default(),
                },
                typ: SqlRelationType::empty(),
                monotonic: false,
            },
        )]);
        global_mir.index_imports = index_imports.clone();
        physical_plan.index_imports = index_imports;

        GlobalExpressions {
            global_mir,
            physical_plan,
            dataflow_metainfos: Default::default(),
            optimizer_features: Default::default(),
        }
    }
}