mz_adapter/coord/
id_bundle.rs1use std::collections::{BTreeMap, BTreeSet};
11
12use mz_compute_types::ComputeInstanceId;
13use mz_repr::GlobalId;
14use mz_sql::session::metadata::SessionMetadata;
15use serde::{Deserialize, Serialize};
16
17use crate::coord::Coordinator;
18use crate::session::Session;
19
20#[derive(Deserialize, Debug, Default, Clone, Serialize)]
22pub struct CollectionIdBundle {
23    pub storage_ids: BTreeSet<GlobalId>,
25    pub compute_ids: BTreeMap<ComputeInstanceId, BTreeSet<GlobalId>>,
27}
28
29impl CollectionIdBundle {
30    pub fn is_empty(&self) -> bool {
32        self.storage_ids.is_empty() && self.compute_ids.values().all(|ids| ids.is_empty())
33    }
34
35    pub fn difference(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
37        let storage_ids = &self.storage_ids - &other.storage_ids;
38        let compute_ids: BTreeMap<_, _> = self
39            .compute_ids
40            .iter()
41            .map(|(compute_instance, compute_ids)| {
42                let compute_ids =
43                    if let Some(other_compute_ids) = other.compute_ids.get(compute_instance) {
44                        compute_ids - other_compute_ids
45                    } else {
46                        compute_ids.clone()
47                    };
48                (*compute_instance, compute_ids)
49            })
50            .filter(|(_, ids)| !ids.is_empty())
51            .collect();
52        CollectionIdBundle {
53            storage_ids,
54            compute_ids,
55        }
56    }
57
58    pub fn intersection(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
60        let storage_ids = self
62            .storage_ids
63            .intersection(&other.storage_ids)
64            .cloned()
65            .collect();
66
67        let self_compute_instances = self.compute_ids.keys().collect::<BTreeSet<_>>();
69        let other_compute_instances = other.compute_ids.keys().collect::<BTreeSet<_>>();
70        let compute_instances = self_compute_instances
71            .intersection(&other_compute_instances)
72            .cloned();
73
74        let compute_ids = compute_instances
76            .map(|compute_instance_id| {
77                let self_compute_ids = self
78                    .compute_ids
79                    .get(compute_instance_id)
80                    .expect("id is in intersection, so should be found");
81                let other_compute_ids = other
82                    .compute_ids
83                    .get(compute_instance_id)
84                    .expect("id is in intersection, so should be found");
85                (
86                    compute_instance_id.clone(),
87                    self_compute_ids
88                        .intersection(other_compute_ids)
89                        .cloned()
90                        .collect(),
91                )
92            })
93            .collect();
94
95        CollectionIdBundle {
96            storage_ids,
97            compute_ids,
98        }
99    }
100
101    pub fn extend(&mut self, other: &CollectionIdBundle) {
103        self.storage_ids.extend(&other.storage_ids);
104        for (compute_instance, ids) in &other.compute_ids {
105            self.compute_ids
106                .entry(*compute_instance)
107                .or_default()
108                .extend(ids);
109        }
110    }
111
112    pub fn iter(&self) -> impl Iterator<Item = GlobalId> + '_ {
116        self.storage_ids
117            .iter()
118            .copied()
119            .chain(self.compute_ids.values().flat_map(BTreeSet::iter).copied())
120    }
121}
122
123impl Coordinator {
124    pub fn resolve_collection_id_bundle_names(
127        &self,
128        session: &Session,
129        id_bundle: &CollectionIdBundle,
130    ) -> Vec<String> {
131        let mut names: Vec<_> = id_bundle
132            .iter()
133            .filter_map(|id| self.catalog().try_get_entry_by_global_id(&id))
135            .map(|item| {
136                self.catalog()
137                    .resolve_full_name(item.name(), Some(session.conn_id()))
138                    .to_string()
139            })
140            .collect();
141        names.sort();
142        names
143    }
144}