Skip to main content

mz_catalog/
expr_cache.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A cache for optimized expressions.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::future::Future;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use mz_compute_types::dataflows::DataflowDescription;
18use mz_durable_cache::{DurableCache, DurableCacheCodec};
19use mz_dyncfg::ConfigSet;
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::channel::trigger;
22use mz_ore::soft_panic_or_log;
23use mz_ore::task::spawn;
24use mz_persist_client::PersistClient;
25use mz_persist_client::cli::admin::{
26    EXPRESSION_CACHE_FORCE_COMPACTION_FUEL, EXPRESSION_CACHE_FORCE_COMPACTION_WAIT,
27};
28use mz_persist_types::codec_impls::VecU8Schema;
29use mz_persist_types::{Codec, ShardId};
30use mz_repr::GlobalId;
31use mz_repr::optimize::OptimizerFeatures;
32use mz_transform::dataflow::DataflowMetainfo;
33use mz_transform::notice::OptimizerNotice;
34use semver::Version;
35use serde::{Deserialize, Serialize};
36use tokio::sync::mpsc;
37use tracing::{debug, warn};
38
39#[derive(
40    Debug,
41    Clone,
42    PartialEq,
43    Eq,
44    PartialOrd,
45    Ord,
46    Hash,
47    Serialize,
48    Deserialize
49)]
50enum ExpressionType {
51    Local,
52    Global,
53}
54
55/// The data that is cached per catalog object as a result of local optimizations.
56#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
57pub struct LocalExpressions {
58    pub local_mir: OptimizedMirRelationExpr,
59    pub optimizer_features: OptimizerFeatures,
60}
61
62/// The data that is cached per catalog object as a result of global optimizations.
63#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
64pub struct GlobalExpressions {
65    pub global_mir: DataflowDescription<OptimizedMirRelationExpr>,
66    pub physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
67    pub dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
68    pub optimizer_features: OptimizerFeatures,
69}
70
71impl GlobalExpressions {
72    fn index_imports(&self) -> impl Iterator<Item = &GlobalId> {
73        self.global_mir
74            .index_imports
75            .keys()
76            .chain(self.physical_plan.index_imports.keys())
77    }
78}
79
80#[derive(
81    Debug,
82    Clone,
83    PartialEq,
84    Eq,
85    PartialOrd,
86    Ord,
87    Hash,
88    Serialize,
89    Deserialize
90)]
91struct CacheKey {
92    build_version: String,
93    id: GlobalId,
94    expr_type: ExpressionType,
95}
96
/// Marker type that tells the durable cache how expression cache entries are encoded.
///
/// See the [`DurableCacheCodec`] implementation for the actual encoding.
#[derive(Debug, PartialEq, Eq)]
struct ExpressionCodec;
99
100impl DurableCacheCodec for ExpressionCodec {
101    type Key = CacheKey;
102    // We use a raw bytes instead of `Expressions` so that there is no backwards compatibility
103    // requirement on `Expressions` between versions.
104    type Val = Bytes;
105    type KeyCodec = Bytes;
106    type ValCodec = Bytes;
107
108    fn schemas() -> (
109        <Self::KeyCodec as Codec>::Schema,
110        <Self::ValCodec as Codec>::Schema,
111    ) {
112        (VecU8Schema::default(), VecU8Schema::default())
113    }
114
115    fn encode(key: &Self::Key, val: &Self::Val) -> (Self::KeyCodec, Self::ValCodec) {
116        let key = bincode::serialize(key).expect("must serialize");
117        (Bytes::from(key), val.clone())
118    }
119
120    fn decode(key: &Self::KeyCodec, val: &Self::ValCodec) -> (Self::Key, Self::Val) {
121        let key = bincode::deserialize(key).expect("must deserialize");
122        (key, val.clone())
123    }
124}
125
126/// Configuration needed to initialize an [`ExpressionCache`].
127#[derive(Debug, Clone)]
128pub struct ExpressionCacheConfig {
129    pub build_version: Version,
130    pub persist: PersistClient,
131    pub shard_id: ShardId,
132    pub current_ids: BTreeSet<GlobalId>,
133    pub remove_prior_versions: bool,
134    pub compact_shard: bool,
135    pub dyncfgs: ConfigSet,
136}
137
138/// A durable cache of optimized expressions.
139pub struct ExpressionCache {
140    build_version: Version,
141    durable_cache: DurableCache<ExpressionCodec>,
142}
143
144impl ExpressionCache {
145    /// Creates a new [`ExpressionCache`] and reconciles all entries in current build version.
146    /// Reconciliation will remove all entries that are not in `current_ids` and remove all
147    /// entries that have optimizer features that are not equal to `optimizer_features`.
148    ///
149    /// If `remove_prior_versions` is `true`, then all previous versions are durably removed from the
150    /// cache.
151    ///
152    /// If `compact_shard` is `true`, then this function will block on fully compacting the backing
153    /// persist shard.
154    ///
155    /// Returns all cached expressions in the current build version, after reconciliation.
156    pub async fn open(
157        ExpressionCacheConfig {
158            build_version,
159            persist,
160            shard_id,
161            current_ids,
162            remove_prior_versions,
163            compact_shard,
164            dyncfgs,
165        }: ExpressionCacheConfig,
166    ) -> (
167        Self,
168        BTreeMap<GlobalId, LocalExpressions>,
169        BTreeMap<GlobalId, GlobalExpressions>,
170    ) {
171        let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await;
172        let mut cache = Self {
173            build_version,
174            durable_cache,
175        };
176
177        const RETRIES: usize = 100;
178        for _ in 0..RETRIES {
179            match cache
180                .try_open(&current_ids, remove_prior_versions, compact_shard, &dyncfgs)
181                .await
182            {
183                Ok((local_expressions, global_expressions)) => {
184                    return (cache, local_expressions, global_expressions);
185                }
186                Err(err) => debug!("failed to open cache: {err} ... retrying"),
187            }
188        }
189
190        panic!("Unable to open expression cache after {RETRIES} retries");
191    }
192
193    async fn try_open(
194        &mut self,
195        current_ids: &BTreeSet<GlobalId>,
196        remove_prior_versions: bool,
197        compact_shard: bool,
198        dyncfgs: &ConfigSet,
199    ) -> Result<
200        (
201            BTreeMap<GlobalId, LocalExpressions>,
202            BTreeMap<GlobalId, GlobalExpressions>,
203        ),
204        mz_durable_cache::Error,
205    > {
206        let mut keys_to_remove = Vec::new();
207        let mut local_expressions = BTreeMap::new();
208        let mut global_expressions = BTreeMap::new();
209
210        for (key, expressions) in self.durable_cache.entries_local() {
211            let build_version = match key.build_version.parse::<Version>() {
212                Ok(build_version) => build_version,
213                Err(err) => {
214                    warn!("unable to parse build version: {key:?}: {err:?}");
215                    keys_to_remove.push((key.clone(), None));
216                    continue;
217                }
218            };
219            if build_version == self.build_version {
220                // Only deserialize the current version.
221                match key.expr_type {
222                    ExpressionType::Local => {
223                        let expressions: LocalExpressions = match bincode::deserialize(expressions)
224                        {
225                            Ok(expressions) => expressions,
226                            Err(err) => {
227                                soft_panic_or_log!(
228                                    "unable to deserialize local expressions: ({key:?}, {expressions:?}): {err:?}"
229                                );
230                                continue;
231                            }
232                        };
233                        // Remove dropped IDs.
234                        if !current_ids.contains(&key.id) {
235                            keys_to_remove.push((key.clone(), None));
236                        } else {
237                            local_expressions.insert(key.id, expressions);
238                        }
239                    }
240                    ExpressionType::Global => {
241                        let expressions: GlobalExpressions = match bincode::deserialize(expressions)
242                        {
243                            Ok(expressions) => expressions,
244                            Err(err) => {
245                                soft_panic_or_log!(
246                                    "unable to deserialize global expressions: ({key:?}, {expressions:?}): {err:?}"
247                                );
248                                continue;
249                            }
250                        };
251                        // Remove dropped IDs and expressions that rely on dropped indexes.
252                        let index_dependencies: BTreeSet<_> =
253                            expressions.index_imports().cloned().collect();
254                        if !current_ids.contains(&key.id)
255                            || !index_dependencies.is_subset(current_ids)
256                        {
257                            keys_to_remove.push((key.clone(), None));
258                        } else {
259                            global_expressions.insert(key.id, expressions);
260                        }
261                    }
262                }
263            } else if remove_prior_versions {
264                // Remove expressions from previous versions.
265                keys_to_remove.push((key.clone(), None));
266            }
267        }
268
269        let keys_to_remove: Vec<_> = keys_to_remove
270            .iter()
271            .map(|(key, expressions)| (key, expressions.as_ref()))
272            .collect();
273        self.durable_cache.try_set_many(&keys_to_remove).await?;
274
275        if remove_prior_versions {
276            // We've purged old versions from the cache; upgrade the backing Persist version as well.
277            self.durable_cache.upgrade_version().await;
278        }
279
280        if compact_shard {
281            let fuel = EXPRESSION_CACHE_FORCE_COMPACTION_FUEL.handle(dyncfgs);
282            let wait = EXPRESSION_CACHE_FORCE_COMPACTION_WAIT.handle(dyncfgs);
283            self.durable_cache
284                .dangerous_compact_shard(move || fuel.get(), move || wait.get())
285                .await;
286        }
287
288        Ok((local_expressions, global_expressions))
289    }
290
291    /// Durably removes all entries given by `invalidate_ids` and inserts `new_local_expressions`
292    /// and `new_global_expressions` into current build version.
293    ///
294    /// If there is a duplicate ID in both `invalidate_ids` and one of the new expressions vector,
295    /// then the final value will be taken from the new expressions vector.
296    async fn update(
297        &mut self,
298        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
299        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
300        invalidate_ids: BTreeSet<GlobalId>,
301    ) {
302        let mut entries = BTreeMap::new();
303        let build_version = self.build_version.to_string();
304        // Important to do `invalidate_ids` first, so that `new_X_expressions` overwrites duplicate
305        // keys.
306        for id in invalidate_ids {
307            entries.insert(
308                CacheKey {
309                    id,
310                    build_version: build_version.clone(),
311                    expr_type: ExpressionType::Local,
312                },
313                None,
314            );
315            entries.insert(
316                CacheKey {
317                    id,
318                    build_version: build_version.clone(),
319                    expr_type: ExpressionType::Global,
320                },
321                None,
322            );
323        }
324        for (id, expressions) in new_local_expressions {
325            let expressions = match bincode::serialize(&expressions) {
326                Ok(expressions) => Bytes::from(expressions),
327                Err(err) => {
328                    soft_panic_or_log!(
329                        "unable to serialize local expressions: {expressions:?}: {err:?}"
330                    );
331                    continue;
332                }
333            };
334            entries.insert(
335                CacheKey {
336                    id,
337                    build_version: build_version.clone(),
338                    expr_type: ExpressionType::Local,
339                },
340                Some(expressions),
341            );
342        }
343        for (id, expressions) in new_global_expressions {
344            let expressions = match bincode::serialize(&expressions) {
345                Ok(expressions) => Bytes::from(expressions),
346                Err(err) => {
347                    soft_panic_or_log!(
348                        "unable to serialize global expressions: {expressions:?}: {err:?}"
349                    );
350                    continue;
351                }
352            };
353            entries.insert(
354                CacheKey {
355                    id,
356                    build_version: build_version.clone(),
357                    expr_type: ExpressionType::Global,
358                },
359                Some(expressions),
360            );
361        }
362        let entries: Vec<_> = entries
363            .iter()
364            .map(|(key, expressions)| (key, expressions.as_ref()))
365            .collect();
366        self.durable_cache.set_many(&entries).await
367    }
368}
369
370/// Operations to perform on the cache.
371enum CacheOperation {
372    /// See [`ExpressionCache::update`].
373    Update {
374        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
375        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
376        invalidate_ids: BTreeSet<GlobalId>,
377        trigger: trigger::Trigger,
378    },
379}
380
381#[derive(Debug, Clone)]
382pub struct ExpressionCacheHandle {
383    tx: mpsc::UnboundedSender<CacheOperation>,
384}
385
386impl ExpressionCacheHandle {
387    /// Spawns a task responsible for managing the expression cache. See [`ExpressionCache::open`].
388    ///
389    /// Returns a handle to interact with the cache and the initial contents of the cache.
390    pub async fn spawn_expression_cache(
391        config: ExpressionCacheConfig,
392    ) -> (
393        Self,
394        BTreeMap<GlobalId, LocalExpressions>,
395        BTreeMap<GlobalId, GlobalExpressions>,
396    ) {
397        let (mut cache, local_expressions, global_expressions) =
398            ExpressionCache::open(config).await;
399        let (tx, mut rx) = mpsc::unbounded_channel();
400        spawn(|| "expression-cache-task", async move {
401            while let Some(op) = rx.recv().await {
402                match op {
403                    CacheOperation::Update {
404                        new_local_expressions,
405                        new_global_expressions,
406                        invalidate_ids,
407                        trigger: _trigger,
408                    } => {
409                        cache
410                            .update(
411                                new_local_expressions,
412                                new_global_expressions,
413                                invalidate_ids,
414                            )
415                            .await
416                    }
417                }
418            }
419        });
420
421        (Self { tx }, local_expressions, global_expressions)
422    }
423
424    pub fn update(
425        &self,
426        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
427        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
428        invalidate_ids: BTreeSet<GlobalId>,
429    ) -> impl Future<Output = ()> + use<> {
430        let (trigger, trigger_rx) = trigger::channel();
431        let op = CacheOperation::Update {
432            new_local_expressions,
433            new_global_expressions,
434            invalidate_ids,
435            trigger,
436        };
437        // If the send fails, then we must be shutting down.
438        let _ = self.tx.send(op);
439        trigger_rx
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use std::collections::{BTreeMap, BTreeSet};
446
447    use bytes::Bytes;
448    use mz_compute_types::dataflows::{DataflowDescription, IndexDesc, IndexImport};
449    use mz_durable_cache::DurableCacheCodec;
450    use mz_dyncfg::ConfigSet;
451    use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
452    use mz_persist_client::PersistClient;
453    use mz_persist_types::ShardId;
454    use mz_repr::{Datum, GlobalId, ReprRelationType, ReprScalarType};
455    use semver::Version;
456
457    use super::*;
458
459    #[mz_ore::test(tokio::test)]
460    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
461    async fn expression_cache() {
462        let first_version = Version::new(0, 1, 0);
463        let second_version = Version::new(0, 2, 0);
464        let persist = PersistClient::new_for_tests().await;
465        let shard_id = ShardId::new();
466
467        let mut current_ids = BTreeSet::new();
468        let mut remove_prior_versions = false;
469        // Compacting the shard takes too long, so we leave it to integration tests.
470        let compact_shard = false;
471        let dyncfgs = &mz_persist_client::cfg::all_dyncfgs(ConfigSet::default());
472
473        let mut next_id = 0;
474
475        let (mut local_exps, mut global_exps) = {
476            // Open a new empty cache.
477            let (cache, local_exprs, global_exprs) =
478                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
479                    build_version: first_version.clone(),
480                    persist: persist.clone(),
481                    shard_id,
482                    current_ids: current_ids.clone(),
483                    remove_prior_versions,
484                    compact_shard,
485                    dyncfgs: dyncfgs.clone(),
486                })
487                .await;
488            assert_eq!(local_exprs, BTreeMap::new(), "new cache should be empty");
489            assert_eq!(global_exprs, BTreeMap::new(), "new cache should be empty");
490
491            // Insert some expressions into the cache.
492            let mut local_exps = BTreeMap::new();
493            let mut global_exps = BTreeMap::new();
494            for _ in 0..4 {
495                let id = GlobalId::User(next_id);
496                let local_exp = gen_local_expressions();
497                let global_exp = gen_global_expressions();
498
499                cache
500                    .update(
501                        vec![(id, local_exp.clone())],
502                        vec![(id, global_exp.clone())],
503                        BTreeSet::new(),
504                    )
505                    .await;
506
507                current_ids.insert(id);
508                current_ids.extend(global_exp.index_imports());
509                local_exps.insert(id, local_exp);
510                global_exps.insert(id, global_exp);
511
512                next_id += 1;
513            }
514            (local_exps, global_exps)
515        };
516
517        {
518            // Re-open the cache.
519            let (_cache, local_entries, global_entries) =
520                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
521                    build_version: first_version.clone(),
522                    persist: persist.clone(),
523                    shard_id,
524                    current_ids: current_ids.clone(),
525                    remove_prior_versions,
526                    compact_shard,
527                    dyncfgs: dyncfgs.clone(),
528                })
529                .await;
530            assert_eq!(
531                local_entries, local_exps,
532                "local expression with non-matching optimizer features should be removed during reconciliation"
533            );
534            assert_eq!(
535                global_entries, global_exps,
536                "global expression with non-matching optimizer features should be removed during reconciliation"
537            );
538        }
539
540        {
541            // Simulate dropping an object.
542            let id_to_remove = local_exps.keys().next().expect("not empty").clone();
543            current_ids.remove(&id_to_remove);
544            let _removed_local_exp = local_exps.remove(&id_to_remove);
545            let _removed_global_exp = global_exps.remove(&id_to_remove);
546
547            // Re-open the cache.
548            let (_cache, local_entries, global_entries) =
549                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
550                    build_version: first_version.clone(),
551                    persist: persist.clone(),
552                    shard_id,
553                    current_ids: current_ids.clone(),
554                    remove_prior_versions,
555                    compact_shard,
556                    dyncfgs: dyncfgs.clone(),
557                })
558                .await;
559            assert_eq!(
560                local_entries, local_exps,
561                "dropped local objects should be removed during reconciliation"
562            );
563            assert_eq!(
564                global_entries, global_exps,
565                "dropped global objects should be removed during reconciliation"
566            );
567        }
568
569        {
570            // Simulate dropping an object dependency.
571            let global_exp_to_remove = global_exps.keys().next().expect("not empty").clone();
572            let removed_global_exp = global_exps
573                .remove(&global_exp_to_remove)
574                .expect("known to exist");
575            let dependency_to_remove = removed_global_exp
576                .index_imports()
577                .next()
578                .expect("generator always makes non-empty index imports");
579            current_ids.remove(dependency_to_remove);
580
581            // If the dependency is also tracked in the cache remove it.
582            let _removed_local_exp = local_exps.remove(dependency_to_remove);
583            let _removed_global_exp = global_exps.remove(dependency_to_remove);
584            // Remove any other exps that depend on dependency.
585            global_exps.retain(|_, exp| {
586                let index_imports: BTreeSet<_> = exp.index_imports().collect();
587                !index_imports.contains(&dependency_to_remove)
588            });
589
590            // Re-open the cache.
591            let (_cache, local_entries, global_entries) =
592                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
593                    build_version: first_version.clone(),
594                    persist: persist.clone(),
595                    shard_id,
596                    current_ids: current_ids.clone(),
597                    remove_prior_versions,
598                    compact_shard,
599                    dyncfgs: dyncfgs.clone(),
600                })
601                .await;
602            assert_eq!(
603                local_entries, local_exps,
604                "dropped object dependencies should NOT remove local expressions"
605            );
606            assert_eq!(
607                global_entries, global_exps,
608                "dropped object dependencies should remove global expressions"
609            );
610        }
611
612        let (new_gen_local_exps, new_gen_global_exps) = {
613            // Open the cache at a new version.
614            let (cache, local_entries, global_entries) =
615                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
616                    build_version: second_version.clone(),
617                    persist: persist.clone(),
618                    shard_id,
619                    current_ids: current_ids.clone(),
620                    remove_prior_versions,
621                    compact_shard,
622                    dyncfgs: dyncfgs.clone(),
623                })
624                .await;
625            assert_eq!(
626                local_entries,
627                BTreeMap::new(),
628                "new version should be empty"
629            );
630            assert_eq!(
631                global_entries,
632                BTreeMap::new(),
633                "new version should be empty"
634            );
635
636            // Insert some expressions at the new version.
637            let mut local_exps = BTreeMap::new();
638            let mut global_exps = BTreeMap::new();
639            for _ in 0..2 {
640                let id = GlobalId::User(next_id);
641                let local_exp = gen_local_expressions();
642                let global_exp = gen_global_expressions();
643
644                cache
645                    .update(
646                        vec![(id, local_exp.clone())],
647                        vec![(id, global_exp.clone())],
648                        BTreeSet::new(),
649                    )
650                    .await;
651
652                current_ids.insert(id);
653                current_ids.extend(global_exp.index_imports());
654                local_exps.insert(id, local_exp);
655                global_exps.insert(id, global_exp);
656
657                next_id += 1;
658            }
659            (local_exps, global_exps)
660        };
661
662        {
663            // Re-open the cache at the first version.
664            let (_cache, local_entries, global_entries) =
665                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
666                    build_version: first_version.clone(),
667                    persist: persist.clone(),
668                    shard_id,
669                    current_ids: current_ids.clone(),
670                    remove_prior_versions,
671                    compact_shard,
672                    dyncfgs: dyncfgs.clone(),
673                })
674                .await;
675            assert_eq!(
676                local_entries, local_exps,
677                "Previous version local expressions should still exist"
678            );
679            assert_eq!(
680                global_entries, global_exps,
681                "Previous version global expressions should still exist"
682            );
683        }
684
685        {
686            // Open the cache at a new version and clear previous versions.
687            remove_prior_versions = true;
688            let (_cache, local_entries, global_entries) =
689                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
690                    build_version: second_version.clone(),
691                    persist: persist.clone(),
692                    shard_id,
693                    current_ids: current_ids.clone(),
694                    remove_prior_versions,
695                    compact_shard,
696                    dyncfgs: dyncfgs.clone(),
697                })
698                .await;
699            assert_eq!(
700                local_entries, new_gen_local_exps,
701                "new version local expressions should be persisted"
702            );
703            assert_eq!(
704                global_entries, new_gen_global_exps,
705                "new version global expressions should be persisted"
706            );
707        }
708
709        {
710            // Re-open the cache at the first version.
711            let (_cache, local_entries, global_entries) =
712                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
713                    build_version: first_version.clone(),
714                    persist: persist.clone(),
715                    shard_id,
716                    current_ids: current_ids.clone(),
717                    remove_prior_versions,
718                    compact_shard,
719                    dyncfgs: dyncfgs.clone(),
720                })
721                .await;
722            assert_eq!(
723                local_entries,
724                BTreeMap::new(),
725                "Previous version local expressions should be cleared"
726            );
727            assert_eq!(
728                global_entries,
729                BTreeMap::new(),
730                "Previous version global expressions should be cleared"
731            );
732        }
733    }
734
735    #[mz_ore::test]
736    fn local_expr_cache_roundtrip() {
737        let key = CacheKey {
738            id: GlobalId::User(1),
739            build_version: "1.2.3".into(),
740            expr_type: ExpressionType::Local,
741        };
742        let val = gen_local_expressions();
743
744        let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
745        let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
746        let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
747        let decoded_val: LocalExpressions =
748            bincode::deserialize(&decoded_val).expect("local expressions should roundtrip");
749
750        assert_eq!(key, decoded_key);
751        assert_eq!(val, decoded_val);
752    }
753
754    #[mz_ore::test]
755    fn global_expr_cache_roundtrip() {
756        let key = CacheKey {
757            id: GlobalId::User(1),
758            build_version: "1.2.3".into(),
759            expr_type: ExpressionType::Global,
760        };
761        let val = gen_global_expressions();
762
763        let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
764        let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
765        let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
766        let decoded_val: GlobalExpressions =
767            bincode::deserialize(&decoded_val).expect("global expressions should roundtrip");
768
769        assert_eq!(key, decoded_key);
770        assert_eq!(val, decoded_val);
771    }
772
773    /// Generate a random [`LocalExpressions`] value.
774    ///
775    /// The returned values are mostly hardcoded and only differ in a single randomized number.
776    /// That's sufficient for the expr cache tests, since the cache mostly treats expressions as
777    /// opaque objects.
778    fn gen_local_expressions() -> LocalExpressions {
779        let datum = Datum::UInt64(rand::random());
780
781        LocalExpressions {
782            local_mir: OptimizedMirRelationExpr(MirRelationExpr::constant(
783                vec![vec![datum]],
784                ReprRelationType::new(vec![ReprScalarType::UInt64.nullable(false)]),
785            )),
786            optimizer_features: Default::default(),
787        }
788    }
789
790    /// Generate a random [`GlobalExpressions`] value.
791    ///
792    /// The returned values are mostly hardcoded and only differ in a single randomized string.
793    /// That's sufficient for the expr cache tests, since the cache mostly treats expressions as
794    /// opaque objects.
795    fn gen_global_expressions() -> GlobalExpressions {
796        let name = format!("test-{}", rand::random::<u64>());
797
798        let mut global_mir = DataflowDescription::new(name.clone());
799        let mut physical_plan = DataflowDescription::new(name);
800
801        // Add pieces expected by tests.
802        let index_imports = BTreeMap::from_iter([(
803            GlobalId::User(2),
804            IndexImport {
805                desc: IndexDesc {
806                    on_id: GlobalId::User(1),
807                    key: Default::default(),
808                },
809                typ: ReprRelationType::empty(),
810                monotonic: false,
811                with_snapshot: true,
812            },
813        )]);
814        global_mir.index_imports = index_imports.clone();
815        physical_plan.index_imports = index_imports;
816
817        GlobalExpressions {
818            global_mir,
819            physical_plan,
820            dataflow_metainfos: Default::default(),
821            optimizer_features: Default::default(),
822        }
823    }
824}