1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeSet;

use mz_compute_client::controller::{ComputeController, ComputeInstanceId};
use mz_expr::MirScalarExpr;
use mz_repr::GlobalId;
use mz_stash::Append;
use mz_transform::IndexOracle;

use crate::catalog::{CatalogItem, CatalogState, Index};
use crate::coord::dataflow_builder::DataflowBuilder;
use crate::coord::{CollectionIdBundle, CoordTimestamp, Coordinator};

/// Answers questions about the indexes available on a particular compute
/// instance.
///
/// The oracle combines two borrowed views: the catalog, which lists the
/// indexes defined on a collection for a given compute instance, and the
/// compute controller, which is asked whether each such index's collection
/// can actually be looked up on that instance.
#[derive(Debug)]
pub struct ComputeInstanceIndexOracle<'a, T> {
    /// Catalog state consulted for catalog entries and for the indexes
    /// defined on a collection.
    catalog: &'a CatalogState,
    /// Controller for the compute instance in question; supplies the instance
    /// id and is queried to check that an index's collection is known to it.
    compute: ComputeController<'a, T>,
}

impl<S: Append> Coordinator<S> {
    /// Builds an index oracle scoped to the compute instance `instance`.
    ///
    /// The oracle borrows the coordinator's catalog state together with the
    /// compute controller that the dataflow client hands out for `instance`.
    ///
    /// # Panics
    ///
    /// Panics if the dataflow client cannot provide a compute controller for
    /// `instance` (presumably because no such instance exists; confirm against
    /// the controller's contract).
    pub fn index_oracle(
        &self,
        instance: ComputeInstanceId,
    ) -> ComputeInstanceIndexOracle<mz_repr::Timestamp> {
        let catalog = self.catalog.state();
        let compute = self.dataflow_client.compute(instance).unwrap();
        ComputeInstanceIndexOracle { catalog, compute }
    }
}

impl<T: Copy> DataflowBuilder<'_, T> {
    /// Builds an index oracle that shares this dataflow builder's catalog and
    /// compute controller, and therefore targets the same compute instance.
    pub fn index_oracle(&self) -> ComputeInstanceIndexOracle<T> {
        // Both fields are copied out of the builder rather than moved, so the
        // builder stays usable afterwards.
        let catalog = self.catalog;
        let compute = self.compute;
        ComputeInstanceIndexOracle { catalog, compute }
    }
}

impl<T: CoordTimestamp> ComputeInstanceIndexOracle<'_, T> {
    /// Computes a bundle of storage and compute collection ids that is
    /// sufficient for building a dataflow over the identifiers in `ids`, using
    /// the indexes available on this compute instance.
    ///
    /// Each id that is reached is handled as follows:
    ///
    /// * if at least one usable index exists on it (see [`Self::indexes_on`]),
    ///   the ids of all such indexes are added to the bundle's compute ids;
    /// * otherwise, a view is expanded into the ids it uses, a source or table
    ///   is added to the bundle's storage ids, and any other kind of catalog
    ///   item is ignored.
    pub fn sufficient_collections<'a, I>(&self, ids: I) -> CollectionIdBundle
    where
        I: IntoIterator<Item = &'a GlobalId>,
    {
        let mut bundle = CollectionIdBundle::default();
        let mut pending: BTreeSet<GlobalId> = ids.into_iter().copied().collect();

        // Repeatedly take the greatest pending id. Expanding a view may add
        // further ids to the work set (expected to be lesser ones), so the set
        // is re-inspected on every iteration.
        while let Some(current) = pending.iter().next_back().copied() {
            // Ids of the indexes on `current` that are usable on this instance.
            let mut index_ids = self
                .indexes_on(current)
                .map(|(index_id, _)| index_id)
                .peekable();

            if index_ids.peek().is_none() {
                // No usable index: decide what to do based on the item kind.
                match self.catalog.get_entry(&current).item() {
                    // A view without an index is resolved through the ids it
                    // uses.
                    unindexed_view @ CatalogItem::View(_) => {
                        pending.extend(unindexed_view.uses());
                    }
                    // A source or table without an index is recorded as a
                    // storage id.
                    CatalogItem::Source(_) | CatalogItem::Table(_) => {
                        bundle.storage_ids.insert(current);
                    }
                    // Anything else cannot be indexed; there is nothing to
                    // record.
                    _ => {}
                }
            } else {
                bundle.compute_ids.extend(index_ids);
            }

            // The id is only dropped from the work set once it has been fully
            // processed.
            pending.remove(&current);
        }

        bundle
    }

    /// Returns the indexes on `id` that the catalog lists for this oracle's
    /// compute instance, restricted to those whose collection the compute
    /// controller can successfully look up.
    ///
    /// Each item pairs the index's id with its catalog definition.
    pub fn indexes_on(&self, id: GlobalId) -> impl Iterator<Item = (GlobalId, &Index)> {
        let instance = self.compute.instance_id();
        let candidates = self.catalog.get_indexes_on(id, instance);
        candidates.filter(|(index_id, _)| self.compute.collection(*index_id).is_ok())
    }
}

impl<T: CoordTimestamp> IndexOracle for ComputeInstanceIndexOracle<'_, T> {
    /// Yields the key expressions of every usable index on `id`.
    ///
    /// This delegates to the inherent `indexes_on` method, which yields
    /// `(index id, index)` pairs, and keeps only each index's keys.
    fn indexes_on(&self, id: GlobalId) -> Box<dyn Iterator<Item = &[MirScalarExpr]> + '_> {
        // The path-qualified call targets the inherent method of the same
        // name; its items are pairs, which the closure destructures.
        let index_keys = ComputeInstanceIndexOracle::indexes_on(self, id)
            .map(|(_, index)| index.keys.as_slice());
        Box::new(index_keys)
    }
}