1use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::objects::{CatalogItem, View};
14use mz_expr::CollectionPlan;
15use mz_ore::instrument;
16use mz_repr::explain::{ExprHumanizerExt, TransientItem};
17use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
18use mz_repr::{Datum, RelationDesc, Row};
19use mz_sql::ast::ExplainStage;
20use mz_sql::catalog::CatalogError;
21use mz_sql::names::ResolvedIds;
22use mz_sql::plan::{self};
23use mz_sql::session::metadata::SessionMetadata;
24use tracing::Span;
25
26use crate::command::ExecuteResponse;
27use crate::coord::sequencer::inner::return_if_err;
28use crate::coord::{
29 Coordinator, CreateViewExplain, CreateViewFinish, CreateViewOptimize, CreateViewStage,
30 ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
31 infer_sql_type_for_catalog,
32};
33use crate::error::AdapterError;
34use crate::explain::explain_plan;
35use crate::explain::optimizer_trace::OptimizerTrace;
36use crate::optimize::{self, Optimize};
37use crate::session::Session;
38use crate::{AdapterNotice, ExecuteContext, catalog};
39
40impl Staged for CreateViewStage {
41 type Ctx = ExecuteContext;
42
43 fn validity(&mut self) -> &mut PlanValidity {
44 match self {
45 Self::Optimize(stage) => &mut stage.validity,
46 Self::Finish(stage) => &mut stage.validity,
47 Self::Explain(stage) => &mut stage.validity,
48 }
49 }
50
51 async fn stage(
52 self,
53 coord: &mut Coordinator,
54 ctx: &mut ExecuteContext,
55 ) -> Result<StageResult<Box<Self>>, AdapterError> {
56 match self {
57 CreateViewStage::Optimize(stage) => coord.create_view_optimize(stage).await,
58 CreateViewStage::Finish(stage) => coord.create_view_finish(ctx.session(), stage).await,
59 CreateViewStage::Explain(stage) => {
60 coord.create_view_explain(ctx.session(), stage).await
61 }
62 }
63 }
64
65 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
66 Message::CreateViewStageReady {
67 ctx,
68 span,
69 stage: self,
70 }
71 }
72
73 fn cancel_enabled(&self) -> bool {
74 true
75 }
76}
77
78impl Coordinator {
79 pub(crate) async fn sequence_create_view(
80 &mut self,
81 ctx: ExecuteContext,
82 plan: plan::CreateViewPlan,
83 resolved_ids: ResolvedIds,
84 ) {
85 let stage = return_if_err!(
86 self.create_view_validate(plan, resolved_ids, ExplainContext::None),
87 ctx
88 );
89 self.sequence_staged(ctx, Span::current(), stage).await;
90 }
91
92 #[instrument]
93 pub(crate) async fn explain_create_view(
94 &mut self,
95 ctx: ExecuteContext,
96 plan::ExplainPlanPlan {
97 stage,
98 format,
99 config,
100 explainee,
101 }: plan::ExplainPlanPlan,
102 ) {
103 let plan::Explainee::Statement(stmt) = explainee else {
104 unreachable!()
107 };
108 let plan::ExplaineeStatement::CreateView { broken, plan } = stmt else {
109 unreachable!()
112 };
113
114 let optimizer_trace = OptimizerTrace::new(stage.paths());
117
118 let resolved_ids = ResolvedIds::empty();
120
121 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
122 broken,
123 config,
124 format,
125 stage,
126 replan: None,
127 desc: None,
128 optimizer_trace,
129 });
130 let stage = return_if_err!(
131 self.create_view_validate(plan, resolved_ids, explain_ctx),
132 ctx
133 );
134 self.sequence_staged(ctx, Span::current(), stage).await;
135 }
136
137 #[instrument]
138 pub(crate) async fn explain_replan_view(
139 &mut self,
140 ctx: ExecuteContext,
141 plan::ExplainPlanPlan {
142 stage,
143 format,
144 config,
145 explainee,
146 }: plan::ExplainPlanPlan,
147 ) {
148 let plan::Explainee::ReplanView(id) = explainee else {
149 unreachable!() };
151 let CatalogItem::View(item) = self.catalog().get_entry(&id).item() else {
152 unreachable!() };
154 let gid = item.global_id();
155
156 let create_sql = item.create_sql.clone();
157 let plan_result = self
158 .catalog_mut()
159 .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
160 let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
161
162 let plan::Plan::CreateView(plan) = plan else {
163 unreachable!() };
165
166 let broken = false;
169
170 let optimizer_trace = OptimizerTrace::new(stage.paths());
173
174 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
175 broken,
176 config,
177 format,
178 stage,
179 replan: Some(gid),
180 desc: None,
181 optimizer_trace,
182 });
183 let stage = return_if_err!(
184 self.create_view_validate(plan, resolved_ids, explain_ctx),
185 ctx
186 );
187 self.sequence_staged(ctx, Span::current(), stage).await;
188 }
189
190 #[instrument]
191 pub(crate) fn explain_view(
192 &self,
193 ctx: &ExecuteContext,
194 plan::ExplainPlanPlan {
195 stage,
196 format,
197 config,
198 explainee,
199 }: plan::ExplainPlanPlan,
200 ) -> Result<ExecuteResponse, AdapterError> {
201 let plan::Explainee::View(id) = explainee else {
202 unreachable!() };
204 let CatalogItem::View(view) = self.catalog().get_entry(&id).item() else {
205 unreachable!() };
207
208 let target_cluster = None; let features =
211 OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
212
213 let cardinality_stats = BTreeMap::new();
214
215 let explain = match stage {
216 ExplainStage::RawPlan => explain_plan(
217 view.raw_expr.as_ref().clone(),
218 format,
219 &config,
220 &features,
221 &self.catalog().for_session(ctx.session()),
222 cardinality_stats,
223 target_cluster,
224 )?,
225 ExplainStage::LocalPlan => explain_plan(
226 view.optimized_expr.as_inner().clone(),
227 format,
228 &config,
229 &features,
230 &self.catalog().for_session(ctx.session()),
231 cardinality_stats,
232 target_cluster,
233 )?,
234 _ => {
235 coord_bail!("cannot EXPLAIN {} FOR VIEW", stage);
236 }
237 };
238
239 let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
240
241 Ok(Self::send_immediate_rows(row))
242 }
243
244 #[instrument]
245 fn create_view_validate(
246 &self,
247 plan: plan::CreateViewPlan,
248 resolved_ids: ResolvedIds,
249 explain_ctx: ExplainContext,
252 ) -> Result<CreateViewStage, AdapterError> {
253 let plan::CreateViewPlan {
254 view: plan::View { expr, .. },
255 ambiguous_columns,
256 ..
257 } = &plan;
258
259 let expr_depends_on = expr.depends_on();
264 self.catalog()
265 .validate_timeline_context(expr_depends_on.iter().copied())?;
266 self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
267
268 let validity =
269 PlanValidity::require_transient_revision(self.catalog().transient_revision());
270
271 Ok(CreateViewStage::Optimize(CreateViewOptimize {
272 validity,
273 plan,
274 resolved_ids,
275 explain_ctx,
276 }))
277 }
278
279 #[instrument]
280 async fn create_view_optimize(
281 &mut self,
282 CreateViewOptimize {
283 validity,
284 plan,
285 resolved_ids,
286 explain_ctx,
287 }: CreateViewOptimize,
288 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
289 let id_ts = self.get_catalog_write_ts().await;
290 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
291
292 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
294 .override_from(&explain_ctx);
295
296 let mut optimizer =
298 optimize::view::Optimizer::new(optimizer_config, Some(self.optimizer_metrics()));
299
300 let span = Span::current();
301 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
302 || "optimize create view",
303 move || {
304 span.in_scope(|| {
305 let mut pipeline =
306 || -> Result<mz_expr::OptimizedMirRelationExpr, AdapterError> {
307 let _dispatch_guard = explain_ctx.dispatch_guard();
308
309 let raw_expr = plan.view.expr.clone();
311 let optimized_expr = optimizer.catch_unwind_optimize(raw_expr)?;
312
313 Ok(optimized_expr)
314 };
315
316 let stage = match pipeline() {
317 Ok(optimized_expr) => {
318 if let ExplainContext::Plan(explain_ctx) = explain_ctx {
319 CreateViewStage::Explain(CreateViewExplain {
320 validity,
321 id: global_id,
322 plan,
323 explain_ctx,
324 })
325 } else {
326 CreateViewStage::Finish(CreateViewFinish {
327 validity,
328 item_id,
329 global_id,
330 plan,
331 optimized_expr,
332 resolved_ids,
333 })
334 }
335 }
336 Err(err) => {
339 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
340 return Err(err);
342 };
343
344 if explain_ctx.broken {
345 tracing::error!("error while handling EXPLAIN statement: {}", err);
349 CreateViewStage::Explain(CreateViewExplain {
350 validity,
351 id: global_id,
352 plan,
353 explain_ctx,
354 })
355 } else {
356 return Err(err);
358 }
359 }
360 };
361
362 Ok(Box::new(stage))
363 })
364 },
365 )))
366 }
367
368 #[instrument]
369 async fn create_view_finish(
370 &mut self,
371 session: &Session,
372 CreateViewFinish {
373 item_id,
374 global_id,
375 plan:
376 plan::CreateViewPlan {
377 name,
378 view:
379 plan::View {
380 create_sql,
381 expr: raw_expr,
382 dependencies,
383 column_names,
384 temporary,
385 },
386 drop_ids,
387 if_not_exists,
388 ..
389 },
390 optimized_expr,
391 resolved_ids,
392 ..
393 }: CreateViewFinish,
394 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
395 let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
396 let ops = vec![
397 catalog::Op::DropObjects(
398 drop_ids
399 .iter()
400 .map(|id| catalog::DropObjectInfo::Item(*id))
401 .collect(),
402 ),
403 catalog::Op::CreateItem {
404 id: item_id,
405 name: name.clone(),
406 item: CatalogItem::View(View {
407 create_sql: create_sql.clone(),
408 global_id,
409 raw_expr: raw_expr.into(),
410 desc: RelationDesc::new(typ, column_names.clone()),
411 optimized_expr: optimized_expr.into(),
412 conn_id: if temporary {
413 Some(session.conn_id().clone())
414 } else {
415 None
416 },
417 resolved_ids: resolved_ids.clone(),
418 dependencies: dependencies.clone(),
419 }),
420 owner_id: *session.current_role_id(),
421 },
422 ];
423
424 match self.catalog_transact(Some(session), ops).await {
425 Ok(()) => Ok(StageResult::Response(ExecuteResponse::CreatedView)),
426 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
427 kind:
428 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
429 })) if if_not_exists => {
430 session.add_notice(AdapterNotice::ObjectAlreadyExists {
431 name: name.item,
432 ty: "view",
433 });
434 Ok(StageResult::Response(ExecuteResponse::CreatedView))
435 }
436 Err(err) => Err(err),
437 }
438 }
439
440 #[instrument]
441 async fn create_view_explain(
442 &self,
443 session: &Session,
444 CreateViewExplain {
445 id,
446 plan:
447 plan::CreateViewPlan {
448 name,
449 view: plan::View { column_names, .. },
450 ..
451 },
452 explain_ctx:
453 ExplainPlanContext {
454 config,
455 format,
456 stage,
457 optimizer_trace,
458 ..
459 },
460 ..
461 }: CreateViewExplain,
462 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
463 let session_catalog = self.catalog().for_session(session);
464 let expr_humanizer = {
465 let full_name = self.catalog().resolve_full_name(&name, None);
466 let transient_items = btreemap! {
467 id => TransientItem::new(
468 Some(full_name.into_parts()),
469 Some(column_names.iter().map(|c| c.to_string()).collect()),
470 )
471 };
472 ExprHumanizerExt::new(transient_items, &session_catalog)
473 };
474
475 let features =
476 OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
477
478 let rows = optimizer_trace
479 .into_rows(
480 format,
481 &config,
482 &features,
483 &expr_humanizer,
484 None,
485 None, Default::default(),
487 stage,
488 plan::ExplaineeStatementKind::CreateView,
489 None,
490 )
491 .await?;
492
493 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
494 }
495}