Skip to main content

mz_catalog/
expr_cache.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A cache for optimized expressions.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::future::Future;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use mz_compute_types::dataflows::DataflowDescription;
18use mz_durable_cache::{DurableCache, DurableCacheCodec};
19use mz_dyncfg::ConfigSet;
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::channel::trigger;
22use mz_ore::soft_panic_or_log;
23use mz_ore::task::spawn;
24use mz_persist_client::PersistClient;
25use mz_persist_client::cli::admin::{
26    EXPRESSION_CACHE_FORCE_COMPACTION_FUEL, EXPRESSION_CACHE_FORCE_COMPACTION_WAIT,
27};
28use mz_persist_types::codec_impls::VecU8Schema;
29use mz_persist_types::{Codec, ShardId};
30use mz_repr::GlobalId;
31use mz_repr::optimize::OptimizerFeatures;
32use mz_transform::dataflow::DataflowMetainfo;
33use mz_transform::notice::OptimizerNotice;
34use semver::Version;
35use serde::{Deserialize, Serialize};
36use tokio::sync::mpsc;
37use tracing::{debug, warn};
38
39#[derive(
40    Debug,
41    Clone,
42    PartialEq,
43    Eq,
44    PartialOrd,
45    Ord,
46    Hash,
47    Serialize,
48    Deserialize
49)]
50enum ExpressionType {
51    Local,
52    Global,
53}
54
/// The data that is cached per catalog object as a result of local optimizations.
///
/// Values are stored in the cache bincode-serialized under a key with
/// [`ExpressionType::Local`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LocalExpressions {
    /// The locally optimized MIR expression of the object.
    pub local_mir: OptimizedMirRelationExpr,
    /// The optimizer features stored alongside the expression.
    // NOTE(review): presumably the features that were active when `local_mir` was produced;
    // confirm against the code that populates the cache.
    pub optimizer_features: OptimizerFeatures,
}
61
/// The data that is cached per catalog object as a result of global optimizations.
///
/// Values are stored in the cache bincode-serialized under a key with
/// [`ExpressionType::Global`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GlobalExpressions {
    /// The dataflow description expressed over optimized MIR.
    pub global_mir: DataflowDescription<OptimizedMirRelationExpr>,
    /// The dataflow description expressed over physical plans.
    pub physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
    /// Dataflow meta-information, parameterized over shared optimizer notices.
    pub dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
    /// The optimizer features stored alongside the expressions.
    // NOTE(review): presumably the features that were active when the plans were produced;
    // confirm against the code that populates the cache.
    pub optimizer_features: OptimizerFeatures,
}
70
71impl GlobalExpressions {
72    fn index_imports(&self) -> impl Iterator<Item = &GlobalId> {
73        self.global_mir
74            .index_imports
75            .keys()
76            .chain(self.physical_plan.index_imports.keys())
77    }
78}
79
80#[derive(
81    Debug,
82    Clone,
83    PartialEq,
84    Eq,
85    PartialOrd,
86    Ord,
87    Hash,
88    Serialize,
89    Deserialize
90)]
91struct CacheKey {
92    build_version: String,
93    id: GlobalId,
94    expr_type: ExpressionType,
95}
96
/// Marker type that implements [`DurableCacheCodec`] for the expression cache: it maps
/// [`CacheKey`]s and raw value bytes to and from their durable representation.
#[derive(Debug, PartialEq, Eq)]
struct ExpressionCodec;
99
100impl DurableCacheCodec for ExpressionCodec {
101    type Key = CacheKey;
102    // We use a raw bytes instead of `Expressions` so that there is no backwards compatibility
103    // requirement on `Expressions` between versions.
104    type Val = Bytes;
105    type KeyCodec = Bytes;
106    type ValCodec = Bytes;
107
108    fn schemas() -> (
109        <Self::KeyCodec as Codec>::Schema,
110        <Self::ValCodec as Codec>::Schema,
111    ) {
112        (VecU8Schema::default(), VecU8Schema::default())
113    }
114
115    fn encode(key: &Self::Key, val: &Self::Val) -> (Self::KeyCodec, Self::ValCodec) {
116        let key = bincode::serialize(key).expect("must serialize");
117        (Bytes::from(key), val.clone())
118    }
119
120    fn decode(key: &Self::KeyCodec, val: &Self::ValCodec) -> (Self::Key, Self::Val) {
121        let key = bincode::deserialize(key).expect("must deserialize");
122        (key, val.clone())
123    }
124}
125
/// Configuration needed to initialize an [`ExpressionCache`].
#[derive(Debug, Clone)]
pub struct ExpressionCacheConfig {
    /// The build version to open the cache at. Only entries written at this exact version are
    /// deserialized and returned.
    pub build_version: Version,
    /// The persist client used to open the backing durable cache.
    pub persist: PersistClient,
    /// The persist shard that backs the durable cache.
    pub shard_id: ShardId,
    /// The IDs that currently exist. Entries of the current build version whose ID (or, for
    /// global expressions, any imported index ID) is not in this set are removed on open.
    pub current_ids: BTreeSet<GlobalId>,
    /// Whether entries written at any other build version are durably removed on open.
    pub remove_prior_versions: bool,
    /// Whether opening the cache also compacts the backing persist shard.
    pub compact_shard: bool,
    /// Dynamic configuration; read for the forced-compaction fuel and wait settings when
    /// `compact_shard` is set.
    pub dyncfgs: ConfigSet,
}
137
/// A durable cache of optimized expressions.
pub struct ExpressionCache {
    /// The build version this cache was opened at; all reads and writes are scoped to it.
    build_version: Version,
    /// The underlying durable key-value cache holding the serialized expressions.
    durable_cache: DurableCache<ExpressionCodec>,
}
143
144impl ExpressionCache {
145    /// Creates a new [`ExpressionCache`] and reconciles all entries in current build version.
146    /// Reconciliation will remove all entries that are not in `current_ids` and remove all
147    /// entries that have optimizer features that are not equal to `optimizer_features`.
148    ///
149    /// If `remove_prior_versions` is `true`, then all previous versions are durably removed from the
150    /// cache.
151    ///
152    /// If `compact_shard` is `true`, then this function will block on fully compacting the backing
153    /// persist shard.
154    ///
155    /// Returns all cached expressions in the current build version, after reconciliation.
156    pub async fn open(
157        ExpressionCacheConfig {
158            build_version,
159            persist,
160            shard_id,
161            current_ids,
162            remove_prior_versions,
163            compact_shard,
164            dyncfgs,
165        }: ExpressionCacheConfig,
166    ) -> (
167        Self,
168        BTreeMap<GlobalId, LocalExpressions>,
169        BTreeMap<GlobalId, GlobalExpressions>,
170    ) {
171        let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await;
172        let mut cache = Self {
173            build_version,
174            durable_cache,
175        };
176
177        const RETRIES: usize = 100;
178        for _ in 0..RETRIES {
179            match cache
180                .try_open(&current_ids, remove_prior_versions, compact_shard, &dyncfgs)
181                .await
182            {
183                Ok((local_expressions, global_expressions)) => {
184                    return (cache, local_expressions, global_expressions);
185                }
186                Err(err) => debug!("failed to open cache: {err} ... retrying"),
187            }
188        }
189
190        panic!("Unable to open expression cache after {RETRIES} retries");
191    }
192
193    async fn try_open(
194        &mut self,
195        current_ids: &BTreeSet<GlobalId>,
196        remove_prior_versions: bool,
197        compact_shard: bool,
198        dyncfgs: &ConfigSet,
199    ) -> Result<
200        (
201            BTreeMap<GlobalId, LocalExpressions>,
202            BTreeMap<GlobalId, GlobalExpressions>,
203        ),
204        mz_durable_cache::Error,
205    > {
206        let mut keys_to_remove = Vec::new();
207        let mut local_expressions = BTreeMap::new();
208        let mut global_expressions = BTreeMap::new();
209
210        for (key, expressions) in self.durable_cache.entries_local() {
211            let build_version = match key.build_version.parse::<Version>() {
212                Ok(build_version) => build_version,
213                Err(err) => {
214                    warn!("unable to parse build version: {key:?}: {err:?}");
215                    keys_to_remove.push((key.clone(), None));
216                    continue;
217                }
218            };
219            if build_version == self.build_version {
220                // Only deserialize the current version.
221                match key.expr_type {
222                    ExpressionType::Local => {
223                        let expressions: LocalExpressions = match bincode::deserialize(expressions)
224                        {
225                            Ok(expressions) => expressions,
226                            Err(err) => {
227                                soft_panic_or_log!(
228                                    "unable to deserialize local expressions: ({key:?}, {expressions:?}): {err:?}"
229                                );
230                                continue;
231                            }
232                        };
233                        // Remove dropped IDs.
234                        if !current_ids.contains(&key.id) {
235                            keys_to_remove.push((key.clone(), None));
236                        } else {
237                            local_expressions.insert(key.id, expressions);
238                        }
239                    }
240                    ExpressionType::Global => {
241                        let expressions: GlobalExpressions = match bincode::deserialize(expressions)
242                        {
243                            Ok(expressions) => expressions,
244                            Err(err) => {
245                                soft_panic_or_log!(
246                                    "unable to deserialize global expressions: ({key:?}, {expressions:?}): {err:?}"
247                                );
248                                continue;
249                            }
250                        };
251                        // Remove dropped IDs and expressions that rely on dropped indexes.
252                        let index_dependencies: BTreeSet<_> =
253                            expressions.index_imports().cloned().collect();
254                        if !current_ids.contains(&key.id)
255                            || !index_dependencies.is_subset(current_ids)
256                        {
257                            keys_to_remove.push((key.clone(), None));
258                        } else {
259                            global_expressions.insert(key.id, expressions);
260                        }
261                    }
262                }
263            } else if remove_prior_versions {
264                // Remove expressions from previous versions.
265                keys_to_remove.push((key.clone(), None));
266            }
267        }
268
269        let keys_to_remove: Vec<_> = keys_to_remove
270            .iter()
271            .map(|(key, expressions)| (key, expressions.as_ref()))
272            .collect();
273        self.durable_cache.try_set_many(&keys_to_remove).await?;
274
275        if remove_prior_versions {
276            // We've purged old versions from the cache; upgrade the backing Persist version as well.
277            self.durable_cache.upgrade_version().await;
278        }
279
280        if compact_shard {
281            let fuel = EXPRESSION_CACHE_FORCE_COMPACTION_FUEL.handle(dyncfgs);
282            let wait = EXPRESSION_CACHE_FORCE_COMPACTION_WAIT.handle(dyncfgs);
283            self.durable_cache
284                .dangerous_compact_shard(move || fuel.get(), move || wait.get())
285                .await;
286        }
287
288        Ok((local_expressions, global_expressions))
289    }
290
291    /// Durably removes all entries given by `invalidate_ids` and inserts `new_local_expressions`
292    /// and `new_global_expressions` into current build version.
293    ///
294    /// If there is a duplicate ID in both `invalidate_ids` and one of the new expressions vector,
295    /// then the final value will be taken from the new expressions vector.
296    async fn update(
297        &mut self,
298        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
299        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
300        invalidate_ids: BTreeSet<GlobalId>,
301    ) {
302        let mut entries = BTreeMap::new();
303        let build_version = self.build_version.to_string();
304        // Important to do `invalidate_ids` first, so that `new_X_expressions` overwrites duplicate
305        // keys.
306        for id in invalidate_ids {
307            entries.insert(
308                CacheKey {
309                    id,
310                    build_version: build_version.clone(),
311                    expr_type: ExpressionType::Local,
312                },
313                None,
314            );
315            entries.insert(
316                CacheKey {
317                    id,
318                    build_version: build_version.clone(),
319                    expr_type: ExpressionType::Global,
320                },
321                None,
322            );
323        }
324        for (id, expressions) in new_local_expressions {
325            let expressions = match bincode::serialize(&expressions) {
326                Ok(expressions) => Bytes::from(expressions),
327                Err(err) => {
328                    soft_panic_or_log!(
329                        "unable to serialize local expressions: {expressions:?}: {err:?}"
330                    );
331                    continue;
332                }
333            };
334            entries.insert(
335                CacheKey {
336                    id,
337                    build_version: build_version.clone(),
338                    expr_type: ExpressionType::Local,
339                },
340                Some(expressions),
341            );
342        }
343        for (id, expressions) in new_global_expressions {
344            let expressions = match bincode::serialize(&expressions) {
345                Ok(expressions) => Bytes::from(expressions),
346                Err(err) => {
347                    soft_panic_or_log!(
348                        "unable to serialize global expressions: {expressions:?}: {err:?}"
349                    );
350                    continue;
351                }
352            };
353            entries.insert(
354                CacheKey {
355                    id,
356                    build_version: build_version.clone(),
357                    expr_type: ExpressionType::Global,
358                },
359                Some(expressions),
360            );
361        }
362        let entries: Vec<_> = entries
363            .iter()
364            .map(|(key, expressions)| (key, expressions.as_ref()))
365            .collect();
366        self.durable_cache.set_many(&entries).await
367    }
368}
369
/// Operations to perform on the cache.
///
/// Operations are sent from an [`ExpressionCacheHandle`] to the task that owns the
/// [`ExpressionCache`].
enum CacheOperation {
    /// See [`ExpressionCache::update`].
    Update {
        /// Local expressions to write.
        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
        /// Global expressions to write.
        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
        /// IDs whose local and global entries are removed.
        invalidate_ids: BTreeSet<GlobalId>,
        /// Completion signal paired with the future returned by
        /// [`ExpressionCacheHandle::update`]. The cache task holds on to it until the update
        /// has been applied.
        // NOTE(review): presumably the trigger fires when dropped; confirm against
        // `mz_ore::channel::trigger`.
        trigger: trigger::Trigger,
    },
}
380
/// A cloneable handle for submitting operations to the task that owns the
/// [`ExpressionCache`].
#[derive(Debug, Clone)]
pub struct ExpressionCacheHandle {
    /// Sending half of the channel over which operations reach the cache task.
    tx: mpsc::UnboundedSender<CacheOperation>,
}
385
386impl ExpressionCacheHandle {
387    /// Spawns a task responsible for managing the expression cache. See [`ExpressionCache::open`].
388    ///
389    /// Returns a handle to interact with the cache and the initial contents of the cache.
390    pub async fn spawn_expression_cache(
391        config: ExpressionCacheConfig,
392    ) -> (
393        Self,
394        BTreeMap<GlobalId, LocalExpressions>,
395        BTreeMap<GlobalId, GlobalExpressions>,
396    ) {
397        let (mut cache, local_expressions, global_expressions) =
398            ExpressionCache::open(config).await;
399        let (tx, mut rx) = mpsc::unbounded_channel();
400        spawn(|| "expression-cache-task", async move {
401            loop {
402                while let Some(op) = rx.recv().await {
403                    match op {
404                        CacheOperation::Update {
405                            new_local_expressions,
406                            new_global_expressions,
407                            invalidate_ids,
408                            trigger: _trigger,
409                        } => {
410                            cache
411                                .update(
412                                    new_local_expressions,
413                                    new_global_expressions,
414                                    invalidate_ids,
415                                )
416                                .await
417                        }
418                    }
419                }
420            }
421        });
422
423        (Self { tx }, local_expressions, global_expressions)
424    }
425
426    pub fn update(
427        &self,
428        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
429        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
430        invalidate_ids: BTreeSet<GlobalId>,
431    ) -> impl Future<Output = ()> + use<> {
432        let (trigger, trigger_rx) = trigger::channel();
433        let op = CacheOperation::Update {
434            new_local_expressions,
435            new_global_expressions,
436            invalidate_ids,
437            trigger,
438        };
439        // If the send fails, then we must be shutting down.
440        let _ = self.tx.send(op);
441        trigger_rx
442    }
443}
444
445#[cfg(test)]
446mod tests {
447    use std::collections::{BTreeMap, BTreeSet};
448
449    use bytes::Bytes;
450    use mz_compute_types::dataflows::{DataflowDescription, IndexDesc, IndexImport};
451    use mz_durable_cache::DurableCacheCodec;
452    use mz_dyncfg::ConfigSet;
453    use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
454    use mz_persist_client::PersistClient;
455    use mz_persist_types::ShardId;
456    use mz_repr::{Datum, GlobalId, SqlRelationType, SqlScalarType};
457    use semver::Version;
458
459    use super::*;
460
461    #[mz_ore::test(tokio::test)]
462    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
463    async fn expression_cache() {
464        let first_version = Version::new(0, 1, 0);
465        let second_version = Version::new(0, 2, 0);
466        let persist = PersistClient::new_for_tests().await;
467        let shard_id = ShardId::new();
468
469        let mut current_ids = BTreeSet::new();
470        let mut remove_prior_versions = false;
471        // Compacting the shard takes too long, so we leave it to integration tests.
472        let compact_shard = false;
473        let dyncfgs = &mz_persist_client::cfg::all_dyncfgs(ConfigSet::default());
474
475        let mut next_id = 0;
476
477        let (mut local_exps, mut global_exps) = {
478            // Open a new empty cache.
479            let (cache, local_exprs, global_exprs) =
480                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
481                    build_version: first_version.clone(),
482                    persist: persist.clone(),
483                    shard_id,
484                    current_ids: current_ids.clone(),
485                    remove_prior_versions,
486                    compact_shard,
487                    dyncfgs: dyncfgs.clone(),
488                })
489                .await;
490            assert_eq!(local_exprs, BTreeMap::new(), "new cache should be empty");
491            assert_eq!(global_exprs, BTreeMap::new(), "new cache should be empty");
492
493            // Insert some expressions into the cache.
494            let mut local_exps = BTreeMap::new();
495            let mut global_exps = BTreeMap::new();
496            for _ in 0..4 {
497                let id = GlobalId::User(next_id);
498                let local_exp = gen_local_expressions();
499                let global_exp = gen_global_expressions();
500
501                cache
502                    .update(
503                        vec![(id, local_exp.clone())],
504                        vec![(id, global_exp.clone())],
505                        BTreeSet::new(),
506                    )
507                    .await;
508
509                current_ids.insert(id);
510                current_ids.extend(global_exp.index_imports());
511                local_exps.insert(id, local_exp);
512                global_exps.insert(id, global_exp);
513
514                next_id += 1;
515            }
516            (local_exps, global_exps)
517        };
518
519        {
520            // Re-open the cache.
521            let (_cache, local_entries, global_entries) =
522                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
523                    build_version: first_version.clone(),
524                    persist: persist.clone(),
525                    shard_id,
526                    current_ids: current_ids.clone(),
527                    remove_prior_versions,
528                    compact_shard,
529                    dyncfgs: dyncfgs.clone(),
530                })
531                .await;
532            assert_eq!(
533                local_entries, local_exps,
534                "local expression with non-matching optimizer features should be removed during reconciliation"
535            );
536            assert_eq!(
537                global_entries, global_exps,
538                "global expression with non-matching optimizer features should be removed during reconciliation"
539            );
540        }
541
542        {
543            // Simulate dropping an object.
544            let id_to_remove = local_exps.keys().next().expect("not empty").clone();
545            current_ids.remove(&id_to_remove);
546            let _removed_local_exp = local_exps.remove(&id_to_remove);
547            let _removed_global_exp = global_exps.remove(&id_to_remove);
548
549            // Re-open the cache.
550            let (_cache, local_entries, global_entries) =
551                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
552                    build_version: first_version.clone(),
553                    persist: persist.clone(),
554                    shard_id,
555                    current_ids: current_ids.clone(),
556                    remove_prior_versions,
557                    compact_shard,
558                    dyncfgs: dyncfgs.clone(),
559                })
560                .await;
561            assert_eq!(
562                local_entries, local_exps,
563                "dropped local objects should be removed during reconciliation"
564            );
565            assert_eq!(
566                global_entries, global_exps,
567                "dropped global objects should be removed during reconciliation"
568            );
569        }
570
571        {
572            // Simulate dropping an object dependency.
573            let global_exp_to_remove = global_exps.keys().next().expect("not empty").clone();
574            let removed_global_exp = global_exps
575                .remove(&global_exp_to_remove)
576                .expect("known to exist");
577            let dependency_to_remove = removed_global_exp
578                .index_imports()
579                .next()
580                .expect("generator always makes non-empty index imports");
581            current_ids.remove(dependency_to_remove);
582
583            // If the dependency is also tracked in the cache remove it.
584            let _removed_local_exp = local_exps.remove(dependency_to_remove);
585            let _removed_global_exp = global_exps.remove(dependency_to_remove);
586            // Remove any other exps that depend on dependency.
587            global_exps.retain(|_, exp| {
588                let index_imports: BTreeSet<_> = exp.index_imports().collect();
589                !index_imports.contains(&dependency_to_remove)
590            });
591
592            // Re-open the cache.
593            let (_cache, local_entries, global_entries) =
594                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
595                    build_version: first_version.clone(),
596                    persist: persist.clone(),
597                    shard_id,
598                    current_ids: current_ids.clone(),
599                    remove_prior_versions,
600                    compact_shard,
601                    dyncfgs: dyncfgs.clone(),
602                })
603                .await;
604            assert_eq!(
605                local_entries, local_exps,
606                "dropped object dependencies should NOT remove local expressions"
607            );
608            assert_eq!(
609                global_entries, global_exps,
610                "dropped object dependencies should remove global expressions"
611            );
612        }
613
614        let (new_gen_local_exps, new_gen_global_exps) = {
615            // Open the cache at a new version.
616            let (cache, local_entries, global_entries) =
617                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
618                    build_version: second_version.clone(),
619                    persist: persist.clone(),
620                    shard_id,
621                    current_ids: current_ids.clone(),
622                    remove_prior_versions,
623                    compact_shard,
624                    dyncfgs: dyncfgs.clone(),
625                })
626                .await;
627            assert_eq!(
628                local_entries,
629                BTreeMap::new(),
630                "new version should be empty"
631            );
632            assert_eq!(
633                global_entries,
634                BTreeMap::new(),
635                "new version should be empty"
636            );
637
638            // Insert some expressions at the new version.
639            let mut local_exps = BTreeMap::new();
640            let mut global_exps = BTreeMap::new();
641            for _ in 0..2 {
642                let id = GlobalId::User(next_id);
643                let local_exp = gen_local_expressions();
644                let global_exp = gen_global_expressions();
645
646                cache
647                    .update(
648                        vec![(id, local_exp.clone())],
649                        vec![(id, global_exp.clone())],
650                        BTreeSet::new(),
651                    )
652                    .await;
653
654                current_ids.insert(id);
655                current_ids.extend(global_exp.index_imports());
656                local_exps.insert(id, local_exp);
657                global_exps.insert(id, global_exp);
658
659                next_id += 1;
660            }
661            (local_exps, global_exps)
662        };
663
664        {
665            // Re-open the cache at the first version.
666            let (_cache, local_entries, global_entries) =
667                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
668                    build_version: first_version.clone(),
669                    persist: persist.clone(),
670                    shard_id,
671                    current_ids: current_ids.clone(),
672                    remove_prior_versions,
673                    compact_shard,
674                    dyncfgs: dyncfgs.clone(),
675                })
676                .await;
677            assert_eq!(
678                local_entries, local_exps,
679                "Previous version local expressions should still exist"
680            );
681            assert_eq!(
682                global_entries, global_exps,
683                "Previous version global expressions should still exist"
684            );
685        }
686
687        {
688            // Open the cache at a new version and clear previous versions.
689            remove_prior_versions = true;
690            let (_cache, local_entries, global_entries) =
691                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
692                    build_version: second_version.clone(),
693                    persist: persist.clone(),
694                    shard_id,
695                    current_ids: current_ids.clone(),
696                    remove_prior_versions,
697                    compact_shard,
698                    dyncfgs: dyncfgs.clone(),
699                })
700                .await;
701            assert_eq!(
702                local_entries, new_gen_local_exps,
703                "new version local expressions should be persisted"
704            );
705            assert_eq!(
706                global_entries, new_gen_global_exps,
707                "new version global expressions should be persisted"
708            );
709        }
710
711        {
712            // Re-open the cache at the first version.
713            let (_cache, local_entries, global_entries) =
714                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
715                    build_version: first_version.clone(),
716                    persist: persist.clone(),
717                    shard_id,
718                    current_ids: current_ids.clone(),
719                    remove_prior_versions,
720                    compact_shard,
721                    dyncfgs: dyncfgs.clone(),
722                })
723                .await;
724            assert_eq!(
725                local_entries,
726                BTreeMap::new(),
727                "Previous version local expressions should be cleared"
728            );
729            assert_eq!(
730                global_entries,
731                BTreeMap::new(),
732                "Previous version global expressions should be cleared"
733            );
734        }
735    }
736
737    #[mz_ore::test]
738    fn local_expr_cache_roundtrip() {
739        let key = CacheKey {
740            id: GlobalId::User(1),
741            build_version: "1.2.3".into(),
742            expr_type: ExpressionType::Local,
743        };
744        let val = gen_local_expressions();
745
746        let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
747        let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
748        let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
749        let decoded_val: LocalExpressions =
750            bincode::deserialize(&decoded_val).expect("local expressions should roundtrip");
751
752        assert_eq!(key, decoded_key);
753        assert_eq!(val, decoded_val);
754    }
755
756    #[mz_ore::test]
757    fn global_expr_cache_roundtrip() {
758        let key = CacheKey {
759            id: GlobalId::User(1),
760            build_version: "1.2.3".into(),
761            expr_type: ExpressionType::Global,
762        };
763        let val = gen_global_expressions();
764
765        let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
766        let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
767        let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
768        let decoded_val: GlobalExpressions =
769            bincode::deserialize(&decoded_val).expect("global expressions should roundtrip");
770
771        assert_eq!(key, decoded_key);
772        assert_eq!(val, decoded_val);
773    }
774
775    /// Generate a random [`LocalExpressions`] value.
776    ///
777    /// The returned values are mostly hardcoded and only differ in a single randomized number.
778    /// That's sufficient for the expr cache tests, since the cache mostly treats expressions as
779    /// opaque objects.
780    fn gen_local_expressions() -> LocalExpressions {
781        let datum = Datum::UInt64(rand::random());
782
783        LocalExpressions {
784            local_mir: OptimizedMirRelationExpr(MirRelationExpr::constant(
785                vec![vec![datum]],
786                SqlRelationType::new(vec![SqlScalarType::UInt64.nullable(false)]),
787            )),
788            optimizer_features: Default::default(),
789        }
790    }
791
792    /// Generate a random [`GlobalExpressions`] value.
793    ///
794    /// The returned values are mostly hardcoded and only differ in a single randomized string.
795    /// That's sufficient for the expr cache tests, since the cache mostly treats expressions as
796    /// opaque objects.
797    fn gen_global_expressions() -> GlobalExpressions {
798        let name = format!("test-{}", rand::random::<u64>());
799
800        let mut global_mir = DataflowDescription::new(name.clone());
801        let mut physical_plan = DataflowDescription::new(name);
802
803        // Add pieces expected by tests.
804        let index_imports = BTreeMap::from_iter([(
805            GlobalId::User(2),
806            IndexImport {
807                desc: IndexDesc {
808                    on_id: GlobalId::User(1),
809                    key: Default::default(),
810                },
811                typ: SqlRelationType::empty(),
812                monotonic: false,
813                with_snapshot: true,
814            },
815        )]);
816        global_mir.index_imports = index_imports.clone();
817        physical_plan.index_imports = index_imports;
818
819        GlobalExpressions {
820            global_mir,
821            physical_plan,
822            dataflow_metainfos: Default::default(),
823            optimizer_features: Default::default(),
824        }
825    }
826}