use anyhow::anyhow;
use differential_dataflow::lattice::Lattice;
use maplit::btreemap;
use maplit::btreeset;
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::memory::objects::{CatalogItem, MaterializedView};
use mz_expr::{CollectionPlan, ResultSpec};
use mz_ore::collections::CollectionExt;
use mz_ore::instrument;
use mz_ore::soft_panic_or_log;
use mz_repr::explain::{ExprHumanizerExt, TransientItem};
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::optimize::OverrideFrom;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{CatalogItemId, Datum, RelationVersion, Row, VersionedRelationDesc};
use mz_sql::ast::ExplainStage;
use mz_sql::catalog::CatalogError;
use mz_sql::names::ResolvedIds;
use mz_sql::plan;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql_parser::ast;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::controller::CollectionDescription;
use std::collections::BTreeMap;
use timely::progress::Antichain;
use tracing::Span;

use crate::ReadHolds;
use crate::command::ExecuteResponse;
use crate::coord::sequencer::inner::return_if_err;
use crate::coord::{
    Coordinator, CreateMaterializedViewExplain, CreateMaterializedViewFinish,
    CreateMaterializedViewOptimize, CreateMaterializedViewStage, ExplainContext,
    ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
};
use crate::error::AdapterError;
use crate::explain::explain_dataflow;
use crate::explain::explain_plan;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::optimize::dataflows::dataflow_import_id_bundle;
use crate::optimize::{self, Optimize};
use crate::session::Session;
use crate::util::ResultExt;
use crate::{AdapterNotice, CollectionIdBundle, ExecuteContext, TimestampProvider, catalog};

impl Staged for CreateMaterializedViewStage {
    type Ctx = ExecuteContext;

    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            Self::Optimize(stage) => &mut stage.validity,
            Self::Finish(stage) => &mut stage.validity,
            Self::Explain(stage) => &mut stage.validity,
        }
    }

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, AdapterError> {
        match self {
            CreateMaterializedViewStage::Optimize(stage) => {
                coord.create_materialized_view_optimize(stage).await
            }
            CreateMaterializedViewStage::Finish(stage) => {
                coord.create_materialized_view_finish(ctx, stage).await
            }
            CreateMaterializedViewStage::Explain(stage) => {
                coord
                    .create_materialized_view_explain(ctx.session(), stage)
                    .await
            }
        }
    }

    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
        Message::CreateMaterializedViewStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    fn cancel_enabled(&self) -> bool {
        true
    }
}

impl Coordinator {
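    /// Sequences a `CREATE MATERIALIZED VIEW` statement: validates the plan and then drives the
    /// remaining stages (optimize, finish) through the staged execution machinery.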
    #[instrument]
    pub(crate) async fn sequence_create_materialized_view(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateMaterializedViewPlan,
        resolved_ids: ResolvedIds,
    ) {
        let stage = return_if_err!(
            self.create_materialized_view_validate(
                ctx.session(),
                plan,
                resolved_ids,
                ExplainContext::None
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

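    /// Sequences an `EXPLAIN` of a `CREATE MATERIALIZED VIEW` statement. The statement runs
    /// through the same validate/optimize pipeline, but with an `ExplainContext::Plan` so that an
    /// optimizer trace is collected and no catalog object is created.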
    #[instrument]
    pub(crate) async fn explain_create_materialized_view(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) {
        let plan::Explainee::Statement(stmt) = explainee else {
            unreachable!()
        };
        let plan::ExplaineeStatement::CreateMaterializedView { broken, plan } = stmt else {
            unreachable!()
        };

        let optimizer_trace = OptimizerTrace::new(stage.paths());

        let resolved_ids = ResolvedIds::empty();

        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
            broken,
            config,
            format,
            stage,
            replan: None,
            desc: None,
            optimizer_trace,
        });
        let stage = return_if_err!(
            self.create_materialized_view_validate(ctx.session(), plan, resolved_ids, explain_ctx),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

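    /// Sequences an `EXPLAIN REPLAN` of an existing materialized view: the item's stored
    /// `create_sql` is re-planned and sent through the explain pipeline.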
    #[instrument]
    pub(crate) async fn explain_replan_materialized_view(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) {
        let plan::Explainee::ReplanMaterializedView(id) = explainee else {
            unreachable!()
        };
        let CatalogItem::MaterializedView(item) = self.catalog().get_entry(&id).item() else {
            unreachable!()
        };
        let gid = item.global_id_writes();

        let create_sql = item.create_sql.clone();
        let plan_result = self
            .catalog_mut()
            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);

        let plan::Plan::CreateMaterializedView(plan) = plan else {
            unreachable!()
        };

        let broken = false;

        let optimizer_trace = OptimizerTrace::new(stage.paths());

        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
            broken,
            config,
            format,
            stage,
            replan: Some(gid),
            desc: None,
            optimizer_trace,
        });
        let stage = return_if_err!(
            self.create_materialized_view_validate(ctx.session(), plan, resolved_ids, explain_ctx,),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

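    /// Explains an existing materialized view using the plans and dataflow metainformation
    /// recorded in the catalog when the view was created.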
    #[instrument]
    pub(super) fn explain_materialized_view(
        &self,
        ctx: &ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::Explainee::MaterializedView(id) = explainee else {
            unreachable!()
        };
        let CatalogItem::MaterializedView(view) = self.catalog().get_entry(&id).item() else {
            unreachable!()
        };
        let gid = view.global_id_writes();

        let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&gid) else {
            if !id.is_system() {
                tracing::error!(
                    "cannot find dataflow metainformation for materialized view {id} in catalog"
                );
            }
            coord_bail!(
                "cannot find dataflow metainformation for materialized view {id} in catalog"
            );
        };

        let target_cluster = self.catalog().get_cluster(view.cluster_id);

        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features())
            .override_from(&config.features);

        let cardinality_stats = BTreeMap::new();

        let explain = match stage {
            ExplainStage::RawPlan => explain_plan(
                view.raw_expr.as_ref().clone(),
                format,
                &config,
                &features,
                &self.catalog().for_session(ctx.session()),
                cardinality_stats,
                Some(target_cluster.name.as_str()),
            )?,
            ExplainStage::LocalPlan => explain_plan(
                view.optimized_expr.as_inner().clone(),
                format,
                &config,
                &features,
                &self.catalog().for_session(ctx.session()),
                cardinality_stats,
                Some(target_cluster.name.as_str()),
            )?,
            ExplainStage::GlobalPlan => {
                let Some(plan) = self.catalog().try_get_optimized_plan(&gid).cloned() else {
                    tracing::error!("cannot find {stage} for materialized view {id} in catalog");
                    coord_bail!("cannot find {stage} for materialized view in catalog");
                };
                explain_dataflow(
                    plan,
                    format,
                    &config,
                    &features,
                    &self.catalog().for_session(ctx.session()),
                    cardinality_stats,
                    Some(target_cluster.name.as_str()),
                    dataflow_metainfo,
                )?
            }
            ExplainStage::PhysicalPlan => {
                let Some(plan) = self.catalog().try_get_physical_plan(&gid).cloned() else {
                    tracing::error!("cannot find {stage} for materialized view {id} in catalog",);
                    coord_bail!("cannot find {stage} for materialized view in catalog");
                };
                explain_dataflow(
                    plan,
                    format,
                    &config,
                    &features,
                    &self.catalog().for_session(ctx.session()),
                    cardinality_stats,
                    Some(target_cluster.name.as_str()),
                    dataflow_metainfo,
                )?
            }
            _ => {
                coord_bail!("cannot EXPLAIN {} FOR MATERIALIZED VIEW", stage);
            }
        };

        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);

        Ok(Self::send_immediate_rows(row))
    }

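    /// Validates a `CREATE MATERIALIZED VIEW` plan (timeline context, system column references,
    /// log dependencies, and readability of `REFRESH AT` times) and produces the `Optimize` stage.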
    #[instrument]
    fn create_materialized_view_validate(
        &self,
        session: &Session,
        plan: plan::CreateMaterializedViewPlan,
        resolved_ids: ResolvedIds,
        explain_ctx: ExplainContext,
    ) -> Result<CreateMaterializedViewStage, AdapterError> {
        let plan::CreateMaterializedViewPlan {
            materialized_view:
                plan::MaterializedView {
                    expr,
                    cluster_id,
                    refresh_schedule,
                    ..
                },
            ambiguous_columns,
            ..
        } = &plan;

        let expr_depends_on = expr.depends_on();
        self.catalog()
            .validate_timeline_context(expr_depends_on.iter().copied())?;
        self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
        let log_names = expr_depends_on
            .iter()
            .map(|gid| self.catalog.resolve_item_id(gid))
            .flat_map(|item_id| self.catalog().introspection_dependencies(item_id))
            .map(|item_id| self.catalog().get_entry(&item_id).name().item.clone())
            .collect::<Vec<_>>();
        if !log_names.is_empty() {
            return Err(AdapterError::InvalidLogDependency {
                object_type: "materialized view".into(),
                log_names,
            });
        }

        let validity =
            PlanValidity::require_transient_revision(self.catalog().transient_revision());

        if let Some(refresh_schedule) = refresh_schedule {
            if !refresh_schedule.ats.is_empty() && matches!(explain_ctx, ExplainContext::None) {
                let read_holds = self
                    .txn_read_holds
                    .get(session.conn_id())
                    .expect("purification acquired read holds if there are REFRESH ATs");
                let least_valid_read = read_holds.least_valid_read();
                for refresh_at_ts in &refresh_schedule.ats {
                    if !least_valid_read.less_equal(refresh_at_ts) {
                        return Err(AdapterError::InputNotReadableAtRefreshAtTime(
                            *refresh_at_ts,
                            least_valid_read,
                        ));
                    }
                }
                let ids = self
                    .index_oracle(*cluster_id)
                    .sufficient_collections(resolved_ids.collections().copied());
                if !ids.difference(&read_holds.id_bundle()).is_empty() {
                    return Err(AdapterError::ChangedPlan(
                        "the set of possible inputs changed during the creation of the \
                         materialized view"
                            .to_string(),
                    ));
                }
            }
        }

        Ok(CreateMaterializedViewStage::Optimize(
            CreateMaterializedViewOptimize {
                validity,
                plan,
                resolved_ids,
                explain_ctx,
            },
        ))
    }

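    /// Runs the optimizer pipeline (local MIR, global MIR, LIR) for the materialized view on a
    /// blocking task and produces either the `Finish` stage or, for `EXPLAIN`, the `Explain`
    /// stage.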
    #[instrument]
    async fn create_materialized_view_optimize(
        &mut self,
        CreateMaterializedViewOptimize {
            validity,
            plan,
            resolved_ids,
            explain_ctx,
        }: CreateMaterializedViewOptimize,
    ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
        let plan::CreateMaterializedViewPlan {
            name,
            materialized_view:
                plan::MaterializedView {
                    column_names,
                    cluster_id,
                    non_null_assertions,
                    refresh_schedule,
                    ..
                },
            ..
        } = &plan;

        let compute_instance = self
            .instance_snapshot(*cluster_id)
            .expect("compute instance does not exist");
        let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
            let id_ts = self.get_catalog_write_ts().await;
            self.catalog().allocate_user_id(id_ts).await?
        } else {
            self.allocate_transient_id()
        };

        let (_, view_id) = self.allocate_transient_id();
        let debug_name = self.catalog().resolve_full_name(name, None).to_string();
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
            .override_from(&explain_ctx);
        let force_non_monotonic = Default::default();

        let mut optimizer = optimize::materialized_view::Optimizer::new(
            self.owned_catalog().as_optimizer_catalog(),
            compute_instance,
            global_id,
            view_id,
            column_names.clone(),
            non_null_assertions.clone(),
            refresh_schedule.clone(),
            debug_name,
            optimizer_config,
            self.optimizer_metrics(),
            force_non_monotonic,
        );

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize create materialized view",
            move || {
                span.in_scope(|| {
                    let mut pipeline = || -> Result<(
                        optimize::materialized_view::LocalMirPlan,
                        optimize::materialized_view::GlobalMirPlan,
                        optimize::materialized_view::GlobalLirPlan,
                    ), AdapterError> {
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let raw_expr = plan.materialized_view.expr.clone();

                        let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                        let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan.clone())?;
                        let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;

                        Ok((local_mir_plan, global_mir_plan, global_lir_plan))
                    };

                    let stage = match pipeline() {
                        Ok((local_mir_plan, global_mir_plan, global_lir_plan)) => {
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                let (_, df_meta) = global_lir_plan.unapply();
                                CreateMaterializedViewStage::Explain(
                                    CreateMaterializedViewExplain {
                                        validity,
                                        global_id,
                                        plan,
                                        df_meta,
                                        explain_ctx,
                                    },
                                )
                            } else {
                                CreateMaterializedViewStage::Finish(CreateMaterializedViewFinish {
                                    item_id,
                                    global_id,
                                    validity,
                                    plan,
                                    resolved_ids,
                                    local_mir_plan,
                                    global_mir_plan,
                                    global_lir_plan,
                                })
                            }
                        }
                        Err(err) => {
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                CreateMaterializedViewStage::Explain(
                                    CreateMaterializedViewExplain {
                                        global_id,
                                        validity,
                                        plan,
                                        df_meta: Default::default(),
                                        explain_ctx,
                                    },
                                )
                            } else {
                                return Err(err);
                            }
                        }
                    };

                    Ok(Box::new(stage))
                })
            },
        )))
    }

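    /// Finishes `CREATE MATERIALIZED VIEW`: checks replacement-target compatibility, selects
    /// timestamps, records the chosen `as_of` in the stored `create_sql`, applies the catalog
    /// transaction, creates the storage collection, installs read policies, and ships the
    /// dataflow.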
    #[instrument]
    async fn create_materialized_view_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        stage: CreateMaterializedViewFinish,
    ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
        let CreateMaterializedViewFinish {
            item_id,
            global_id,
            plan:
                plan::CreateMaterializedViewPlan {
                    name,
                    materialized_view:
                        plan::MaterializedView {
                            mut create_sql,
                            expr: raw_expr,
                            dependencies,
                            replacement_target,
                            cluster_id,
                            non_null_assertions,
                            compaction_window,
                            refresh_schedule,
                            ..
                        },
                    drop_ids,
                    if_not_exists,
                    ..
                },
            resolved_ids,
            local_mir_plan,
            global_mir_plan,
            global_lir_plan,
            ..
        } = stage;

        if let Some(target_id) = replacement_target {
            let Some(target) = self.catalog().get_entry(&target_id).materialized_view() else {
                return Err(AdapterError::internal(
                    "create materialized view",
                    "replacement target not a materialized view",
                ));
            };

            if &target.desc.latest() != global_lir_plan.desc() {
                return Err(AdapterError::Unstructured(anyhow!("incompatible schemas")));
            }
        }

        let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);

        let read_holds_owned;
        let read_holds = if let Some(txn_reads) = self.txn_read_holds.get(ctx.session().conn_id()) {
            txn_reads
        } else {
            read_holds_owned = self.acquire_read_holds(&id_bundle);
            &read_holds_owned
        };

        let (dataflow_as_of, storage_as_of, until) = self.select_timestamps(
            id_bundle,
            refresh_schedule.as_ref(),
            read_holds,
            replacement_target,
        )?;

        tracing::info!(
            dataflow_as_of = ?dataflow_as_of,
            storage_as_of = ?storage_as_of,
            until = ?until,
            "materialized view timestamp selection",
        );

        let initial_as_of = storage_as_of.clone();

        if let Some(storage_as_of_ts) = storage_as_of.as_option() {
            let stmt = mz_sql::parse::parse(&create_sql)
                .map_err(|_| {
                    AdapterError::internal(
                        "create materialized view",
                        "original SQL should roundtrip",
                    )
                })?
                .into_element()
                .ast;
            let ast::Statement::CreateMaterializedView(mut stmt) = stmt else {
                panic!("unexpected statement type");
            };
            stmt.as_of = Some(storage_as_of_ts.into());
            create_sql = stmt.to_ast_string_stable();
        }

        let desc = VersionedRelationDesc::new(global_lir_plan.desc().clone());
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        let ops = vec![
            catalog::Op::DropObjects(
                drop_ids
                    .into_iter()
                    .map(catalog::DropObjectInfo::Item)
                    .collect(),
            ),
            catalog::Op::CreateItem {
                id: item_id,
                name: name.clone(),
                item: CatalogItem::MaterializedView(MaterializedView {
                    create_sql,
                    raw_expr: raw_expr.into(),
                    optimized_expr: local_mir_plan.expr().into(),
                    desc,
                    collections,
                    resolved_ids,
                    dependencies,
                    replacement_target,
                    cluster_id,
                    non_null_assertions,
                    custom_logical_compaction_window: compaction_window,
                    refresh_schedule: refresh_schedule.clone(),
                    initial_as_of: Some(initial_as_of.clone()),
                }),
                owner_id: *ctx.session().current_role_id(),
            },
        ];

        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
            .map(|(_item_id, global_id)| global_id)
            .take(global_lir_plan.df_meta().optimizer_notices.len())
            .collect::<Vec<_>>();

        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, ctx| {
                Box::pin(async move {
                    let output_desc = global_lir_plan.desc().clone();
                    let (mut df_desc, df_meta) = global_lir_plan.unapply();

                    coord
                        .catalog_mut()
                        .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
                    coord
                        .catalog_mut()
                        .set_physical_plan(global_id, df_desc.clone());

                    let notice_builtin_updates_fut = coord
                        .process_dataflow_metainfo(df_meta, global_id, ctx, notice_ids)
                        .await;

                    df_desc.set_as_of(dataflow_as_of.clone());
                    df_desc.set_initial_as_of(initial_as_of);
                    df_desc.until = until;

                    let storage_metadata = coord.catalog.state().storage_metadata();

                    let mut collection_desc =
                        CollectionDescription::for_other(output_desc, Some(storage_as_of));
                    let mut allow_writes = true;

                    if let Some(target_id) = replacement_target {
                        let target_gid = coord.catalog.get_entry(&target_id).latest_global_id();
                        collection_desc.primary = Some(target_gid);
                        allow_writes = false;
                    }

                    coord
                        .controller
                        .storage
                        .create_collections(
                            storage_metadata,
                            None,
                            vec![(global_id, collection_desc)],
                        )
                        .await
                        .unwrap_or_terminate("cannot fail to append");

                    coord
                        .initialize_storage_read_policies(
                            btreeset![item_id],
                            compaction_window.unwrap_or(CompactionWindow::Default),
                        )
                        .await;

                    coord
                        .ship_dataflow_and_notice_builtin_table_updates(
                            df_desc,
                            cluster_id,
                            notice_builtin_updates_fut,
                        )
                        .await;

                    if allow_writes {
                        coord.allow_writes(cluster_id, global_id);
                    }
                })
            })
            .await;

        match transact_result {
            Ok(_) => Ok(ExecuteResponse::CreatedMaterializedView),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "materialized view",
                    });
                Ok(ExecuteResponse::CreatedMaterializedView)
            }
            Err(err) => Err(err),
        }
        .map(StageResult::Response)
    }

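    /// Selects the `as_of` frontiers (dataflow and storage) and the `until` frontier for a new
    /// materialized view, taking any refresh schedule and replacement target into account.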
    fn select_timestamps(
        &self,
        id_bundle: CollectionIdBundle,
        refresh_schedule: Option<&RefreshSchedule>,
        read_holds: &ReadHolds<mz_repr::Timestamp>,
        replacement_target: Option<CatalogItemId>,
    ) -> Result<
        (
            Antichain<mz_repr::Timestamp>,
            Antichain<mz_repr::Timestamp>,
            Antichain<mz_repr::Timestamp>,
        ),
        AdapterError,
    > {
        assert!(
            id_bundle.difference(&read_holds.id_bundle()).is_empty(),
            "we must have read holds for all involved collections"
        );

        let least_valid_read = read_holds.least_valid_read();
        let mut dataflow_as_of = least_valid_read.clone();
        let mut storage_as_of = least_valid_read.clone();

        if let Some(refresh_schedule) = &refresh_schedule {
            if let Some(least_valid_read_ts) = least_valid_read.as_option() {
                if let Some(first_refresh_ts) =
                    refresh_schedule.round_up_timestamp(*least_valid_read_ts)
                {
                    storage_as_of = Antichain::from_elem(first_refresh_ts);
                    dataflow_as_of.join_assign(
                        &self
                            .greatest_available_read(&id_bundle)
                            .meet(&storage_as_of),
                    );
                } else {
                    let last_refresh = refresh_schedule.last_refresh().expect(
                        "if round_up_timestamp returned None, then there should be a last refresh",
                    );

                    return Err(AdapterError::MaterializedViewWouldNeverRefresh(
                        last_refresh,
                        *least_valid_read_ts,
                    ));
                }
            } else {
                soft_panic_or_log!("creating a materialized view with an empty `as_of`");
            }
        }

        let until_ts = refresh_schedule
            .and_then(|s| s.last_refresh())
            .and_then(|r| r.try_step_forward());
        let until = Antichain::from_iter(until_ts);

        if let Some(target_id) = replacement_target {
            let target_gid = self.catalog().get_entry(&target_id).latest_global_id();
            let frontiers = self
                .controller
                .storage_collections
                .collection_frontiers(target_gid)
                .expect("replacement target exists");
            let lower_bound = frontiers
                .read_capabilities
                .iter()
                .map(|t| t.step_forward())
                .collect();
            storage_as_of.join_assign(&lower_bound);
        }

        Ok((dataflow_as_of, storage_as_of, until))
    }

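    /// Renders the response rows for an `EXPLAIN ... CREATE MATERIALIZED VIEW` statement from the
    /// optimizer trace collected during the `Optimize` stage.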
    #[instrument]
    async fn create_materialized_view_explain(
        &self,
        session: &Session,
        CreateMaterializedViewExplain {
            global_id,
            plan:
                plan::CreateMaterializedViewPlan {
                    name,
                    materialized_view:
                        plan::MaterializedView {
                            column_names,
                            cluster_id,
                            ..
                        },
                    ..
                },
            df_meta,
            explain_ctx:
                ExplainPlanContext {
                    config,
                    format,
                    stage,
                    optimizer_trace,
                    ..
                },
            ..
        }: CreateMaterializedViewExplain,
    ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
        let session_catalog = self.catalog().for_session(session);
        let expr_humanizer = {
            let full_name = self.catalog().resolve_full_name(&name, None);
            let transient_items = btreemap! {
                global_id => TransientItem::new(
                    Some(full_name.into_parts()),
                    Some(column_names.iter().map(|c| c.to_string()).collect()),
                )
            };
            ExprHumanizerExt::new(transient_items, &session_catalog)
        };

        let target_cluster = self.catalog().get_cluster(cluster_id);

        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features())
            .override_from(&config.features);

        let rows = optimizer_trace
            .into_rows(
                format,
                &config,
                &features,
                &expr_humanizer,
                None,
                Some(target_cluster),
                df_meta,
                stage,
                plan::ExplaineeStatementKind::CreateMaterializedView,
                None,
            )
            .await?;

        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
    }

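    /// Handles filter-pushdown explanation for an existing materialized view: computes the
    /// view's current `as_of`/`until` bounds for `mz_now()` and forwards each source import's
    /// MFP for pushdown analysis.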
    pub(crate) async fn explain_pushdown_materialized_view(
        &self,
        ctx: ExecuteContext,
        item_id: CatalogItemId,
    ) {
        let CatalogItem::MaterializedView(mview) = self.catalog().get_entry(&item_id).item() else {
            unreachable!()
        };
        let gid = mview.global_id_writes();
        let mview = mview.clone();

        let Some(plan) = self.catalog().try_get_physical_plan(&gid).cloned() else {
            let msg = format!("cannot find plan for materialized view {item_id} in catalog");
            tracing::error!("{msg}");
            ctx.retire(Err(anyhow!("{msg}").into()));
            return;
        };

        let read_holds =
            Some(self.acquire_read_holds(&dataflow_import_id_bundle(&plan, mview.cluster_id)));

        let frontiers = self
            .controller
            .compute
            .collection_frontiers(gid, Some(mview.cluster_id))
            .expect("materialized view exists");

        let as_of = frontiers.read_frontier.to_owned();

        let until = mview
            .refresh_schedule
            .as_ref()
            .and_then(|s| s.last_refresh())
            .unwrap_or(mz_repr::Timestamp::MAX);

        let mz_now = match as_of.as_option() {
            Some(&as_of) => {
                ResultSpec::value_between(Datum::MzTimestamp(as_of), Datum::MzTimestamp(until))
            }
            None => ResultSpec::value_all(),
        };

        self.execute_explain_pushdown_with_read_holds(
            ctx,
            as_of,
            mz_now,
            read_holds,
            plan.source_imports
                .into_iter()
                .filter_map(|(id, (source, _, _upper))| {
                    source.arguments.operators.map(|mfp| (id, mfp))
                }),
        )
        .await
    }
}