Skip to main content

mz_adapter/coord/
id_bundle.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11
12use mz_adapter_types::connection::ConnectionId;
13use mz_compute_types::ComputeInstanceId;
14use mz_repr::GlobalId;
15use serde::{Deserialize, Serialize};
16
17use crate::catalog::Catalog;
18
19/// A bundle of storage and compute collection identifiers.
20#[derive(Deserialize, Debug, Default, Clone, Serialize)]
21pub struct CollectionIdBundle {
22    /// The identifiers for sources in the storage layer.
23    pub storage_ids: BTreeSet<GlobalId>,
24    /// The identifiers for indexes in the compute layer.
25    /// (If this ever changes to include things other than indexes, then please attend to the
26    /// `Display` of `constraints::Reason::ComputeInput`.)
27    pub compute_ids: BTreeMap<ComputeInstanceId, BTreeSet<GlobalId>>,
28}
29
30impl CollectionIdBundle {
31    /// Reports whether the bundle contains any identifiers of any type.
32    pub fn is_empty(&self) -> bool {
33        self.storage_ids.is_empty() && self.compute_ids.values().all(|ids| ids.is_empty())
34    }
35
36    /// Returns a new bundle without the identifiers from `other`.
37    pub fn difference(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
38        let storage_ids = &self.storage_ids - &other.storage_ids;
39        let compute_ids: BTreeMap<_, _> = self
40            .compute_ids
41            .iter()
42            .map(|(compute_instance, compute_ids)| {
43                let compute_ids =
44                    if let Some(other_compute_ids) = other.compute_ids.get(compute_instance) {
45                        compute_ids - other_compute_ids
46                    } else {
47                        compute_ids.clone()
48                    };
49                (*compute_instance, compute_ids)
50            })
51            .filter(|(_, ids)| !ids.is_empty())
52            .collect();
53        CollectionIdBundle {
54            storage_ids,
55            compute_ids,
56        }
57    }
58
59    /// Returns a new bundle with the identifiers that are present in both `self` and `other`.
60    pub fn intersection(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
61        // Attend to storage ids.
62        let storage_ids = self
63            .storage_ids
64            .intersection(&other.storage_ids)
65            .cloned()
66            .collect();
67
68        // Intersect ComputeInstanceIds.
69        let self_compute_instances = self.compute_ids.keys().collect::<BTreeSet<_>>();
70        let other_compute_instances = other.compute_ids.keys().collect::<BTreeSet<_>>();
71        let compute_instances = self_compute_instances
72            .intersection(&other_compute_instances)
73            .cloned();
74
75        // For each ComputeInstanceId, intersect `self` with `other`.
76        let compute_ids = compute_instances
77            .map(|compute_instance_id| {
78                let self_compute_ids = self
79                    .compute_ids
80                    .get(compute_instance_id)
81                    .expect("id is in intersection, so should be found");
82                let other_compute_ids = other
83                    .compute_ids
84                    .get(compute_instance_id)
85                    .expect("id is in intersection, so should be found");
86                (
87                    compute_instance_id.clone(),
88                    self_compute_ids
89                        .intersection(other_compute_ids)
90                        .cloned()
91                        .collect(),
92                )
93            })
94            .collect();
95
96        CollectionIdBundle {
97            storage_ids,
98            compute_ids,
99        }
100    }
101
102    /// Extends a `CollectionIdBundle` with the contents of another `CollectionIdBundle`.
103    pub fn extend(&mut self, other: &CollectionIdBundle) {
104        self.storage_ids.extend(&other.storage_ids);
105        for (compute_instance, ids) in &other.compute_ids {
106            self.compute_ids
107                .entry(*compute_instance)
108                .or_default()
109                .extend(ids);
110        }
111    }
112
113    /// Returns an iterator over all IDs in the bundle.
114    ///
115    /// The IDs are iterated in an unspecified order.
116    pub fn iter(&self) -> impl Iterator<Item = GlobalId> + '_ {
117        self.storage_ids
118            .iter()
119            .copied()
120            .chain(self.compute_ids.values().flat_map(BTreeSet::iter).copied())
121    }
122
123    /// Resolves the full name from the corresponding catalog entry for each item in this bundle.
124    /// If an item in the bundle does not exist in the catalog, it's not included in the result.
125    pub fn resolve_names(&self, catalog: &Catalog, conn_id: &ConnectionId) -> Vec<String> {
126        let mut names: Vec<_> = self
127            .iter()
128            // This could filter out an entry that has been replaced in another transaction.
129            .filter_map(|id| catalog.try_get_entry_by_global_id(&id))
130            .map(|item| {
131                catalog
132                    .resolve_full_name(item.name(), Some(conn_id))
133                    .to_string()
134            })
135            .collect();
136        names.sort();
137        names
138    }
139}