mz_adapter/coord/
indexes.rs
1use std::collections::BTreeSet;
11
12use mz_catalog::memory::objects::{CatalogItem, Index};
13use mz_compute_types::ComputeInstanceId;
14use mz_expr::{CollectionPlan, MirScalarExpr};
15use mz_repr::GlobalId;
16use mz_transform::IndexOracle;
17
18use crate::coord::{CollectionIdBundle, Coordinator};
19use crate::optimize::dataflows::DataflowBuilder;
20
21impl Coordinator {
22 pub fn index_oracle(&self, instance: ComputeInstanceId) -> DataflowBuilder {
24 self.dataflow_builder(instance)
25 }
26}
27
28impl DataflowBuilder<'_> {
29 #[mz_ore::instrument(level = "debug")]
33 pub fn sufficient_collections<I>(&self, ids: I) -> CollectionIdBundle
34 where
35 I: IntoIterator<Item = GlobalId>,
36 {
37 let mut id_bundle = CollectionIdBundle::default();
38 let mut todo: BTreeSet<GlobalId> = ids.into_iter().collect();
39
40 while let Some(id) = todo.iter().rev().next().cloned() {
42 let mut available_indexes = self.indexes_on(id).map(|(id, _)| id).peekable();
44
45 if available_indexes.peek().is_some() {
46 id_bundle
47 .compute_ids
48 .entry(self.compute.instance_id())
49 .or_default()
50 .extend(available_indexes);
51 } else {
52 match self.catalog.get_entry(&id).item() {
53 CatalogItem::View(view) => {
55 todo.extend(view.optimized_expr.0.depends_on());
56 }
57 CatalogItem::Source(_)
58 | CatalogItem::Table(_)
59 | CatalogItem::MaterializedView(_) => {
60 id_bundle.storage_ids.insert(id);
62 }
63 CatalogItem::ContinualTask(_) => {
64 id_bundle.storage_ids.insert(id);
65 }
66 CatalogItem::Log(_) => {
67 panic!("log source {id} is missing index");
69 }
70 CatalogItem::Sink(_)
71 | CatalogItem::Index(_)
72 | CatalogItem::Type(_)
73 | CatalogItem::Func(_)
74 | CatalogItem::Secret(_)
75 | CatalogItem::Connection(_) => {
76 }
78 }
79 }
80 todo.remove(&id);
81 }
82
83 id_bundle
84 }
85
86 pub fn indexes_on(&self, id: GlobalId) -> impl Iterator<Item = (GlobalId, &Index)> {
87 self.catalog
88 .get_indexes_on(id, self.compute.instance_id())
89 .filter(|(idx_id, _idx)| self.compute.contains_collection(idx_id))
90 .filter(|(idx_id, _idx)| self.replan.map_or(true, |id| idx_id < &id))
91 }
92}
93
94impl IndexOracle for DataflowBuilder<'_> {
95 fn indexes_on(
96 &self,
97 id: GlobalId,
98 ) -> Box<dyn Iterator<Item = (GlobalId, &[MirScalarExpr])> + '_> {
99 Box::new(
100 self.indexes_on(id)
101 .map(|(idx_id, idx)| (idx_id, idx.keys.as_ref())),
102 )
103 }
104}