// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeSet;

use mz_catalog::memory::objects::{CatalogItem, Index};
use mz_compute_types::ComputeInstanceId;
use mz_expr::{CollectionPlan, MirScalarExpr};
use mz_repr::GlobalId;
use mz_transform::IndexOracle;

use crate::coord::{CollectionIdBundle, Coordinator};
use crate::optimize::dataflows::DataflowBuilder;

impl Coordinator {
/// Creates a new index oracle for the specified compute instance.
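    ///
    /// A sketch of intended use, assuming a `Coordinator` reference `coord`, a
    /// cluster id `cluster_id`, and a collection id `view_id` are in scope
    /// (illustrative only; not compiled as a doctest):
    ///
    /// ```ignore
    /// use mz_transform::IndexOracle;
    ///
    /// let oracle = coord.index_oracle(cluster_id);
    /// // Ask, via the `IndexOracle` trait, which indexes serve `view_id` on
    /// // this cluster and on which key expressions.
    /// for (idx_id, keys) in IndexOracle::indexes_on(&oracle, view_id) {
    ///     println!("index {idx_id} is keyed on {keys:?}");
    /// }
    /// ```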
pub fn index_oracle(&self, instance: ComputeInstanceId) -> DataflowBuilder {
self.dataflow_builder(instance)
}
}

impl DataflowBuilder<'_> {
/// Identifies a bundle of storage and compute collection ids sufficient for
/// building a dataflow for the identifiers in `ids` out of the indexes
/// available in this compute instance.
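    ///
    /// A sketch of intended use, assuming a `DataflowBuilder` named `builder`
    /// and the `GlobalId`s of the collections a dataflow reads (illustrative
    /// only; not compiled as a doctest):
    ///
    /// ```ignore
    /// let id_bundle = builder.sufficient_collections([view_id, table_id]);
    /// // Ids that an installed index can serve land in `compute_ids`; ids that
    /// // must be read directly from storage land in `storage_ids`.
    /// let needs_storage_read = !id_bundle.storage_ids.is_empty();
    /// ```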
#[mz_ore::instrument(level = "debug")]
pub fn sufficient_collections<I>(&self, ids: I) -> CollectionIdBundle
where
I: IntoIterator<Item = GlobalId>,
{
let mut id_bundle = CollectionIdBundle::default();
let mut todo: BTreeSet<GlobalId> = ids.into_iter().collect();
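        // Walk the dependency structure: ids with an index installed on this
        // cluster are served from compute; views without one are expanded into
        // their dependencies; storage-backed collections without one are read
        // directly from storage.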
// Iteratively extract the largest element, potentially introducing lesser elements.
while let Some(id) = todo.iter().rev().next().cloned() {
            // Extract the available indexes: those that are enabled and installed on the cluster.
let mut available_indexes = self.indexes_on(id).map(|(id, _)| id).peekable();
if available_indexes.peek().is_some() {
id_bundle
.compute_ids
.entry(self.compute.instance_id())
.or_default()
.extend(available_indexes);
} else {
match self.catalog.get_entry(&id).item() {
// Unmaterialized view. Search its dependencies.
CatalogItem::View(view) => {
todo.extend(view.optimized_expr.0.depends_on());
}
CatalogItem::Source(_)
| CatalogItem::Table(_)
| CatalogItem::MaterializedView(_) => {
// Record that we are missing at least one index.
id_bundle.storage_ids.insert(id);
}
                    // Continual tasks are likewise read from their storage collections.
                    CatalogItem::ContinualTask(_) => {
id_bundle.storage_ids.insert(id);
}
CatalogItem::Log(_) => {
// Log sources should always have an index.
panic!("log source {id} is missing index");
}
CatalogItem::Sink(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => {
// Non-indexable thing; no work to do.
}
}
}
todo.remove(&id);
}
id_bundle
    }

    /// Returns the indexes on `id` that are installed on this compute instance.
    ///
    /// When `self.replan` is set, only indexes with ids smaller than the replan
    /// target are returned, so the new plan cannot depend on the index being
    /// replanned.
    pub fn indexes_on(&self, id: GlobalId) -> impl Iterator<Item = (GlobalId, &Index)> {
self.catalog
.get_indexes_on(id, self.compute.instance_id())
.filter(|(idx_id, _idx)| self.compute.contains_collection(idx_id))
.filter(|(idx_id, _idx)| self.replan.map_or(true, |id| idx_id < &id))
}
}
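
// Implement the optimizer's `IndexOracle` in terms of the inherent `indexes_on`
// above, exposing only each index's id and key expressions.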
impl IndexOracle for DataflowBuilder<'_> {
fn indexes_on(
&self,
id: GlobalId,
) -> Box<dyn Iterator<Item = (GlobalId, &[MirScalarExpr])> + '_> {
Box::new(
self.indexes_on(id)
.map(|(idx_id, idx)| (idx_id, idx.keys.as_ref())),
)
}
}