1use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, Index};
15use mz_ore::instrument;
16use mz_repr::explain::{ExprHumanizerExt, TransientItem};
17use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
18use mz_repr::{Datum, Row};
19use mz_sql::ast::ExplainStage;
20use mz_sql::catalog::CatalogError;
21use mz_sql::names::ResolvedIds;
22use mz_sql::plan;
23use tracing::Span;
24
25use crate::command::ExecuteResponse;
26use crate::coord::sequencer::inner::return_if_err;
27use crate::coord::{
28 Coordinator, CreateIndexExplain, CreateIndexFinish, CreateIndexOptimize, CreateIndexStage,
29 ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
30};
31use crate::error::AdapterError;
32use crate::explain::explain_dataflow;
33use crate::explain::optimizer_trace::OptimizerTrace;
34use crate::optimize::dataflows::dataflow_import_id_bundle;
35use crate::optimize::{self, Optimize};
36use crate::session::Session;
37use crate::{AdapterNotice, ExecuteContext, catalog};
38
39impl Staged for CreateIndexStage {
40 type Ctx = ExecuteContext;
41
42 fn validity(&mut self) -> &mut PlanValidity {
43 match self {
44 Self::Optimize(stage) => &mut stage.validity,
45 Self::Finish(stage) => &mut stage.validity,
46 Self::Explain(stage) => &mut stage.validity,
47 }
48 }
49
50 async fn stage(
51 self,
52 coord: &mut Coordinator,
53 ctx: &mut ExecuteContext,
54 ) -> Result<StageResult<Box<Self>>, AdapterError> {
55 match self {
56 CreateIndexStage::Optimize(stage) => coord.create_index_optimize(stage).await,
57 CreateIndexStage::Finish(stage) => coord.create_index_finish(ctx, stage).await,
58 CreateIndexStage::Explain(stage) => {
59 coord.create_index_explain(ctx.session(), stage).await
60 }
61 }
62 }
63
64 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
65 Message::CreateIndexStageReady {
66 ctx,
67 span,
68 stage: self,
69 }
70 }
71
72 fn cancel_enabled(&self) -> bool {
73 true
74 }
75}
76
77impl Coordinator {
    /// Entry point for executing a `CREATE INDEX` statement: builds the
    /// initial pipeline stage via [`Self::create_index_validate`] (with no
    /// EXPLAIN context) and hands it to the staged sequencer.
    #[instrument]
    pub(crate) async fn sequence_create_index(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateIndexPlan,
        resolved_ids: ResolvedIds,
    ) {
        // On validation error, `return_if_err!` consumes `ctx` (presumably
        // retiring the client with the error — defined in sequencer::inner)
        // and returns early.
        let stage = return_if_err!(
            self.create_index_validate(plan, resolved_ids, ExplainContext::None),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }
91
    /// Runs `EXPLAIN <stage> FOR CREATE INDEX ...`: drives the same staged
    /// pipeline as a real `CREATE INDEX`, but with an
    /// [`ExplainContext::Plan`] so the final stage renders the optimizer
    /// trace instead of installing the index.
    #[instrument]
    pub(crate) async fn explain_create_index(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) {
        // The dispatcher is expected to route only `Statement` explainees
        // with a `CreateIndex` payload here; anything else is a caller bug.
        let plan::Explainee::Statement(stmt) = explainee else {
            unreachable!()
        };
        let plan::ExplaineeStatement::CreateIndex { broken, plan } = stmt else {
            unreachable!()
        };

        // Collect optimizer traces only for the plan paths this EXPLAIN
        // stage actually needs.
        let optimizer_trace = OptimizerTrace::new(stage.paths());

        let resolved_ids = ResolvedIds::empty();

        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
            broken,
            config,
            format,
            stage,
            replan: None,
            desc: None,
            optimizer_trace,
        });
        let stage = return_if_err!(
            self.create_index_validate(plan, resolved_ids, explain_ctx),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }
136
    /// Runs `EXPLAIN REPLAN` for an existing index: re-plans the index's
    /// stored `CREATE INDEX` SQL from scratch and sends the result through
    /// the explain pipeline, as if the statement were being created today.
    #[instrument]
    pub(crate) async fn explain_replan_index(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) {
        // The dispatcher is expected to route only `ReplanIndex` explainees
        // referencing catalog items that really are indexes.
        let plan::Explainee::ReplanIndex(id) = explainee else {
            unreachable!() };
        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
            unreachable!() };
        // NOTE: shadows the catalog item id with the index's *global* id,
        // which is what the explain context's `replan` field wants below.
        let id = index.global_id();

        // Re-derive a fresh plan from the stored SQL (with item-parsing
        // features enabled) rather than reusing any cached plan.
        let create_sql = index.create_sql.clone();
        let plan_result = self
            .catalog_mut()
            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);

        let plan::Plan::CreateIndex(plan) = plan else {
            unreachable!() };

        // REPLAN is never a "broken" EXPLAIN.
        let broken = false;

        let optimizer_trace = OptimizerTrace::new(stage.paths());

        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
            broken,
            config,
            format,
            stage,
            replan: Some(id),
            desc: None,
            optimizer_trace,
        });
        let stage = return_if_err!(
            self.create_index_validate(plan, resolved_ids, explain_ctx),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }
