1use anyhow::anyhow;
11use differential_dataflow::lattice::Lattice;
12use maplit::btreemap;
13use maplit::btreeset;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_catalog::memory::objects::{CatalogItem, MaterializedView};
16use mz_expr::{CollectionPlan, ResultSpec};
17use mz_ore::collections::CollectionExt;
18use mz_ore::instrument;
19use mz_ore::soft_panic_or_log;
20use mz_repr::explain::{ExprHumanizerExt, TransientItem};
21use mz_repr::optimize::OptimizerFeatures;
22use mz_repr::optimize::OverrideFrom;
23use mz_repr::refresh_schedule::RefreshSchedule;
24use mz_repr::{CatalogItemId, Datum, RelationVersion, Row, VersionedRelationDesc};
25use mz_sql::ast::ExplainStage;
26use mz_sql::catalog::CatalogError;
27use mz_sql::names::ResolvedIds;
28use mz_sql::plan;
29use mz_sql::session::metadata::SessionMetadata;
30use mz_sql_parser::ast;
31use mz_sql_parser::ast::display::AstDisplay;
32use mz_storage_client::controller::CollectionDescription;
33use std::collections::BTreeMap;
34use timely::progress::Antichain;
35use tracing::Span;
36
37use crate::ReadHolds;
38use crate::catalog::CatalogState;
39use crate::command::ExecuteResponse;
40use crate::coord::sequencer::inner::return_if_err;
41use crate::coord::{
42 Coordinator, CreateMaterializedViewExplain, CreateMaterializedViewFinish,
43 CreateMaterializedViewOptimize, CreateMaterializedViewStage, ExplainContext,
44 ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
45};
46use crate::error::AdapterError;
47use crate::explain::explain_dataflow;
48use crate::explain::explain_plan;
49use crate::explain::optimizer_trace::OptimizerTrace;
50use crate::optimize::dataflows::dataflow_import_id_bundle;
51use crate::optimize::{self, Optimize};
52use crate::session::Session;
53use crate::util::ResultExt;
54use crate::{AdapterNotice, CollectionIdBundle, ExecuteContext, TimestampProvider, catalog};
55
56impl Staged for CreateMaterializedViewStage {
57 type Ctx = ExecuteContext;
58
59 fn validity(&mut self) -> &mut PlanValidity {
60 match self {
61 Self::Optimize(stage) => &mut stage.validity,
62 Self::Finish(stage) => &mut stage.validity,
63 Self::Explain(stage) => &mut stage.validity,
64 }
65 }
66
67 async fn stage(
68 self,
69 coord: &mut Coordinator,
70 ctx: &mut ExecuteContext,
71 ) -> Result<StageResult<Box<Self>>, AdapterError> {
72 match self {
73 CreateMaterializedViewStage::Optimize(stage) => {
74 coord.create_materialized_view_optimize(stage).await
75 }
76 CreateMaterializedViewStage::Finish(stage) => {
77 coord.create_materialized_view_finish(ctx, stage).await
78 }
79 CreateMaterializedViewStage::Explain(stage) => {
80 coord
81 .create_materialized_view_explain(ctx.session(), stage)
82 .await
83 }
84 }
85 }
86
87 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
88 Message::CreateMaterializedViewStageReady {
89 ctx,
90 span,
91 stage: self,
92 }
93 }
94
95 fn cancel_enabled(&self) -> bool {
96 true
97 }
98}
99
100impl Coordinator {
101 #[instrument]
102 pub(crate) async fn sequence_create_materialized_view(
103 &mut self,
104 ctx: ExecuteContext,
105 plan: plan::CreateMaterializedViewPlan,
106 resolved_ids: ResolvedIds,
107 ) {
108 let stage = return_if_err!(
109 self.create_materialized_view_validate(
110 ctx.session(),
111 plan,
112 resolved_ids,
113 ExplainContext::None
114 ),
115 ctx
116 );
117 self.sequence_staged(ctx, Span::current(), stage).await;
118 }
119
120 #[instrument]
121 pub(crate) async fn explain_create_materialized_view(
122 &mut self,
123 ctx: ExecuteContext,
124 plan::ExplainPlanPlan {
125 stage,
126 format,
127 config,
128 explainee,
129 }: plan::ExplainPlanPlan,
130 ) {
131 let plan::Explainee::Statement(stmt) = explainee else {
132 unreachable!()
135 };
136 let plan::ExplaineeStatement::CreateMaterializedView { broken, plan } = stmt else {
137 unreachable!()
140 };
141
142 let optimizer_trace = OptimizerTrace::new(stage.paths());
145
146 let resolved_ids = ResolvedIds::empty();
148
149 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
150 broken,
151 config,
152 format,
153 stage,
154 replan: None,
155 desc: None,
156 optimizer_trace,
157 });
158 let stage = return_if_err!(
159 self.create_materialized_view_validate(ctx.session(), plan, resolved_ids, explain_ctx),
160 ctx
161 );
162 self.sequence_staged(ctx, Span::current(), stage).await;
163 }
164
165 #[instrument]
166 pub(crate) async fn explain_replan_materialized_view(
167 &mut self,
168 ctx: ExecuteContext,
169 plan::ExplainPlanPlan {
170 stage,
171 format,
172 config,
173 explainee,
174 }: plan::ExplainPlanPlan,
175 ) {
176 let plan::Explainee::ReplanMaterializedView(id) = explainee else {
177 unreachable!() };
179 let CatalogItem::MaterializedView(item) = self.catalog().get_entry(&id).item() else {
180 unreachable!() };
182 let gid = item.global_id_writes();
183
184 let create_sql = item.create_sql.clone();
185 let plan_result = self
186 .catalog_mut()
187 .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
188 let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
189
190 let plan::Plan::CreateMaterializedView(plan) = plan else {
191 unreachable!() };
193
194 let broken = false;
197
198 let optimizer_trace = OptimizerTrace::new(stage.paths());
201
202 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
203 broken,
204 config,
205 format,
206 stage,
207 replan: Some(gid),
208 desc: None,
209 optimizer_trace,
210 });
211 let stage = return_if_err!(
212 self.create_materialized_view_validate(ctx.session(), plan, resolved_ids, explain_ctx,),
213 ctx
214 );
215 self.sequence_staged(ctx, Span::current(), stage).await;
216 }
217
    /// Renders an `EXPLAIN <stage> FOR MATERIALIZED VIEW` response from plans
    /// that are already stored in the catalog; nothing is re-optimized here.
    ///
    /// Supported stages are `RawPlan`, `LocalPlan`, `GlobalPlan`, and
    /// `PhysicalPlan`; any other stage is rejected with an error.
    #[instrument]
    pub(super) fn explain_materialized_view(
        &self,
        ctx: &ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // The caller only routes this explainee shape and item type here.
        let plan::Explainee::MaterializedView(id) = explainee else {
            unreachable!() };
        let CatalogItem::MaterializedView(view) = self.catalog().get_entry(&id).item() else {
            unreachable!() };
        let gid = view.global_id_writes();

        let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&gid) else {
            // Missing metainfo for a non-system view is unexpected, so log it
            // loudly; either way we cannot produce the EXPLAIN output.
            if !id.is_system() {
                tracing::error!(
                    "cannot find dataflow metainformation for materialized view {id} in catalog"
                );
            }
            coord_bail!(
                "cannot find dataflow metainformation for materialized view {id} in catalog"
            );
        };

        let target_cluster = self.catalog().get_cluster(view.cluster_id);

        // Layer optimizer feature overrides: system defaults, then cluster
        // configuration, then flags given on the EXPLAIN statement itself.
        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features())
            .override_from(&config.features);

        // No cardinality statistics are collected on this path.
        let cardinality_stats = BTreeMap::new();

        let explain = match stage {
            // The raw (pre-optimization) expression stored on the view.
            ExplainStage::RawPlan => explain_plan(
                view.raw_expr.as_ref().clone(),
                format,
                &config,
                &features,
                &self.catalog().for_session(ctx.session()),
                cardinality_stats,
                Some(target_cluster.name.as_str()),
            )?,
            // The locally optimized expression stored on the view.
            ExplainStage::LocalPlan => explain_plan(
                view.locally_optimized_expr.as_inner().clone(),
                format,
                &config,
                &features,
                &self.catalog().for_session(ctx.session()),
                cardinality_stats,
                Some(target_cluster.name.as_str()),
            )?,
            ExplainStage::GlobalPlan => {
                // The globally optimized dataflow plan cached in the catalog.
                let Some(plan) = self.catalog().try_get_optimized_plan(&gid).cloned() else {
                    tracing::error!("cannot find {stage} for materialized view {id} in catalog");
                    coord_bail!("cannot find {stage} for materialized view in catalog");
                };
                explain_dataflow(
                    plan,
                    format,
                    &config,
                    &features,
                    &self.catalog().for_session(ctx.session()),
                    cardinality_stats,
                    Some(target_cluster.name.as_str()),
                    dataflow_metainfo,
                )?
            }
            ExplainStage::PhysicalPlan => {
                // The physical (LIR) dataflow plan cached in the catalog.
                let Some(plan) = self.catalog().try_get_physical_plan(&gid).cloned() else {
                    tracing::error!("cannot find {stage} for materialized view {id} in catalog",);
                    coord_bail!("cannot find {stage} for materialized view in catalog");
                };
                explain_dataflow(
                    plan,
                    format,
                    &config,
                    &features,
                    &self.catalog().for_session(ctx.session()),
                    cardinality_stats,
                    Some(target_cluster.name.as_str()),
                    dataflow_metainfo,
                )?
            }
            _ => {
                coord_bail!("cannot EXPLAIN {} FOR MATERIALIZED VIEW", stage);
            }
        };

        // EXPLAIN output is returned as a single one-column row.
        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);

        Ok(Self::send_immediate_rows(row))
    }
