mz_adapter/coord/
indexes.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use mz_catalog::memory::objects::{CatalogItem, Index};
13use mz_compute_types::ComputeInstanceId;
14use mz_expr::{CollectionPlan, MirScalarExpr};
15use mz_repr::GlobalId;
16use mz_transform::IndexOracle;
17
18use crate::coord::{CollectionIdBundle, Coordinator};
19use crate::optimize::dataflows::DataflowBuilder;
20
21impl Coordinator {
22    /// Creates a new index oracle for the specified compute instance.
23    pub fn index_oracle(&self, instance: ComputeInstanceId) -> DataflowBuilder {
24        self.dataflow_builder(instance)
25    }
26}
27
28impl DataflowBuilder<'_> {
29    /// Identifies a bundle of storage and compute collection ids sufficient for
30    /// building a dataflow for the identifiers in `ids` out of the indexes
31    /// available in this compute instance.
32    #[mz_ore::instrument(level = "debug")]
33    pub fn sufficient_collections<I>(&self, ids: I) -> CollectionIdBundle
34    where
35        I: IntoIterator<Item = GlobalId>,
36    {
37        let mut id_bundle = CollectionIdBundle::default();
38        let mut todo: BTreeSet<GlobalId> = ids.into_iter().collect();
39
40        // Iteratively extract the largest element, potentially introducing lesser elements.
41        while let Some(id) = todo.iter().rev().next().cloned() {
42            // Extract available indexes as those that are enabled, and installed on the cluster.
43            let mut available_indexes = self.indexes_on(id).map(|(id, _)| id).peekable();
44
45            if available_indexes.peek().is_some() {
46                id_bundle
47                    .compute_ids
48                    .entry(self.compute.instance_id())
49                    .or_default()
50                    .extend(available_indexes);
51            } else {
52                match self.catalog.get_entry(&id).item() {
53                    // Unmaterialized view. Search its dependencies.
54                    CatalogItem::View(view) => {
55                        todo.extend(view.optimized_expr.0.depends_on());
56                    }
57                    CatalogItem::Source(_)
58                    | CatalogItem::Table(_)
59                    | CatalogItem::MaterializedView(_) => {
60                        // Record that we are missing at least one index.
61                        id_bundle.storage_ids.insert(id);
62                    }
63                    CatalogItem::ContinualTask(_) => {
64                        id_bundle.storage_ids.insert(id);
65                    }
66                    CatalogItem::Log(_) => {
67                        // Log sources should always have an index.
68                        panic!("log source {id} is missing index");
69                    }
70                    CatalogItem::Sink(_)
71                    | CatalogItem::Index(_)
72                    | CatalogItem::Type(_)
73                    | CatalogItem::Func(_)
74                    | CatalogItem::Secret(_)
75                    | CatalogItem::Connection(_) => {
76                        // Non-indexable thing; no work to do.
77                    }
78                }
79            }
80            todo.remove(&id);
81        }
82
83        id_bundle
84    }
85
86    pub fn indexes_on(&self, id: GlobalId) -> impl Iterator<Item = (GlobalId, &Index)> {
87        self.catalog
88            .get_indexes_on(id, self.compute.instance_id())
89            .filter(|(idx_id, _idx)| self.compute.contains_collection(idx_id))
90            .filter(|(idx_id, _idx)| self.replan.map_or(true, |id| idx_id < &id))
91    }
92}
93
94impl IndexOracle for DataflowBuilder<'_> {
95    fn indexes_on(
96        &self,
97        id: GlobalId,
98    ) -> Box<dyn Iterator<Item = (GlobalId, &[MirScalarExpr])> + '_> {
99        Box::new(
100            self.indexes_on(id)
101                .map(|(idx_id, idx)| (idx_id, idx.keys.as_ref())),
102        )
103    }
104}