1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};

use mz_compute_types::ComputeInstanceId;
use mz_repr::GlobalId;
use mz_sql::session::metadata::SessionMetadata;
use serde::{Deserialize, Serialize};

use crate::coord::Coordinator;
use crate::session::Session;

/// A group of collection identifiers, split into storage and compute collections.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct CollectionIdBundle {
    /// Identifiers of sources that live in the storage layer.
    pub storage_ids: BTreeSet<GlobalId>,
    /// Identifiers of indexes that live in the compute layer, keyed by the
    /// compute instance they belong to.
    pub compute_ids: BTreeMap<ComputeInstanceId, BTreeSet<GlobalId>>,
}

impl CollectionIdBundle {
    /// Reports whether the bundle contains any identifiers of any type.
    pub fn is_empty(&self) -> bool {
        self.storage_ids.is_empty() && self.compute_ids.values().all(|ids| ids.is_empty())
    }

    /// Returns a new bundle without the identifiers from `other`.
    pub fn difference(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
        let storage_ids = &self.storage_ids - &other.storage_ids;
        let compute_ids: BTreeMap<_, _> = self
            .compute_ids
            .iter()
            .map(|(compute_instance, compute_ids)| {
                let compute_ids =
                    if let Some(other_compute_ids) = other.compute_ids.get(compute_instance) {
                        compute_ids - other_compute_ids
                    } else {
                        compute_ids.clone()
                    };
                (*compute_instance, compute_ids)
            })
            .filter(|(_, ids)| !ids.is_empty())
            .collect();
        CollectionIdBundle {
            storage_ids,
            compute_ids,
        }
    }

    /// Returns a new bundle with the identifiers that are present in both `self` and `other`.
    pub fn intersection(&self, other: &CollectionIdBundle) -> CollectionIdBundle {
        // Attend to storage ids.
        let storage_ids = self
            .storage_ids
            .intersection(&other.storage_ids)
            .cloned()
            .collect();

        // Intersect ComputeInstanceIds.
        let self_compute_instances = self.compute_ids.keys().collect::<BTreeSet<_>>();
        let other_compute_instances = other.compute_ids.keys().collect::<BTreeSet<_>>();
        let compute_instances = self_compute_instances
            .intersection(&other_compute_instances)
            .cloned();

        // For each ComputeInstanceId, intersect `self` with `other`.
        let compute_ids = compute_instances
            .map(|compute_instance_id| {
                let self_compute_ids = self
                    .compute_ids
                    .get(compute_instance_id)
                    .expect("id is in intersection, so should be found");
                let other_compute_ids = other
                    .compute_ids
                    .get(compute_instance_id)
                    .expect("id is in intersection, so should be found");
                (
                    compute_instance_id.clone(),
                    self_compute_ids
                        .intersection(other_compute_ids)
                        .cloned()
                        .collect(),
                )
            })
            .collect();

        CollectionIdBundle {
            storage_ids,
            compute_ids,
        }
    }

    /// Extends a `CollectionIdBundle` with the contents of another `CollectionIdBundle`.
    pub fn extend(&mut self, other: &CollectionIdBundle) {
        self.storage_ids.extend(&other.storage_ids);
        for (compute_instance, ids) in &other.compute_ids {
            self.compute_ids
                .entry(*compute_instance)
                .or_default()
                .extend(ids);
        }
    }

    /// Returns an iterator over all IDs in the bundle.
    ///
    /// The IDs are iterated in an unspecified order.
    pub fn iter(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_ids
            .iter()
            .copied()
            .chain(self.compute_ids.values().flat_map(BTreeSet::iter).copied())
    }
}

impl Coordinator {
    /// Produces the sorted list of fully qualified names for the items in `id_bundle`.
    ///
    /// Each ID is looked up in the catalog and its name is resolved with the
    /// connection ID of `session`. IDs without a catalog entry are skipped, so
    /// the result may contain fewer names than the bundle has IDs.
    pub fn resolve_collection_id_bundle_names(
        &self,
        session: &Session,
        id_bundle: &CollectionIdBundle,
    ) -> Vec<String> {
        let mut resolved = Vec::new();
        for id in id_bundle.iter() {
            // A missing entry is skipped rather than reported: the item may
            // have been replaced in another transaction.
            if let Some(entry) = self.catalog().try_get_entry(&id) {
                let full_name = self
                    .catalog()
                    .resolve_full_name(entry.name(), Some(session.conn_id()));
                resolved.push(full_name.to_string());
            }
        }
        resolved.sort();
        resolved
    }
}