mz_catalog/
expr_cache.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A cache for optimized expressions.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::future::Future;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use mz_compute_types::dataflows::DataflowDescription;
18use mz_durable_cache::{DurableCache, DurableCacheCodec};
19use mz_dyncfg::ConfigSet;
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::channel::trigger;
22use mz_ore::soft_panic_or_log;
23use mz_ore::task::spawn;
24use mz_persist_client::PersistClient;
25use mz_persist_client::cli::admin::{
26    EXPRESSION_CACHE_FORCE_COMPACTION_FUEL, EXPRESSION_CACHE_FORCE_COMPACTION_WAIT,
27};
28use mz_persist_types::codec_impls::VecU8Schema;
29use mz_persist_types::{Codec, ShardId};
30use mz_repr::GlobalId;
31use mz_repr::optimize::OptimizerFeatures;
32use mz_transform::dataflow::DataflowMetainfo;
33use mz_transform::notice::OptimizerNotice;
34use proptest_derive::Arbitrary;
35use semver::Version;
36use serde::{Deserialize, Serialize};
37use timely::Container;
38use tokio::sync::mpsc;
39use tracing::{debug, warn};
40
/// Identifies which kind of cached expression a [`CacheKey`] refers to.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)]
enum ExpressionType {
    /// The entry's value is a serialized [`LocalExpressions`].
    Local,
    /// The entry's value is a serialized [`GlobalExpressions`].
    Global,
}
46
/// The data that is cached per catalog object as a result of local optimizations.
///
/// Values of this type are serialized with `bincode` before being written to the cache, so the
/// field order is part of the stored representation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LocalExpressions {
    /// The optimized MIR expression for the object.
    pub local_mir: OptimizedMirRelationExpr,
    /// Optimizer features stored alongside the expression; presumably the features the
    /// expression was optimized with (confirm against the code that populates the cache).
    pub optimizer_features: OptimizerFeatures,
}
53
/// The data that is cached per catalog object as a result of global optimizations.
///
/// Values of this type are serialized with `bincode` before being written to the cache, so the
/// field order is part of the stored representation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GlobalExpressions {
    /// Dataflow description over optimized MIR expressions. The keys of its `index_imports`
    /// are treated as dependencies of this cache entry.
    pub global_mir: DataflowDescription<OptimizedMirRelationExpr>,
    /// Dataflow description over physical plans. The keys of its `index_imports` are treated
    /// as dependencies of this cache entry.
    pub physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
    /// Dataflow metadata, including optimizer notices, stored alongside the plans.
    pub dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
    /// Optimizer features stored alongside the plans; presumably the features the plans were
    /// optimized with (confirm against the code that populates the cache).
    pub optimizer_features: OptimizerFeatures,
}
62
63impl GlobalExpressions {
64    fn index_imports(&self) -> impl Iterator<Item = &GlobalId> {
65        self.global_mir
66            .index_imports
67            .keys()
68            .chain(self.physical_plan.index_imports.keys())
69    }
70}
71
/// Key of a single entry in the durable expression cache.
///
/// Field order is significant: the derived `Ord` compares fields in declaration order, and the
/// key is serialized with `bincode` before being stored.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)]
struct CacheKey {
    /// String form of the build version that the entry belongs to.
    build_version: String,
    /// ID of the object that the cached expressions belong to.
    id: GlobalId,
    /// Whether the entry holds local or global expressions.
    expr_type: ExpressionType,
}
78
/// Marker type that implements [`DurableCacheCodec`] for the expression cache.
#[derive(Debug, PartialEq, Eq)]
struct ExpressionCodec;
81
82impl DurableCacheCodec for ExpressionCodec {
83    type Key = CacheKey;
84    // We use a raw bytes instead of `Expressions` so that there is no backwards compatibility
85    // requirement on `Expressions` between versions.
86    type Val = Bytes;
87    type KeyCodec = Bytes;
88    type ValCodec = Bytes;
89
90    fn schemas() -> (
91        <Self::KeyCodec as Codec>::Schema,
92        <Self::ValCodec as Codec>::Schema,
93    ) {
94        (VecU8Schema::default(), VecU8Schema::default())
95    }
96
97    fn encode(key: &Self::Key, val: &Self::Val) -> (Self::KeyCodec, Self::ValCodec) {
98        let key = bincode::serialize(key).expect("must serialize");
99        (Bytes::from(key), val.clone())
100    }
101
102    fn decode(key: &Self::KeyCodec, val: &Self::ValCodec) -> (Self::Key, Self::Val) {
103        let key = bincode::deserialize(key).expect("must deserialize");
104        (key, val.clone())
105    }
106}
107
/// Configuration needed to initialize an [`ExpressionCache`].
#[derive(Debug, Clone)]
pub struct ExpressionCacheConfig {
    /// Build version whose entries are loaded and reconciled when the cache is opened.
    pub build_version: Version,
    /// Persist client used to open the backing durable cache.
    pub persist: PersistClient,
    /// ID of the persist shard that backs the cache.
    pub shard_id: ShardId,
    /// IDs that currently exist. On open, entries for any other ID, and global entries that
    /// import an index outside of this set, are removed.
    pub current_ids: BTreeSet<GlobalId>,
    /// If `true`, entries whose build version differs from `build_version` are durably removed
    /// when the cache is opened.
    pub remove_prior_versions: bool,
    /// If `true`, opening the cache blocks on fully compacting the backing persist shard.
    pub compact_shard: bool,
    /// Dynamic configuration from which the forced-compaction fuel and wait settings are read.
    pub dyncfgs: ConfigSet,
}
119
/// A durable cache of optimized expressions.
pub struct ExpressionCache {
    /// Build version under which this instance reads and writes entries.
    build_version: Version,
    /// Backing durable cache that stores the encoded keys and serialized expressions.
    durable_cache: DurableCache<ExpressionCodec>,
}
125
126impl ExpressionCache {
127    /// Creates a new [`ExpressionCache`] and reconciles all entries in current build version.
128    /// Reconciliation will remove all entries that are not in `current_ids` and remove all
129    /// entries that have optimizer features that are not equal to `optimizer_features`.
130    ///
131    /// If `remove_prior_versions` is `true`, then all previous versions are durably removed from the
132    /// cache.
133    ///
134    /// If `compact_shard` is `true`, then this function will block on fully compacting the backing
135    /// persist shard.
136    ///
137    /// Returns all cached expressions in the current build version, after reconciliation.
138    pub async fn open(
139        ExpressionCacheConfig {
140            build_version,
141            persist,
142            shard_id,
143            current_ids,
144            remove_prior_versions,
145            compact_shard,
146            dyncfgs,
147        }: ExpressionCacheConfig,
148    ) -> (
149        Self,
150        BTreeMap<GlobalId, LocalExpressions>,
151        BTreeMap<GlobalId, GlobalExpressions>,
152    ) {
153        let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await;
154        let mut cache = Self {
155            build_version,
156            durable_cache,
157        };
158
159        const RETRIES: usize = 100;
160        for _ in 0..RETRIES {
161            match cache
162                .try_open(&current_ids, remove_prior_versions, compact_shard, &dyncfgs)
163                .await
164            {
165                Ok((local_expressions, global_expressions)) => {
166                    return (cache, local_expressions, global_expressions);
167                }
168                Err(err) => debug!("failed to open cache: {err} ... retrying"),
169            }
170        }
171
172        panic!("Unable to open expression cache after {RETRIES} retries");
173    }
174
175    async fn try_open(
176        &mut self,
177        current_ids: &BTreeSet<GlobalId>,
178        remove_prior_versions: bool,
179        compact_shard: bool,
180        dyncfgs: &ConfigSet,
181    ) -> Result<
182        (
183            BTreeMap<GlobalId, LocalExpressions>,
184            BTreeMap<GlobalId, GlobalExpressions>,
185        ),
186        mz_durable_cache::Error,
187    > {
188        let mut keys_to_remove = Vec::new();
189        let mut local_expressions = BTreeMap::new();
190        let mut global_expressions = BTreeMap::new();
191
192        for (key, expressions) in self.durable_cache.entries_local() {
193            let build_version = match key.build_version.parse::<Version>() {
194                Ok(build_version) => build_version,
195                Err(err) => {
196                    warn!("unable to parse build version: {key:?}: {err:?}");
197                    keys_to_remove.push((key.clone(), None));
198                    continue;
199                }
200            };
201            if build_version == self.build_version {
202                // Only deserialize the current version.
203                match key.expr_type {
204                    ExpressionType::Local => {
205                        let expressions: LocalExpressions = match bincode::deserialize(expressions)
206                        {
207                            Ok(expressions) => expressions,
208                            Err(err) => {
209                                soft_panic_or_log!(
210                                    "unable to deserialize local expressions: ({key:?}, {expressions:?}): {err:?}"
211                                );
212                                continue;
213                            }
214                        };
215                        // Remove dropped IDs.
216                        if !current_ids.contains(&key.id) {
217                            keys_to_remove.push((key.clone(), None));
218                        } else {
219                            local_expressions.insert(key.id, expressions);
220                        }
221                    }
222                    ExpressionType::Global => {
223                        let expressions: GlobalExpressions = match bincode::deserialize(expressions)
224                        {
225                            Ok(expressions) => expressions,
226                            Err(err) => {
227                                soft_panic_or_log!(
228                                    "unable to deserialize global expressions: ({key:?}, {expressions:?}): {err:?}"
229                                );
230                                continue;
231                            }
232                        };
233                        // Remove dropped IDs and expressions that rely on dropped indexes.
234                        let index_dependencies: BTreeSet<_> =
235                            expressions.index_imports().cloned().collect();
236                        if !current_ids.contains(&key.id)
237                            || !index_dependencies.is_subset(current_ids)
238                        {
239                            keys_to_remove.push((key.clone(), None));
240                        } else {
241                            global_expressions.insert(key.id, expressions);
242                        }
243                    }
244                }
245            } else if remove_prior_versions {
246                // Remove expressions from previous versions.
247                keys_to_remove.push((key.clone(), None));
248            }
249        }
250
251        let keys_to_remove: Vec<_> = keys_to_remove
252            .iter()
253            .map(|(key, expressions)| (key, expressions.as_ref()))
254            .collect();
255        self.durable_cache.try_set_many(&keys_to_remove).await?;
256
257        if compact_shard {
258            let fuel = EXPRESSION_CACHE_FORCE_COMPACTION_FUEL.handle(dyncfgs);
259            let wait = EXPRESSION_CACHE_FORCE_COMPACTION_WAIT.handle(dyncfgs);
260            self.durable_cache
261                .dangerous_compact_shard(move || fuel.get(), move || wait.get())
262                .await;
263        }
264
265        Ok((local_expressions, global_expressions))
266    }
267
268    /// Durably removes all entries given by `invalidate_ids` and inserts `new_local_expressions`
269    /// and `new_global_expressions` into current build version.
270    ///
271    /// If there is a duplicate ID in both `invalidate_ids` and one of the new expressions vector,
272    /// then the final value will be taken from the new expressions vector.
273    async fn update(
274        &mut self,
275        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
276        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
277        invalidate_ids: BTreeSet<GlobalId>,
278    ) {
279        let mut entries = BTreeMap::new();
280        let build_version = self.build_version.to_string();
281        // Important to do `invalidate_ids` first, so that `new_X_expressions` overwrites duplicate
282        // keys.
283        for id in invalidate_ids {
284            entries.insert(
285                CacheKey {
286                    id,
287                    build_version: build_version.clone(),
288                    expr_type: ExpressionType::Local,
289                },
290                None,
291            );
292            entries.insert(
293                CacheKey {
294                    id,
295                    build_version: build_version.clone(),
296                    expr_type: ExpressionType::Global,
297                },
298                None,
299            );
300        }
301        for (id, expressions) in new_local_expressions {
302            let expressions = match bincode::serialize(&expressions) {
303                Ok(expressions) => Bytes::from(expressions),
304                Err(err) => {
305                    soft_panic_or_log!(
306                        "unable to serialize local expressions: {expressions:?}: {err:?}"
307                    );
308                    continue;
309                }
310            };
311            entries.insert(
312                CacheKey {
313                    id,
314                    build_version: build_version.clone(),
315                    expr_type: ExpressionType::Local,
316                },
317                Some(expressions),
318            );
319        }
320        for (id, expressions) in new_global_expressions {
321            let expressions = match bincode::serialize(&expressions) {
322                Ok(expressions) => Bytes::from(expressions),
323                Err(err) => {
324                    soft_panic_or_log!(
325                        "unable to serialize global expressions: {expressions:?}: {err:?}"
326                    );
327                    continue;
328                }
329            };
330            entries.insert(
331                CacheKey {
332                    id,
333                    build_version: build_version.clone(),
334                    expr_type: ExpressionType::Global,
335                },
336                Some(expressions),
337            );
338        }
339        let entries: Vec<_> = entries
340            .iter()
341            .map(|(key, expressions)| (key, expressions.as_ref()))
342            .collect();
343        self.durable_cache.set_many(&entries).await
344    }
345}
346
/// Operations to perform on the cache.
enum CacheOperation {
    /// See [`ExpressionCache::update`].
    Update {
        /// Local expressions to insert, paired with the ID of the object they belong to.
        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
        /// Global expressions to insert, paired with the ID of the object they belong to.
        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
        /// IDs whose local and global entries are removed.
        invalidate_ids: BTreeSet<GlobalId>,
        /// Sending half of the trigger whose receiving half is handed back to the caller of
        /// [`ExpressionCacheHandle::update`]. The cache task keeps it alive until the update
        /// has been applied.
        trigger: trigger::Trigger,
    },
}
357
/// A cloneable handle for submitting operations to the task that owns an [`ExpressionCache`].
#[derive(Debug, Clone)]
pub struct ExpressionCacheHandle {
    /// Sending half of the unbounded channel that the cache task receives operations from.
    tx: mpsc::UnboundedSender<CacheOperation>,
}
362
363impl ExpressionCacheHandle {
364    /// Spawns a task responsible for managing the expression cache. See [`ExpressionCache::open`].
365    ///
366    /// Returns a handle to interact with the cache and the initial contents of the cache.
367    pub async fn spawn_expression_cache(
368        config: ExpressionCacheConfig,
369    ) -> (
370        Self,
371        BTreeMap<GlobalId, LocalExpressions>,
372        BTreeMap<GlobalId, GlobalExpressions>,
373    ) {
374        let (mut cache, local_expressions, global_expressions) =
375            ExpressionCache::open(config).await;
376        let (tx, mut rx) = mpsc::unbounded_channel();
377        spawn(|| "expression-cache-task", async move {
378            loop {
379                while let Some(op) = rx.recv().await {
380                    match op {
381                        CacheOperation::Update {
382                            new_local_expressions,
383                            new_global_expressions,
384                            invalidate_ids,
385                            trigger: _trigger,
386                        } => {
387                            cache
388                                .update(
389                                    new_local_expressions,
390                                    new_global_expressions,
391                                    invalidate_ids,
392                                )
393                                .await
394                        }
395                    }
396                }
397            }
398        });
399
400        (Self { tx }, local_expressions, global_expressions)
401    }
402
403    pub fn update(
404        &self,
405        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
406        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
407        invalidate_ids: BTreeSet<GlobalId>,
408    ) -> impl Future<Output = ()> + use<> {
409        let (trigger, trigger_rx) = trigger::channel();
410        let op = CacheOperation::Update {
411            new_local_expressions,
412            new_global_expressions,
413            invalidate_ids,
414            trigger,
415        };
416        // If the send fails, then we must be shutting down.
417        let _ = self.tx.send(op);
418        trigger_rx
419    }
420}
421
422#[cfg(test)]
423mod tests {
424    use std::collections::{BTreeMap, BTreeSet};
425    use std::marker::PhantomData;
426    use std::sync::Arc;
427    use std::time::{Duration, Instant};
428
429    use bytes::Bytes;
430    use mz_compute_types::dataflows::DataflowDescription;
431    use mz_durable_cache::DurableCacheCodec;
432    use mz_dyncfg::ConfigSet;
433    use mz_expr::OptimizedMirRelationExpr;
434    use mz_ore::test::timeout;
435    use mz_persist_client::PersistClient;
436    use mz_persist_types::ShardId;
437    use mz_repr::GlobalId;
438    use mz_repr::optimize::OptimizerFeatures;
439    use mz_transform::dataflow::DataflowMetainfo;
440    use mz_transform::notice::OptimizerNotice;
441    use proptest::arbitrary::{Arbitrary, any};
442    use proptest::prelude::{BoxedStrategy, ProptestConfig};
443    use proptest::proptest;
444    use proptest::strategy::{Strategy, ValueTree};
445    use proptest::test_runner::{RngAlgorithm, TestRng, TestRunner};
446    use semver::Version;
447    use tracing::info;
448
449    use crate::expr_cache::{
450        CacheKey, ExpressionCacheConfig, ExpressionCacheHandle, ExpressionCodec, GlobalExpressions,
451        LocalExpressions,
452    };
453
454    impl Arbitrary for LocalExpressions {
455        type Parameters = ();
456
457        fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
458            (
459                any::<OptimizedMirRelationExpr>(),
460                any::<OptimizerFeatures>(),
461            )
462                .prop_map(|(local_mir, optimizer_features)| LocalExpressions {
463                    local_mir,
464                    optimizer_features,
465                })
466                .boxed()
467        }
468
469        type Strategy = BoxedStrategy<Self>;
470    }
471
472    impl Arbitrary for GlobalExpressions {
473        type Parameters = ();
474        fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
475            (
476                any::<DataflowDescription<OptimizedMirRelationExpr>>(),
477                any::<DataflowDescription<mz_compute_types::plan::Plan>>(),
478                any::<DataflowMetainfo<Arc<OptimizerNotice>>>(),
479                any::<OptimizerFeatures>(),
480            )
481                .prop_map(
482                    |(global_mir, physical_plan, dataflow_metainfos, optimizer_features)| {
483                        GlobalExpressions {
484                            global_mir,
485                            physical_plan,
486                            dataflow_metainfos,
487                            optimizer_features,
488                        }
489                    },
490                )
491                .boxed()
492        }
493
494        type Strategy = BoxedStrategy<Self>;
495    }
496
    /// The expressions can be extremely slow to generate, so we have this hacky struct that bails
    /// if an expression is taking too long to generate and tries to generate a new one. Of course
    /// this means that we will never test expressions above a certain complexity. This is a
    /// worthwhile trade-off to prevent timeouts in CI.
    struct ArbitraryTimeout<T: Arbitrary + Send + 'static> {
        /// Ties the generator to the type it produces without storing a value of that type.
        _phantom: PhantomData<T>,
    }
