mz_adapter/coord/
id_bundle.rs1use std::collections::{BTreeMap, BTreeSet};
11
12use mz_adapter_types::connection::ConnectionId;
13use mz_compute_types::ComputeInstanceId;
14use mz_repr::GlobalId;
15use serde::{Deserialize, Serialize};
16
17use crate::catalog::Catalog;
18
19#[derive(Deserialize, Debug, Default, Clone, Serialize)]
21pub struct CollectionIdBundle {
22 pub storage_ids: BTreeSet<GlobalId>,
24 pub compute_ids: BTreeMap<ComputeInstanceId, BTreeSet<GlobalId>>,
28}
29
30impl CollectionIdBundle {
31 pub fn is_empty(&self) -> bool {
33 self.storage_ids.is_empty() && self.compute_ids.values().all(|ids| ids.is_empty())
34 }
35
36 pub fn difference(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
38 let storage_ids = &self.storage_ids - &other.storage_ids;
39 let compute_ids: BTreeMap<_, _> = self
40 .compute_ids
41 .iter()
42 .map(|(compute_instance, compute_ids)| {
43 let compute_ids =
44 if let Some(other_compute_ids) = other.compute_ids.get(compute_instance) {
45 compute_ids - other_compute_ids
46 } else {
47 compute_ids.clone()
48 };
49 (*compute_instance, compute_ids)
50 })
51 .filter(|(_, ids)| !ids.is_empty())
52 .collect();
53 CollectionIdBundle {
54 storage_ids,
55 compute_ids,
56 }
57 }
58
59 pub fn intersection(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
61 let storage_ids = self
63 .storage_ids
64 .intersection(&other.storage_ids)
65 .cloned()
66 .collect();
67
68 let self_compute_instances = self.compute_ids.keys().collect::<BTreeSet<_>>();
70 let other_compute_instances = other.compute_ids.keys().collect::<BTreeSet<_>>();
71 let compute_instances = self_compute_instances
72 .intersection(&other_compute_instances)
73 .cloned();
74
75 let compute_ids = compute_instances
77 .map(|compute_instance_id| {
78 let self_compute_ids = self
79 .compute_ids
80 .get(compute_instance_id)
81 .expect("id is in intersection, so should be found");
82 let other_compute_ids = other
83 .compute_ids
84 .get(compute_instance_id)
85 .expect("id is in intersection, so should be found");
86 (
87 compute_instance_id.clone(),
88 self_compute_ids
89 .intersection(other_compute_ids)
90 .cloned()
91 .collect(),
92 )
93 })
94 .collect();
95
96 CollectionIdBundle {
97 storage_ids,
98 compute_ids,
99 }
100 }
101
102 pub fn extend(&mut self, other: &CollectionIdBundle) {
104 self.storage_ids.extend(&other.storage_ids);
105 for (compute_instance, ids) in &other.compute_ids {
106 self.compute_ids
107 .entry(*compute_instance)
108 .or_default()
109 .extend(ids);
110 }
111 }
112
113 pub fn iter(&self) -> impl Iterator<Item = GlobalId> + '_ {
117 self.storage_ids
118 .iter()
119 .copied()
120 .chain(self.compute_ids.values().flat_map(BTreeSet::iter).copied())
121 }
122
123 pub fn resolve_names(&self, catalog: &Catalog, conn_id: &ConnectionId) -> Vec<String> {
126 let mut names: Vec<_> = self
127 .iter()
128 .filter_map(|id| catalog.try_get_entry_by_global_id(&id))
130 .map(|item| {
131 catalog
132 .resolve_full_name(item.name(), Some(conn_id))
133 .to_string()
134 })
135 .collect();
136 names.sort();
137 names
138 }
139}