1use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, View};
15use mz_expr::CollectionPlan;
16use mz_ore::instrument;
17use mz_repr::explain::{ExprHumanizerExt, TransientItem};
18use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
19use mz_repr::{Datum, RelationDesc, Row};
20use mz_sql::ast::ExplainStage;
21use mz_sql::catalog::CatalogError;
22use mz_sql::names::ResolvedIds;
23use mz_sql::plan::{self};
24use mz_sql::session::metadata::SessionMetadata;
25use tracing::Span;
26
27use crate::command::ExecuteResponse;
28use crate::coord::sequencer::inner::return_if_err;
29use crate::coord::{
30 Coordinator, CreateViewExplain, CreateViewFinish, CreateViewOptimize, CreateViewStage,
31 ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
32 infer_sql_type_for_catalog,
33};
34use crate::error::AdapterError;
35use crate::explain::explain_plan;
36use crate::explain::optimizer_trace::OptimizerTrace;
37use crate::optimize::{self, Optimize};
38use crate::session::Session;
39use crate::{AdapterNotice, ExecuteContext, catalog};
40
41impl Staged for CreateViewStage {
42 type Ctx = ExecuteContext;
43
44 fn validity(&mut self) -> &mut PlanValidity {
45 match self {
46 Self::Optimize(stage) => &mut stage.validity,
47 Self::Finish(stage) => &mut stage.validity,
48 Self::Explain(stage) => &mut stage.validity,
49 }
50 }
51
52 async fn stage(
53 self,
54 coord: &mut Coordinator,
55 ctx: &mut ExecuteContext,
56 ) -> Result<StageResult<Box<Self>>, AdapterError> {
57 match self {
58 CreateViewStage::Optimize(stage) => coord.create_view_optimize(stage).await,
59 CreateViewStage::Finish(stage) => coord.create_view_finish(ctx.session(), stage).await,
60 CreateViewStage::Explain(stage) => {
61 coord.create_view_explain(ctx.session(), stage).await
62 }
63 }
64 }
65
66 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
67 Message::CreateViewStageReady {
68 ctx,
69 span,
70 stage: self,
71 }
72 }
73
74 fn cancel_enabled(&self) -> bool {
75 true
76 }
77}
78
79impl Coordinator {
80 pub(crate) async fn sequence_create_view(
81 &mut self,
82 ctx: ExecuteContext,
83 plan: plan::CreateViewPlan,
84 resolved_ids: ResolvedIds,
85 ) {
86 let stage = return_if_err!(
87 self.create_view_validate(plan, resolved_ids, ExplainContext::None),
88 ctx
89 );
90 self.sequence_staged(ctx, Span::current(), stage).await;
91 }
92
93 #[instrument]
94 pub(crate) async fn explain_create_view(
95 &mut self,
96 ctx: ExecuteContext,
97 plan::ExplainPlanPlan {
98 stage,
99 format,
100 config,
101 explainee,
102 }: plan::ExplainPlanPlan,
103 ) {
104 let plan::Explainee::Statement(stmt) = explainee else {
105 unreachable!()
108 };
109 let plan::ExplaineeStatement::CreateView { broken, plan } = stmt else {
110 unreachable!()
113 };
114
115 let optimizer_trace = OptimizerTrace::new(stage.paths());
118
119 let resolved_ids = ResolvedIds::empty();
121
122 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
123 broken,
124 config,
125 format,
126 stage,
127 replan: None,
128 desc: None,
129 optimizer_trace,
130 });
131 let stage = return_if_err!(
132 self.create_view_validate(plan, resolved_ids, explain_ctx),
133 ctx
134 );
135 self.sequence_staged(ctx, Span::current(), stage).await;
136 }
137
138 #[instrument]
139 pub(crate) async fn explain_replan_view(
140 &mut self,
141 ctx: ExecuteContext,
142 plan::ExplainPlanPlan {
143 stage,
144 format,
145 config,
146 explainee,
147 }: plan::ExplainPlanPlan,
148 ) {
149 let plan::Explainee::ReplanView(id) = explainee else {
150 unreachable!() };
152 let CatalogItem::View(item) = self.catalog().get_entry(&id).item() else {
153 unreachable!() };
155 let gid = item.global_id();
156
157 let create_sql = item.create_sql.clone();
158 let plan_result = self
159 .catalog_mut()
160 .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
161 let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
162
163 let plan::Plan::CreateView(plan) = plan else {
164 unreachable!() };
166
167 let broken = false;
170
171 let optimizer_trace = OptimizerTrace::new(stage.paths());
174
175 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
176 broken,
177 config,
178 format,
179 stage,
180 replan: Some(gid),
181 desc: None,
182 optimizer_trace,
183 });
184 let stage = return_if_err!(
185 self.create_view_validate(plan, resolved_ids, explain_ctx),
186 ctx
187 );
188 self.sequence_staged(ctx, Span::current(), stage).await;
189 }
190
191 #[instrument]
192 pub(crate) fn explain_view(
193 &self,
194 ctx: &ExecuteContext,
195 plan::ExplainPlanPlan {
196 stage,
197 format,
198 config,
199 explainee,
200 }: plan::ExplainPlanPlan,
201 ) -> Result<ExecuteResponse, AdapterError> {
202 let plan::Explainee::View(id) = explainee else {
203 unreachable!() };
205 let CatalogItem::View(view) = self.catalog().get_entry(&id).item() else {
206 unreachable!() };
208
209 let target_cluster = None; let features =
212 OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
213
214 let cardinality_stats = BTreeMap::new();
215
216 let explain = match stage {
217 ExplainStage::RawPlan => explain_plan(
218 view.raw_expr.as_ref().clone(),
219 format,
220 &config,
221 &features,
222 &self.catalog().for_session(ctx.session()),
223 cardinality_stats,
224 target_cluster,
225 )?,
226 ExplainStage::LocalPlan => explain_plan(
227 view.optimized_expr.as_inner().clone(),
228 format,
229 &config,
230 &features,
231 &self.catalog().for_session(ctx.session()),
232 cardinality_stats,
233 target_cluster,
234 )?,
235 _ => {
236 coord_bail!("cannot EXPLAIN {} FOR VIEW", stage);
237 }
238 };
239
240 let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
241
242 Ok(Self::send_immediate_rows(row))
243 }
244
245 #[instrument]
246 fn create_view_validate(
247 &self,
248 plan: plan::CreateViewPlan,
249 resolved_ids: ResolvedIds,
250 explain_ctx: ExplainContext,
253 ) -> Result<CreateViewStage, AdapterError> {
254 let plan::CreateViewPlan {
255 view: plan::View { expr, .. },
256 ambiguous_columns,
257 ..
258 } = &plan;
259
260 let expr_depends_on = expr.depends_on();
265 self.catalog()
266 .validate_timeline_context(expr_depends_on.iter().copied())?;
267 self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
268
269 let validity =
270 PlanValidity::require_transient_revision(self.catalog().transient_revision());
271
272 Ok(CreateViewStage::Optimize(CreateViewOptimize {
273 validity,
274 plan,
275 resolved_ids,
276 explain_ctx,
277 }))
278 }
279
280 #[instrument]
281 async fn create_view_optimize(
282 &mut self,
283 CreateViewOptimize {
284 validity,
285 plan,
286 resolved_ids,
287 explain_ctx,
288 }: CreateViewOptimize,
289 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
290 let id_ts = self.get_catalog_write_ts().await;
291 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
292
293 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
295 .override_from(&explain_ctx);
296
297 let mut optimizer =
299 optimize::view::Optimizer::new(optimizer_config, Some(self.optimizer_metrics()));
300
301 let span = Span::current();
302 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
303 || "optimize create view",
304 move || {
305 span.in_scope(|| {
306 let mut pipeline =
307 || -> Result<mz_expr::OptimizedMirRelationExpr, AdapterError> {
308 let _dispatch_guard = explain_ctx.dispatch_guard();
309
310 let raw_expr = plan.view.expr.clone();
312 let optimized_expr = optimizer.catch_unwind_optimize(raw_expr)?;
313
314 Ok(optimized_expr)
315 };
316
317 let stage = match pipeline() {
318 Ok(optimized_expr) => {
319 if let ExplainContext::Plan(explain_ctx) = explain_ctx {
320 CreateViewStage::Explain(CreateViewExplain {
321 validity,
322 id: global_id,
323 plan,
324 explain_ctx,
325 })
326 } else {
327 CreateViewStage::Finish(CreateViewFinish {
328 validity,
329 item_id,
330 global_id,
331 plan,
332 optimized_expr,
333 resolved_ids,
334 })
335 }
336 }
337 Err(err) => {
340 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
341 return Err(err);
343 };
344
345 if explain_ctx.broken {
346 tracing::error!("error while handling EXPLAIN statement: {}", err);
350 CreateViewStage::Explain(CreateViewExplain {
351 validity,
352 id: global_id,
353 plan,
354 explain_ctx,
355 })
356 } else {
357 return Err(err);
359 }
360 }
361 };
362
363 Ok(Box::new(stage))
364 })
365 },
366 )))
367 }
368
369 #[instrument]
370 async fn create_view_finish(
371 &mut self,
372 session: &Session,
373 CreateViewFinish {
374 item_id,
375 global_id,
376 plan:
377 plan::CreateViewPlan {
378 name,
379 view:
380 plan::View {
381 create_sql,
382 expr: raw_expr,
383 dependencies,
384 column_names,
385 temporary,
386 },
387 drop_ids,
388 if_not_exists,
389 ..
390 },
391 optimized_expr,
392 resolved_ids,
393 ..
394 }: CreateViewFinish,
395 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
396 let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
397 let ops = vec![
398 catalog::Op::DropObjects(
399 drop_ids
400 .iter()
401 .map(|id| catalog::DropObjectInfo::Item(*id))
402 .collect(),
403 ),
404 catalog::Op::CreateItem {
405 id: item_id,
406 name: name.clone(),
407 item: CatalogItem::View(View {
408 create_sql: create_sql.clone(),
409 global_id,
410 raw_expr: raw_expr.into(),
411 desc: RelationDesc::new(typ, column_names.clone()),
412 optimized_expr: optimized_expr.into(),
413 conn_id: if temporary {
414 Some(session.conn_id().clone())
415 } else {
416 None
417 },
418 resolved_ids: resolved_ids.clone(),
419 dependencies: dependencies.clone(),
420 }),
421 owner_id: *session.current_role_id(),
422 },
423 ];
424
425 match self.catalog_transact(Some(session), ops).await {
426 Ok(()) => Ok(StageResult::Response(ExecuteResponse::CreatedView)),
427 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
428 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
429 })) if if_not_exists => {
430 session.add_notice(AdapterNotice::ObjectAlreadyExists {
431 name: name.item,
432 ty: "view",
433 });
434 Ok(StageResult::Response(ExecuteResponse::CreatedView))
435 }
436 Err(err) => Err(err),
437 }
438 }
439
440 #[instrument]
441 async fn create_view_explain(
442 &self,
443 session: &Session,
444 CreateViewExplain {
445 id,
446 plan:
447 plan::CreateViewPlan {
448 name,
449 view: plan::View { column_names, .. },
450 ..
451 },
452 explain_ctx:
453 ExplainPlanContext {
454 config,
455 format,
456 stage,
457 optimizer_trace,
458 ..
459 },
460 ..
461 }: CreateViewExplain,
462 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
463 let session_catalog = self.catalog().for_session(session);
464 let expr_humanizer = {
465 let full_name = self.catalog().resolve_full_name(&name, None);
466 let transient_items = btreemap! {
467 id => TransientItem::new(
468 Some(full_name.into_parts()),
469 Some(column_names.iter().map(|c| c.to_string()).collect()),
470 )
471 };
472 ExprHumanizerExt::new(transient_items, &session_catalog)
473 };
474
475 let features =
476 OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
477
478 let rows = optimizer_trace
479 .into_rows(
480 format,
481 &config,
482 &features,
483 &expr_humanizer,
484 None,
485 None, Default::default(),
487 stage,
488 plan::ExplaineeStatementKind::CreateView,
489 None,
490 )
491 .await?;
492
493 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
494 }
495}