1use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, View};
15use mz_expr::CollectionPlan;
16use mz_ore::instrument;
17use mz_repr::explain::{ExprHumanizerExt, TransientItem};
18use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
19use mz_repr::{Datum, RelationDesc, Row};
20use mz_sql::ast::ExplainStage;
21use mz_sql::catalog::CatalogError;
22use mz_sql::names::ResolvedIds;
23use mz_sql::plan::{self};
24use mz_sql::session::metadata::SessionMetadata;
25use tracing::Span;
26
27use crate::command::ExecuteResponse;
28use crate::coord::sequencer::inner::return_if_err;
29use crate::coord::{
30 Coordinator, CreateViewExplain, CreateViewFinish, CreateViewOptimize, CreateViewStage,
31 ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
32 infer_sql_type_for_catalog,
33};
34use crate::error::AdapterError;
35use crate::explain::explain_plan;
36use crate::explain::optimizer_trace::OptimizerTrace;
37use crate::optimize::{self, Optimize};
38use crate::session::Session;
39use crate::{AdapterNotice, ExecuteContext, catalog};
40
41impl Staged for CreateViewStage {
42 type Ctx = ExecuteContext;
43
44 fn validity(&mut self) -> &mut PlanValidity {
45 match self {
46 Self::Optimize(stage) => &mut stage.validity,
47 Self::Finish(stage) => &mut stage.validity,
48 Self::Explain(stage) => &mut stage.validity,
49 }
50 }
51
52 async fn stage(
53 self,
54 coord: &mut Coordinator,
55 ctx: &mut ExecuteContext,
56 ) -> Result<StageResult<Box<Self>>, AdapterError> {
57 match self {
58 CreateViewStage::Optimize(stage) => coord.create_view_optimize(stage).await,
59 CreateViewStage::Finish(stage) => coord.create_view_finish(ctx.session(), stage).await,
60 CreateViewStage::Explain(stage) => {
61 coord.create_view_explain(ctx.session(), stage).await
62 }
63 }
64 }
65
66 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
67 Message::CreateViewStageReady {
68 ctx,
69 span,
70 stage: self,
71 }
72 }
73
74 fn cancel_enabled(&self) -> bool {
75 true
76 }
77}
78
79impl Coordinator {
80 pub(crate) async fn sequence_create_view(
81 &mut self,
82 ctx: ExecuteContext,
83 plan: plan::CreateViewPlan,
84 resolved_ids: ResolvedIds,
85 ) {
86 let stage = return_if_err!(
87 self.create_view_validate(ctx.session(), plan, resolved_ids, ExplainContext::None),
88 ctx
89 );
90 self.sequence_staged(ctx, Span::current(), stage).await;
91 }
92
93 #[instrument]
94 pub(crate) async fn explain_create_view(
95 &mut self,
96 ctx: ExecuteContext,
97 plan::ExplainPlanPlan {
98 stage,
99 format,
100 config,
101 explainee,
102 }: plan::ExplainPlanPlan,
103 ) {
104 let plan::Explainee::Statement(stmt) = explainee else {
105 unreachable!()
108 };
109 let plan::ExplaineeStatement::CreateView { broken, plan } = stmt else {
110 unreachable!()
113 };
114
115 let optimizer_trace = OptimizerTrace::new(stage.paths());
118
119 let resolved_ids = ResolvedIds::empty();
121
122 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
123 broken,
124 config,
125 format,
126 stage,
127 replan: None,
128 desc: None,
129 optimizer_trace,
130 });
131 let stage = return_if_err!(
132 self.create_view_validate(ctx.session(), plan, resolved_ids, explain_ctx),
133 ctx
134 );
135 self.sequence_staged(ctx, Span::current(), stage).await;
136 }
137
138 #[instrument]
139 pub(crate) async fn explain_replan_view(
140 &mut self,
141 ctx: ExecuteContext,
142 plan::ExplainPlanPlan {
143 stage,
144 format,
145 config,
146 explainee,
147 }: plan::ExplainPlanPlan,
148 ) {
149 let plan::Explainee::ReplanView(id) = explainee else {
150 unreachable!() };
152 let CatalogItem::View(item) = self.catalog().get_entry(&id).item() else {
153 unreachable!() };
155 let gid = item.global_id();
156
157 let create_sql = item.create_sql.clone();
158 let plan_result = self
159 .catalog_mut()
160 .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
161 let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
162
163 let plan::Plan::CreateView(plan) = plan else {
164 unreachable!() };
166
167 let broken = false;
170
171 let optimizer_trace = OptimizerTrace::new(stage.paths());
174
175 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
176 broken,
177 config,
178 format,
179 stage,
180 replan: Some(gid),
181 desc: None,
182 optimizer_trace,
183 });
184 let stage = return_if_err!(
185 self.create_view_validate(ctx.session(), plan, resolved_ids, explain_ctx),
186 ctx
187 );
188 self.sequence_staged(ctx, Span::current(), stage).await;
189 }
190
191 #[instrument]
192 pub(crate) fn explain_view(
193 &self,
194 ctx: &ExecuteContext,
195 plan::ExplainPlanPlan {
196 stage,
197 format,
198 config,
199 explainee,
200 }: plan::ExplainPlanPlan,
201 ) -> Result<ExecuteResponse, AdapterError> {
202 let plan::Explainee::View(id) = explainee else {
203 unreachable!() };
205 let CatalogItem::View(view) = self.catalog().get_entry(&id).item() else {
206 unreachable!() };
208
209 let target_cluster = None; let features =
212 OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
213
214 let cardinality_stats = BTreeMap::new();
215
216 let explain = match stage {
217 ExplainStage::RawPlan => explain_plan(
218 view.raw_expr.as_ref().clone(),
219 format,
220 &config,
221 &features,
222 &self.catalog().for_session(ctx.session()),
223 cardinality_stats,
224 target_cluster,
225 )?,
226 ExplainStage::LocalPlan => explain_plan(
227 view.locally_optimized_expr.as_inner().clone(),
228 format,
229 &config,
230 &features,
231 &self.catalog().for_session(ctx.session()),
232 cardinality_stats,
233 target_cluster,
234 )?,
235 _ => {
236 coord_bail!("cannot EXPLAIN {} FOR VIEW", stage);
237 }
238 };
239
240 let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
241
242 Ok(Self::send_immediate_rows(row))
243 }
244
245 #[instrument]
246 fn create_view_validate(
247 &self,
248 session: &Session,
249 plan: plan::CreateViewPlan,
250 resolved_ids: ResolvedIds,
251 explain_ctx: ExplainContext,
254 ) -> Result<CreateViewStage, AdapterError> {
255 let plan::CreateViewPlan {
256 view: plan::View { expr, .. },
257 ambiguous_columns,
258 ..
259 } = &plan;
260
261 let expr_depends_on = expr.depends_on();
266 self.catalog()
267 .validate_timeline_context(expr_depends_on.iter().copied())?;
268 self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
269
270 let validity = PlanValidity::new(
274 self.catalog().transient_revision(),
275 resolved_ids.items().copied().collect(),
276 None,
277 None,
278 session.role_metadata().clone(),
279 );
280
281 Ok(CreateViewStage::Optimize(CreateViewOptimize {
282 validity,
283 plan,
284 resolved_ids,
285 explain_ctx,
286 }))
287 }
288
289 #[instrument]
290 async fn create_view_optimize(
291 &mut self,
292 CreateViewOptimize {
293 validity,
294 plan,
295 resolved_ids,
296 explain_ctx,
297 }: CreateViewOptimize,
298 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
299 let (item_id, global_id) = self.allocate_user_id().await?;
300
301 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
303 .override_from(&explain_ctx);
304
305 let mut optimizer =
307 optimize::view::Optimizer::new(optimizer_config, Some(self.optimizer_metrics()));
308
309 let span = Span::current();
310 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
311 || "optimize create view",
312 move || {
313 span.in_scope(|| {
314 let mut pipeline =
315 || -> Result<mz_expr::OptimizedMirRelationExpr, AdapterError> {
316 let _dispatch_guard = explain_ctx.dispatch_guard();
317
318 let raw_expr = plan.view.expr.clone();
320 let optimized_expr = optimizer.catch_unwind_optimize(raw_expr)?;
321
322 Ok(optimized_expr)
323 };
324
325 let stage = match pipeline() {
326 Ok(optimized_expr) => {
327 if let ExplainContext::Plan(explain_ctx) = explain_ctx {
328 CreateViewStage::Explain(CreateViewExplain {
329 validity,
330 id: global_id,
331 plan,
332 explain_ctx,
333 })
334 } else {
335 CreateViewStage::Finish(CreateViewFinish {
336 validity,
337 item_id,
338 global_id,
339 plan,
340 optimized_expr,
341 resolved_ids,
342 })
343 }
344 }
345 Err(err) => {
348 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
349 return Err(err);
351 };
352
353 if explain_ctx.broken {
354 tracing::error!("error while handling EXPLAIN statement: {}", err);
358 CreateViewStage::Explain(CreateViewExplain {
359 validity,
360 id: global_id,
361 plan,
362 explain_ctx,
363 })
364 } else {
365 return Err(err);
367 }
368 }
369 };
370
371 Ok(Box::new(stage))
372 })
373 },
374 )))
375 }
376
377 #[instrument]
378 async fn create_view_finish(
379 &mut self,
380 session: &Session,
381 CreateViewFinish {
382 item_id,
383 global_id,
384 plan:
385 plan::CreateViewPlan {
386 name,
387 view:
388 plan::View {
389 create_sql,
390 expr: raw_expr,
391 dependencies,
392 column_names,
393 temporary,
394 },
395 drop_ids,
396 if_not_exists,
397 ..
398 },
399 optimized_expr,
400 resolved_ids,
401 ..
402 }: CreateViewFinish,
403 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
404 let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
405 let ops = vec![
406 catalog::Op::DropObjects(
407 drop_ids
408 .iter()
409 .map(|id| catalog::DropObjectInfo::Item(*id))
410 .collect(),
411 ),
412 catalog::Op::CreateItem {
413 id: item_id,
414 name: name.clone(),
415 item: CatalogItem::View(View {
416 create_sql: create_sql.clone(),
417 global_id,
418 raw_expr: raw_expr.into(),
419 desc: RelationDesc::new(typ, column_names.clone()),
420 locally_optimized_expr: optimized_expr.into(),
421 conn_id: if temporary {
422 Some(session.conn_id().clone())
423 } else {
424 None
425 },
426 resolved_ids: resolved_ids.clone(),
427 dependencies: dependencies.clone(),
428 }),
429 owner_id: *session.current_role_id(),
430 },
431 ];
432
433 match self.catalog_transact(Some(session), ops).await {
434 Ok(()) => Ok(StageResult::Response(ExecuteResponse::CreatedView)),
435 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
436 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
437 })) if if_not_exists => {
438 session.add_notice(AdapterNotice::ObjectAlreadyExists {
439 name: name.item,
440 ty: "view",
441 });
442 Ok(StageResult::Response(ExecuteResponse::CreatedView))
443 }
444 Err(err) => Err(err),
445 }
446 }
447
448 #[instrument]
449 async fn create_view_explain(
450 &self,
451 session: &Session,
452 CreateViewExplain {
453 id,
454 plan:
455 plan::CreateViewPlan {
456 name,
457 view: plan::View { column_names, .. },
458 ..
459 },
460 explain_ctx:
461 ExplainPlanContext {
462 config,
463 format,
464 stage,
465 optimizer_trace,
466 ..
467 },
468 ..
469 }: CreateViewExplain,
470 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
471 let session_catalog = self.catalog().for_session(session);
472 let expr_humanizer = {
473 let full_name = self.catalog().resolve_full_name(&name, None);
474 let transient_items = btreemap! {
475 id => TransientItem::new(
476 Some(full_name.into_parts()),
477 Some(column_names.iter().map(|c| c.to_string()).collect()),
478 )
479 };
480 ExprHumanizerExt::new(transient_items, &session_catalog)
481 };
482
483 let features =
484 OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
485
486 let rows = optimizer_trace
487 .into_rows(
488 format,
489 &config,
490 &features,
491 &expr_humanizer,
492 None,
493 None, Default::default(),
495 stage,
496 plan::ExplaineeStatementKind::CreateView,
497 None,
498 )
499 .await?;
500
501 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
502 }
503}