mz_catalog/
expr_cache.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A cache for optimized expressions.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::future::Future;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use mz_compute_types::dataflows::DataflowDescription;
18use mz_durable_cache::{DurableCache, DurableCacheCodec};
19use mz_dyncfg::ConfigSet;
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::channel::trigger;
22use mz_ore::soft_panic_or_log;
23use mz_ore::task::spawn;
24use mz_persist_client::PersistClient;
25use mz_persist_client::cli::admin::{
26    EXPRESSION_CACHE_FORCE_COMPACTION_FUEL, EXPRESSION_CACHE_FORCE_COMPACTION_WAIT,
27};
28use mz_persist_types::codec_impls::VecU8Schema;
29use mz_persist_types::{Codec, ShardId};
30use mz_repr::GlobalId;
31use mz_repr::optimize::OptimizerFeatures;
32use mz_transform::dataflow::DataflowMetainfo;
33use mz_transform::notice::OptimizerNotice;
34use semver::Version;
35use serde::{Deserialize, Serialize};
36use tokio::sync::mpsc;
37use tracing::{debug, warn};
38
39#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
40enum ExpressionType {
41    Local,
42    Global,
43}
44
45/// The data that is cached per catalog object as a result of local optimizations.
46#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
47pub struct LocalExpressions {
48    pub local_mir: OptimizedMirRelationExpr,
49    pub optimizer_features: OptimizerFeatures,
50}
51
52/// The data that is cached per catalog object as a result of global optimizations.
53#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54pub struct GlobalExpressions {
55    pub global_mir: DataflowDescription<OptimizedMirRelationExpr>,
56    pub physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
57    pub dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
58    pub optimizer_features: OptimizerFeatures,
59}
60
61impl GlobalExpressions {
62    fn index_imports(&self) -> impl Iterator<Item = &GlobalId> {
63        self.global_mir
64            .index_imports
65            .keys()
66            .chain(self.physical_plan.index_imports.keys())
67    }
68}
69
70#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
71struct CacheKey {
72    build_version: String,
73    id: GlobalId,
74    expr_type: ExpressionType,
75}
76
77#[derive(Debug, PartialEq, Eq)]
78struct ExpressionCodec;
79
80impl DurableCacheCodec for ExpressionCodec {
81    type Key = CacheKey;
82    // We use a raw bytes instead of `Expressions` so that there is no backwards compatibility
83    // requirement on `Expressions` between versions.
84    type Val = Bytes;
85    type KeyCodec = Bytes;
86    type ValCodec = Bytes;
87
88    fn schemas() -> (
89        <Self::KeyCodec as Codec>::Schema,
90        <Self::ValCodec as Codec>::Schema,
91    ) {
92        (VecU8Schema::default(), VecU8Schema::default())
93    }
94
95    fn encode(key: &Self::Key, val: &Self::Val) -> (Self::KeyCodec, Self::ValCodec) {
96        let key = bincode::serialize(key).expect("must serialize");
97        (Bytes::from(key), val.clone())
98    }
99
100    fn decode(key: &Self::KeyCodec, val: &Self::ValCodec) -> (Self::Key, Self::Val) {
101        let key = bincode::deserialize(key).expect("must deserialize");
102        (key, val.clone())
103    }
104}
105
106/// Configuration needed to initialize an [`ExpressionCache`].
107#[derive(Debug, Clone)]
108pub struct ExpressionCacheConfig {
109    pub build_version: Version,
110    pub persist: PersistClient,
111    pub shard_id: ShardId,
112    pub current_ids: BTreeSet<GlobalId>,
113    pub remove_prior_versions: bool,
114    pub compact_shard: bool,
115    pub dyncfgs: ConfigSet,
116}
117
118/// A durable cache of optimized expressions.
119pub struct ExpressionCache {
120    build_version: Version,
121    durable_cache: DurableCache<ExpressionCodec>,
122}
123
124impl ExpressionCache {
125    /// Creates a new [`ExpressionCache`] and reconciles all entries in current build version.
126    /// Reconciliation will remove all entries that are not in `current_ids` and remove all
127    /// entries that have optimizer features that are not equal to `optimizer_features`.
128    ///
129    /// If `remove_prior_versions` is `true`, then all previous versions are durably removed from the
130    /// cache.
131    ///
132    /// If `compact_shard` is `true`, then this function will block on fully compacting the backing
133    /// persist shard.
134    ///
135    /// Returns all cached expressions in the current build version, after reconciliation.
136    pub async fn open(
137        ExpressionCacheConfig {
138            build_version,
139            persist,
140            shard_id,
141            current_ids,
142            remove_prior_versions,
143            compact_shard,
144            dyncfgs,
145        }: ExpressionCacheConfig,
146    ) -> (
147        Self,
148        BTreeMap<GlobalId, LocalExpressions>,
149        BTreeMap<GlobalId, GlobalExpressions>,
150    ) {
151        let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await;
152        let mut cache = Self {
153            build_version,
154            durable_cache,
155        };
156
157        const RETRIES: usize = 100;
158        for _ in 0..RETRIES {
159            match cache
160                .try_open(&current_ids, remove_prior_versions, compact_shard, &dyncfgs)
161                .await
162            {
163                Ok((local_expressions, global_expressions)) => {
164                    return (cache, local_expressions, global_expressions);
165                }
166                Err(err) => debug!("failed to open cache: {err} ... retrying"),
167            }
168        }
169
170        panic!("Unable to open expression cache after {RETRIES} retries");
171    }
172
173    async fn try_open(
174        &mut self,
175        current_ids: &BTreeSet<GlobalId>,
176        remove_prior_versions: bool,
177        compact_shard: bool,
178        dyncfgs: &ConfigSet,
179    ) -> Result<
180        (
181            BTreeMap<GlobalId, LocalExpressions>,
182            BTreeMap<GlobalId, GlobalExpressions>,
183        ),
184        mz_durable_cache::Error,
185    > {
186        let mut keys_to_remove = Vec::new();
187        let mut local_expressions = BTreeMap::new();
188        let mut global_expressions = BTreeMap::new();
189
190        for (key, expressions) in self.durable_cache.entries_local() {
191            let build_version = match key.build_version.parse::<Version>() {
192                Ok(build_version) => build_version,
193                Err(err) => {
194                    warn!("unable to parse build version: {key:?}: {err:?}");
195                    keys_to_remove.push((key.clone(), None));
196                    continue;
197                }
198            };
199            if build_version == self.build_version {
200                // Only deserialize the current version.
201                match key.expr_type {
202                    ExpressionType::Local => {
203                        let expressions: LocalExpressions = match bincode::deserialize(expressions)
204                        {
205                            Ok(expressions) => expressions,
206                            Err(err) => {
207                                soft_panic_or_log!(
208                                    "unable to deserialize local expressions: ({key:?}, {expressions:?}): {err:?}"
209                                );
210                                continue;
211                            }
212                        };
213                        // Remove dropped IDs.
214                        if !current_ids.contains(&key.id) {
215                            keys_to_remove.push((key.clone(), None));
216                        } else {
217                            local_expressions.insert(key.id, expressions);
218                        }
219                    }
220                    ExpressionType::Global => {
221                        let expressions: GlobalExpressions = match bincode::deserialize(expressions)
222                        {
223                            Ok(expressions) => expressions,
224                            Err(err) => {
225                                soft_panic_or_log!(
226                                    "unable to deserialize global expressions: ({key:?}, {expressions:?}): {err:?}"
227                                );
228                                continue;
229                            }
230                        };
231                        // Remove dropped IDs and expressions that rely on dropped indexes.
232                        let index_dependencies: BTreeSet<_> =
233                            expressions.index_imports().cloned().collect();
234                        if !current_ids.contains(&key.id)
235                            || !index_dependencies.is_subset(current_ids)
236                        {
237                            keys_to_remove.push((key.clone(), None));
238                        } else {
239                            global_expressions.insert(key.id, expressions);
240                        }
241                    }
242                }
243            } else if remove_prior_versions {
244                // Remove expressions from previous versions.
245                keys_to_remove.push((key.clone(), None));
246            }
247        }
248
249        let keys_to_remove: Vec<_> = keys_to_remove
250            .iter()
251            .map(|(key, expressions)| (key, expressions.as_ref()))
252            .collect();
253        self.durable_cache.try_set_many(&keys_to_remove).await?;
254
255        if remove_prior_versions {
256            // We've purged old versions from the cache; upgrade the backing Persist version as well.
257            self.durable_cache.upgrade_version().await;
258        }
259
260        if compact_shard {
261            let fuel = EXPRESSION_CACHE_FORCE_COMPACTION_FUEL.handle(dyncfgs);
262            let wait = EXPRESSION_CACHE_FORCE_COMPACTION_WAIT.handle(dyncfgs);
263            self.durable_cache
264                .dangerous_compact_shard(move || fuel.get(), move || wait.get())
265                .await;
266        }
267
268        Ok((local_expressions, global_expressions))
269    }
270
271    /// Durably removes all entries given by `invalidate_ids` and inserts `new_local_expressions`
272    /// and `new_global_expressions` into current build version.
273    ///
274    /// If there is a duplicate ID in both `invalidate_ids` and one of the new expressions vector,
275    /// then the final value will be taken from the new expressions vector.
276    async fn update(
277        &mut self,
278        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
279        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
280        invalidate_ids: BTreeSet<GlobalId>,
281    ) {
282        let mut entries = BTreeMap::new();
283        let build_version = self.build_version.to_string();
284        // Important to do `invalidate_ids` first, so that `new_X_expressions` overwrites duplicate
285        // keys.
286        for id in invalidate_ids {
287            entries.insert(
288                CacheKey {
289                    id,
290                    build_version: build_version.clone(),
291                    expr_type: ExpressionType::Local,
292                },
293                None,
294            );
295            entries.insert(
296                CacheKey {
297                    id,
298                    build_version: build_version.clone(),
299                    expr_type: ExpressionType::Global,
300                },
301                None,
302            );
303        }
304        for (id, expressions) in new_local_expressions {
305            let expressions = match bincode::serialize(&expressions) {
306                Ok(expressions) => Bytes::from(expressions),
307                Err(err) => {
308                    soft_panic_or_log!(
309                        "unable to serialize local expressions: {expressions:?}: {err:?}"
310                    );
311                    continue;
312                }
313            };
314            entries.insert(
315                CacheKey {
316                    id,
317                    build_version: build_version.clone(),
318                    expr_type: ExpressionType::Local,
319                },
320                Some(expressions),
321            );
322        }
323        for (id, expressions) in new_global_expressions {
324            let expressions = match bincode::serialize(&expressions) {
325                Ok(expressions) => Bytes::from(expressions),
326                Err(err) => {
327                    soft_panic_or_log!(
328                        "unable to serialize global expressions: {expressions:?}: {err:?}"
329                    );
330                    continue;
331                }
332            };
333            entries.insert(
334                CacheKey {
335                    id,
336                    build_version: build_version.clone(),
337                    expr_type: ExpressionType::Global,
338                },
339                Some(expressions),
340            );
341        }
342        let entries: Vec<_> = entries
343            .iter()
344            .map(|(key, expressions)| (key, expressions.as_ref()))
345            .collect();
346        self.durable_cache.set_many(&entries).await
347    }
348}
349
350/// Operations to perform on the cache.
351enum CacheOperation {
352    /// See [`ExpressionCache::update`].
353    Update {
354        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
355        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
356        invalidate_ids: BTreeSet<GlobalId>,
357        trigger: trigger::Trigger,
358    },
359}
360
361#[derive(Debug, Clone)]
362pub struct ExpressionCacheHandle {
363    tx: mpsc::UnboundedSender<CacheOperation>,
364}
365
366impl ExpressionCacheHandle {
367    /// Spawns a task responsible for managing the expression cache. See [`ExpressionCache::open`].
368    ///
369    /// Returns a handle to interact with the cache and the initial contents of the cache.
370    pub async fn spawn_expression_cache(
371        config: ExpressionCacheConfig,
372    ) -> (
373        Self,
374        BTreeMap<GlobalId, LocalExpressions>,
375        BTreeMap<GlobalId, GlobalExpressions>,
376    ) {
377        let (mut cache, local_expressions, global_expressions) =
378            ExpressionCache::open(config).await;
379        let (tx, mut rx) = mpsc::unbounded_channel();
380        spawn(|| "expression-cache-task", async move {
381            loop {
382                while let Some(op) = rx.recv().await {
383                    match op {
384                        CacheOperation::Update {
385                            new_local_expressions,
386                            new_global_expressions,
387                            invalidate_ids,
388                            trigger: _trigger,
389                        } => {
390                            cache
391                                .update(
392                                    new_local_expressions,
393                                    new_global_expressions,
394                                    invalidate_ids,
395                                )
396                                .await
397                        }
398                    }
399                }
400            }
401        });
402
403        (Self { tx }, local_expressions, global_expressions)
404    }
405
406    pub fn update(
407        &self,
408        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
409        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
410        invalidate_ids: BTreeSet<GlobalId>,
411    ) -> impl Future<Output = ()> + use<> {
412        let (trigger, trigger_rx) = trigger::channel();
413        let op = CacheOperation::Update {
414            new_local_expressions,
415            new_global_expressions,
416            invalidate_ids,
417            trigger,
418        };
419        // If the send fails, then we must be shutting down.
420        let _ = self.tx.send(op);
421        trigger_rx
422    }
423}
424
425#[cfg(test)]
426mod tests {
427    use std::collections::{BTreeMap, BTreeSet};
428
429    use bytes::Bytes;
430    use mz_compute_types::dataflows::{DataflowDescription, IndexDesc, IndexImport};
431    use mz_durable_cache::DurableCacheCodec;
432    use mz_dyncfg::ConfigSet;
433    use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
434    use mz_persist_client::PersistClient;
435    use mz_persist_types::ShardId;
436    use mz_repr::{Datum, GlobalId, SqlRelationType, SqlScalarType};
437    use semver::Version;
438
439    use super::*;
440
441    #[mz_ore::test(tokio::test)]
442    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
443    async fn expression_cache() {
444        let first_version = Version::new(0, 1, 0);
445        let second_version = Version::new(0, 2, 0);
446        let persist = PersistClient::new_for_tests().await;
447        let shard_id = ShardId::new();
448
449        let mut current_ids = BTreeSet::new();
450        let mut remove_prior_versions = false;
451        // Compacting the shard takes too long, so we leave it to integration tests.
452        let compact_shard = false;
453        let dyncfgs = &mz_persist_client::cfg::all_dyncfgs(ConfigSet::default());
454
455        let mut next_id = 0;
456
457        let (mut local_exps, mut global_exps) = {
458            // Open a new empty cache.
459            let (cache, local_exprs, global_exprs) =
460                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
461                    build_version: first_version.clone(),
462                    persist: persist.clone(),
463                    shard_id,
464                    current_ids: current_ids.clone(),
465                    remove_prior_versions,
466                    compact_shard,
467                    dyncfgs: dyncfgs.clone(),
468                })
469                .await;
470            assert_eq!(local_exprs, BTreeMap::new(), "new cache should be empty");
471            assert_eq!(global_exprs, BTreeMap::new(), "new cache should be empty");
472
473            // Insert some expressions into the cache.
474            let mut local_exps = BTreeMap::new();
475            let mut global_exps = BTreeMap::new();
476            for _ in 0..4 {
477                let id = GlobalId::User(next_id);
478                let local_exp = gen_local_expressions();
479                let global_exp = gen_global_expressions();
480
481                cache
482                    .update(
483                        vec![(id, local_exp.clone())],
484                        vec![(id, global_exp.clone())],
485                        BTreeSet::new(),
486                    )
487                    .await;
488
489                current_ids.insert(id);
490                current_ids.extend(global_exp.index_imports());
491                local_exps.insert(id, local_exp);
492                global_exps.insert(id, global_exp);
493
494                next_id += 1;
495            }
496            (local_exps, global_exps)
497        };
498
499        {
500            // Re-open the cache.
501            let (_cache, local_entries, global_entries) =
502                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
503                    build_version: first_version.clone(),
504                    persist: persist.clone(),
505                    shard_id,
506                    current_ids: current_ids.clone(),
507                    remove_prior_versions,
508                    compact_shard,
509                    dyncfgs: dyncfgs.clone(),
510                })
511                .await;
512            assert_eq!(
513                local_entries, local_exps,
514                "local expression with non-matching optimizer features should be removed during reconciliation"
515            );
516            assert_eq!(
517                global_entries, global_exps,
518                "global expression with non-matching optimizer features should be removed during reconciliation"
519            );
520        }
521
522        {
523            // Simulate dropping an object.
524            let id_to_remove = local_exps.keys().next().expect("not empty").clone();
525            current_ids.remove(&id_to_remove);
526            let _removed_local_exp = local_exps.remove(&id_to_remove);
527            let _removed_global_exp = global_exps.remove(&id_to_remove);
528
529            // Re-open the cache.
530            let (_cache, local_entries, global_entries) =
531                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
532                    build_version: first_version.clone(),
533                    persist: persist.clone(),
534                    shard_id,
535                    current_ids: current_ids.clone(),
536                    remove_prior_versions,
537                    compact_shard,
538                    dyncfgs: dyncfgs.clone(),
539                })
540                .await;
541            assert_eq!(
542                local_entries, local_exps,
543                "dropped local objects should be removed during reconciliation"
544            );
545            assert_eq!(
546                global_entries, global_exps,
547                "dropped global objects should be removed during reconciliation"
548            );
549        }
550
551        {
552            // Simulate dropping an object dependency.
553            let global_exp_to_remove = global_exps.keys().next().expect("not empty").clone();
554            let removed_global_exp = global_exps
555                .remove(&global_exp_to_remove)
556                .expect("known to exist");
557            let dependency_to_remove = removed_global_exp
558                .index_imports()
559                .next()
560                .expect("generator always makes non-empty index imports");
561            current_ids.remove(dependency_to_remove);
562
563            // If the dependency is also tracked in the cache remove it.
564            let _removed_local_exp = local_exps.remove(dependency_to_remove);
565            let _removed_global_exp = global_exps.remove(dependency_to_remove);
566            // Remove any other exps that depend on dependency.
567            global_exps.retain(|_, exp| {
568                let index_imports: BTreeSet<_> = exp.index_imports().collect();
569                !index_imports.contains(&dependency_to_remove)
570            });
571
572            // Re-open the cache.
573            let (_cache, local_entries, global_entries) =
574                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
575                    build_version: first_version.clone(),
576                    persist: persist.clone(),
577                    shard_id,
578                    current_ids: current_ids.clone(),
579                    remove_prior_versions,
580                    compact_shard,
581                    dyncfgs: dyncfgs.clone(),
582                })
583                .await;
584            assert_eq!(
585                local_entries, local_exps,
586                "dropped object dependencies should NOT remove local expressions"
587            );
588            assert_eq!(
589                global_entries, global_exps,
590                "dropped object dependencies should remove global expressions"
591            );
592        }
593
594        let (new_gen_local_exps, new_gen_global_exps) = {
595            // Open the cache at a new version.
596            let (cache, local_entries, global_entries) =
597                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
598                    build_version: second_version.clone(),
599                    persist: persist.clone(),
600                    shard_id,
601                    current_ids: current_ids.clone(),
602                    remove_prior_versions,
603                    compact_shard,
604                    dyncfgs: dyncfgs.clone(),
605                })
606                .await;
607            assert_eq!(
608                local_entries,
609                BTreeMap::new(),
610                "new version should be empty"
611            );
612            assert_eq!(
613                global_entries,
614                BTreeMap::new(),
615                "new version should be empty"
616            );
617
618            // Insert some expressions at the new version.
619            let mut local_exps = BTreeMap::new();
620            let mut global_exps = BTreeMap::new();
621            for _ in 0..2 {
622                let id = GlobalId::User(next_id);
623                let local_exp = gen_local_expressions();
624                let global_exp = gen_global_expressions();
625
626                cache
627                    .update(
628                        vec![(id, local_exp.clone())],
629                        vec![(id, global_exp.clone())],
630                        BTreeSet::new(),
631                    )
632                    .await;
633
634                current_ids.insert(id);
635                current_ids.extend(global_exp.index_imports());
636                local_exps.insert(id, local_exp);
637                global_exps.insert(id, global_exp);
638
639                next_id += 1;
640            }
641            (local_exps, global_exps)
642        };
643
644        {
645            // Re-open the cache at the first version.
646            let (_cache, local_entries, global_entries) =
647                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
648                    build_version: first_version.clone(),
649                    persist: persist.clone(),
650                    shard_id,
651                    current_ids: current_ids.clone(),
652                    remove_prior_versions,
653                    compact_shard,
654                    dyncfgs: dyncfgs.clone(),
655                })
656                .await;
657            assert_eq!(
658                local_entries, local_exps,
659                "Previous version local expressions should still exist"
660            );
661            assert_eq!(
662                global_entries, global_exps,
663                "Previous version global expressions should still exist"
664            );
665        }
666
667        {
668            // Open the cache at a new version and clear previous versions.
669            remove_prior_versions = true;
670            let (_cache, local_entries, global_entries) =
671                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
672                    build_version: second_version.clone(),
673                    persist: persist.clone(),
674                    shard_id,
675                    current_ids: current_ids.clone(),
676                    remove_prior_versions,
677                    compact_shard,
678                    dyncfgs: dyncfgs.clone(),
679                })
680                .await;
681            assert_eq!(
682                local_entries, new_gen_local_exps,
683                "new version local expressions should be persisted"
684            );
685            assert_eq!(
686                global_entries, new_gen_global_exps,
687                "new version global expressions should be persisted"
688            );
689        }
690
691        {
692            // Re-open the cache at the first version.
693            let (_cache, local_entries, global_entries) =
694                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
695                    build_version: first_version.clone(),
696                    persist: persist.clone(),
697                    shard_id,
698                    current_ids: current_ids.clone(),
699                    remove_prior_versions,
700                    compact_shard,
701                    dyncfgs: dyncfgs.clone(),
702                })
703                .await;
704            assert_eq!(
705                local_entries,
706                BTreeMap::new(),
707                "Previous version local expressions should be cleared"
708            );
709            assert_eq!(
710                global_entries,
711                BTreeMap::new(),
712                "Previous version global expressions should be cleared"
713            );
714        }
715    }
716
717    #[mz_ore::test]
718    fn local_expr_cache_roundtrip() {
719        let key = CacheKey {
720            id: GlobalId::User(1),
721            build_version: "1.2.3".into(),
722            expr_type: ExpressionType::Local,
723        };
724        let val = gen_local_expressions();
725
726        let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
727        let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
728        let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
729        let decoded_val: LocalExpressions =
730            bincode::deserialize(&decoded_val).expect("local expressions should roundtrip");
731
732        assert_eq!(key, decoded_key);
733        assert_eq!(val, decoded_val);
734    }
735
736    #[mz_ore::test]
737    fn global_expr_cache_roundtrip() {
738        let key = CacheKey {
739            id: GlobalId::User(1),
740            build_version: "1.2.3".into(),
741            expr_type: ExpressionType::Global,
742        };
743        let val = gen_global_expressions();
744
745        let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
746        let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
747        let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
748        let decoded_val: GlobalExpressions =
749            bincode::deserialize(&decoded_val).expect("global expressions should roundtrip");
750
751        assert_eq!(key, decoded_key);
752        assert_eq!(val, decoded_val);
753    }
754
755    /// Generate a random [`LocalExpressions`] value.
756    ///
757    /// The returned values are mostly hardcoded and only differ in a single randomized number.
758    /// That's sufficient for the expr cache tests, since the cache mostly treats expressions as
759    /// opaque objects.
760    fn gen_local_expressions() -> LocalExpressions {
761        let datum = Datum::UInt64(rand::random());
762
763        LocalExpressions {
764            local_mir: OptimizedMirRelationExpr(MirRelationExpr::constant(
765                vec![vec![datum]],
766                SqlRelationType::new(vec![SqlScalarType::UInt64.nullable(false)]),
767            )),
768            optimizer_features: Default::default(),
769        }
770    }
771
772    /// Generate a random [`GlobalExpressions`] value.
773    ///
774    /// The returned values are mostly hardcoded and only differ in a single randomized string.
775    /// That's sufficient for the expr cache tests, since the cache mostly treats expressions as
776    /// opaque objects.
777    fn gen_global_expressions() -> GlobalExpressions {
778        let name = format!("test-{}", rand::random::<u64>());
779
780        let mut global_mir = DataflowDescription::new(name.clone());
781        let mut physical_plan = DataflowDescription::new(name);
782
783        // Add pieces expected by tests.
784        let index_imports = BTreeMap::from_iter([(
785            GlobalId::User(2),
786            IndexImport {
787                desc: IndexDesc {
788                    on_id: GlobalId::User(1),
789                    key: Default::default(),
790                },
791                typ: SqlRelationType::empty(),
792                monotonic: false,
793            },
794        )]);
795        global_mir.index_imports = index_imports.clone();
796        physical_plan.index_imports = index_imports;
797
798        GlobalExpressions {
799            global_mir,
800            physical_plan,
801            dataflow_metainfos: Default::default(),
802            optimizer_features: Default::default(),
803        }
804    }
805}