1use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, Index};
15use mz_ore::instrument;
16use mz_repr::explain::{ExprHumanizerExt, TransientItem};
17use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
18use mz_repr::{Datum, Row};
19use mz_sql::ast::ExplainStage;
20use mz_sql::catalog::CatalogError;
21use mz_sql::names::ResolvedIds;
22use mz_sql::plan;
23use mz_sql::session::metadata::SessionMetadata;
24use tracing::Span;
25
26use crate::catalog::CatalogState;
27use crate::command::ExecuteResponse;
28use crate::coord::sequencer::inner::return_if_err;
29use crate::coord::{
30 Coordinator, CreateIndexExplain, CreateIndexFinish, CreateIndexOptimize, CreateIndexStage,
31 ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
32};
33use crate::error::AdapterError;
34use crate::explain::explain_dataflow;
35use crate::explain::optimizer_trace::OptimizerTrace;
36use crate::optimize::dataflows::dataflow_import_id_bundle;
37use crate::optimize::{self, Optimize};
38use crate::session::Session;
39use crate::{AdapterNotice, ExecuteContext, catalog};
40
41impl Staged for CreateIndexStage {
42 type Ctx = ExecuteContext;
43
44 fn validity(&mut self) -> &mut PlanValidity {
45 match self {
46 Self::Optimize(stage) => &mut stage.validity,
47 Self::Finish(stage) => &mut stage.validity,
48 Self::Explain(stage) => &mut stage.validity,
49 }
50 }
51
52 async fn stage(
53 self,
54 coord: &mut Coordinator,
55 ctx: &mut ExecuteContext,
56 ) -> Result<StageResult<Box<Self>>, AdapterError> {
57 match self {
58 CreateIndexStage::Optimize(stage) => coord.create_index_optimize(stage).await,
59 CreateIndexStage::Finish(stage) => coord.create_index_finish(ctx, stage).await,
60 CreateIndexStage::Explain(stage) => {
61 coord.create_index_explain(ctx.session(), stage).await
62 }
63 }
64 }
65
66 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
67 Message::CreateIndexStageReady {
68 ctx,
69 span,
70 stage: self,
71 }
72 }
73
74 fn cancel_enabled(&self) -> bool {
75 true
76 }
77}
78
79impl Coordinator {
80 #[instrument]
81 pub(crate) async fn sequence_create_index(
82 &mut self,
83 ctx: ExecuteContext,
84 plan: plan::CreateIndexPlan,
85 resolved_ids: ResolvedIds,
86 ) {
87 let stage = return_if_err!(
88 self.create_index_validate(ctx.session(), plan, resolved_ids, ExplainContext::None),
89 ctx
90 );
91 self.sequence_staged(ctx, Span::current(), stage).await;
92 }
93
94 #[instrument]
95 pub(crate) async fn explain_create_index(
96 &mut self,
97 ctx: ExecuteContext,
98 plan::ExplainPlanPlan {
99 stage,
100 format,
101 config,
102 explainee,
103 }: plan::ExplainPlanPlan,
104 ) {
105 let plan::Explainee::Statement(stmt) = explainee else {
106 unreachable!()
109 };
110 let plan::ExplaineeStatement::CreateIndex { broken, plan } = stmt else {
111 unreachable!()
114 };
115
116 let optimizer_trace = OptimizerTrace::new(stage.paths());
119
120 let resolved_ids = ResolvedIds::empty();
122
123 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
124 broken,
125 config,
126 format,
127 stage,
128 replan: None,
129 desc: None,
130 optimizer_trace,
131 });
132 let stage = return_if_err!(
133 self.create_index_validate(ctx.session(), plan, resolved_ids, explain_ctx),
134 ctx
135 );
136 self.sequence_staged(ctx, Span::current(), stage).await;
137 }
138
139 #[instrument]
140 pub(crate) async fn explain_replan_index(
141 &mut self,
142 ctx: ExecuteContext,
143 plan::ExplainPlanPlan {
144 stage,
145 format,
146 config,
147 explainee,
148 }: plan::ExplainPlanPlan,
149 ) {
150 let plan::Explainee::ReplanIndex(id) = explainee else {
151 unreachable!() };
153 let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
154 unreachable!() };
156 let id = index.global_id();
157
158 let create_sql = index.create_sql.clone();
159 let plan_result = self
160 .catalog_mut()
161 .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
162 let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
163
164 let plan::Plan::CreateIndex(plan) = plan else {
165 unreachable!() };
167
168 let broken = false;
171
172 let optimizer_trace = OptimizerTrace::new(stage.paths());
175
176 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
177 broken,
178 config,
179 format,
180 stage,
181 replan: Some(id),
182 desc: None,
183 optimizer_trace,
184 });
185 let stage = return_if_err!(
186 self.create_index_validate(ctx.session(), plan, resolved_ids, explain_ctx),
187 ctx
188 );
189 self.sequence_staged(ctx, Span::current(), stage).await;
190 }
191
192 #[instrument]
193 pub(crate) fn explain_index(
194 &self,
195 ctx: &ExecuteContext,
196 plan::ExplainPlanPlan {
197 stage,
198 format,
199 config,
200 explainee,
201 }: plan::ExplainPlanPlan,
202 ) -> Result<ExecuteResponse, AdapterError> {
203 let plan::Explainee::Index(id) = explainee else {
204 unreachable!() };
206 let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
207 unreachable!() };
209
210 let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&index.global_id())
211 else {
212 if !id.is_system() {
213 tracing::error!("cannot find dataflow metainformation for index {id} in catalog");
214 }
215 coord_bail!("cannot find dataflow metainformation for index {id} in catalog");
216 };
217
218 let target_cluster = self.catalog().get_cluster(index.cluster_id);
219
220 let features = OptimizerFeatures::from(self.catalog().system_config())
221 .override_from(&target_cluster.config.features())
222 .override_from(&config.features);
223
224 let cardinality_stats = BTreeMap::new();
226
227 let explain = match stage {
228 ExplainStage::GlobalPlan => {
229 let Some(plan) = self
230 .catalog()
231 .try_get_optimized_plan(&index.global_id())
232 .cloned()
233 else {
234 tracing::error!("cannot find {stage} for index {id} in catalog");
235 coord_bail!("cannot find {stage} for index in catalog");
236 };
237
238 explain_dataflow(
239 plan,
240 format,
241 &config,
242 &features,
243 &self.catalog().for_session(ctx.session()),
244 cardinality_stats,
245 Some(target_cluster.name.as_str()),
246 dataflow_metainfo,
247 )?
248 }
249 ExplainStage::PhysicalPlan => {
250 let Some(plan) = self
251 .catalog()
252 .try_get_physical_plan(&index.global_id())
253 .cloned()
254 else {
255 tracing::error!("cannot find {stage} for index {id} in catalog");
256 coord_bail!("cannot find {stage} for index in catalog");
257 };
258 explain_dataflow(
259 plan,
260 format,
261 &config,
262 &features,
263 &self.catalog().for_session(ctx.session()),
264 cardinality_stats,
265 Some(target_cluster.name.as_str()),
266 dataflow_metainfo,
267 )?
268 }
269 _ => {
270 coord_bail!("cannot EXPLAIN {} FOR INDEX", stage);
271 }
272 };
273
274 let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
275
276 Ok(Self::send_immediate_rows(row))
277 }
278
279 #[instrument]
282 fn create_index_validate(
283 &self,
284 session: &Session,
285 plan: plan::CreateIndexPlan,
286 resolved_ids: ResolvedIds,
287 explain_ctx: ExplainContext,
288 ) -> Result<CreateIndexStage, AdapterError> {
289 let validity = PlanValidity::new(
293 self.catalog().transient_revision(),
294 resolved_ids.items().copied().collect(),
295 Some(plan.index.cluster_id),
296 None,
297 session.role_metadata().clone(),
298 );
299 Ok(CreateIndexStage::Optimize(CreateIndexOptimize {
300 validity,
301 plan,
302 resolved_ids,
303 explain_ctx,
304 }))
305 }
306
307 #[instrument]
308 async fn create_index_optimize(
309 &mut self,
310 CreateIndexOptimize {
311 validity,
312 plan,
313 resolved_ids,
314 explain_ctx,
315 }: CreateIndexOptimize,
316 ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
317 let plan::CreateIndexPlan {
318 index: plan::Index { cluster_id, .. },
319 ..
320 } = &plan;
321
322 let compute_instance = self
324 .instance_snapshot(*cluster_id)
325 .expect("compute instance does not exist");
326 let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
327 self.allocate_user_id().await?
328 } else {
329 self.allocate_transient_id()
330 };
331
332 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
333 .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
334 .override_from(&explain_ctx);
335 let optimizer_features = optimizer_config.features.clone();
336
337 let mut optimizer = optimize::index::Optimizer::new(
339 self.owned_catalog(),
340 compute_instance,
341 global_id,
342 optimizer_config,
343 self.optimizer_metrics(),
344 );
345 let span = Span::current();
346 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
347 || "optimize create index",
348 move || {
349 span.in_scope(|| {
350 let mut pipeline = || -> Result<(
351 optimize::index::GlobalMirPlan,
352 optimize::index::GlobalLirPlan,
353 ), AdapterError> {
354 let _dispatch_guard = explain_ctx.dispatch_guard();
355
356 let index_plan = optimize::index::Index::new(
357 plan.name.clone(),
358 plan.index.on,
359 plan.index.keys.clone(),
360 );
361
362 let global_mir_plan = optimizer.catch_unwind_optimize(index_plan)?;
364 let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
366
367 Ok((global_mir_plan, global_lir_plan))
368 };
369
370 let stage = match pipeline() {
371 Ok((global_mir_plan, global_lir_plan)) => {
372 if let ExplainContext::Plan(explain_ctx) = explain_ctx {
373 let (_, df_meta) = global_lir_plan.unapply();
374 CreateIndexStage::Explain(CreateIndexExplain {
375 validity,
376 exported_index_id: global_id,
377 plan,
378 df_meta,
379 explain_ctx,
380 })
381 } else {
382 CreateIndexStage::Finish(CreateIndexFinish {
383 validity,
384 item_id,
385 global_id,
386 plan,
387 resolved_ids,
388 global_mir_plan,
389 global_lir_plan,
390 optimizer_features,
391 })
392 }
393 }
394 Err(err) => {
397 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
398 return Err(err);
400 };
401
402 if explain_ctx.broken {
403 tracing::error!("error while handling EXPLAIN statement: {}", err);
407 CreateIndexStage::Explain(CreateIndexExplain {
408 validity,
409 exported_index_id: global_id,
410 plan,
411 df_meta: Default::default(),
412 explain_ctx,
413 })
414 } else {
415 return Err(err);
417 }
418 }
419 };
420 Ok(Box::new(stage))
421 })
422 },
423 )))
424 }
425
426 #[instrument]
427 async fn create_index_finish(
428 &mut self,
429 ctx: &mut ExecuteContext,
430 stage: CreateIndexFinish,
431 ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
432 let CreateIndexFinish {
433 item_id,
434 global_id,
435 plan:
436 plan::CreateIndexPlan {
437 name,
438 index:
439 plan::Index {
440 create_sql,
441 on,
442 keys,
443 cluster_id,
444 compaction_window,
445 },
446 if_not_exists,
447 },
448 resolved_ids,
449 global_mir_plan,
450 global_lir_plan,
451 optimizer_features,
452 ..
453 } = stage;
454 let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
455
456 let on_entry = self.catalog().get_entry_by_global_id(&on);
457 let owner_id = *on_entry.owner_id();
458
459 let ops = vec![catalog::Op::CreateItem {
460 id: item_id,
461 name: name.clone(),
462 item: CatalogItem::Index(Index {
463 create_sql,
464 global_id,
465 keys: keys.into(),
466 on,
467 conn_id: None,
468 resolved_ids,
469 cluster_id,
470 is_retained_metrics_object: false,
471 custom_logical_compaction_window: compaction_window,
472 optimized_plan: None,
473 physical_plan: None,
474 dataflow_metainfo: None,
475 }),
476 owner_id,
477 }];
478
479 let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
481 .map(|(_item_id, global_id)| global_id)
482 .take(global_lir_plan.df_meta().optimizer_notices.len())
483 .collect::<Vec<_>>();
484
485 let (mut df_desc, raw_df_meta) = global_lir_plan.unapply();
499 let df_meta = {
500 let system_catalog = self.catalog().for_system_session();
501 let full_name = self.catalog().resolve_full_name(&name, None);
502 let on_desc = on_entry
503 .relation_desc()
504 .expect("can only create indexes on items with a valid description");
505 let transient_items = btreemap! {
506 global_id => TransientItem::new(
507 Some(full_name.into_parts()),
508 Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
509 )
510 };
511 let humanizer = ExprHumanizerExt::new(transient_items, &system_catalog);
512 CatalogState::render_notices_core(
513 &humanizer,
514 (self.catalog().config().now)(),
515 &raw_df_meta,
516 notice_ids,
517 Some(global_id),
518 )
519 };
520
521 self.catalog()
526 .cache_expressions(
527 global_id,
528 None,
529 global_mir_plan.df_desc().clone(),
530 df_desc.clone(),
531 df_meta.clone(),
532 optimizer_features,
533 )
534 .await;
535
536 let transact_result = self
537 .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
538 Box::pin(async move {
539 coord
541 .catalog_mut()
542 .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
543 coord
544 .catalog_mut()
545 .set_physical_plan(global_id, df_desc.clone());
546
547 let notice_builtin_updates_fut =
548 coord.persist_dataflow_metainfo(df_meta, global_id).await;
549
550 let read_holds = coord.acquire_read_holds(&id_bundle);
558 let since = read_holds.least_valid_read();
559 df_desc.set_as_of(since);
560
561 coord
562 .ship_dataflow_and_notice_builtin_table_updates(
563 df_desc,
564 cluster_id,
565 notice_builtin_updates_fut,
566 None,
567 )
568 .await;
569 drop(read_holds);
574
575 coord.update_compute_read_policy(
576 cluster_id,
577 item_id,
578 compaction_window.unwrap_or_default().into(),
579 );
580 })
581 })
582 .await;
583
584 match transact_result {
585 Ok(_) => {
586 self.emit_raw_optimizer_notices_to_user(ctx, &raw_df_meta.optimizer_notices);
591 Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
592 }
593 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
594 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
595 })) if if_not_exists => {
596 ctx.session()
597 .add_notice(AdapterNotice::ObjectAlreadyExists {
598 name: name.item,
599 ty: "index",
600 });
601 Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
602 }
603 Err(err) => Err(err),
604 }
605 }
606
607 #[instrument]
608 async fn create_index_explain(
609 &self,
610 session: &Session,
611 CreateIndexExplain {
612 exported_index_id,
613 plan: plan::CreateIndexPlan { name, index, .. },
614 df_meta,
615 explain_ctx:
616 ExplainPlanContext {
617 config,
618 format,
619 stage,
620 optimizer_trace,
621 ..
622 },
623 ..
624 }: CreateIndexExplain,
625 ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
626 let session_catalog = self.catalog().for_session(session);
627 let expr_humanizer = {
628 let on_entry = self.catalog.get_entry_by_global_id(&index.on);
629 let full_name = self.catalog.resolve_full_name(&name, on_entry.conn_id());
630 let on_desc = on_entry
631 .relation_desc()
632 .expect("can only create indexes on items with a valid description");
633
634 let transient_items = btreemap! {
635 exported_index_id => TransientItem::new(
636 Some(full_name.into_parts()),
637 Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
638 )
639 };
640 ExprHumanizerExt::new(transient_items, &session_catalog)
641 };
642
643 let target_cluster = self.catalog().get_cluster(index.cluster_id);
644
645 let features = OptimizerFeatures::from(self.catalog().system_config())
646 .override_from(&target_cluster.config.features())
647 .override_from(&config.features);
648
649 let rows = optimizer_trace
650 .into_rows(
651 format,
652 &config,
653 &features,
654 &expr_humanizer,
655 None,
656 Some(target_cluster),
657 df_meta,
658 stage,
659 plan::ExplaineeStatementKind::CreateIndex,
660 None,
661 )
662 .await?;
663
664 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
665 }
666}