1use anyhow::anyhow;
11use differential_dataflow::lattice::Lattice;
12use maplit::btreemap;
13use maplit::btreeset;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_catalog::memory::objects::{CatalogItem, MaterializedView};
16use mz_expr::{CollectionPlan, ResultSpec};
17use mz_ore::collections::CollectionExt;
18use mz_ore::instrument;
19use mz_ore::soft_panic_or_log;
20use mz_repr::explain::{ExprHumanizerExt, TransientItem};
21use mz_repr::optimize::OptimizerFeatures;
22use mz_repr::optimize::OverrideFrom;
23use mz_repr::refresh_schedule::RefreshSchedule;
24use mz_repr::{CatalogItemId, Datum, RelationVersion, Row, VersionedRelationDesc};
25use mz_sql::ast::ExplainStage;
26use mz_sql::catalog::CatalogError;
27use mz_sql::names::ResolvedIds;
28use mz_sql::plan;
29use mz_sql::session::metadata::SessionMetadata;
30use mz_sql_parser::ast;
31use mz_sql_parser::ast::display::AstDisplay;
32use mz_storage_client::controller::CollectionDescription;
33use std::collections::BTreeMap;
34use timely::progress::Antichain;
35use tracing::Span;
36
37use crate::ReadHolds;
38use crate::catalog::CatalogState;
39use crate::command::ExecuteResponse;
40use crate::coord::sequencer::inner::return_if_err;
41use crate::coord::{
42 Coordinator, CreateMaterializedViewExplain, CreateMaterializedViewFinish,
43 CreateMaterializedViewOptimize, CreateMaterializedViewStage, ExplainContext,
44 ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
45};
46use crate::error::AdapterError;
47use crate::explain::explain_dataflow;
48use crate::explain::explain_plan;
49use crate::explain::optimizer_trace::OptimizerTrace;
50use crate::optimize::dataflows::dataflow_import_id_bundle;
51use crate::optimize::{self, Optimize};
52use crate::session::Session;
53use crate::util::ResultExt;
54use crate::{AdapterNotice, CollectionIdBundle, ExecuteContext, TimestampProvider, catalog};
55
56impl Staged for CreateMaterializedViewStage {
57 type Ctx = ExecuteContext;
58
59 fn validity(&mut self) -> &mut PlanValidity {
60 match self {
61 Self::Optimize(stage) => &mut stage.validity,
62 Self::Finish(stage) => &mut stage.validity,
63 Self::Explain(stage) => &mut stage.validity,
64 }
65 }
66
67 async fn stage(
68 self,
69 coord: &mut Coordinator,
70 ctx: &mut ExecuteContext,
71 ) -> Result<StageResult<Box<Self>>, AdapterError> {
72 match self {
73 CreateMaterializedViewStage::Optimize(stage) => {
74 coord.create_materialized_view_optimize(stage).await
75 }
76 CreateMaterializedViewStage::Finish(stage) => {
77 coord.create_materialized_view_finish(ctx, stage).await
78 }
79 CreateMaterializedViewStage::Explain(stage) => {
80 coord
81 .create_materialized_view_explain(ctx.session(), stage)
82 .await
83 }
84 }
85 }
86
87 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
88 Message::CreateMaterializedViewStageReady {
89 ctx,
90 span,
91 stage: self,
92 }
93 }
94
95 fn cancel_enabled(&self) -> bool {
96 true
97 }
98}
99
100impl Coordinator {
101 #[instrument]
102 pub(crate) async fn sequence_create_materialized_view(
103 &mut self,
104 ctx: ExecuteContext,
105 plan: plan::CreateMaterializedViewPlan,
106 resolved_ids: ResolvedIds,
107 ) {
108 let stage = return_if_err!(
109 self.create_materialized_view_validate(
110 ctx.session(),
111 plan,
112 resolved_ids,
113 ExplainContext::None
114 ),
115 ctx
116 );
117 self.sequence_staged(ctx, Span::current(), stage).await;
118 }
119
120 #[instrument]
121 pub(crate) async fn explain_create_materialized_view(
122 &mut self,
123 ctx: ExecuteContext,
124 plan::ExplainPlanPlan {
125 stage,
126 format,
127 config,
128 explainee,
129 }: plan::ExplainPlanPlan,
130 ) {
131 let plan::Explainee::Statement(stmt) = explainee else {
132 unreachable!()
135 };
136 let plan::ExplaineeStatement::CreateMaterializedView { broken, plan } = stmt else {
137 unreachable!()
140 };
141
142 let optimizer_trace = OptimizerTrace::new(stage.paths());
145
146 let resolved_ids = ResolvedIds::empty();
148
149 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
150 broken,
151 config,
152 format,
153 stage,
154 replan: None,
155 desc: None,
156 optimizer_trace,
157 });
158 let stage = return_if_err!(
159 self.create_materialized_view_validate(ctx.session(), plan, resolved_ids, explain_ctx),
160 ctx
161 );
162 self.sequence_staged(ctx, Span::current(), stage).await;
163 }
164
165 #[instrument]
166 pub(crate) async fn explain_replan_materialized_view(
167 &mut self,
168 ctx: ExecuteContext,
169 plan::ExplainPlanPlan {
170 stage,
171 format,
172 config,
173 explainee,
174 }: plan::ExplainPlanPlan,
175 ) {
176 let plan::Explainee::ReplanMaterializedView(id) = explainee else {
177 unreachable!() };
179 let CatalogItem::MaterializedView(item) = self.catalog().get_entry(&id).item() else {
180 unreachable!() };
182 let gid = item.global_id_writes();
183
184 let create_sql = item.create_sql.clone();
185 let plan_result = self
186 .catalog_mut()
187 .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
188 let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
189
190 let plan::Plan::CreateMaterializedView(plan) = plan else {
191 unreachable!() };
193
194 let broken = false;
197
198 let optimizer_trace = OptimizerTrace::new(stage.paths());
201
202 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
203 broken,
204 config,
205 format,
206 stage,
207 replan: Some(gid),
208 desc: None,
209 optimizer_trace,
210 });
211 let stage = return_if_err!(
212 self.create_materialized_view_validate(ctx.session(), plan, resolved_ids, explain_ctx,),
213 ctx
214 );
215 self.sequence_staged(ctx, Span::current(), stage).await;
216 }
217
218 #[instrument]
219 pub(super) fn explain_materialized_view(
220 &self,
221 ctx: &ExecuteContext,
222 plan::ExplainPlanPlan {
223 stage,
224 format,
225 config,
226 explainee,
227 }: plan::ExplainPlanPlan,
228 ) -> Result<ExecuteResponse, AdapterError> {
229 let plan::Explainee::MaterializedView(id) = explainee else {
230 unreachable!() };
232 let CatalogItem::MaterializedView(view) = self.catalog().get_entry(&id).item() else {
233 unreachable!() };
235 let gid = view.global_id_writes();
236
237 let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&gid) else {
238 if !id.is_system() {
239 tracing::error!(
240 "cannot find dataflow metainformation for materialized view {id} in catalog"
241 );
242 }
243 coord_bail!(
244 "cannot find dataflow metainformation for materialized view {id} in catalog"
245 );
246 };
247
248 let target_cluster = self.catalog().get_cluster(view.cluster_id);
249
250 let features = OptimizerFeatures::from(self.catalog().system_config())
251 .override_from(&target_cluster.config.features())
252 .override_from(&self.cluster_scoped_optimizer_overrides(view.cluster_id))
253 .override_from(&config.features);
254
255 let cardinality_stats = BTreeMap::new();
256
257 let explain = match stage {
258 ExplainStage::RawPlan => explain_plan(
259 view.raw_expr.as_ref().clone(),
260 format,
261 &config,
262 &features,
263 &self.catalog().for_session(ctx.session()),
264 cardinality_stats,
265 Some(target_cluster.name.as_str()),
266 )?,
267 ExplainStage::LocalPlan => explain_plan(
268 view.locally_optimized_expr.as_inner().clone(),
269 format,
270 &config,
271 &features,
272 &self.catalog().for_session(ctx.session()),
273 cardinality_stats,
274 Some(target_cluster.name.as_str()),
275 )?,
276 ExplainStage::GlobalPlan => {
277 let Some(plan) = self.catalog().try_get_optimized_plan(&gid).cloned() else {
278 tracing::error!("cannot find {stage} for materialized view {id} in catalog");
279 coord_bail!("cannot find {stage} for materialized view in catalog");
280 };
281 explain_dataflow(
282 plan,
283 format,
284 &config,
285 &features,
286 &self.catalog().for_session(ctx.session()),
287 cardinality_stats,
288 Some(target_cluster.name.as_str()),
289 dataflow_metainfo,
290 )?
291 }
292 ExplainStage::PhysicalPlan => {
293 let Some(plan) = self.catalog().try_get_physical_plan(&gid).cloned() else {
294 tracing::error!("cannot find {stage} for materialized view {id} in catalog",);
295 coord_bail!("cannot find {stage} for materialized view in catalog");
296 };
297 explain_dataflow(
298 plan,
299 format,
300 &config,
301 &features,
302 &self.catalog().for_session(ctx.session()),
303 cardinality_stats,
304 Some(target_cluster.name.as_str()),
305 dataflow_metainfo,
306 )?
307 }
308 _ => {
309 coord_bail!("cannot EXPLAIN {} FOR MATERIALIZED VIEW", stage);
310 }
311 };
312
313 let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
314
315 Ok(Self::send_immediate_rows(row))
316 }
317
318 #[instrument]
319 fn create_materialized_view_validate(
320 &self,
321 session: &Session,
322 plan: plan::CreateMaterializedViewPlan,
323 resolved_ids: ResolvedIds,
324 explain_ctx: ExplainContext,
327 ) -> Result<CreateMaterializedViewStage, AdapterError> {
328 let plan::CreateMaterializedViewPlan {
329 materialized_view:
330 plan::MaterializedView {
331 expr,
332 cluster_id,
333 target_replica,
334 refresh_schedule,
335 ..
336 },
337 ambiguous_columns,
338 ..
339 } = &plan;
340
341 let expr_depends_on = expr.depends_on();
346 self.catalog()
347 .validate_timeline_context(expr_depends_on.iter().copied())?;
348 self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
349 let log_names = expr_depends_on
352 .iter()
353 .map(|gid| self.catalog.resolve_item_id(gid))
354 .flat_map(|item_id| self.catalog().introspection_dependencies(item_id))
355 .map(|item_id| self.catalog().get_entry(&item_id).name().item.clone())
356 .collect::<Vec<_>>();
357 if !log_names.is_empty() {
358 return Err(AdapterError::InvalidLogDependency {
359 object_type: "materialized view".into(),
360 log_names,
361 });
362 }
363
364 let validity = PlanValidity::new(
370 self.catalog().transient_revision(),
371 resolved_ids.items().copied().collect(),
372 Some(*cluster_id),
373 *target_replica,
374 session.role_metadata().clone(),
375 );
376
377 if let Some(refresh_schedule) = refresh_schedule {
379 if !refresh_schedule.ats.is_empty() && matches!(explain_ctx, ExplainContext::None) {
380 let read_holds = self
383 .txn_read_holds
384 .get(session.conn_id())
385 .expect("purification acquired read holds if there are REFRESH ATs");
386 let least_valid_read = read_holds.least_valid_read();
387 for refresh_at_ts in &refresh_schedule.ats {
388 if !least_valid_read.less_equal(refresh_at_ts) {
389 return Err(AdapterError::InputNotReadableAtRefreshAtTime(
390 *refresh_at_ts,
391 least_valid_read,
392 ));
393 }
394 }
395 let ids = self
398 .index_oracle(*cluster_id)
399 .sufficient_collections(resolved_ids.collections().copied());
400 if !ids.difference(&read_holds.id_bundle()).is_empty() {
401 return Err(AdapterError::ChangedPlan(
402 "the set of possible inputs changed during the creation of the \
403 materialized view"
404 .to_string(),
405 ));
406 }
407 }
408 }
409
410 Ok(CreateMaterializedViewStage::Optimize(
411 CreateMaterializedViewOptimize {
412 validity,
413 plan,
414 resolved_ids,
415 explain_ctx,
416 },
417 ))
418 }
419
420 #[instrument]
421 async fn create_materialized_view_optimize(
422 &mut self,
423 CreateMaterializedViewOptimize {
424 validity,
425 plan,
426 resolved_ids,
427 explain_ctx,
428 }: CreateMaterializedViewOptimize,
429 ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
430 let plan::CreateMaterializedViewPlan {
431 name,
432 materialized_view:
433 plan::MaterializedView {
434 column_names,
435 cluster_id,
436 non_null_assertions,
437 refresh_schedule,
438 ..
439 },
440 ..
441 } = &plan;
442
443 let compute_instance = self
445 .instance_snapshot(*cluster_id)
446 .expect("compute instance does not exist");
447 let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
448 self.allocate_user_id().await?
449 } else {
450 self.allocate_transient_id()
451 };
452
453 let (_, view_id) = self.allocate_transient_id();
454 let debug_name = self.catalog().resolve_full_name(name, None).to_string();
455 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
456 .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
457 .override_from(&self.cluster_scoped_optimizer_overrides(*cluster_id))
458 .override_from(&explain_ctx);
459 let optimizer_features = optimizer_config.features.clone();
460
461 let mut optimizer = optimize::materialized_view::Optimizer::new(
463 self.owned_catalog().as_optimizer_catalog(),
464 compute_instance,
465 global_id,
466 view_id,
467 column_names.clone(),
468 non_null_assertions.clone(),
469 refresh_schedule.clone(),
470 debug_name,
471 optimizer_config,
472 self.optimizer_metrics(),
473 );
474
475 let span = Span::current();
476 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
477 || "optimize create materialized view",
478 move || {
479 span.in_scope(|| {
480 let mut pipeline = || -> Result<(
481 optimize::materialized_view::LocalMirPlan,
482 optimize::materialized_view::GlobalMirPlan,
483 optimize::materialized_view::GlobalLirPlan,
484 ), AdapterError> {
485 let _dispatch_guard = explain_ctx.dispatch_guard();
486
487 let raw_expr = plan.materialized_view.expr.clone();
488
489 let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
491 let global_mir_plan =
492 optimizer.catch_unwind_optimize(local_mir_plan.clone())?;
493 let global_lir_plan =
495 optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
496
497 Ok((local_mir_plan, global_mir_plan, global_lir_plan))
498 };
499
500 let stage = match pipeline() {
501 Ok((local_mir_plan, global_mir_plan, global_lir_plan)) => {
502 if let ExplainContext::Plan(explain_ctx) = explain_ctx {
503 let (_, df_meta) = global_lir_plan.unapply();
504 CreateMaterializedViewStage::Explain(
505 CreateMaterializedViewExplain {
506 validity,
507 global_id,
508 plan,
509 df_meta,
510 explain_ctx,
511 },
512 )
513 } else {
514 CreateMaterializedViewStage::Finish(CreateMaterializedViewFinish {
515 item_id,
516 global_id,
517 validity,
518 plan,
519 resolved_ids,
520 local_mir_plan,
521 global_mir_plan,
522 global_lir_plan,
523 optimizer_features,
524 })
525 }
526 }
527 Err(err) => {
530 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
531 return Err(err);
533 };
534
535 if explain_ctx.broken {
536 tracing::error!("error while handling EXPLAIN statement: {}", err);
540 CreateMaterializedViewStage::Explain(
541 CreateMaterializedViewExplain {
542 global_id,
543 validity,
544 plan,
545 df_meta: Default::default(),
546 explain_ctx,
547 },
548 )
549 } else {
550 return Err(err);
552 }
553 }
554 };
555
556 Ok(Box::new(stage))
557 })
558 },
559 )))
560 }
561
562 #[instrument]
563 async fn create_materialized_view_finish(
564 &mut self,
565 ctx: &mut ExecuteContext,
566 stage: CreateMaterializedViewFinish,
567 ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
568 let CreateMaterializedViewFinish {
569 item_id,
570 global_id,
571 plan:
572 plan::CreateMaterializedViewPlan {
573 name,
574 materialized_view:
575 plan::MaterializedView {
576 mut create_sql,
577 expr: raw_expr,
578 column_names,
579 dependencies,
580 replacement_target,
581 cluster_id,
582 target_replica,
583 non_null_assertions,
584 compaction_window,
585 refresh_schedule,
586 ..
587 },
588 drop_ids,
589 if_not_exists,
590 ..
591 },
592 resolved_ids,
593 local_mir_plan,
594 global_mir_plan,
595 global_lir_plan,
596 optimizer_features,
597 ..
598 } = stage;
599
600 if let Some(target_id) = replacement_target {
602 let Some(target) = self.catalog().get_entry(&target_id).materialized_view() else {
603 return Err(AdapterError::internal(
604 "create materialized view",
605 "replacement target not a materialized view",
606 ));
607 };
608
609 let schema_diff = target.desc.latest().diff(global_lir_plan.desc());
611 if !schema_diff.is_empty() {
612 return Err(AdapterError::ReplacementSchemaMismatch(schema_diff));
613 }
614 }
615
616 let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
618
619 let read_holds_owned;
620 let read_holds = if let Some(txn_reads) = self.txn_read_holds.get(ctx.session().conn_id()) {
621 txn_reads
625 } else {
626 read_holds_owned = self.acquire_read_holds(&id_bundle);
629 &read_holds_owned
630 };
631
632 let (dataflow_as_of, storage_as_of, until) =
633 self.select_timestamps(id_bundle, refresh_schedule.as_ref(), read_holds)?;
634
635 tracing::info!(
636 dataflow_as_of = ?dataflow_as_of,
637 storage_as_of = ?storage_as_of,
638 until = ?until,
639 "materialized view timestamp selection",
640 );
641
642 let initial_as_of = storage_as_of.clone();
643
644 if let Some(storage_as_of_ts) = storage_as_of.as_option() {
649 let stmt = mz_sql::parse::parse(&create_sql)
650 .map_err(|_| {
651 AdapterError::internal(
652 "create materialized view",
653 "original SQL should roundtrip",
654 )
655 })?
656 .into_element()
657 .ast;
658 let ast::Statement::CreateMaterializedView(mut stmt) = stmt else {
659 panic!("unexpected statement type");
660 };
661 stmt.as_of = Some(storage_as_of_ts.into());
662 create_sql = stmt.to_ast_string_stable();
663 }
664
665 let desc = VersionedRelationDesc::new(global_lir_plan.desc().clone());
666 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
667
668 let local_mir_for_cache = local_mir_plan.expr();
669
670 let ops = vec![
671 catalog::Op::DropObjects(
672 drop_ids
673 .into_iter()
674 .map(catalog::DropObjectInfo::Item)
675 .collect(),
676 ),
677 catalog::Op::CreateItem {
678 id: item_id,
679 name: name.clone(),
680 item: CatalogItem::MaterializedView(MaterializedView {
681 create_sql,
682 raw_expr: raw_expr.into(),
683 locally_optimized_expr: local_mir_plan.expr().into(),
684 desc,
685 collections,
686 resolved_ids,
687 dependencies,
688 replacement_target,
689 cluster_id,
690 target_replica,
691 non_null_assertions,
692 custom_logical_compaction_window: compaction_window,
693 refresh_schedule: refresh_schedule.clone(),
694 initial_as_of: Some(initial_as_of.clone()),
695 optimized_plan: None,
696 physical_plan: None,
697 dataflow_metainfo: None,
698 }),
699 owner_id: *ctx.session().current_role_id(),
700 },
701 ];
702
703 let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
705 .map(|(_item_id, global_id)| global_id)
706 .take(global_lir_plan.df_meta().optimizer_notices.len())
707 .collect::<Vec<_>>();
708
709 let output_desc = global_lir_plan.desc().clone();
722 let (mut df_desc, raw_df_meta) = global_lir_plan.unapply();
723 let df_meta = {
724 let system_catalog = self.catalog().for_system_session();
725 let full_name = self.catalog().resolve_full_name(&name, None);
726 let transient_items = btreemap! {
727 global_id => TransientItem::new(
728 Some(full_name.into_parts()),
729 Some(column_names.iter().map(|c| c.to_string()).collect()),
730 )
731 };
732 let humanizer = ExprHumanizerExt::new(transient_items, &system_catalog);
733 CatalogState::render_notices_core(
734 &humanizer,
735 (self.catalog().config().now)(),
736 &raw_df_meta,
737 notice_ids,
738 Some(global_id),
739 )
740 };
741
742 self.catalog()
747 .cache_expressions(
748 global_id,
749 Some(local_mir_for_cache),
750 global_mir_plan.df_desc().clone(),
751 df_desc.clone(),
752 df_meta.clone(),
753 optimizer_features,
754 )
755 .await;
756
757 let transact_result = self
758 .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
759 Box::pin(async move {
760 coord
762 .catalog_mut()
763 .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
764 coord
765 .catalog_mut()
766 .set_physical_plan(global_id, df_desc.clone());
767
768 let notice_builtin_updates_fut =
769 coord.persist_dataflow_metainfo(df_meta, global_id).await;
770
771 df_desc.set_as_of(dataflow_as_of.clone());
772 df_desc.set_initial_as_of(initial_as_of);
773 df_desc.until = until;
774
775 let storage_metadata = coord.catalog.state().storage_metadata();
776
777 let mut collection_desc =
778 CollectionDescription::for_other(output_desc, Some(storage_as_of));
779 let mut allow_writes = true;
780
781 if let Some(target_id) = replacement_target {
784 let target_gid = coord.catalog.get_entry(&target_id).latest_global_id();
785 collection_desc.primary = Some(target_gid);
786 allow_writes = false;
787 }
788
789 coord
791 .controller
792 .storage
793 .create_collections(
794 storage_metadata,
795 None,
796 vec![(global_id, collection_desc)],
797 )
798 .await
799 .unwrap_or_terminate("cannot fail to append");
800
801 coord
802 .initialize_storage_read_policies(
803 btreeset![item_id],
804 compaction_window.unwrap_or(CompactionWindow::Default),
805 )
806 .await;
807
808 coord
809 .ship_dataflow_and_notice_builtin_table_updates(
810 df_desc,
811 cluster_id,
812 notice_builtin_updates_fut,
813 target_replica,
814 )
815 .await;
816
817 if allow_writes {
818 coord.allow_writes(cluster_id, global_id);
819 }
820 })
821 })
822 .await;
823
824 match transact_result {
825 Ok(_) => {
826 self.emit_raw_optimizer_notices_to_user(ctx, &raw_df_meta.optimizer_notices);
831 Ok(ExecuteResponse::CreatedMaterializedView)
832 }
833 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
834 kind:
835 mz_catalog::memory::error::ErrorKind::Sql(
836 CatalogError::ItemAlreadyExists(_, _),
837 ),
838 })) if if_not_exists => {
839 ctx.session()
840 .add_notice(AdapterNotice::ObjectAlreadyExists {
841 name: name.item,
842 ty: "materialized view",
843 });
844 Ok(ExecuteResponse::CreatedMaterializedView)
845 }
846 Err(err) => Err(err),
847 }
848 .map(StageResult::Response)
849 }
850
851 fn select_timestamps(
854 &self,
855 id_bundle: CollectionIdBundle,
856 refresh_schedule: Option<&RefreshSchedule>,
857 read_holds: &ReadHolds,
858 ) -> Result<
859 (
860 Antichain<mz_repr::Timestamp>,
861 Antichain<mz_repr::Timestamp>,
862 Antichain<mz_repr::Timestamp>,
863 ),
864 AdapterError,
865 > {
866 assert!(
867 id_bundle.difference(&read_holds.id_bundle()).is_empty(),
868 "we must have read holds for all involved collections"
869 );
870
871 let least_valid_read = read_holds.least_valid_read();
874 let mut dataflow_as_of = least_valid_read.clone();
875 let mut storage_as_of = least_valid_read.clone();
876
877 if let Some(refresh_schedule) = &refresh_schedule {
887 if let Some(least_valid_read_ts) = least_valid_read.as_option() {
888 if let Some(first_refresh_ts) =
889 refresh_schedule.round_up_timestamp(*least_valid_read_ts)
890 {
891 storage_as_of = Antichain::from_elem(first_refresh_ts);
892 dataflow_as_of.join_assign(
893 &self
894 .greatest_available_read(&id_bundle)
895 .meet(&storage_as_of),
896 );
897 } else {
898 let last_refresh = refresh_schedule.last_refresh().expect(
899 "if round_up_timestamp returned None, then there should be a last refresh",
900 );
901
902 return Err(AdapterError::MaterializedViewWouldNeverRefresh(
903 last_refresh,
904 *least_valid_read_ts,
905 ));
906 }
907 } else {
908 soft_panic_or_log!("creating a materialized view with an empty `as_of`");
910 }
911 }
912
913 let until_ts = refresh_schedule
917 .and_then(|s| s.last_refresh())
918 .and_then(|r| r.try_step_forward());
919 let until = Antichain::from_iter(until_ts);
920
921 Ok((dataflow_as_of, storage_as_of, until))
922 }
923
924 #[instrument]
925 async fn create_materialized_view_explain(
926 &self,
927 session: &Session,
928 CreateMaterializedViewExplain {
929 global_id,
930 plan:
931 plan::CreateMaterializedViewPlan {
932 name,
933 materialized_view:
934 plan::MaterializedView {
935 column_names,
936 cluster_id,
937 ..
938 },
939 ..
940 },
941 df_meta,
942 explain_ctx:
943 ExplainPlanContext {
944 config,
945 format,
946 stage,
947 optimizer_trace,
948 ..
949 },
950 ..
951 }: CreateMaterializedViewExplain,
952 ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
953 let session_catalog = self.catalog().for_session(session);
954 let expr_humanizer = {
955 let full_name = self.catalog().resolve_full_name(&name, None);
956 let transient_items = btreemap! {
957 global_id => TransientItem::new(
958 Some(full_name.into_parts()),
959 Some(column_names.iter().map(|c| c.to_string()).collect()),
960 )
961 };
962 ExprHumanizerExt::new(transient_items, &session_catalog)
963 };
964
965 let target_cluster = self.catalog().get_cluster(cluster_id);
966
967 let features = OptimizerFeatures::from(self.catalog().system_config())
968 .override_from(&target_cluster.config.features())
969 .override_from(&self.cluster_scoped_optimizer_overrides(cluster_id))
970 .override_from(&config.features);
971
972 let rows = optimizer_trace
973 .into_rows(
974 format,
975 &config,
976 &features,
977 &expr_humanizer,
978 None,
979 Some(target_cluster),
980 df_meta,
981 stage,
982 plan::ExplaineeStatementKind::CreateMaterializedView,
983 None,
984 )
985 .await?;
986
987 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
988 }
989
990 pub(crate) async fn explain_pushdown_materialized_view(
991 &self,
992 ctx: ExecuteContext,
993 item_id: CatalogItemId,
994 ) {
995 let CatalogItem::MaterializedView(mview) = self.catalog().get_entry(&item_id).item() else {
996 unreachable!() };
998 let gid = mview.global_id_writes();
999 let mview = mview.clone();
1000
1001 let Some(plan) = self.catalog().try_get_physical_plan(&gid).cloned() else {
1002 let msg = format!("cannot find plan for materialized view {item_id} in catalog");
1003 tracing::error!("{msg}");
1004 ctx.retire(Err(anyhow!("{msg}").into()));
1005 return;
1006 };
1007
1008 let read_holds =
1012 Some(self.acquire_read_holds(&dataflow_import_id_bundle(&plan, mview.cluster_id)));
1013
1014 let frontiers = self
1015 .controller
1016 .compute
1017 .collection_frontiers(gid, Some(mview.cluster_id))
1018 .expect("materialized view exists");
1019
1020 let as_of = frontiers.read_frontier.to_owned();
1021
1022 let until = mview
1023 .refresh_schedule
1024 .as_ref()
1025 .and_then(|s| s.last_refresh())
1026 .unwrap_or(mz_repr::Timestamp::MAX);
1027
1028 let mz_now = match as_of.as_option() {
1029 Some(&as_of) => {
1030 ResultSpec::value_between(Datum::MzTimestamp(as_of), Datum::MzTimestamp(until))
1031 }
1032 None => ResultSpec::value_all(),
1033 };
1034
1035 self.execute_explain_pushdown_with_read_holds(
1036 ctx,
1037 as_of,
1038 mz_now,
1039 read_holds,
1040 plan.source_imports
1041 .into_iter()
1042 .filter_map(|(id, import)| import.desc.arguments.operators.map(|mfp| (id, mfp))),
1043 )
1044 .await
1045 }
1046}