mz_adapter/coord/
indexes.rs1use std::collections::BTreeSet;
11
12use mz_catalog::memory::objects::{CatalogItem, Index};
13use mz_compute_types::ComputeInstanceId;
14use mz_expr::{CollectionPlan, MirScalarExpr};
15use mz_repr::GlobalId;
16use mz_transform::IndexOracle;
17
18use crate::coord::{CollectionIdBundle, Coordinator};
19use crate::optimize::dataflows::DataflowBuilder;
20
21impl Coordinator {
22 pub fn index_oracle(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
24 self.dataflow_builder(instance)
25 }
26}
27
28impl DataflowBuilder<'_> {
29 #[mz_ore::instrument(level = "debug")]
33 pub fn sufficient_collections<I>(&self, ids: I) -> CollectionIdBundle
34 where
35 I: IntoIterator<Item = GlobalId>,
36 {
37 let mut id_bundle = CollectionIdBundle::default();
38 let mut todo: BTreeSet<GlobalId> = ids.into_iter().collect();
39
40 while let Some(id) = todo.iter().rev().next().cloned() {
42 let mut available_indexes = self.indexes_on(id).map(|(id, _)| id).peekable();
44
45 if available_indexes.peek().is_some() {
46 id_bundle
47 .compute_ids
48 .entry(self.compute.instance_id())
49 .or_default()
50 .extend(available_indexes);
51 } else {
52 match self.catalog.get_entry(&id).item() {
54 CatalogItem::View(view) => {
56 todo.extend(view.optimized_expr.0.depends_on());
57 }
58 CatalogItem::MaterializedView(mview) if mview.replacement_target.is_some() => {
59 todo.extend(mview.optimized_expr.0.depends_on());
60 }
61 CatalogItem::Source(_)
62 | CatalogItem::Table(_)
63 | CatalogItem::MaterializedView(_) => {
64 id_bundle.storage_ids.insert(id);
66 }
67 CatalogItem::ContinualTask(_) => {
68 id_bundle.storage_ids.insert(id);
69 }
70 CatalogItem::Log(_) => {
71 panic!("log source {id} is missing index");
73 }
74 CatalogItem::Sink(_)
75 | CatalogItem::Index(_)
76 | CatalogItem::Type(_)
77 | CatalogItem::Func(_)
78 | CatalogItem::Secret(_)
79 | CatalogItem::Connection(_) => {
80 }
82 }
83 }
84 todo.remove(&id);
85 }
86
87 id_bundle
88 }
89
90 pub fn indexes_on(&self, id: GlobalId) -> impl Iterator<Item = (GlobalId, &Index)> {
91 self.catalog
92 .get_indexes_on(id, self.compute.instance_id())
93 .filter(|(idx_id, _idx)| self.compute.contains_collection(idx_id))
94 .filter(|(idx_id, _idx)| self.replan.map_or(true, |id| idx_id < &id))
95 }
96}
97
98impl IndexOracle for DataflowBuilder<'_> {
99 fn indexes_on(
100 &self,
101 id: GlobalId,
102 ) -> Box<dyn Iterator<Item = (GlobalId, &[MirScalarExpr])> + '_> {
103 Box::new(
104 self.indexes_on(id)
105 .map(|(idx_id, idx)| (idx_id, idx.keys.as_ref())),
106 )
107 }
108}