1use anyhow::anyhow;
11use differential_dataflow::lattice::Lattice;
12use maplit::btreemap;
13use maplit::btreeset;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_catalog::memory::objects::{CatalogItem, MaterializedView};
16use mz_expr::{CollectionPlan, ResultSpec};
17use mz_ore::collections::CollectionExt;
18use mz_ore::instrument;
19use mz_ore::soft_panic_or_log;
20use mz_repr::explain::{ExprHumanizerExt, TransientItem};
21use mz_repr::optimize::OptimizerFeatures;
22use mz_repr::optimize::OverrideFrom;
23use mz_repr::refresh_schedule::RefreshSchedule;
24use mz_repr::{CatalogItemId, Datum, RelationVersion, Row, VersionedRelationDesc};
25use mz_sql::ast::ExplainStage;
26use mz_sql::catalog::CatalogError;
27use mz_sql::names::ResolvedIds;
28use mz_sql::plan;
29use mz_sql::session::metadata::SessionMetadata;
30use mz_sql_parser::ast;
31use mz_sql_parser::ast::display::AstDisplay;
32use mz_storage_client::controller::CollectionDescription;
33use std::collections::BTreeMap;
34use timely::progress::Antichain;
35use tracing::Span;
36
37use crate::ReadHolds;
38use crate::catalog::CatalogState;
39use crate::command::ExecuteResponse;
40use crate::coord::sequencer::inner::return_if_err;
41use crate::coord::{
42 Coordinator, CreateMaterializedViewExplain, CreateMaterializedViewFinish,
43 CreateMaterializedViewOptimize, CreateMaterializedViewStage, ExplainContext,
44 ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
45};
46use crate::error::AdapterError;
47use crate::explain::explain_dataflow;
48use crate::explain::explain_plan;
49use crate::explain::optimizer_trace::OptimizerTrace;
50use crate::optimize::dataflows::dataflow_import_id_bundle;
51use crate::optimize::{self, Optimize};
52use crate::session::Session;
53use crate::util::ResultExt;
54use crate::{AdapterNotice, CollectionIdBundle, ExecuteContext, TimestampProvider, catalog};
55
56impl Staged for CreateMaterializedViewStage {
57 type Ctx = ExecuteContext;
58
59 fn validity(&mut self) -> &mut PlanValidity {
60 match self {
61 Self::Optimize(stage) => &mut stage.validity,
62 Self::Finish(stage) => &mut stage.validity,
63 Self::Explain(stage) => &mut stage.validity,
64 }
65 }
66
67 async fn stage(
68 self,
69 coord: &mut Coordinator,
70 ctx: &mut ExecuteContext,
71 ) -> Result<StageResult<Box<Self>>, AdapterError> {
72 match self {
73 CreateMaterializedViewStage::Optimize(stage) => {
74 coord.create_materialized_view_optimize(stage).await
75 }
76 CreateMaterializedViewStage::Finish(stage) => {
77 coord.create_materialized_view_finish(ctx, stage).await
78 }
79 CreateMaterializedViewStage::Explain(stage) => {
80 coord
81 .create_materialized_view_explain(ctx.session(), stage)
82 .await
83 }
84 }
85 }
86
87 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
88 Message::CreateMaterializedViewStageReady {
89 ctx,
90 span,
91 stage: self,
92 }
93 }
94
95 fn cancel_enabled(&self) -> bool {
96 true
97 }
98}
99
100impl Coordinator {
101 #[instrument]
102 pub(crate) async fn sequence_create_materialized_view(
103 &mut self,
104 ctx: ExecuteContext,
105 plan: plan::CreateMaterializedViewPlan,
106 resolved_ids: ResolvedIds,
107 ) {
108 let stage = return_if_err!(
109 self.create_materialized_view_validate(
110 ctx.session(),
111 plan,
112 resolved_ids,
113 ExplainContext::None
114 ),
115 ctx
116 );
117 self.sequence_staged(ctx, Span::current(), stage).await;
118 }
119
120 #[instrument]
121 pub(crate) async fn explain_create_materialized_view(
122 &mut self,
123 ctx: ExecuteContext,
124 plan::ExplainPlanPlan {
125 stage,
126 format,
127 config,
128 explainee,
129 }: plan::ExplainPlanPlan,
130 ) {
131 let plan::Explainee::Statement(stmt) = explainee else {
132 unreachable!()
135 };
136 let plan::ExplaineeStatement::CreateMaterializedView { broken, plan } = stmt else {
137 unreachable!()
140 };
141
142 let optimizer_trace = OptimizerTrace::new(stage.paths());
145
146 let resolved_ids = ResolvedIds::empty();
148
149 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
150 broken,
151 config,
152 format,
153 stage,
154 replan: None,
155 desc: None,
156 optimizer_trace,
157 });
158 let stage = return_if_err!(
159 self.create_materialized_view_validate(ctx.session(), plan, resolved_ids, explain_ctx),
160 ctx
161 );
162 self.sequence_staged(ctx, Span::current(), stage).await;
163 }
164
165 #[instrument]
166 pub(crate) async fn explain_replan_materialized_view(
167 &mut self,
168 ctx: ExecuteContext,
169 plan::ExplainPlanPlan {
170 stage,
171 format,
172 config,
173 explainee,
174 }: plan::ExplainPlanPlan,
175 ) {
176 let plan::Explainee::ReplanMaterializedView(id) = explainee else {
177 unreachable!() };
179 let CatalogItem::MaterializedView(item) = self.catalog().get_entry(&id).item() else {
180 unreachable!() };
182 let gid = item.global_id_writes();
183
184 let create_sql = item.create_sql.clone();
185 let plan_result = self
186 .catalog_mut()
187 .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
188 let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
189
190 let plan::Plan::CreateMaterializedView(plan) = plan else {
191 unreachable!() };
193
194 let broken = false;
197
198 let optimizer_trace = OptimizerTrace::new(stage.paths());
201
202 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
203 broken,
204 config,
205 format,
206 stage,
207 replan: Some(gid),
208 desc: None,
209 optimizer_trace,
210 });
211 let stage = return_if_err!(
212 self.create_materialized_view_validate(ctx.session(), plan, resolved_ids, explain_ctx,),
213 ctx
214 );
215 self.sequence_staged(ctx, Span::current(), stage).await;
216 }
217
218 #[instrument]
219 pub(super) fn explain_materialized_view(
220 &self,
221 ctx: &ExecuteContext,
222 plan::ExplainPlanPlan {
223 stage,
224 format,
225 config,
226 explainee,
227 }: plan::ExplainPlanPlan,
228 ) -> Result<ExecuteResponse, AdapterError> {
229 let plan::Explainee::MaterializedView(id) = explainee else {
230 unreachable!() };
232 let CatalogItem::MaterializedView(view) = self.catalog().get_entry(&id).item() else {
233 unreachable!() };
235 let gid = view.global_id_writes();
236
237 let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&gid) else {
238 if !id.is_system() {
239 tracing::error!(
240 "cannot find dataflow metainformation for materialized view {id} in catalog"
241 );
242 }
243 coord_bail!(
244 "cannot find dataflow metainformation for materialized view {id} in catalog"
245 );
246 };
247
248 let target_cluster = self.catalog().get_cluster(view.cluster_id);
249
250 let features = OptimizerFeatures::from(self.catalog().system_config())
251 .override_from(&target_cluster.config.features())
252 .override_from(&config.features);
253
254 let cardinality_stats = BTreeMap::new();
255
256 let explain = match stage {
257 ExplainStage::RawPlan => explain_plan(
258 view.raw_expr.as_ref().clone(),
259 format,
260 &config,
261 &features,
262 &self.catalog().for_session(ctx.session()),
263 cardinality_stats,
264 Some(target_cluster.name.as_str()),
265 )?,
266 ExplainStage::LocalPlan => explain_plan(
267 view.locally_optimized_expr.as_inner().clone(),
268 format,
269 &config,
270 &features,
271 &self.catalog().for_session(ctx.session()),
272 cardinality_stats,
273 Some(target_cluster.name.as_str()),
274 )?,
275 ExplainStage::GlobalPlan => {
276 let Some(plan) = self.catalog().try_get_optimized_plan(&gid).cloned() else {
277 tracing::error!("cannot find {stage} for materialized view {id} in catalog");
278 coord_bail!("cannot find {stage} for materialized view in catalog");
279 };
280 explain_dataflow(
281 plan,
282 format,
283 &config,
284 &features,
285 &self.catalog().for_session(ctx.session()),
286 cardinality_stats,
287 Some(target_cluster.name.as_str()),
288 dataflow_metainfo,
289 )?
290 }
291 ExplainStage::PhysicalPlan => {
292 let Some(plan) = self.catalog().try_get_physical_plan(&gid).cloned() else {
293 tracing::error!("cannot find {stage} for materialized view {id} in catalog",);
294 coord_bail!("cannot find {stage} for materialized view in catalog");
295 };
296 explain_dataflow(
297 plan,
298 format,
299 &config,
300 &features,
301 &self.catalog().for_session(ctx.session()),
302 cardinality_stats,
303 Some(target_cluster.name.as_str()),
304 dataflow_metainfo,
305 )?
306 }
307 _ => {
308 coord_bail!("cannot EXPLAIN {} FOR MATERIALIZED VIEW", stage);
309 }
310 };
311
312 let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
313
314 Ok(Self::send_immediate_rows(row))
315 }
316
317 #[instrument]
318 fn create_materialized_view_validate(
319 &self,
320 session: &Session,
321 plan: plan::CreateMaterializedViewPlan,
322 resolved_ids: ResolvedIds,
323 explain_ctx: ExplainContext,
326 ) -> Result<CreateMaterializedViewStage, AdapterError> {
327 let plan::CreateMaterializedViewPlan {
328 materialized_view:
329 plan::MaterializedView {
330 expr,
331 cluster_id,
332 target_replica,
333 refresh_schedule,
334 ..
335 },
336 ambiguous_columns,
337 ..
338 } = &plan;
339
340 let expr_depends_on = expr.depends_on();
345 self.catalog()
346 .validate_timeline_context(expr_depends_on.iter().copied())?;
347 self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
348 let log_names = expr_depends_on
351 .iter()
352 .map(|gid| self.catalog.resolve_item_id(gid))
353 .flat_map(|item_id| self.catalog().introspection_dependencies(item_id))
354 .map(|item_id| self.catalog().get_entry(&item_id).name().item.clone())
355 .collect::<Vec<_>>();
356 if !log_names.is_empty() {
357 return Err(AdapterError::InvalidLogDependency {
358 object_type: "materialized view".into(),
359 log_names,
360 });
361 }
362
363 let validity = PlanValidity::new(
369 self.catalog().transient_revision(),
370 resolved_ids.items().copied().collect(),
371 Some(*cluster_id),
372 *target_replica,
373 session.role_metadata().clone(),
374 );
375
376 if let Some(refresh_schedule) = refresh_schedule {
378 if !refresh_schedule.ats.is_empty() && matches!(explain_ctx, ExplainContext::None) {
379 let read_holds = self
382 .txn_read_holds
383 .get(session.conn_id())
384 .expect("purification acquired read holds if there are REFRESH ATs");
385 let least_valid_read = read_holds.least_valid_read();
386 for refresh_at_ts in &refresh_schedule.ats {
387 if !least_valid_read.less_equal(refresh_at_ts) {
388 return Err(AdapterError::InputNotReadableAtRefreshAtTime(
389 *refresh_at_ts,
390 least_valid_read,
391 ));
392 }
393 }
394 let ids = self
397 .index_oracle(*cluster_id)
398 .sufficient_collections(resolved_ids.collections().copied());
399 if !ids.difference(&read_holds.id_bundle()).is_empty() {
400 return Err(AdapterError::ChangedPlan(
401 "the set of possible inputs changed during the creation of the \
402 materialized view"
403 .to_string(),
404 ));
405 }
406 }
407 }
408
409 Ok(CreateMaterializedViewStage::Optimize(
410 CreateMaterializedViewOptimize {
411 validity,
412 plan,
413 resolved_ids,
414 explain_ctx,
415 },
416 ))
417 }
418
419 #[instrument]
420 async fn create_materialized_view_optimize(
421 &mut self,
422 CreateMaterializedViewOptimize {
423 validity,
424 plan,
425 resolved_ids,
426 explain_ctx,
427 }: CreateMaterializedViewOptimize,
428 ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
429 let plan::CreateMaterializedViewPlan {
430 name,
431 materialized_view:
432 plan::MaterializedView {
433 column_names,
434 cluster_id,
435 non_null_assertions,
436 refresh_schedule,
437 ..
438 },
439 ..
440 } = &plan;
441
442 let compute_instance = self
444 .instance_snapshot(*cluster_id)
445 .expect("compute instance does not exist");
446 let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
447 self.allocate_user_id().await?
448 } else {
449 self.allocate_transient_id()
450 };
451
452 let (_, view_id) = self.allocate_transient_id();
453 let debug_name = self.catalog().resolve_full_name(name, None).to_string();
454 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
455 .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
456 .override_from(&explain_ctx);
457 let optimizer_features = optimizer_config.features.clone();
458
459 let mut optimizer = optimize::materialized_view::Optimizer::new(
461 self.owned_catalog().as_optimizer_catalog(),
462 compute_instance,
463 global_id,
464 view_id,
465 column_names.clone(),
466 non_null_assertions.clone(),
467 refresh_schedule.clone(),
468 debug_name,
469 optimizer_config,
470 self.optimizer_metrics(),
471 );
472
473 let span = Span::current();
474 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
475 || "optimize create materialized view",
476 move || {
477 span.in_scope(|| {
478 let mut pipeline = || -> Result<(
479 optimize::materialized_view::LocalMirPlan,
480 optimize::materialized_view::GlobalMirPlan,
481 optimize::materialized_view::GlobalLirPlan,
482 ), AdapterError> {
483 let _dispatch_guard = explain_ctx.dispatch_guard();
484
485 let raw_expr = plan.materialized_view.expr.clone();
486
487 let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
489 let global_mir_plan =
490 optimizer.catch_unwind_optimize(local_mir_plan.clone())?;
491 let global_lir_plan =
493 optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
494
495 Ok((local_mir_plan, global_mir_plan, global_lir_plan))
496 };
497
498 let stage = match pipeline() {
499 Ok((local_mir_plan, global_mir_plan, global_lir_plan)) => {
500 if let ExplainContext::Plan(explain_ctx) = explain_ctx {
501 let (_, df_meta) = global_lir_plan.unapply();
502 CreateMaterializedViewStage::Explain(
503 CreateMaterializedViewExplain {
504 validity,
505 global_id,
506 plan,
507 df_meta,
508 explain_ctx,
509 },
510 )
511 } else {
512 CreateMaterializedViewStage::Finish(CreateMaterializedViewFinish {
513 item_id,
514 global_id,
515 validity,
516 plan,
517 resolved_ids,
518 local_mir_plan,
519 global_mir_plan,
520 global_lir_plan,
521 optimizer_features,
522 })
523 }
524 }
525 Err(err) => {
528 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
529 return Err(err);
531 };
532
533 if explain_ctx.broken {
534 tracing::error!("error while handling EXPLAIN statement: {}", err);
538 CreateMaterializedViewStage::Explain(
539 CreateMaterializedViewExplain {
540 global_id,
541 validity,
542 plan,
543 df_meta: Default::default(),
544 explain_ctx,
545 },
546 )
547 } else {
548 return Err(err);
550 }
551 }
552 };
553
554 Ok(Box::new(stage))
555 })
556 },
557 )))
558 }
559
560 #[instrument]
561 async fn create_materialized_view_finish(
562 &mut self,
563 ctx: &mut ExecuteContext,
564 stage: CreateMaterializedViewFinish,
565 ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
566 let CreateMaterializedViewFinish {
567 item_id,
568 global_id,
569 plan:
570 plan::CreateMaterializedViewPlan {
571 name,
572 materialized_view:
573 plan::MaterializedView {
574 mut create_sql,
575 expr: raw_expr,
576 column_names,
577 dependencies,
578 replacement_target,
579 cluster_id,
580 target_replica,
581 non_null_assertions,
582 compaction_window,
583 refresh_schedule,
584 ..
585 },
586 drop_ids,
587 if_not_exists,
588 ..
589 },
590 resolved_ids,
591 local_mir_plan,
592 global_mir_plan,
593 global_lir_plan,
594 optimizer_features,
595 ..
596 } = stage;
597
598 if let Some(target_id) = replacement_target {
600 let Some(target) = self.catalog().get_entry(&target_id).materialized_view() else {
601 return Err(AdapterError::internal(
602 "create materialized view",
603 "replacement target not a materialized view",
604 ));
605 };
606
607 let schema_diff = target.desc.latest().diff(global_lir_plan.desc());
609 if !schema_diff.is_empty() {
610 return Err(AdapterError::ReplacementSchemaMismatch(schema_diff));
611 }
612 }
613
614 let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
616
617 let read_holds_owned;
618 let read_holds = if let Some(txn_reads) = self.txn_read_holds.get(ctx.session().conn_id()) {
619 txn_reads
623 } else {
624 read_holds_owned = self.acquire_read_holds(&id_bundle);
627 &read_holds_owned
628 };
629
630 let (dataflow_as_of, storage_as_of, until) =
631 self.select_timestamps(id_bundle, refresh_schedule.as_ref(), read_holds)?;
632
633 tracing::info!(
634 dataflow_as_of = ?dataflow_as_of,
635 storage_as_of = ?storage_as_of,
636 until = ?until,
637 "materialized view timestamp selection",
638 );
639
640 let initial_as_of = storage_as_of.clone();
641
642 if let Some(storage_as_of_ts) = storage_as_of.as_option() {
647 let stmt = mz_sql::parse::parse(&create_sql)
648 .map_err(|_| {
649 AdapterError::internal(
650 "create materialized view",
651 "original SQL should roundtrip",
652 )
653 })?
654 .into_element()
655 .ast;
656 let ast::Statement::CreateMaterializedView(mut stmt) = stmt else {
657 panic!("unexpected statement type");
658 };
659 stmt.as_of = Some(storage_as_of_ts.into());
660 create_sql = stmt.to_ast_string_stable();
661 }
662
663 let desc = VersionedRelationDesc::new(global_lir_plan.desc().clone());
664 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
665
666 let local_mir_for_cache = local_mir_plan.expr();
667
668 let ops = vec![
669 catalog::Op::DropObjects(
670 drop_ids
671 .into_iter()
672 .map(catalog::DropObjectInfo::Item)
673 .collect(),
674 ),
675 catalog::Op::CreateItem {
676 id: item_id,
677 name: name.clone(),
678 item: CatalogItem::MaterializedView(MaterializedView {
679 create_sql,
680 raw_expr: raw_expr.into(),
681 locally_optimized_expr: local_mir_plan.expr().into(),
682 desc,
683 collections,
684 resolved_ids,
685 dependencies,
686 replacement_target,
687 cluster_id,
688 target_replica,
689 non_null_assertions,
690 custom_logical_compaction_window: compaction_window,
691 refresh_schedule: refresh_schedule.clone(),
692 initial_as_of: Some(initial_as_of.clone()),
693 optimized_plan: None,
694 physical_plan: None,
695 dataflow_metainfo: None,
696 }),
697 owner_id: *ctx.session().current_role_id(),
698 },
699 ];
700
701 let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
703 .map(|(_item_id, global_id)| global_id)
704 .take(global_lir_plan.df_meta().optimizer_notices.len())
705 .collect::<Vec<_>>();
706
707 let output_desc = global_lir_plan.desc().clone();
720 let (mut df_desc, raw_df_meta) = global_lir_plan.unapply();
721 let df_meta = {
722 let system_catalog = self.catalog().for_system_session();
723 let full_name = self.catalog().resolve_full_name(&name, None);
724 let transient_items = btreemap! {
725 global_id => TransientItem::new(
726 Some(full_name.into_parts()),
727 Some(column_names.iter().map(|c| c.to_string()).collect()),
728 )
729 };
730 let humanizer = ExprHumanizerExt::new(transient_items, &system_catalog);
731 CatalogState::render_notices_core(
732 &humanizer,
733 (self.catalog().config().now)(),
734 &raw_df_meta,
735 notice_ids,
736 Some(global_id),
737 )
738 };
739
740 self.catalog()
745 .cache_expressions(
746 global_id,
747 Some(local_mir_for_cache),
748 global_mir_plan.df_desc().clone(),
749 df_desc.clone(),
750 df_meta.clone(),
751 optimizer_features,
752 )
753 .await;
754
755 let transact_result = self
756 .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
757 Box::pin(async move {
758 coord
760 .catalog_mut()
761 .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
762 coord
763 .catalog_mut()
764 .set_physical_plan(global_id, df_desc.clone());
765
766 let notice_builtin_updates_fut =
767 coord.persist_dataflow_metainfo(df_meta, global_id).await;
768
769 df_desc.set_as_of(dataflow_as_of.clone());
770 df_desc.set_initial_as_of(initial_as_of);
771 df_desc.until = until;
772
773 let storage_metadata = coord.catalog.state().storage_metadata();
774
775 let mut collection_desc =
776 CollectionDescription::for_other(output_desc, Some(storage_as_of));
777 let mut allow_writes = true;
778
779 if let Some(target_id) = replacement_target {
782 let target_gid = coord.catalog.get_entry(&target_id).latest_global_id();
783 collection_desc.primary = Some(target_gid);
784 allow_writes = false;
785 }
786
787 coord
789 .controller
790 .storage
791 .create_collections(
792 storage_metadata,
793 None,
794 vec![(global_id, collection_desc)],
795 )
796 .await
797 .unwrap_or_terminate("cannot fail to append");
798
799 coord
800 .initialize_storage_read_policies(
801 btreeset![item_id],
802 compaction_window.unwrap_or(CompactionWindow::Default),
803 )
804 .await;
805
806 coord
807 .ship_dataflow_and_notice_builtin_table_updates(
808 df_desc,
809 cluster_id,
810 notice_builtin_updates_fut,
811 target_replica,
812 )
813 .await;
814
815 if allow_writes {
816 coord.allow_writes(cluster_id, global_id);
817 }
818 })
819 })
820 .await;
821
822 match transact_result {
823 Ok(_) => {
824 self.emit_raw_optimizer_notices_to_user(ctx, &raw_df_meta.optimizer_notices);
829 Ok(ExecuteResponse::CreatedMaterializedView)
830 }
831 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
832 kind:
833 mz_catalog::memory::error::ErrorKind::Sql(
834 CatalogError::ItemAlreadyExists(_, _),
835 ),
836 })) if if_not_exists => {
837 ctx.session()
838 .add_notice(AdapterNotice::ObjectAlreadyExists {
839 name: name.item,
840 ty: "materialized view",
841 });
842 Ok(ExecuteResponse::CreatedMaterializedView)
843 }
844 Err(err) => Err(err),
845 }
846 .map(StageResult::Response)
847 }
848
849 fn select_timestamps(
852 &self,
853 id_bundle: CollectionIdBundle,
854 refresh_schedule: Option<&RefreshSchedule>,
855 read_holds: &ReadHolds,
856 ) -> Result<
857 (
858 Antichain<mz_repr::Timestamp>,
859 Antichain<mz_repr::Timestamp>,
860 Antichain<mz_repr::Timestamp>,
861 ),
862 AdapterError,
863 > {
864 assert!(
865 id_bundle.difference(&read_holds.id_bundle()).is_empty(),
866 "we must have read holds for all involved collections"
867 );
868
869 let least_valid_read = read_holds.least_valid_read();
872 let mut dataflow_as_of = least_valid_read.clone();
873 let mut storage_as_of = least_valid_read.clone();
874
875 if let Some(refresh_schedule) = &refresh_schedule {
885 if let Some(least_valid_read_ts) = least_valid_read.as_option() {
886 if let Some(first_refresh_ts) =
887 refresh_schedule.round_up_timestamp(*least_valid_read_ts)
888 {
889 storage_as_of = Antichain::from_elem(first_refresh_ts);
890 dataflow_as_of.join_assign(
891 &self
892 .greatest_available_read(&id_bundle)
893 .meet(&storage_as_of),
894 );
895 } else {
896 let last_refresh = refresh_schedule.last_refresh().expect(
897 "if round_up_timestamp returned None, then there should be a last refresh",
898 );
899
900 return Err(AdapterError::MaterializedViewWouldNeverRefresh(
901 last_refresh,
902 *least_valid_read_ts,
903 ));
904 }
905 } else {
906 soft_panic_or_log!("creating a materialized view with an empty `as_of`");
908 }
909 }
910
911 let until_ts = refresh_schedule
915 .and_then(|s| s.last_refresh())
916 .and_then(|r| r.try_step_forward());
917 let until = Antichain::from_iter(until_ts);
918
919 Ok((dataflow_as_of, storage_as_of, until))
920 }
921
922 #[instrument]
923 async fn create_materialized_view_explain(
924 &self,
925 session: &Session,
926 CreateMaterializedViewExplain {
927 global_id,
928 plan:
929 plan::CreateMaterializedViewPlan {
930 name,
931 materialized_view:
932 plan::MaterializedView {
933 column_names,
934 cluster_id,
935 ..
936 },
937 ..
938 },
939 df_meta,
940 explain_ctx:
941 ExplainPlanContext {
942 config,
943 format,
944 stage,
945 optimizer_trace,
946 ..
947 },
948 ..
949 }: CreateMaterializedViewExplain,
950 ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
951 let session_catalog = self.catalog().for_session(session);
952 let expr_humanizer = {
953 let full_name = self.catalog().resolve_full_name(&name, None);
954 let transient_items = btreemap! {
955 global_id => TransientItem::new(
956 Some(full_name.into_parts()),
957 Some(column_names.iter().map(|c| c.to_string()).collect()),
958 )
959 };
960 ExprHumanizerExt::new(transient_items, &session_catalog)
961 };
962
963 let target_cluster = self.catalog().get_cluster(cluster_id);
964
965 let features = OptimizerFeatures::from(self.catalog().system_config())
966 .override_from(&target_cluster.config.features())
967 .override_from(&config.features);
968
969 let rows = optimizer_trace
970 .into_rows(
971 format,
972 &config,
973 &features,
974 &expr_humanizer,
975 None,
976 Some(target_cluster),
977 df_meta,
978 stage,
979 plan::ExplaineeStatementKind::CreateMaterializedView,
980 None,
981 )
982 .await?;
983
984 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
985 }
986
987 pub(crate) async fn explain_pushdown_materialized_view(
988 &self,
989 ctx: ExecuteContext,
990 item_id: CatalogItemId,
991 ) {
992 let CatalogItem::MaterializedView(mview) = self.catalog().get_entry(&item_id).item() else {
993 unreachable!() };
995 let gid = mview.global_id_writes();
996 let mview = mview.clone();
997
998 let Some(plan) = self.catalog().try_get_physical_plan(&gid).cloned() else {
999 let msg = format!("cannot find plan for materialized view {item_id} in catalog");
1000 tracing::error!("{msg}");
1001 ctx.retire(Err(anyhow!("{msg}").into()));
1002 return;
1003 };
1004
1005 let read_holds =
1009 Some(self.acquire_read_holds(&dataflow_import_id_bundle(&plan, mview.cluster_id)));
1010
1011 let frontiers = self
1012 .controller
1013 .compute
1014 .collection_frontiers(gid, Some(mview.cluster_id))
1015 .expect("materialized view exists");
1016
1017 let as_of = frontiers.read_frontier.to_owned();
1018
1019 let until = mview
1020 .refresh_schedule
1021 .as_ref()
1022 .and_then(|s| s.last_refresh())
1023 .unwrap_or(mz_repr::Timestamp::MAX);
1024
1025 let mz_now = match as_of.as_option() {
1026 Some(&as_of) => {
1027 ResultSpec::value_between(Datum::MzTimestamp(as_of), Datum::MzTimestamp(until))
1028 }
1029 None => ResultSpec::value_all(),
1030 };
1031
1032 self.execute_explain_pushdown_with_read_holds(
1033 ctx,
1034 as_of,
1035 mz_now,
1036 read_holds,
1037 plan.source_imports
1038 .into_iter()
1039 .filter_map(|(id, import)| import.desc.arguments.operators.map(|mfp| (id, mfp))),
1040 )
1041 .await
1042 }
1043}