Skip to main content

mz_adapter/coord/sequencer/inner/
create_view.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::objects::{CatalogItem, View};
14use mz_expr::CollectionPlan;
15use mz_ore::instrument;
16use mz_repr::explain::{ExprHumanizerExt, TransientItem};
17use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
18use mz_repr::{Datum, RelationDesc, Row};
19use mz_sql::ast::ExplainStage;
20use mz_sql::catalog::CatalogError;
21use mz_sql::names::ResolvedIds;
22use mz_sql::plan::{self};
23use mz_sql::session::metadata::SessionMetadata;
24use tracing::Span;
25
26use crate::command::ExecuteResponse;
27use crate::coord::sequencer::inner::return_if_err;
28use crate::coord::{
29    Coordinator, CreateViewExplain, CreateViewFinish, CreateViewOptimize, CreateViewStage,
30    ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
31    infer_sql_type_for_catalog,
32};
33use crate::error::AdapterError;
34use crate::explain::explain_plan;
35use crate::explain::optimizer_trace::OptimizerTrace;
36use crate::optimize::{self, Optimize};
37use crate::session::Session;
38use crate::{AdapterNotice, ExecuteContext, catalog};
39
40impl Staged for CreateViewStage {
41    type Ctx = ExecuteContext;
42
43    fn validity(&mut self) -> &mut PlanValidity {
44        match self {
45            Self::Optimize(stage) => &mut stage.validity,
46            Self::Finish(stage) => &mut stage.validity,
47            Self::Explain(stage) => &mut stage.validity,
48        }
49    }
50
51    async fn stage(
52        self,
53        coord: &mut Coordinator,
54        ctx: &mut ExecuteContext,
55    ) -> Result<StageResult<Box<Self>>, AdapterError> {
56        match self {
57            CreateViewStage::Optimize(stage) => coord.create_view_optimize(stage).await,
58            CreateViewStage::Finish(stage) => coord.create_view_finish(ctx.session(), stage).await,
59            CreateViewStage::Explain(stage) => {
60                coord.create_view_explain(ctx.session(), stage).await
61            }
62        }
63    }
64
65    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
66        Message::CreateViewStageReady {
67            ctx,
68            span,
69            stage: self,
70        }
71    }
72
73    fn cancel_enabled(&self) -> bool {
74        true
75    }
76}
77
78impl Coordinator {
79    pub(crate) async fn sequence_create_view(
80        &mut self,
81        ctx: ExecuteContext,
82        plan: plan::CreateViewPlan,
83        resolved_ids: ResolvedIds,
84    ) {
85        let stage = return_if_err!(
86            self.create_view_validate(plan, resolved_ids, ExplainContext::None),
87            ctx
88        );
89        self.sequence_staged(ctx, Span::current(), stage).await;
90    }
91
92    #[instrument]
93    pub(crate) async fn explain_create_view(
94        &mut self,
95        ctx: ExecuteContext,
96        plan::ExplainPlanPlan {
97            stage,
98            format,
99            config,
100            explainee,
101        }: plan::ExplainPlanPlan,
102    ) {
103        let plan::Explainee::Statement(stmt) = explainee else {
104            // This is currently asserted in the `sequence_explain_plan` code that
105            // calls this method.
106            unreachable!()
107        };
108        let plan::ExplaineeStatement::CreateView { broken, plan } = stmt else {
109            // This is currently asserted in the `sequence_explain_plan` code that
110            // calls this method.
111            unreachable!()
112        };
113
114        // Create an OptimizerTrace instance to collect plans emitted when
115        // executing the optimizer pipeline.
116        let optimizer_trace = OptimizerTrace::new(stage.paths());
117
118        // Not used in the EXPLAIN path so it's OK to generate a dummy value.
119        let resolved_ids = ResolvedIds::empty();
120
121        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
122            broken,
123            config,
124            format,
125            stage,
126            replan: None,
127            desc: None,
128            optimizer_trace,
129        });
130        let stage = return_if_err!(
131            self.create_view_validate(plan, resolved_ids, explain_ctx),
132            ctx
133        );
134        self.sequence_staged(ctx, Span::current(), stage).await;
135    }
136
137    #[instrument]
138    pub(crate) async fn explain_replan_view(
139        &mut self,
140        ctx: ExecuteContext,
141        plan::ExplainPlanPlan {
142            stage,
143            format,
144            config,
145            explainee,
146        }: plan::ExplainPlanPlan,
147    ) {
148        let plan::Explainee::ReplanView(id) = explainee else {
149            unreachable!() // Asserted in `sequence_explain_plan`.
150        };
151        let CatalogItem::View(item) = self.catalog().get_entry(&id).item() else {
152            unreachable!() // Asserted in `plan_explain_plan`.
153        };
154        let gid = item.global_id();
155
156        let create_sql = item.create_sql.clone();
157        let plan_result = self
158            .catalog_mut()
159            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
160        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
161
162        let plan::Plan::CreateView(plan) = plan else {
163            unreachable!() // We are parsing the `create_sql` of a `View` item.
164        };
165
166        // It is safe to assume that query optimization will always succeed, so
167        // for now we statically assume `broken = false`.
168        let broken = false;
169
170        // Create an OptimizerTrace instance to collect plans emitted when
171        // executing the optimizer pipeline.
172        let optimizer_trace = OptimizerTrace::new(stage.paths());
173
174        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
175            broken,
176            config,
177            format,
178            stage,
179            replan: Some(gid),
180            desc: None,
181            optimizer_trace,
182        });
183        let stage = return_if_err!(
184            self.create_view_validate(plan, resolved_ids, explain_ctx),
185            ctx
186        );
187        self.sequence_staged(ctx, Span::current(), stage).await;
188    }
189
190    #[instrument]
191    pub(crate) fn explain_view(
192        &self,
193        ctx: &ExecuteContext,
194        plan::ExplainPlanPlan {
195            stage,
196            format,
197            config,
198            explainee,
199        }: plan::ExplainPlanPlan,
200    ) -> Result<ExecuteResponse, AdapterError> {
201        let plan::Explainee::View(id) = explainee else {
202            unreachable!() // Asserted in `sequence_explain_plan`.
203        };
204        let CatalogItem::View(view) = self.catalog().get_entry(&id).item() else {
205            unreachable!() // Asserted in `plan_explain_plan`.
206        };
207
208        let target_cluster = None; // Views don't have a target cluster.
209
210        let features =
211            OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
212
213        let cardinality_stats = BTreeMap::new();
214
215        let explain = match stage {
216            ExplainStage::RawPlan => explain_plan(
217                view.raw_expr.as_ref().clone(),
218                format,
219                &config,
220                &features,
221                &self.catalog().for_session(ctx.session()),
222                cardinality_stats,
223                target_cluster,
224            )?,
225            ExplainStage::LocalPlan => explain_plan(
226                view.optimized_expr.as_inner().clone(),
227                format,
228                &config,
229                &features,
230                &self.catalog().for_session(ctx.session()),
231                cardinality_stats,
232                target_cluster,
233            )?,
234            _ => {
235                coord_bail!("cannot EXPLAIN {} FOR VIEW", stage);
236            }
237        };
238
239        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
240
241        Ok(Self::send_immediate_rows(row))
242    }
243
244    #[instrument]
245    fn create_view_validate(
246        &self,
247        plan: plan::CreateViewPlan,
248        resolved_ids: ResolvedIds,
249        // An optional context set iff the state machine is initiated from
250        // sequencing an EXPLAIN for this statement.
251        explain_ctx: ExplainContext,
252    ) -> Result<CreateViewStage, AdapterError> {
253        let plan::CreateViewPlan {
254            view: plan::View { expr, .. },
255            ambiguous_columns,
256            ..
257        } = &plan;
258
259        // Validate any references in the view's expression. We do this on the
260        // unoptimized plan to better reflect what the user typed. We want to
261        // reject queries that depend on a relation in the wrong timeline, for
262        // example, even if we can *technically* optimize that reference away.
263        let expr_depends_on = expr.depends_on();
264        self.catalog()
265            .validate_timeline_context(expr_depends_on.iter().copied())?;
266        self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
267
268        let validity =
269            PlanValidity::require_transient_revision(self.catalog().transient_revision());
270
271        Ok(CreateViewStage::Optimize(CreateViewOptimize {
272            validity,
273            plan,
274            resolved_ids,
275            explain_ctx,
276        }))
277    }
278
279    #[instrument]
280    async fn create_view_optimize(
281        &mut self,
282        CreateViewOptimize {
283            validity,
284            plan,
285            resolved_ids,
286            explain_ctx,
287        }: CreateViewOptimize,
288    ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
289        let id_ts = self.get_catalog_write_ts().await;
290        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
291
292        // Collect optimizer parameters.
293        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
294            .override_from(&explain_ctx);
295
296        // Build an optimizer for this VIEW.
297        let mut optimizer =
298            optimize::view::Optimizer::new(optimizer_config, Some(self.optimizer_metrics()));
299
300        let span = Span::current();
301        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
302            || "optimize create view",
303            move || {
304                span.in_scope(|| {
305                    let mut pipeline =
306                        || -> Result<mz_expr::OptimizedMirRelationExpr, AdapterError> {
307                            let _dispatch_guard = explain_ctx.dispatch_guard();
308
309                            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
310                            let raw_expr = plan.view.expr.clone();
311                            let optimized_expr = optimizer.catch_unwind_optimize(raw_expr)?;
312
313                            Ok(optimized_expr)
314                        };
315
316                    let stage = match pipeline() {
317                        Ok(optimized_expr) => {
318                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
319                                CreateViewStage::Explain(CreateViewExplain {
320                                    validity,
321                                    id: global_id,
322                                    plan,
323                                    explain_ctx,
324                                })
325                            } else {
326                                CreateViewStage::Finish(CreateViewFinish {
327                                    validity,
328                                    item_id,
329                                    global_id,
330                                    plan,
331                                    optimized_expr,
332                                    resolved_ids,
333                                })
334                            }
335                        }
336                        // Internal optimizer errors are handled differently
337                        // depending on the caller.
338                        Err(err) => {
339                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
340                                // In `sequence_~` contexts, immediately return the error.
341                                return Err(err);
342                            };
343
344                            if explain_ctx.broken {
345                                // In `EXPLAIN BROKEN` contexts, just log the error
346                                // and move to the next stage with default
347                                // parameters.
348                                tracing::error!("error while handling EXPLAIN statement: {}", err);
349                                CreateViewStage::Explain(CreateViewExplain {
350                                    validity,
351                                    id: global_id,
352                                    plan,
353                                    explain_ctx,
354                                })
355                            } else {
356                                // In regular `EXPLAIN` contexts, immediately return the error.
357                                return Err(err);
358                            }
359                        }
360                    };
361
362                    Ok(Box::new(stage))
363                })
364            },
365        )))
366    }
367
368    #[instrument]
369    async fn create_view_finish(
370        &mut self,
371        session: &Session,
372        CreateViewFinish {
373            item_id,
374            global_id,
375            plan:
376                plan::CreateViewPlan {
377                    name,
378                    view:
379                        plan::View {
380                            create_sql,
381                            expr: raw_expr,
382                            dependencies,
383                            column_names,
384                            temporary,
385                        },
386                    drop_ids,
387                    if_not_exists,
388                    ..
389                },
390            optimized_expr,
391            resolved_ids,
392            ..
393        }: CreateViewFinish,
394    ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
395        let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
396        let ops = vec![
397            catalog::Op::DropObjects(
398                drop_ids
399                    .iter()
400                    .map(|id| catalog::DropObjectInfo::Item(*id))
401                    .collect(),
402            ),
403            catalog::Op::CreateItem {
404                id: item_id,
405                name: name.clone(),
406                item: CatalogItem::View(View {
407                    create_sql: create_sql.clone(),
408                    global_id,
409                    raw_expr: raw_expr.into(),
410                    desc: RelationDesc::new(typ, column_names.clone()),
411                    optimized_expr: optimized_expr.into(),
412                    conn_id: if temporary {
413                        Some(session.conn_id().clone())
414                    } else {
415                        None
416                    },
417                    resolved_ids: resolved_ids.clone(),
418                    dependencies: dependencies.clone(),
419                }),
420                owner_id: *session.current_role_id(),
421            },
422        ];
423
424        match self.catalog_transact(Some(session), ops).await {
425            Ok(()) => Ok(StageResult::Response(ExecuteResponse::CreatedView)),
426            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
427                kind:
428                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
429            })) if if_not_exists => {
430                session.add_notice(AdapterNotice::ObjectAlreadyExists {
431                    name: name.item,
432                    ty: "view",
433                });
434                Ok(StageResult::Response(ExecuteResponse::CreatedView))
435            }
436            Err(err) => Err(err),
437        }
438    }
439
440    #[instrument]
441    async fn create_view_explain(
442        &self,
443        session: &Session,
444        CreateViewExplain {
445            id,
446            plan:
447                plan::CreateViewPlan {
448                    name,
449                    view: plan::View { column_names, .. },
450                    ..
451                },
452            explain_ctx:
453                ExplainPlanContext {
454                    config,
455                    format,
456                    stage,
457                    optimizer_trace,
458                    ..
459                },
460            ..
461        }: CreateViewExplain,
462    ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
463        let session_catalog = self.catalog().for_session(session);
464        let expr_humanizer = {
465            let full_name = self.catalog().resolve_full_name(&name, None);
466            let transient_items = btreemap! {
467                id => TransientItem::new(
468                    Some(full_name.into_parts()),
469                    Some(column_names.iter().map(|c| c.to_string()).collect()),
470                )
471            };
472            ExprHumanizerExt::new(transient_items, &session_catalog)
473        };
474
475        let features =
476            OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
477
478        let rows = optimizer_trace
479            .into_rows(
480                format,
481                &config,
482                &features,
483                &expr_humanizer,
484                None,
485                None, // Views don't have a target cluster.
486                Default::default(),
487                stage,
488                plan::ExplaineeStatementKind::CreateView,
489                None,
490            )
491            .await?;
492
493        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
494    }
495}