504
505    impl<T: Arbitrary + Send> ArbitraryTimeout<T> {
506        // Number of attempts to generate a value before panicking. The maximum time spent
507        // generating a value is `GENERATE_ATTEMPTS` * `TIMEOUT_SECS`.
508        const GENERATE_ATTEMPTS: u64 = 10;
509        // Amount of time in seconds before we give up trying to generate a single value.
510        const TIMEOUT_SECS: u64 = 10;
511
512        fn new() -> Self {
513            Self {
514                _phantom: Default::default(),
515            }
516        }
517
518        fn new_tree() -> Box<dyn ValueTree<Value = T>>
519        where
520            T: 'static,
521        {
522            // Important to update the RNG each time, or we'll end up generating the same struct
523            // each time.
524            let seed: [u8; 32] = rand::random();
525            let mut test_runner = TestRunner::deterministic();
526            let rng = test_runner.rng();
527            *rng = TestRng::from_seed(RngAlgorithm::ChaCha, &seed);
528            Box::new(T::arbitrary().new_tree(&mut test_runner).expect("valid"))
529        }
530
531        fn generate(&self) -> T {
532            for _ in 0..Self::GENERATE_ATTEMPTS {
533                if let Ok(val) = self.try_generate() {
534                    return val;
535                }
536            }
537            panic!("timed out generating a value");
538        }
539
540        fn try_generate(&self) -> Result<T, ()> {
541            // Note it's very important to use the thread based version of `timeout` and not the
542            // async task based version. Generating a value in a task will never await and therefore
543            // always run to completion while ignoring the timeout.
544            match timeout(Duration::from_secs(Self::TIMEOUT_SECS), || {
545                // TODO(jkosh44) It would be nice to re-use this tree on success, instead of having
546                // to re-generate a new tree every call.
547                Ok(Self::new_tree().current())
548            }) {
549                Ok(val) => Ok(val),
550                Err(_) => {
551                    info!("timed out generating a value");
552                    Err(())
553                }
554            }
555        }
556    }
557
558    #[mz_ore::test(tokio::test)]
559    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
560    async fn expression_cache() {
561        let local_tree: ArbitraryTimeout<LocalExpressions> = ArbitraryTimeout::new();
562        let global_tree: ArbitraryTimeout<GlobalExpressions> = ArbitraryTimeout::new();
563
564        let first_version = Version::new(0, 1, 0);
565        let second_version = Version::new(0, 2, 0);
566        let persist = PersistClient::new_for_tests().await;
567        let shard_id = ShardId::new();
568
569        let mut current_ids = BTreeSet::new();
570        let mut remove_prior_versions = false;
571        // Compacting the shard takes too long, so we leave it to integration tests.
572        let compact_shard = false;
573        let dyncfgs = &mz_persist_client::cfg::all_dyncfgs(ConfigSet::default());
574
575        let mut next_id = 0;
576
577        let (mut local_exps, mut global_exps) = {
578            // Open a new empty cache.
579            let (cache, local_exprs, global_exprs) =
580                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
581                    build_version: first_version.clone(),
582                    persist: persist.clone(),
583                    shard_id,
584                    current_ids: current_ids.clone(),
585                    remove_prior_versions,
586                    compact_shard,
587                    dyncfgs: dyncfgs.clone(),
588                })
589                .await;
590            assert_eq!(local_exprs, BTreeMap::new(), "new cache should be empty");
591            assert_eq!(global_exprs, BTreeMap::new(), "new cache should be empty");
592
593            // Insert some expressions into the cache.
594            let mut local_exps = BTreeMap::new();
595            let mut global_exps = BTreeMap::new();
596            for _ in 0..4 {
597                let id = GlobalId::User(next_id);
598                let start = Instant::now();
599                let local_exp = local_tree.generate();
600                let global_exp = global_tree.generate();
601                info!("Generating exps took: {:?}", start.elapsed());
602
603                cache
604                    .update(
605                        vec![(id, local_exp.clone())],
606                        vec![(id, global_exp.clone())],
607                        BTreeSet::new(),
608                    )
609                    .await;
610
611                current_ids.insert(id);
612                current_ids.extend(global_exp.index_imports());
613                local_exps.insert(id, local_exp);
614                global_exps.insert(id, global_exp);
615
616                next_id += 1;
617            }
618            (local_exps, global_exps)
619        };
620
621        {
622            // Re-open the cache.
623            let (_cache, local_entries, global_entries) =
624                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
625                    build_version: first_version.clone(),
626                    persist: persist.clone(),
627                    shard_id,
628                    current_ids: current_ids.clone(),
629                    remove_prior_versions,
630                    compact_shard,
631                    dyncfgs: dyncfgs.clone(),
632                })
633                .await;
634            assert_eq!(
635                local_entries, local_exps,
636                "local expression with non-matching optimizer features should be removed during reconciliation"
637            );
638            assert_eq!(
639                global_entries, global_exps,
640                "global expression with non-matching optimizer features should be removed during reconciliation"
641            );
642        }
643
644        {
645            // Simulate dropping an object.
646            let id_to_remove = local_exps.keys().next().expect("not empty").clone();
647            current_ids.remove(&id_to_remove);
648            let _removed_local_exp = local_exps.remove(&id_to_remove);
649            let _removed_global_exp = global_exps.remove(&id_to_remove);
650
651            // Re-open the cache.
652            let (_cache, local_entries, global_entries) =
653                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
654                    build_version: first_version.clone(),
655                    persist: persist.clone(),
656                    shard_id,
657                    current_ids: current_ids.clone(),
658                    remove_prior_versions,
659                    compact_shard,
660                    dyncfgs: dyncfgs.clone(),
661                })
662                .await;
663            assert_eq!(
664                local_entries, local_exps,
665                "dropped local objects should be removed during reconciliation"
666            );
667            assert_eq!(
668                global_entries, global_exps,
669                "dropped global objects should be removed during reconciliation"
670            );
671        }
672
673        {
674            // Simulate dropping an object dependency.
675            let global_exp_to_remove = global_exps.keys().next().expect("not empty").clone();
676            let removed_global_exp = global_exps
677                .remove(&global_exp_to_remove)
678                .expect("known to exist");
679            let dependency_to_remove = removed_global_exp
680                .index_imports()
681                .next()
682                .expect("arbitrary impl always makes non-empty vecs");
683            current_ids.remove(dependency_to_remove);
684
685            // If the dependency is also tracked in the cache remove it.
686            let _removed_local_exp = local_exps.remove(dependency_to_remove);
687            let _removed_global_exp = global_exps.remove(dependency_to_remove);
688            // Remove any other exps that depend on dependency.
689            global_exps.retain(|_, exp| {
690                let index_imports: BTreeSet<_> = exp.index_imports().collect();
691                !index_imports.contains(&dependency_to_remove)
692            });
693
694            // Re-open the cache.
695            let (_cache, local_entries, global_entries) =
696                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
697                    build_version: first_version.clone(),
698                    persist: persist.clone(),
699                    shard_id,
700                    current_ids: current_ids.clone(),
701                    remove_prior_versions,
702                    compact_shard,
703                    dyncfgs: dyncfgs.clone(),
704                })
705                .await;
706            assert_eq!(
707                local_entries, local_exps,
708                "dropped object dependencies should NOT remove local expressions"
709            );
710            assert_eq!(
711                global_entries, global_exps,
712                "dropped object dependencies should remove global expressions"
713            );
714        }
715
716        let (new_gen_local_exps, new_gen_global_exps) = {
717            // Open the cache at a new version.
718            let (cache, local_entries, global_entries) =
719                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
720                    build_version: second_version.clone(),
721                    persist: persist.clone(),
722                    shard_id,
723                    current_ids: current_ids.clone(),
724                    remove_prior_versions,
725                    compact_shard,
726                    dyncfgs: dyncfgs.clone(),
727                })
728                .await;
729            assert_eq!(
730                local_entries,
731                BTreeMap::new(),
732                "new version should be empty"
733            );
734            assert_eq!(
735                global_entries,
736                BTreeMap::new(),
737                "new version should be empty"
738            );
739
740            // Insert some expressions at the new version.
741            let mut local_exps = BTreeMap::new();
742            let mut global_exps = BTreeMap::new();
743            for _ in 0..2 {
744                let id = GlobalId::User(next_id);
745                let start = Instant::now();
746                let local_exp = local_tree.generate();
747                let global_exp = global_tree.generate();
748                info!("Generating exps took: {:?}", start.elapsed());
749
750                cache
751                    .update(
752                        vec![(id, local_exp.clone())],
753                        vec![(id, global_exp.clone())],
754                        BTreeSet::new(),
755                    )
756                    .await;
757
758                current_ids.insert(id);
759                current_ids.extend(global_exp.index_imports());
760                local_exps.insert(id, local_exp);
761                global_exps.insert(id, global_exp);
762
763                next_id += 1;
764            }
765            (local_exps, global_exps)
766        };
767
768        {
769            // Re-open the cache at the first version.
770            let (_cache, local_entries, global_entries) =
771                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
772                    build_version: first_version.clone(),
773                    persist: persist.clone(),
774                    shard_id,
775                    current_ids: current_ids.clone(),
776                    remove_prior_versions,
777                    compact_shard,
778                    dyncfgs: dyncfgs.clone(),
779                })
780                .await;
781            assert_eq!(
782                local_entries, local_exps,
783                "Previous version local expressions should still exist"
784            );
785            assert_eq!(
786                global_entries, global_exps,
787                "Previous version global expressions should still exist"
788            );
789        }
790
791        {
792            // Open the cache at a new version and clear previous versions.
793            remove_prior_versions = true;
794            let (_cache, local_entries, global_entries) =
795                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
796                    build_version: second_version.clone(),
797                    persist: persist.clone(),
798                    shard_id,
799                    current_ids: current_ids.clone(),
800                    remove_prior_versions,
801                    compact_shard,
802                    dyncfgs: dyncfgs.clone(),
803                })
804                .await;
805            assert_eq!(
806                local_entries, new_gen_local_exps,
807                "new version local expressions should be persisted"
808            );
809            assert_eq!(
810                global_entries, new_gen_global_exps,
811                "new version global expressions should be persisted"
812            );
813        }
814
815        {
816            // Re-open the cache at the first version.
817            let (_cache, local_entries, global_entries) =
818                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
819                    build_version: first_version.clone(),
820                    persist: persist.clone(),
821                    shard_id,
822                    current_ids: current_ids.clone(),
823                    remove_prior_versions,
824                    compact_shard,
825                    dyncfgs: dyncfgs.clone(),
826                })
827                .await;
828            assert_eq!(
829                local_entries,
830                BTreeMap::new(),
831                "Previous version local expressions should be cleared"
832            );
833            assert_eq!(
834                global_entries,
835                BTreeMap::new(),
836                "Previous version global expressions should be cleared"
837            );
838        }
839    }
840
841    proptest! {
842        // Generating the expression structs can take an extremely long amount of time because
843        // they are recursive, which can cause test timeouts.
844        #![proptest_config(ProptestConfig::with_cases(1))]
845
846        #[mz_ore::test]
847        #[cfg_attr(miri, ignore)]
848        fn local_expr_cache_roundtrip(key in any::<CacheKey>()) {
849            let local_tree: ArbitraryTimeout<LocalExpressions> = ArbitraryTimeout::new();
850            let val = local_tree.generate();
851
852            let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
853            let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
854            let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
855            let decoded_val: LocalExpressions = bincode::deserialize(&decoded_val).expect("local expressions should roundtrip");
856
857            assert_eq!(key, decoded_key);
858            assert_eq!(val, decoded_val);
859        }
860
861        #[mz_ore::test]
862        #[cfg_attr(miri, ignore)]
863        fn global_expr_cache_roundtrip(key in any::<CacheKey>()) {
864            let global_tree: ArbitraryTimeout<GlobalExpressions> = ArbitraryTimeout::new();
865            let val = global_tree.generate();
866
867            let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
868            let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
869            let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
870            let decoded_val: GlobalExpressions = bincode::deserialize(&decoded_val).expect("global expressions should roundtrip");
871
872            assert_eq!(key, decoded_key);
873            assert_eq!(val, decoded_val);
874        }
875    }
876}