Skip to main content

mz_adapter/coord/sequencer/inner/
create_materialized_view.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::anyhow;
11use differential_dataflow::lattice::Lattice;
12use maplit::btreemap;
13use maplit::btreeset;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_catalog::memory::objects::{CatalogItem, MaterializedView};
16use mz_expr::{CollectionPlan, ResultSpec};
17use mz_ore::collections::CollectionExt;
18use mz_ore::instrument;
19use mz_ore::soft_panic_or_log;
20use mz_repr::explain::{ExprHumanizerExt, TransientItem};
21use mz_repr::optimize::OptimizerFeatures;
22use mz_repr::optimize::OverrideFrom;
23use mz_repr::refresh_schedule::RefreshSchedule;
24use mz_repr::{CatalogItemId, Datum, RelationVersion, Row, VersionedRelationDesc};
25use mz_sql::ast::ExplainStage;
26use mz_sql::catalog::CatalogError;
27use mz_sql::names::ResolvedIds;
28use mz_sql::plan;
29use mz_sql::session::metadata::SessionMetadata;
30use mz_sql_parser::ast;
31use mz_sql_parser::ast::display::AstDisplay;
32use mz_storage_client::controller::CollectionDescription;
33use std::collections::BTreeMap;
34use timely::progress::Antichain;
35use tracing::Span;
36
37use crate::ReadHolds;
38use crate::catalog::CatalogState;
39use crate::command::ExecuteResponse;
40use crate::coord::sequencer::inner::return_if_err;
41use crate::coord::{
42    Coordinator, CreateMaterializedViewExplain, CreateMaterializedViewFinish,
43    CreateMaterializedViewOptimize, CreateMaterializedViewStage, ExplainContext,
44    ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
45};
46use crate::error::AdapterError;
47use crate::explain::explain_dataflow;
48use crate::explain::explain_plan;
49use crate::explain::optimizer_trace::OptimizerTrace;
50use crate::optimize::dataflows::dataflow_import_id_bundle;
51use crate::optimize::{self, Optimize};
52use crate::session::Session;
53use crate::util::ResultExt;
54use crate::{AdapterNotice, CollectionIdBundle, ExecuteContext, TimestampProvider, catalog};
55
56impl Staged for CreateMaterializedViewStage {
57    type Ctx = ExecuteContext;
58
59    fn validity(&mut self) -> &mut PlanValidity {
60        match self {
61            Self::Optimize(stage) => &mut stage.validity,
62            Self::Finish(stage) => &mut stage.validity,
63            Self::Explain(stage) => &mut stage.validity,
64        }
65    }
66
67    async fn stage(
68        self,
69        coord: &mut Coordinator,
70        ctx: &mut ExecuteContext,
71    ) -> Result<StageResult<Box<Self>>, AdapterError> {
72        match self {
73            CreateMaterializedViewStage::Optimize(stage) => {
74                coord.create_materialized_view_optimize(stage).await
75            }
76            CreateMaterializedViewStage::Finish(stage) => {
77                coord.create_materialized_view_finish(ctx, stage).await
78            }
79            CreateMaterializedViewStage::Explain(stage) => {
80                coord
81                    .create_materialized_view_explain(ctx.session(), stage)
82                    .await
83            }
84        }
85    }
86
87    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
88        Message::CreateMaterializedViewStageReady {
89            ctx,
90            span,
91            stage: self,
92        }
93    }
94
95    fn cancel_enabled(&self) -> bool {
96        true
97    }
98}
99
100impl Coordinator {
101    #[instrument]
102    pub(crate) async fn sequence_create_materialized_view(
103        &mut self,
104        ctx: ExecuteContext,
105        plan: plan::CreateMaterializedViewPlan,
106        resolved_ids: ResolvedIds,
107    ) {
108        let stage = return_if_err!(
109            self.create_materialized_view_validate(
110                ctx.session(),
111                plan,
112                resolved_ids,
113                ExplainContext::None
114            ),
115            ctx
116        );
117        self.sequence_staged(ctx, Span::current(), stage).await;
118    }
119
120    #[instrument]
121    pub(crate) async fn explain_create_materialized_view(
122        &mut self,
123        ctx: ExecuteContext,
124        plan::ExplainPlanPlan {
125            stage,
126            format,
127            config,
128            explainee,
129        }: plan::ExplainPlanPlan,
130    ) {
131        let plan::Explainee::Statement(stmt) = explainee else {
132            // This is currently asserted in the `sequence_explain_plan` code that
133            // calls this method.
134            unreachable!()
135        };
136        let plan::ExplaineeStatement::CreateMaterializedView { broken, plan } = stmt else {
137            // This is currently asserted in the `sequence_explain_plan` code that
138            // calls this method.
139            unreachable!()
140        };
141
142        // Create an OptimizerTrace instance to collect plans emitted when
143        // executing the optimizer pipeline.
144        let optimizer_trace = OptimizerTrace::new(stage.paths());
145
146        // Not used in the EXPLAIN path so it's OK to generate a dummy value.
147        let resolved_ids = ResolvedIds::empty();
148
149        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
150            broken,
151            config,
152            format,
153            stage,
154            replan: None,
155            desc: None,
156            optimizer_trace,
157        });
158        let stage = return_if_err!(
159            self.create_materialized_view_validate(ctx.session(), plan, resolved_ids, explain_ctx),
160            ctx
161        );
162        self.sequence_staged(ctx, Span::current(), stage).await;
163    }
164
165    #[instrument]
166    pub(crate) async fn explain_replan_materialized_view(
167        &mut self,
168        ctx: ExecuteContext,
169        plan::ExplainPlanPlan {
170            stage,
171            format,
172            config,
173            explainee,
174        }: plan::ExplainPlanPlan,
175    ) {
176        let plan::Explainee::ReplanMaterializedView(id) = explainee else {
177            unreachable!() // Asserted in `sequence_explain_plan`.
178        };
179        let CatalogItem::MaterializedView(item) = self.catalog().get_entry(&id).item() else {
180            unreachable!() // Asserted in `plan_explain_plan`.
181        };
182        let gid = item.global_id_writes();
183
184        let create_sql = item.create_sql.clone();
185        let plan_result = self
186            .catalog_mut()
187            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
188        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
189
190        let plan::Plan::CreateMaterializedView(plan) = plan else {
191            unreachable!() // We are parsing the `create_sql` of a `MaterializedView` item.
192        };
193
194        // It is safe to assume that query optimization will always succeed, so
195        // for now we statically assume `broken = false`.
196        let broken = false;
197
198        // Create an OptimizerTrace instance to collect plans emitted when
199        // executing the optimizer pipeline.
200        let optimizer_trace = OptimizerTrace::new(stage.paths());
201
202        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
203            broken,
204            config,
205            format,
206            stage,
207            replan: Some(gid),
208            desc: None,
209            optimizer_trace,
210        });
211        let stage = return_if_err!(
212            self.create_materialized_view_validate(ctx.session(), plan, resolved_ids, explain_ctx,),
213            ctx
214        );
215        self.sequence_staged(ctx, Span::current(), stage).await;
216    }
217
218    #[instrument]
219    pub(super) fn explain_materialized_view(
220        &self,
221        ctx: &ExecuteContext,
222        plan::ExplainPlanPlan {
223            stage,
224            format,
225            config,
226            explainee,
227        }: plan::ExplainPlanPlan,
228    ) -> Result<ExecuteResponse, AdapterError> {
229        let plan::Explainee::MaterializedView(id) = explainee else {
230            unreachable!() // Asserted in `sequence_explain_plan`.
231        };
232        let CatalogItem::MaterializedView(view) = self.catalog().get_entry(&id).item() else {
233            unreachable!() // Asserted in `plan_explain_plan`.
234        };
235        let gid = view.global_id_writes();
236
237        let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&gid) else {
238            if !id.is_system() {
239                tracing::error!(
240                    "cannot find dataflow metainformation for materialized view {id} in catalog"
241                );
242            }
243            coord_bail!(
244                "cannot find dataflow metainformation for materialized view {id} in catalog"
245            );
246        };
247
248        let target_cluster = self.catalog().get_cluster(view.cluster_id);
249
250        let features = OptimizerFeatures::from(self.catalog().system_config())
251            .override_from(&target_cluster.config.features())
252            .override_from(&self.cluster_scoped_optimizer_overrides(view.cluster_id))
253            .override_from(&config.features);
254
255        let cardinality_stats = BTreeMap::new();
256
257        let explain = match stage {
258            ExplainStage::RawPlan => explain_plan(
259                view.raw_expr.as_ref().clone(),
260                format,
261                &config,
262                &features,
263                &self.catalog().for_session(ctx.session()),
264                cardinality_stats,
265                Some(target_cluster.name.as_str()),
266            )?,
267            ExplainStage::LocalPlan => explain_plan(
268                view.locally_optimized_expr.as_inner().clone(),
269                format,
270                &config,
271                &features,
272                &self.catalog().for_session(ctx.session()),
273                cardinality_stats,
274                Some(target_cluster.name.as_str()),
275            )?,
276            ExplainStage::GlobalPlan => {
277                let Some(plan) = self.catalog().try_get_optimized_plan(&gid).cloned() else {
278                    tracing::error!("cannot find {stage} for materialized view {id} in catalog");
279                    coord_bail!("cannot find {stage} for materialized view in catalog");
280                };
281                explain_dataflow(
282                    plan,
283                    format,
284                    &config,
285                    &features,
286                    &self.catalog().for_session(ctx.session()),
287                    cardinality_stats,
288                    Some(target_cluster.name.as_str()),
289                    dataflow_metainfo,
290                )?
291            }
292            ExplainStage::PhysicalPlan => {
293                let Some(plan) = self.catalog().try_get_physical_plan(&gid).cloned() else {
294                    tracing::error!("cannot find {stage} for materialized view {id} in catalog",);
295                    coord_bail!("cannot find {stage} for materialized view in catalog");
296                };
297                explain_dataflow(
298                    plan,
299                    format,
300                    &config,
301                    &features,
302                    &self.catalog().for_session(ctx.session()),
303                    cardinality_stats,
304                    Some(target_cluster.name.as_str()),
305                    dataflow_metainfo,
306                )?
307            }
308            _ => {
309                coord_bail!("cannot EXPLAIN {} FOR MATERIALIZED VIEW", stage);
310            }
311        };
312
313        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
314
315        Ok(Self::send_immediate_rows(row))
316    }
317
318    #[instrument]
319    fn create_materialized_view_validate(
320        &self,
321        session: &Session,
322        plan: plan::CreateMaterializedViewPlan,
323        resolved_ids: ResolvedIds,
324        // An optional context set iff the state machine is initiated from
325        // sequencing an EXPLAIN for this statement.
326        explain_ctx: ExplainContext,
327    ) -> Result<CreateMaterializedViewStage, AdapterError> {
328        let plan::CreateMaterializedViewPlan {
329            materialized_view:
330                plan::MaterializedView {
331                    expr,
332                    cluster_id,
333                    target_replica,
334                    refresh_schedule,
335                    ..
336                },
337            ambiguous_columns,
338            ..
339        } = &plan;
340
341        // Validate any references in the materialized view's expression. We do
342        // this on the unoptimized plan to better reflect what the user typed.
343        // We want to reject queries that depend on log sources, for example,
344        // even if we can *technically* optimize that reference away.
345        let expr_depends_on = expr.depends_on();
346        self.catalog()
347            .validate_timeline_context(expr_depends_on.iter().copied())?;
348        self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
349        // Materialized views are not allowed to depend on log sources, as replicas
350        // are not producing the same definite collection for these.
351        let log_names = expr_depends_on
352            .iter()
353            .map(|gid| self.catalog.resolve_item_id(gid))
354            .flat_map(|item_id| self.catalog().introspection_dependencies(item_id))
355            .map(|item_id| self.catalog().get_entry(&item_id).name().item.clone())
356            .collect::<Vec<_>>();
357        if !log_names.is_empty() {
358            return Err(AdapterError::InvalidLogDependency {
359                object_type: "materialized view".into(),
360                log_names,
361            });
362        }
363
364        // Track the target cluster/replica and resolved dependencies so that
365        // concurrent drops (e.g. `ALTER CLUSTER ... SET (REPLICATION FACTOR
366        // ...)` racing with the off-thread optimizer) are caught between
367        // stages instead of panicking later when the persisted SQL is
368        // re-parsed during catalog application.
369        let validity = PlanValidity::new(
370            self.catalog().transient_revision(),
371            resolved_ids.items().copied().collect(),
372            Some(*cluster_id),
373            *target_replica,
374            session.role_metadata().clone(),
375        );
376
377        // Check whether we can read all inputs at all the REFRESH AT times.
378        if let Some(refresh_schedule) = refresh_schedule {
379            if !refresh_schedule.ats.is_empty() && matches!(explain_ctx, ExplainContext::None) {
380                // Purification has acquired the earliest possible read holds if there are any
381                // REFRESH options.
382                let read_holds = self
383                    .txn_read_holds
384                    .get(session.conn_id())
385                    .expect("purification acquired read holds if there are REFRESH ATs");
386                let least_valid_read = read_holds.least_valid_read();
387                for refresh_at_ts in &refresh_schedule.ats {
388                    if !least_valid_read.less_equal(refresh_at_ts) {
389                        return Err(AdapterError::InputNotReadableAtRefreshAtTime(
390                            *refresh_at_ts,
391                            least_valid_read,
392                        ));
393                    }
394                }
395                // Also check that no new id has appeared in `sufficient_collections` (e.g. a new
396                // index), otherwise we might be missing some read holds.
397                let ids = self
398                    .index_oracle(*cluster_id)
399                    .sufficient_collections(resolved_ids.collections().copied());
400                if !ids.difference(&read_holds.id_bundle()).is_empty() {
401                    return Err(AdapterError::ChangedPlan(
402                        "the set of possible inputs changed during the creation of the \
403                         materialized view"
404                            .to_string(),
405                    ));
406                }
407            }
408        }
409
410        Ok(CreateMaterializedViewStage::Optimize(
411            CreateMaterializedViewOptimize {
412                validity,
413                plan,
414                resolved_ids,
415                explain_ctx,
416            },
417        ))
418    }
419
420    #[instrument]
421    async fn create_materialized_view_optimize(
422        &mut self,
423        CreateMaterializedViewOptimize {
424            validity,
425            plan,
426            resolved_ids,
427            explain_ctx,
428        }: CreateMaterializedViewOptimize,
429    ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
430        let plan::CreateMaterializedViewPlan {
431            name,
432            materialized_view:
433                plan::MaterializedView {
434                    column_names,
435                    cluster_id,
436                    non_null_assertions,
437                    refresh_schedule,
438                    ..
439                },
440            ..
441        } = &plan;
442
443        // Collect optimizer parameters.
444        let compute_instance = self
445            .instance_snapshot(*cluster_id)
446            .expect("compute instance does not exist");
447        let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
448            self.allocate_user_id().await?
449        } else {
450            self.allocate_transient_id()
451        };
452
453        let (_, view_id) = self.allocate_transient_id();
454        let debug_name = self.catalog().resolve_full_name(name, None).to_string();
455        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
456            .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
457            .override_from(&self.cluster_scoped_optimizer_overrides(*cluster_id))
458            .override_from(&explain_ctx);
459        let optimizer_features = optimizer_config.features.clone();
460
461        // Build an optimizer for this MATERIALIZED VIEW.
462        let mut optimizer = optimize::materialized_view::Optimizer::new(
463            self.owned_catalog().as_optimizer_catalog(),
464            compute_instance,
465            global_id,
466            view_id,
467            column_names.clone(),
468            non_null_assertions.clone(),
469            refresh_schedule.clone(),
470            debug_name,
471            optimizer_config,
472            self.optimizer_metrics(),
473        );
474
475        let span = Span::current();
476        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
477            || "optimize create materialized view",
478            move || {
479                span.in_scope(|| {
480                    let mut pipeline = || -> Result<(
481                        optimize::materialized_view::LocalMirPlan,
482                        optimize::materialized_view::GlobalMirPlan,
483                        optimize::materialized_view::GlobalLirPlan,
484                    ), AdapterError> {
485                        let _dispatch_guard = explain_ctx.dispatch_guard();
486
487                        let raw_expr = plan.materialized_view.expr.clone();
488
489                        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)
490                        let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
491                        let global_mir_plan =
492                            optimizer.catch_unwind_optimize(local_mir_plan.clone())?;
493                        // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
494                        let global_lir_plan =
495                            optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
496
497                        Ok((local_mir_plan, global_mir_plan, global_lir_plan))
498                    };
499
500                    let stage = match pipeline() {
501                        Ok((local_mir_plan, global_mir_plan, global_lir_plan)) => {
502                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
503                                let (_, df_meta) = global_lir_plan.unapply();
504                                CreateMaterializedViewStage::Explain(
505                                    CreateMaterializedViewExplain {
506                                        validity,
507                                        global_id,
508                                        plan,
509                                        df_meta,
510                                        explain_ctx,
511                                    },
512                                )
513                            } else {
514                                CreateMaterializedViewStage::Finish(CreateMaterializedViewFinish {
515                                    item_id,
516                                    global_id,
517                                    validity,
518                                    plan,
519                                    resolved_ids,
520                                    local_mir_plan,
521                                    global_mir_plan,
522                                    global_lir_plan,
523                                    optimizer_features,
524                                })
525                            }
526                        }
527                        // Internal optimizer errors are handled differently
528                        // depending on the caller.
529                        Err(err) => {
530                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
531                                // In `sequence_~` contexts, immediately return the error.
532                                return Err(err);
533                            };
534
535                            if explain_ctx.broken {
536                                // In `EXPLAIN BROKEN` contexts, just log the error
537                                // and move to the next stage with default
538                                // parameters.
539                                tracing::error!("error while handling EXPLAIN statement: {}", err);
540                                CreateMaterializedViewStage::Explain(
541                                    CreateMaterializedViewExplain {
542                                        global_id,
543                                        validity,
544                                        plan,
545                                        df_meta: Default::default(),
546                                        explain_ctx,
547                                    },
548                                )
549                            } else {
550                                // In regular `EXPLAIN` contexts, immediately return the error.
551                                return Err(err);
552                            }
553                        }
554                    };
555
556                    Ok(Box::new(stage))
557                })
558            },
559        )))
560    }
561
562    #[instrument]
563    async fn create_materialized_view_finish(
564        &mut self,
565        ctx: &mut ExecuteContext,
566        stage: CreateMaterializedViewFinish,
567    ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
568        let CreateMaterializedViewFinish {
569            item_id,
570            global_id,
571            plan:
572                plan::CreateMaterializedViewPlan {
573                    name,
574                    materialized_view:
575                        plan::MaterializedView {
576                            mut create_sql,
577                            expr: raw_expr,
578                            column_names,
579                            dependencies,
580                            replacement_target,
581                            cluster_id,
582                            target_replica,
583                            non_null_assertions,
584                            compaction_window,
585                            refresh_schedule,
586                            ..
587                        },
588                    drop_ids,
589                    if_not_exists,
590                    ..
591                },
592            resolved_ids,
593            local_mir_plan,
594            global_mir_plan,
595            global_lir_plan,
596            optimizer_features,
597            ..
598        } = stage;
599
600        // Validate the replacement target, if one is given.
601        if let Some(target_id) = replacement_target {
602            let Some(target) = self.catalog().get_entry(&target_id).materialized_view() else {
603                return Err(AdapterError::internal(
604                    "create materialized view",
605                    "replacement target not a materialized view",
606                ));
607            };
608
609            // For now, we don't support schema evolution for materialized views.
610            let schema_diff = target.desc.latest().diff(global_lir_plan.desc());
611            if !schema_diff.is_empty() {
612                return Err(AdapterError::ReplacementSchemaMismatch(schema_diff));
613            }
614        }
615
616        // Timestamp selection
617        let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
618
619        let read_holds_owned;
620        let read_holds = if let Some(txn_reads) = self.txn_read_holds.get(ctx.session().conn_id()) {
621            // In some cases, for example when REFRESH is used, the preparatory
622            // stages will already have acquired ReadHolds, we can re-use those.
623
624            txn_reads
625        } else {
626            // No one has acquired holds, make sure we can determine an as_of
627            // and render our dataflow below.
628            read_holds_owned = self.acquire_read_holds(&id_bundle);
629            &read_holds_owned
630        };
631
632        let (dataflow_as_of, storage_as_of, until) =
633            self.select_timestamps(id_bundle, refresh_schedule.as_ref(), read_holds)?;
634
635        tracing::info!(
636            dataflow_as_of = ?dataflow_as_of,
637            storage_as_of = ?storage_as_of,
638            until = ?until,
639            "materialized view timestamp selection",
640        );
641
642        let initial_as_of = storage_as_of.clone();
643
644        // Update the `create_sql` with the selected `as_of`. This is how we make sure the `as_of`
645        // is persisted to the catalog and can be relied on during bootstrapping.
646        // This has to be the `storage_as_of`, because bootstrapping uses this in
647        // `bootstrap_storage_collections`.
648        if let Some(storage_as_of_ts) = storage_as_of.as_option() {
649            let stmt = mz_sql::parse::parse(&create_sql)
650                .map_err(|_| {
651                    AdapterError::internal(
652                        "create materialized view",
653                        "original SQL should roundtrip",
654                    )
655                })?
656                .into_element()
657                .ast;
658            let ast::Statement::CreateMaterializedView(mut stmt) = stmt else {
659                panic!("unexpected statement type");
660            };
661            stmt.as_of = Some(storage_as_of_ts.into());
662            create_sql = stmt.to_ast_string_stable();
663        }
664
665        let desc = VersionedRelationDesc::new(global_lir_plan.desc().clone());
666        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
667
668        let local_mir_for_cache = local_mir_plan.expr();
669
670        let ops = vec![
671            catalog::Op::DropObjects(
672                drop_ids
673                    .into_iter()
674                    .map(catalog::DropObjectInfo::Item)
675                    .collect(),
676            ),
677            catalog::Op::CreateItem {
678                id: item_id,
679                name: name.clone(),
680                item: CatalogItem::MaterializedView(MaterializedView {
681                    create_sql,
682                    raw_expr: raw_expr.into(),
683                    locally_optimized_expr: local_mir_plan.expr().into(),
684                    desc,
685                    collections,
686                    resolved_ids,
687                    dependencies,
688                    replacement_target,
689                    cluster_id,
690                    target_replica,
691                    non_null_assertions,
692                    custom_logical_compaction_window: compaction_window,
693                    refresh_schedule: refresh_schedule.clone(),
694                    initial_as_of: Some(initial_as_of.clone()),
695                    optimized_plan: None,
696                    physical_plan: None,
697                    dataflow_metainfo: None,
698                }),
699                owner_id: *ctx.session().current_role_id(),
700            },
701        ];
702
703        // Pre-allocate a vector of transient GlobalIds for each notice.
704        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
705            .map(|(_item_id, global_id)| global_id)
706            .take(global_lir_plan.df_meta().optimizer_notices.len())
707            .collect::<Vec<_>>();
708
709        // Render optimizer notices before the catalog transaction. We wrap
710        // the system-session humanizer with an `ExprHumanizerExt` so that
711        // references to the to-be-created materialized view's own
712        // `global_id` in the persisted notice text resolve to its intended
713        // human-readable name.
714        //
715        // We keep `raw_df_meta` live so that on success we can emit its raw
716        // notices to the user session (rendered against the user's
717        // session-aware humanizer). We deliberately do NOT emit to the user
718        // here, so that if the catalog transaction below fails the user
719        // isn't shown confusing notices about an item that wasn't actually
720        // created.
721        let output_desc = global_lir_plan.desc().clone();
722        let (mut df_desc, raw_df_meta) = global_lir_plan.unapply();
723        let df_meta = {
724            let system_catalog = self.catalog().for_system_session();
725            let full_name = self.catalog().resolve_full_name(&name, None);
726            let transient_items = btreemap! {
727                global_id => TransientItem::new(
728                    Some(full_name.into_parts()),
729                    Some(column_names.iter().map(|c| c.to_string()).collect()),
730                )
731            };
732            let humanizer = ExprHumanizerExt::new(transient_items, &system_catalog);
733            CatalogState::render_notices_core(
734                &humanizer,
735                (self.catalog().config().now)(),
736                &raw_df_meta,
737                notice_ids,
738                Some(global_id),
739            )
740        };
741
742        // Populate the durable expression cache before the catalog
743        // transaction and await the write. This way any other envd (or a
744        // subsequent bootstrap here) will observe the cached plans +
745        // rendered notices as soon as the item becomes visible.
746        self.catalog()
747            .cache_expressions(
748                global_id,
749                Some(local_mir_for_cache),
750                global_mir_plan.df_desc().clone(),
751                df_desc.clone(),
752                df_meta.clone(),
753                optimizer_features,
754            )
755            .await;
756
757        let transact_result = self
758            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
759                Box::pin(async move {
760                    // Save plan structures.
761                    coord
762                        .catalog_mut()
763                        .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
764                    coord
765                        .catalog_mut()
766                        .set_physical_plan(global_id, df_desc.clone());
767
768                    let notice_builtin_updates_fut =
769                        coord.persist_dataflow_metainfo(df_meta, global_id).await;
770
771                    df_desc.set_as_of(dataflow_as_of.clone());
772                    df_desc.set_initial_as_of(initial_as_of);
773                    df_desc.until = until;
774
775                    let storage_metadata = coord.catalog.state().storage_metadata();
776
777                    let mut collection_desc =
778                        CollectionDescription::for_other(output_desc, Some(storage_as_of));
779                    let mut allow_writes = true;
780
781                    // If this MV is intended to replace another one, we need to start it in
782                    // read-only mode, targeting the shard of the replacement target.
783                    if let Some(target_id) = replacement_target {
784                        let target_gid = coord.catalog.get_entry(&target_id).latest_global_id();
785                        collection_desc.primary = Some(target_gid);
786                        allow_writes = false;
787                    }
788
789                    // Announce the creation of the materialized view source.
790                    coord
791                        .controller
792                        .storage
793                        .create_collections(
794                            storage_metadata,
795                            None,
796                            vec![(global_id, collection_desc)],
797                        )
798                        .await
799                        .unwrap_or_terminate("cannot fail to append");
800
801                    coord
802                        .initialize_storage_read_policies(
803                            btreeset![item_id],
804                            compaction_window.unwrap_or(CompactionWindow::Default),
805                        )
806                        .await;
807
808                    coord
809                        .ship_dataflow_and_notice_builtin_table_updates(
810                            df_desc,
811                            cluster_id,
812                            notice_builtin_updates_fut,
813                            target_replica,
814                        )
815                        .await;
816
817                    if allow_writes {
818                        coord.allow_writes(cluster_id, global_id);
819                    }
820                })
821            })
822            .await;
823
824        match transact_result {
825            Ok(_) => {
826                // Only emit optimizer notices to the user now that the
827                // catalog transaction has succeeded. If the transaction had
828                // failed, emitting notices would confuse the user with
829                // information about an item that wasn't actually created.
830                self.emit_raw_optimizer_notices_to_user(ctx, &raw_df_meta.optimizer_notices);
831                Ok(ExecuteResponse::CreatedMaterializedView)
832            }
833            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
834                kind:
835                    mz_catalog::memory::error::ErrorKind::Sql(
836                        CatalogError::ItemAlreadyExists(_, _),
837                    ),
838            })) if if_not_exists => {
839                ctx.session()
840                    .add_notice(AdapterNotice::ObjectAlreadyExists {
841                        name: name.item,
842                        ty: "materialized view",
843                    });
844                Ok(ExecuteResponse::CreatedMaterializedView)
845            }
846            Err(err) => Err(err),
847        }
848        .map(StageResult::Response)
849    }
850
851    /// Select the initial `dataflow_as_of`, `storage_as_of`, and `until` frontiers for a
852    /// materialized view.
853    fn select_timestamps(
854        &self,
855        id_bundle: CollectionIdBundle,
856        refresh_schedule: Option<&RefreshSchedule>,
857        read_holds: &ReadHolds,
858    ) -> Result<
859        (
860            Antichain<mz_repr::Timestamp>,
861            Antichain<mz_repr::Timestamp>,
862            Antichain<mz_repr::Timestamp>,
863        ),
864        AdapterError,
865    > {
866        assert!(
867            id_bundle.difference(&read_holds.id_bundle()).is_empty(),
868            "we must have read holds for all involved collections"
869        );
870
871        // For non-REFRESH MVs both the `dataflow_as_of` and the `storage_as_of` should be simply
872        // `least_valid_read`.
873        let least_valid_read = read_holds.least_valid_read();
874        let mut dataflow_as_of = least_valid_read.clone();
875        let mut storage_as_of = least_valid_read.clone();
876
877        // For MVs with non-trivial REFRESH schedules:
878        // 1. it's important to set the `storage_as_of` to the first refresh. This is because we'd
879        // like queries on the MV to block until the first refresh (rather than to show an empty
880        // MV).
881        // 2. We move the `dataflow_as_of` forward to the minimum of `greatest_available_read` and
882        // the first refresh time. There is no point in processing the times before
883        // `greatest_available_read`, because the first time for which results will be exposed is
884        // the first refresh time. Also note that simply moving the `dataflow_as_of` forward to the
885        // first refresh time would prevent warmup before the first refresh.
886        if let Some(refresh_schedule) = &refresh_schedule {
887            if let Some(least_valid_read_ts) = least_valid_read.as_option() {
888                if let Some(first_refresh_ts) =
889                    refresh_schedule.round_up_timestamp(*least_valid_read_ts)
890                {
891                    storage_as_of = Antichain::from_elem(first_refresh_ts);
892                    dataflow_as_of.join_assign(
893                        &self
894                            .greatest_available_read(&id_bundle)
895                            .meet(&storage_as_of),
896                    );
897                } else {
898                    let last_refresh = refresh_schedule.last_refresh().expect(
899                        "if round_up_timestamp returned None, then there should be a last refresh",
900                    );
901
902                    return Err(AdapterError::MaterializedViewWouldNeverRefresh(
903                        last_refresh,
904                        *least_valid_read_ts,
905                    ));
906                }
907            } else {
908                // The `as_of` should never be empty, because then the MV would be unreadable.
909                soft_panic_or_log!("creating a materialized view with an empty `as_of`");
910            }
911        }
912
913        // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
914        // (If the `try_step_forward` fails, then no need to set an `until`, because it's not possible to get any data
915        // beyond that last refresh time, because there are no times beyond that time.)
916        let until_ts = refresh_schedule
917            .and_then(|s| s.last_refresh())
918            .and_then(|r| r.try_step_forward());
919        let until = Antichain::from_iter(until_ts);
920
921        Ok((dataflow_as_of, storage_as_of, until))
922    }
923
924    #[instrument]
925    async fn create_materialized_view_explain(
926        &self,
927        session: &Session,
928        CreateMaterializedViewExplain {
929            global_id,
930            plan:
931                plan::CreateMaterializedViewPlan {
932                    name,
933                    materialized_view:
934                        plan::MaterializedView {
935                            column_names,
936                            cluster_id,
937                            ..
938                        },
939                    ..
940                },
941            df_meta,
942            explain_ctx:
943                ExplainPlanContext {
944                    config,
945                    format,
946                    stage,
947                    optimizer_trace,
948                    ..
949                },
950            ..
951        }: CreateMaterializedViewExplain,
952    ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
953        let session_catalog = self.catalog().for_session(session);
954        let expr_humanizer = {
955            let full_name = self.catalog().resolve_full_name(&name, None);
956            let transient_items = btreemap! {
957                global_id => TransientItem::new(
958                    Some(full_name.into_parts()),
959                    Some(column_names.iter().map(|c| c.to_string()).collect()),
960                )
961            };
962            ExprHumanizerExt::new(transient_items, &session_catalog)
963        };
964
965        let target_cluster = self.catalog().get_cluster(cluster_id);
966
967        let features = OptimizerFeatures::from(self.catalog().system_config())
968            .override_from(&target_cluster.config.features())
969            .override_from(&self.cluster_scoped_optimizer_overrides(cluster_id))
970            .override_from(&config.features);
971
972        let rows = optimizer_trace
973            .into_rows(
974                format,
975                &config,
976                &features,
977                &expr_humanizer,
978                None,
979                Some(target_cluster),
980                df_meta,
981                stage,
982                plan::ExplaineeStatementKind::CreateMaterializedView,
983                None,
984            )
985            .await?;
986
987        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
988    }
989
990    pub(crate) async fn explain_pushdown_materialized_view(
991        &self,
992        ctx: ExecuteContext,
993        item_id: CatalogItemId,
994    ) {
995        let CatalogItem::MaterializedView(mview) = self.catalog().get_entry(&item_id).item() else {
996            unreachable!() // Asserted in `sequence_explain_pushdown`.
997        };
998        let gid = mview.global_id_writes();
999        let mview = mview.clone();
1000
1001        let Some(plan) = self.catalog().try_get_physical_plan(&gid).cloned() else {
1002            let msg = format!("cannot find plan for materialized view {item_id} in catalog");
1003            tracing::error!("{msg}");
1004            ctx.retire(Err(anyhow!("{msg}").into()));
1005            return;
1006        };
1007
1008        // We don't have any way to "duplicate" the read hold of the actual collection, which we
1009        // obtain below... but the current implementation of read holds guarantees that the storage
1010        // holds we obtain here will not be any greater than the hold we actually want.
1011        let read_holds =
1012            Some(self.acquire_read_holds(&dataflow_import_id_bundle(&plan, mview.cluster_id)));
1013
1014        let frontiers = self
1015            .controller
1016            .compute
1017            .collection_frontiers(gid, Some(mview.cluster_id))
1018            .expect("materialized view exists");
1019
1020        let as_of = frontiers.read_frontier.to_owned();
1021
1022        let until = mview
1023            .refresh_schedule
1024            .as_ref()
1025            .and_then(|s| s.last_refresh())
1026            .unwrap_or(mz_repr::Timestamp::MAX);
1027
1028        let mz_now = match as_of.as_option() {
1029            Some(&as_of) => {
1030                ResultSpec::value_between(Datum::MzTimestamp(as_of), Datum::MzTimestamp(until))
1031            }
1032            None => ResultSpec::value_all(),
1033        };
1034
1035        self.execute_explain_pushdown_with_read_holds(
1036            ctx,
1037            as_of,
1038            mz_now,
1039            read_holds,
1040            plan.source_imports
1041                .into_iter()
1042                .filter_map(|(id, import)| import.desc.arguments.operators.map(|mfp| (id, mfp))),
1043        )
1044        .await
1045    }
1046}