189
    /// Runs `EXPLAIN ... FOR INDEX <name>` against plans already cached in
    /// the catalog (no re-optimization). Only the `GlobalPlan` and
    /// `PhysicalPlan` stages are supported; anything else errors out.
    #[instrument]
    pub(crate) fn explain_index(
        &self,
        ctx: &ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // The dispatcher is expected to route only `Index` explainees
        // referencing catalog items that really are indexes.
        let plan::Explainee::Index(id) = explainee else {
            unreachable!() };
        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
            unreachable!() };

        // Missing metainfo is always a user-visible error, but it is only
        // *logged* for non-system indexes — presumably system indexes may
        // legitimately lack it (TODO confirm).
        let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&index.global_id())
        else {
            if !id.is_system() {
                tracing::error!("cannot find dataflow metainformation for index {id} in catalog");
            }
            coord_bail!("cannot find dataflow metainformation for index {id} in catalog");
        };

        let target_cluster = self.catalog().get_cluster(index.cluster_id);

        // Feature flags: system defaults, overridden by the target cluster's
        // config, overridden by the EXPLAIN statement's own config.
        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features())
            .override_from(&config.features);

        // No cardinality statistics are supplied for catalog-based EXPLAIN.
        let cardinality_stats = BTreeMap::new();

        let explain = match stage {
            ExplainStage::GlobalPlan => {
                // Render the cached optimized (MIR) dataflow plan.
                let Some(plan) = self
                    .catalog()
                    .try_get_optimized_plan(&index.global_id())
                    .cloned()
                else {
                    tracing::error!("cannot find {stage} for index {id} in catalog");
                    coord_bail!("cannot find {stage} for index in catalog");
                };

                explain_dataflow(
                    plan,
                    format,
                    &config,
                    &features,
                    &self.catalog().for_session(ctx.session()),
                    cardinality_stats,
                    Some(target_cluster.name.as_str()),
                    dataflow_metainfo,
                )?
            }
            ExplainStage::PhysicalPlan => {
                // Render the cached physical (LIR) dataflow plan.
                let Some(plan) = self
                    .catalog()
                    .try_get_physical_plan(&index.global_id())
                    .cloned()
                else {
                    tracing::error!("cannot find {stage} for index {id} in catalog");
                    coord_bail!("cannot find {stage} for index in catalog");
                };
                explain_dataflow(
                    plan,
                    format,
                    &config,
                    &features,
                    &self.catalog().for_session(ctx.session()),
                    cardinality_stats,
                    Some(target_cluster.name.as_str()),
                    dataflow_metainfo,
                )?
            }
            _ => {
                coord_bail!("cannot EXPLAIN {} FOR INDEX", stage);
            }
        };

        // The rendered explanation is returned as a single one-column row.
        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);

        Ok(Self::send_immediate_rows(row))
    }
