1use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, Index};
15use mz_ore::instrument;
16use mz_repr::explain::{ExprHumanizerExt, TransientItem};
17use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
18use mz_repr::{Datum, Row};
19use mz_sql::ast::ExplainStage;
20use mz_sql::catalog::CatalogError;
21use mz_sql::names::ResolvedIds;
22use mz_sql::plan;
23use mz_sql::session::metadata::SessionMetadata;
24use tracing::Span;
25
26use crate::catalog::CatalogState;
27use crate::command::ExecuteResponse;
28use crate::coord::sequencer::inner::return_if_err;
29use crate::coord::{
30 Coordinator, CreateIndexExplain, CreateIndexFinish, CreateIndexOptimize, CreateIndexStage,
31 ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
32};
33use crate::error::AdapterError;
34use crate::explain::explain_dataflow;
35use crate::explain::optimizer_trace::OptimizerTrace;
36use crate::optimize::dataflows::dataflow_import_id_bundle;
37use crate::optimize::{self, Optimize};
38use crate::session::Session;
39use crate::{AdapterNotice, ExecuteContext, catalog};
40
41impl Staged for CreateIndexStage {
42 type Ctx = ExecuteContext;
43
44 fn validity(&mut self) -> &mut PlanValidity {
45 match self {
46 Self::Optimize(stage) => &mut stage.validity,
47 Self::Finish(stage) => &mut stage.validity,
48 Self::Explain(stage) => &mut stage.validity,
49 }
50 }
51
52 async fn stage(
53 self,
54 coord: &mut Coordinator,
55 ctx: &mut ExecuteContext,
56 ) -> Result<StageResult<Box<Self>>, AdapterError> {
57 match self {
58 CreateIndexStage::Optimize(stage) => coord.create_index_optimize(stage).await,
59 CreateIndexStage::Finish(stage) => coord.create_index_finish(ctx, stage).await,
60 CreateIndexStage::Explain(stage) => {
61 coord.create_index_explain(ctx.session(), stage).await
62 }
63 }
64 }
65
66 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
67 Message::CreateIndexStageReady {
68 ctx,
69 span,
70 stage: self,
71 }
72 }
73
74 fn cancel_enabled(&self) -> bool {
75 true
76 }
77}
78
79impl Coordinator {
80 #[instrument]
81 pub(crate) async fn sequence_create_index(
82 &mut self,
83 ctx: ExecuteContext,
84 plan: plan::CreateIndexPlan,
85 resolved_ids: ResolvedIds,
86 ) {
87 let stage = return_if_err!(
88 self.create_index_validate(ctx.session(), plan, resolved_ids, ExplainContext::None),
89 ctx
90 );
91 self.sequence_staged(ctx, Span::current(), stage).await;
92 }
93
94 #[instrument]
95 pub(crate) async fn explain_create_index(
96 &mut self,
97 ctx: ExecuteContext,
98 plan::ExplainPlanPlan {
99 stage,
100 format,
101 config,
102 explainee,
103 }: plan::ExplainPlanPlan,
104 ) {
105 let plan::Explainee::Statement(stmt) = explainee else {
106 unreachable!()
109 };
110 let plan::ExplaineeStatement::CreateIndex { broken, plan } = stmt else {
111 unreachable!()
114 };
115
116 let optimizer_trace = OptimizerTrace::new(stage.paths());
119
120 let resolved_ids = ResolvedIds::empty();
122
123 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
124 broken,
125 config,
126 format,
127 stage,
128 replan: None,
129 desc: None,
130 optimizer_trace,
131 });
132 let stage = return_if_err!(
133 self.create_index_validate(ctx.session(), plan, resolved_ids, explain_ctx),
134 ctx
135 );
136 self.sequence_staged(ctx, Span::current(), stage).await;
137 }
138
139 #[instrument]
140 pub(crate) async fn explain_replan_index(
141 &mut self,
142 ctx: ExecuteContext,
143 plan::ExplainPlanPlan {
144 stage,
145 format,
146 config,
147 explainee,
148 }: plan::ExplainPlanPlan,
149 ) {
150 let plan::Explainee::ReplanIndex(id) = explainee else {
151 unreachable!() };
153 let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
154 unreachable!() };
156 let id = index.global_id();
157
158 let create_sql = index.create_sql.clone();
159 let plan_result = self
160 .catalog_mut()
161 .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
162 let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
163
164 let plan::Plan::CreateIndex(plan) = plan else {
165 unreachable!() };
167
168 let broken = false;
171
172 let optimizer_trace = OptimizerTrace::new(stage.paths());
175
176 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
177 broken,
178 config,
179 format,
180 stage,
181 replan: Some(id),
182 desc: None,
183 optimizer_trace,
184 });
185 let stage = return_if_err!(
186 self.create_index_validate(ctx.session(), plan, resolved_ids, explain_ctx),
187 ctx
188 );
189 self.sequence_staged(ctx, Span::current(), stage).await;
190 }
191
192 #[instrument]
193 pub(crate) fn explain_index(
194 &self,
195 ctx: &ExecuteContext,
196 plan::ExplainPlanPlan {
197 stage,
198 format,
199 config,
200 explainee,
201 }: plan::ExplainPlanPlan,
202 ) -> Result<ExecuteResponse, AdapterError> {
203 let plan::Explainee::Index(id) = explainee else {
204 unreachable!() };
206 let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
207 unreachable!() };
209
210 let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&index.global_id())
211 else {
212 if !id.is_system() {
213 tracing::error!("cannot find dataflow metainformation for index {id} in catalog");
214 }
215 coord_bail!("cannot find dataflow metainformation for index {id} in catalog");
216 };
217
218 let target_cluster = self.catalog().get_cluster(index.cluster_id);
219
220 let features = OptimizerFeatures::from(self.catalog().system_config())
221 .override_from(&target_cluster.config.features())
222 .override_from(&self.cluster_scoped_optimizer_overrides(index.cluster_id))
223 .override_from(&config.features);
224
225 let cardinality_stats = BTreeMap::new();
227
228 let explain = match stage {
229 ExplainStage::GlobalPlan => {
230 let Some(plan) = self
231 .catalog()
232 .try_get_optimized_plan(&index.global_id())
233 .cloned()
234 else {
235 tracing::error!("cannot find {stage} for index {id} in catalog");
236 coord_bail!("cannot find {stage} for index in catalog");
237 };
238
239 explain_dataflow(
240 plan,
241 format,
242 &config,
243 &features,
244 &self.catalog().for_session(ctx.session()),
245 cardinality_stats,
246 Some(target_cluster.name.as_str()),
247 dataflow_metainfo,
248 )?
249 }
250 ExplainStage::PhysicalPlan => {
251 let Some(plan) = self
252 .catalog()
253 .try_get_physical_plan(&index.global_id())
254 .cloned()
255 else {
256 tracing::error!("cannot find {stage} for index {id} in catalog");
257 coord_bail!("cannot find {stage} for index in catalog");
258 };
259 explain_dataflow(
260 plan,
261 format,
262 &config,
263 &features,
264 &self.catalog().for_session(ctx.session()),
265 cardinality_stats,
266 Some(target_cluster.name.as_str()),
267 dataflow_metainfo,
268 )?
269 }
270 _ => {
271 coord_bail!("cannot EXPLAIN {} FOR INDEX", stage);
272 }
273 };
274
275 let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
276
277 Ok(Self::send_immediate_rows(row))
278 }
279
280 #[instrument]
283 fn create_index_validate(
284 &self,
285 session: &Session,
286 plan: plan::CreateIndexPlan,
287 resolved_ids: ResolvedIds,
288 explain_ctx: ExplainContext,
289 ) -> Result<CreateIndexStage, AdapterError> {
290 let validity = PlanValidity::new(
294 self.catalog().transient_revision(),
295 resolved_ids.items().copied().collect(),
296 Some(plan.index.cluster_id),
297 None,
298 session.role_metadata().clone(),
299 );
300 Ok(CreateIndexStage::Optimize(CreateIndexOptimize {
301 validity,
302 plan,
303 resolved_ids,
304 explain_ctx,
305 }))
306 }
307
    /// Runs the index optimizer on a blocking task and yields the next stage.
    ///
    /// Returns a [`StageResult::Handle`] wrapping the spawned task. The task
    /// resolves to `CreateIndexStage::Explain` when `explain_ctx` is
    /// `ExplainContext::Plan`, and to `CreateIndexStage::Finish` otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error directly if allocating a user id fails. Inside the
    /// task, an optimizer error is returned unless the explain context has
    /// `broken` set, in which case it is logged and the `Explain` stage is
    /// produced with a default `df_meta`.
    ///
    /// # Panics
    ///
    /// Panics if no compute instance snapshot exists for the plan's cluster.
    #[instrument]
    async fn create_index_optimize(
        &mut self,
        CreateIndexOptimize {
            validity,
            plan,
            resolved_ids,
            explain_ctx,
        }: CreateIndexOptimize,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        // Only the target cluster id is needed here; `plan` itself is moved
        // into the blocking task below.
        let plan::CreateIndexPlan {
            index: plan::Index { cluster_id, .. },
            ..
        } = &plan;

        // Collect the inputs the optimizer needs.
        let compute_instance = self
            .instance_snapshot(*cluster_id)
            .expect("compute instance does not exist");
        // A user id is allocated only when there is no explain context; every
        // explain context gets a transient id instead.
        let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
            self.allocate_user_id().await?
        } else {
            self.allocate_transient_id()
        };

        // Layer the optimizer configuration: system config, then the cluster's
        // features, then cluster-scoped overrides, then the explain context.
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
            .override_from(&self.cluster_scoped_optimizer_overrides(*cluster_id))
            .override_from(&explain_ctx);
        // Cloned before `optimizer_config` is moved into the optimizer; the
        // clone is handed on to the `Finish` stage.
        let optimizer_features = optimizer_config.features.clone();

        // Build the optimizer for this index.
        let mut optimizer = optimize::index::Optimizer::new(
            self.owned_catalog(),
            compute_instance,
            global_id,
            optimizer_config,
            self.optimizer_metrics(),
        );
        // Capture the current span so the blocking task runs inside it.
        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize create index",
            move || {
                span.in_scope(|| {
                    // The two optimizer passes are wrapped in a closure so that
                    // `?` failures from either land in the single `match` below.
                    let mut pipeline = || -> Result<(
                        optimize::index::GlobalMirPlan,
                        optimize::index::GlobalLirPlan,
                    ), AdapterError> {
                        // NOTE(review): presumably installs the explain trace
                        // dispatcher for the duration of the pipeline — confirm
                        // against `ExplainContext::dispatch_guard`.
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let index_plan = optimize::index::Index::new(
                            plan.name.clone(),
                            plan.index.on,
                            plan.index.keys.clone(),
                        );

                        // First pass: index description to global MIR plan.
                        let global_mir_plan = optimizer.catch_unwind_optimize(index_plan)?;
                        // Second pass: global MIR plan to global LIR plan. The
                        // MIR plan is cloned because it is also returned.
                        let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;

                        Ok((global_mir_plan, global_lir_plan))
                    };

                    let stage = match pipeline() {
                        Ok((global_mir_plan, global_lir_plan)) => {
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                // EXPLAIN only needs the dataflow metainfo from
                                // the LIR plan.
                                let (_, df_meta) = global_lir_plan.unapply();
                                CreateIndexStage::Explain(CreateIndexExplain {
                                    validity,
                                    exported_index_id: global_id,
                                    plan,
                                    df_meta,
                                    explain_ctx,
                                })
                            } else {
                                CreateIndexStage::Finish(CreateIndexFinish {
                                    validity,
                                    item_id,
                                    global_id,
                                    plan,
                                    resolved_ids,
                                    global_mir_plan,
                                    global_lir_plan,
                                    optimizer_features,
                                })
                            }
                        }
                        // Optimizer errors are handled differently depending on
                        // the context.
                        Err(err) => {
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                // Not an `ExplainContext::Plan`: return the
                                // error as-is.
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                // `broken` is set: log the error and move on to
                                // the explain stage with a default `df_meta`.
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                CreateIndexStage::Explain(CreateIndexExplain {
                                    validity,
                                    exported_index_id: global_id,
                                    plan,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                })
                            } else {
                                // `broken` is not set: return the error.
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }
427
    /// Final stage of `CREATE INDEX`: records the index in the catalog and ships
    /// its dataflow.
    ///
    /// On success the response is `ExecuteResponse::CreatedIndex`. The same
    /// response is returned, with an `ObjectAlreadyExists` notice added to the
    /// session, when the transaction fails with `ItemAlreadyExists` and the
    /// plan had `if_not_exists` set.
    ///
    /// # Errors
    ///
    /// Any other error from the catalog transaction is returned unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the indexed item has no relation description.
    #[instrument]
    async fn create_index_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        stage: CreateIndexFinish,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let CreateIndexFinish {
            item_id,
            global_id,
            plan:
                plan::CreateIndexPlan {
                    name,
                    index:
                        plan::Index {
                            create_sql,
                            on,
                            keys,
                            cluster_id,
                            compaction_window,
                        },
                    if_not_exists,
                },
            resolved_ids,
            global_mir_plan,
            global_lir_plan,
            optimizer_features,
            ..
        } = stage;
        // Computed from the LIR dataflow's imports; used later to acquire read
        // holds.
        let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);

        // The new index gets the same owner as the item it is built on.
        let on_entry = self.catalog().get_entry_by_global_id(&on);
        let owner_id = *on_entry.owner_id();

        // Plan and metainfo fields start out as `None`; the plans are set in
        // the side-effect closure below.
        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Index(Index {
                create_sql,
                global_id,
                keys: keys.into(),
                on,
                conn_id: None,
                resolved_ids,
                cluster_id,
                is_retained_metrics_object: false,
                custom_logical_compaction_window: compaction_window,
                optimized_plan: None,
                physical_plan: None,
                dataflow_metainfo: None,
            }),
            owner_id,
        }];

        // Pre-allocate one transient global id per optimizer notice.
        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
            .map(|(_item_id, global_id)| global_id)
            .take(global_lir_plan.df_meta().optimizer_notices.len())
            .collect::<Vec<_>>();

        let (mut df_desc, raw_df_meta) = global_lir_plan.unapply();
        // Render the raw notices. The humanizer is given a transient entry that
        // maps `global_id` to the index's full name and the indexed item's
        // column names.
        let df_meta = {
            let system_catalog = self.catalog().for_system_session();
            let full_name = self.catalog().resolve_full_name(&name, None);
            let on_desc = on_entry
                .relation_desc()
                .expect("can only create indexes on items with a valid description");
            let transient_items = btreemap! {
                global_id => TransientItem::new(
                    Some(full_name.into_parts()),
                    Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
                )
            };
            let humanizer = ExprHumanizerExt::new(transient_items, &system_catalog);
            CatalogState::render_notices_core(
                &humanizer,
                (self.catalog().config().now)(),
                &raw_df_meta,
                notice_ids,
                Some(global_id),
            )
        };

        // Hand both plans, the rendered metainfo and the optimizer features to
        // the catalog's expression cache under `global_id`.
        self.catalog()
            .cache_expressions(
                global_id,
                None,
                global_mir_plan.df_desc().clone(),
                df_desc.clone(),
                df_meta.clone(),
                optimizer_features,
            )
            .await;

        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
                Box::pin(async move {
                    // Save the plan structures in the catalog.
                    coord
                        .catalog_mut()
                        .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
                    coord
                        .catalog_mut()
                        .set_physical_plan(global_id, df_desc.clone());

                    let notice_builtin_updates_fut =
                        coord.persist_dataflow_metainfo(df_meta, global_id).await;

                    // Acquire read holds on the dataflow's imports and start
                    // the dataflow at their least valid read.
                    // NOTE(review): the holds presumably stop the imports'
                    // since from advancing before the dataflow ships — confirm.
                    let read_holds = coord.acquire_read_holds(&id_bundle);
                    let since = read_holds.least_valid_read();
                    df_desc.set_as_of(since);

                    coord
                        .ship_dataflow_and_notice_builtin_table_updates(
                            df_desc,
                            cluster_id,
                            notice_builtin_updates_fut,
                            None,
                        )
                        .await;
                    // The holds are released only after the dataflow has been
                    // shipped.
                    drop(read_holds);

                    // Apply the index's compaction window, or the default when
                    // none was given, as its compute read policy.
                    coord.update_compute_read_policy(
                        cluster_id,
                        item_id,
                        compaction_window.unwrap_or_default().into(),
                    );
                })
            })
            .await;

        match transact_result {
            Ok(_) => {
                // Emit the raw (unrendered) optimizer notices to the user.
                self.emit_raw_optimizer_notices_to_user(ctx, &raw_df_meta.optimizer_notices);
                Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
            }
            // With `IF NOT EXISTS`, an already-existing item becomes a notice
            // and the statement still reports `CreatedIndex`.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "index",
                    });
                Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
            }
            Err(err) => Err(err),
        }
    }
