Skip to main content

mz_adapter/coord/sequencer/inner/
create_index.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, Index};
15use mz_ore::instrument;
16use mz_repr::explain::{ExprHumanizerExt, TransientItem};
17use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
18use mz_repr::{Datum, Row};
19use mz_sql::ast::ExplainStage;
20use mz_sql::catalog::CatalogError;
21use mz_sql::names::ResolvedIds;
22use mz_sql::plan;
23use mz_sql::session::metadata::SessionMetadata;
24use tracing::Span;
25
26use crate::catalog::CatalogState;
27use crate::command::ExecuteResponse;
28use crate::coord::sequencer::inner::return_if_err;
29use crate::coord::{
30    Coordinator, CreateIndexExplain, CreateIndexFinish, CreateIndexOptimize, CreateIndexStage,
31    ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
32};
33use crate::error::AdapterError;
34use crate::explain::explain_dataflow;
35use crate::explain::optimizer_trace::OptimizerTrace;
36use crate::optimize::dataflows::dataflow_import_id_bundle;
37use crate::optimize::{self, Optimize};
38use crate::session::Session;
39use crate::{AdapterNotice, ExecuteContext, catalog};
40
41impl Staged for CreateIndexStage {
42    type Ctx = ExecuteContext;
43
44    fn validity(&mut self) -> &mut PlanValidity {
45        match self {
46            Self::Optimize(stage) => &mut stage.validity,
47            Self::Finish(stage) => &mut stage.validity,
48            Self::Explain(stage) => &mut stage.validity,
49        }
50    }
51
52    async fn stage(
53        self,
54        coord: &mut Coordinator,
55        ctx: &mut ExecuteContext,
56    ) -> Result<StageResult<Box<Self>>, AdapterError> {
57        match self {
58            CreateIndexStage::Optimize(stage) => coord.create_index_optimize(stage).await,
59            CreateIndexStage::Finish(stage) => coord.create_index_finish(ctx, stage).await,
60            CreateIndexStage::Explain(stage) => {
61                coord.create_index_explain(ctx.session(), stage).await
62            }
63        }
64    }
65
66    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
67        Message::CreateIndexStageReady {
68            ctx,
69            span,
70            stage: self,
71        }
72    }
73
74    fn cancel_enabled(&self) -> bool {
75        true
76    }
77}
78
79impl Coordinator {
80    #[instrument]
81    pub(crate) async fn sequence_create_index(
82        &mut self,
83        ctx: ExecuteContext,
84        plan: plan::CreateIndexPlan,
85        resolved_ids: ResolvedIds,
86    ) {
87        let stage = return_if_err!(
88            self.create_index_validate(ctx.session(), plan, resolved_ids, ExplainContext::None),
89            ctx
90        );
91        self.sequence_staged(ctx, Span::current(), stage).await;
92    }
93
94    #[instrument]
95    pub(crate) async fn explain_create_index(
96        &mut self,
97        ctx: ExecuteContext,
98        plan::ExplainPlanPlan {
99            stage,
100            format,
101            config,
102            explainee,
103        }: plan::ExplainPlanPlan,
104    ) {
105        let plan::Explainee::Statement(stmt) = explainee else {
106            // This is currently asserted in the `sequence_explain_plan` code that
107            // calls this method.
108            unreachable!()
109        };
110        let plan::ExplaineeStatement::CreateIndex { broken, plan } = stmt else {
111            // This is currently asserted in the `sequence_explain_plan` code that
112            // calls this method.
113            unreachable!()
114        };
115
116        // Create an OptimizerTrace instance to collect plans emitted when
117        // executing the optimizer pipeline.
118        let optimizer_trace = OptimizerTrace::new(stage.paths());
119
120        // Not used in the EXPLAIN path so it's OK to generate a dummy value.
121        let resolved_ids = ResolvedIds::empty();
122
123        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
124            broken,
125            config,
126            format,
127            stage,
128            replan: None,
129            desc: None,
130            optimizer_trace,
131        });
132        let stage = return_if_err!(
133            self.create_index_validate(ctx.session(), plan, resolved_ids, explain_ctx),
134            ctx
135        );
136        self.sequence_staged(ctx, Span::current(), stage).await;
137    }
138
139    #[instrument]
140    pub(crate) async fn explain_replan_index(
141        &mut self,
142        ctx: ExecuteContext,
143        plan::ExplainPlanPlan {
144            stage,
145            format,
146            config,
147            explainee,
148        }: plan::ExplainPlanPlan,
149    ) {
150        let plan::Explainee::ReplanIndex(id) = explainee else {
151            unreachable!() // Asserted in `sequence_explain_plan`.
152        };
153        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
154            unreachable!() // Asserted in `plan_explain_plan`.
155        };
156        let id = index.global_id();
157
158        let create_sql = index.create_sql.clone();
159        let plan_result = self
160            .catalog_mut()
161            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
162        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
163
164        let plan::Plan::CreateIndex(plan) = plan else {
165            unreachable!() // We are parsing the `create_sql` of an `Index` item.
166        };
167
168        // It is safe to assume that query optimization will always succeed, so
169        // for now we statically assume `broken = false`.
170        let broken = false;
171
172        // Create an OptimizerTrace instance to collect plans emitted when
173        // executing the optimizer pipeline.
174        let optimizer_trace = OptimizerTrace::new(stage.paths());
175
176        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
177            broken,
178            config,
179            format,
180            stage,
181            replan: Some(id),
182            desc: None,
183            optimizer_trace,
184        });
185        let stage = return_if_err!(
186            self.create_index_validate(ctx.session(), plan, resolved_ids, explain_ctx),
187            ctx
188        );
189        self.sequence_staged(ctx, Span::current(), stage).await;
190    }
191
192    #[instrument]
193    pub(crate) fn explain_index(
194        &self,
195        ctx: &ExecuteContext,
196        plan::ExplainPlanPlan {
197            stage,
198            format,
199            config,
200            explainee,
201        }: plan::ExplainPlanPlan,
202    ) -> Result<ExecuteResponse, AdapterError> {
203        let plan::Explainee::Index(id) = explainee else {
204            unreachable!() // Asserted in `sequence_explain_plan`.
205        };
206        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
207            unreachable!() // Asserted in `plan_explain_plan`.
208        };
209
210        let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&index.global_id())
211        else {
212            if !id.is_system() {
213                tracing::error!("cannot find dataflow metainformation for index {id} in catalog");
214            }
215            coord_bail!("cannot find dataflow metainformation for index {id} in catalog");
216        };
217
218        let target_cluster = self.catalog().get_cluster(index.cluster_id);
219
220        let features = OptimizerFeatures::from(self.catalog().system_config())
221            .override_from(&target_cluster.config.features())
222            .override_from(&self.cluster_scoped_optimizer_overrides(index.cluster_id))
223            .override_from(&config.features);
224
225        // TODO(mgree): calculate statistics (need a timestamp)
226        let cardinality_stats = BTreeMap::new();
227
228        let explain = match stage {
229            ExplainStage::GlobalPlan => {
230                let Some(plan) = self
231                    .catalog()
232                    .try_get_optimized_plan(&index.global_id())
233                    .cloned()
234                else {
235                    tracing::error!("cannot find {stage} for index {id} in catalog");
236                    coord_bail!("cannot find {stage} for index in catalog");
237                };
238
239                explain_dataflow(
240                    plan,
241                    format,
242                    &config,
243                    &features,
244                    &self.catalog().for_session(ctx.session()),
245                    cardinality_stats,
246                    Some(target_cluster.name.as_str()),
247                    dataflow_metainfo,
248                )?
249            }
250            ExplainStage::PhysicalPlan => {
251                let Some(plan) = self
252                    .catalog()
253                    .try_get_physical_plan(&index.global_id())
254                    .cloned()
255                else {
256                    tracing::error!("cannot find {stage} for index {id} in catalog");
257                    coord_bail!("cannot find {stage} for index in catalog");
258                };
259                explain_dataflow(
260                    plan,
261                    format,
262                    &config,
263                    &features,
264                    &self.catalog().for_session(ctx.session()),
265                    cardinality_stats,
266                    Some(target_cluster.name.as_str()),
267                    dataflow_metainfo,
268                )?
269            }
270            _ => {
271                coord_bail!("cannot EXPLAIN {} FOR INDEX", stage);
272            }
273        };
274
275        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
276
277        Ok(Self::send_immediate_rows(row))
278    }
279
280    // `explain_ctx` is an optional context set iff the state machine is initiated from
281    // sequencing an EXPLAIN for this statement.
282    #[instrument]
283    fn create_index_validate(
284        &self,
285        session: &Session,
286        plan: plan::CreateIndexPlan,
287        resolved_ids: ResolvedIds,
288        explain_ctx: ExplainContext,
289    ) -> Result<CreateIndexStage, AdapterError> {
290        // Track the target cluster and resolved dependencies so concurrent
291        // drops are caught between stages instead of panicking later when the
292        // persisted SQL is re-parsed during catalog application.
293        let validity = PlanValidity::new(
294            self.catalog().transient_revision(),
295            resolved_ids.items().copied().collect(),
296            Some(plan.index.cluster_id),
297            None,
298            session.role_metadata().clone(),
299        );
300        Ok(CreateIndexStage::Optimize(CreateIndexOptimize {
301            validity,
302            plan,
303            resolved_ids,
304            explain_ctx,
305        }))
306    }
307
    /// Optimization stage of the `CREATE INDEX` state machine.
    ///
    /// Allocates an id for the index, builds an index optimizer, and runs the
    /// optimizer pipeline on a blocking task. The returned
    /// `StageResult::Handle` resolves to the next stage:
    ///
    /// * `Finish` when `explain_ctx` is not `ExplainContext::Plan`, or
    /// * `Explain` when it is. If the pipeline failed, `Explain` is only
    ///   produced for a `broken` explain context, using a default `df_meta`.
    ///
    /// # Errors
    ///
    /// Returns an error directly if allocating a user id fails. Optimizer
    /// errors are returned from the spawned task, except in the `broken`
    /// EXPLAIN case described above.
    ///
    /// # Panics
    ///
    /// Panics if `instance_snapshot` does not yield a compute instance for the
    /// plan's cluster.
    #[instrument]
    async fn create_index_optimize(
        &mut self,
        CreateIndexOptimize {
            validity,
            plan,
            resolved_ids,
            explain_ctx,
        }: CreateIndexOptimize,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        // Borrow only the cluster id here; `plan` itself is moved into the
        // blocking task below.
        let plan::CreateIndexPlan {
            index: plan::Index { cluster_id, .. },
            ..
        } = &plan;

        // Collect optimizer parameters.
        let compute_instance = self
            .instance_snapshot(*cluster_id)
            .expect("compute instance does not exist");
        // A user id is allocated only when this is not an EXPLAIN; every
        // explain context gets a transient id instead.
        let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
            self.allocate_user_id().await?
        } else {
            self.allocate_transient_id()
        };

        // Later overrides win: system config, then the cluster's features,
        // then cluster-scoped overrides, then the explain context.
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
            .override_from(&self.cluster_scoped_optimizer_overrides(*cluster_id))
            .override_from(&explain_ctx);
        // Cloned before `optimizer_config` is moved into the optimizer; the
        // features are handed on to the `Finish` stage.
        let optimizer_features = optimizer_config.features.clone();

        // Build an optimizer for this INDEX.
        let mut optimizer = optimize::index::Optimizer::new(
            self.owned_catalog(),
            compute_instance,
            global_id,
            optimizer_config,
            self.optimizer_metrics(),
        );
        // Capture the current span so the blocking task runs inside it.
        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize create index",
            move || {
                span.in_scope(|| {
                    // Runs both optimizer steps and returns the MIR and LIR
                    // plans, or the first error encountered.
                    let mut pipeline = || -> Result<(
                    optimize::index::GlobalMirPlan,
                    optimize::index::GlobalLirPlan,
                ), AdapterError> {
                    let _dispatch_guard = explain_ctx.dispatch_guard();

                    let index_plan = optimize::index::Index::new(
                        plan.name.clone(),
                        plan.index.on,
                        plan.index.keys.clone(),
                    );

                    // MIR ⇒ MIR optimization (global)
                    let global_mir_plan = optimizer.catch_unwind_optimize(index_plan)?;
                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
                    let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;

                    Ok((global_mir_plan, global_lir_plan))
                };

                    // Pick the next stage from the pipeline outcome and the
                    // explain context.
                    let stage = match pipeline() {
                        Ok((global_mir_plan, global_lir_plan)) => {
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                // EXPLAIN only needs the dataflow
                                // metainformation from the LIR plan.
                                let (_, df_meta) = global_lir_plan.unapply();
                                CreateIndexStage::Explain(CreateIndexExplain {
                                    validity,
                                    exported_index_id: global_id,
                                    plan,
                                    df_meta,
                                    explain_ctx,
                                })
                            } else {
                                CreateIndexStage::Finish(CreateIndexFinish {
                                    validity,
                                    item_id,
                                    global_id,
                                    plan,
                                    resolved_ids,
                                    global_mir_plan,
                                    global_lir_plan,
                                    optimizer_features,
                                })
                            }
                        }
                        // Internal optimizer errors are handled differently
                        // depending on the caller.
                        Err(err) => {
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                // In `sequence_~` contexts, immediately error.
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                // In `EXPLAIN BROKEN` contexts, just log the error
                                // and move to the next stage with default
                                // parameters.
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                CreateIndexStage::Explain(CreateIndexExplain {
                                    validity,
                                    exported_index_id: global_id,
                                    plan,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                })
                            } else {
                                // In regular `EXPLAIN` contexts, immediately error.
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }
427
    /// Final stage of a plain `CREATE INDEX`: adds the index to the catalog
    /// and ships its dataflow.
    ///
    /// In order, this method:
    ///
    /// 1. builds the `CreateItem` catalog op, owned by the owner of the
    ///    indexed relation,
    /// 2. renders the optimizer notices with a humanizer that knows the
    ///    to-be-created index,
    /// 3. caches the plans and rendered notices via `cache_expressions`,
    /// 4. runs the catalog transaction, whose side effect saves the plans,
    ///    persists the dataflow metainformation, ships the dataflow under
    ///    read holds, and updates the compute read policy.
    ///
    /// # Returns
    ///
    /// `ExecuteResponse::CreatedIndex` on success. The same response is
    /// returned, with an `ObjectAlreadyExists` notice added to the session,
    /// when the transaction fails with `ItemAlreadyExists` and the plan set
    /// `if_not_exists`.
    ///
    /// # Errors
    ///
    /// Any other error from the catalog transaction is returned unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the indexed relation has no relation description.
    #[instrument]
    async fn create_index_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        stage: CreateIndexFinish,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let CreateIndexFinish {
            item_id,
            global_id,
            plan:
                plan::CreateIndexPlan {
                    name,
                    index:
                        plan::Index {
                            create_sql,
                            on,
                            keys,
                            cluster_id,
                            compaction_window,
                        },
                    if_not_exists,
                },
            resolved_ids,
            global_mir_plan,
            global_lir_plan,
            optimizer_features,
            ..
        } = stage;
        // Computed up front, before `global_lir_plan` is taken apart below;
        // used later to acquire read holds.
        let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);

        // The new index gets the same owner as the relation it is built on.
        let on_entry = self.catalog().get_entry_by_global_id(&on);
        let owner_id = *on_entry.owner_id();

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Index(Index {
                create_sql,
                global_id,
                keys: keys.into(),
                on,
                conn_id: None,
                resolved_ids,
                cluster_id,
                is_retained_metrics_object: false,
                custom_logical_compaction_window: compaction_window,
                optimized_plan: None,
                physical_plan: None,
                dataflow_metainfo: None,
            }),
            owner_id,
        }];

        // Pre-allocate a vector of transient GlobalIds for each notice.
        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
            .map(|(_item_id, global_id)| global_id)
            .take(global_lir_plan.df_meta().optimizer_notices.len())
            .collect::<Vec<_>>();

        // Render optimizer notices before the catalog transaction, using an
        // `ExprHumanizerExt` that knows about the to-be-created index. This
        // way the notice text produced here (and persisted in
        // `mz_optimizer_notices`) resolves the new index's own `global_id` to
        // its intended human-readable name, rather than to the bare transient
        // id that `for_system_session()` would produce on its own.
        //
        // We keep `raw_df_meta` live so that on success we can emit its raw
        // notices to the user session (rendered against the user's
        // session-aware humanizer). We deliberately do NOT emit to the user
        // here, so that if the catalog transaction below fails the user
        // isn't shown confusing notices about an item that wasn't actually
        // created.
        let (mut df_desc, raw_df_meta) = global_lir_plan.unapply();
        let df_meta = {
            let system_catalog = self.catalog().for_system_session();
            let full_name = self.catalog().resolve_full_name(&name, None);
            let on_desc = on_entry
                .relation_desc()
                .expect("can only create indexes on items with a valid description");
            // Register the not-yet-created index under its `global_id`, with
            // its full name and the column names of the indexed relation.
            let transient_items = btreemap! {
                global_id => TransientItem::new(
                    Some(full_name.into_parts()),
                    Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
                )
            };
            let humanizer = ExprHumanizerExt::new(transient_items, &system_catalog);
            CatalogState::render_notices_core(
                &humanizer,
                (self.catalog().config().now)(),
                &raw_df_meta,
                notice_ids,
                Some(global_id),
            )
        };

        // Populate the durable expression cache before the catalog
        // transaction and await the write. This way any other envd (or a
        // subsequent bootstrap here) will observe the cached plans +
        // rendered notices as soon as the item becomes visible.
        self.catalog()
            .cache_expressions(
                global_id,
                None,
                global_mir_plan.df_desc().clone(),
                df_desc.clone(),
                df_meta.clone(),
                optimizer_features,
            )
            .await;

        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
                Box::pin(async move {
                    // Save plan structures.
                    coord
                        .catalog_mut()
                        .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
                    coord
                        .catalog_mut()
                        .set_physical_plan(global_id, df_desc.clone());

                    let notice_builtin_updates_fut =
                        coord.persist_dataflow_metainfo(df_meta, global_id).await;

                    // We're putting in place read holds, such that ship_dataflow,
                    // below, which calls update_read_capabilities, can successfully
                    // do so. Otherwise, the since of dependencies might move along
                    // concurrently, pulling the rug from under us!
                    //
                    // TODO: Maybe in the future, pass those holds on to compute, to
                    // hold on to them and downgrade when possible?
                    let read_holds = coord.acquire_read_holds(&id_bundle);
                    // The dataflow's `as_of` is the least valid read of the
                    // holds just acquired.
                    let since = read_holds.least_valid_read();
                    df_desc.set_as_of(since);

                    coord
                        .ship_dataflow_and_notice_builtin_table_updates(
                            df_desc,
                            cluster_id,
                            notice_builtin_updates_fut,
                            None,
                        )
                        .await;
                    // No `allow_writes` here because indexes do not modify external state.

                    // Drop read holds after the dataflow has been shipped, at which
                    // point compute will have put in its own read holds.
                    drop(read_holds);

                    // Falls back to the default compaction window when the
                    // plan did not specify one.
                    coord.update_compute_read_policy(
                        cluster_id,
                        item_id,
                        compaction_window.unwrap_or_default().into(),
                    );
                })
            })
            .await;

        match transact_result {
            Ok(_) => {
                // Only emit optimizer notices to the user now that the
                // catalog transaction has succeeded. If the transaction had
                // failed, emitting notices would confuse the user with
                // information about an item that wasn't actually created.
                self.emit_raw_optimizer_notices_to_user(ctx, &raw_df_meta.optimizer_notices);
                Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
            }
            // With `IF NOT EXISTS`, an already-existing item is reported as a
            // session notice and the statement still succeeds.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "index",
                    });
                Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
            }
            Err(err) => Err(err),
        }
    }