276
277 #[instrument]
280 fn create_index_validate(
281 &self,
282 plan: plan::CreateIndexPlan,
283 resolved_ids: ResolvedIds,
284 explain_ctx: ExplainContext,
285 ) -> Result<CreateIndexStage, AdapterError> {
286 let validity =
287 PlanValidity::require_transient_revision(self.catalog().transient_revision());
288 Ok(CreateIndexStage::Optimize(CreateIndexOptimize {
289 validity,
290 plan,
291 resolved_ids,
292 explain_ctx,
293 }))
294 }
295
    /// Optimize stage of `CREATE INDEX`: runs the optimizer off the main
    /// coordinator task and yields either a `Finish` stage (normal
    /// execution) or an `Explain` stage (when running under `EXPLAIN`).
    #[instrument]
    async fn create_index_optimize(
        &mut self,
        CreateIndexOptimize {
            validity,
            plan,
            resolved_ids,
            explain_ctx,
        }: CreateIndexOptimize,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let plan::CreateIndexPlan {
            index: plan::Index { cluster_id, .. },
            ..
        } = &plan;

        let compute_instance = self
            .instance_snapshot(*cluster_id)
            .expect("compute instance does not exist");
        // Real executions allocate a durable user id; EXPLAIN runs only need
        // a transient id since nothing is persisted to the catalog.
        let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
            let id_ts = self.get_catalog_write_ts().await;
            self.catalog().allocate_user_id(id_ts).await?
        } else {
            self.allocate_transient_id()
        };

        // Feature flags: system defaults, overridden by the target cluster's
        // config, overridden by the EXPLAIN context (if any).
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
            .override_from(&explain_ctx);
        let optimizer_features = optimizer_config.features.clone();

        let mut optimizer = optimize::index::Optimizer::new(
            self.owned_catalog(),
            compute_instance,
            global_id,
            optimizer_config,
            self.optimizer_metrics(),
        );
        let span = Span::current();
        // Optimization can be expensive, so run it on a blocking task rather
        // than on the coordinator's main loop; the caller polls the handle.
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize create index",
            move || {
                span.in_scope(|| {
                    // The whole optimization pipeline as a fallible closure so
                    // the error-handling below can treat it as one unit.
                    let mut pipeline = || -> Result<(
                        optimize::index::GlobalMirPlan,
                        optimize::index::GlobalLirPlan,
                    ), AdapterError> {
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let index_plan = optimize::index::Index::new(
                            plan.name.clone(),
                            plan.index.on,
                            plan.index.keys.clone(),
                        );

                        // Two optimizer passes: index plan -> global MIR plan
                        // -> global LIR plan (panics caught and surfaced as
                        // errors by `catch_unwind_optimize`).
                        let global_mir_plan = optimizer.catch_unwind_optimize(index_plan)?;
                        let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;

                        Ok((global_mir_plan, global_lir_plan))
                    };

                    let stage = match pipeline() {
                        Ok((global_mir_plan, global_lir_plan)) => {
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                // EXPLAIN only needs the dataflow metadata for
                                // rendering; the plans themselves are dropped.
                                let (_, df_meta) = global_lir_plan.unapply();
                                CreateIndexStage::Explain(CreateIndexExplain {
                                    validity,
                                    exported_index_id: global_id,
                                    plan,
                                    df_meta,
                                    explain_ctx,
                                })
                            } else {
                                CreateIndexStage::Finish(CreateIndexFinish {
                                    validity,
                                    item_id,
                                    global_id,
                                    plan,
                                    resolved_ids,
                                    global_mir_plan,
                                    global_lir_plan,
                                    optimizer_features,
                                })
                            }
                        }
                        Err(err) => {
                            // Outside an EXPLAIN, optimizer errors propagate
                            // directly to the client.
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                // A "broken" EXPLAIN still produces output:
                                // log the error and continue with empty
                                // dataflow metadata.
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                CreateIndexStage::Explain(CreateIndexExplain {
                                    validity,
                                    exported_index_id: global_id,
                                    plan,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                })
                            } else {
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }
415
    /// Finish stage of `CREATE INDEX`: records the index in the catalog and,
    /// as a transactional side effect, ships its dataflow to the target
    /// cluster. Handles `IF NOT EXISTS` by downgrading the "already exists"
    /// catalog error to a session notice.
    #[instrument]
    async fn create_index_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        stage: CreateIndexFinish,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let CreateIndexFinish {
            item_id,
            global_id,
            plan:
                plan::CreateIndexPlan {
                    name,
                    index:
                        plan::Index {
                            create_sql,
                            on,
                            keys,
                            cluster_id,
                            compaction_window,
                        },
                    if_not_exists,
                },
            resolved_ids,
            global_mir_plan,
            global_lir_plan,
            optimizer_features,
            ..
        } = stage;
        // Ids of all collections the new dataflow imports; used to acquire
        // read holds below.
        let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);

        // The new index inherits its owner from the item it is built on.
        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Index(Index {
                create_sql,
                global_id,
                keys: keys.into(),
                on,
                conn_id: None,
                resolved_ids,
                cluster_id,
                is_retained_metrics_object: false,
                custom_logical_compaction_window: compaction_window,
            }),
            owner_id: *self.catalog().get_entry_by_global_id(&on).owner_id(),
        }];

        // Pre-allocate one transient global id per optimizer notice emitted
        // during planning.
        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
            .map(|(_item_id, global_id)| global_id)
            .take(global_lir_plan.df_meta().optimizer_notices.len())
            .collect::<Vec<_>>();

        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, ctx| {
                Box::pin(async move {
                    let (mut df_desc, df_meta) = global_lir_plan.unapply();

                    // Cache both plan representations in the in-memory
                    // catalog so `EXPLAIN ... FOR INDEX` can serve them.
                    coord
                        .catalog_mut()
                        .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
                    coord
                        .catalog_mut()
                        .set_physical_plan(global_id, df_desc.clone());

                    let notice_builtin_updates_fut = coord
                        .process_dataflow_metainfo(df_meta, global_id, ctx, notice_ids)
                        .await;

                    coord
                        .catalog()
                        .cache_expressions(global_id, None, optimizer_features);

                    // Pin the inputs' frontiers and use the earliest readable
                    // time as the dataflow's as-of.
                    let read_holds = coord.acquire_read_holds(&id_bundle);
                    let since = read_holds.least_valid_read();
                    df_desc.set_as_of(since);

                    coord
                        .ship_dataflow_and_notice_builtin_table_updates(
                            df_desc,
                            cluster_id,
                            notice_builtin_updates_fut,
                            None,
                        )
                        .await;
                    // Release the holds only after the dataflow has shipped —
                    // NOTE(review): assumes shipping installs the dataflow's
                    // own frontier management; confirm.
                    drop(read_holds);

                    coord.update_compute_read_policy(
                        cluster_id,
                        item_id,
                        compaction_window.unwrap_or_default().into(),
                    );
                })
            })
            .await;

        match transact_result {
            Ok(_) => Ok(StageResult::Response(ExecuteResponse::CreatedIndex)),
            // `IF NOT EXISTS`: an already-existing item is not an error, but
            // the session still gets a notice.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "index",
                    });
                Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
            }
            Err(err) => Err(err),
        }
    }
