mz_adapter/coord/
id_bundle.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11
12use mz_compute_types::ComputeInstanceId;
13use mz_repr::GlobalId;
14use mz_sql::session::metadata::SessionMetadata;
15use serde::{Deserialize, Serialize};
16
17use crate::coord::Coordinator;
18use crate::session::Session;
19
/// A bundle of storage and compute collection identifiers.
///
/// Storage identifiers are kept in a single set, while compute identifiers are grouped by the
/// compute instance they are associated with.
#[derive(Deserialize, Debug, Default, Clone, Serialize)]
pub struct CollectionIdBundle {
    /// The identifiers for sources in the storage layer.
    pub storage_ids: BTreeSet<GlobalId>,
    /// The identifiers for indexes in the compute layer, keyed by compute instance.
    ///
    /// An instance may map to an empty set of identifiers; `is_empty` treats such an entry as
    /// contributing nothing to the bundle.
    pub compute_ids: BTreeMap<ComputeInstanceId, BTreeSet<GlobalId>>,
}
28
29impl CollectionIdBundle {
30    /// Reports whether the bundle contains any identifiers of any type.
31    pub fn is_empty(&self) -> bool {
32        self.storage_ids.is_empty() && self.compute_ids.values().all(|ids| ids.is_empty())
33    }
34
35    /// Returns a new bundle without the identifiers from `other`.
36    pub fn difference(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
37        let storage_ids = &self.storage_ids - &other.storage_ids;
38        let compute_ids: BTreeMap<_, _> = self
39            .compute_ids
40            .iter()
41            .map(|(compute_instance, compute_ids)| {
42                let compute_ids =
43                    if let Some(other_compute_ids) = other.compute_ids.get(compute_instance) {
44                        compute_ids - other_compute_ids
45                    } else {
46                        compute_ids.clone()
47                    };
48                (*compute_instance, compute_ids)
49            })
50            .filter(|(_, ids)| !ids.is_empty())
51            .collect();
52        CollectionIdBundle {
53            storage_ids,
54            compute_ids,
55        }
56    }
57
58    /// Returns a new bundle with the identifiers that are present in both `self` and `other`.
59    pub fn intersection(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
60        // Attend to storage ids.
61        let storage_ids = self
62            .storage_ids
63            .intersection(&other.storage_ids)
64            .cloned()
65            .collect();
66
67        // Intersect ComputeInstanceIds.
68        let self_compute_instances = self.compute_ids.keys().collect::<BTreeSet<_>>();
69        let other_compute_instances = other.compute_ids.keys().collect::<BTreeSet<_>>();
70        let compute_instances = self_compute_instances
71            .intersection(&other_compute_instances)
72            .cloned();
73
74        // For each ComputeInstanceId, intersect `self` with `other`.
75        let compute_ids = compute_instances
76            .map(|compute_instance_id| {
77                let self_compute_ids = self
78                    .compute_ids
79                    .get(compute_instance_id)
80                    .expect("id is in intersection, so should be found");
81                let other_compute_ids = other
82                    .compute_ids
83                    .get(compute_instance_id)
84                    .expect("id is in intersection, so should be found");
85                (
86                    compute_instance_id.clone(),
87                    self_compute_ids
88                        .intersection(other_compute_ids)
89                        .cloned()
90                        .collect(),
91                )
92            })
93            .collect();
94
95        CollectionIdBundle {
96            storage_ids,
97            compute_ids,
98        }
99    }
100
101    /// Extends a `CollectionIdBundle` with the contents of another `CollectionIdBundle`.
102    pub fn extend(&mut self, other: &CollectionIdBundle) {
103        self.storage_ids.extend(&other.storage_ids);
104        for (compute_instance, ids) in &other.compute_ids {
105            self.compute_ids
106                .entry(*compute_instance)
107                .or_default()
108                .extend(ids);
109        }
110    }
111
112    /// Returns an iterator over all IDs in the bundle.
113    ///
114    /// The IDs are iterated in an unspecified order.
115    pub fn iter(&self) -> impl Iterator<Item = GlobalId> + '_ {
116        self.storage_ids
117            .iter()
118            .copied()
119            .chain(self.compute_ids.values().flat_map(BTreeSet::iter).copied())
120    }
121}
122
123impl Coordinator {
124    /// Resolves the full name from the corresponding catalog entry for each item in `id_bundle`.
125    /// If an item in the bundle does not exist in the catalog, it's not included in the result.
126    pub fn resolve_collection_id_bundle_names(
127        &self,
128        session: &Session,
129        id_bundle: &CollectionIdBundle,
130    ) -> Vec<String> {
131        let mut names: Vec<_> = id_bundle
132            .iter()
133            // This could filter out an entry that has been replaced in another transaction.
134            .filter_map(|id| self.catalog().try_get_entry_by_global_id(&id))
135            .map(|item| {
136                self.catalog()
137                    .resolve_full_name(item.name(), Some(session.conn_id()))
138                    .to_string()
139            })
140            .collect();
141        names.sort();
142        names
143    }
144}