608
609    #[instrument]
610    async fn create_index_explain(
611        &self,
612        session: &Session,
613        CreateIndexExplain {
614            exported_index_id,
615            plan: plan::CreateIndexPlan { name, index, .. },
616            df_meta,
617            explain_ctx:
618                ExplainPlanContext {
619                    config,
620                    format,
621                    stage,
622                    optimizer_trace,
623                    ..
624                },
625            ..
626        }: CreateIndexExplain,
627    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
628        let session_catalog = self.catalog().for_session(session);
629        let expr_humanizer = {
630            let on_entry = self.catalog.get_entry_by_global_id(&index.on);
631            let full_name = self.catalog.resolve_full_name(&name, on_entry.conn_id());
632            let on_desc = on_entry
633                .relation_desc()
634                .expect("can only create indexes on items with a valid description");
635
636            let transient_items = btreemap! {
637                exported_index_id => TransientItem::new(
638                    Some(full_name.into_parts()),
639                    Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
640                )
641            };
642            ExprHumanizerExt::new(transient_items, &session_catalog)
643        };
644
645        let target_cluster = self.catalog().get_cluster(index.cluster_id);
646
647        let features = OptimizerFeatures::from(self.catalog().system_config())
648            .override_from(&target_cluster.config.features())
649            .override_from(&self.cluster_scoped_optimizer_overrides(index.cluster_id))
650            .override_from(&config.features);
651
652        let rows = optimizer_trace
653            .into_rows(
654                format,
655                &config,
656                &features,
657                &expr_humanizer,
658                None,
659                Some(target_cluster),
660                df_meta,
661                stage,
662                plan::ExplaineeStatementKind::CreateIndex,
663                None,
664            )
665            .await?;
666
667        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
668    }
669}