539
    /// Explain stage of `CREATE INDEX` (under `EXPLAIN`): renders the
    /// collected optimizer trace into result rows, humanizing the transient
    /// index id with its resolved name and the indexed item's column names.
    #[instrument]
    async fn create_index_explain(
        &self,
        session: &Session,
        CreateIndexExplain {
            exported_index_id,
            plan: plan::CreateIndexPlan { name, index, .. },
            df_meta,
            explain_ctx:
                ExplainPlanContext {
                    config,
                    format,
                    stage,
                    optimizer_trace,
                    ..
                },
            ..
        }: CreateIndexExplain,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let session_catalog = self.catalog().for_session(session);
        // Extend the session catalog's humanizer with the not-yet-created
        // index, so the rendered plans show its name and column names
        // instead of a bare transient id.
        let expr_humanizer = {
            let on_entry = self.catalog.get_entry_by_global_id(&index.on);
            let full_name = self.catalog.resolve_full_name(&name, on_entry.conn_id());
            let on_desc = on_entry
                .relation_desc()
                .expect("can only create indexes on items with a valid description");

            let transient_items = btreemap! {
                exported_index_id => TransientItem::new(
                    Some(full_name.into_parts()),
                    Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
                )
            };
            ExprHumanizerExt::new(transient_items, &session_catalog)
        };

        let target_cluster = self.catalog().get_cluster(index.cluster_id);

        // Feature flags: system defaults, overridden by the target cluster's
        // config, overridden by the EXPLAIN statement's own config.
        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features())
            .override_from(&config.features);

        let rows = optimizer_trace
            .into_rows(
                format,
                &config,
                &features,
                &expr_humanizer,
                None,
                Some(target_cluster),
                df_meta,
                stage,
                plan::ExplaineeStatementKind::CreateIndex,
                None,
            )
            .await?;

        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
    }
599}