Skip to main content

mz_adapter/coord/sequencer/inner/
create_view.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, View};
15use mz_expr::CollectionPlan;
16use mz_ore::instrument;
17use mz_repr::explain::{ExprHumanizerExt, TransientItem};
18use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
19use mz_repr::{Datum, RelationDesc, Row};
20use mz_sql::ast::ExplainStage;
21use mz_sql::catalog::CatalogError;
22use mz_sql::names::ResolvedIds;
23use mz_sql::plan::{self};
24use mz_sql::session::metadata::SessionMetadata;
25use tracing::Span;
26
27use crate::command::ExecuteResponse;
28use crate::coord::sequencer::inner::return_if_err;
29use crate::coord::{
30    Coordinator, CreateViewExplain, CreateViewFinish, CreateViewOptimize, CreateViewStage,
31    ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
32    infer_sql_type_for_catalog,
33};
34use crate::error::AdapterError;
35use crate::explain::explain_plan;
36use crate::explain::optimizer_trace::OptimizerTrace;
37use crate::optimize::{self, Optimize};
38use crate::session::Session;
39use crate::{AdapterNotice, ExecuteContext, catalog};
40
41impl Staged for CreateViewStage {
42    type Ctx = ExecuteContext;
43
44    fn validity(&mut self) -> &mut PlanValidity {
45        match self {
46            Self::Optimize(stage) => &mut stage.validity,
47            Self::Finish(stage) => &mut stage.validity,
48            Self::Explain(stage) => &mut stage.validity,
49        }
50    }
51
52    async fn stage(
53        self,
54        coord: &mut Coordinator,
55        ctx: &mut ExecuteContext,
56    ) -> Result<StageResult<Box<Self>>, AdapterError> {
57        match self {
58            CreateViewStage::Optimize(stage) => coord.create_view_optimize(stage).await,
59            CreateViewStage::Finish(stage) => coord.create_view_finish(ctx.session(), stage).await,
60            CreateViewStage::Explain(stage) => {
61                coord.create_view_explain(ctx.session(), stage).await
62            }
63        }
64    }
65
66    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
67        Message::CreateViewStageReady {
68            ctx,
69            span,
70            stage: self,
71        }
72    }
73
74    fn cancel_enabled(&self) -> bool {
75        true
76    }
77}
78
79impl Coordinator {
80    pub(crate) async fn sequence_create_view(
81        &mut self,
82        ctx: ExecuteContext,
83        plan: plan::CreateViewPlan,
84        resolved_ids: ResolvedIds,
85    ) {
86        let stage = return_if_err!(
87            self.create_view_validate(ctx.session(), plan, resolved_ids, ExplainContext::None),
88            ctx
89        );
90        self.sequence_staged(ctx, Span::current(), stage).await;
91    }
92
93    #[instrument]
94    pub(crate) async fn explain_create_view(
95        &mut self,
96        ctx: ExecuteContext,
97        plan::ExplainPlanPlan {
98            stage,
99            format,
100            config,
101            explainee,
102        }: plan::ExplainPlanPlan,
103    ) {
104        let plan::Explainee::Statement(stmt) = explainee else {
105            // This is currently asserted in the `sequence_explain_plan` code that
106            // calls this method.
107            unreachable!()
108        };
109        let plan::ExplaineeStatement::CreateView { broken, plan } = stmt else {
110            // This is currently asserted in the `sequence_explain_plan` code that
111            // calls this method.
112            unreachable!()
113        };
114
115        // Create an OptimizerTrace instance to collect plans emitted when
116        // executing the optimizer pipeline.
117        let optimizer_trace = OptimizerTrace::new(stage.paths());
118
119        // Not used in the EXPLAIN path so it's OK to generate a dummy value.
120        let resolved_ids = ResolvedIds::empty();
121
122        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
123            broken,
124            config,
125            format,
126            stage,
127            replan: None,
128            desc: None,
129            optimizer_trace,
130        });
131        let stage = return_if_err!(
132            self.create_view_validate(ctx.session(), plan, resolved_ids, explain_ctx),
133            ctx
134        );
135        self.sequence_staged(ctx, Span::current(), stage).await;
136    }
137
138    #[instrument]
139    pub(crate) async fn explain_replan_view(
140        &mut self,
141        ctx: ExecuteContext,
142        plan::ExplainPlanPlan {
143            stage,
144            format,
145            config,
146            explainee,
147        }: plan::ExplainPlanPlan,
148    ) {
149        let plan::Explainee::ReplanView(id) = explainee else {
150            unreachable!() // Asserted in `sequence_explain_plan`.
151        };
152        let CatalogItem::View(item) = self.catalog().get_entry(&id).item() else {
153            unreachable!() // Asserted in `plan_explain_plan`.
154        };
155        let gid = item.global_id();
156
157        let create_sql = item.create_sql.clone();
158        let plan_result = self
159            .catalog_mut()
160            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
161        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
162
163        let plan::Plan::CreateView(plan) = plan else {
164            unreachable!() // We are parsing the `create_sql` of a `View` item.
165        };
166
167        // It is safe to assume that query optimization will always succeed, so
168        // for now we statically assume `broken = false`.
169        let broken = false;
170
171        // Create an OptimizerTrace instance to collect plans emitted when
172        // executing the optimizer pipeline.
173        let optimizer_trace = OptimizerTrace::new(stage.paths());
174
175        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
176            broken,
177            config,
178            format,
179            stage,
180            replan: Some(gid),
181            desc: None,
182            optimizer_trace,
183        });
184        let stage = return_if_err!(
185            self.create_view_validate(ctx.session(), plan, resolved_ids, explain_ctx),
186            ctx
187        );
188        self.sequence_staged(ctx, Span::current(), stage).await;
189    }
190
191    #[instrument]
192    pub(crate) fn explain_view(
193        &self,
194        ctx: &ExecuteContext,
195        plan::ExplainPlanPlan {
196            stage,
197            format,
198            config,
199            explainee,
200        }: plan::ExplainPlanPlan,
201    ) -> Result<ExecuteResponse, AdapterError> {
202        let plan::Explainee::View(id) = explainee else {
203            unreachable!() // Asserted in `sequence_explain_plan`.
204        };
205        let CatalogItem::View(view) = self.catalog().get_entry(&id).item() else {
206            unreachable!() // Asserted in `plan_explain_plan`.
207        };
208
209        let target_cluster = None; // Views don't have a target cluster.
210
211        let features =
212            OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
213
214        let cardinality_stats = BTreeMap::new();
215
216        let explain = match stage {
217            ExplainStage::RawPlan => explain_plan(
218                view.raw_expr.as_ref().clone(),
219                format,
220                &config,
221                &features,
222                &self.catalog().for_session(ctx.session()),
223                cardinality_stats,
224                target_cluster,
225            )?,
226            ExplainStage::LocalPlan => explain_plan(
227                view.locally_optimized_expr.as_inner().clone(),
228                format,
229                &config,
230                &features,
231                &self.catalog().for_session(ctx.session()),
232                cardinality_stats,
233                target_cluster,
234            )?,
235            _ => {
236                coord_bail!("cannot EXPLAIN {} FOR VIEW", stage);
237            }
238        };
239
240        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
241
242        Ok(Self::send_immediate_rows(row))
243    }
244
245    #[instrument]
246    fn create_view_validate(
247        &self,
248        session: &Session,
249        plan: plan::CreateViewPlan,
250        resolved_ids: ResolvedIds,
251        // An optional context set iff the state machine is initiated from
252        // sequencing an EXPLAIN for this statement.
253        explain_ctx: ExplainContext,
254    ) -> Result<CreateViewStage, AdapterError> {
255        let plan::CreateViewPlan {
256            view: plan::View { expr, .. },
257            ambiguous_columns,
258            ..
259        } = &plan;
260
261        // Validate any references in the view's expression. We do this on the
262        // unoptimized plan to better reflect what the user typed. We want to
263        // reject queries that depend on a relation in the wrong timeline, for
264        // example, even if we can *technically* optimize that reference away.
265        let expr_depends_on = expr.depends_on();
266        self.catalog()
267            .validate_timeline_context(expr_depends_on.iter().copied())?;
268        self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
269
270        // Track resolved dependencies so concurrent drops are caught between
271        // stages instead of panicking later when the persisted SQL is
272        // re-parsed during catalog application.
273        let validity = PlanValidity::new(
274            self.catalog().transient_revision(),
275            resolved_ids.items().copied().collect(),
276            None,
277            None,
278            session.role_metadata().clone(),
279        );
280
281        Ok(CreateViewStage::Optimize(CreateViewOptimize {
282            validity,
283            plan,
284            resolved_ids,
285            explain_ctx,
286        }))
287    }
288
289    #[instrument]
290    async fn create_view_optimize(
291        &mut self,
292        CreateViewOptimize {
293            validity,
294            plan,
295            resolved_ids,
296            explain_ctx,
297        }: CreateViewOptimize,
298    ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
299        let (item_id, global_id) = self.allocate_user_id().await?;
300
301        // Collect optimizer parameters.
302        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
303            .override_from(&explain_ctx);
304
305        // Build an optimizer for this VIEW.
306        let mut optimizer =
307            optimize::view::Optimizer::new(optimizer_config, Some(self.optimizer_metrics()));
308
309        let span = Span::current();
310        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
311            || "optimize create view",
312            move || {
313                span.in_scope(|| {
314                    let mut pipeline =
315                        || -> Result<mz_expr::OptimizedMirRelationExpr, AdapterError> {
316                            let _dispatch_guard = explain_ctx.dispatch_guard();
317
318                            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
319                            let raw_expr = plan.view.expr.clone();
320                            let optimized_expr = optimizer.catch_unwind_optimize(raw_expr)?;
321
322                            Ok(optimized_expr)
323                        };
324
325                    let stage = match pipeline() {
326                        Ok(optimized_expr) => {
327                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
328                                CreateViewStage::Explain(CreateViewExplain {
329                                    validity,
330                                    id: global_id,
331                                    plan,
332                                    explain_ctx,
333                                })
334                            } else {
335                                CreateViewStage::Finish(CreateViewFinish {
336                                    validity,
337                                    item_id,
338                                    global_id,
339                                    plan,
340                                    optimized_expr,
341                                    resolved_ids,
342                                })
343                            }
344                        }
345                        // Internal optimizer errors are handled differently
346                        // depending on the caller.
347                        Err(err) => {
348                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
349                                // In `sequence_~` contexts, immediately return the error.
350                                return Err(err);
351                            };
352
353                            if explain_ctx.broken {
354                                // In `EXPLAIN BROKEN` contexts, just log the error
355                                // and move to the next stage with default
356                                // parameters.
357                                tracing::error!("error while handling EXPLAIN statement: {}", err);
358                                CreateViewStage::Explain(CreateViewExplain {
359                                    validity,
360                                    id: global_id,
361                                    plan,
362                                    explain_ctx,
363                                })
364                            } else {
365                                // In regular `EXPLAIN` contexts, immediately return the error.
366                                return Err(err);
367                            }
368                        }
369                    };
370
371                    Ok(Box::new(stage))
372                })
373            },
374        )))
375    }
376
377    #[instrument]
378    async fn create_view_finish(
379        &mut self,
380        session: &Session,
381        CreateViewFinish {
382            item_id,
383            global_id,
384            plan:
385                plan::CreateViewPlan {
386                    name,
387                    view:
388                        plan::View {
389                            create_sql,
390                            expr: raw_expr,
391                            dependencies,
392                            column_names,
393                            temporary,
394                        },
395                    drop_ids,
396                    if_not_exists,
397                    ..
398                },
399            optimized_expr,
400            resolved_ids,
401            ..
402        }: CreateViewFinish,
403    ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
404        let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
405        let ops = vec![
406            catalog::Op::DropObjects(
407                drop_ids
408                    .iter()
409                    .map(|id| catalog::DropObjectInfo::Item(*id))
410                    .collect(),
411            ),
412            catalog::Op::CreateItem {
413                id: item_id,
414                name: name.clone(),
415                item: CatalogItem::View(View {
416                    create_sql: create_sql.clone(),
417                    global_id,
418                    raw_expr: raw_expr.into(),
419                    desc: RelationDesc::new(typ, column_names.clone()),
420                    locally_optimized_expr: optimized_expr.into(),
421                    conn_id: if temporary {
422                        Some(session.conn_id().clone())
423                    } else {
424                        None
425                    },
426                    resolved_ids: resolved_ids.clone(),
427                    dependencies: dependencies.clone(),
428                }),
429                owner_id: *session.current_role_id(),
430            },
431        ];
432
433        match self.catalog_transact(Some(session), ops).await {
434            Ok(()) => Ok(StageResult::Response(ExecuteResponse::CreatedView)),
435            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
436                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
437            })) if if_not_exists => {
438                session.add_notice(AdapterNotice::ObjectAlreadyExists {
439                    name: name.item,
440                    ty: "view",
441                });
442                Ok(StageResult::Response(ExecuteResponse::CreatedView))
443            }
444            Err(err) => Err(err),
445        }
446    }
447
448    #[instrument]
449    async fn create_view_explain(
450        &self,
451        session: &Session,
452        CreateViewExplain {
453            id,
454            plan:
455                plan::CreateViewPlan {
456                    name,
457                    view: plan::View { column_names, .. },
458                    ..
459                },
460            explain_ctx:
461                ExplainPlanContext {
462                    config,
463                    format,
464                    stage,
465                    optimizer_trace,
466                    ..
467                },
468            ..
469        }: CreateViewExplain,
470    ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
471        let session_catalog = self.catalog().for_session(session);
472        let expr_humanizer = {
473            let full_name = self.catalog().resolve_full_name(&name, None);
474            let transient_items = btreemap! {
475                id => TransientItem::new(
476                    Some(full_name.into_parts()),
477                    Some(column_names.iter().map(|c| c.to_string()).collect()),
478                )
479            };
480            ExprHumanizerExt::new(transient_items, &session_catalog)
481        };
482
483        let features =
484            OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
485
486        let rows = optimizer_trace
487            .into_rows(
488                format,
489                &config,
490                &features,
491                &expr_humanizer,
492                None,
493                None, // Views don't have a target cluster.
494                Default::default(),
495                stage,
496                plan::ExplaineeStatementKind::CreateView,
497                None,
498            )
499            .await?;
500
501        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
502    }
503}