1use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::objects::{CatalogItem, View};
14use mz_expr::CollectionPlan;
15use mz_ore::instrument;
16use mz_repr::explain::{ExprHumanizerExt, TransientItem};
17use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
18use mz_repr::{Datum, RelationDesc, Row};
19use mz_sql::ast::ExplainStage;
20use mz_sql::catalog::CatalogError;
21use mz_sql::names::ResolvedIds;
22use mz_sql::plan::{self};
23use mz_sql::session::metadata::SessionMetadata;
24use tracing::Span;
25
26use crate::command::ExecuteResponse;
27use crate::coord::sequencer::inner::return_if_err;
28use crate::coord::{
29 Coordinator, CreateViewExplain, CreateViewFinish, CreateViewOptimize, CreateViewStage,
30 ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
31};
32use crate::error::AdapterError;
33use crate::explain::explain_plan;
34use crate::explain::optimizer_trace::OptimizerTrace;
35use crate::optimize::{self, Optimize};
36use crate::session::Session;
37use crate::{AdapterNotice, ExecuteContext, catalog};
38
39impl Staged for CreateViewStage {
40 type Ctx = ExecuteContext;
41
42 fn validity(&mut self) -> &mut PlanValidity {
43 match self {
44 Self::Optimize(stage) => &mut stage.validity,
45 Self::Finish(stage) => &mut stage.validity,
46 Self::Explain(stage) => &mut stage.validity,
47 }
48 }
49
50 async fn stage(
51 self,
52 coord: &mut Coordinator,
53 ctx: &mut ExecuteContext,
54 ) -> Result<StageResult<Box<Self>>, AdapterError> {
55 match self {
56 CreateViewStage::Optimize(stage) => coord.create_view_optimize(stage).await,
57 CreateViewStage::Finish(stage) => coord.create_view_finish(ctx.session(), stage).await,
58 CreateViewStage::Explain(stage) => {
59 coord.create_view_explain(ctx.session(), stage).await
60 }
61 }
62 }
63
64 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
65 Message::CreateViewStageReady {
66 ctx,
67 span,
68 stage: self,
69 }
70 }
71
72 fn cancel_enabled(&self) -> bool {
73 true
74 }
75}
76
77impl Coordinator {
78 pub(crate) async fn sequence_create_view(
79 &mut self,
80 ctx: ExecuteContext,
81 plan: plan::CreateViewPlan,
82 resolved_ids: ResolvedIds,
83 ) {
84 let stage = return_if_err!(
85 self.create_view_validate(plan, resolved_ids, ExplainContext::None),
86 ctx
87 );
88 self.sequence_staged(ctx, Span::current(), stage).await;
89 }
90
91 #[instrument]
92 pub(crate) async fn explain_create_view(
93 &mut self,
94 ctx: ExecuteContext,
95 plan::ExplainPlanPlan {
96 stage,
97 format,
98 config,
99 explainee,
100 }: plan::ExplainPlanPlan,
101 ) {
102 let plan::Explainee::Statement(stmt) = explainee else {
103 unreachable!()
106 };
107 let plan::ExplaineeStatement::CreateView { broken, plan } = stmt else {
108 unreachable!()
111 };
112
113 let optimizer_trace = OptimizerTrace::new(stage.paths());
116
117 let resolved_ids = ResolvedIds::empty();
119
120 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
121 broken,
122 config,
123 format,
124 stage,
125 replan: None,
126 desc: None,
127 optimizer_trace,
128 });
129 let stage = return_if_err!(
130 self.create_view_validate(plan, resolved_ids, explain_ctx),
131 ctx
132 );
133 self.sequence_staged(ctx, Span::current(), stage).await;
134 }
135
136 #[instrument]
137 pub(crate) async fn explain_replan_view(
138 &mut self,
139 ctx: ExecuteContext,
140 plan::ExplainPlanPlan {
141 stage,
142 format,
143 config,
144 explainee,
145 }: plan::ExplainPlanPlan,
146 ) {
147 let plan::Explainee::ReplanView(id) = explainee else {
148 unreachable!() };
150 let CatalogItem::View(item) = self.catalog().get_entry(&id).item() else {
151 unreachable!() };
153 let gid = item.global_id();
154
155 let create_sql = item.create_sql.clone();
156 let plan_result = self
157 .catalog_mut()
158 .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
159 let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
160
161 let plan::Plan::CreateView(plan) = plan else {
162 unreachable!() };
164
165 let broken = false;
168
169 let optimizer_trace = OptimizerTrace::new(stage.paths());
172
173 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
174 broken,
175 config,
176 format,
177 stage,
178 replan: Some(gid),
179 desc: None,
180 optimizer_trace,
181 });
182 let stage = return_if_err!(
183 self.create_view_validate(plan, resolved_ids, explain_ctx),
184 ctx
185 );
186 self.sequence_staged(ctx, Span::current(), stage).await;
187 }
188
189 #[instrument]
190 pub(crate) fn explain_view(
191 &mut self,
192 ctx: &ExecuteContext,
193 plan::ExplainPlanPlan {
194 stage,
195 format,
196 config,
197 explainee,
198 }: plan::ExplainPlanPlan,
199 ) -> Result<ExecuteResponse, AdapterError> {
200 let plan::Explainee::View(id) = explainee else {
201 unreachable!() };
203 let CatalogItem::View(view) = self.catalog().get_entry(&id).item() else {
204 unreachable!() };
206
207 let target_cluster = None; let features =
210 OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
211
212 let cardinality_stats = BTreeMap::new();
213
214 let explain = match stage {
215 ExplainStage::RawPlan => explain_plan(
216 view.raw_expr.as_ref().clone(),
217 format,
218 &config,
219 &features,
220 &self.catalog().for_session(ctx.session()),
221 cardinality_stats,
222 target_cluster,
223 )?,
224 ExplainStage::LocalPlan => explain_plan(
225 view.optimized_expr.as_inner().clone(),
226 format,
227 &config,
228 &features,
229 &self.catalog().for_session(ctx.session()),
230 cardinality_stats,
231 target_cluster,
232 )?,
233 _ => {
234 coord_bail!("cannot EXPLAIN {} FOR VIEW", stage);
235 }
236 };
237
238 let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
239
240 Ok(Self::send_immediate_rows(row))
241 }
242
243 #[instrument]
244 fn create_view_validate(
245 &mut self,
246 plan: plan::CreateViewPlan,
247 resolved_ids: ResolvedIds,
248 explain_ctx: ExplainContext,
251 ) -> Result<CreateViewStage, AdapterError> {
252 let plan::CreateViewPlan {
253 view: plan::View { expr, .. },
254 ambiguous_columns,
255 ..
256 } = &plan;
257
258 let expr_depends_on = expr.depends_on();
263 self.catalog()
264 .validate_timeline_context(expr_depends_on.iter().copied())?;
265 self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
266
267 let validity =
268 PlanValidity::require_transient_revision(self.catalog().transient_revision());
269
270 Ok(CreateViewStage::Optimize(CreateViewOptimize {
271 validity,
272 plan,
273 resolved_ids,
274 explain_ctx,
275 }))
276 }
277
278 #[instrument]
279 async fn create_view_optimize(
280 &mut self,
281 CreateViewOptimize {
282 validity,
283 plan,
284 resolved_ids,
285 explain_ctx,
286 }: CreateViewOptimize,
287 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
288 let id_ts = self.get_catalog_write_ts().await;
289 let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
290
291 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
293 .override_from(&explain_ctx);
294
295 let mut optimizer =
297 optimize::view::Optimizer::new(optimizer_config, Some(self.optimizer_metrics()));
298
299 let span = Span::current();
300 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
301 || "optimize create view",
302 move || {
303 span.in_scope(|| {
304 let mut pipeline =
305 || -> Result<mz_expr::OptimizedMirRelationExpr, AdapterError> {
306 let _dispatch_guard = explain_ctx.dispatch_guard();
307
308 let raw_expr = plan.view.expr.clone();
310 let optimized_expr = optimizer.catch_unwind_optimize(raw_expr)?;
311
312 Ok(optimized_expr)
313 };
314
315 let stage = match pipeline() {
316 Ok(optimized_expr) => {
317 if let ExplainContext::Plan(explain_ctx) = explain_ctx {
318 CreateViewStage::Explain(CreateViewExplain {
319 validity,
320 id: global_id,
321 plan,
322 explain_ctx,
323 })
324 } else {
325 CreateViewStage::Finish(CreateViewFinish {
326 validity,
327 item_id,
328 global_id,
329 plan,
330 optimized_expr,
331 resolved_ids,
332 })
333 }
334 }
335 Err(err) => {
338 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
339 return Err(err);
341 };
342
343 if explain_ctx.broken {
344 tracing::error!("error while handling EXPLAIN statement: {}", err);
348 CreateViewStage::Explain(CreateViewExplain {
349 validity,
350 id: global_id,
351 plan,
352 explain_ctx,
353 })
354 } else {
355 return Err(err);
357 }
358 }
359 };
360
361 Ok(Box::new(stage))
362 })
363 },
364 )))
365 }
366
367 #[instrument]
368 async fn create_view_finish(
369 &mut self,
370 session: &Session,
371 CreateViewFinish {
372 item_id,
373 global_id,
374 plan:
375 plan::CreateViewPlan {
376 name,
377 view:
378 plan::View {
379 create_sql,
380 expr: raw_expr,
381 dependencies,
382 column_names,
383 temporary,
384 },
385 drop_ids,
386 if_not_exists,
387 ..
388 },
389 optimized_expr,
390 resolved_ids,
391 ..
392 }: CreateViewFinish,
393 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
394 let ops = vec![
395 catalog::Op::DropObjects(
396 drop_ids
397 .iter()
398 .map(|id| catalog::DropObjectInfo::Item(*id))
399 .collect(),
400 ),
401 catalog::Op::CreateItem {
402 id: item_id,
403 name: name.clone(),
404 item: CatalogItem::View(View {
405 create_sql: create_sql.clone(),
406 global_id,
407 raw_expr: raw_expr.into(),
408 desc: RelationDesc::new(optimized_expr.typ(), column_names.clone()),
409 optimized_expr: optimized_expr.into(),
410 conn_id: if temporary {
411 Some(session.conn_id().clone())
412 } else {
413 None
414 },
415 resolved_ids: resolved_ids.clone(),
416 dependencies: dependencies.clone(),
417 }),
418 owner_id: *session.current_role_id(),
419 },
420 ];
421
422 match self.catalog_transact(Some(session), ops).await {
423 Ok(()) => Ok(StageResult::Response(ExecuteResponse::CreatedView)),
424 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
425 kind:
426 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
427 })) if if_not_exists => {
428 session.add_notice(AdapterNotice::ObjectAlreadyExists {
429 name: name.item,
430 ty: "view",
431 });
432 Ok(StageResult::Response(ExecuteResponse::CreatedView))
433 }
434 Err(err) => Err(err),
435 }
436 }
437
438 #[instrument]
439 async fn create_view_explain(
440 &mut self,
441 session: &Session,
442 CreateViewExplain {
443 id,
444 plan:
445 plan::CreateViewPlan {
446 name,
447 view: plan::View { column_names, .. },
448 ..
449 },
450 explain_ctx:
451 ExplainPlanContext {
452 config,
453 format,
454 stage,
455 optimizer_trace,
456 ..
457 },
458 ..
459 }: CreateViewExplain,
460 ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError> {
461 let session_catalog = self.catalog().for_session(session);
462 let expr_humanizer = {
463 let full_name = self.catalog().resolve_full_name(&name, None);
464 let transient_items = btreemap! {
465 id => TransientItem::new(
466 Some(full_name.into_parts()),
467 Some(column_names.iter().map(|c| c.to_string()).collect()),
468 )
469 };
470 ExprHumanizerExt::new(transient_items, &session_catalog)
471 };
472
473 let features =
474 OptimizerFeatures::from(self.catalog().system_config()).override_from(&config.features);
475
476 let rows = optimizer_trace
477 .into_rows(
478 format,
479 &config,
480 &features,
481 &expr_humanizer,
482 None,
483 None, Default::default(),
485 stage,
486 plan::ExplaineeStatementKind::CreateView,
487 None,
488 )
489 .await?;
490
491 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
492 }
493}