316
    /// Validates a `CREATE MATERIALIZED VIEW` plan and, on success, produces
    /// the first (`Optimize`) stage of the staged creation pipeline.
    ///
    /// Checks performed:
    /// - all inputs share a compatible timeline context;
    /// - system column references are not ambiguous;
    /// - the view does not depend on introspection (log) sources;
    /// - for `REFRESH AT` schedules (outside of EXPLAIN), all refresh times
    ///   are readable under the read holds acquired during purification, and
    ///   the set of required input collections has not changed since then.
    #[instrument]
    fn create_materialized_view_validate(
        &self,
        session: &Session,
        plan: plan::CreateMaterializedViewPlan,
        resolved_ids: ResolvedIds,
        explain_ctx: ExplainContext,
    ) -> Result<CreateMaterializedViewStage, AdapterError> {
        let plan::CreateMaterializedViewPlan {
            materialized_view:
                plan::MaterializedView {
                    expr,
                    cluster_id,
                    refresh_schedule,
                    ..
                },
            ambiguous_columns,
            ..
        } = &plan;

        let expr_depends_on = expr.depends_on();
        self.catalog()
            .validate_timeline_context(expr_depends_on.iter().copied())?;
        self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
        // Collect the names of any introspection (log) sources this view
        // would transitively depend on; such dependencies are not allowed.
        let log_names = expr_depends_on
            .iter()
            .map(|gid| self.catalog.resolve_item_id(gid))
            .flat_map(|item_id| self.catalog().introspection_dependencies(item_id))
            .map(|item_id| self.catalog().get_entry(&item_id).name().item.clone())
            .collect::<Vec<_>>();
        if !log_names.is_empty() {
            return Err(AdapterError::InvalidLogDependency {
                object_type: "materialized view".into(),
                log_names,
            });
        }

        // Later stages must re-check that the catalog has not changed.
        let validity =
            PlanValidity::require_transient_revision(self.catalog().transient_revision());

        if let Some(refresh_schedule) = refresh_schedule {
            if !refresh_schedule.ats.is_empty() && matches!(explain_ctx, ExplainContext::None) {
                let read_holds = self
                    .txn_read_holds
                    .get(session.conn_id())
                    .expect("purification acquired read holds if there are REFRESH ATs");
                let least_valid_read = read_holds.least_valid_read();
                // Every explicit REFRESH AT time must still be readable under
                // the read holds taken at purification time.
                for refresh_at_ts in &refresh_schedule.ats {
                    if !least_valid_read.less_equal(refresh_at_ts) {
                        return Err(AdapterError::InputNotReadableAtRefreshAtTime(
                            *refresh_at_ts,
                            least_valid_read,
                        ));
                    }
                }
                // The collections needed now must all be covered by the read
                // holds; otherwise the plan's inputs changed under us.
                let ids = self
                    .index_oracle(*cluster_id)
                    .sufficient_collections(resolved_ids.collections().copied());
                if !ids.difference(&read_holds.id_bundle()).is_empty() {
                    return Err(AdapterError::ChangedPlan(
                        "the set of possible inputs changed during the creation of the \
                         materialized view"
                            .to_string(),
                    ));
                }
            }
        }

        Ok(CreateMaterializedViewStage::Optimize(
            CreateMaterializedViewOptimize {
                validity,
                plan,
                resolved_ids,
                explain_ctx,
            },
        ))
    }
