mz_adapter/coord/
id_bundle.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11
12use mz_adapter_types::connection::ConnectionId;
13use mz_compute_types::ComputeInstanceId;
14use mz_repr::GlobalId;
15use serde::{Deserialize, Serialize};
16
17use crate::catalog::Catalog;
18
19/// A bundle of storage and compute collection identifiers.
20#[derive(Deserialize, Debug, Default, Clone, Serialize)]
21pub struct CollectionIdBundle {
22    /// The identifiers for sources in the storage layer.
23    pub storage_ids: BTreeSet<GlobalId>,
24    /// The identifiers for indexes in the compute layer.
25    pub compute_ids: BTreeMap<ComputeInstanceId, BTreeSet<GlobalId>>,
26}
27
28impl CollectionIdBundle {
29    /// Reports whether the bundle contains any identifiers of any type.
30    pub fn is_empty(&self) -> bool {
31        self.storage_ids.is_empty() && self.compute_ids.values().all(|ids| ids.is_empty())
32    }
33
34    /// Returns a new bundle without the identifiers from `other`.
35    pub fn difference(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
36        let storage_ids = &self.storage_ids - &other.storage_ids;
37        let compute_ids: BTreeMap<_, _> = self
38            .compute_ids
39            .iter()
40            .map(|(compute_instance, compute_ids)| {
41                let compute_ids =
42                    if let Some(other_compute_ids) = other.compute_ids.get(compute_instance) {
43                        compute_ids - other_compute_ids
44                    } else {
45                        compute_ids.clone()
46                    };
47                (*compute_instance, compute_ids)
48            })
49            .filter(|(_, ids)| !ids.is_empty())
50            .collect();
51        CollectionIdBundle {
52            storage_ids,
53            compute_ids,
54        }
55    }
56
57    /// Returns a new bundle with the identifiers that are present in both `self` and `other`.
58    pub fn intersection(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
59        // Attend to storage ids.
60        let storage_ids = self
61            .storage_ids
62            .intersection(&other.storage_ids)
63            .cloned()
64            .collect();
65
66        // Intersect ComputeInstanceIds.
67        let self_compute_instances = self.compute_ids.keys().collect::<BTreeSet<_>>();
68        let other_compute_instances = other.compute_ids.keys().collect::<BTreeSet<_>>();
69        let compute_instances = self_compute_instances
70            .intersection(&other_compute_instances)
71            .cloned();
72
73        // For each ComputeInstanceId, intersect `self` with `other`.
74        let compute_ids = compute_instances
75            .map(|compute_instance_id| {
76                let self_compute_ids = self
77                    .compute_ids
78                    .get(compute_instance_id)
79                    .expect("id is in intersection, so should be found");
80                let other_compute_ids = other
81                    .compute_ids
82                    .get(compute_instance_id)
83                    .expect("id is in intersection, so should be found");
84                (
85                    compute_instance_id.clone(),
86                    self_compute_ids
87                        .intersection(other_compute_ids)
88                        .cloned()
89                        .collect(),
90                )
91            })
92            .collect();
93
94        CollectionIdBundle {
95            storage_ids,
96            compute_ids,
97        }
98    }
99
100    /// Extends a `CollectionIdBundle` with the contents of another `CollectionIdBundle`.
101    pub fn extend(&mut self, other: &CollectionIdBundle) {
102        self.storage_ids.extend(&other.storage_ids);
103        for (compute_instance, ids) in &other.compute_ids {
104            self.compute_ids
105                .entry(*compute_instance)
106                .or_default()
107                .extend(ids);
108        }
109    }
110
111    /// Returns an iterator over all IDs in the bundle.
112    ///
113    /// The IDs are iterated in an unspecified order.
114    pub fn iter(&self) -> impl Iterator<Item = GlobalId> + '_ {
115        self.storage_ids
116            .iter()
117            .copied()
118            .chain(self.compute_ids.values().flat_map(BTreeSet::iter).copied())
119    }
120
121    /// Resolves the full name from the corresponding catalog entry for each item in this bundle.
122    /// If an item in the bundle does not exist in the catalog, it's not included in the result.
123    pub fn resolve_names(&self, catalog: &Catalog, conn_id: &ConnectionId) -> Vec<String> {
124        let mut names: Vec<_> = self
125            .iter()
126            // This could filter out an entry that has been replaced in another transaction.
127            .filter_map(|id| catalog.try_get_entry_by_global_id(&id))
128            .map(|item| {
129                catalog
130                    .resolve_full_name(item.name(), Some(conn_id))
131                    .to_string()
132            })
133            .collect();
134        names.sort();
135        names
136    }
137}