1use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, View};
15use mz_expr::CollectionPlan;
16use mz_ore::instrument;
17use mz_repr::explain::{ExprHumanizerExt, TransientItem};
18use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
19use mz_repr::{Datum, RelationDesc, Row};
20use mz_sql::ast::ExplainStage;
21use mz_sql::catalog::CatalogError;
22use mz_sql::names::ResolvedIds;
23use mz_sql::plan::{self};
24use mz_sql::session::metadata::SessionMetadata;
25use tracing::Span;
26
27use crate::command::ExecuteResponse;
28use crate::coord::sequencer::inner::return_if_err;
29use crate::coord::{
30 Coordinator, CreateViewExplain, CreateViewFinish, CreateViewOptimize, CreateViewStage,
31 ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
32 infer_sql_type_for_catalog,
33};
34use crate::error::AdapterError;
35use crate::explain::explain_plan;
36use crate::explain::optimizer_trace::OptimizerTrace;
37use crate::optimize::{self, Optimize};
38use crate::session::Session;
39use crate::{AdapterNotice, ExecuteContext, catalog};
40
41impl Staged for CreateViewStage {
42 type Ctx = ExecuteContext;
43
44 fn validity(&mut self) -> &mut PlanValidity {
45 match self {
46 Self::Optimize(stage) => &mut stage.validity,
47 Self::Finish(stage) => &mut stage.validity,
48 Self::Explain(stage) => &mut stage.validity,
49 }
50 }
51
52 async fn stage(
53 self,
54 coord: &mut Coordinator,
55 ctx: &mut ExecuteContext,
56 ) -> Result<StageResult<Box<Self>>, AdapterError> {
57 match self {
58 CreateViewStage::Optimize(stage) => coord.create_view_optimize(stage).await,
59 CreateViewStage::Finish(stage) => coord.create_view_finish(ctx.session(), stage).await,
60 CreateViewStage::Explain(stage) => {
61 coord.create_view_explain(ctx.session(), stage).await
62 }
63 }
64 }
65
66 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
67 Message::CreateViewStageReady {
68 ctx,
69 span,
70 stage: self,
71 }
72 }
73
74 fn cancel_enabled(&self) -> bool {
75 true
76 }
77}
78
79impl Coordinator {
80 pub(crate) async fn sequence_create_view(
81 &mut self,
82 ctx: ExecuteContext,
83 plan: plan::CreateViewPlan,
84 resolved_ids: ResolvedIds,
85 ) {
86 let stage = return_if_err!(
87 self.create_view_validate(plan, resolved_ids, ExplainContext::None),
88 ctx
89 );
90 self.sequence_staged(ctx, Span::current(), stage).await;
91 }
92
93 #[instrument]
94 pub(crate) async fn explain_create_view(
95 &mut self,
96 ctx: ExecuteContext,
97 plan::ExplainPlanPlan {
98 stage,
99 format,
100 config,
101 explainee,
102 }: plan::ExplainPlanPlan,
103 ) {
104 let plan::Explainee::Statement(stmt) = explainee else {
105 unreachable!()
108 };
109 let plan::ExplaineeStatement::CreateView { broken, plan } = stmt else {
110 unreachable!()
113 };
114
115 let optimizer_trace = OptimizerTrace::new(stage.paths());
118
119 let resolved_ids = ResolvedIds::empty();
121
122 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
123 broken,
124 config,
125 format,
126 stage,
127 replan: None,
128 desc: None,
129 optimizer_trace,
130 });
131 let stage = return_if_err!(
132 self.create_view_validate(plan, resolved_ids, explain_ctx),
133 ctx
134 );
135 self.sequence_staged(ctx, Span::current(), stage).await;
136 }
137
138 #[instrument]
139 pub(crate) async fn explain_replan_view(
140 &mut self,
141 ctx: ExecuteContext,
142 plan::ExplainPlanPlan {
143 stage,
144 format,
145 config,
146 explainee,
147 }: plan::ExplainPlanPlan,
148 ) {
149 let plan::Explainee::ReplanView(id) = explainee else {
150 unreachable!() };
152 let CatalogItem::View(item) = self.catalog().get_entry(&id).item() else {
153 unreachable!() };
155 let gid = item.global_id();
156
157 let create_sql = item.create_sql.clone();
158 let plan_result = self
159 .catalog_mut()
160 .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
161 let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
162
163 let plan::Plan::CreateView(plan) = plan else {
164 unreachable!() };
166
167 let broken = false;
170
171 let optimizer_trace = OptimizerTrace::new(stage.paths());
174
175 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
176 broken,
177 config,
178 format,
179 stage,
180 replan: Some(gid),
181 desc: None,
182 optimizer_trace,
183 });
184 let stage = return_if_err!(
185 self.create_view_validate(plan, resolved_ids, explain_ctx),
186 ctx
187 );
188 self.sequence_staged(ctx, Span::current(), stage).await;
189 }
190
191 #[instrument]
192 pub(crate) fn explain_view(
193 &self,
194 ctx: &ExecuteContext,
195 plan::ExplainPlanPlan {
196 stage,
197 format,
198 config,
199 explainee,
200 }: plan::ExplainPlanPlan,
201 ) -> Result<ExecuteResponse, AdapterError> {
202 let plan::Explainee::View(id) = explainee else {
203 unreachable!() };
205 let CatalogItem::View(view) = self.catalog().get_entry(&id).item() else {
206 unreachable!() };
208
209 let target_cluster = None; let features =
212 OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
213
214 let cardinality_stats = BTreeMap::new();
215
216 let explain = match stage {
217 ExplainStage::RawPlan => explain_plan(
218 view.raw_expr.as_ref().clone(),
219 format,
220 &config,
221 &features,
222 &self.catalog().for_session(ctx.session()),
223 cardinality_stats,
224 target_cluster,
225 )?,
226 ExplainStage::LocalPlan => explain_plan(
227 view.optimized_expr.as_inner().clone(),
228 format,
229 &config,
230 &features,
231 &self.catalog().for_session(ctx.session()),
232 cardinality_stats,
233 target_cluster,
234 )?,
235 _ => {
236 coord_bail!("cannot EXPLAIN {} FOR VIEW", stage);
237 }
238 };
239
240 let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
241
242 Ok(Self::send_immediate_rows(row))
243 }
244
245 #[instrument]
246 fn create_view_validate(
247 &self,
248 plan: plan::CreateViewPlan,
249 resolved_ids: ResolvedIds,
250 explain_ctx: ExplainContext,
253 ) -> Result<CreateViewStage, AdapterError> {
254 let plan::CreateViewPlan {
255 view: plan::View { expr, .. },
256 ambiguous_columns,
257 ..
258 } = &plan;
259
260 let expr_depends_on = expr.depends_on();
265 self.catalog()
266 .validate_timeline_context(expr_depends_on.iter().copied())?;
267 self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
268
269 let validity =
270 PlanValidity::require_transient_revision(self.catalog().transient_revision());
271
272 Ok(CreateViewStage::Optimize(CreateViewOptimize {
273 validity,
274 plan,
275 resolved_ids,
276 explain_ctx,
277 }))
278 }
279
280 #[instrument]
281 async fn create_view_optimize(
282 &mut self,
283 CreateViewOptimize {
284 validity,
285 plan,
286 resolved_ids,
287 explain_ctx,
288 }: CreateViewOptimize,
289 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
290 let (item_id, global_id) = self.allocate_user_id().await?;
291
292 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
294 .override_from(&explain_ctx);
295
296 let mut optimizer =
298 optimize::view::Optimizer::new(optimizer_config, Some(self.optimizer_metrics()));
299
300 let span = Span::current();
301 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
302 || "optimize create view",
303 move || {
304 span.in_scope(|| {
305 let mut pipeline =
306 || -> Result<mz_expr::OptimizedMirRelationExpr, AdapterError> {
307 let _dispatch_guard = explain_ctx.dispatch_guard();
308
309 let raw_expr = plan.view.expr.clone();
311 let optimized_expr = optimizer.catch_unwind_optimize(raw_expr)?;
312
313 Ok(optimized_expr)
314 };
315
316 let stage = match pipeline() {
317 Ok(optimized_expr) => {
318 if let ExplainContext::Plan(explain_ctx) = explain_ctx {
319 CreateViewStage::Explain(CreateViewExplain {
320 validity,
321 id: global_id,
322 plan,
323 explain_ctx,
324 })
325 } else {
326 CreateViewStage::Finish(CreateViewFinish {
327 validity,
328 item_id,
329 global_id,
330 plan,
331 optimized_expr,
332 resolved_ids,
333 })
334 }
335 }
336 Err(err) => {
339 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
340 return Err(err);
342 };
343
344 if explain_ctx.broken {
345 tracing::error!("error while handling EXPLAIN statement: {}", err);
349 CreateViewStage::Explain(CreateViewExplain {
350 validity,
351 id: global_id,
352 plan,
353 explain_ctx,
354 })
355 } else {
356 return Err(err);
358 }
359 }
360 };
361
362 Ok(Box::new(stage))
363 })
364 },
365 )))
366 }
367
368 #[instrument]
369 async fn create_view_finish(
370 &mut self,
371 session: &Session,
372 CreateViewFinish {
373 item_id,
374 global_id,
375 plan:
376 plan::CreateViewPlan {
377 name,
378 view:
379 plan::View {
380 create_sql,
381 expr: raw_expr,
382 dependencies,
383 column_names,
384 temporary,
385 },
386 drop_ids,
387 if_not_exists,
388 ..
389 },
390 optimized_expr,
391 resolved_ids,
392 ..
393 }: CreateViewFinish,
394 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
395 let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
396 let ops = vec![
397 catalog::Op::DropObjects(
398 drop_ids
399 .iter()
400 .map(|id| catalog::DropObjectInfo::Item(*id))
401 .collect(),
402 ),
403 catalog::Op::CreateItem {
404 id: item_id,
405 name: name.clone(),
406 item: CatalogItem::View(View {
407 create_sql: create_sql.clone(),
408 global_id,
409 raw_expr: raw_expr.into(),
410 desc: RelationDesc::new(typ, column_names.clone()),
411 optimized_expr: optimized_expr.into(),
412 conn_id: if temporary {
413 Some(session.conn_id().clone())
414 } else {
415 None
416 },
417 resolved_ids: resolved_ids.clone(),
418 dependencies: dependencies.clone(),
419 }),
420 owner_id: *session.current_role_id(),
421 },
422 ];
423
424 match self.catalog_transact(Some(session), ops).await {
425 Ok(()) => Ok(StageResult::Response(ExecuteResponse::CreatedView)),
426 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
427 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
428 })) if if_not_exists => {
429 session.add_notice(AdapterNotice::ObjectAlreadyExists {
430 name: name.item,
431 ty: "view",
432 });
433 Ok(StageResult::Response(ExecuteResponse::CreatedView))
434 }
435 Err(err) => Err(err),
436 }
437 }
438
439 #[instrument]
440 async fn create_view_explain(
441 &self,
442 session: &Session,
443 CreateViewExplain {
444 id,
445 plan:
446 plan::CreateViewPlan {
447 name,
448 view: plan::View { column_names, .. },
449 ..
450 },
451 explain_ctx:
452 ExplainPlanContext {
453 config,
454 format,
455 stage,
456 optimizer_trace,
457 ..
458 },
459 ..
460 }: CreateViewExplain,
461 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
462 let session_catalog = self.catalog().for_session(session);
463 let expr_humanizer = {
464 let full_name = self.catalog().resolve_full_name(&name, None);
465 let transient_items = btreemap! {
466 id => TransientItem::new(
467 Some(full_name.into_parts()),
468 Some(column_names.iter().map(|c| c.to_string()).collect()),
469 )
470 };
471 ExprHumanizerExt::new(transient_items, &session_catalog)
472 };
473
474 let features =
475 OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
476
477 let rows = optimizer_trace
478 .into_rows(
479 format,
480 &config,
481 &features,
482 &expr_humanizer,
483 None,
484 None, Default::default(),
486 stage,
487 plan::ExplaineeStatementKind::CreateView,
488 None,
489 )
490 .await?;
491
492 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
493 }
494}