1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeSet;

use mz_catalog::memory::objects::{CatalogItem, Index};
use mz_compute_types::ComputeInstanceId;
use mz_expr::{CollectionPlan, MirScalarExpr};
use mz_repr::GlobalId;
use mz_transform::IndexOracle;

use crate::coord::{CollectionIdBundle, Coordinator};
use crate::optimize::dataflows::DataflowBuilder;

impl Coordinator {
    /// Creates a new index oracle for the specified compute instance.
    pub fn index_oracle(&self, instance: ComputeInstanceId) -> DataflowBuilder {
        self.dataflow_builder(instance)
    }
}

impl DataflowBuilder<'_> {
    /// Identifies a bundle of storage and compute collection ids sufficient for
    /// building a dataflow for the identifiers in `ids` out of the indexes
    /// available in this compute instance.
    #[mz_ore::instrument(level = "debug")]
    pub fn sufficient_collections<I>(&self, ids: I) -> CollectionIdBundle
    where
        I: IntoIterator<Item = GlobalId>,
    {
        let mut id_bundle = CollectionIdBundle::default();
        let mut todo: BTreeSet<GlobalId> = ids.into_iter().collect();

        // Iteratively extract the largest element, potentially introducing lesser elements.
        while let Some(id) = todo.iter().rev().next().cloned() {
            // Extract available indexes as those that are enabled, and installed on the cluster.
            let mut available_indexes = self.indexes_on(id).map(|(id, _)| id).peekable();

            if available_indexes.peek().is_some() {
                id_bundle
                    .compute_ids
                    .entry(self.compute.instance_id())
                    .or_default()
                    .extend(available_indexes);
            } else {
                match self.catalog.get_entry(&id).item() {
                    // Unmaterialized view. Search its dependencies.
                    CatalogItem::View(view) => {
                        todo.extend(view.optimized_expr.0.depends_on());
                    }
                    CatalogItem::Source(_)
                    | CatalogItem::Table(_)
                    | CatalogItem::MaterializedView(_) => {
                        // Record that we are missing at least one index.
                        id_bundle.storage_ids.insert(id);
                    }
                    CatalogItem::ContinualTask(_) => {
                        id_bundle.storage_ids.insert(id);
                    }
                    CatalogItem::Log(_) => {
                        // Log sources should always have an index.
                        panic!("log source {id} is missing index");
                    }
                    CatalogItem::Sink(_)
                    | CatalogItem::Index(_)
                    | CatalogItem::Type(_)
                    | CatalogItem::Func(_)
                    | CatalogItem::Secret(_)
                    | CatalogItem::Connection(_) => {
                        // Non-indexable thing; no work to do.
                    }
                }
            }
            todo.remove(&id);
        }

        id_bundle
    }

    pub fn indexes_on(&self, id: GlobalId) -> impl Iterator<Item = (GlobalId, &Index)> {
        self.catalog
            .get_indexes_on(id, self.compute.instance_id())
            .filter(|(idx_id, _idx)| self.compute.contains_collection(idx_id))
            .filter(|(idx_id, _idx)| self.replan.map_or(true, |id| idx_id < &id))
    }
}

impl IndexOracle for DataflowBuilder<'_> {
    fn indexes_on(
        &self,
        id: GlobalId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &[MirScalarExpr])> + '_> {
        Box::new(
            self.indexes_on(id)
                .map(|(idx_id, idx)| (idx_id, idx.keys.as_ref())),
        )
    }
}