// Source file: mz_adapter/coord/sequencer/inner/create_index.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, Index};
15use mz_ore::instrument;
16use mz_repr::explain::{ExprHumanizerExt, TransientItem};
17use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
18use mz_repr::{Datum, Row};
19use mz_sql::ast::ExplainStage;
20use mz_sql::catalog::CatalogError;
21use mz_sql::names::ResolvedIds;
22use mz_sql::plan;
23use tracing::Span;
24
25use crate::command::ExecuteResponse;
26use crate::coord::sequencer::inner::return_if_err;
27use crate::coord::{
28    Coordinator, CreateIndexExplain, CreateIndexFinish, CreateIndexOptimize, CreateIndexStage,
29    ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
30};
31use crate::error::AdapterError;
32use crate::explain::explain_dataflow;
33use crate::explain::optimizer_trace::OptimizerTrace;
34use crate::optimize::dataflows::dataflow_import_id_bundle;
35use crate::optimize::{self, Optimize};
36use crate::session::Session;
37use crate::{AdapterNotice, ExecuteContext, catalog};
38
39impl Staged for CreateIndexStage {
40    type Ctx = ExecuteContext;
41
42    fn validity(&mut self) -> &mut PlanValidity {
43        match self {
44            Self::Optimize(stage) => &mut stage.validity,
45            Self::Finish(stage) => &mut stage.validity,
46            Self::Explain(stage) => &mut stage.validity,
47        }
48    }
49
50    async fn stage(
51        self,
52        coord: &mut Coordinator,
53        ctx: &mut ExecuteContext,
54    ) -> Result<StageResult<Box<Self>>, AdapterError> {
55        match self {
56            CreateIndexStage::Optimize(stage) => coord.create_index_optimize(stage).await,
57            CreateIndexStage::Finish(stage) => coord.create_index_finish(ctx, stage).await,
58            CreateIndexStage::Explain(stage) => {
59                coord.create_index_explain(ctx.session(), stage).await
60            }
61        }
62    }
63
64    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
65        Message::CreateIndexStageReady {
66            ctx,
67            span,
68            stage: self,
69        }
70    }
71
72    fn cancel_enabled(&self) -> bool {
73        true
74    }
75}
76
impl Coordinator {
    /// Sequences a `CREATE INDEX` statement: validates the plan into the
    /// initial stage and then drives the staged state machine
    /// (optimize ⇒ finish) to completion.
    #[instrument]
    pub(crate) async fn sequence_create_index(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateIndexPlan,
        resolved_ids: ResolvedIds,
    ) {
        let stage = return_if_err!(
            self.create_index_validate(plan, resolved_ids, ExplainContext::None),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    /// Sequences an `EXPLAIN ... CREATE INDEX` statement. Runs the same stage
    /// machine as `sequence_create_index`, but under an `ExplainContext::Plan`
    /// so that the terminal stage renders the collected optimizer trace
    /// instead of actually creating the index.
    #[instrument]
    pub(crate) async fn explain_create_index(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) {
        let plan::Explainee::Statement(stmt) = explainee else {
            // This is currently asserted in the `sequence_explain_plan` code that
            // calls this method.
            unreachable!()
        };
        let plan::ExplaineeStatement::CreateIndex { broken, plan } = stmt else {
            // This is currently asserted in the `sequence_explain_plan` code that
            // calls this method.
            unreachable!()
        };

        // Create an OptimizerTrace instance to collect plans emitted when
        // executing the optimizer pipeline.
        let optimizer_trace = OptimizerTrace::new(stage.paths());

        // Not used in the EXPLAIN path so it's OK to generate a dummy value.
        let resolved_ids = ResolvedIds::empty();

        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
            broken,
            config,
            format,
            stage,
            replan: None,
            desc: None,
            optimizer_trace,
        });
        let stage = return_if_err!(
            self.create_index_validate(plan, resolved_ids, explain_ctx),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    /// Sequences an `EXPLAIN REPLAN INDEX` statement: re-plans an existing
    /// index from its stored `create_sql` and explains the resulting plan as
    /// if the index were being created now.
    #[instrument]
    pub(crate) async fn explain_replan_index(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) {
        let plan::Explainee::ReplanIndex(id) = explainee else {
            unreachable!() // Asserted in `sequence_explain_plan`.
        };
        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
            unreachable!() // Asserted in `plan_explain_plan`.
        };
        // Shadow the catalog item id with the index's `GlobalId`; this is the
        // value stored in `replan` below.
        let id = index.global_id();

        // Re-derive the plan by parsing the index's stored `create_sql`.
        let create_sql = index.create_sql.clone();
        let plan_result = self
            .catalog_mut()
            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);

        let plan::Plan::CreateIndex(plan) = plan else {
            unreachable!() // We are parsing the `create_sql` of an `Index` item.
        };

        // It is safe to assume that query optimization will always succeed, so
        // for now we statically assume `broken = false`.
        let broken = false;

        // Create an OptimizerTrace instance to collect plans emitted when
        // executing the optimizer pipeline.
        let optimizer_trace = OptimizerTrace::new(stage.paths());

        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
            broken,
            config,
            format,
            stage,
            replan: Some(id),
            desc: None,
            optimizer_trace,
        });
        let stage = return_if_err!(
            self.create_index_validate(plan, resolved_ids, explain_ctx),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    /// Handles `EXPLAIN <stage> FOR INDEX`: renders plans that were cached in
    /// the catalog when the index was created. No re-optimization happens
    /// here, so only the `GlobalPlan` and `PhysicalPlan` stages are
    /// supported; other stages bail with a user-facing error.
    #[instrument]
    pub(crate) fn explain_index(
        &self,
        ctx: &ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::Explainee::Index(id) = explainee else {
            unreachable!() // Asserted in `sequence_explain_plan`.
        };
        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
            unreachable!() // Asserted in `plan_explain_plan`.
        };

        let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&index.global_id())
        else {
            // Missing metainfo is surfaced to the user either way, but it is
            // only logged as an internal error for user indexes.
            // NOTE(review): presumably a system index can legitimately lack
            // metainfo (e.g. around restart/bootstrap) — confirm.
            if !id.is_system() {
                tracing::error!("cannot find dataflow metainformation for index {id} in catalog");
            }
            coord_bail!("cannot find dataflow metainformation for index {id} in catalog");
        };

        let target_cluster = self.catalog().get_cluster(index.cluster_id);

        // Layer optimizer feature overrides: system defaults, then the target
        // cluster's configured features, then features requested by the
        // EXPLAIN itself.
        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features())
            .override_from(&config.features);

        // TODO(mgree): calculate statistics (need a timestamp)
        let cardinality_stats = BTreeMap::new();

        let explain = match stage {
            ExplainStage::GlobalPlan => {
                // Render the cached optimized (MIR) dataflow plan.
                let Some(plan) = self
                    .catalog()
                    .try_get_optimized_plan(&index.global_id())
                    .cloned()
                else {
                    tracing::error!("cannot find {stage} for index {id} in catalog");
                    coord_bail!("cannot find {stage} for index in catalog");
                };

                explain_dataflow(
                    plan,
                    format,
                    &config,
                    &features,
                    &self.catalog().for_session(ctx.session()),
                    cardinality_stats,
                    Some(target_cluster.name.as_str()),
                    dataflow_metainfo,
                )?
            }
            ExplainStage::PhysicalPlan => {
                // Render the cached physical (LIR) dataflow plan.
                let Some(plan) = self
                    .catalog()
                    .try_get_physical_plan(&index.global_id())
                    .cloned()
                else {
                    tracing::error!("cannot find {stage} for index {id} in catalog");
                    coord_bail!("cannot find {stage} for index in catalog");
                };
                explain_dataflow(
                    plan,
                    format,
                    &config,
                    &features,
                    &self.catalog().for_session(ctx.session()),
                    cardinality_stats,
                    Some(target_cluster.name.as_str()),
                    dataflow_metainfo,
                )?
            }
            _ => {
                coord_bail!("cannot EXPLAIN {} FOR INDEX", stage);
            }
        };

        // Single-row, single-column result containing the rendered plan text.
        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);

        Ok(Self::send_immediate_rows(row))
    }

    // `explain_ctx` is an optional context set iff the state machine is initiated from
    // sequencing an EXPLAIN for this statement.
    /// Constructs the initial `Optimize` stage of the CREATE INDEX state
    /// machine, pinning the current catalog revision as the plan's validity
    /// condition.
    #[instrument]
    fn create_index_validate(
        &self,
        plan: plan::CreateIndexPlan,
        resolved_ids: ResolvedIds,
        explain_ctx: ExplainContext,
    ) -> Result<CreateIndexStage, AdapterError> {
        let validity =
            PlanValidity::require_transient_revision(self.catalog().transient_revision());
        Ok(CreateIndexStage::Optimize(CreateIndexOptimize {
            validity,
            plan,
            resolved_ids,
            explain_ctx,
        }))
    }

    /// Runs MIR ⇒ MIR and MIR ⇒ LIR optimization for the new index on a
    /// blocking worker thread, then hands back the follow-up stage:
    /// `Explain` when sequencing an EXPLAIN, otherwise `Finish`.
    #[instrument]
    async fn create_index_optimize(
        &mut self,
        CreateIndexOptimize {
            validity,
            plan,
            resolved_ids,
            explain_ctx,
        }: CreateIndexOptimize,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let plan::CreateIndexPlan {
            index: plan::Index { cluster_id, .. },
            ..
        } = &plan;

        // Collect optimizer parameters.
        let compute_instance = self
            .instance_snapshot(*cluster_id)
            .expect("compute instance does not exist");
        // Real executions allocate durable catalog ids; EXPLAIN only needs a
        // transient id for the hypothetical index.
        let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
            let id_ts = self.get_catalog_write_ts().await;
            self.catalog().allocate_user_id(id_ts).await?
        } else {
            self.allocate_transient_id()
        };

        // Layer config overrides: system defaults, then cluster features,
        // then any overrides carried by the explain context.
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
            .override_from(&explain_ctx);
        let optimizer_features = optimizer_config.features.clone();

        // Build an optimizer for this INDEX.
        let mut optimizer = optimize::index::Optimizer::new(
            self.owned_catalog(),
            compute_instance,
            global_id,
            optimizer_config,
            self.optimizer_metrics(),
        );
        // Run the potentially expensive optimization off the coordinator's
        // main task, keeping the current tracing span attached.
        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize create index",
            move || {
                span.in_scope(|| {
                    let mut pipeline = || -> Result<(
                    optimize::index::GlobalMirPlan,
                    optimize::index::GlobalLirPlan,
                ), AdapterError> {
                    // Route optimizer tracing output into the explain trace
                    // (when present) for the duration of the pipeline.
                    let _dispatch_guard = explain_ctx.dispatch_guard();

                    let index_plan = optimize::index::Index::new(
                        plan.name.clone(),
                        plan.index.on,
                        plan.index.keys.clone(),
                    );

                    // MIR ⇒ MIR optimization (global)
                    let global_mir_plan = optimizer.catch_unwind_optimize(index_plan)?;
                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
                    let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;

                    Ok((global_mir_plan, global_lir_plan))
                };

                    let stage = match pipeline() {
                        Ok((global_mir_plan, global_lir_plan)) => {
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                // EXPLAIN path: only the dataflow metainfo is
                                // needed to render the trace.
                                let (_, df_meta) = global_lir_plan.unapply();
                                CreateIndexStage::Explain(CreateIndexExplain {
                                    validity,
                                    exported_index_id: global_id,
                                    plan,
                                    df_meta,
                                    explain_ctx,
                                })
                            } else {
                                CreateIndexStage::Finish(CreateIndexFinish {
                                    validity,
                                    item_id,
                                    global_id,
                                    plan,
                                    resolved_ids,
                                    global_mir_plan,
                                    global_lir_plan,
                                    optimizer_features,
                                })
                            }
                        }
                        // Internal optimizer errors are handled differently
                        // depending on the caller.
                        Err(err) => {
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                // In `sequence_~` contexts, immediately error.
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                // In `EXPLAIN BROKEN` contexts, just log the error
                                // and move to the next stage with default
                                // parameters.
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                CreateIndexStage::Explain(CreateIndexExplain {
                                    validity,
                                    exported_index_id: global_id,
                                    plan,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                })
                            } else {
                                // In regular `EXPLAIN` contexts, immediately error.
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }

    /// Final stage of CREATE INDEX: durably records the index in the catalog
    /// and, as side effects of that catalog transaction, caches its optimized
    /// plans, ships the dataflow (under read holds that pin the as-of), and
    /// installs the compute read policy. A pre-existing object with the same
    /// name is tolerated — with a notice — when `IF NOT EXISTS` was
    /// specified.
    #[instrument]
    async fn create_index_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        stage: CreateIndexFinish,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let CreateIndexFinish {
            item_id,
            global_id,
            plan:
                plan::CreateIndexPlan {
                    name,
                    index:
                        plan::Index {
                            create_sql,
                            on,
                            keys,
                            cluster_id,
                            compaction_window,
                        },
                    if_not_exists,
                },
            resolved_ids,
            global_mir_plan,
            global_lir_plan,
            optimizer_features,
            ..
        } = stage;
        // Ids of all collections the new dataflow imports; used below to take
        // read holds before shipping.
        let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);

        // The new index inherits its owner from the indexed object.
        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Index(Index {
                create_sql,
                global_id,
                keys: keys.into(),
                on,
                conn_id: None,
                resolved_ids,
                cluster_id,
                is_retained_metrics_object: false,
                custom_logical_compaction_window: compaction_window,
            }),
            owner_id: *self.catalog().get_entry_by_global_id(&on).owner_id(),
        }];

        // Pre-allocate a vector of transient GlobalIds for each notice.
        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
            .map(|(_item_id, global_id)| global_id)
            .take(global_lir_plan.df_meta().optimizer_notices.len())
            .collect::<Vec<_>>();

        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, ctx| {
                Box::pin(async move {
                    let (mut df_desc, df_meta) = global_lir_plan.unapply();

                    // Save plan structures.
                    coord
                        .catalog_mut()
                        .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
                    coord
                        .catalog_mut()
                        .set_physical_plan(global_id, df_desc.clone());

                    let notice_builtin_updates_fut = coord
                        .process_dataflow_metainfo(df_meta, global_id, ctx, notice_ids)
                        .await;

                    coord
                        .catalog()
                        .cache_expressions(global_id, None, optimizer_features);

                    // We're putting in place read holds, such that ship_dataflow,
                    // below, which calls update_read_capabilities, can successfully
                    // do so. Otherwise, the since of dependencies might move along
                    // concurrently, pulling the rug from under us!
                    //
                    // TODO: Maybe in the future, pass those holds on to compute, to
                    // hold on to them and downgrade when possible?
                    let read_holds = coord.acquire_read_holds(&id_bundle);
                    let since = read_holds.least_valid_read();
                    df_desc.set_as_of(since);

                    coord
                        .ship_dataflow_and_notice_builtin_table_updates(
                            df_desc,
                            cluster_id,
                            notice_builtin_updates_fut,
                            None,
                        )
                        .await;
                    // No `allow_writes` here because indexes do not modify external state.

                    // Drop read holds after the dataflow has been shipped, at which
                    // point compute will have put in its own read holds.
                    drop(read_holds);

                    coord.update_compute_read_policy(
                        cluster_id,
                        item_id,
                        compaction_window.unwrap_or_default().into(),
                    );
                })
            })
            .await;

        match transact_result {
            Ok(_) => Ok(StageResult::Response(ExecuteResponse::CreatedIndex)),
            // With `IF NOT EXISTS`, a name collision is reported as a notice
            // and the statement still succeeds.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "index",
                    });
                Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
            }
            Err(err) => Err(err),
        }
    }

    /// Terminal stage for `EXPLAIN CREATE INDEX` / `EXPLAIN REPLAN INDEX`:
    /// converts the optimizer trace collected during the optimize stage into
    /// result rows for the requested explain stage and format.
    #[instrument]
    async fn create_index_explain(
        &self,
        session: &Session,
        CreateIndexExplain {
            exported_index_id,
            plan: plan::CreateIndexPlan { name, index, .. },
            df_meta,
            explain_ctx:
                ExplainPlanContext {
                    config,
                    format,
                    stage,
                    optimizer_trace,
                    ..
                },
            ..
        }: CreateIndexExplain,
    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
        let session_catalog = self.catalog().for_session(session);
        // Extend the session's humanizer so the not-yet-created index id
        // renders with its intended name and the indexed relation's columns.
        let expr_humanizer = {
            let on_entry = self.catalog.get_entry_by_global_id(&index.on);
            let full_name = self.catalog.resolve_full_name(&name, on_entry.conn_id());
            let on_desc = on_entry
                .relation_desc()
                .expect("can only create indexes on items with a valid description");

            let transient_items = btreemap! {
                exported_index_id => TransientItem::new(
                    Some(full_name.into_parts()),
                    Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
                )
            };
            ExprHumanizerExt::new(transient_items, &session_catalog)
        };

        let target_cluster = self.catalog().get_cluster(index.cluster_id);

        // Layer optimizer feature overrides: system defaults, then cluster
        // config, then the features requested in the EXPLAIN itself.
        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features())
            .override_from(&config.features);

        let rows = optimizer_trace
            .into_rows(
                format,
                &config,
                &features,
                &expr_humanizer,
                None,
                Some(target_cluster),
                df_meta,
                stage,
                plan::ExplaineeStatementKind::CreateIndex,
                None,
            )
            .await?;

        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
    }
}
599}