mz_adapter/coord/
id_bundle.rs1use std::collections::{BTreeMap, BTreeSet};
11
12use mz_adapter_types::connection::ConnectionId;
13use mz_compute_types::ComputeInstanceId;
14use mz_repr::GlobalId;
15use serde::{Deserialize, Serialize};
16
17use crate::catalog::Catalog;
18
19#[derive(Deserialize, Debug, Default, Clone, Serialize)]
21pub struct CollectionIdBundle {
22 pub storage_ids: BTreeSet<GlobalId>,
24 pub compute_ids: BTreeMap<ComputeInstanceId, BTreeSet<GlobalId>>,
26}
27
28impl CollectionIdBundle {
29 pub fn is_empty(&self) -> bool {
31 self.storage_ids.is_empty() && self.compute_ids.values().all(|ids| ids.is_empty())
32 }
33
34 pub fn difference(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
36 let storage_ids = &self.storage_ids - &other.storage_ids;
37 let compute_ids: BTreeMap<_, _> = self
38 .compute_ids
39 .iter()
40 .map(|(compute_instance, compute_ids)| {
41 let compute_ids =
42 if let Some(other_compute_ids) = other.compute_ids.get(compute_instance) {
43 compute_ids - other_compute_ids
44 } else {
45 compute_ids.clone()
46 };
47 (*compute_instance, compute_ids)
48 })
49 .filter(|(_, ids)| !ids.is_empty())
50 .collect();
51 CollectionIdBundle {
52 storage_ids,
53 compute_ids,
54 }
55 }
56
57 pub fn intersection(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
59 let storage_ids = self
61 .storage_ids
62 .intersection(&other.storage_ids)
63 .cloned()
64 .collect();
65
66 let self_compute_instances = self.compute_ids.keys().collect::<BTreeSet<_>>();
68 let other_compute_instances = other.compute_ids.keys().collect::<BTreeSet<_>>();
69 let compute_instances = self_compute_instances
70 .intersection(&other_compute_instances)
71 .cloned();
72
73 let compute_ids = compute_instances
75 .map(|compute_instance_id| {
76 let self_compute_ids = self
77 .compute_ids
78 .get(compute_instance_id)
79 .expect("id is in intersection, so should be found");
80 let other_compute_ids = other
81 .compute_ids
82 .get(compute_instance_id)
83 .expect("id is in intersection, so should be found");
84 (
85 compute_instance_id.clone(),
86 self_compute_ids
87 .intersection(other_compute_ids)
88 .cloned()
89 .collect(),
90 )
91 })
92 .collect();
93
94 CollectionIdBundle {
95 storage_ids,
96 compute_ids,
97 }
98 }
99
100 pub fn extend(&mut self, other: &CollectionIdBundle) {
102 self.storage_ids.extend(&other.storage_ids);
103 for (compute_instance, ids) in &other.compute_ids {
104 self.compute_ids
105 .entry(*compute_instance)
106 .or_default()
107 .extend(ids);
108 }
109 }
110
111 pub fn iter(&self) -> impl Iterator<Item = GlobalId> + '_ {
115 self.storage_ids
116 .iter()
117 .copied()
118 .chain(self.compute_ids.values().flat_map(BTreeSet::iter).copied())
119 }
120
121 pub fn resolve_names(&self, catalog: &Catalog, conn_id: &ConnectionId) -> Vec<String> {
124 let mut names: Vec<_> = self
125 .iter()
126 .filter_map(|id| catalog.try_get_entry_by_global_id(&id))
128 .map(|item| {
129 catalog
130 .resolve_full_name(item.name(), Some(conn_id))
131 .to_string()
132 })
133 .collect();
134 names.sort();
135 names
136 }
137}