mz_adapter/coord/sequencer/inner/
create_index.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use maplit::btreemap;
13use mz_catalog::memory::objects::{CatalogItem, Index};
14use mz_ore::instrument;
15use mz_repr::explain::{ExprHumanizerExt, TransientItem};
16use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
17use mz_repr::{Datum, Row};
18use mz_sql::ast::ExplainStage;
19use mz_sql::catalog::CatalogError;
20use mz_sql::names::ResolvedIds;
21use mz_sql::plan;
22use tracing::Span;
23
24use crate::command::ExecuteResponse;
25use crate::coord::sequencer::inner::return_if_err;
26use crate::coord::{
27    Coordinator, CreateIndexExplain, CreateIndexFinish, CreateIndexOptimize, CreateIndexStage,
28    ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
29};
30use crate::error::AdapterError;
31use crate::explain::explain_dataflow;
32use crate::explain::optimizer_trace::OptimizerTrace;
33use crate::optimize::dataflows::dataflow_import_id_bundle;
34use crate::optimize::{self, Optimize};
35use crate::session::Session;
36use crate::{AdapterNotice, ExecuteContext, catalog};
37
38impl Staged for CreateIndexStage {
39    type Ctx = ExecuteContext;
40
41    fn validity(&mut self) -> &mut PlanValidity {
42        match self {
43            Self::Optimize(stage) => &mut stage.validity,
44            Self::Finish(stage) => &mut stage.validity,
45            Self::Explain(stage) => &mut stage.validity,
46        }
47    }
48
49    async fn stage(
50        self,
51        coord: &mut Coordinator,
52        ctx: &mut ExecuteContext,
53    ) -> Result<StageResult<Box<Self>>, AdapterError> {
54        match self {
55            CreateIndexStage::Optimize(stage) => coord.create_index_optimize(stage).await,
56            CreateIndexStage::Finish(stage) => coord.create_index_finish(ctx, stage).await,
57            CreateIndexStage::Explain(stage) => {
58                coord.create_index_explain(ctx.session(), stage).await
59            }
60        }
61    }
62
63    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
64        Message::CreateIndexStageReady {
65            ctx,
66            span,
67            stage: self,
68        }
69    }
70
71    fn cancel_enabled(&self) -> bool {
72        true
73    }
74}
75
76impl Coordinator {
77    #[instrument]
78    pub(crate) async fn sequence_create_index(
79        &mut self,
80        ctx: ExecuteContext,
81        plan: plan::CreateIndexPlan,
82        resolved_ids: ResolvedIds,
83    ) {
84        let stage = return_if_err!(
85            self.create_index_validate(plan, resolved_ids, ExplainContext::None),
86            ctx
87        );
88        self.sequence_staged(ctx, Span::current(), stage).await;
89    }
90
91    #[instrument]
92    pub(crate) async fn explain_create_index(
93        &mut self,
94        ctx: ExecuteContext,
95        plan::ExplainPlanPlan {
96            stage,
97            format,
98            config,
99            explainee,
100        }: plan::ExplainPlanPlan,
101    ) {
102        let plan::Explainee::Statement(stmt) = explainee else {
103            // This is currently asserted in the `sequence_explain_plan` code that
104            // calls this method.
105            unreachable!()
106        };
107        let plan::ExplaineeStatement::CreateIndex { broken, plan } = stmt else {
108            // This is currently asserted in the `sequence_explain_plan` code that
109            // calls this method.
110            unreachable!()
111        };
112
113        // Create an OptimizerTrace instance to collect plans emitted when
114        // executing the optimizer pipeline.
115        let optimizer_trace = OptimizerTrace::new(stage.paths());
116
117        // Not used in the EXPLAIN path so it's OK to generate a dummy value.
118        let resolved_ids = ResolvedIds::empty();
119
120        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
121            broken,
122            config,
123            format,
124            stage,
125            replan: None,
126            desc: None,
127            optimizer_trace,
128        });
129        let stage = return_if_err!(
130            self.create_index_validate(plan, resolved_ids, explain_ctx),
131            ctx
132        );
133        self.sequence_staged(ctx, Span::current(), stage).await;
134    }
135
136    #[instrument]
137    pub(crate) async fn explain_replan_index(
138        &mut self,
139        ctx: ExecuteContext,
140        plan::ExplainPlanPlan {
141            stage,
142            format,
143            config,
144            explainee,
145        }: plan::ExplainPlanPlan,
146    ) {
147        let plan::Explainee::ReplanIndex(id) = explainee else {
148            unreachable!() // Asserted in `sequence_explain_plan`.
149        };
150        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
151            unreachable!() // Asserted in `plan_explain_plan`.
152        };
153        let id = index.global_id();
154
155        let create_sql = index.create_sql.clone();
156        let plan_result = self
157            .catalog_mut()
158            .deserialize_plan_with_enable_for_item_parsing(&create_sql, true);
159        let (plan, resolved_ids) = return_if_err!(plan_result, ctx);
160
161        let plan::Plan::CreateIndex(plan) = plan else {
162            unreachable!() // We are parsing the `create_sql` of an `Index` item.
163        };
164
165        // It is safe to assume that query optimization will always succeed, so
166        // for now we statically assume `broken = false`.
167        let broken = false;
168
169        // Create an OptimizerTrace instance to collect plans emitted when
170        // executing the optimizer pipeline.
171        let optimizer_trace = OptimizerTrace::new(stage.paths());
172
173        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
174            broken,
175            config,
176            format,
177            stage,
178            replan: Some(id),
179            desc: None,
180            optimizer_trace,
181        });
182        let stage = return_if_err!(
183            self.create_index_validate(plan, resolved_ids, explain_ctx),
184            ctx
185        );
186        self.sequence_staged(ctx, Span::current(), stage).await;
187    }
188
189    #[instrument]
190    pub(crate) fn explain_index(
191        &mut self,
192        ctx: &ExecuteContext,
193        plan::ExplainPlanPlan {
194            stage,
195            format,
196            config,
197            explainee,
198        }: plan::ExplainPlanPlan,
199    ) -> Result<ExecuteResponse, AdapterError> {
200        let plan::Explainee::Index(id) = explainee else {
201            unreachable!() // Asserted in `sequence_explain_plan`.
202        };
203        let CatalogItem::Index(index) = self.catalog().get_entry(&id).item() else {
204            unreachable!() // Asserted in `plan_explain_plan`.
205        };
206
207        let Some(dataflow_metainfo) = self.catalog().try_get_dataflow_metainfo(&index.global_id())
208        else {
209            if !id.is_system() {
210                tracing::error!("cannot find dataflow metainformation for index {id} in catalog");
211            }
212            coord_bail!("cannot find dataflow metainformation for index {id} in catalog");
213        };
214
215        let target_cluster = self.catalog().get_cluster(index.cluster_id);
216
217        let features = OptimizerFeatures::from(self.catalog().system_config())
218            .override_from(&target_cluster.config.features())
219            .override_from(&config.features);
220
221        // TODO(mgree): calculate statistics (need a timestamp)
222        let cardinality_stats = BTreeMap::new();
223
224        let explain = match stage {
225            ExplainStage::GlobalPlan => {
226                let Some(plan) = self
227                    .catalog()
228                    .try_get_optimized_plan(&index.global_id())
229                    .cloned()
230                else {
231                    tracing::error!("cannot find {stage} for index {id} in catalog");
232                    coord_bail!("cannot find {stage} for index in catalog");
233                };
234
235                explain_dataflow(
236                    plan,
237                    format,
238                    &config,
239                    &features,
240                    &self.catalog().for_session(ctx.session()),
241                    cardinality_stats,
242                    Some(target_cluster.name.as_str()),
243                    dataflow_metainfo,
244                )?
245            }
246            ExplainStage::PhysicalPlan => {
247                let Some(plan) = self
248                    .catalog()
249                    .try_get_physical_plan(&index.global_id())
250                    .cloned()
251                else {
252                    tracing::error!("cannot find {stage} for index {id} in catalog");
253                    coord_bail!("cannot find {stage} for index in catalog");
254                };
255                explain_dataflow(
256                    plan,
257                    format,
258                    &config,
259                    &features,
260                    &self.catalog().for_session(ctx.session()),
261                    cardinality_stats,
262                    Some(target_cluster.name.as_str()),
263                    dataflow_metainfo,
264                )?
265            }
266            _ => {
267                coord_bail!("cannot EXPLAIN {} FOR INDEX", stage);
268            }
269        };
270
271        let row = Row::pack_slice(&[Datum::from(explain.as_str())]);
272
273        Ok(Self::send_immediate_rows(row))
274    }
275
276    // `explain_ctx` is an optional context set iff the state machine is initiated from
277    // sequencing an EXPLAIN for this statement.
278    #[instrument]
279    fn create_index_validate(
280        &mut self,
281        plan: plan::CreateIndexPlan,
282        resolved_ids: ResolvedIds,
283        explain_ctx: ExplainContext,
284    ) -> Result<CreateIndexStage, AdapterError> {
285        let validity =
286            PlanValidity::require_transient_revision(self.catalog().transient_revision());
287        Ok(CreateIndexStage::Optimize(CreateIndexOptimize {
288            validity,
289            plan,
290            resolved_ids,
291            explain_ctx,
292        }))
293    }
294
295    #[instrument]
296    async fn create_index_optimize(
297        &mut self,
298        CreateIndexOptimize {
299            validity,
300            plan,
301            resolved_ids,
302            explain_ctx,
303        }: CreateIndexOptimize,
304    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
305        let plan::CreateIndexPlan {
306            index: plan::Index { cluster_id, .. },
307            ..
308        } = &plan;
309
310        // Collect optimizer parameters.
311        let compute_instance = self
312            .instance_snapshot(*cluster_id)
313            .expect("compute instance does not exist");
314        let (item_id, global_id) = if let ExplainContext::None = explain_ctx {
315            let id_ts = self.get_catalog_write_ts().await;
316            self.catalog_mut().allocate_user_id(id_ts).await?
317        } else {
318            self.allocate_transient_id()
319        };
320
321        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
322            .override_from(&self.catalog.get_cluster(*cluster_id).config.features())
323            .override_from(&explain_ctx);
324
325        // Build an optimizer for this INDEX.
326        let mut optimizer = optimize::index::Optimizer::new(
327            self.owned_catalog(),
328            compute_instance,
329            global_id,
330            optimizer_config,
331            self.optimizer_metrics(),
332        );
333        let span = Span::current();
334        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
335            || "optimize create index",
336            move || {
337                span.in_scope(|| {
338                    let mut pipeline = || -> Result<(
339                    optimize::index::GlobalMirPlan,
340                    optimize::index::GlobalLirPlan,
341                ), AdapterError> {
342                    let _dispatch_guard = explain_ctx.dispatch_guard();
343
344                    let index_plan =
345                        optimize::index::Index::new(plan.name.clone(), plan.index.on, plan.index.keys.clone());
346
347                    // MIR ⇒ MIR optimization (global)
348                    let global_mir_plan = optimizer.catch_unwind_optimize(index_plan)?;
349                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
350                    let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
351
352                    Ok((global_mir_plan, global_lir_plan))
353                };
354
355                    let stage = match pipeline() {
356                        Ok((global_mir_plan, global_lir_plan)) => {
357                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
358                                let (_, df_meta) = global_lir_plan.unapply();
359                                CreateIndexStage::Explain(CreateIndexExplain {
360                                    validity,
361                                    exported_index_id: global_id,
362                                    plan,
363                                    df_meta,
364                                    explain_ctx,
365                                })
366                            } else {
367                                CreateIndexStage::Finish(CreateIndexFinish {
368                                    validity,
369                                    item_id,
370                                    global_id,
371                                    plan,
372                                    resolved_ids,
373                                    global_mir_plan,
374                                    global_lir_plan,
375                                })
376                            }
377                        }
378                        // Internal optimizer errors are handled differently
379                        // depending on the caller.
380                        Err(err) => {
381                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
382                                // In `sequence_~` contexts, immediately error.
383                                return Err(err);
384                            };
385
386                            if explain_ctx.broken {
387                                // In `EXPLAIN BROKEN` contexts, just log the error
388                                // and move to the next stage with default
389                                // parameters.
390                                tracing::error!("error while handling EXPLAIN statement: {}", err);
391                                CreateIndexStage::Explain(CreateIndexExplain {
392                                    validity,
393                                    exported_index_id: global_id,
394                                    plan,
395                                    df_meta: Default::default(),
396                                    explain_ctx,
397                                })
398                            } else {
399                                // In regular `EXPLAIN` contexts, immediately error.
400                                return Err(err);
401                            }
402                        }
403                    };
404                    Ok(Box::new(stage))
405                })
406            },
407        )))
408    }
409
410    #[instrument]
411    async fn create_index_finish(
412        &mut self,
413        ctx: &mut ExecuteContext,
414        stage: CreateIndexFinish,
415    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
416        let CreateIndexFinish {
417            item_id,
418            global_id,
419            plan:
420                plan::CreateIndexPlan {
421                    name,
422                    index:
423                        plan::Index {
424                            create_sql,
425                            on,
426                            keys,
427                            cluster_id,
428                            compaction_window,
429                        },
430                    if_not_exists,
431                },
432            resolved_ids,
433            global_mir_plan,
434            global_lir_plan,
435            ..
436        } = stage;
437        let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
438
439        let ops = vec![catalog::Op::CreateItem {
440            id: item_id,
441            name: name.clone(),
442            item: CatalogItem::Index(Index {
443                create_sql,
444                global_id,
445                keys: keys.into(),
446                on,
447                conn_id: None,
448                resolved_ids,
449                cluster_id,
450                is_retained_metrics_object: false,
451                custom_logical_compaction_window: compaction_window,
452            }),
453            owner_id: *self.catalog().get_entry_by_global_id(&on).owner_id(),
454        }];
455
456        // Pre-allocate a vector of transient GlobalIds for each notice.
457        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
458            .map(|(_item_id, global_id)| global_id)
459            .take(global_lir_plan.df_meta().optimizer_notices.len())
460            .collect::<Vec<_>>();
461
462        let transact_result = self
463            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, ctx| {
464                Box::pin(async move {
465                    let (mut df_desc, df_meta) = global_lir_plan.unapply();
466
467                    // Save plan structures.
468                    coord
469                        .catalog_mut()
470                        .set_optimized_plan(global_id, global_mir_plan.df_desc().clone());
471                    coord
472                        .catalog_mut()
473                        .set_physical_plan(global_id, df_desc.clone());
474
475                    let notice_builtin_updates_fut = coord
476                        .process_dataflow_metainfo(df_meta, global_id, ctx, notice_ids)
477                        .await;
478
479                    // We're putting in place read holds, such that ship_dataflow,
480                    // below, which calls update_read_capabilities, can successfully
481                    // do so. Otherwise, the since of dependencies might move along
482                    // concurrently, pulling the rug from under us!
483                    //
484                    // TODO: Maybe in the future, pass those holds on to compute, to
485                    // hold on to them and downgrade when possible?
486                    let read_holds = coord.acquire_read_holds(&id_bundle);
487                    let since = read_holds.least_valid_read();
488                    df_desc.set_as_of(since);
489
490                    coord
491                        .ship_dataflow_and_notice_builtin_table_updates(
492                            df_desc,
493                            cluster_id,
494                            notice_builtin_updates_fut,
495                        )
496                        .await;
497
498                    // Drop read holds after the dataflow has been shipped, at which
499                    // point compute will have put in its own read holds.
500                    drop(read_holds);
501
502                    coord.update_compute_read_policy(
503                        cluster_id,
504                        item_id,
505                        compaction_window.unwrap_or_default().into(),
506                    );
507                })
508            })
509            .await;
510
511        match transact_result {
512            Ok(_) => Ok(StageResult::Response(ExecuteResponse::CreatedIndex)),
513            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
514                kind:
515                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
516            })) if if_not_exists => {
517                ctx.session()
518                    .add_notice(AdapterNotice::ObjectAlreadyExists {
519                        name: name.item,
520                        ty: "index",
521                    });
522                Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
523            }
524            Err(err) => Err(err),
525        }
526    }
527
528    #[instrument]
529    async fn create_index_explain(
530        &mut self,
531        session: &Session,
532        CreateIndexExplain {
533            exported_index_id,
534            plan: plan::CreateIndexPlan { name, index, .. },
535            df_meta,
536            explain_ctx:
537                ExplainPlanContext {
538                    config,
539                    format,
540                    stage,
541                    optimizer_trace,
542                    ..
543                },
544            ..
545        }: CreateIndexExplain,
546    ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
547        let session_catalog = self.catalog().for_session(session);
548        let expr_humanizer = {
549            let on_entry = self.catalog.get_entry_by_global_id(&index.on);
550            let full_name = self.catalog.resolve_full_name(&name, on_entry.conn_id());
551            let on_desc = on_entry
552                .desc(&full_name)
553                .expect("can only create indexes on items with a valid description");
554
555            let transient_items = btreemap! {
556                exported_index_id => TransientItem::new(
557                    Some(full_name.into_parts()),
558                    Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
559                )
560            };
561            ExprHumanizerExt::new(transient_items, &session_catalog)
562        };
563
564        let target_cluster = self.catalog().get_cluster(index.cluster_id);
565
566        let features = OptimizerFeatures::from(self.catalog().system_config())
567            .override_from(&target_cluster.config.features())
568            .override_from(&config.features);
569
570        let rows = optimizer_trace
571            .into_rows(
572                format,
573                &config,
574                &features,
575                &expr_humanizer,
576                None,
577                Some(target_cluster),
578                df_meta,
579                stage,
580                plan::ExplaineeStatementKind::CreateIndex,
581                None,
582            )
583            .await?;
584
585        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
586    }
587}