1use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, Index};
15use mz_ore::instrument;
16use mz_repr::explain::{ExprHumanizerExt, TransientItem};
17use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
18use mz_repr::{Datum, Row};
19use mz_sql::ast::ExplainStage;
20use mz_sql::catalog::CatalogError;
21use mz_sql::names::ResolvedIds;
22use mz_sql::plan;
23use tracing::Span;
24
25use crate::catalog::CatalogState;
26use crate::command::ExecuteResponse;
27use crate::coord::sequencer::inner::return_if_err;
28use crate::coord::{
29 Coordinator, CreateIndexExplain, CreateIndexFinish, CreateIndexOptimize, CreateIndexStage,
30 ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
31};
32use crate::error::AdapterError;
33use crate::explain::explain_dataflow;
34use crate::explain::optimizer_trace::OptimizerTrace;
35use crate::optimize::dataflows::dataflow_import_id_bundle;
36use crate::optimize::{self, Optimize};
37use crate::session::Session;
38use crate::{AdapterNotice, ExecuteContext, catalog};
39
40impl Staged for CreateIndexStage {
41 type Ctx = ExecuteContext;
42
43 fn validity(&mut self) -> &mut PlanValidity {
44 match self {
45 Self::Optimize(stage) => &mut stage.validity,
46 Self::Finish(stage) => &mut stage.validity,
47 Self::Explain(stage) => &mut stage.validity,
48 }
49 }
50
51 async fn stage(
52 self,
53 coord: &mut Coordinator,
54 ctx: &mut ExecuteContext,
55 ) -> Result<StageResult<Box<Self>>, AdapterError> {
56 match self {
57 CreateIndexStage::Optimize(stage) => coord.create_index_optimize(stage).await,
58 CreateIndexStage::Finish(stage) => coord.create_index_finish(ctx, stage).await,
59 CreateIndexStage::Explain(stage) => {
60 coord.create_index_explain(ctx.session(), stage).await
61 }
62 }
63 }
64
65 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
66 Message::CreateIndexStageReady {
67 ctx,
68 span,
69 stage: self,
70 }
71 }
72
73 fn cancel_enabled(&self) -> bool {
74 true
75 }
76}
77
impl Coordinator {
    /// Sequences a `CREATE INDEX` statement: validates the plan into the
    /// initial [`CreateIndexStage`] and then drives the staged state machine
    /// to completion.
    #[instrument]
    pub(crate) async fn sequence_create_index(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateIndexPlan,
        resolved_ids: ResolvedIds,
    ) {
        let stage = return_if_err!(
            self.create_index_validate(plan, resolved_ids, ExplainContext::None),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    /// Sequences an `EXPLAIN ... CREATE INDEX` statement. Runs the same
    /// staged pipeline as [`Self::sequence_create_index`], but with an
    /// [`ExplainContext::Plan`] that collects optimizer traces instead of
    /// actually creating the index.
    #[instrument]
    pub(crate) async fn explain_create_index(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) {
        // Routing is expected to only send `Explainee::Statement(CreateIndex)`
        // plans here, so the two destructurings below should not fail.
        let plan::Explainee::Statement(stmt) = explainee else {
            unreachable!()
        };
        let plan::ExplaineeStatement::CreateIndex { broken, plan } = stmt else {
            unreachable!()
        };

        // Collect optimizer traces for the paths requested by this EXPLAIN
        // stage.
        let optimizer_trace = OptimizerTrace::new(stage.paths());

        // NOTE(review): resolved ids are left empty on this path — presumably
        // nothing is persisted for an EXPLAIN, so dependency ids are unused;
        // confirm against how `create_index_finish` consumes them.
        let resolved_ids = ResolvedIds::empty();

        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
            broken,
            config,
            format,
            stage,
            replan: None,
            desc: None,
            optimizer_trace,
        });
        let stage = return_if_err!(
            self.create_index_validate(plan, resolved_ids, explain_ctx),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    /// Sequences an `EXPLAIN REPLAN` of an existing index: re-plans the
    /// index's stored `create_sql` and runs the EXPLAIN pipeline against the
    /// freshly produced plan.
    #[instrument]
    pub(crate) async fn explain_replan_index(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) {
        // Routing guarantees a `ReplanIndex` explainee that names an index.
        let plan::Explainee::ReplanIndex(id) = explainee else {
            unreachable!() };
        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
            unreachable!() };
        // Shadow the item id with the index's global id, which is what the
        // explain context's `replan` field records below.
        let id = index.global_id();

        let create_sql = index.create_sql.clone();
        let plan_result = self
            .catalog_mut()
            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);

        // Re-planning an index's own `create_sql` must yield a CreateIndex plan.
        let plan::Plan::CreateIndex(plan) = plan else {
            unreachable!() };

        // REPLAN is never run in `broken` mode.
        let broken = false;

        let optimizer_trace = OptimizerTrace::new(stage.paths());

        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
            broken,
            config,
            format,
            stage,
            replan: Some(id),
            desc: None,
            optimizer_trace,
        });
        let stage = return_if_err!(
            self.create_index_validate(plan, resolved_ids, explain_ctx),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    /// Handles `EXPLAIN ... FOR INDEX`: renders a plan that was cached in the
    /// catalog when the index was created, without re-running the optimizer.
    /// Only the `GlobalPlan` and `PhysicalPlan` stages are supported; any
    /// other stage is rejected below.
    #[instrument]
    pub(crate) fn explain_index(
        &self,
        ctx: &ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::Explainee::Index(id) = explainee else {
            unreachable!() };
        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
            unreachable!() };

        let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&index.global_id())
        else {
            // Missing metainfo is only logged at error level for non-system
            // indexes — presumably system indexes may legitimately lack it;
            // TODO confirm. The request fails either way via the bail below.
            if !id.is_system() {
                tracing::error!("cannot find dataflow metainformation for index {id} in catalog");
            }
            coord_bail!("cannot find dataflow metainformation for index {id} in catalog");
        };

        let target_cluster = self.catalog().get_cluster(index.cluster_id);

        // Layer feature overrides: system defaults, then the target cluster's
        // config, then the EXPLAIN statement's own `config.features`.
        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features())
            .override_from(&config.features);

        // No cardinality statistics are supplied for this EXPLAIN variant.
        let cardinality_stats = BTreeMap::new();

        let explain = match stage {
            ExplainStage::GlobalPlan => {
                // Optimized (MIR) plan cached at index-creation time.
                let Some(plan) = self
                    .catalog()
                    .try_get_optimized_plan(&index.global_id())
                    .cloned()
                else {
                    tracing::error!("cannot find {stage} for index {id} in catalog");
                    coord_bail!("cannot find {stage} for index in catalog");
                };

                explain_dataflow(
                    plan,
                    format,
                    &config,
                    &features,
                    &self.catalog().for_session(ctx.session()),
                    cardinality_stats,
                    Some(target_cluster.name.as_str()),
                    dataflow_metainfo,
                )?
            }
            ExplainStage::PhysicalPlan => {
                // Physical (LIR) plan cached at index-creation time.
                let Some(plan) = self
                    .catalog()
                    .try_get_physical_plan(&index.global_id())
                    .cloned()
                else {
                    tracing::error!("cannot find {stage} for index {id} in catalog");
                    coord_bail!("cannot find {stage} for index in catalog");
                };
                explain_dataflow(
                    plan,
                    format,
                    &config,
                    &features,
                    &self.catalog().for_session(ctx.session()),
                    cardinality_stats,
                    Some(target_cluster.name.as_str()),
                    dataflow_metainfo,
                )?
            }
            _ => {
                coord_bail!("cannot EXPLAIN {} FOR INDEX", stage);
            }
        };

        // The rendered explanation is returned as a single one-column row.
        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);

        Ok(Self::send_immediate_rows(row))
    }

    /// Packages a validated `CREATE INDEX` plan into the initial `Optimize`
    /// stage. The stage's [`PlanValidity`] pins the current catalog transient
    /// revision so later stages can detect intervening catalog changes.
    #[instrument]
    fn create_index_validate(
        &self,
        plan: plan::CreateIndexPlan,
        resolved_ids: ResolvedIds,
        explain_ctx: ExplainContext,
    ) -> Result<CreateIndexStage, AdapterError> {
        let validity =
            PlanValidity::require_transient_revision(self.catalog().transient_revision());
        Ok(CreateIndexStage::Optimize(CreateIndexOptimize {
            validity,
            plan,
            resolved_ids,
            explain_ctx,
        }))
    }

    /// Optimize stage: runs the index optimizer on a blocking task and
    /// produces the next stage — `Explain` when this is an EXPLAIN, or
    /// `Finish` for a real index creation.
    #[instrument]
    async fn create_index_optimize(
        &mut self,
        CreateIndexOptimize {
            validity,
            plan,
            resolved_ids,
            explain_ctx,
        }: CreateIndexOptimize,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let plan::CreateIndexPlan {
            index: plan::Index { cluster_id, .. },
            ..
        } = &plan;

        let compute_instance = self
            .instance_snapshot(*cluster_id)
            .expect("compute instance does not exist");
        // Real DDL allocates a durable user id; EXPLAIN paths only need a
        // transient id since nothing is persisted.
        let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
            self.allocate_user_id().await?
        } else {
            self.allocate_transient_id()
        };

        // Layer optimizer config: system defaults, then the target cluster's
        // features, then any EXPLAIN-provided overrides.
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
            .override_from(&explain_ctx);
        let optimizer_features = optimizer_config.features.clone();

        let mut optimizer = optimize::index::Optimizer::new(
            self.owned_catalog(),
            compute_instance,
            global_id,
            optimizer_config,
            self.optimizer_metrics(),
        );
        let span = Span::current();
        // Optimization can be expensive, so run it off the coordinator's main
        // task and hand the staged machinery a handle to await.
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize create index",
            move || {
                span.in_scope(|| {
                    // Both optimizer passes (to global MIR, then to LIR) as a
                    // single fallible pipeline so errors are handled uniformly
                    // below.
                    let mut pipeline = || -> Result<(
                        optimize::index::GlobalMirPlan,
                        optimize::index::GlobalLirPlan,
                    ), AdapterError> {
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let index_plan = optimize::index::Index::new(
                            plan.name.clone(),
                            plan.index.on,
                            plan.index.keys.clone(),
                        );

                        let global_mir_plan = optimizer.catch_unwind_optimize(index_plan)?;
                        let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;

                        Ok((global_mir_plan, global_lir_plan))
                    };

                    let stage = match pipeline() {
                        Ok((global_mir_plan, global_lir_plan)) => {
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                // EXPLAIN only needs the dataflow metainfo;
                                // the plans themselves were captured in the
                                // optimizer trace.
                                let (_, df_meta) = global_lir_plan.unapply();
                                CreateIndexStage::Explain(CreateIndexExplain {
                                    validity,
                                    exported_index_id: global_id,
                                    plan,
                                    df_meta,
                                    explain_ctx,
                                })
                            } else {
                                CreateIndexStage::Finish(CreateIndexFinish {
                                    validity,
                                    item_id,
                                    global_id,
                                    plan,
                                    resolved_ids,
                                    global_mir_plan,
                                    global_lir_plan,
                                    optimizer_features,
                                })
                            }
                        }
                        Err(err) => {
                            // Optimization failed. Plain statements surface
                            // the error; `EXPLAIN BROKEN` logs it and proceeds
                            // with empty metainfo so the collected trace can
                            // still be rendered.
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                CreateIndexStage::Explain(CreateIndexExplain {
                                    validity,
                                    exported_index_id: global_id,
                                    plan,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                })
                            } else {
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }

    /// Finish stage of `CREATE INDEX`: writes the new index to the catalog,
    /// renders and persists optimizer notices, ships the compute dataflow,
    /// and installs the read policy for the new collection.
    #[instrument]
    async fn create_index_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        stage: CreateIndexFinish,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let CreateIndexFinish {
            item_id,
            global_id,
            plan:
                plan::CreateIndexPlan {
                    name,
                    index:
                        plan::Index {
                            create_sql,
                            on,
                            keys,
                            cluster_id,
                            compaction_window,
                        },
                    if_not_exists,
                },
            resolved_ids,
            global_mir_plan,
            global_lir_plan,
            optimizer_features,
            ..
        } = stage;
        // Ids of every collection the dataflow imports; used below to acquire
        // the read holds that determine the dataflow's as-of.
        let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);

        // The new index inherits its owner from the indexed object.
        let on_entry = self.catalog().get_entry_by_global_id(&on);
        let owner_id = *on_entry.owner_id();

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Index(Index {
                create_sql,
                global_id,
                keys: keys.into(),
                on,
                conn_id: None,
                resolved_ids,
                cluster_id,
                is_retained_metrics_object: false,
                custom_logical_compaction_window: compaction_window,
                optimized_plan: None,
                physical_plan: None,
                dataflow_metainfo: None,
            }),
            owner_id,
        }];

        // One transient global id per optimizer notice on this dataflow.
        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
            .map(|(_item_id, global_id)| global_id)
            .take(global_lir_plan.df_meta().optimizer_notices.len())
            .collect::<Vec<_>>();

        let (mut df_desc, raw_df_meta) = global_lir_plan.unapply();
        // Render the raw optimizer notices into their catalog form, using a
        // humanizer extended with the not-yet-visible index so its name and
        // the indexed columns resolve.
        let df_meta = {
            let system_catalog = self.catalog().for_system_session();
            let full_name = self.catalog().resolve_full_name(&name, None);
            let on_desc = on_entry
                .relation_desc()
                .expect("can only create indexes on items with a valid description");
            let transient_items = btreemap! {
                global_id => TransientItem::new(
                    Some(full_name.into_parts()),
                    Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
                )
            };
            let humanizer = ExprHumanizerExt::new(transient_items, &system_catalog);
            CatalogState::render_notices_core(
                &humanizer,
                (self.catalog().config().now)(),
                &raw_df_meta,
                notice_ids,
                Some(global_id),
            )
        };

        // Cache the optimized (MIR) and physical (LIR) plans plus metainfo,
        // e.g. to serve later `EXPLAIN ... FOR INDEX` requests.
        self.catalog()
            .cache_expressions(
                global_id,
                None,
                global_mir_plan.df_desc().clone(),
                df_desc.clone(),
                df_meta.clone(),
                optimizer_features,
            )
            .await;

        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
                Box::pin(async move {
                    // Publish the plans into in-memory catalog state as well.
                    coord
                        .catalog_mut()
                        .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
                    coord
                        .catalog_mut()
                        .set_physical_plan(global_id, df_desc.clone());

                    let notice_builtin_updates_fut =
                        coord.persist_dataflow_metainfo(df_meta, global_id).await;

                    // Hold back compaction on all imported collections while
                    // shipping; their least valid read becomes the dataflow's
                    // as-of.
                    let read_holds = coord.acquire_read_holds(&id_bundle);
                    let since = read_holds.least_valid_read();
                    df_desc.set_as_of(since);

                    coord
                        .ship_dataflow_and_notice_builtin_table_updates(
                            df_desc,
                            cluster_id,
                            notice_builtin_updates_fut,
                            None,
                        )
                        .await;
                    // Holds can be released once the dataflow has shipped.
                    drop(read_holds);

                    coord.update_compute_read_policy(
                        cluster_id,
                        item_id,
                        compaction_window.unwrap_or_default().into(),
                    );
                })
            })
            .await;

        match transact_result {
            Ok(_) => {
                self.emit_raw_optimizer_notices_to_user(ctx, &raw_df_meta.optimizer_notices);
                Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
            }
            // `IF NOT EXISTS` turns a name collision into a notice, not an
            // error; the response is the same as a successful creation.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "index",
                    });
                Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
            }
            Err(err) => Err(err),
        }
    }

    /// Explain stage for `EXPLAIN ... CREATE INDEX`: converts the collected
    /// optimizer trace into result rows for the requested stage and format.
    #[instrument]
    async fn create_index_explain(
        &self,
        session: &Session,
        CreateIndexExplain {
            exported_index_id,
            plan: plan::CreateIndexPlan { name, index, .. },
            df_meta,
            explain_ctx:
                ExplainPlanContext {
                    config,
                    format,
                    stage,
                    optimizer_trace,
                    ..
                },
            ..
        }: CreateIndexExplain,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let session_catalog = self.catalog().for_session(session);
        // Extend the humanizer with the transient (never created) index so
        // the rendered plans can show its name and the indexed columns.
        let expr_humanizer = {
            let on_entry = self.catalog.get_entry_by_global_id(&index.on);
            let full_name = self.catalog.resolve_full_name(&name, on_entry.conn_id());
            let on_desc = on_entry
                .relation_desc()
                .expect("can only create indexes on items with a valid description");

            let transient_items = btreemap! {
                exported_index_id => TransientItem::new(
                    Some(full_name.into_parts()),
                    Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
                )
            };
            ExprHumanizerExt::new(transient_items, &session_catalog)
        };

        let target_cluster = self.catalog().get_cluster(index.cluster_id);

        // Same feature layering as the optimize stage: system defaults, then
        // cluster config, then EXPLAIN overrides.
        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features())
            .override_from(&config.features);

        let rows = optimizer_trace
            .into_rows(
                format,
                &config,
                &features,
                &expr_humanizer,
                None,
                Some(target_cluster),
                df_meta,
                stage,
                plan::ExplaineeStatementKind::CreateIndex,
                None,
            )
            .await?;

        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
    }
}