1use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, Index};
15use mz_ore::instrument;
16use mz_repr::explain::{ExprHumanizerExt, TransientItem};
17use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
18use mz_repr::{Datum, Row};
19use mz_sql::ast::ExplainStage;
20use mz_sql::catalog::CatalogError;
21use mz_sql::names::ResolvedIds;
22use mz_sql::plan;
23use tracing::Span;
24
25use crate::command::ExecuteResponse;
26use crate::coord::sequencer::inner::return_if_err;
27use crate::coord::{
28 Coordinator, CreateIndexExplain, CreateIndexFinish, CreateIndexOptimize, CreateIndexStage,
29 ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
30};
31use crate::error::AdapterError;
32use crate::explain::explain_dataflow;
33use crate::explain::optimizer_trace::OptimizerTrace;
34use crate::optimize::dataflows::dataflow_import_id_bundle;
35use crate::optimize::{self, Optimize};
36use crate::session::Session;
37use crate::{AdapterNotice, ExecuteContext, catalog};
38
39impl Staged for CreateIndexStage {
40 type Ctx = ExecuteContext;
41
42 fn validity(&mut self) -> &mut PlanValidity {
43 match self {
44 Self::Optimize(stage) => &mut stage.validity,
45 Self::Finish(stage) => &mut stage.validity,
46 Self::Explain(stage) => &mut stage.validity,
47 }
48 }
49
50 async fn stage(
51 self,
52 coord: &mut Coordinator,
53 ctx: &mut ExecuteContext,
54 ) -> Result<StageResult<Box<Self>>, AdapterError> {
55 match self {
56 CreateIndexStage::Optimize(stage) => coord.create_index_optimize(stage).await,
57 CreateIndexStage::Finish(stage) => coord.create_index_finish(ctx, stage).await,
58 CreateIndexStage::Explain(stage) => {
59 coord.create_index_explain(ctx.session(), stage).await
60 }
61 }
62 }
63
64 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
65 Message::CreateIndexStageReady {
66 ctx,
67 span,
68 stage: self,
69 }
70 }
71
72 fn cancel_enabled(&self) -> bool {
73 true
74 }
75}
76
impl Coordinator {
    /// Sequences a `CREATE INDEX` statement: validates the plan into the
    /// initial [`CreateIndexStage`] and enters the staged execution machinery
    /// (`Optimize` → `Finish`).
    #[instrument]
    pub(crate) async fn sequence_create_index(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateIndexPlan,
        resolved_ids: ResolvedIds,
    ) {
        let stage = return_if_err!(
            self.create_index_validate(plan, resolved_ids, ExplainContext::None),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    /// Sequences `EXPLAIN ... CREATE INDEX`: runs the same staged pipeline as
    /// a real `CREATE INDEX`, but with an [`ExplainContext::Plan`] so the
    /// pipeline terminates in the `Explain` stage instead of `Finish`.
    #[instrument]
    pub(crate) async fn explain_create_index(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) {
        // Dispatch guarantees this handler only sees `Statement(CreateIndex)`
        // explainees; other variants reaching here would be a routing bug.
        let plan::Explainee::Statement(stmt) = explainee else {
            unreachable!()
        };
        let plan::ExplaineeStatement::CreateIndex { broken, plan } = stmt else {
            unreachable!()
        };

        // Collect optimizer traces for the plan stages this EXPLAIN asks for.
        let optimizer_trace = OptimizerTrace::new(stage.paths());

        // NOTE(review): resolved ids appear intentionally empty for EXPLAIN —
        // nothing durable is created, so dependency tracking is not needed.
        let resolved_ids = ResolvedIds::empty();

        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
            broken,
            config,
            format,
            stage,
            replan: None,
            desc: None,
            optimizer_trace,
        });
        let stage = return_if_err!(
            self.create_index_validate(plan, resolved_ids, explain_ctx),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    /// Sequences `EXPLAIN REPLAN ...` for an existing index: re-derives the
    /// original plan from the catalog's stored `create_sql` and re-runs the
    /// optimization pipeline under an explain context.
    #[instrument]
    pub(crate) async fn explain_replan_index(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) {
        // Dispatch guarantees a `ReplanIndex` explainee referring to an index.
        let plan::Explainee::ReplanIndex(id) = explainee else {
            unreachable!()
        };
        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
            unreachable!()
        };
        // Shadow the item id with the collection's global id, which is what
        // the optimizer pipeline operates on.
        let id = index.global_id();

        // Re-plan from the persisted SQL so the explain reflects what planning
        // would produce today (current catalog state and feature flags).
        let create_sql = index.create_sql.clone();
        let plan_result = self
            .catalog_mut()
            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);

        let plan::Plan::CreateIndex(plan) = plan else {
            unreachable!()
        };

        // REPLAN never runs in "broken" (error-tolerant) mode.
        let broken = false;

        // Collect optimizer traces for the requested explain stages.
        let optimizer_trace = OptimizerTrace::new(stage.paths());

        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
            broken,
            config,
            format,
            stage,
            replan: Some(id),
            desc: None,
            optimizer_trace,
        });
        let stage = return_if_err!(
            self.create_index_validate(plan, resolved_ids, explain_ctx),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    /// Handles `EXPLAIN ... FOR INDEX`: renders plans that were captured in
    /// the catalog when the index was created, without re-optimizing. Only the
    /// global (MIR) and physical (LIR) plan stages are available this way.
    #[instrument]
    pub(crate) fn explain_index(
        &self,
        ctx: &ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Dispatch guarantees an `Index` explainee referring to an index item.
        let plan::Explainee::Index(id) = explainee else {
            unreachable!()
        };
        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
            unreachable!()
        };

        let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&index.global_id())
        else {
            // Missing metainfo for a user index indicates an internal
            // invariant violation, so log loudly; system indexes may
            // legitimately lack it, hence the guard.
            if !id.is_system() {
                tracing::error!("cannot find dataflow metainformation for index {id} in catalog");
            }
            coord_bail!("cannot find dataflow metainformation for index {id} in catalog");
        };

        let target_cluster = self.catalog().get_cluster(index.cluster_id);

        // Feature overrides layer: system defaults < cluster config < explain config.
        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features())
            .override_from(&config.features);

        // No cardinality statistics are computed for catalog-based explains.
        let cardinality_stats = BTreeMap::new();

        let explain = match stage {
            ExplainStage::GlobalPlan => {
                let Some(plan) = self
                    .catalog()
                    .try_get_optimized_plan(&index.global_id())
                    .cloned()
                else {
                    tracing::error!("cannot find {stage} for index {id} in catalog");
                    coord_bail!("cannot find {stage} for index in catalog");
                };

                explain_dataflow(
                    plan,
                    format,
                    &config,
                    &features,
                    &self.catalog().for_session(ctx.session()),
                    cardinality_stats,
                    Some(target_cluster.name.as_str()),
                    dataflow_metainfo,
                )?
            }
            ExplainStage::PhysicalPlan => {
                let Some(plan) = self
                    .catalog()
                    .try_get_physical_plan(&index.global_id())
                    .cloned()
                else {
                    tracing::error!("cannot find {stage} for index {id} in catalog");
                    coord_bail!("cannot find {stage} for index in catalog");
                };
                explain_dataflow(
                    plan,
                    format,
                    &config,
                    &features,
                    &self.catalog().for_session(ctx.session()),
                    cardinality_stats,
                    Some(target_cluster.name.as_str()),
                    dataflow_metainfo,
                )?
            }
            _ => {
                // Earlier pipeline stages are not persisted in the catalog.
                coord_bail!("cannot EXPLAIN {} FOR INDEX", stage);
            }
        };

        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);

        Ok(Self::send_immediate_rows(row))
    }

    /// Builds the initial `Optimize` stage, pinning the plan's validity to the
    /// current catalog revision so later stages can detect concurrent DDL.
    #[instrument]
    fn create_index_validate(
        &self,
        plan: plan::CreateIndexPlan,
        resolved_ids: ResolvedIds,
        explain_ctx: ExplainContext,
    ) -> Result<CreateIndexStage, AdapterError> {
        let validity =
            PlanValidity::require_transient_revision(self.catalog().transient_revision());
        Ok(CreateIndexStage::Optimize(CreateIndexOptimize {
            validity,
            plan,
            resolved_ids,
            explain_ctx,
        }))
    }

    /// Runs the MIR → LIR optimization pipeline for the index off the
    /// coordinator thread and produces the next stage: `Finish` for a real
    /// `CREATE INDEX`, or `Explain` when running under an explain context.
    #[instrument]
    async fn create_index_optimize(
        &mut self,
        CreateIndexOptimize {
            validity,
            plan,
            resolved_ids,
            explain_ctx,
        }: CreateIndexOptimize,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let plan::CreateIndexPlan {
            index: plan::Index { cluster_id, .. },
            ..
        } = &plan;

        let compute_instance = self
            .instance_snapshot(*cluster_id)
            .expect("compute instance does not exist");
        // Real creation allocates a durable user id (requires a catalog write
        // timestamp); explains only need a transient id that is never stored.
        let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
            let id_ts = self.get_catalog_write_ts().await;
            self.catalog().allocate_user_id(id_ts).await?
        } else {
            self.allocate_transient_id()
        };

        // Config overrides layer: system defaults < cluster config < explain context.
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
            .override_from(&explain_ctx);

        let mut optimizer = optimize::index::Optimizer::new(
            self.owned_catalog(),
            compute_instance,
            global_id,
            optimizer_config,
            self.optimizer_metrics(),
        );
        // Optimization can be expensive, so run it on a blocking task rather
        // than stalling the coordinator loop; the span keeps tracing attached.
        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize create index",
            move || {
                span.in_scope(|| {
                    let mut pipeline = || -> Result<(
                        optimize::index::GlobalMirPlan,
                        optimize::index::GlobalLirPlan,
                    ), AdapterError> {
                        // Routes optimizer tracing into the explain trace when
                        // running under an explain context.
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let index_plan = optimize::index::Index::new(
                            plan.name.clone(),
                            plan.index.on,
                            plan.index.keys.clone(),
                        );

                        // Two optimizer passes: raw plan → global MIR → global LIR.
                        let global_mir_plan = optimizer.catch_unwind_optimize(index_plan)?;
                        let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;

                        Ok((global_mir_plan, global_lir_plan))
                    };

                    let stage = match pipeline() {
                        Ok((global_mir_plan, global_lir_plan)) => {
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                let (_, df_meta) = global_lir_plan.unapply();
                                CreateIndexStage::Explain(CreateIndexExplain {
                                    validity,
                                    exported_index_id: global_id,
                                    plan,
                                    df_meta,
                                    explain_ctx,
                                })
                            } else {
                                CreateIndexStage::Finish(CreateIndexFinish {
                                    validity,
                                    item_id,
                                    global_id,
                                    plan,
                                    resolved_ids,
                                    global_mir_plan,
                                    global_lir_plan,
                                })
                            }
                        }
                        Err(err) => {
                            // Outside of an explain, optimizer errors simply fail
                            // the statement.
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                // EXPLAIN BROKEN tolerates optimizer errors:
                                // log and continue with empty dataflow metadata
                                // so the trace collected so far can be shown.
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                CreateIndexStage::Explain(CreateIndexExplain {
                                    validity,
                                    exported_index_id: global_id,
                                    plan,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                })
                            } else {
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }

    /// Final stage of `CREATE INDEX`: commits the new index to the catalog
    /// and, as a side effect of the transaction, ships the dataflow to the
    /// target cluster. An `IF NOT EXISTS` collision degrades to a notice.
    #[instrument]
    async fn create_index_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        stage: CreateIndexFinish,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let CreateIndexFinish {
            item_id,
            global_id,
            plan:
                plan::CreateIndexPlan {
                    name,
                    index:
                        plan::Index {
                            create_sql,
                            on,
                            keys,
                            cluster_id,
                            compaction_window,
                        },
                    if_not_exists,
                },
            resolved_ids,
            global_mir_plan,
            global_lir_plan,
            ..
        } = stage;
        // All collections the dataflow imports; used below for read holds.
        let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Index(Index {
                create_sql,
                global_id,
                keys: keys.into(),
                on,
                conn_id: None,
                resolved_ids,
                cluster_id,
                is_retained_metrics_object: false,
                custom_logical_compaction_window: compaction_window,
            }),
            // The index is owned by the owner of the relation it is built on.
            owner_id: *self.catalog().get_entry_by_global_id(&on).owner_id(),
        }];

        // Pre-allocate one transient id per optimizer notice produced by the
        // LIR plan (ids must be drawn on the coordinator, outside the closure).
        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
            .map(|(_item_id, global_id)| global_id)
            .take(global_lir_plan.df_meta().optimizer_notices.len())
            .collect::<Vec<_>>();

        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, ctx| {
                Box::pin(async move {
                    let (mut df_desc, df_meta) = global_lir_plan.unapply();

                    // Persist both plan stages so `EXPLAIN ... FOR INDEX` can
                    // render them later without re-optimizing.
                    coord
                        .catalog_mut()
                        .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
                    coord
                        .catalog_mut()
                        .set_physical_plan(global_id, df_desc.clone());

                    let notice_builtin_updates_fut = coord
                        .process_dataflow_metainfo(df_meta, global_id, ctx, notice_ids)
                        .await;

                    // Hold back compaction on all imported collections while we
                    // pick the as_of, so the chosen frontier stays readable.
                    let read_holds = coord.acquire_read_holds(&id_bundle);
                    let since = read_holds.least_valid_read();
                    df_desc.set_as_of(since);

                    coord
                        .ship_dataflow_and_notice_builtin_table_updates(
                            df_desc,
                            cluster_id,
                            notice_builtin_updates_fut,
                        )
                        .await;
                    // Holds are only needed until the dataflow is shipped.
                    drop(read_holds);

                    coord.update_compute_read_policy(
                        cluster_id,
                        item_id,
                        compaction_window.unwrap_or_default().into(),
                    );
                })
            })
            .await;

        match transact_result {
            Ok(_) => Ok(StageResult::Response(ExecuteResponse::CreatedIndex)),
            // `CREATE INDEX IF NOT EXISTS` turns a name collision into a
            // notice plus a success response instead of an error.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "index",
                    });
                Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
            }
            Err(err) => Err(err),
        }
    }

    /// Final stage of `EXPLAIN ... CREATE INDEX`: renders the collected
    /// optimizer trace into result rows, humanizing the transient index id
    /// with its would-be name and the indexed relation's column names.
    #[instrument]
    async fn create_index_explain(
        &self,
        session: &Session,
        CreateIndexExplain {
            exported_index_id,
            plan: plan::CreateIndexPlan { name, index, .. },
            df_meta,
            explain_ctx:
                ExplainPlanContext {
                    config,
                    format,
                    stage,
                    optimizer_trace,
                    ..
                },
            ..
        }: CreateIndexExplain,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let session_catalog = self.catalog().for_session(session);
        // Map the transient index id to a human-readable name/columns so the
        // rendered plan does not show an opaque id.
        let expr_humanizer = {
            let on_entry = self.catalog.get_entry_by_global_id(&index.on);
            let full_name = self.catalog.resolve_full_name(&name, on_entry.conn_id());
            let on_desc = on_entry
                .relation_desc()
                .expect("can only create indexes on items with a valid description");

            let transient_items = btreemap! {
                exported_index_id => TransientItem::new(
                    Some(full_name.into_parts()),
                    Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
                )
            };
            ExprHumanizerExt::new(transient_items, &session_catalog)
        };

        let target_cluster = self.catalog().get_cluster(index.cluster_id);

        // Feature overrides layer: system defaults < cluster config < explain config.
        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features())
            .override_from(&config.features);

        let rows = optimizer_trace
            .into_rows(
                format,
                &config,
                &features,
                &expr_humanizer,
                None,
                Some(target_cluster),
                df_meta,
                stage,
                plan::ExplaineeStatementKind::CreateIndex,
                None,
            )
            .await?;

        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
    }
}