Skip to main content

mz_adapter/coord/sequencer/inner/
create_view.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, View};
15use mz_expr::CollectionPlan;
16use mz_ore::instrument;
17use mz_repr::explain::{ExprHumanizerExt, TransientItem};
18use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
19use mz_repr::{Datum, RelationDesc, Row};
20use mz_sql::ast::ExplainStage;
21use mz_sql::catalog::CatalogError;
22use mz_sql::names::ResolvedIds;
23use mz_sql::plan::{self};
24use mz_sql::session::metadata::SessionMetadata;
25use tracing::Span;
26
27use crate::command::ExecuteResponse;
28use crate::coord::sequencer::inner::return_if_err;
29use crate::coord::{
30    Coordinator, CreateViewExplain, CreateViewFinish, CreateViewOptimize, CreateViewStage,
31    ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
32    infer_sql_type_for_catalog,
33};
34use crate::error::AdapterError;
35use crate::explain::explain_plan;
36use crate::explain::optimizer_trace::OptimizerTrace;
37use crate::optimize::{self, Optimize};
38use crate::session::Session;
39use crate::{AdapterNotice, ExecuteContext, catalog};
40
41impl Staged for CreateViewStage {
42    type Ctx = ExecuteContext;
43
44    fn validity(&mut self) -> &mut PlanValidity {
45        match self {
46            Self::Optimize(stage) => &mut stage.validity,
47            Self::Finish(stage) => &mut stage.validity,
48            Self::Explain(stage) => &mut stage.validity,
49        }
50    }
51
52    async fn stage(
53        self,
54        coord: &mut Coordinator,
55        ctx: &mut ExecuteContext,
56    ) -> Result<StageResult<Box<Self>>, AdapterError> {
57        match self {
58            CreateViewStage::Optimize(stage) => coord.create_view_optimize(stage).await,
59            CreateViewStage::Finish(stage) => coord.create_view_finish(ctx.session(), stage).await,
60            CreateViewStage::Explain(stage) => {
61                coord.create_view_explain(ctx.session(), stage).await
62            }
63        }
64    }
65
66    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
67        Message::CreateViewStageReady {
68            ctx,
69            span,
70            stage: self,
71        }
72    }
73
74    fn cancel_enabled(&self) -> bool {
75        true
76    }
77}
78
79impl Coordinator {
80    pub(crate) async fn sequence_create_view(
81        &mut self,
82        ctx: ExecuteContext,
83        plan: plan::CreateViewPlan,
84        resolved_ids: ResolvedIds,
85    ) {
86        let stage = return_if_err!(
87            self.create_view_validate(plan, resolved_ids, ExplainContext::None),
88            ctx
89        );
90        self.sequence_staged(ctx, Span::current(), stage).await;
91    }
92
93    #[instrument]
94    pub(crate) async fn explain_create_view(
95        &mut self,
96        ctx: ExecuteContext,
97        plan::ExplainPlanPlan {
98            stage,
99            format,
100            config,
101            explainee,
102        }: plan::ExplainPlanPlan,
103    ) {
104        let plan::Explainee::Statement(stmt) = explainee else {
105            // This is currently asserted in the `sequence_explain_plan` code that
106            // calls this method.
107            unreachable!()
108        };
109        let plan::ExplaineeStatement::CreateView { broken, plan } = stmt else {
110            // This is currently asserted in the `sequence_explain_plan` code that
111            // calls this method.
112            unreachable!()
113        };
114
115        // Create an OptimizerTrace instance to collect plans emitted when
116        // executing the optimizer pipeline.
117        let optimizer_trace = OptimizerTrace::new(stage.paths());
118
119        // Not used in the EXPLAIN path so it's OK to generate a dummy value.
120        let resolved_ids = ResolvedIds::empty();
121
122        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
123            broken,
124            config,
125            format,
126            stage,
127            replan: None,
128            desc: None,
129            optimizer_trace,
130        });
131        let stage = return_if_err!(
132            self.create_view_validate(plan, resolved_ids, explain_ctx),
133            ctx
134        );
135        self.sequence_staged(ctx, Span::current(), stage).await;
136    }
137
138    #[instrument]
139    pub(crate) async fn explain_replan_view(
140        &mut self,
141        ctx: ExecuteContext,
142        plan::ExplainPlanPlan {
143            stage,
144            format,
145            config,
146            explainee,
147        }: plan::ExplainPlanPlan,
148    ) {
149        let plan::Explainee::ReplanView(id) = explainee else {
150            unreachable!() // Asserted in `sequence_explain_plan`.
151        };
152        let CatalogItem::View(item) = self.catalog().get_entry(&id).item() else {
153            unreachable!() // Asserted in `plan_explain_plan`.
154        };
155        let gid = item.global_id();
156
157        let create_sql = item.create_sql.clone();
158        let plan_result = self
159            .catalog_mut()
160            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
161        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
162
163        let plan::Plan::CreateView(plan) = plan else {
164            unreachable!() // We are parsing the `create_sql` of a `View` item.
165        };
166
167        // It is safe to assume that query optimization will always succeed, so
168        // for now we statically assume `broken = false`.
169        let broken = false;
170
171        // Create an OptimizerTrace instance to collect plans emitted when
172        // executing the optimizer pipeline.
173        let optimizer_trace = OptimizerTrace::new(stage.paths());
174
175        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
176            broken,
177            config,
178            format,
179            stage,
180            replan: Some(gid),
181            desc: None,
182            optimizer_trace,
183        });
184        let stage = return_if_err!(
185            self.create_view_validate(plan, resolved_ids, explain_ctx),
186            ctx
187        );
188        self.sequence_staged(ctx, Span::current(), stage).await;
189    }
190
191    #[instrument]
192    pub(crate) fn explain_view(
193        &self,
194        ctx: &ExecuteContext,
195        plan::ExplainPlanPlan {
196            stage,
197            format,
198            config,
199            explainee,
200        }: plan::ExplainPlanPlan,
201    ) -> Result<ExecuteResponse, AdapterError> {
202        let plan::Explainee::View(id) = explainee else {
203            unreachable!() // Asserted in `sequence_explain_plan`.
204        };
205        let CatalogItem::View(view) = self.catalog().get_entry(&id).item() else {
206            unreachable!() // Asserted in `plan_explain_plan`.
207        };
208
209        let target_cluster = None; // Views don't have a target cluster.
210
211        let features =
212            OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
213
214        let cardinality_stats = BTreeMap::new();
215
216        let explain = match stage {
217            ExplainStage::RawPlan => explain_plan(
218                view.raw_expr.as_ref().clone(),
219                format,
220                &config,
221                &features,
222                &self.catalog().for_session(ctx.session()),
223                cardinality_stats,
224                target_cluster,
225            )?,
226            ExplainStage::LocalPlan => explain_plan(
227                view.optimized_expr.as_inner().clone(),
228                format,
229                &config,
230                &features,
231                &self.catalog().for_session(ctx.session()),
232                cardinality_stats,
233                target_cluster,
234            )?,
235            _ => {
236                coord_bail!("cannot EXPLAIN {} FOR VIEW", stage);
237            }
238        };
239
240        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
241
242        Ok(Self::send_immediate_rows(row))
243    }
244
245    #[instrument]
246    fn create_view_validate(
247        &self,
248        plan: plan::CreateViewPlan,
249        resolved_ids: ResolvedIds,
250        // An optional context set iff the state machine is initiated from
251        // sequencing an EXPLAIN for this statement.
252        explain_ctx: ExplainContext,
253    ) -> Result<CreateViewStage, AdapterError> {
254        let plan::CreateViewPlan {
255            view: plan::View { expr, .. },
256            ambiguous_columns,
257            ..
258        } = &plan;
259
260        // Validate any references in the view's expression. We do this on the
261        // unoptimized plan to better reflect what the user typed. We want to
262        // reject queries that depend on a relation in the wrong timeline, for
263        // example, even if we can *technically* optimize that reference away.
264        let expr_depends_on = expr.depends_on();
265        self.catalog()
266            .validate_timeline_context(expr_depends_on.iter().copied())?;
267        self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
268
269        let validity =
270            PlanValidity::require_transient_revision(self.catalog().transient_revision());
271
272        Ok(CreateViewStage::Optimize(CreateViewOptimize {
273            validity,
274            plan,
275            resolved_ids,
276            explain_ctx,
277        }))
278    }
279
280    #[instrument]
281    async fn create_view_optimize(
282        &mut self,
283        CreateViewOptimize {
284            validity,
285            plan,
286            resolved_ids,
287            explain_ctx,
288        }: CreateViewOptimize,
289    ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
290        let id_ts = self.get_catalog_write_ts().await;
291        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
292
293        // Collect optimizer parameters.
294        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
295            .override_from(&explain_ctx);
296
297        // Build an optimizer for this VIEW.
298        let mut optimizer =
299            optimize::view::Optimizer::new(optimizer_config, Some(self.optimizer_metrics()));
300
301        let span = Span::current();
302        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
303            || "optimize create view",
304            move || {
305                span.in_scope(|| {
306                    let mut pipeline =
307                        || -> Result<mz_expr::OptimizedMirRelationExpr, AdapterError> {
308                            let _dispatch_guard = explain_ctx.dispatch_guard();
309
310                            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
311                            let raw_expr = plan.view.expr.clone();
312                            let optimized_expr = optimizer.catch_unwind_optimize(raw_expr)?;
313
314                            Ok(optimized_expr)
315                        };
316
317                    let stage = match pipeline() {
318                        Ok(optimized_expr) => {
319                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
320                                CreateViewStage::Explain(CreateViewExplain {
321                                    validity,
322                                    id: global_id,
323                                    plan,
324                                    explain_ctx,
325                                })
326                            } else {
327                                CreateViewStage::Finish(CreateViewFinish {
328                                    validity,
329                                    item_id,
330                                    global_id,
331                                    plan,
332                                    optimized_expr,
333                                    resolved_ids,
334                                })
335                            }
336                        }
337                        // Internal optimizer errors are handled differently
338                        // depending on the caller.
339                        Err(err) => {
340                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
341                                // In `sequence_~` contexts, immediately return the error.
342                                return Err(err);
343                            };
344
345                            if explain_ctx.broken {
346                                // In `EXPLAIN BROKEN` contexts, just log the error
347                                // and move to the next stage with default
348                                // parameters.
349                                tracing::error!("error while handling EXPLAIN statement: {}", err);
350                                CreateViewStage::Explain(CreateViewExplain {
351                                    validity,
352                                    id: global_id,
353                                    plan,
354                                    explain_ctx,
355                                })
356                            } else {
357                                // In regular `EXPLAIN` contexts, immediately return the error.
358                                return Err(err);
359                            }
360                        }
361                    };
362
363                    Ok(Box::new(stage))
364                })
365            },
366        )))
367    }
368
369    #[instrument]
370    async fn create_view_finish(
371        &mut self,
372        session: &Session,
373        CreateViewFinish {
374            item_id,
375            global_id,
376            plan:
377                plan::CreateViewPlan {
378                    name,
379                    view:
380                        plan::View {
381                            create_sql,
382                            expr: raw_expr,
383                            dependencies,
384                            column_names,
385                            temporary,
386                        },
387                    drop_ids,
388                    if_not_exists,
389                    ..
390                },
391            optimized_expr,
392            resolved_ids,
393            ..
394        }: CreateViewFinish,
395    ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
396        let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
397        let ops = vec![
398            catalog::Op::DropObjects(
399                drop_ids
400                    .iter()
401                    .map(|id| catalog::DropObjectInfo::Item(*id))
402                    .collect(),
403            ),
404            catalog::Op::CreateItem {
405                id: item_id,
406                name: name.clone(),
407                item: CatalogItem::View(View {
408                    create_sql: create_sql.clone(),
409                    global_id,
410                    raw_expr: raw_expr.into(),
411                    desc: RelationDesc::new(typ, column_names.clone()),
412                    optimized_expr: optimized_expr.into(),
413                    conn_id: if temporary {
414                        Some(session.conn_id().clone())
415                    } else {
416                        None
417                    },
418                    resolved_ids: resolved_ids.clone(),
419                    dependencies: dependencies.clone(),
420                }),
421                owner_id: *session.current_role_id(),
422            },
423        ];
424
425        match self.catalog_transact(Some(session), ops).await {
426            Ok(()) => Ok(StageResult::Response(ExecuteResponse::CreatedView)),
427            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
428                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
429            })) if if_not_exists => {
430                session.add_notice(AdapterNotice::ObjectAlreadyExists {
431                    name: name.item,
432                    ty: "view",
433                });
434                Ok(StageResult::Response(ExecuteResponse::CreatedView))
435            }
436            Err(err) => Err(err),
437        }
438    }
439
440    #[instrument]
441    async fn create_view_explain(
442        &self,
443        session: &Session,
444        CreateViewExplain {
445            id,
446            plan:
447                plan::CreateViewPlan {
448                    name,
449                    view: plan::View { column_names, .. },
450                    ..
451                },
452            explain_ctx:
453                ExplainPlanContext {
454                    config,
455                    format,
456                    stage,
457                    optimizer_trace,
458                    ..
459                },
460            ..
461        }: CreateViewExplain,
462    ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
463        let session_catalog = self.catalog().for_session(session);
464        let expr_humanizer = {
465            let full_name = self.catalog().resolve_full_name(&name, None);
466            let transient_items = btreemap! {
467                id => TransientItem::new(
468                    Some(full_name.into_parts()),
469                    Some(column_names.iter().map(|c| c.to_string()).collect()),
470                )
471            };
472            ExprHumanizerExt::new(transient_items, &session_catalog)
473        };
474
475        let features =
476            OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
477
478        let rows = optimizer_trace
479            .into_rows(
480                format,
481                &config,
482                &features,
483                &expr_humanizer,
484                None,
485                None, // Views don't have a target cluster.
486                Default::default(),
487                stage,
488                plan::ExplaineeStatementKind::CreateView,
489                None,
490            )
491            .await?;
492
493        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
494    }
495}