// mz_adapter/coord/sequencer/inner/create_index.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

10use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, Index};
15use mz_ore::instrument;
16use mz_repr::explain::{ExprHumanizerExt, TransientItem};
17use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
18use mz_repr::{Datum, Row};
19use mz_sql::ast::ExplainStage;
20use mz_sql::catalog::CatalogError;
21use mz_sql::names::ResolvedIds;
22use mz_sql::plan;
23use tracing::Span;
24
25use crate::catalog::CatalogState;
26use crate::command::ExecuteResponse;
27use crate::coord::sequencer::inner::return_if_err;
28use crate::coord::{
29    Coordinator, CreateIndexExplain, CreateIndexFinish, CreateIndexOptimize, CreateIndexStage,
30    ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
31};
32use crate::error::AdapterError;
33use crate::explain::explain_dataflow;
34use crate::explain::optimizer_trace::OptimizerTrace;
35use crate::optimize::dataflows::dataflow_import_id_bundle;
36use crate::optimize::{self, Optimize};
37use crate::session::Session;
38use crate::{AdapterNotice, ExecuteContext, catalog};
39
40impl Staged for CreateIndexStage {
41    type Ctx = ExecuteContext;
42
43    fn validity(&mut self) -> &mut PlanValidity {
44        match self {
45            Self::Optimize(stage) => &mut stage.validity,
46            Self::Finish(stage) => &mut stage.validity,
47            Self::Explain(stage) => &mut stage.validity,
48        }
49    }
50
51    async fn stage(
52        self,
53        coord: &mut Coordinator,
54        ctx: &mut ExecuteContext,
55    ) -> Result<StageResult<Box<Self>>, AdapterError> {
56        match self {
57            CreateIndexStage::Optimize(stage) => coord.create_index_optimize(stage).await,
58            CreateIndexStage::Finish(stage) => coord.create_index_finish(ctx, stage).await,
59            CreateIndexStage::Explain(stage) => {
60                coord.create_index_explain(ctx.session(), stage).await
61            }
62        }
63    }
64
65    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
66        Message::CreateIndexStageReady {
67            ctx,
68            span,
69            stage: self,
70        }
71    }
72
73    fn cancel_enabled(&self) -> bool {
74        true
75    }
76}
77
78impl Coordinator {
79    #[instrument]
80    pub(crate) async fn sequence_create_index(
81        &mut self,
82        ctx: ExecuteContext,
83        plan: plan::CreateIndexPlan,
84        resolved_ids: ResolvedIds,
85    ) {
86        let stage = return_if_err!(
87            self.create_index_validate(plan, resolved_ids, ExplainContext::None),
88            ctx
89        );
90        self.sequence_staged(ctx, Span::current(), stage).await;
91    }
92
93    #[instrument]
94    pub(crate) async fn explain_create_index(
95        &mut self,
96        ctx: ExecuteContext,
97        plan::ExplainPlanPlan {
98            stage,
99            format,
100            config,
101            explainee,
102        }: plan::ExplainPlanPlan,
103    ) {
104        let plan::Explainee::Statement(stmt) = explainee else {
105            // This is currently asserted in the `sequence_explain_plan` code that
106            // calls this method.
107            unreachable!()
108        };
109        let plan::ExplaineeStatement::CreateIndex { broken, plan } = stmt else {
110            // This is currently asserted in the `sequence_explain_plan` code that
111            // calls this method.
112            unreachable!()
113        };
114
115        // Create an OptimizerTrace instance to collect plans emitted when
116        // executing the optimizer pipeline.
117        let optimizer_trace = OptimizerTrace::new(stage.paths());
118
119        // Not used in the EXPLAIN path so it's OK to generate a dummy value.
120        let resolved_ids = ResolvedIds::empty();
121
122        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
123            broken,
124            config,
125            format,
126            stage,
127            replan: None,
128            desc: None,
129            optimizer_trace,
130        });
131        let stage = return_if_err!(
132            self.create_index_validate(plan, resolved_ids, explain_ctx),
133            ctx
134        );
135        self.sequence_staged(ctx, Span::current(), stage).await;
136    }
137
138    #[instrument]
139    pub(crate) async fn explain_replan_index(
140        &mut self,
141        ctx: ExecuteContext,
142        plan::ExplainPlanPlan {
143            stage,
144            format,
145            config,
146            explainee,
147        }: plan::ExplainPlanPlan,
148    ) {
149        let plan::Explainee::ReplanIndex(id) = explainee else {
150            unreachable!() // Asserted in `sequence_explain_plan`.
151        };
152        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
153            unreachable!() // Asserted in `plan_explain_plan`.
154        };
155        let id = index.global_id();
156
157        let create_sql = index.create_sql.clone();
158        let plan_result = self
159            .catalog_mut()
160            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
161        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
162
163        let plan::Plan::CreateIndex(plan) = plan else {
164            unreachable!() // We are parsing the `create_sql` of an `Index` item.
165        };
166
167        // It is safe to assume that query optimization will always succeed, so
168        // for now we statically assume `broken = false`.
169        let broken = false;
170
171        // Create an OptimizerTrace instance to collect plans emitted when
172        // executing the optimizer pipeline.
173        let optimizer_trace = OptimizerTrace::new(stage.paths());
174
175        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
176            broken,
177            config,
178            format,
179            stage,
180            replan: Some(id),
181            desc: None,
182            optimizer_trace,
183        });
184        let stage = return_if_err!(
185            self.create_index_validate(plan, resolved_ids, explain_ctx),
186            ctx
187        );
188        self.sequence_staged(ctx, Span::current(), stage).await;
189    }
190
191    #[instrument]
192    pub(crate) fn explain_index(
193        &self,
194        ctx: &ExecuteContext,
195        plan::ExplainPlanPlan {
196            stage,
197            format,
198            config,
199            explainee,
200        }: plan::ExplainPlanPlan,
201    ) -> Result<ExecuteResponse, AdapterError> {
202        let plan::Explainee::Index(id) = explainee else {
203            unreachable!() // Asserted in `sequence_explain_plan`.
204        };
205        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
206            unreachable!() // Asserted in `plan_explain_plan`.
207        };
208
209        let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&index.global_id())
210        else {
211            if !id.is_system() {
212                tracing::error!("cannot find dataflow metainformation for index {id} in catalog");
213            }
214            coord_bail!("cannot find dataflow metainformation for index {id} in catalog");
215        };
216
217        let target_cluster = self.catalog().get_cluster(index.cluster_id);
218
219        let features = OptimizerFeatures::from(self.catalog().system_config())
220            .override_from(&target_cluster.config.features())
221            .override_from(&config.features);
222
223        // TODO(mgree): calculate statistics (need a timestamp)
224        let cardinality_stats = BTreeMap::new();
225
226        let explain = match stage {
227            ExplainStage::GlobalPlan => {
228                let Some(plan) = self
229                    .catalog()
230                    .try_get_optimized_plan(&index.global_id())
231                    .cloned()
232                else {
233                    tracing::error!("cannot find {stage} for index {id} in catalog");
234                    coord_bail!("cannot find {stage} for index in catalog");
235                };
236
237                explain_dataflow(
238                    plan,
239                    format,
240                    &config,
241                    &features,
242                    &self.catalog().for_session(ctx.session()),
243                    cardinality_stats,
244                    Some(target_cluster.name.as_str()),
245                    dataflow_metainfo,
246                )?
247            }
248            ExplainStage::PhysicalPlan => {
249                let Some(plan) = self
250                    .catalog()
251                    .try_get_physical_plan(&index.global_id())
252                    .cloned()
253                else {
254                    tracing::error!("cannot find {stage} for index {id} in catalog");
255                    coord_bail!("cannot find {stage} for index in catalog");
256                };
257                explain_dataflow(
258                    plan,
259                    format,
260                    &config,
261                    &features,
262                    &self.catalog().for_session(ctx.session()),
263                    cardinality_stats,
264                    Some(target_cluster.name.as_str()),
265                    dataflow_metainfo,
266                )?
267            }
268            _ => {
269                coord_bail!("cannot EXPLAIN {} FOR INDEX", stage);
270            }
271        };
272
273        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
274
275        Ok(Self::send_immediate_rows(row))
276    }
277
278    // `explain_ctx` is an optional context set iff the state machine is initiated from
279    // sequencing an EXPLAIN for this statement.
280    #[instrument]
281    fn create_index_validate(
282        &self,
283        plan: plan::CreateIndexPlan,
284        resolved_ids: ResolvedIds,
285        explain_ctx: ExplainContext,
286    ) -> Result<CreateIndexStage, AdapterError> {
287        let validity =
288            PlanValidity::require_transient_revision(self.catalog().transient_revision());
289        Ok(CreateIndexStage::Optimize(CreateIndexOptimize {
290            validity,
291            plan,
292            resolved_ids,
293            explain_ctx,
294        }))
295    }
296
297    #[instrument]
298    async fn create_index_optimize(
299        &mut self,
300        CreateIndexOptimize {
301            validity,
302            plan,
303            resolved_ids,
304            explain_ctx,
305        }: CreateIndexOptimize,
306    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
307        let plan::CreateIndexPlan {
308            index: plan::Index { cluster_id, .. },
309            ..
310        } = &plan;
311
312        // Collect optimizer parameters.
313        let compute_instance = self
314            .instance_snapshot(*cluster_id)
315            .expect("compute instance does not exist");
316        let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
317            self.allocate_user_id().await?
318        } else {
319            self.allocate_transient_id()
320        };
321
322        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
323            .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
324            .override_from(&explain_ctx);
325        let optimizer_features = optimizer_config.features.clone();
326
327        // Build an optimizer for this INDEX.
328        let mut optimizer = optimize::index::Optimizer::new(
329            self.owned_catalog(),
330            compute_instance,
331            global_id,
332            optimizer_config,
333            self.optimizer_metrics(),
334        );
335        let span = Span::current();
336        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
337            || "optimize create index",
338            move || {
339                span.in_scope(|| {
340                    let mut pipeline = || -> Result<(
341                    optimize::index::GlobalMirPlan,
342                    optimize::index::GlobalLirPlan,
343                ), AdapterError> {
344                    let _dispatch_guard = explain_ctx.dispatch_guard();
345
346                    let index_plan = optimize::index::Index::new(
347                        plan.name.clone(),
348                        plan.index.on,
349                        plan.index.keys.clone(),
350                    );
351
352                    // MIR ⇒ MIR optimization (global)
353                    let global_mir_plan = optimizer.catch_unwind_optimize(index_plan)?;
354                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
355                    let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
356
357                    Ok((global_mir_plan, global_lir_plan))
358                };
359
360                    let stage = match pipeline() {
361                        Ok((global_mir_plan, global_lir_plan)) => {
362                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
363                                let (_, df_meta) = global_lir_plan.unapply();
364                                CreateIndexStage::Explain(CreateIndexExplain {
365                                    validity,
366                                    exported_index_id: global_id,
367                                    plan,
368                                    df_meta,
369                                    explain_ctx,
370                                })
371                            } else {
372                                CreateIndexStage::Finish(CreateIndexFinish {
373                                    validity,
374                                    item_id,
375                                    global_id,
376                                    plan,
377                                    resolved_ids,
378                                    global_mir_plan,
379                                    global_lir_plan,
380                                    optimizer_features,
381                                })
382                            }
383                        }
384                        // Internal optimizer errors are handled differently
385                        // depending on the caller.
386                        Err(err) => {
387                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
388                                // In `sequence_~` contexts, immediately error.
389                                return Err(err);
390                            };
391
392                            if explain_ctx.broken {
393                                // In `EXPLAIN BROKEN` contexts, just log the error
394                                // and move to the next stage with default
395                                // parameters.
396                                tracing::error!("error while handling EXPLAIN statement: {}", err);
397                                CreateIndexStage::Explain(CreateIndexExplain {
398                                    validity,
399                                    exported_index_id: global_id,
400                                    plan,
401                                    df_meta: Default::default(),
402                                    explain_ctx,
403                                })
404                            } else {
405                                // In regular `EXPLAIN` contexts, immediately error.
406                                return Err(err);
407                            }
408                        }
409                    };
410                    Ok(Box::new(stage))
411                })
412            },
413        )))
414    }
415
416    #[instrument]
417    async fn create_index_finish(
418        &mut self,
419        ctx: &mut ExecuteContext,
420        stage: CreateIndexFinish,
421    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
422        let CreateIndexFinish {
423            item_id,
424            global_id,
425            plan:
426                plan::CreateIndexPlan {
427                    name,
428                    index:
429                        plan::Index {
430                            create_sql,
431                            on,
432                            keys,
433                            cluster_id,
434                            compaction_window,
435                        },
436                    if_not_exists,
437                },
438            resolved_ids,
439            global_mir_plan,
440            global_lir_plan,
441            optimizer_features,
442            ..
443        } = stage;
444        let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
445
446        let on_entry = self.catalog().get_entry_by_global_id(&on);
447        let owner_id = *on_entry.owner_id();
448
449        let ops = vec![catalog::Op::CreateItem {
450            id: item_id,
451            name: name.clone(),
452            item: CatalogItem::Index(Index {
453                create_sql,
454                global_id,
455                keys: keys.into(),
456                on,
457                conn_id: None,
458                resolved_ids,
459                cluster_id,
460                is_retained_metrics_object: false,
461                custom_logical_compaction_window: compaction_window,
462                optimized_plan: None,
463                physical_plan: None,
464                dataflow_metainfo: None,
465            }),
466            owner_id,
467        }];
468
469        // Pre-allocate a vector of transient GlobalIds for each notice.
470        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
471            .map(|(_item_id, global_id)| global_id)
472            .take(global_lir_plan.df_meta().optimizer_notices.len())
473            .collect::<Vec<_>>();
474
475        // Render optimizer notices before the catalog transaction, using an
476        // `ExprHumanizerExt` that knows about the to-be-created index. This
477        // way the notice text produced here (and persisted in
478        // `mz_optimizer_notices`) resolves the new index's own `global_id` to
479        // its intended human-readable name, rather than to the bare transient
480        // id that `for_system_session()` would produce on its own.
481        //
482        // We keep `raw_df_meta` live so that on success we can emit its raw
483        // notices to the user session (rendered against the user's
484        // session-aware humanizer). We deliberately do NOT emit to the user
485        // here, so that if the catalog transaction below fails the user
486        // isn't shown confusing notices about an item that wasn't actually
487        // created.
488        let (mut df_desc, raw_df_meta) = global_lir_plan.unapply();
489        let df_meta = {
490            let system_catalog = self.catalog().for_system_session();
491            let full_name = self.catalog().resolve_full_name(&name, None);
492            let on_desc = on_entry
493                .relation_desc()
494                .expect("can only create indexes on items with a valid description");
495            let transient_items = btreemap! {
496                global_id => TransientItem::new(
497                    Some(full_name.into_parts()),
498                    Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
499                )
500            };
501            let humanizer = ExprHumanizerExt::new(transient_items, &system_catalog);
502            CatalogState::render_notices_core(
503                &humanizer,
504                (self.catalog().config().now)(),
505                &raw_df_meta,
506                notice_ids,
507                Some(global_id),
508            )
509        };
510
511        // Populate the durable expression cache before the catalog
512        // transaction and await the write. This way any other envd (or a
513        // subsequent bootstrap here) will observe the cached plans +
514        // rendered notices as soon as the item becomes visible.
515        self.catalog()
516            .cache_expressions(
517                global_id,
518                None,
519                global_mir_plan.df_desc().clone(),
520                df_desc.clone(),
521                df_meta.clone(),
522                optimizer_features,
523            )
524            .await;
525
526        let transact_result = self
527            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
528                Box::pin(async move {
529                    // Save plan structures.
530                    coord
531                        .catalog_mut()
532                        .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
533                    coord
534                        .catalog_mut()
535                        .set_physical_plan(global_id, df_desc.clone());
536
537                    let notice_builtin_updates_fut =
538                        coord.persist_dataflow_metainfo(df_meta, global_id).await;
539
540                    // We're putting in place read holds, such that ship_dataflow,
541                    // below, which calls update_read_capabilities, can successfully
542                    // do so. Otherwise, the since of dependencies might move along
543                    // concurrently, pulling the rug from under us!
544                    //
545                    // TODO: Maybe in the future, pass those holds on to compute, to
546                    // hold on to them and downgrade when possible?
547                    let read_holds = coord.acquire_read_holds(&id_bundle);
548                    let since = read_holds.least_valid_read();
549                    df_desc.set_as_of(since);
550
551                    coord
552                        .ship_dataflow_and_notice_builtin_table_updates(
553                            df_desc,
554                            cluster_id,
555                            notice_builtin_updates_fut,
556                            None,
557                        )
558                        .await;
559                    // No `allow_writes` here because indexes do not modify external state.
560
561                    // Drop read holds after the dataflow has been shipped, at which
562                    // point compute will have put in its own read holds.
563                    drop(read_holds);
564
565                    coord.update_compute_read_policy(
566                        cluster_id,
567                        item_id,
568                        compaction_window.unwrap_or_default().into(),
569                    );
570                })
571            })
572            .await;
573
574        match transact_result {
575            Ok(_) => {
576                // Only emit optimizer notices to the user now that the
577                // catalog transaction has succeeded. If the transaction had
578                // failed, emitting notices would confuse the user with
579                // information about an item that wasn't actually created.
580                self.emit_raw_optimizer_notices_to_user(ctx, &raw_df_meta.optimizer_notices);
581                Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
582            }
583            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
584                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
585            })) if if_not_exists => {
586                ctx.session()
587                    .add_notice(AdapterNotice::ObjectAlreadyExists {
588                        name: name.item,
589                        ty: "index",
590                    });
591                Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
592            }
593            Err(err) => Err(err),
594        }
595    }
596
597    #[instrument]
598    async fn create_index_explain(
599        &self,
600        session: &Session,
601        CreateIndexExplain {
602            exported_index_id,
603            plan: plan::CreateIndexPlan { name, index, .. },
604            df_meta,
605            explain_ctx:
606                ExplainPlanContext {
607                    config,
608                    format,
609                    stage,
610                    optimizer_trace,
611                    ..
612                },
613            ..
614        }: CreateIndexExplain,
615    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
616        let session_catalog = self.catalog().for_session(session);
617        let expr_humanizer = {
618            let on_entry = self.catalog.get_entry_by_global_id(&index.on);
619            let full_name = self.catalog.resolve_full_name(&name, on_entry.conn_id());
620            let on_desc = on_entry
621                .relation_desc()
622                .expect("can only create indexes on items with a valid description");
623
624            let transient_items = btreemap! {
625                exported_index_id => TransientItem::new(
626                    Some(full_name.into_parts()),
627                    Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
628                )
629            };
630            ExprHumanizerExt::new(transient_items, &session_catalog)
631        };
632
633        let target_cluster = self.catalog().get_cluster(index.cluster_id);
634
635        let features = OptimizerFeatures::from(self.catalog().system_config())
636            .override_from(&target_cluster.config.features())
637            .override_from(&config.features);
638
639        let rows = optimizer_trace
640            .into_rows(
641                format,
642                &config,
643                &features,
644                &expr_humanizer,
645                None,
646                Some(target_cluster),
647                df_meta,
648                stage,
649                plan::ExplaineeStatementKind::CreateIndex,
650                None,
651            )
652            .await?;
653
654        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
655    }
656}