mz_adapter/coord/sequencer/inner/
create_view.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::objects::{CatalogItem, View};
14use mz_expr::CollectionPlan;
15use mz_ore::instrument;
16use mz_repr::explain::{ExprHumanizerExt, TransientItem};
17use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
18use mz_repr::{Datum, RelationDesc, Row};
19use mz_sql::ast::ExplainStage;
20use mz_sql::catalog::CatalogError;
21use mz_sql::names::ResolvedIds;
22use mz_sql::plan::{self};
23use mz_sql::session::metadata::SessionMetadata;
24use tracing::Span;
25
26use crate::command::ExecuteResponse;
27use crate::coord::sequencer::inner::return_if_err;
28use crate::coord::{
29    Coordinator, CreateViewExplain, CreateViewFinish, CreateViewOptimize, CreateViewStage,
30    ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
31};
32use crate::error::AdapterError;
33use crate::explain::explain_plan;
34use crate::explain::optimizer_trace::OptimizerTrace;
35use crate::optimize::{self, Optimize};
36use crate::session::Session;
37use crate::{AdapterNotice, ExecuteContext, catalog};
38
39impl Staged for CreateViewStage {
40    type Ctx = ExecuteContext;
41
42    fn validity(&mut self) -> &mut PlanValidity {
43        match self {
44            Self::Optimize(stage) => &mut stage.validity,
45            Self::Finish(stage) => &mut stage.validity,
46            Self::Explain(stage) => &mut stage.validity,
47        }
48    }
49
50    async fn stage(
51        self,
52        coord: &mut Coordinator,
53        ctx: &mut ExecuteContext,
54    ) -> Result<StageResult<Box<Self>>, AdapterError> {
55        match self {
56            CreateViewStage::Optimize(stage) => coord.create_view_optimize(stage).await,
57            CreateViewStage::Finish(stage) => coord.create_view_finish(ctx.session(), stage).await,
58            CreateViewStage::Explain(stage) => {
59                coord.create_view_explain(ctx.session(), stage).await
60            }
61        }
62    }
63
64    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
65        Message::CreateViewStageReady {
66            ctx,
67            span,
68            stage: self,
69        }
70    }
71
72    fn cancel_enabled(&self) -> bool {
73        true
74    }
75}
76
77impl Coordinator {
78    pub(crate) async fn sequence_create_view(
79        &mut self,
80        ctx: ExecuteContext,
81        plan: plan::CreateViewPlan,
82        resolved_ids: ResolvedIds,
83    ) {
84        let stage = return_if_err!(
85            self.create_view_validate(plan, resolved_ids, ExplainContext::None),
86            ctx
87        );
88        self.sequence_staged(ctx, Span::current(), stage).await;
89    }
90
91    #[instrument]
92    pub(crate) async fn explain_create_view(
93        &mut self,
94        ctx: ExecuteContext,
95        plan::ExplainPlanPlan {
96            stage,
97            format,
98            config,
99            explainee,
100        }: plan::ExplainPlanPlan,
101    ) {
102        let plan::Explainee::Statement(stmt) = explainee else {
103            // This is currently asserted in the `sequence_explain_plan` code that
104            // calls this method.
105            unreachable!()
106        };
107        let plan::ExplaineeStatement::CreateView { broken, plan } = stmt else {
108            // This is currently asserted in the `sequence_explain_plan` code that
109            // calls this method.
110            unreachable!()
111        };
112
113        // Create an OptimizerTrace instance to collect plans emitted when
114        // executing the optimizer pipeline.
115        let optimizer_trace = OptimizerTrace::new(stage.paths());
116
117        // Not used in the EXPLAIN path so it's OK to generate a dummy value.
118        let resolved_ids = ResolvedIds::empty();
119
120        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
121            broken,
122            config,
123            format,
124            stage,
125            replan: None,
126            desc: None,
127            optimizer_trace,
128        });
129        let stage = return_if_err!(
130            self.create_view_validate(plan, resolved_ids, explain_ctx),
131            ctx
132        );
133        self.sequence_staged(ctx, Span::current(), stage).await;
134    }
135
136    #[instrument]
137    pub(crate) async fn explain_replan_view(
138        &mut self,
139        ctx: ExecuteContext,
140        plan::ExplainPlanPlan {
141            stage,
142            format,
143            config,
144            explainee,
145        }: plan::ExplainPlanPlan,
146    ) {
147        let plan::Explainee::ReplanView(id) = explainee else {
148            unreachable!() // Asserted in `sequence_explain_plan`.
149        };
150        let CatalogItem::View(item) = self.catalog().get_entry(&id).item() else {
151            unreachable!() // Asserted in `plan_explain_plan`.
152        };
153        let gid = item.global_id();
154
155        let create_sql = item.create_sql.clone();
156        let plan_result = self
157            .catalog_mut()
158            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
159        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
160
161        let plan::Plan::CreateView(plan) = plan else {
162            unreachable!() // We are parsing the `create_sql` of a `View` item.
163        };
164
165        // It is safe to assume that query optimization will always succeed, so
166        // for now we statically assume `broken = false`.
167        let broken = false;
168
169        // Create an OptimizerTrace instance to collect plans emitted when
170        // executing the optimizer pipeline.
171        let optimizer_trace = OptimizerTrace::new(stage.paths());
172
173        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
174            broken,
175            config,
176            format,
177            stage,
178            replan: Some(gid),
179            desc: None,
180            optimizer_trace,
181        });
182        let stage = return_if_err!(
183            self.create_view_validate(plan, resolved_ids, explain_ctx),
184            ctx
185        );
186        self.sequence_staged(ctx, Span::current(), stage).await;
187    }
188
189    #[instrument]
190    pub(crate) fn explain_view(
191        &mut self,
192        ctx: &ExecuteContext,
193        plan::ExplainPlanPlan {
194            stage,
195            format,
196            config,
197            explainee,
198        }: plan::ExplainPlanPlan,
199    ) -> Result<ExecuteResponse, AdapterError> {
200        let plan::Explainee::View(id) = explainee else {
201            unreachable!() // Asserted in `sequence_explain_plan`.
202        };
203        let CatalogItem::View(view) = self.catalog().get_entry(&id).item() else {
204            unreachable!() // Asserted in `plan_explain_plan`.
205        };
206
207        let target_cluster = None; // Views don't have a target cluster.
208
209        let features =
210            OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
211
212        let cardinality_stats = BTreeMap::new();
213
214        let explain = match stage {
215            ExplainStage::RawPlan => explain_plan(
216                view.raw_expr.as_ref().clone(),
217                format,
218                &config,
219                &features,
220                &self.catalog().for_session(ctx.session()),
221                cardinality_stats,
222                target_cluster,
223            )?,
224            ExplainStage::LocalPlan => explain_plan(
225                view.optimized_expr.as_inner().clone(),
226                format,
227                &config,
228                &features,
229                &self.catalog().for_session(ctx.session()),
230                cardinality_stats,
231                target_cluster,
232            )?,
233            _ => {
234                coord_bail!("cannot EXPLAIN {} FOR VIEW", stage);
235            }
236        };
237
238        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
239
240        Ok(Self::send_immediate_rows(row))
241    }
242
243    #[instrument]
244    fn create_view_validate(
245        &mut self,
246        plan: plan::CreateViewPlan,
247        resolved_ids: ResolvedIds,
248        // An optional context set iff the state machine is initiated from
249        // sequencing an EXPLAIN for this statement.
250        explain_ctx: ExplainContext,
251    ) -> Result<CreateViewStage, AdapterError> {
252        let plan::CreateViewPlan {
253            view: plan::View { expr, .. },
254            ambiguous_columns,
255            ..
256        } = &plan;
257
258        // Validate any references in the view's expression. We do this on the
259        // unoptimized plan to better reflect what the user typed. We want to
260        // reject queries that depend on a relation in the wrong timeline, for
261        // example, even if we can *technically* optimize that reference away.
262        let expr_depends_on = expr.depends_on();
263        self.catalog()
264            .validate_timeline_context(expr_depends_on.iter().copied())?;
265        self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
266
267        let validity =
268            PlanValidity::require_transient_revision(self.catalog().transient_revision());
269
270        Ok(CreateViewStage::Optimize(CreateViewOptimize {
271            validity,
272            plan,
273            resolved_ids,
274            explain_ctx,
275        }))
276    }
277
278    #[instrument]
279    async fn create_view_optimize(
280        &mut self,
281        CreateViewOptimize {
282            validity,
283            plan,
284            resolved_ids,
285            explain_ctx,
286        }: CreateViewOptimize,
287    ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
288        let id_ts = self.get_catalog_write_ts().await;
289        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
290
291        // Collect optimizer parameters.
292        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
293            .override_from(&explain_ctx);
294
295        // Build an optimizer for this VIEW.
296        let mut optimizer =
297            optimize::view::Optimizer::new(optimizer_config, Some(self.optimizer_metrics()));
298
299        let span = Span::current();
300        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
301            || "optimize create view",
302            move || {
303                span.in_scope(|| {
304                    let mut pipeline =
305                        || -> Result<mz_expr::OptimizedMirRelationExpr, AdapterError> {
306                            let _dispatch_guard = explain_ctx.dispatch_guard();
307
308                            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
309                            let raw_expr = plan.view.expr.clone();
310                            let optimized_expr = optimizer.catch_unwind_optimize(raw_expr)?;
311
312                            Ok(optimized_expr)
313                        };
314
315                    let stage = match pipeline() {
316                        Ok(optimized_expr) => {
317                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
318                                CreateViewStage::Explain(CreateViewExplain {
319                                    validity,
320                                    id: global_id,
321                                    plan,
322                                    explain_ctx,
323                                })
324                            } else {
325                                CreateViewStage::Finish(CreateViewFinish {
326                                    validity,
327                                    item_id,
328                                    global_id,
329                                    plan,
330                                    optimized_expr,
331                                    resolved_ids,
332                                })
333                            }
334                        }
335                        // Internal optimizer errors are handled differently
336                        // depending on the caller.
337                        Err(err) => {
338                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
339                                // In `sequence_~` contexts, immediately return the error.
340                                return Err(err);
341                            };
342
343                            if explain_ctx.broken {
344                                // In `EXPLAIN BROKEN` contexts, just log the error
345                                // and move to the next stage with default
346                                // parameters.
347                                tracing::error!("error while handling EXPLAIN statement: {}", err);
348                                CreateViewStage::Explain(CreateViewExplain {
349                                    validity,
350                                    id: global_id,
351                                    plan,
352                                    explain_ctx,
353                                })
354                            } else {
355                                // In regular `EXPLAIN` contexts, immediately return the error.
356                                return Err(err);
357                            }
358                        }
359                    };
360
361                    Ok(Box::new(stage))
362                })
363            },
364        )))
365    }
366
367    #[instrument]
368    async fn create_view_finish(
369        &mut self,
370        session: &Session,
371        CreateViewFinish {
372            item_id,
373            global_id,
374            plan:
375                plan::CreateViewPlan {
376                    name,
377                    view:
378                        plan::View {
379                            create_sql,
380                            expr: raw_expr,
381                            dependencies,
382                            column_names,
383                            temporary,
384                        },
385                    drop_ids,
386                    if_not_exists,
387                    ..
388                },
389            optimized_expr,
390            resolved_ids,
391            ..
392        }: CreateViewFinish,
393    ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
394        let ops = vec![
395            catalog::Op::DropObjects(
396                drop_ids
397                    .iter()
398                    .map(|id| catalog::DropObjectInfo::Item(*id))
399                    .collect(),
400            ),
401            catalog::Op::CreateItem {
402                id: item_id,
403                name: name.clone(),
404                item: CatalogItem::View(View {
405                    create_sql: create_sql.clone(),
406                    global_id,
407                    raw_expr: raw_expr.into(),
408                    desc: RelationDesc::new(optimized_expr.typ(), column_names.clone()),
409                    optimized_expr: optimized_expr.into(),
410                    conn_id: if temporary {
411                        Some(session.conn_id().clone())
412                    } else {
413                        None
414                    },
415                    resolved_ids: resolved_ids.clone(),
416                    dependencies: dependencies.clone(),
417                }),
418                owner_id: *session.current_role_id(),
419            },
420        ];
421
422        match self.catalog_transact(Some(session), ops).await {
423            Ok(()) => Ok(StageResult::Response(ExecuteResponse::CreatedView)),
424            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
425                kind:
426                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
427            })) if if_not_exists => {
428                session.add_notice(AdapterNotice::ObjectAlreadyExists {
429                    name: name.item,
430                    ty: "view",
431                });
432                Ok(StageResult::Response(ExecuteResponse::CreatedView))
433            }
434            Err(err) => Err(err),
435        }
436    }
437
438    #[instrument]
439    async fn create_view_explain(
440        &mut self,
441        session: &Session,
442        CreateViewExplain {
443            id,
444            plan:
445                plan::CreateViewPlan {
446                    name,
447                    view: plan::View { column_names, .. },
448                    ..
449                },
450            explain_ctx:
451                ExplainPlanContext {
452                    config,
453                    format,
454                    stage,
455                    optimizer_trace,
456                    ..
457                },
458            ..
459        }: CreateViewExplain,
460    ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
461        let session_catalog = self.catalog().for_session(session);
462        let expr_humanizer = {
463            let full_name = self.catalog().resolve_full_name(&name, None);
464            let transient_items = btreemap! {
465                id => TransientItem::new(
466                    Some(full_name.into_parts()),
467                    Some(column_names.iter().map(|c| c.to_string()).collect()),
468                )
469            };
470            ExprHumanizerExt::new(transient_items, &session_catalog)
471        };
472
473        let features =
474            OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
475
476        let rows = optimizer_trace
477            .into_rows(
478                format,
479                &config,
480                &features,
481                &expr_humanizer,
482                None,
483                None, // Views don't have a target cluster.
484                Default::default(),
485                stage,
486                plan::ExplaineeStatementKind::CreateView,
487                None,
488            )
489            .await?;
490
491        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
492    }
493}