1use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::objects::{CatalogItem, Index};
14use mz_ore::instrument;
15use mz_repr::explain::{ExprHumanizerExt, TransientItem};
16use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
17use mz_repr::{Datum, Row};
18use mz_sql::ast::ExplainStage;
19use mz_sql::catalog::CatalogError;
20use mz_sql::names::ResolvedIds;
21use mz_sql::plan;
22use tracing::Span;
23
24use crate::command::ExecuteResponse;
25use crate::coord::sequencer::inner::return_if_err;
26use crate::coord::{
27 Coordinator, CreateIndexExplain, CreateIndexFinish, CreateIndexOptimize, CreateIndexStage,
28 ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
29};
30use crate::error::AdapterError;
31use crate::explain::explain_dataflow;
32use crate::explain::optimizer_trace::OptimizerTrace;
33use crate::optimize::dataflows::dataflow_import_id_bundle;
34use crate::optimize::{self, Optimize};
35use crate::session::Session;
36use crate::{AdapterNotice, ExecuteContext, catalog};
37
38impl Staged for CreateIndexStage {
39 type Ctx = ExecuteContext;
40
41 fn validity(&mut self) -> &mut PlanValidity {
42 match self {
43 Self::Optimize(stage) => &mut stage.validity,
44 Self::Finish(stage) => &mut stage.validity,
45 Self::Explain(stage) => &mut stage.validity,
46 }
47 }
48
49 async fn stage(
50 self,
51 coord: &mut Coordinator,
52 ctx: &mut ExecuteContext,
53 ) -> Result<StageResult<Box<Self>>, AdapterError> {
54 match self {
55 CreateIndexStage::Optimize(stage) => coord.create_index_optimize(stage).await,
56 CreateIndexStage::Finish(stage) => coord.create_index_finish(ctx, stage).await,
57 CreateIndexStage::Explain(stage) => {
58 coord.create_index_explain(ctx.session(), stage).await
59 }
60 }
61 }
62
63 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
64 Message::CreateIndexStageReady {
65 ctx,
66 span,
67 stage: self,
68 }
69 }
70
71 fn cancel_enabled(&self) -> bool {
72 true
73 }
74}
75
76impl Coordinator {
77 #[instrument]
78 pub(crate) async fn sequence_create_index(
79 &mut self,
80 ctx: ExecuteContext,
81 plan: plan::CreateIndexPlan,
82 resolved_ids: ResolvedIds,
83 ) {
84 let stage = return_if_err!(
85 self.create_index_validate(plan, resolved_ids, ExplainContext::None),
86 ctx
87 );
88 self.sequence_staged(ctx, Span::current(), stage).await;
89 }
90
91 #[instrument]
92 pub(crate) async fn explain_create_index(
93 &mut self,
94 ctx: ExecuteContext,
95 plan::ExplainPlanPlan {
96 stage,
97 format,
98 config,
99 explainee,
100 }: plan::ExplainPlanPlan,
101 ) {
102 let plan::Explainee::Statement(stmt) = explainee else {
103 unreachable!()
106 };
107 let plan::ExplaineeStatement::CreateIndex { broken, plan } = stmt else {
108 unreachable!()
111 };
112
113 let optimizer_trace = OptimizerTrace::new(stage.paths());
116
117 let resolved_ids = ResolvedIds::empty();
119
120 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
121 broken,
122 config,
123 format,
124 stage,
125 replan: None,
126 desc: None,
127 optimizer_trace,
128 });
129 let stage = return_if_err!(
130 self.create_index_validate(plan, resolved_ids, explain_ctx),
131 ctx
132 );
133 self.sequence_staged(ctx, Span::current(), stage).await;
134 }
135
136 #[instrument]
137 pub(crate) async fn explain_replan_index(
138 &mut self,
139 ctx: ExecuteContext,
140 plan::ExplainPlanPlan {
141 stage,
142 format,
143 config,
144 explainee,
145 }: plan::ExplainPlanPlan,
146 ) {
147 let plan::Explainee::ReplanIndex(id) = explainee else {
148 unreachable!() };
150 let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
151 unreachable!() };
153 let id = index.global_id();
154
155 let create_sql = index.create_sql.clone();
156 let plan_result = self
157 .catalog_mut()
158 .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
159 let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
160
161 let plan::Plan::CreateIndex(plan) = plan else {
162 unreachable!() };
164
165 let broken = false;
168
169 let optimizer_trace = OptimizerTrace::new(stage.paths());
172
173 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
174 broken,
175 config,
176 format,
177 stage,
178 replan: Some(id),
179 desc: None,
180 optimizer_trace,
181 });
182 let stage = return_if_err!(
183 self.create_index_validate(plan, resolved_ids, explain_ctx),
184 ctx
185 );
186 self.sequence_staged(ctx, Span::current(), stage).await;
187 }
188
189 #[instrument]
190 pub(crate) fn explain_index(
191 &mut self,
192 ctx: &ExecuteContext,
193 plan::ExplainPlanPlan {
194 stage,
195 format,
196 config,
197 explainee,
198 }: plan::ExplainPlanPlan,
199 ) -> Result<ExecuteResponse, AdapterError> {
200 let plan::Explainee::Index(id) = explainee else {
201 unreachable!() };
203 let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
204 unreachable!() };
206
207 let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&index.global_id())
208 else {
209 if !id.is_system() {
210 tracing::error!("cannot find dataflow metainformation for index {id} in catalog");
211 }
212 coord_bail!("cannot find dataflow metainformation for index {id} in catalog");
213 };
214
215 let target_cluster = self.catalog().get_cluster(index.cluster_id);
216
217 let features = OptimizerFeatures::from(self.catalog().system_config())
218 .override_from(&target_cluster.config.features())
219 .override_from(&config.features);
220
221 let cardinality_stats = BTreeMap::new();
223
224 let explain = match stage {
225 ExplainStage::GlobalPlan => {
226 let Some(plan) = self
227 .catalog()
228 .try_get_optimized_plan(&index.global_id())
229 .cloned()
230 else {
231 tracing::error!("cannot find {stage} for index {id} in catalog");
232 coord_bail!("cannot find {stage} for index in catalog");
233 };
234
235 explain_dataflow(
236 plan,
237 format,
238 &config,
239 &features,
240 &self.catalog().for_session(ctx.session()),
241 cardinality_stats,
242 Some(target_cluster.name.as_str()),
243 dataflow_metainfo,
244 )?
245 }
246 ExplainStage::PhysicalPlan => {
247 let Some(plan) = self
248 .catalog()
249 .try_get_physical_plan(&index.global_id())
250 .cloned()
251 else {
252 tracing::error!("cannot find {stage} for index {id} in catalog");
253 coord_bail!("cannot find {stage} for index in catalog");
254 };
255 explain_dataflow(
256 plan,
257 format,
258 &config,
259 &features,
260 &self.catalog().for_session(ctx.session()),
261 cardinality_stats,
262 Some(target_cluster.name.as_str()),
263 dataflow_metainfo,
264 )?
265 }
266 _ => {
267 coord_bail!("cannot EXPLAIN {} FOR INDEX", stage);
268 }
269 };
270
271 let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
272
273 Ok(Self::send_immediate_rows(row))
274 }
275
276 #[instrument]
279 fn create_index_validate(
280 &mut self,
281 plan: plan::CreateIndexPlan,
282 resolved_ids: ResolvedIds,
283 explain_ctx: ExplainContext,
284 ) -> Result<CreateIndexStage, AdapterError> {
285 let validity =
286 PlanValidity::require_transient_revision(self.catalog().transient_revision());
287 Ok(CreateIndexStage::Optimize(CreateIndexOptimize {
288 validity,
289 plan,
290 resolved_ids,
291 explain_ctx,
292 }))
293 }
294
295 #[instrument]
296 async fn create_index_optimize(
297 &mut self,
298 CreateIndexOptimize {
299 validity,
300 plan,
301 resolved_ids,
302 explain_ctx,
303 }: CreateIndexOptimize,
304 ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
305 let plan::CreateIndexPlan {
306 index: plan::Index { cluster_id, .. },
307 ..
308 } = &plan;
309
310 let compute_instance = self
312 .instance_snapshot(*cluster_id)
313 .expect("compute instance does not exist");
314 let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
315 let id_ts = self.get_catalog_write_ts().await;
316 self.catalog_mut().allocate_user_id(id_ts).await?
317 } else {
318 self.allocate_transient_id()
319 };
320
321 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
322 .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
323 .override_from(&explain_ctx);
324
325 let mut optimizer = optimize::index::Optimizer::new(
327 self.owned_catalog(),
328 compute_instance,
329 global_id,
330 optimizer_config,
331 self.optimizer_metrics(),
332 );
333 let span = Span::current();
334 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
335 || "optimize create index",
336 move || {
337 span.in_scope(|| {
338 let mut pipeline = || -> Result<(
339 optimize::index::GlobalMirPlan,
340 optimize::index::GlobalLirPlan,
341 ), AdapterError> {
342 let _dispatch_guard = explain_ctx.dispatch_guard();
343
344 let index_plan =
345 optimize::index::Index::new(plan.name.clone(), plan.index.on, plan.index.keys.clone());
346
347 let global_mir_plan = optimizer.catch_unwind_optimize(index_plan)?;
349 let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
351
352 Ok((global_mir_plan, global_lir_plan))
353 };
354
355 let stage = match pipeline() {
356 Ok((global_mir_plan, global_lir_plan)) => {
357 if let ExplainContext::Plan(explain_ctx) = explain_ctx {
358 let (_, df_meta) = global_lir_plan.unapply();
359 CreateIndexStage::Explain(CreateIndexExplain {
360 validity,
361 exported_index_id: global_id,
362 plan,
363 df_meta,
364 explain_ctx,
365 })
366 } else {
367 CreateIndexStage::Finish(CreateIndexFinish {
368 validity,
369 item_id,
370 global_id,
371 plan,
372 resolved_ids,
373 global_mir_plan,
374 global_lir_plan,
375 })
376 }
377 }
378 Err(err) => {
381 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
382 return Err(err);
384 };
385
386 if explain_ctx.broken {
387 tracing::error!("error while handling EXPLAIN statement: {}", err);
391 CreateIndexStage::Explain(CreateIndexExplain {
392 validity,
393 exported_index_id: global_id,
394 plan,
395 df_meta: Default::default(),
396 explain_ctx,
397 })
398 } else {
399 return Err(err);
401 }
402 }
403 };
404 Ok(Box::new(stage))
405 })
406 },
407 )))
408 }
409
410 #[instrument]
411 async fn create_index_finish(
412 &mut self,
413 ctx: &mut ExecuteContext,
414 stage: CreateIndexFinish,
415 ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
416 let CreateIndexFinish {
417 item_id,
418 global_id,
419 plan:
420 plan::CreateIndexPlan {
421 name,
422 index:
423 plan::Index {
424 create_sql,
425 on,
426 keys,
427 cluster_id,
428 compaction_window,
429 },
430 if_not_exists,
431 },
432 resolved_ids,
433 global_mir_plan,
434 global_lir_plan,
435 ..
436 } = stage;
437 let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
438
439 let ops = vec![catalog::Op::CreateItem {
440 id: item_id,
441 name: name.clone(),
442 item: CatalogItem::Index(Index {
443 create_sql,
444 global_id,
445 keys: keys.into(),
446 on,
447 conn_id: None,
448 resolved_ids,
449 cluster_id,
450 is_retained_metrics_object: false,
451 custom_logical_compaction_window: compaction_window,
452 }),
453 owner_id: *self.catalog().get_entry_by_global_id(&on).owner_id(),
454 }];
455
456 let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
458 .map(|(_item_id, global_id)| global_id)
459 .take(global_lir_plan.df_meta().optimizer_notices.len())
460 .collect::<Vec<_>>();
461
462 let transact_result = self
463 .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, ctx| {
464 Box::pin(async move {
465 let (mut df_desc, df_meta) = global_lir_plan.unapply();
466
467 coord
469 .catalog_mut()
470 .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
471 coord
472 .catalog_mut()
473 .set_physical_plan(global_id, df_desc.clone());
474
475 let notice_builtin_updates_fut = coord
476 .process_dataflow_metainfo(df_meta, global_id, ctx, notice_ids)
477 .await;
478
479 let read_holds = coord.acquire_read_holds(&id_bundle);
487 let since = read_holds.least_valid_read();
488 df_desc.set_as_of(since);
489
490 coord
491 .ship_dataflow_and_notice_builtin_table_updates(
492 df_desc,
493 cluster_id,
494 notice_builtin_updates_fut,
495 )
496 .await;
497
498 drop(read_holds);
501
502 coord.update_compute_read_policy(
503 cluster_id,
504 item_id,
505 compaction_window.unwrap_or_default().into(),
506 );
507 })
508 })
509 .await;
510
511 match transact_result {
512 Ok(_) => Ok(StageResult::Response(ExecuteResponse::CreatedIndex)),
513 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
514 kind:
515 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
516 })) if if_not_exists => {
517 ctx.session()
518 .add_notice(AdapterNotice::ObjectAlreadyExists {
519 name: name.item,
520 ty: "index",
521 });
522 Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
523 }
524 Err(err) => Err(err),
525 }
526 }
527
528 #[instrument]
529 async fn create_index_explain(
530 &mut self,
531 session: &Session,
532 CreateIndexExplain {
533 exported_index_id,
534 plan: plan::CreateIndexPlan { name, index, .. },
535 df_meta,
536 explain_ctx:
537 ExplainPlanContext {
538 config,
539 format,
540 stage,
541 optimizer_trace,
542 ..
543 },
544 ..
545 }: CreateIndexExplain,
546 ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
547 let session_catalog = self.catalog().for_session(session);
548 let expr_humanizer = {
549 let on_entry = self.catalog.get_entry_by_global_id(&index.on);
550 let full_name = self.catalog.resolve_full_name(&name, on_entry.conn_id());
551 let on_desc = on_entry
552 .desc(&full_name)
553 .expect("can only create indexes on items with a valid description");
554
555 let transient_items = btreemap! {
556 exported_index_id => TransientItem::new(
557 Some(full_name.into_parts()),
558 Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
559 )
560 };
561 ExprHumanizerExt::new(transient_items, &session_catalog)
562 };
563
564 let target_cluster = self.catalog().get_cluster(index.cluster_id);
565
566 let features = OptimizerFeatures::from(self.catalog().system_config())
567 .override_from(&target_cluster.config.features())
568 .override_from(&config.features);
569
570 let rows = optimizer_trace
571 .into_rows(
572 format,
573 &config,
574 &features,
575 &expr_humanizer,
576 None,
577 Some(target_cluster),
578 df_meta,
579 stage,
580 plan::ExplaineeStatementKind::CreateIndex,
581 None,
582 )
583 .await?;
584
585 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
586 }
587}