407
    /// Runs the optimizer pipeline for a `CREATE MATERIALIZED VIEW` on a
    /// blocking task, producing either the `Finish` stage (normal creation)
    /// or the `Explain` stage (EXPLAIN context).
    ///
    /// The pipeline goes raw expression -> `LocalMirPlan` -> `GlobalMirPlan`
    /// -> `GlobalLirPlan`. Optimization happens off the coordinator thread;
    /// the returned `StageResult::Handle` is awaited by the staging machinery.
    #[instrument]
    async fn create_materialized_view_optimize(
        &mut self,
        CreateMaterializedViewOptimize {
            validity,
            plan,
            resolved_ids,
            explain_ctx,
        }: CreateMaterializedViewOptimize,
    ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
        let plan::CreateMaterializedViewPlan {
            name,
            materialized_view:
                plan::MaterializedView {
                    column_names,
                    cluster_id,
                    non_null_assertions,
                    refresh_schedule,
                    ..
                },
            ..
        } = &plan;

        let compute_instance = self
            .instance_snapshot(*cluster_id)
            .expect("compute instance does not exist");
        // A real CREATE allocates a durable user ID; EXPLAIN variants only
        // need a transient one.
        let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
            self.allocate_user_id().await?
        } else {
            self.allocate_transient_id()
        };

        let (_, view_id) = self.allocate_transient_id();
        let debug_name = self.catalog().resolve_full_name(name, None).to_string();
        // Layer optimizer config overrides: system defaults, then cluster
        // features, then the EXPLAIN context.
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
            .override_from(&explain_ctx);
        let optimizer_features = optimizer_config.features.clone();

        // Build an optimizer for this task.
        let mut optimizer = optimize::materialized_view::Optimizer::new(
            self.owned_catalog().as_optimizer_catalog(),
            compute_instance,
            global_id,
            view_id,
            column_names.clone(),
            non_null_assertions.clone(),
            refresh_schedule.clone(),
            debug_name,
            optimizer_config,
            self.optimizer_metrics(),
        );

        // Capture the current span so the blocking task's tracing stays
        // attached to this request.
        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize create materialized view",
            move || {
                span.in_scope(|| {
                    let mut pipeline = || -> Result<(
                        optimize::materialized_view::LocalMirPlan,
                        optimize::materialized_view::GlobalMirPlan,
                        optimize::materialized_view::GlobalLirPlan,
                    ), AdapterError> {
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let raw_expr = plan.materialized_view.expr.clone();

                        // Raw expression => locally optimized MIR.
                        let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                        // Local MIR => globally optimized MIR.
                        let global_mir_plan =
                            optimizer.catch_unwind_optimize(local_mir_plan.clone())?;
                        // Global MIR => LIR (physical plan).
                        let global_lir_plan =
                            optimizer.catch_unwind_optimize(global_mir_plan.clone())?;

                        Ok((local_mir_plan, global_mir_plan, global_lir_plan))
                    };

                    let stage = match pipeline() {
                        Ok((local_mir_plan, global_mir_plan, global_lir_plan)) => {
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                // EXPLAIN: keep only the dataflow metainfo;
                                // plans were captured by the optimizer trace.
                                let (_, df_meta) = global_lir_plan.unapply();
                                CreateMaterializedViewStage::Explain(
                                    CreateMaterializedViewExplain {
                                        validity,
                                        global_id,
                                        plan,
                                        df_meta,
                                        explain_ctx,
                                    },
                                )
                            } else {
                                CreateMaterializedViewStage::Finish(CreateMaterializedViewFinish {
                                    item_id,
                                    global_id,
                                    validity,
                                    plan,
                                    resolved_ids,
                                    local_mir_plan,
                                    global_mir_plan,
                                    global_lir_plan,
                                    optimizer_features,
                                })
                            }
                        }
                        Err(err) => {
                            // Outside an EXPLAIN context, optimizer errors
                            // simply fail the statement.
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                // EXPLAIN BROKEN: log the error and render
                                // whatever plans were captured so far.
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                CreateMaterializedViewStage::Explain(
                                    CreateMaterializedViewExplain {
                                        global_id,
                                        validity,
                                        plan,
                                        df_meta: Default::default(),
                                        explain_ctx,
                                    },
                                )
                            } else {
                                return Err(err);
                            }
                        }
                    };

                    Ok(Box::new(stage))
                })
            },
        )))
    }
