1use std::collections::{BTreeMap, BTreeSet};
13use std::future::Future;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use mz_compute_types::dataflows::DataflowDescription;
18use mz_durable_cache::{DurableCache, DurableCacheCodec};
19use mz_dyncfg::ConfigSet;
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::channel::trigger;
22use mz_ore::soft_panic_or_log;
23use mz_ore::task::spawn;
24use mz_persist_client::PersistClient;
25use mz_persist_client::cli::admin::{
26 EXPRESSION_CACHE_FORCE_COMPACTION_FUEL, EXPRESSION_CACHE_FORCE_COMPACTION_WAIT,
27};
28use mz_persist_types::codec_impls::VecU8Schema;
29use mz_persist_types::{Codec, ShardId};
30use mz_repr::GlobalId;
31use mz_repr::optimize::OptimizerFeatures;
32use mz_transform::dataflow::DataflowMetainfo;
33use mz_transform::notice::OptimizerNotice;
34use proptest_derive::Arbitrary;
35use semver::Version;
36use serde::{Deserialize, Serialize};
37use timely::Container;
38use tokio::sync::mpsc;
39use tracing::{debug, warn};
40
41#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)]
42enum ExpressionType {
43 Local,
44 Global,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49pub struct LocalExpressions {
50 pub local_mir: OptimizedMirRelationExpr,
51 pub optimizer_features: OptimizerFeatures,
52}
53
54#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
56pub struct GlobalExpressions {
57 pub global_mir: DataflowDescription<OptimizedMirRelationExpr>,
58 pub physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
59 pub dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
60 pub optimizer_features: OptimizerFeatures,
61}
62
63impl GlobalExpressions {
64 fn index_imports(&self) -> impl Iterator<Item = &GlobalId> {
65 self.global_mir
66 .index_imports
67 .keys()
68 .chain(self.physical_plan.index_imports.keys())
69 }
70}
71
72#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)]
73struct CacheKey {
74 build_version: String,
75 id: GlobalId,
76 expr_type: ExpressionType,
77}
78
/// Marker type selecting the [`DurableCacheCodec`] implementation used by the expression cache.
#[derive(Debug, Eq, PartialEq)]
struct ExpressionCodec;
81
82impl DurableCacheCodec for ExpressionCodec {
83 type Key = CacheKey;
84 type Val = Bytes;
87 type KeyCodec = Bytes;
88 type ValCodec = Bytes;
89
90 fn schemas() -> (
91 <Self::KeyCodec as Codec>::Schema,
92 <Self::ValCodec as Codec>::Schema,
93 ) {
94 (VecU8Schema::default(), VecU8Schema::default())
95 }
96
97 fn encode(key: &Self::Key, val: &Self::Val) -> (Self::KeyCodec, Self::ValCodec) {
98 let key = bincode::serialize(key).expect("must serialize");
99 (Bytes::from(key), val.clone())
100 }
101
102 fn decode(key: &Self::KeyCodec, val: &Self::ValCodec) -> (Self::Key, Self::Val) {
103 let key = bincode::deserialize(key).expect("must deserialize");
104 (key, val.clone())
105 }
106}
107
108#[derive(Debug, Clone)]
110pub struct ExpressionCacheConfig {
111 pub build_version: Version,
112 pub persist: PersistClient,
113 pub shard_id: ShardId,
114 pub current_ids: BTreeSet<GlobalId>,
115 pub remove_prior_versions: bool,
116 pub compact_shard: bool,
117 pub dyncfgs: ConfigSet,
118}
119
120pub struct ExpressionCache {
122 build_version: Version,
123 durable_cache: DurableCache<ExpressionCodec>,
124}
125
126impl ExpressionCache {
127 pub async fn open(
139 ExpressionCacheConfig {
140 build_version,
141 persist,
142 shard_id,
143 current_ids,
144 remove_prior_versions,
145 compact_shard,
146 dyncfgs,
147 }: ExpressionCacheConfig,
148 ) -> (
149 Self,
150 BTreeMap<GlobalId, LocalExpressions>,
151 BTreeMap<GlobalId, GlobalExpressions>,
152 ) {
153 let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await;
154 let mut cache = Self {
155 build_version,
156 durable_cache,
157 };
158
159 const RETRIES: usize = 100;
160 for _ in 0..RETRIES {
161 match cache
162 .try_open(¤t_ids, remove_prior_versions, compact_shard, &dyncfgs)
163 .await
164 {
165 Ok((local_expressions, global_expressions)) => {
166 return (cache, local_expressions, global_expressions);
167 }
168 Err(err) => debug!("failed to open cache: {err} ... retrying"),
169 }
170 }
171
172 panic!("Unable to open expression cache after {RETRIES} retries");
173 }
174
175 async fn try_open(
176 &mut self,
177 current_ids: &BTreeSet<GlobalId>,
178 remove_prior_versions: bool,
179 compact_shard: bool,
180 dyncfgs: &ConfigSet,
181 ) -> Result<
182 (
183 BTreeMap<GlobalId, LocalExpressions>,
184 BTreeMap<GlobalId, GlobalExpressions>,
185 ),
186 mz_durable_cache::Error,
187 > {
188 let mut keys_to_remove = Vec::new();
189 let mut local_expressions = BTreeMap::new();
190 let mut global_expressions = BTreeMap::new();
191
192 for (key, expressions) in self.durable_cache.entries_local() {
193 let build_version = match key.build_version.parse::<Version>() {
194 Ok(build_version) => build_version,
195 Err(err) => {
196 warn!("unable to parse build version: {key:?}: {err:?}");
197 keys_to_remove.push((key.clone(), None));
198 continue;
199 }
200 };
201 if build_version == self.build_version {
202 match key.expr_type {
204 ExpressionType::Local => {
205 let expressions: LocalExpressions = match bincode::deserialize(expressions)
206 {
207 Ok(expressions) => expressions,
208 Err(err) => {
209 soft_panic_or_log!(
210 "unable to deserialize local expressions: ({key:?}, {expressions:?}): {err:?}"
211 );
212 continue;
213 }
214 };
215 if !current_ids.contains(&key.id) {
217 keys_to_remove.push((key.clone(), None));
218 } else {
219 local_expressions.insert(key.id, expressions);
220 }
221 }
222 ExpressionType::Global => {
223 let expressions: GlobalExpressions = match bincode::deserialize(expressions)
224 {
225 Ok(expressions) => expressions,
226 Err(err) => {
227 soft_panic_or_log!(
228 "unable to deserialize global expressions: ({key:?}, {expressions:?}): {err:?}"
229 );
230 continue;
231 }
232 };
233 let index_dependencies: BTreeSet<_> =
235 expressions.index_imports().cloned().collect();
236 if !current_ids.contains(&key.id)
237 || !index_dependencies.is_subset(current_ids)
238 {
239 keys_to_remove.push((key.clone(), None));
240 } else {
241 global_expressions.insert(key.id, expressions);
242 }
243 }
244 }
245 } else if remove_prior_versions {
246 keys_to_remove.push((key.clone(), None));
248 }
249 }
250
251 let keys_to_remove: Vec<_> = keys_to_remove
252 .iter()
253 .map(|(key, expressions)| (key, expressions.as_ref()))
254 .collect();
255 self.durable_cache.try_set_many(&keys_to_remove).await?;
256
257 if compact_shard {
258 let fuel = EXPRESSION_CACHE_FORCE_COMPACTION_FUEL.handle(dyncfgs);
259 let wait = EXPRESSION_CACHE_FORCE_COMPACTION_WAIT.handle(dyncfgs);
260 self.durable_cache
261 .dangerous_compact_shard(move || fuel.get(), move || wait.get())
262 .await;
263 }
264
265 Ok((local_expressions, global_expressions))
266 }
267
268 async fn update(
274 &mut self,
275 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
276 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
277 invalidate_ids: BTreeSet<GlobalId>,
278 ) {
279 let mut entries = BTreeMap::new();
280 let build_version = self.build_version.to_string();
281 for id in invalidate_ids {
284 entries.insert(
285 CacheKey {
286 id,
287 build_version: build_version.clone(),
288 expr_type: ExpressionType::Local,
289 },
290 None,
291 );
292 entries.insert(
293 CacheKey {
294 id,
295 build_version: build_version.clone(),
296 expr_type: ExpressionType::Global,
297 },
298 None,
299 );
300 }
301 for (id, expressions) in new_local_expressions {
302 let expressions = match bincode::serialize(&expressions) {
303 Ok(expressions) => Bytes::from(expressions),
304 Err(err) => {
305 soft_panic_or_log!(
306 "unable to serialize local expressions: {expressions:?}: {err:?}"
307 );
308 continue;
309 }
310 };
311 entries.insert(
312 CacheKey {
313 id,
314 build_version: build_version.clone(),
315 expr_type: ExpressionType::Local,
316 },
317 Some(expressions),
318 );
319 }
320 for (id, expressions) in new_global_expressions {
321 let expressions = match bincode::serialize(&expressions) {
322 Ok(expressions) => Bytes::from(expressions),
323 Err(err) => {
324 soft_panic_or_log!(
325 "unable to serialize global expressions: {expressions:?}: {err:?}"
326 );
327 continue;
328 }
329 };
330 entries.insert(
331 CacheKey {
332 id,
333 build_version: build_version.clone(),
334 expr_type: ExpressionType::Global,
335 },
336 Some(expressions),
337 );
338 }
339 let entries: Vec<_> = entries
340 .iter()
341 .map(|(key, expressions)| (key, expressions.as_ref()))
342 .collect();
343 self.durable_cache.set_many(&entries).await
344 }
345}
346
347enum CacheOperation {
349 Update {
351 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
352 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
353 invalidate_ids: BTreeSet<GlobalId>,
354 trigger: trigger::Trigger,
355 },
356}
357
358#[derive(Debug, Clone)]
359pub struct ExpressionCacheHandle {
360 tx: mpsc::UnboundedSender<CacheOperation>,
361}
362
363impl ExpressionCacheHandle {
364 pub async fn spawn_expression_cache(
368 config: ExpressionCacheConfig,
369 ) -> (
370 Self,
371 BTreeMap<GlobalId, LocalExpressions>,
372 BTreeMap<GlobalId, GlobalExpressions>,
373 ) {
374 let (mut cache, local_expressions, global_expressions) =
375 ExpressionCache::open(config).await;
376 let (tx, mut rx) = mpsc::unbounded_channel();
377 spawn(|| "expression-cache-task", async move {
378 loop {
379 while let Some(op) = rx.recv().await {
380 match op {
381 CacheOperation::Update {
382 new_local_expressions,
383 new_global_expressions,
384 invalidate_ids,
385 trigger: _trigger,
386 } => {
387 cache
388 .update(
389 new_local_expressions,
390 new_global_expressions,
391 invalidate_ids,
392 )
393 .await
394 }
395 }
396 }
397 }
398 });
399
400 (Self { tx }, local_expressions, global_expressions)
401 }
402
403 pub fn update(
404 &self,
405 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
406 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
407 invalidate_ids: BTreeSet<GlobalId>,
408 ) -> impl Future<Output = ()> + use<> {
409 let (trigger, trigger_rx) = trigger::channel();
410 let op = CacheOperation::Update {
411 new_local_expressions,
412 new_global_expressions,
413 invalidate_ids,
414 trigger,
415 };
416 let _ = self.tx.send(op);
418 trigger_rx
419 }
420}
421
422#[cfg(test)]
423mod tests {
424 use std::collections::{BTreeMap, BTreeSet};
425 use std::marker::PhantomData;
426 use std::sync::Arc;
427 use std::time::{Duration, Instant};
428
429 use bytes::Bytes;
430 use mz_compute_types::dataflows::DataflowDescription;
431 use mz_durable_cache::DurableCacheCodec;
432 use mz_dyncfg::ConfigSet;
433 use mz_expr::OptimizedMirRelationExpr;
434 use mz_ore::test::timeout;
435 use mz_persist_client::PersistClient;
436 use mz_persist_types::ShardId;
437 use mz_repr::GlobalId;
438 use mz_repr::optimize::OptimizerFeatures;
439 use mz_transform::dataflow::DataflowMetainfo;
440 use mz_transform::notice::OptimizerNotice;
441 use proptest::arbitrary::{Arbitrary, any};
442 use proptest::prelude::{BoxedStrategy, ProptestConfig};
443 use proptest::proptest;
444 use proptest::strategy::{Strategy, ValueTree};
445 use proptest::test_runner::{RngAlgorithm, TestRng, TestRunner};
446 use semver::Version;
447 use tracing::info;
448
449 use crate::expr_cache::{
450 CacheKey, ExpressionCacheConfig, ExpressionCacheHandle, ExpressionCodec, GlobalExpressions,
451 LocalExpressions,
452 };
453
454 impl Arbitrary for LocalExpressions {
455 type Parameters = ();
456
457 fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
458 (
459 any::<OptimizedMirRelationExpr>(),
460 any::<OptimizerFeatures>(),
461 )
462 .prop_map(|(local_mir, optimizer_features)| LocalExpressions {
463 local_mir,
464 optimizer_features,
465 })
466 .boxed()
467 }
468
469 type Strategy = BoxedStrategy<Self>;
470 }
471
472 impl Arbitrary for GlobalExpressions {
473 type Parameters = ();
474 fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
475 (
476 any::<DataflowDescription<OptimizedMirRelationExpr>>(),
477 any::<DataflowDescription<mz_compute_types::plan::Plan>>(),
478 any::<DataflowMetainfo<Arc<OptimizerNotice>>>(),
479 any::<OptimizerFeatures>(),
480 )
481 .prop_map(
482 |(global_mir, physical_plan, dataflow_metainfos, optimizer_features)| {
483 GlobalExpressions {
484 global_mir,
485 physical_plan,
486 dataflow_metainfos,
487 optimizer_features,
488 }
489 },
490 )
491 .boxed()
492 }
493
494 type Strategy = BoxedStrategy<Self>;
495 }
496
497 struct ArbitraryTimeout<T: Arbitrary + Send + 'static> {
502 _phantom: PhantomData<T>,
503 }
504
505 impl<T: Arbitrary + Send> ArbitraryTimeout<T> {
506 const GENERATE_ATTEMPTS: u64 = 10;
509 const TIMEOUT_SECS: u64 = 10;
511
512 fn new() -> Self {
513 Self {
514 _phantom: Default::default(),
515 }
516 }
517
518 fn new_tree() -> Box<dyn ValueTree<Value = T>>
519 where
520 T: 'static,
521 {
522 let seed: [u8; 32] = rand::random();
525 let mut test_runner = TestRunner::deterministic();
526 let rng = test_runner.rng();
527 *rng = TestRng::from_seed(RngAlgorithm::ChaCha, &seed);
528 Box::new(T::arbitrary().new_tree(&mut test_runner).expect("valid"))
529 }
530
531 fn generate(&self) -> T {
532 for _ in 0..Self::GENERATE_ATTEMPTS {
533 if let Ok(val) = self.try_generate() {
534 return val;
535 }
536 }
537 panic!("timed out generating a value");
538 }
539
540 fn try_generate(&self) -> Result<T, ()> {
541 match timeout(Duration::from_secs(Self::TIMEOUT_SECS), || {
545 Ok(Self::new_tree().current())
548 }) {
549 Ok(val) => Ok(val),
550 Err(_) => {
551 info!("timed out generating a value");
552 Err(())
553 }
554 }
555 }
556 }
557
558 #[mz_ore::test(tokio::test)]
559 #[cfg_attr(miri, ignore)] async fn expression_cache() {
561 let local_tree: ArbitraryTimeout<LocalExpressions> = ArbitraryTimeout::new();
562 let global_tree: ArbitraryTimeout<GlobalExpressions> = ArbitraryTimeout::new();
563
564 let first_version = Version::new(0, 1, 0);
565 let second_version = Version::new(0, 2, 0);
566 let persist = PersistClient::new_for_tests().await;
567 let shard_id = ShardId::new();
568
569 let mut current_ids = BTreeSet::new();
570 let mut remove_prior_versions = false;
571 let compact_shard = false;
573 let dyncfgs = &mz_persist_client::cfg::all_dyncfgs(ConfigSet::default());
574
575 let mut next_id = 0;
576
577 let (mut local_exps, mut global_exps) = {
578 let (cache, local_exprs, global_exprs) =
580 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
581 build_version: first_version.clone(),
582 persist: persist.clone(),
583 shard_id,
584 current_ids: current_ids.clone(),
585 remove_prior_versions,
586 compact_shard,
587 dyncfgs: dyncfgs.clone(),
588 })
589 .await;
590 assert_eq!(local_exprs, BTreeMap::new(), "new cache should be empty");
591 assert_eq!(global_exprs, BTreeMap::new(), "new cache should be empty");
592
593 let mut local_exps = BTreeMap::new();
595 let mut global_exps = BTreeMap::new();
596 for _ in 0..4 {
597 let id = GlobalId::User(next_id);
598 let start = Instant::now();
599 let local_exp = local_tree.generate();
600 let global_exp = global_tree.generate();
601 info!("Generating exps took: {:?}", start.elapsed());
602
603 cache
604 .update(
605 vec![(id, local_exp.clone())],
606 vec![(id, global_exp.clone())],
607 BTreeSet::new(),
608 )
609 .await;
610
611 current_ids.insert(id);
612 current_ids.extend(global_exp.index_imports());
613 local_exps.insert(id, local_exp);
614 global_exps.insert(id, global_exp);
615
616 next_id += 1;
617 }
618 (local_exps, global_exps)
619 };
620
621 {
622 let (_cache, local_entries, global_entries) =
624 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
625 build_version: first_version.clone(),
626 persist: persist.clone(),
627 shard_id,
628 current_ids: current_ids.clone(),
629 remove_prior_versions,
630 compact_shard,
631 dyncfgs: dyncfgs.clone(),
632 })
633 .await;
634 assert_eq!(
635 local_entries, local_exps,
636 "local expression with non-matching optimizer features should be removed during reconciliation"
637 );
638 assert_eq!(
639 global_entries, global_exps,
640 "global expression with non-matching optimizer features should be removed during reconciliation"
641 );
642 }
643
644 {
645 let id_to_remove = local_exps.keys().next().expect("not empty").clone();
647 current_ids.remove(&id_to_remove);
648 let _removed_local_exp = local_exps.remove(&id_to_remove);
649 let _removed_global_exp = global_exps.remove(&id_to_remove);
650
651 let (_cache, local_entries, global_entries) =
653 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
654 build_version: first_version.clone(),
655 persist: persist.clone(),
656 shard_id,
657 current_ids: current_ids.clone(),
658 remove_prior_versions,
659 compact_shard,
660 dyncfgs: dyncfgs.clone(),
661 })
662 .await;
663 assert_eq!(
664 local_entries, local_exps,
665 "dropped local objects should be removed during reconciliation"
666 );
667 assert_eq!(
668 global_entries, global_exps,
669 "dropped global objects should be removed during reconciliation"
670 );
671 }
672
673 {
674 let global_exp_to_remove = global_exps.keys().next().expect("not empty").clone();
676 let removed_global_exp = global_exps
677 .remove(&global_exp_to_remove)
678 .expect("known to exist");
679 let dependency_to_remove = removed_global_exp
680 .index_imports()
681 .next()
682 .expect("arbitrary impl always makes non-empty vecs");
683 current_ids.remove(dependency_to_remove);
684
685 let _removed_local_exp = local_exps.remove(dependency_to_remove);
687 let _removed_global_exp = global_exps.remove(dependency_to_remove);
688 global_exps.retain(|_, exp| {
690 let index_imports: BTreeSet<_> = exp.index_imports().collect();
691 !index_imports.contains(&dependency_to_remove)
692 });
693
694 let (_cache, local_entries, global_entries) =
696 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
697 build_version: first_version.clone(),
698 persist: persist.clone(),
699 shard_id,
700 current_ids: current_ids.clone(),
701 remove_prior_versions,
702 compact_shard,
703 dyncfgs: dyncfgs.clone(),
704 })
705 .await;
706 assert_eq!(
707 local_entries, local_exps,
708 "dropped object dependencies should NOT remove local expressions"
709 );
710 assert_eq!(
711 global_entries, global_exps,
712 "dropped object dependencies should remove global expressions"
713 );
714 }
715
716 let (new_gen_local_exps, new_gen_global_exps) = {
717 let (cache, local_entries, global_entries) =
719 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
720 build_version: second_version.clone(),
721 persist: persist.clone(),
722 shard_id,
723 current_ids: current_ids.clone(),
724 remove_prior_versions,
725 compact_shard,
726 dyncfgs: dyncfgs.clone(),
727 })
728 .await;
729 assert_eq!(
730 local_entries,
731 BTreeMap::new(),
732 "new version should be empty"
733 );
734 assert_eq!(
735 global_entries,
736 BTreeMap::new(),
737 "new version should be empty"
738 );
739
740 let mut local_exps = BTreeMap::new();
742 let mut global_exps = BTreeMap::new();
743 for _ in 0..2 {
744 let id = GlobalId::User(next_id);
745 let start = Instant::now();
746 let local_exp = local_tree.generate();
747 let global_exp = global_tree.generate();
748 info!("Generating exps took: {:?}", start.elapsed());
749
750 cache
751 .update(
752 vec![(id, local_exp.clone())],
753 vec![(id, global_exp.clone())],
754 BTreeSet::new(),
755 )
756 .await;
757
758 current_ids.insert(id);
759 current_ids.extend(global_exp.index_imports());
760 local_exps.insert(id, local_exp);
761 global_exps.insert(id, global_exp);
762
763 next_id += 1;
764 }
765 (local_exps, global_exps)
766 };
767
768 {
769 let (_cache, local_entries, global_entries) =
771 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
772 build_version: first_version.clone(),
773 persist: persist.clone(),
774 shard_id,
775 current_ids: current_ids.clone(),
776 remove_prior_versions,
777 compact_shard,
778 dyncfgs: dyncfgs.clone(),
779 })
780 .await;
781 assert_eq!(
782 local_entries, local_exps,
783 "Previous version local expressions should still exist"
784 );
785 assert_eq!(
786 global_entries, global_exps,
787 "Previous version global expressions should still exist"
788 );
789 }
790
791 {
792 remove_prior_versions = true;
794 let (_cache, local_entries, global_entries) =
795 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
796 build_version: second_version.clone(),
797 persist: persist.clone(),
798 shard_id,
799 current_ids: current_ids.clone(),
800 remove_prior_versions,
801 compact_shard,
802 dyncfgs: dyncfgs.clone(),
803 })
804 .await;
805 assert_eq!(
806 local_entries, new_gen_local_exps,
807 "new version local expressions should be persisted"
808 );
809 assert_eq!(
810 global_entries, new_gen_global_exps,
811 "new version global expressions should be persisted"
812 );
813 }
814
815 {
816 let (_cache, local_entries, global_entries) =
818 ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
819 build_version: first_version.clone(),
820 persist: persist.clone(),
821 shard_id,
822 current_ids: current_ids.clone(),
823 remove_prior_versions,
824 compact_shard,
825 dyncfgs: dyncfgs.clone(),
826 })
827 .await;
828 assert_eq!(
829 local_entries,
830 BTreeMap::new(),
831 "Previous version local expressions should be cleared"
832 );
833 assert_eq!(
834 global_entries,
835 BTreeMap::new(),
836 "Previous version global expressions should be cleared"
837 );
838 }
839 }
840
841 proptest! {
842 #![proptest_config(ProptestConfig::with_cases(1))]
845
846 #[mz_ore::test]
847 #[cfg_attr(miri, ignore)]
848 fn local_expr_cache_roundtrip(key in any::<CacheKey>()) {
849 let local_tree: ArbitraryTimeout<LocalExpressions> = ArbitraryTimeout::new();
850 let val = local_tree.generate();
851
852 let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
853 let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
854 let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
855 let decoded_val: LocalExpressions = bincode::deserialize(&decoded_val).expect("local expressions should roundtrip");
856
857 assert_eq!(key, decoded_key);
858 assert_eq!(val, decoded_val);
859 }
860
861 #[mz_ore::test]
862 #[cfg_attr(miri, ignore)]
863 fn global_expr_cache_roundtrip(key in any::<CacheKey>()) {
864 let global_tree: ArbitraryTimeout<GlobalExpressions> = ArbitraryTimeout::new();
865 let val = global_tree.generate();
866
867 let bincode_val = Bytes::from(bincode::serialize(&val).expect("must serialize"));
868 let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &bincode_val);
869 let (decoded_key, decoded_val) = ExpressionCodec::decode(&encoded_key, &encoded_val);
870 let decoded_val: GlobalExpressions = bincode::deserialize(&decoded_val).expect("global expressions should roundtrip");
871
872 assert_eq!(key, decoded_key);
873 assert_eq!(val, decoded_val);
874 }
875 }
876}