608
609 #[instrument]
610 async fn create_index_explain(
611 &self,
612 session: &Session,
613 CreateIndexExplain {
614 exported_index_id,
615 plan: plan::CreateIndexPlan { name, index, .. },
616 df_meta,
617 explain_ctx:
618 ExplainPlanContext {
619 config,
620 format,
621 stage,
622 optimizer_trace,
623 ..
624 },
625 ..
626 }: CreateIndexExplain,
627 ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
628 let session_catalog = self.catalog().for_session(session);
629 let expr_humanizer = {
630 let on_entry = self.catalog.get_entry_by_global_id(&index.on);
631 let full_name = self.catalog.resolve_full_name(&name, on_entry.conn_id());
632 let on_desc = on_entry
633 .relation_desc()
634 .expect("can only create indexes on items with a valid description");
635
636 let transient_items = btreemap! {
637 exported_index_id => TransientItem::new(
638 Some(full_name.into_parts()),
639 Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
640 )
641 };
642 ExprHumanizerExt::new(transient_items, &session_catalog)
643 };
644
645 let target_cluster = self.catalog().get_cluster(index.cluster_id);
646
647 let features = OptimizerFeatures::from(self.catalog().system_config())
648 .override_from(&target_cluster.config.features())
649 .override_from(&self.cluster_scoped_optimizer_overrides(index.cluster_id))
650 .override_from(&config.features);
651
652 let rows = optimizer_trace
653 .into_rows(
654 format,
655 &config,
656 &features,
657 &expr_humanizer,
658 None,
659 Some(target_cluster),
660 df_meta,
661 stage,
662 plan::ExplaineeStatementKind::CreateIndex,
663 None,
664 )
665 .await?;
666
667 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
668 }
669}