548
    /// Final stage of `CREATE MATERIALIZED VIEW`: installs the optimized view
    /// in the catalog, creates its storage collection, and ships the dataflow.
    ///
    /// Steps, in order:
    /// 1. If this replaces an existing view, check the schemas match.
    /// 2. Acquire (or reuse transaction-held) read holds on all inputs and
    ///    select the dataflow/storage `as_of` and `until` timestamps.
    /// 3. Bake the selected storage `as_of` back into the stored `CREATE` SQL.
    /// 4. Apply the catalog transaction (drops + create), then — as side
    ///    effects — cache plans, persist dataflow metainfo, create the storage
    ///    collection, set read policies, and ship the dataflow.
    /// 5. Handle `IF NOT EXISTS` races gracefully.
    #[instrument]
    async fn create_materialized_view_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        stage: CreateMaterializedViewFinish,
    ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
        let CreateMaterializedViewFinish {
            item_id,
            global_id,
            plan:
                plan::CreateMaterializedViewPlan {
                    name,
                    materialized_view:
                        plan::MaterializedView {
                            mut create_sql,
                            expr: raw_expr,
                            column_names,
                            dependencies,
                            replacement_target,
                            cluster_id,
                            target_replica,
                            non_null_assertions,
                            compaction_window,
                            refresh_schedule,
                            ..
                        },
                    drop_ids,
                    if_not_exists,
                    ..
                },
            resolved_ids,
            local_mir_plan,
            global_mir_plan,
            global_lir_plan,
            optimizer_features,
            ..
        } = stage;

        // When replacing an existing materialized view, the new output schema
        // must be identical to the replacement target's latest schema.
        if let Some(target_id) = replacement_target {
            let Some(target) = self.catalog().get_entry(&target_id).materialized_view() else {
                return Err(AdapterError::internal(
                    "create materialized view",
                    "replacement target not a materialized view",
                ));
            };

            let schema_diff = target.desc.latest().diff(global_lir_plan.desc());
            if !schema_diff.is_empty() {
                return Err(AdapterError::ReplacementSchemaMismatch(schema_diff));
            }
        }

        let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);

        // Reuse the transaction's read holds if present (e.g. REFRESH AT);
        // otherwise acquire fresh holds on the dataflow's imports.
        let read_holds_owned;
        let read_holds = if let Some(txn_reads) = self.txn_read_holds.get(ctx.session().conn_id()) {
            txn_reads
        } else {
            read_holds_owned = self.acquire_read_holds(&id_bundle);
            &read_holds_owned
        };

        let (dataflow_as_of, storage_as_of, until) =
            self.select_timestamps(id_bundle, refresh_schedule.as_ref(), read_holds)?;

        tracing::info!(
            dataflow_as_of = ?dataflow_as_of,
            storage_as_of = ?storage_as_of,
            until = ?until,
            "materialized view timestamp selection",
        );

        let initial_as_of = storage_as_of.clone();

        // Record the chosen storage `as_of` in the stored `CREATE` SQL by
        // re-parsing the statement and setting its `AS OF` clause.
        if let Some(storage_as_of_ts) = storage_as_of.as_option() {
            let stmt = mz_sql::parse::parse(&create_sql)
                .map_err(|_| {
                    AdapterError::internal(
                        "create materialized view",
                        "original SQL should roundtrip",
                    )
                })?
                .into_element()
                .ast;
            let ast::Statement::CreateMaterializedView(mut stmt) = stmt else {
                panic!("unexpected statement type");
            };
            stmt.as_of = Some(storage_as_of_ts.into());
            create_sql = stmt.to_ast_string_stable();
        }

        let desc = VersionedRelationDesc::new(global_lir_plan.desc().clone());
        // The root relation version maps to the newly allocated global ID.
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        // Keep a copy of the local MIR for the expression cache below.
        let local_mir_for_cache = local_mir_plan.expr();

        // Catalog transaction: drop any replaced objects, then create the new
        // materialized view item.
        let ops = vec![
            catalog::Op::DropObjects(
                drop_ids
                    .into_iter()
                    .map(catalog::DropObjectInfo::Item)
                    .collect(),
            ),
            catalog::Op::CreateItem {
                id: item_id,
                name: name.clone(),
                item: CatalogItem::MaterializedView(MaterializedView {
                    create_sql,
                    raw_expr: raw_expr.into(),
                    locally_optimized_expr: local_mir_plan.expr().into(),
                    desc,
                    collections,
                    resolved_ids,
                    dependencies,
                    replacement_target,
                    cluster_id,
                    target_replica,
                    non_null_assertions,
                    custom_logical_compaction_window: compaction_window,
                    refresh_schedule: refresh_schedule.clone(),
                    initial_as_of: Some(initial_as_of.clone()),
                    optimized_plan: None,
                    physical_plan: None,
                    dataflow_metainfo: None,
                }),
                owner_id: *ctx.session().current_role_id(),
            },
        ];

        // Pre-allocate one transient global ID per optimizer notice.
        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
            .map(|(_item_id, global_id)| global_id)
            .take(global_lir_plan.df_meta().optimizer_notices.len())
            .collect::<Vec<_>>();

        let output_desc = global_lir_plan.desc().clone();
        let (mut df_desc, raw_df_meta) = global_lir_plan.unapply();
        // Render optimizer notices with the view's real name and columns
        // substituted for its transient optimizer ID.
        let df_meta = {
            let system_catalog = self.catalog().for_system_session();
            let full_name = self.catalog().resolve_full_name(&name, None);
            let transient_items = btreemap! {
                global_id => TransientItem::new(
                    Some(full_name.into_parts()),
                    Some(column_names.iter().map(|c| c.to_string()).collect()),
                )
            };
            let humanizer = ExprHumanizerExt::new(transient_items, &system_catalog);
            CatalogState::render_notices_core(
                &humanizer,
                (self.catalog().config().now)(),
                &raw_df_meta,
                notice_ids,
                Some(global_id),
            )
        };

        // Persist the optimized expressions so they can be reused later.
        self.catalog()
            .cache_expressions(
                global_id,
                Some(local_mir_for_cache),
                global_mir_plan.df_desc().clone(),
                df_desc.clone(),
                df_meta.clone(),
                optimizer_features,
            )
            .await;

        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
                Box::pin(async move {
                    // Stash the optimized and physical plans on the catalog
                    // so EXPLAIN ... FOR MATERIALIZED VIEW can serve them.
                    coord
                        .catalog_mut()
                        .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
                    coord
                        .catalog_mut()
                        .set_physical_plan(global_id, df_desc.clone());

                    let notice_builtin_updates_fut =
                        coord.persist_dataflow_metainfo(df_meta, global_id).await;

                    // Apply the selected timestamps to the dataflow.
                    df_desc.set_as_of(dataflow_as_of.clone());
                    df_desc.set_initial_as_of(initial_as_of);
                    df_desc.until = until;

                    let storage_metadata = coord.catalog.state().storage_metadata();

                    let mut collection_desc =
                        CollectionDescription::for_other(output_desc, Some(storage_as_of));
                    let mut allow_writes = true;

                    // A replacement's collection points at the target's
                    // latest collection, and the replacement must not write
                    // to storage until it takes over.
                    if let Some(target_id) = replacement_target {
                        let target_gid = coord.catalog.get_entry(&target_id).latest_global_id();
                        collection_desc.primary = Some(target_gid);
                        allow_writes = false;
                    }

                    coord
                        .controller
                        .storage
                        .create_collections(
                            storage_metadata,
                            None,
                            vec![(global_id, collection_desc)],
                        )
                        .await
                        .unwrap_or_terminate("cannot fail to append");

                    coord
                        .initialize_storage_read_policies(
                            btreeset![item_id],
                            compaction_window.unwrap_or(CompactionWindow::Default),
                        )
                        .await;

                    coord
                        .ship_dataflow_and_notice_builtin_table_updates(
                            df_desc,
                            cluster_id,
                            notice_builtin_updates_fut,
                            target_replica,
                        )
                        .await;

                    if allow_writes {
                        coord.allow_writes(cluster_id, global_id);
                    }
                })
            })
            .await;

        match transact_result {
            Ok(_) => {
                self.emit_raw_optimizer_notices_to_user(ctx, &raw_df_meta.optimizer_notices);
                Ok(ExecuteResponse::CreatedMaterializedView)
            }
            // `IF NOT EXISTS`: a concurrent creation of the same name is a
            // success with a notice rather than an error.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(
                        CatalogError::ItemAlreadyExists(_, _),
                    ),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "materialized view",
                    });
                Ok(ExecuteResponse::CreatedMaterializedView)
            }
            Err(err) => Err(err),
        }
        .map(StageResult::Response)
    }
