1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use std::collections::BTreeSet;
use mz_compute_client::controller::{ComputeController, ComputeInstanceId};
use mz_expr::MirScalarExpr;
use mz_repr::GlobalId;
use mz_stash::Append;
use mz_transform::IndexOracle;
use crate::catalog::{CatalogItem, CatalogState, Index};
use crate::coord::dataflow_builder::DataflowBuilder;
use crate::coord::{CollectionIdBundle, CoordTimestamp, Coordinator};
/// Answers questions about the indexes that exist for one compute instance.
///
/// Pairs a view of the catalog with a controller for a single compute
/// instance; the methods on this type consult both to decide which catalog
/// indexes are actually usable on that instance.
#[derive(Debug)]
pub struct ComputeInstanceIndexOracle<'a, T> {
// Catalog state used to look up entries and the indexes defined on them.
catalog: &'a CatalogState,
// Controller for the compute instance whose indexes are being queried.
compute: ComputeController<'a, T>,
}
impl<S: Append> Coordinator<S> {
    /// Builds an index oracle for the compute instance identified by `instance`.
    ///
    /// The oracle borrows this coordinator's catalog state and a compute
    /// controller obtained from `self.dataflow_client`.
    ///
    /// # Panics
    ///
    /// Panics if `self.dataflow_client.compute(instance)` does not yield a
    /// controller, because its result is unwrapped.
    pub fn index_oracle(
        &self,
        instance: ComputeInstanceId,
    ) -> ComputeInstanceIndexOracle<mz_repr::Timestamp> {
        // Evaluated in the same order as the fields are declared.
        let catalog = self.catalog.state();
        let compute = self.dataflow_client.compute(instance).unwrap();
        ComputeInstanceIndexOracle { catalog, compute }
    }
}
impl<T: Copy> DataflowBuilder<'_, T> {
    /// Builds an index oracle that shares this builder's catalog reference and
    /// compute controller.
    pub fn index_oracle(&self) -> ComputeInstanceIndexOracle<T> {
        // Both fields are copied out of the builder; the builder is left intact.
        let catalog = self.catalog;
        let compute = self.compute;
        ComputeInstanceIndexOracle { catalog, compute }
    }
}
impl<T: CoordTimestamp> ComputeInstanceIndexOracle<'_, T> {
    /// Collects a bundle of storage and compute collection ids derived from
    /// `ids` and the indexes usable on this compute instance.
    ///
    /// Starting from `ids`, the greatest pending id is examined repeatedly:
    ///
    /// * If [`Self::indexes_on`] yields at least one index for it, the ids of
    ///   all of those indexes are added to the bundle's `compute_ids`.
    /// * Otherwise the catalog entry decides: for a view, the ids returned by
    ///   its `uses()` are queued for examination; for a source or table, the
    ///   id itself is added to `storage_ids`; any other item is ignored.
    ///
    /// The loop ends once no pending ids remain.
    pub fn sufficient_collections<'a, I>(&self, ids: I) -> CollectionIdBundle
    where
        I: IntoIterator<Item = &'a GlobalId>,
    {
        let mut bundle = CollectionIdBundle::default();
        let mut pending: BTreeSet<GlobalId> = ids.into_iter().cloned().collect();

        // Always examine the greatest pending id. It stays in the set until
        // the end of the iteration, after any newly discovered ids are queued.
        // NOTE(review): termination presumably relies on `uses()` never
        // leading back to an id that is still to be processed — confirm.
        while let Some(current) = pending.iter().next_back().copied() {
            let mut index_ids = self
                .indexes_on(current)
                .map(|(index_id, _index)| index_id)
                .peekable();

            if index_ids.peek().is_none() {
                // No usable index: fall back on what the catalog says this is.
                match self.catalog.get_entry(&current).item() {
                    CatalogItem::Source(_) | CatalogItem::Table(_) => {
                        bundle.storage_ids.insert(current);
                    }
                    view @ CatalogItem::View(_) => pending.extend(view.uses()),
                    // Every other kind of item contributes nothing.
                    _ => {}
                }
            } else {
                bundle.compute_ids.extend(index_ids);
            }

            pending.remove(&current);
        }

        bundle
    }

    /// Returns the `(index id, index)` pairs the catalog reports for `id` on
    /// this oracle's compute instance, keeping only those whose id the
    /// compute controller can resolve via `collection(..)`.
    ///
    /// NOTE(review): a successful `collection` lookup presumably means the
    /// index is installed on the instance — confirm against the controller.
    pub fn indexes_on(&self, id: GlobalId) -> impl Iterator<Item = (GlobalId, &Index)> {
        let instance = self.compute.instance_id();
        self.catalog
            .get_indexes_on(id, instance)
            .filter(|(index_id, _)| self.compute.collection(*index_id).is_ok())
    }
}
impl<T: CoordTimestamp> IndexOracle for ComputeInstanceIndexOracle<'_, T> {
    /// Yields the key expressions of every index that the inherent
    /// `ComputeInstanceIndexOracle::indexes_on` reports for `id`, discarding
    /// the index ids.
    fn indexes_on(&self, id: GlobalId) -> Box<dyn Iterator<Item = &[MirScalarExpr]> + '_> {
        // Call the inherent method by path so it is clearly not this trait
        // method being invoked recursively.
        let indexes = ComputeInstanceIndexOracle::indexes_on(self, id);
        let keys = indexes.map(|(_, index)| index.keys.as_slice());
        Box::new(keys)
    }
}