Skip to main content

mz_adapter/coord/sequencer/inner/
create_materialized_view.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::anyhow;
11use differential_dataflow::lattice::Lattice;
12use maplit::btreemap;
13use maplit::btreeset;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_catalog::memory::objects::{CatalogItem, MaterializedView};
16use mz_expr::{CollectionPlan, ResultSpec};
17use mz_ore::collections::CollectionExt;
18use mz_ore::instrument;
19use mz_ore::soft_panic_or_log;
20use mz_repr::explain::{ExprHumanizerExt, TransientItem};
21use mz_repr::optimize::OptimizerFeatures;
22use mz_repr::optimize::OverrideFrom;
23use mz_repr::refresh_schedule::RefreshSchedule;
24use mz_repr::{CatalogItemId, Datum, RelationVersion, Row, VersionedRelationDesc};
25use mz_sql::ast::ExplainStage;
26use mz_sql::catalog::CatalogError;
27use mz_sql::names::ResolvedIds;
28use mz_sql::plan;
29use mz_sql::session::metadata::SessionMetadata;
30use mz_sql_parser::ast;
31use mz_sql_parser::ast::display::AstDisplay;
32use mz_storage_client::controller::CollectionDescription;
33use std::collections::BTreeMap;
34use timely::progress::Antichain;
35use tracing::Span;
36
37use crate::ReadHolds;
38use crate::command::ExecuteResponse;
39use crate::coord::sequencer::inner::return_if_err;
40use crate::coord::{
41    Coordinator, CreateMaterializedViewExplain, CreateMaterializedViewFinish,
42    CreateMaterializedViewOptimize, CreateMaterializedViewStage, ExplainContext,
43    ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
44};
45use crate::error::AdapterError;
46use crate::explain::explain_dataflow;
47use crate::explain::explain_plan;
48use crate::explain::optimizer_trace::OptimizerTrace;
49use crate::optimize::dataflows::dataflow_import_id_bundle;
50use crate::optimize::{self, Optimize};
51use crate::session::Session;
52use crate::util::ResultExt;
53use crate::{AdapterNotice, CollectionIdBundle, ExecuteContext, TimestampProvider, catalog};
54
55impl Staged for CreateMaterializedViewStage {
56    type Ctx = ExecuteContext;
57
58    fn validity(&mut self) -> &mut PlanValidity {
59        match self {
60            Self::Optimize(stage) => &mut stage.validity,
61            Self::Finish(stage) => &mut stage.validity,
62            Self::Explain(stage) => &mut stage.validity,
63        }
64    }
65
66    async fn stage(
67        self,
68        coord: &mut Coordinator,
69        ctx: &mut ExecuteContext,
70    ) -> Result<StageResult<Box<Self>>, AdapterError> {
71        match self {
72            CreateMaterializedViewStage::Optimize(stage) => {
73                coord.create_materialized_view_optimize(stage).await
74            }
75            CreateMaterializedViewStage::Finish(stage) => {
76                coord.create_materialized_view_finish(ctx, stage).await
77            }
78            CreateMaterializedViewStage::Explain(stage) => {
79                coord
80                    .create_materialized_view_explain(ctx.session(), stage)
81                    .await
82            }
83        }
84    }
85
86    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
87        Message::CreateMaterializedViewStageReady {
88            ctx,
89            span,
90            stage: self,
91        }
92    }
93
94    fn cancel_enabled(&self) -> bool {
95        true
96    }
97}
98
99impl Coordinator {
100    #[instrument]
101    pub(crate) async fn sequence_create_materialized_view(
102        &mut self,
103        ctx: ExecuteContext,
104        plan: plan::CreateMaterializedViewPlan,
105        resolved_ids: ResolvedIds,
106    ) {
107        let stage = return_if_err!(
108            self.create_materialized_view_validate(
109                ctx.session(),
110                plan,
111                resolved_ids,
112                ExplainContext::None
113            ),
114            ctx
115        );
116        self.sequence_staged(ctx, Span::current(), stage).await;
117    }
118
119    #[instrument]
120    pub(crate) async fn explain_create_materialized_view(
121        &mut self,
122        ctx: ExecuteContext,
123        plan::ExplainPlanPlan {
124            stage,
125            format,
126            config,
127            explainee,
128        }: plan::ExplainPlanPlan,
129    ) {
130        let plan::Explainee::Statement(stmt) = explainee else {
131            // This is currently asserted in the `sequence_explain_plan` code that
132            // calls this method.
133            unreachable!()
134        };
135        let plan::ExplaineeStatement::CreateMaterializedView { broken, plan } = stmt else {
136            // This is currently asserted in the `sequence_explain_plan` code that
137            // calls this method.
138            unreachable!()
139        };
140
141        // Create an OptimizerTrace instance to collect plans emitted when
142        // executing the optimizer pipeline.
143        let optimizer_trace = OptimizerTrace::new(stage.paths());
144
145        // Not used in the EXPLAIN path so it's OK to generate a dummy value.
146        let resolved_ids = ResolvedIds::empty();
147
148        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
149            broken,
150            config,
151            format,
152            stage,
153            replan: None,
154            desc: None,
155            optimizer_trace,
156        });
157        let stage = return_if_err!(
158            self.create_materialized_view_validate(ctx.session(), plan, resolved_ids, explain_ctx),
159            ctx
160        );
161        self.sequence_staged(ctx, Span::current(), stage).await;
162    }
163
164    #[instrument]
165    pub(crate) async fn explain_replan_materialized_view(
166        &mut self,
167        ctx: ExecuteContext,
168        plan::ExplainPlanPlan {
169            stage,
170            format,
171            config,
172            explainee,
173        }: plan::ExplainPlanPlan,
174    ) {
175        let plan::Explainee::ReplanMaterializedView(id) = explainee else {
176            unreachable!() // Asserted in `sequence_explain_plan`.
177        };
178        let CatalogItem::MaterializedView(item) = self.catalog().get_entry(&id).item() else {
179            unreachable!() // Asserted in `plan_explain_plan`.
180        };
181        let gid = item.global_id_writes();
182
183        let create_sql = item.create_sql.clone();
184        let plan_result = self
185            .catalog_mut()
186            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
187        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
188
189        let plan::Plan::CreateMaterializedView(plan) = plan else {
190            unreachable!() // We are parsing the `create_sql` of a `MaterializedView` item.
191        };
192
193        // It is safe to assume that query optimization will always succeed, so
194        // for now we statically assume `broken = false`.
195        let broken = false;
196
197        // Create an OptimizerTrace instance to collect plans emitted when
198        // executing the optimizer pipeline.
199        let optimizer_trace = OptimizerTrace::new(stage.paths());
200
201        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
202            broken,
203            config,
204            format,
205            stage,
206            replan: Some(gid),
207            desc: None,
208            optimizer_trace,
209        });
210        let stage = return_if_err!(
211            self.create_materialized_view_validate(ctx.session(), plan, resolved_ids, explain_ctx,),
212            ctx
213        );
214        self.sequence_staged(ctx, Span::current(), stage).await;
215    }
216
217    #[instrument]
218    pub(super) fn explain_materialized_view(
219        &self,
220        ctx: &ExecuteContext,
221        plan::ExplainPlanPlan {
222            stage,
223            format,
224            config,
225            explainee,
226        }: plan::ExplainPlanPlan,
227    ) -> Result<ExecuteResponse, AdapterError> {
228        let plan::Explainee::MaterializedView(id) = explainee else {
229            unreachable!() // Asserted in `sequence_explain_plan`.
230        };
231        let CatalogItem::MaterializedView(view) = self.catalog().get_entry(&id).item() else {
232            unreachable!() // Asserted in `plan_explain_plan`.
233        };
234        let gid = view.global_id_writes();
235
236        let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&gid) else {
237            if !id.is_system() {
238                tracing::error!(
239                    "cannot find dataflow metainformation for materialized view {id} in catalog"
240                );
241            }
242            coord_bail!(
243                "cannot find dataflow metainformation for materialized view {id} in catalog"
244            );
245        };
246
247        let target_cluster = self.catalog().get_cluster(view.cluster_id);
248
249        let features = OptimizerFeatures::from(self.catalog().system_config())
250            .override_from(&target_cluster.config.features())
251            .override_from(&config.features);
252
253        let cardinality_stats = BTreeMap::new();
254
255        let explain = match stage {
256            ExplainStage::RawPlan => explain_plan(
257                view.raw_expr.as_ref().clone(),
258                format,
259                &config,
260                &features,
261                &self.catalog().for_session(ctx.session()),
262                cardinality_stats,
263                Some(target_cluster.name.as_str()),
264            )?,
265            ExplainStage::LocalPlan => explain_plan(
266                view.optimized_expr.as_inner().clone(),
267                format,
268                &config,
269                &features,
270                &self.catalog().for_session(ctx.session()),
271                cardinality_stats,
272                Some(target_cluster.name.as_str()),
273            )?,
274            ExplainStage::GlobalPlan => {
275                let Some(plan) = self.catalog().try_get_optimized_plan(&gid).cloned() else {
276                    tracing::error!("cannot find {stage} for materialized view {id} in catalog");
277                    coord_bail!("cannot find {stage} for materialized view in catalog");
278                };
279                explain_dataflow(
280                    plan,
281                    format,
282                    &config,
283                    &features,
284                    &self.catalog().for_session(ctx.session()),
285                    cardinality_stats,
286                    Some(target_cluster.name.as_str()),
287                    dataflow_metainfo,
288                )?
289            }
290            ExplainStage::PhysicalPlan => {
291                let Some(plan) = self.catalog().try_get_physical_plan(&gid).cloned() else {
292                    tracing::error!("cannot find {stage} for materialized view {id} in catalog",);
293                    coord_bail!("cannot find {stage} for materialized view in catalog");
294                };
295                explain_dataflow(
296                    plan,
297                    format,
298                    &config,
299                    &features,
300                    &self.catalog().for_session(ctx.session()),
301                    cardinality_stats,
302                    Some(target_cluster.name.as_str()),
303                    dataflow_metainfo,
304                )?
305            }
306            _ => {
307                coord_bail!("cannot EXPLAIN {} FOR MATERIALIZED VIEW", stage);
308            }
309        };
310
311        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
312
313        Ok(Self::send_immediate_rows(row))
314    }
315
316    #[instrument]
317    fn create_materialized_view_validate(
318        &self,
319        session: &Session,
320        plan: plan::CreateMaterializedViewPlan,
321        resolved_ids: ResolvedIds,
322        // An optional context set iff the state machine is initiated from
323        // sequencing an EXPLAIN for this statement.
324        explain_ctx: ExplainContext,
325    ) -> Result<CreateMaterializedViewStage, AdapterError> {
326        let plan::CreateMaterializedViewPlan {
327            materialized_view:
328                plan::MaterializedView {
329                    expr,
330                    cluster_id,
331                    refresh_schedule,
332                    ..
333                },
334            ambiguous_columns,
335            ..
336        } = &plan;
337
338        // Validate any references in the materialized view's expression. We do
339        // this on the unoptimized plan to better reflect what the user typed.
340        // We want to reject queries that depend on log sources, for example,
341        // even if we can *technically* optimize that reference away.
342        let expr_depends_on = expr.depends_on();
343        self.catalog()
344            .validate_timeline_context(expr_depends_on.iter().copied())?;
345        self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
346        // Materialized views are not allowed to depend on log sources, as replicas
347        // are not producing the same definite collection for these.
348        let log_names = expr_depends_on
349            .iter()
350            .map(|gid| self.catalog.resolve_item_id(gid))
351            .flat_map(|item_id| self.catalog().introspection_dependencies(item_id))
352            .map(|item_id| self.catalog().get_entry(&item_id).name().item.clone())
353            .collect::<Vec<_>>();
354        if !log_names.is_empty() {
355            return Err(AdapterError::InvalidLogDependency {
356                object_type: "materialized view".into(),
357                log_names,
358            });
359        }
360
361        let validity =
362            PlanValidity::require_transient_revision(self.catalog().transient_revision());
363
364        // Check whether we can read all inputs at all the REFRESH AT times.
365        if let Some(refresh_schedule) = refresh_schedule {
366            if !refresh_schedule.ats.is_empty() && matches!(explain_ctx, ExplainContext::None) {
367                // Purification has acquired the earliest possible read holds if there are any
368                // REFRESH options.
369                let read_holds = self
370                    .txn_read_holds
371                    .get(session.conn_id())
372                    .expect("purification acquired read holds if there are REFRESH ATs");
373                let least_valid_read = read_holds.least_valid_read();
374                for refresh_at_ts in &refresh_schedule.ats {
375                    if !least_valid_read.less_equal(refresh_at_ts) {
376                        return Err(AdapterError::InputNotReadableAtRefreshAtTime(
377                            *refresh_at_ts,
378                            least_valid_read,
379                        ));
380                    }
381                }
382                // Also check that no new id has appeared in `sufficient_collections` (e.g. a new
383                // index), otherwise we might be missing some read holds.
384                let ids = self
385                    .index_oracle(*cluster_id)
386                    .sufficient_collections(resolved_ids.collections().copied());
387                if !ids.difference(&read_holds.id_bundle()).is_empty() {
388                    return Err(AdapterError::ChangedPlan(
389                        "the set of possible inputs changed during the creation of the \
390                         materialized view"
391                            .to_string(),
392                    ));
393                }
394            }
395        }
396
397        Ok(CreateMaterializedViewStage::Optimize(
398            CreateMaterializedViewOptimize {
399                validity,
400                plan,
401                resolved_ids,
402                explain_ctx,
403            },
404        ))
405    }
406
407    #[instrument]
408    async fn create_materialized_view_optimize(
409        &mut self,
410        CreateMaterializedViewOptimize {
411            validity,
412            plan,
413            resolved_ids,
414            explain_ctx,
415        }: CreateMaterializedViewOptimize,
416    ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
417        let plan::CreateMaterializedViewPlan {
418            name,
419            materialized_view:
420                plan::MaterializedView {
421                    column_names,
422                    cluster_id,
423                    non_null_assertions,
424                    refresh_schedule,
425                    ..
426                },
427            ..
428        } = &plan;
429
430        // Collect optimizer parameters.
431        let compute_instance = self
432            .instance_snapshot(*cluster_id)
433            .expect("compute instance does not exist");
434        let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
435            let id_ts = self.get_catalog_write_ts().await;
436            self.catalog().allocate_user_id(id_ts).await?
437        } else {
438            self.allocate_transient_id()
439        };
440
441        let (_, view_id) = self.allocate_transient_id();
442        let debug_name = self.catalog().resolve_full_name(name, None).to_string();
443        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
444            .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
445            .override_from(&explain_ctx);
446        let optimizer_features = optimizer_config.features.clone();
447        let force_non_monotonic = Default::default();
448
449        // Build an optimizer for this MATERIALIZED VIEW.
450        let mut optimizer = optimize::materialized_view::Optimizer::new(
451            self.owned_catalog().as_optimizer_catalog(),
452            compute_instance,
453            global_id,
454            view_id,
455            column_names.clone(),
456            non_null_assertions.clone(),
457            refresh_schedule.clone(),
458            debug_name,
459            optimizer_config,
460            self.optimizer_metrics(),
461            force_non_monotonic,
462        );
463
464        let span = Span::current();
465        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
466            || "optimize create materialized view",
467            move || {
468                span.in_scope(|| {
469                    let mut pipeline = || -> Result<(
470                        optimize::materialized_view::LocalMirPlan,
471                        optimize::materialized_view::GlobalMirPlan,
472                        optimize::materialized_view::GlobalLirPlan,
473                    ), AdapterError> {
474                        let _dispatch_guard = explain_ctx.dispatch_guard();
475
476                        let raw_expr = plan.materialized_view.expr.clone();
477
478                        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)
479                        let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
480                        let global_mir_plan =
481                            optimizer.catch_unwind_optimize(local_mir_plan.clone())?;
482                        // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
483                        let global_lir_plan =
484                            optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
485
486                        Ok((local_mir_plan, global_mir_plan, global_lir_plan))
487                    };
488
489                    let stage = match pipeline() {
490                        Ok((local_mir_plan, global_mir_plan, global_lir_plan)) => {
491                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
492                                let (_, df_meta) = global_lir_plan.unapply();
493                                CreateMaterializedViewStage::Explain(
494                                    CreateMaterializedViewExplain {
495                                        validity,
496                                        global_id,
497                                        plan,
498                                        df_meta,
499                                        explain_ctx,
500                                    },
501                                )
502                            } else {
503                                CreateMaterializedViewStage::Finish(CreateMaterializedViewFinish {
504                                    item_id,
505                                    global_id,
506                                    validity,
507                                    plan,
508                                    resolved_ids,
509                                    local_mir_plan,
510                                    global_mir_plan,
511                                    global_lir_plan,
512                                    optimizer_features,
513                                })
514                            }
515                        }
516                        // Internal optimizer errors are handled differently
517                        // depending on the caller.
518                        Err(err) => {
519                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
520                                // In `sequence_~` contexts, immediately return the error.
521                                return Err(err);
522                            };
523
524                            if explain_ctx.broken {
525                                // In `EXPLAIN BROKEN` contexts, just log the error
526                                // and move to the next stage with default
527                                // parameters.
528                                tracing::error!("error while handling EXPLAIN statement: {}", err);
529                                CreateMaterializedViewStage::Explain(
530                                    CreateMaterializedViewExplain {
531                                        global_id,
532                                        validity,
533                                        plan,
534                                        df_meta: Default::default(),
535                                        explain_ctx,
536                                    },
537                                )
538                            } else {
539                                // In regular `EXPLAIN` contexts, immediately return the error.
540                                return Err(err);
541                            }
542                        }
543                    };
544
545                    Ok(Box::new(stage))
546                })
547            },
548        )))
549    }
550
551    #[instrument]
552    async fn create_materialized_view_finish(
553        &mut self,
554        ctx: &mut ExecuteContext,
555        stage: CreateMaterializedViewFinish,
556    ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
557        let CreateMaterializedViewFinish {
558            item_id,
559            global_id,
560            plan:
561                plan::CreateMaterializedViewPlan {
562                    name,
563                    materialized_view:
564                        plan::MaterializedView {
565                            mut create_sql,
566                            expr: raw_expr,
567                            dependencies,
568                            replacement_target,
569                            cluster_id,
570                            target_replica,
571                            non_null_assertions,
572                            compaction_window,
573                            refresh_schedule,
574                            ..
575                        },
576                    drop_ids,
577                    if_not_exists,
578                    ..
579                },
580            resolved_ids,
581            local_mir_plan,
582            global_mir_plan,
583            global_lir_plan,
584            optimizer_features,
585            ..
586        } = stage;
587
588        // Validate the replacement target, if one is given.
589        if let Some(target_id) = replacement_target {
590            let Some(target) = self.catalog().get_entry(&target_id).materialized_view() else {
591                return Err(AdapterError::internal(
592                    "create materialized view",
593                    "replacement target not a materialized view",
594                ));
595            };
596
597            // For now, we don't support schema evolution for materialized views.
598            let schema_diff = target.desc.latest().diff(global_lir_plan.desc());
599            if !schema_diff.is_empty() {
600                return Err(AdapterError::ReplacementSchemaMismatch(schema_diff));
601            }
602        }
603
604        // Timestamp selection
605        let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
606
607        let read_holds_owned;
608        let read_holds = if let Some(txn_reads) = self.txn_read_holds.get(ctx.session().conn_id()) {
609            // In some cases, for example when REFRESH is used, the preparatory
610            // stages will already have acquired ReadHolds, we can re-use those.
611
612            txn_reads
613        } else {
614            // No one has acquired holds, make sure we can determine an as_of
615            // and render our dataflow below.
616            read_holds_owned = self.acquire_read_holds(&id_bundle);
617            &read_holds_owned
618        };
619
620        let (dataflow_as_of, storage_as_of, until) =
621            self.select_timestamps(id_bundle, refresh_schedule.as_ref(), read_holds)?;
622
623        tracing::info!(
624            dataflow_as_of = ?dataflow_as_of,
625            storage_as_of = ?storage_as_of,
626            until = ?until,
627            "materialized view timestamp selection",
628        );
629
630        let initial_as_of = storage_as_of.clone();
631
632        // Update the `create_sql` with the selected `as_of`. This is how we make sure the `as_of`
633        // is persisted to the catalog and can be relied on during bootstrapping.
634        // This has to be the `storage_as_of`, because bootstrapping uses this in
635        // `bootstrap_storage_collections`.
636        if let Some(storage_as_of_ts) = storage_as_of.as_option() {
637            let stmt = mz_sql::parse::parse(&create_sql)
638                .map_err(|_| {
639                    AdapterError::internal(
640                        "create materialized view",
641                        "original SQL should roundtrip",
642                    )
643                })?
644                .into_element()
645                .ast;
646            let ast::Statement::CreateMaterializedView(mut stmt) = stmt else {
647                panic!("unexpected statement type");
648            };
649            stmt.as_of = Some(storage_as_of_ts.into());
650            create_sql = stmt.to_ast_string_stable();
651        }
652
653        let desc = VersionedRelationDesc::new(global_lir_plan.desc().clone());
654        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
655
656        let local_mir_for_cache = local_mir_plan.expr();
657
658        let ops = vec![
659            catalog::Op::DropObjects(
660                drop_ids
661                    .into_iter()
662                    .map(catalog::DropObjectInfo::Item)
663                    .collect(),
664            ),
665            catalog::Op::CreateItem {
666                id: item_id,
667                name: name.clone(),
668                item: CatalogItem::MaterializedView(MaterializedView {
669                    create_sql,
670                    raw_expr: raw_expr.into(),
671                    optimized_expr: local_mir_plan.expr().into(),
672                    desc,
673                    collections,
674                    resolved_ids,
675                    dependencies,
676                    replacement_target,
677                    cluster_id,
678                    target_replica,
679                    non_null_assertions,
680                    custom_logical_compaction_window: compaction_window,
681                    refresh_schedule: refresh_schedule.clone(),
682                    initial_as_of: Some(initial_as_of.clone()),
683                }),
684                owner_id: *ctx.session().current_role_id(),
685            },
686        ];
687
688        // Pre-allocate a vector of transient GlobalIds for each notice.
689        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
690            .map(|(_item_id, global_id)| global_id)
691            .take(global_lir_plan.df_meta().optimizer_notices.len())
692            .collect::<Vec<_>>();
693
694        let transact_result = self
695            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, ctx| {
696                Box::pin(async move {
697                    let output_desc = global_lir_plan.desc().clone();
698                    let (mut df_desc, df_meta) = global_lir_plan.unapply();
699
700                    // Save plan structures.
701                    coord
702                        .catalog_mut()
703                        .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
704                    coord
705                        .catalog_mut()
706                        .set_physical_plan(global_id, df_desc.clone());
707
708                    let notice_builtin_updates_fut = coord
709                        .process_dataflow_metainfo(df_meta, global_id, ctx, notice_ids)
710                        .await;
711
712                    coord.catalog().cache_expressions(
713                        global_id,
714                        Some(local_mir_for_cache),
715                        optimizer_features,
716                    );
717
718                    df_desc.set_as_of(dataflow_as_of.clone());
719                    df_desc.set_initial_as_of(initial_as_of);
720                    df_desc.until = until;
721
722                    let storage_metadata = coord.catalog.state().storage_metadata();
723
724                    let mut collection_desc =
725                        CollectionDescription::for_other(output_desc, Some(storage_as_of));
726                    let mut allow_writes = true;
727
728                    // If this MV is intended to replace another one, we need to start it in
729                    // read-only mode, targeting the shard of the replacement target.
730                    if let Some(target_id) = replacement_target {
731                        let target_gid = coord.catalog.get_entry(&target_id).latest_global_id();
732                        collection_desc.primary = Some(target_gid);
733                        allow_writes = false;
734                    }
735
736                    // Announce the creation of the materialized view source.
737                    coord
738                        .controller
739                        .storage
740                        .create_collections(
741                            storage_metadata,
742                            None,
743                            vec![(global_id, collection_desc)],
744                        )
745                        .await
746                        .unwrap_or_terminate("cannot fail to append");
747
748                    coord
749                        .initialize_storage_read_policies(
750                            btreeset![item_id],
751                            compaction_window.unwrap_or(CompactionWindow::Default),
752                        )
753                        .await;
754
755                    coord
756                        .ship_dataflow_and_notice_builtin_table_updates(
757                            df_desc,
758                            cluster_id,
759                            notice_builtin_updates_fut,
760                            target_replica,
761                        )
762                        .await;
763
764                    if allow_writes {
765                        coord.allow_writes(cluster_id, global_id);
766                    }
767                })
768            })
769            .await;
770
771        match transact_result {
772            Ok(_) => Ok(ExecuteResponse::CreatedMaterializedView),
773            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
774                kind:
775                    mz_catalog::memory::error::ErrorKind::Sql(
776                        CatalogError::ItemAlreadyExists(_, _),
777                    ),
778            })) if if_not_exists => {
779                ctx.session()
780                    .add_notice(AdapterNotice::ObjectAlreadyExists {
781                        name: name.item,
782                        ty: "materialized view",
783                    });
784                Ok(ExecuteResponse::CreatedMaterializedView)
785            }
786            Err(err) => Err(err),
787        }
788        .map(StageResult::Response)
789    }
790
791    /// Select the initial `dataflow_as_of`, `storage_as_of`, and `until` frontiers for a
792    /// materialized view.
793    fn select_timestamps(
794        &self,
795        id_bundle: CollectionIdBundle,
796        refresh_schedule: Option<&RefreshSchedule>,
797        read_holds: &ReadHolds<mz_repr::Timestamp>,
798    ) -> Result<
799        (
800            Antichain<mz_repr::Timestamp>,
801            Antichain<mz_repr::Timestamp>,
802            Antichain<mz_repr::Timestamp>,
803        ),
804        AdapterError,
805    > {
806        assert!(
807            id_bundle.difference(&read_holds.id_bundle()).is_empty(),
808            "we must have read holds for all involved collections"
809        );
810
811        // For non-REFRESH MVs both the `dataflow_as_of` and the `storage_as_of` should be simply
812        // `least_valid_read`.
813        let least_valid_read = read_holds.least_valid_read();
814        let mut dataflow_as_of = least_valid_read.clone();
815        let mut storage_as_of = least_valid_read.clone();
816
817        // For MVs with non-trivial REFRESH schedules:
818        // 1. it's important to set the `storage_as_of` to the first refresh. This is because we'd
819        // like queries on the MV to block until the first refresh (rather than to show an empty
820        // MV).
821        // 2. We move the `dataflow_as_of` forward to the minimum of `greatest_available_read` and
822        // the first refresh time. There is no point in processing the times before
823        // `greatest_available_read`, because the first time for which results will be exposed is
824        // the first refresh time. Also note that simply moving the `dataflow_as_of` forward to the
825        // first refresh time would prevent warmup before the first refresh.
826        if let Some(refresh_schedule) = &refresh_schedule {
827            if let Some(least_valid_read_ts) = least_valid_read.as_option() {
828                if let Some(first_refresh_ts) =
829                    refresh_schedule.round_up_timestamp(*least_valid_read_ts)
830                {
831                    storage_as_of = Antichain::from_elem(first_refresh_ts);
832                    dataflow_as_of.join_assign(
833                        &self
834                            .greatest_available_read(&id_bundle)
835                            .meet(&storage_as_of),
836                    );
837                } else {
838                    let last_refresh = refresh_schedule.last_refresh().expect(
839                        "if round_up_timestamp returned None, then there should be a last refresh",
840                    );
841
842                    return Err(AdapterError::MaterializedViewWouldNeverRefresh(
843                        last_refresh,
844                        *least_valid_read_ts,
845                    ));
846                }
847            } else {
848                // The `as_of` should never be empty, because then the MV would be unreadable.
849                soft_panic_or_log!("creating a materialized view with an empty `as_of`");
850            }
851        }
852
853        // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
854        // (If the `try_step_forward` fails, then no need to set an `until`, because it's not possible to get any data
855        // beyond that last refresh time, because there are no times beyond that time.)
856        let until_ts = refresh_schedule
857            .and_then(|s| s.last_refresh())
858            .and_then(|r| r.try_step_forward());
859        let until = Antichain::from_iter(until_ts);
860
861        Ok((dataflow_as_of, storage_as_of, until))
862    }
863
864    #[instrument]
865    async fn create_materialized_view_explain(
866        &self,
867        session: &Session,
868        CreateMaterializedViewExplain {
869            global_id,
870            plan:
871                plan::CreateMaterializedViewPlan {
872                    name,
873                    materialized_view:
874                        plan::MaterializedView {
875                            column_names,
876                            cluster_id,
877                            ..
878                        },
879                    ..
880                },
881            df_meta,
882            explain_ctx:
883                ExplainPlanContext {
884                    config,
885                    format,
886                    stage,
887                    optimizer_trace,
888                    ..
889                },
890            ..
891        }: CreateMaterializedViewExplain,
892    ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
893        let session_catalog = self.catalog().for_session(session);
894        let expr_humanizer = {
895            let full_name = self.catalog().resolve_full_name(&name, None);
896            let transient_items = btreemap! {
897                global_id => TransientItem::new(
898                    Some(full_name.into_parts()),
899                    Some(column_names.iter().map(|c| c.to_string()).collect()),
900                )
901            };
902            ExprHumanizerExt::new(transient_items, &session_catalog)
903        };
904
905        let target_cluster = self.catalog().get_cluster(cluster_id);
906
907        let features = OptimizerFeatures::from(self.catalog().system_config())
908            .override_from(&target_cluster.config.features())
909            .override_from(&config.features);
910
911        let rows = optimizer_trace
912            .into_rows(
913                format,
914                &config,
915                &features,
916                &expr_humanizer,
917                None,
918                Some(target_cluster),
919                df_meta,
920                stage,
921                plan::ExplaineeStatementKind::CreateMaterializedView,
922                None,
923            )
924            .await?;
925
926        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
927    }
928
929    pub(crate) async fn explain_pushdown_materialized_view(
930        &self,
931        ctx: ExecuteContext,
932        item_id: CatalogItemId,
933    ) {
934        let CatalogItem::MaterializedView(mview) = self.catalog().get_entry(&item_id).item() else {
935            unreachable!() // Asserted in `sequence_explain_pushdown`.
936        };
937        let gid = mview.global_id_writes();
938        let mview = mview.clone();
939
940        let Some(plan) = self.catalog().try_get_physical_plan(&gid).cloned() else {
941            let msg = format!("cannot find plan for materialized view {item_id} in catalog");
942            tracing::error!("{msg}");
943            ctx.retire(Err(anyhow!("{msg}").into()));
944            return;
945        };
946
947        // We don't have any way to "duplicate" the read hold of the actual collection, which we
948        // obtain below... but the current implementation of read holds guarantees that the storage
949        // holds we obtain here will not be any greater than the hold we actually want.
950        let read_holds =
951            Some(self.acquire_read_holds(&dataflow_import_id_bundle(&plan, mview.cluster_id)));
952
953        let frontiers = self
954            .controller
955            .compute
956            .collection_frontiers(gid, Some(mview.cluster_id))
957            .expect("materialized view exists");
958
959        let as_of = frontiers.read_frontier.to_owned();
960
961        let until = mview
962            .refresh_schedule
963            .as_ref()
964            .and_then(|s| s.last_refresh())
965            .unwrap_or(mz_repr::Timestamp::MAX);
966
967        let mz_now = match as_of.as_option() {
968            Some(&as_of) => {
969                ResultSpec::value_between(Datum::MzTimestamp(as_of), Datum::MzTimestamp(until))
970            }
971            None => ResultSpec::value_all(),
972        };
973
974        self.execute_explain_pushdown_with_read_holds(
975            ctx,
976            as_of,
977            mz_now,
978            read_holds,
979            plan.source_imports
980                .into_iter()
981                .filter_map(|(id, import)| import.desc.arguments.operators.map(|mfp| (id, mfp))),
982        )
983        .await
984    }
985}