1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910use std::collections::{BTreeMap, BTreeSet};
1112use mz_compute_types::ComputeInstanceId;
13use mz_repr::GlobalId;
14use mz_sql::session::metadata::SessionMetadata;
15use serde::{Deserialize, Serialize};
1617use crate::coord::Coordinator;
18use crate::session::Session;
1920/// A bundle of storage and compute collection identifiers.
21#[derive(Deserialize, Debug, Default, Clone, Serialize)]
22pub struct CollectionIdBundle {
23/// The identifiers for sources in the storage layer.
24pub storage_ids: BTreeSet<GlobalId>,
25/// The identifiers for indexes in the compute layer.
26pub compute_ids: BTreeMap<ComputeInstanceId, BTreeSet<GlobalId>>,
27}
2829impl CollectionIdBundle {
30/// Reports whether the bundle contains any identifiers of any type.
31pub fn is_empty(&self) -> bool {
32self.storage_ids.is_empty() && self.compute_ids.values().all(|ids| ids.is_empty())
33 }
3435/// Returns a new bundle without the identifiers from `other`.
36pub fn difference(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
37let storage_ids = &self.storage_ids - &other.storage_ids;
38let compute_ids: BTreeMap<_, _> = self
39.compute_ids
40 .iter()
41 .map(|(compute_instance, compute_ids)| {
42let compute_ids =
43if let Some(other_compute_ids) = other.compute_ids.get(compute_instance) {
44 compute_ids - other_compute_ids
45 } else {
46 compute_ids.clone()
47 };
48 (*compute_instance, compute_ids)
49 })
50 .filter(|(_, ids)| !ids.is_empty())
51 .collect();
52 CollectionIdBundle {
53 storage_ids,
54 compute_ids,
55 }
56 }
5758/// Returns a new bundle with the identifiers that are present in both `self` and `other`.
59pub fn intersection(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
60// Attend to storage ids.
61let storage_ids = self
62.storage_ids
63 .intersection(&other.storage_ids)
64 .cloned()
65 .collect();
6667// Intersect ComputeInstanceIds.
68let self_compute_instances = self.compute_ids.keys().collect::<BTreeSet<_>>();
69let other_compute_instances = other.compute_ids.keys().collect::<BTreeSet<_>>();
70let compute_instances = self_compute_instances
71 .intersection(&other_compute_instances)
72 .cloned();
7374// For each ComputeInstanceId, intersect `self` with `other`.
75let compute_ids = compute_instances
76 .map(|compute_instance_id| {
77let self_compute_ids = self
78.compute_ids
79 .get(compute_instance_id)
80 .expect("id is in intersection, so should be found");
81let other_compute_ids = other
82 .compute_ids
83 .get(compute_instance_id)
84 .expect("id is in intersection, so should be found");
85 (
86 compute_instance_id.clone(),
87 self_compute_ids
88 .intersection(other_compute_ids)
89 .cloned()
90 .collect(),
91 )
92 })
93 .collect();
9495 CollectionIdBundle {
96 storage_ids,
97 compute_ids,
98 }
99 }
100101/// Extends a `CollectionIdBundle` with the contents of another `CollectionIdBundle`.
102pub fn extend(&mut self, other: &CollectionIdBundle) {
103self.storage_ids.extend(&other.storage_ids);
104for (compute_instance, ids) in &other.compute_ids {
105self.compute_ids
106 .entry(*compute_instance)
107 .or_default()
108 .extend(ids);
109 }
110 }
111112/// Returns an iterator over all IDs in the bundle.
113 ///
114 /// The IDs are iterated in an unspecified order.
115pub fn iter(&self) -> impl Iterator<Item = GlobalId> + '_ {
116self.storage_ids
117 .iter()
118 .copied()
119 .chain(self.compute_ids.values().flat_map(BTreeSet::iter).copied())
120 }
121}
122123impl Coordinator {
124/// Resolves the full name from the corresponding catalog entry for each item in `id_bundle`.
125 /// If an item in the bundle does not exist in the catalog, it's not included in the result.
126pub fn resolve_collection_id_bundle_names(
127&self,
128 session: &Session,
129 id_bundle: &CollectionIdBundle,
130 ) -> Vec<String> {
131let mut names: Vec<_> = id_bundle
132 .iter()
133// This could filter out an entry that has been replaced in another transaction.
134.filter_map(|id| self.catalog().try_get_entry_by_global_id(&id))
135 .map(|item| {
136self.catalog()
137 .resolve_full_name(item.name(), Some(session.conn_id()))
138 .to_string()
139 })
140 .collect();
141 names.sort();
142 names
143 }
144}