837
    /// Selects the `(dataflow_as_of, storage_as_of, until)` frontiers for a
    /// new materialized view, based on the read holds on its inputs and an
    /// optional refresh schedule.
    ///
    /// Without a refresh schedule, both `as_of`s are the least valid read of
    /// the read holds. With one, the storage `as_of` is rounded up to the
    /// first refresh time, and the dataflow `as_of` is advanced as far as the
    /// inputs allow without passing the storage `as_of`. `until` is one tick
    /// past the last scheduled refresh, if any.
    ///
    /// # Errors
    ///
    /// Returns `MaterializedViewWouldNeverRefresh` when every scheduled
    /// refresh lies before the earliest readable time.
    fn select_timestamps(
        &self,
        id_bundle: CollectionIdBundle,
        refresh_schedule: Option<&RefreshSchedule>,
        read_holds: &ReadHolds,
    ) -> Result<
        (
            Antichain<mz_repr::Timestamp>,
            Antichain<mz_repr::Timestamp>,
            Antichain<mz_repr::Timestamp>,
        ),
        AdapterError,
    > {
        assert!(
            id_bundle.difference(&read_holds.id_bundle()).is_empty(),
            "we must have read holds for all involved collections"
        );

        let least_valid_read = read_holds.least_valid_read();
        let mut dataflow_as_of = least_valid_read.clone();
        let mut storage_as_of = least_valid_read.clone();

        if let Some(refresh_schedule) = &refresh_schedule {
            if let Some(least_valid_read_ts) = least_valid_read.as_option() {
                if let Some(first_refresh_ts) =
                    refresh_schedule.round_up_timestamp(*least_valid_read_ts)
                {
                    // Storage starts at the first refresh; the dataflow may
                    // start later than the least valid read (as far as the
                    // inputs have advanced), but never past the storage as_of.
                    storage_as_of = Antichain::from_elem(first_refresh_ts);
                    dataflow_as_of.join_assign(
                        &self
                            .greatest_available_read(&id_bundle)
                            .meet(&storage_as_of),
                    );
                } else {
                    // No refresh at or after the earliest readable time: the
                    // view would never refresh, which is an error.
                    let last_refresh = refresh_schedule.last_refresh().expect(
                        "if round_up_timestamp returned None, then there should be a last refresh",
                    );

                    return Err(AdapterError::MaterializedViewWouldNeverRefresh(
                        last_refresh,
                        *least_valid_read_ts,
                    ));
                }
            } else {
                // An empty as_of frontier here is unexpected but tolerated.
                soft_panic_or_log!("creating a materialized view with an empty `as_of`");
            }
        }

        // `until` is one tick past the last refresh; empty (unbounded) when
        // there is no schedule or the step forward would overflow.
        let until_ts = refresh_schedule
            .and_then(|s| s.last_refresh())
            .and_then(|r| r.try_step_forward());
        let until = Antichain::from_iter(until_ts);

        Ok((dataflow_as_of, storage_as_of, until))
    }
