mz_adapter/coord/indexes.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use mz_catalog::memory::objects::{CatalogItem, Index};
13use mz_compute_types::ComputeInstanceId;
14use mz_expr::{CollectionPlan, MirScalarExpr};
15use mz_repr::GlobalId;
16use mz_transform::IndexOracle;
17
18use crate::coord::{CollectionIdBundle, Coordinator};
19use crate::optimize::dataflows::DataflowBuilder;
20
21impl Coordinator {
22    /// Creates a new index oracle for the specified compute instance.
23    pub fn index_oracle(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
24        self.dataflow_builder(instance)
25    }
26}
27
28impl DataflowBuilder<'_> {
29    /// Identifies a bundle of storage and compute collection ids sufficient for
30    /// building a dataflow for the identifiers in `ids` out of the indexes
31    /// available in this compute instance.
32    #[mz_ore::instrument(level = "debug")]
33    pub fn sufficient_collections<I>(&self, ids: I) -> CollectionIdBundle
34    where
35        I: IntoIterator<Item = GlobalId>,
36    {
37        let mut id_bundle = CollectionIdBundle::default();
38        let mut todo: BTreeSet<GlobalId> = ids.into_iter().collect();
39
40        // Iteratively extract the largest element, potentially introducing lesser elements.
41        while let Some(id) = todo.iter().rev().next().cloned() {
42            // Extract available indexes as those that are enabled, and installed on the cluster.
43            let mut available_indexes = self.indexes_on(id).map(|(id, _)| id).peekable();
44
45            if available_indexes.peek().is_some() {
46                id_bundle
47                    .compute_ids
48                    .entry(self.compute.instance_id())
49                    .or_default()
50                    .extend(available_indexes);
51            } else {
52                // Note that the following match should be kept in sync with `import_into_dataflow`.
53                match self.catalog.get_entry(&id).item() {
54                    // Unmaterialized view. Search its dependencies.
55                    CatalogItem::View(view) => {
56                        todo.extend(view.optimized_expr.0.depends_on());
57                    }
58                    CatalogItem::MaterializedView(mview) if mview.replacement_target.is_some() => {
59                        todo.extend(mview.optimized_expr.0.depends_on());
60                    }
61                    CatalogItem::Source(_)
62                    | CatalogItem::Table(_)
63                    | CatalogItem::MaterializedView(_) => {
64                        // Record that we are missing at least one index.
65                        id_bundle.storage_ids.insert(id);
66                    }
67                    CatalogItem::ContinualTask(_) => {
68                        id_bundle.storage_ids.insert(id);
69                    }
70                    CatalogItem::Log(_) => {
71                        // Log sources should always have an index.
72                        panic!("log source {id} is missing index");
73                    }
74                    CatalogItem::Sink(_)
75                    | CatalogItem::Index(_)
76                    | CatalogItem::Type(_)
77                    | CatalogItem::Func(_)
78                    | CatalogItem::Secret(_)
79                    | CatalogItem::Connection(_) => {
80                        // Non-indexable thing; no work to do.
81                    }
82                }
83            }
84            todo.remove(&id);
85        }
86
87        id_bundle
88    }
89
90    pub fn indexes_on(&self, id: GlobalId) -> impl Iterator<Item = (GlobalId, &Index)> {
91        self.catalog
92            .get_indexes_on(id, self.compute.instance_id())
93            .filter(|(idx_id, _idx)| self.compute.contains_collection(idx_id))
94            .filter(|(idx_id, _idx)| self.replan.map_or(true, |id| idx_id < &id))
95    }
96}
97
98impl IndexOracle for DataflowBuilder<'_> {
99    fn indexes_on(
100        &self,
101        id: GlobalId,
102    ) -> Box<dyn Iterator<Item = (GlobalId, &[MirScalarExpr])> + '_> {
103        Box::new(
104            self.indexes_on(id)
105                .map(|(idx_id, idx)| (idx_id, idx.keys.as_ref())),
106        )
107    }
108}