mz_catalog/
expr_cache.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A cache for optimized expressions.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::future::Future;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use mz_compute_types::dataflows::DataflowDescription;
18use mz_durable_cache::{DurableCache, DurableCacheCodec};
19use mz_dyncfg::ConfigSet;
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::channel::trigger;
22use mz_ore::soft_panic_or_log;
23use mz_ore::task::spawn;
24use mz_persist_client::PersistClient;
25use mz_persist_client::cli::admin::{
26    EXPRESSION_CACHE_FORCE_COMPACTION_FUEL, EXPRESSION_CACHE_FORCE_COMPACTION_WAIT,
27};
28use mz_persist_types::codec_impls::VecU8Schema;
29use mz_persist_types::{Codec, ShardId};
30use mz_repr::GlobalId;
31use mz_repr::optimize::OptimizerFeatures;
32use mz_transform::dataflow::DataflowMetainfo;
33use mz_transform::notice::OptimizerNotice;
34use semver::Version;
35use serde::{Deserialize, Serialize};
36use tokio::sync::mpsc;
37use tracing::{debug, warn};
38
39#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
40enum ExpressionType {
41    Local,
42    Global,
43}
44
45/// The data that is cached per catalog object as a result of local optimizations.
46#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
47pub struct LocalExpressions {
48    pub local_mir: OptimizedMirRelationExpr,
49    pub optimizer_features: OptimizerFeatures,
50}
51
52/// The data that is cached per catalog object as a result of global optimizations.
53#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54pub struct GlobalExpressions {
55    pub global_mir: DataflowDescription<OptimizedMirRelationExpr>,
56    pub physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
57    pub dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
58    pub optimizer_features: OptimizerFeatures,
59}
60
61impl GlobalExpressions {
62    fn index_imports(&self) -> impl Iterator<Item = &GlobalId> {
63        self.global_mir
64            .index_imports
65            .keys()
66            .chain(self.physical_plan.index_imports.keys())
67    }
68}
69
70#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
71struct CacheKey {
72    build_version: String,
73    id: GlobalId,
74    expr_type: ExpressionType,
75}
76
/// Marker type carrying the [`DurableCacheCodec`] implementation used by the expression cache.
#[derive(Debug, PartialEq, Eq)]
struct ExpressionCodec;
79
80impl DurableCacheCodec for ExpressionCodec {
81    type Key = CacheKey;
82    // We use a raw bytes instead of `Expressions` so that there is no backwards compatibility
83    // requirement on `Expressions` between versions.
84    type Val = Bytes;
85    type KeyCodec = Bytes;
86    type ValCodec = Bytes;
87
88    fn schemas() -> (
89        <Self::KeyCodec as Codec>::Schema,
90        <Self::ValCodec as Codec>::Schema,
91    ) {
92        (VecU8Schema::default(), VecU8Schema::default())
93    }
94
95    fn encode(key: &Self::Key, val: &Self::Val) -> (Self::KeyCodec, Self::ValCodec) {
96        let key = bincode::serialize(key).expect("must serialize");
97        (Bytes::from(key), val.clone())
98    }
99
100    fn decode(key: &Self::KeyCodec, val: &Self::ValCodec) -> (Self::Key, Self::Val) {
101        let key = bincode::deserialize(key).expect("must deserialize");
102        (key, val.clone())
103    }
104}
105
106/// Configuration needed to initialize an [`ExpressionCache`].
107#[derive(Debug, Clone)]
108pub struct ExpressionCacheConfig {
109    pub build_version: Version,
110    pub persist: PersistClient,
111    pub shard_id: ShardId,
112    pub current_ids: BTreeSet<GlobalId>,
113    pub remove_prior_versions: bool,
114    pub compact_shard: bool,
115    pub dyncfgs: ConfigSet,
116}
117
118/// A durable cache of optimized expressions.
119pub struct ExpressionCache {
120    build_version: Version,
121    durable_cache: DurableCache<ExpressionCodec>,
122}
123
124impl ExpressionCache {
125    /// Creates a new [`ExpressionCache`] and reconciles all entries in current build version.
126    /// Reconciliation will remove all entries that are not in `current_ids` and remove all
127    /// entries that have optimizer features that are not equal to `optimizer_features`.
128    ///
129    /// If `remove_prior_versions` is `true`, then all previous versions are durably removed from the
130    /// cache.
131    ///
132    /// If `compact_shard` is `true`, then this function will block on fully compacting the backing
133    /// persist shard.
134    ///
135    /// Returns all cached expressions in the current build version, after reconciliation.
136    pub async fn open(
137        ExpressionCacheConfig {
138            build_version,
139            persist,
140            shard_id,
141            current_ids,
142            remove_prior_versions,
143            compact_shard,
144            dyncfgs,
145        }: ExpressionCacheConfig,
146    ) -> (
147        Self,
148        BTreeMap<GlobalId, LocalExpressions>,
149        BTreeMap<GlobalId, GlobalExpressions>,
150    ) {
151        let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await;
152        let mut cache = Self {
153            build_version,
154            durable_cache,
155        };
156
157        const RETRIES: usize = 100;
158        for _ in 0..RETRIES {
159            match cache
160                .try_open(&current_ids, remove_prior_versions, compact_shard, &dyncfgs)
161                .await
162            {
163                Ok((local_expressions, global_expressions)) => {
164                    return (cache, local_expressions, global_expressions);
165                }
166                Err(err) => debug!("failed to open cache: {err} ... retrying"),
167            }
168        }
169
170        panic!("Unable to open expression cache after {RETRIES} retries");
171    }
172
173    async fn try_open(
174        &mut self,
175        current_ids: &BTreeSet<GlobalId>,
176        remove_prior_versions: bool,
177        compact_shard: bool,
178        dyncfgs: &ConfigSet,
179    ) -> Result<
180        (
181            BTreeMap<GlobalId, LocalExpressions>,
182            BTreeMap<GlobalId, GlobalExpressions>,
183        ),
184        mz_durable_cache::Error,
185    > {
186        let mut keys_to_remove = Vec::new();
187        let mut local_expressions = BTreeMap::new();
188        let mut global_expressions = BTreeMap::new();
189
190        for (key, expressions) in self.durable_cache.entries_local() {
191            let build_version = match key.build_version.parse::<Version>() {
192                Ok(build_version) => build_version,
193                Err(err) => {
194                    warn!("unable to parse build version: {key:?}: {err:?}");
195                    keys_to_remove.push((key.clone(), None));
196                    continue;
197                }
198            };
199            if build_version == self.build_version {
200                // Only deserialize the current version.
201                match key.expr_type {
202                    ExpressionType::Local => {
203                        let expressions: LocalExpressions = match bincode::deserialize(expressions)
204                        {
205                            Ok(expressions) => expressions,
206                            Err(err) => {
207                                soft_panic_or_log!(
208                                    "unable to deserialize local expressions: ({key:?}, {expressions:?}): {err:?}"
209                                );
210                                continue;
211                            }
212                        };
213                        // Remove dropped IDs.
214                        if !current_ids.contains(&key.id) {
215                            keys_to_remove.push((key.clone(), None));
216                        } else {
217                            local_expressions.insert(key.id, expressions);
218                        }
219                    }
220                    ExpressionType::Global => {
221                        let expressions: GlobalExpressions = match bincode::deserialize(expressions)
222                        {
223                            Ok(expressions) => expressions,
224                            Err(err) => {
225                                soft_panic_or_log!(
226                                    "unable to deserialize global expressions: ({key:?}, {expressions:?}): {err:?}"
227                                );
228                                continue;
229                            }
230                        };
231                        // Remove dropped IDs and expressions that rely on dropped indexes.
232                        let index_dependencies: BTreeSet<_> =
233                            expressions.index_imports().cloned().collect();
234                        if !current_ids.contains(&key.id)
235                            || !index_dependencies.is_subset(current_ids)
236                        {
237                            keys_to_remove.push((key.clone(), None));
238                        } else {
239                            global_expressions.insert(key.id, expressions);
240                        }
241                    }
242                }
243            } else if remove_prior_versions {
244                // Remove expressions from previous versions.
245                keys_to_remove.push((key.clone(), None));
246            }
247        }
248
249        let keys_to_remove: Vec<_> = keys_to_remove
250            .iter()
251            .map(|(key, expressions)| (key, expressions.as_ref()))
252            .collect();
253        self.durable_cache.try_set_many(&keys_to_remove).await?;
254
255        if compact_shard {
256            let fuel = EXPRESSION_CACHE_FORCE_COMPACTION_FUEL.handle(dyncfgs);
257            let wait = EXPRESSION_CACHE_FORCE_COMPACTION_WAIT.handle(dyncfgs);
258            self.durable_cache
259                .dangerous_compact_shard(move || fuel.get(), move || wait.get())
260                .await;
261        }
262
263        Ok((local_expressions, global_expressions))
264    }
265
266    /// Durably removes all entries given by `invalidate_ids` and inserts `new_local_expressions`
267    /// and `new_global_expressions` into current build version.
268    ///
269    /// If there is a duplicate ID in both `invalidate_ids` and one of the new expressions vector,
270    /// then the final value will be taken from the new expressions vector.
271    async fn update(
272        &mut self,
273        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
274        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
275        invalidate_ids: BTreeSet<GlobalId>,
276    ) {
277        let mut entries = BTreeMap::new();
278        let build_version = self.build_version.to_string();
279        // Important to do `invalidate_ids` first, so that `new_X_expressions` overwrites duplicate
280        // keys.
281        for id in invalidate_ids {
282            entries.insert(
283                CacheKey {
284                    id,
285                    build_version: build_version.clone(),
286                    expr_type: ExpressionType::Local,
287                },
288                None,
289            );
290            entries.insert(
291                CacheKey {
292                    id,
293                    build_version: build_version.clone(),
294                    expr_type: ExpressionType::Global,
295                },
296                None,
297            );
298        }
299        for (id, expressions) in new_local_expressions {
300            let expressions = match bincode::serialize(&expressions) {
301                Ok(expressions) => Bytes::from(expressions),
302                Err(err) => {
303                    soft_panic_or_log!(
304                        "unable to serialize local expressions: {expressions:?}: {err:?}"
305                    );
306                    continue;
307                }
308            };
309            entries.insert(
310                CacheKey {
311                    id,
312                    build_version: build_version.clone(),
313                    expr_type: ExpressionType::Local,
314                },
315                Some(expressions),
316            );
317        }
318        for (id, expressions) in new_global_expressions {
319            let expressions = match bincode::serialize(&expressions) {
320                Ok(expressions) => Bytes::from(expressions),
321                Err(err) => {
322                    soft_panic_or_log!(
323                        "unable to serialize global expressions: {expressions:?}: {err:?}"
324                    );
325                    continue;
326                }
327            };
328            entries.insert(
329                CacheKey {
330                    id,
331                    build_version: build_version.clone(),
332                    expr_type: ExpressionType::Global,
333                },
334                Some(expressions),
335            );
336        }
337        let entries: Vec<_> = entries
338            .iter()
339            .map(|(key, expressions)| (key, expressions.as_ref()))
340            .collect();
341        self.durable_cache.set_many(&entries).await
342    }
343}
344
345/// Operations to perform on the cache.
346enum CacheOperation {
347    /// See [`ExpressionCache::update`].
348    Update {
349        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
350        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
351        invalidate_ids: BTreeSet<GlobalId>,
352        trigger: trigger::Trigger,
353    },
354}
355
356#[derive(Debug, Clone)]
357pub struct ExpressionCacheHandle {
358    tx: mpsc::UnboundedSender<CacheOperation>,
359}
360
361impl ExpressionCacheHandle {
362    /// Spawns a task responsible for managing the expression cache. See [`ExpressionCache::open`].
363    ///
364    /// Returns a handle to interact with the cache and the initial contents of the cache.
365    pub async fn spawn_expression_cache(
366        config: ExpressionCacheConfig,
367    ) -> (
368        Self,
369        BTreeMap<GlobalId, LocalExpressions>,
370        BTreeMap<GlobalId, GlobalExpressions>,
371    ) {
372        let (mut cache, local_expressions, global_expressions) =
373            ExpressionCache::open(config).await;
374        let (tx, mut rx) = mpsc::unbounded_channel();
375        spawn(|| "expression-cache-task", async move {
376            loop {
377                while let Some(op) = rx.recv().await {
378                    match op {
379                        CacheOperation::Update {
380                            new_local_expressions,
381                            new_global_expressions,
382                            invalidate_ids,
383                            trigger: _trigger,
384                        } => {
385                            cache
386                                .update(
387                                    new_local_expressions,
388                                    new_global_expressions,
389                                    invalidate_ids,
390                                )
391                                .await
392                        }
393                    }
394                }
395            }
396        });
397
398        (Self { tx }, local_expressions, global_expressions)
399    }
400
401    pub fn update(
402        &self,
403        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
404        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
405        invalidate_ids: BTreeSet<GlobalId>,
406    ) -> impl Future<Output = ()> + use<> {
407        let (trigger, trigger_rx) = trigger::channel();
408        let op = CacheOperation::Update {
409            new_local_expressions,
410            new_global_expressions,
411            invalidate_ids,
412            trigger,
413        };
414        // If the send fails, then we must be shutting down.
415        let _ = self.tx.send(op);
416        trigger_rx
417    }
418}
419
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};

    use bytes::Bytes;
    use mz_compute_types::dataflows::{DataflowDescription, IndexDesc, IndexImport};
    use mz_durable_cache::DurableCacheCodec;
    use mz_dyncfg::ConfigSet;
    use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
    use mz_persist_client::PersistClient;
    use mz_persist_types::ShardId;
    use mz_repr::{Datum, GlobalId, SqlRelationType, SqlScalarType};
    use semver::Version;

    use super::*;

    type LocalEntries = BTreeMap<GlobalId, LocalExpressions>;
    type GlobalEntries = BTreeMap<GlobalId, GlobalExpressions>;

    /// Opens the expression cache stored in `shard_id` at `build_version` and returns the
    /// handle together with the reconciled contents.
    async fn open_cache(
        persist: &PersistClient,
        shard_id: ShardId,
        dyncfgs: &ConfigSet,
        build_version: &Version,
        current_ids: &BTreeSet<GlobalId>,
        remove_prior_versions: bool,
    ) -> (ExpressionCacheHandle, LocalEntries, GlobalEntries) {
        ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
            build_version: build_version.clone(),
            persist: persist.clone(),
            shard_id,
            current_ids: current_ids.clone(),
            remove_prior_versions,
            // Compacting the shard takes too long, so we leave it to integration tests.
            compact_shard: false,
            dyncfgs: dyncfgs.clone(),
        })
        .await
    }

    /// Inserts `count` freshly generated local and global expressions into `cache`, one update
    /// per object, and returns what was inserted.
    ///
    /// The object IDs are allocated from `next_id`. Each new ID and the index imports of its
    /// global expressions are added to `current_ids`.
    async fn populate(
        cache: &ExpressionCacheHandle,
        count: usize,
        next_id: &mut u64,
        current_ids: &mut BTreeSet<GlobalId>,
    ) -> (LocalEntries, GlobalEntries) {
        let mut local_exps = BTreeMap::new();
        let mut global_exps = BTreeMap::new();
        for _ in 0..count {
            let id = GlobalId::User(*next_id);
            let local_exp = gen_local_expressions();
            let global_exp = gen_global_expressions();

            cache
                .update(
                    vec![(id, local_exp.clone())],
                    vec![(id, global_exp.clone())],
                    BTreeSet::new(),
                )
                .await;

            current_ids.insert(id);
            current_ids.extend(global_exp.index_imports());
            local_exps.insert(id, local_exp);
            global_exps.insert(id, global_exp);

            *next_id += 1;
        }
        (local_exps, global_exps)
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn expression_cache() {
        let first_version = Version::new(0, 1, 0);
        let second_version = Version::new(0, 2, 0);
        let persist = PersistClient::new_for_tests().await;
        let shard_id = ShardId::new();
        let dyncfgs = mz_persist_client::cfg::all_dyncfgs(ConfigSet::default());

        let mut current_ids = BTreeSet::new();
        let mut next_id = 0;

        let (mut local_exps, mut global_exps) = {
            // Open a new empty cache.
            let (cache, local_entries, global_entries) = open_cache(
                &persist,
                shard_id,
                &dyncfgs,
                &first_version,
                &current_ids,
                false,
            )
            .await;
            assert_eq!(local_entries, BTreeMap::new(), "new cache should be empty");
            assert_eq!(global_entries, BTreeMap::new(), "new cache should be empty");

            // Insert some expressions into the cache.
            populate(&cache, 4, &mut next_id, &mut current_ids).await
        };

        {
            // Re-open the cache.
            let (_cache, local_entries, global_entries) = open_cache(
                &persist,
                shard_id,
                &dyncfgs,
                &first_version,
                &current_ids,
                false,
            )
            .await;
            assert_eq!(
                local_entries, local_exps,
                "local expressions should survive re-opening the cache"
            );
            assert_eq!(
                global_entries, global_exps,
                "global expressions should survive re-opening the cache"
            );
        }

        {
            // Simulate dropping an object.
            let id_to_remove = *local_exps.keys().next().expect("not empty");
            current_ids.remove(&id_to_remove);
            let _removed_local_exp = local_exps.remove(&id_to_remove);
            let _removed_global_exp = global_exps.remove(&id_to_remove);

            // Re-open the cache.
            let (_cache, local_entries, global_entries) = open_cache(
                &persist,
                shard_id,
                &dyncfgs,
                &first_version,
                &current_ids,
                false,
            )
            .await;
            assert_eq!(
                local_entries, local_exps,
                "dropped local objects should be removed during reconciliation"
            );
            assert_eq!(
                global_entries, global_exps,
                "dropped global objects should be removed during reconciliation"
            );
        }

        {
            // Simulate dropping an object dependency.
            let global_exp_to_remove = *global_exps.keys().next().expect("not empty");
            let removed_global_exp = global_exps
                .remove(&global_exp_to_remove)
                .expect("known to exist");
            let dependency_to_remove = *removed_global_exp
                .index_imports()
                .next()
                .expect("generator always makes non-empty index imports");
            current_ids.remove(&dependency_to_remove);

            // If the dependency is also tracked in the cache remove it.
            let _removed_local_exp = local_exps.remove(&dependency_to_remove);
            let _removed_global_exp = global_exps.remove(&dependency_to_remove);
            // Remove any other exps that depend on dependency.
            global_exps.retain(|_, exp| {
                !exp.index_imports()
                    .any(|import| *import == dependency_to_remove)
            });

            // Re-open the cache.
            let (_cache, local_entries, global_entries) = open_cache(
                &persist,
                shard_id,
                &dyncfgs,
                &first_version,
                &current_ids,
                false,
            )
            .await;
            assert_eq!(
                local_entries, local_exps,
                "dropped object dependencies should NOT remove local expressions"
            );
            assert_eq!(
                global_entries, global_exps,
                "dropped object dependencies should remove global expressions"
            );
        }

        let (new_gen_local_exps, new_gen_global_exps) = {
            // Open the cache at a new version.
            let (cache, local_entries, global_entries) = open_cache(
                &persist,
                shard_id,
                &dyncfgs,
                &second_version,
                &current_ids,
                false,
            )
            .await;
            assert_eq!(
                local_entries,
                BTreeMap::new(),
                "new version should be empty"
            );
            assert_eq!(
                global_entries,
                BTreeMap::new(),
                "new version should be empty"
            );

            // Insert some expressions at the new version.
            populate(&cache, 2, &mut next_id, &mut current_ids).await
        };

        {
            // Re-open the cache at the first version.
            let (_cache, local_entries, global_entries) = open_cache(
                &persist,
                shard_id,
                &dyncfgs,
                &first_version,
                &current_ids,
                false,
            )
            .await;
            assert_eq!(
                local_entries, local_exps,
                "Previous version local expressions should still exist"
            );
            assert_eq!(
                global_entries, global_exps,
                "Previous version global expressions should still exist"
            );
        }

        {
            // Open the cache at a new version and clear previous versions.
            let (_cache, local_entries, global_entries) = open_cache(
                &persist,
                shard_id,
                &dyncfgs,
                &second_version,
                &current_ids,
                true,
            )
            .await;
            assert_eq!(
                local_entries, new_gen_local_exps,
                "new version local expressions should be persisted"
            );
            assert_eq!(
                global_entries, new_gen_global_exps,
                "new version global expressions should be persisted"
            );
        }

        {
            // Re-open the cache at the first version, still clearing other versions.
            let (_cache, local_entries, global_entries) = open_cache(
                &persist,
                shard_id,
                &dyncfgs,
                &first_version,
                &current_ids,
                true,
            )
            .await;
            assert_eq!(
                local_entries,
                BTreeMap::new(),
                "Previous version local expressions should be cleared"
            );
            assert_eq!(
                global_entries,
                BTreeMap::new(),
                "Previous version global expressions should be cleared"
            );
        }
    }

    #[mz_ore::test]
    fn local_expr_cache_roundtrip() {
        let key = CacheKey {
            id: GlobalId::User(1),
            build_version: "1.2.3".into(),
            expr_type: ExpressionType::Local,
        };
        let val = gen_local_expressions();

        // The codec only handles raw bytes, so the value is serialized separately.
        let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
        let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
        let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
        let decoded_val: LocalExpressions =
            bincode::deserialize(&decoded_val).expect("local expressions should roundtrip");

        assert_eq!(key, decoded_key);
        assert_eq!(val, decoded_val);
    }

    #[mz_ore::test]
    fn global_expr_cache_roundtrip() {
        let key = CacheKey {
            id: GlobalId::User(1),
            build_version: "1.2.3".into(),
            expr_type: ExpressionType::Global,
        };
        let val = gen_global_expressions();

        // The codec only handles raw bytes, so the value is serialized separately.
        let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
        let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
        let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
        let decoded_val: GlobalExpressions =
            bincode::deserialize(&decoded_val).expect("global expressions should roundtrip");

        assert_eq!(key, decoded_key);
        assert_eq!(val, decoded_val);
    }

    /// Generate a random [`LocalExpressions`] value.
    ///
    /// The returned values are mostly hardcoded and only differ in a single randomized number.
    /// That's sufficient for the expr cache tests, since the cache mostly treats expressions as
    /// opaque objects.
    fn gen_local_expressions() -> LocalExpressions {
        let datum = Datum::UInt64(rand::random());

        LocalExpressions {
            local_mir: OptimizedMirRelationExpr(MirRelationExpr::constant(
                vec![vec![datum]],
                SqlRelationType::new(vec![SqlScalarType::UInt64.nullable(false)]),
            )),
            optimizer_features: Default::default(),
        }
    }

    /// Generate a random [`GlobalExpressions`] value.
    ///
    /// The returned values are mostly hardcoded and only differ in a single randomized string.
    /// That's sufficient for the expr cache tests, since the cache mostly treats expressions as
    /// opaque objects.
    fn gen_global_expressions() -> GlobalExpressions {
        let name = format!("test-{}", rand::random::<u64>());

        let mut global_mir = DataflowDescription::new(name.clone());
        let mut physical_plan = DataflowDescription::new(name);

        // Add pieces expected by tests: every generated value imports the same index, so that
        // the dependency-tracking paths of the cache are exercised.
        let index_imports = BTreeMap::from_iter([(
            GlobalId::User(2),
            IndexImport {
                desc: IndexDesc {
                    on_id: GlobalId::User(1),
                    key: Default::default(),
                },
                typ: SqlRelationType::empty(),
                monotonic: false,
            },
        )]);
        global_mir.index_imports = index_imports.clone();
        physical_plan.index_imports = index_imports;

        GlobalExpressions {
            global_mir,
            physical_plan,
            dataflow_metainfos: Default::default(),
            optimizer_features: Default::default(),
        }
    }
}