910
911 #[instrument]
912 async fn create_materialized_view_explain(
913 &self,
914 session: &Session,
915 CreateMaterializedViewExplain {
916 global_id,
917 plan:
918 plan::CreateMaterializedViewPlan {
919 name,
920 materialized_view:
921 plan::MaterializedView {
922 column_names,
923 cluster_id,
924 ..
925 },
926 ..
927 },
928 df_meta,
929 explain_ctx:
930 ExplainPlanContext {
931 config,
932 format,
933 stage,
934 optimizer_trace,
935 ..
936 },
937 ..
938 }: CreateMaterializedViewExplain,
939 ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
940 let session_catalog = self.catalog().for_session(session);
941 let expr_humanizer = {
942 let full_name = self.catalog().resolve_full_name(&name, None);
943 let transient_items = btreemap! {
944 global_id => TransientItem::new(
945 Some(full_name.into_parts()),
946 Some(column_names.iter().map(|c| c.to_string()).collect()),
947 )
948 };
949 ExprHumanizerExt::new(transient_items, &session_catalog)
950 };
951
952 let target_cluster = self.catalog().get_cluster(cluster_id);
953
954 let features = OptimizerFeatures::from(self.catalog().system_config())
955 .override_from(&target_cluster.config.features())
956 .override_from(&config.features);
957
958 let rows = optimizer_trace
959 .into_rows(
960 format,
961 &config,
962 &features,
963 &expr_humanizer,
964 None,
965 Some(target_cluster),
966 df_meta,
967 stage,
968 plan::ExplaineeStatementKind::CreateMaterializedView,
969 None,
970 )
971 .await?;
972
973 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
974 }
975
976 pub(crate) async fn explain_pushdown_materialized_view(
977 &self,
978 ctx: ExecuteContext,
979 item_id: CatalogItemId,
980 ) {
981 let CatalogItem::MaterializedView(mview) = self.catalog().get_entry(&item_id).item() else {
982 unreachable!() };
984 let gid = mview.global_id_writes();
985 let mview = mview.clone();
986
987 let Some(plan) = self.catalog().try_get_physical_plan(&gid).cloned() else {
988 let msg = format!("cannot find plan for materialized view {item_id} in catalog");
989 tracing::error!("{msg}");
990 ctx.retire(Err(anyhow!("{msg}").into()));
991 return;
992 };
993
994 let read_holds =
998 Some(self.acquire_read_holds(&dataflow_import_id_bundle(&plan, mview.cluster_id)));
999
1000 let frontiers = self
1001 .controller
1002 .compute
1003 .collection_frontiers(gid, Some(mview.cluster_id))
1004 .expect("materialized view exists");
1005
1006 let as_of = frontiers.read_frontier.to_owned();
1007
1008 let until = mview
1009 .refresh_schedule
1010 .as_ref()
1011 .and_then(|s| s.last_refresh())
1012 .unwrap_or(mz_repr::Timestamp::MAX);
1013
1014 let mz_now = match as_of.as_option() {
1015 Some(&as_of) => {
1016 ResultSpec::value_between(Datum::MzTimestamp(as_of), Datum::MzTimestamp(until))
1017 }
1018 None => ResultSpec::value_all(),
1019 };
1020
1021 self.execute_explain_pushdown_with_read_holds(
1022 ctx,
1023 as_of,
1024 mz_now,
1025 read_holds,
1026 plan.source_imports
1027 .into_iter()
1028 .filter_map(|(id, import)| import.desc.arguments.operators.map(|mfp| (id, mfp))),
1029 )
1030 .await
1031 }
1032}