mz_adapter/coord/catalog_implications/
parsed_state_updates.rs1use mz_catalog::builtin::BUILTIN_LOG_LOOKUP;
17use mz_catalog::memory::objects::{
18 CatalogItem, DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind,
19};
20use mz_catalog::{durable, memory};
21use mz_compute_client::logging::LogVariant;
22use mz_controller_types::ClusterId;
23use mz_ore::instrument;
24use mz_repr::{CatalogItemId, GlobalId, Timestamp};
25use mz_storage_types::connections::inline::IntoInlineConnection;
26use mz_storage_types::sources::GenericSourceConnection;
27
28use crate::catalog::CatalogState;
30
31#[derive(Debug, Clone)]
33pub struct ParsedStateUpdate {
34 pub kind: ParsedStateUpdateKind,
35 pub ts: Timestamp,
36 pub diff: StateDiff,
37}
38
39#[derive(Debug, Clone)]
41pub enum ParsedStateUpdateKind {
42 Item {
43 durable_item: durable::objects::Item,
44 parsed_item: memory::objects::CatalogItem,
45 connection: Option<GenericSourceConnection>,
46 parsed_full_name: String,
47 },
48 TemporaryItem {
49 durable_item: memory::objects::TemporaryItem,
50 parsed_item: memory::objects::CatalogItem,
51 connection: Option<GenericSourceConnection>,
52 parsed_full_name: String,
53 },
54 Cluster {
55 durable_cluster: durable::objects::Cluster,
56 parsed_cluster: memory::objects::Cluster,
57 },
58 ClusterReplica {
59 durable_cluster_replica: durable::objects::ClusterReplica,
60 parsed_cluster_replica: memory::objects::ClusterReplica,
61 },
62 IntrospectionSourceIndex {
63 cluster_id: ClusterId,
64 log: LogVariant,
65 index_id: GlobalId,
66 },
67 ReplicaSystemConfiguration {
72 durable: durable::objects::ReplicaSystemConfiguration,
73 },
74}
75
76#[instrument(level = "debug")]
94pub fn parse_state_update(
95 catalog: &CatalogState,
96 state_update: StateUpdate,
97) -> Option<ParsedStateUpdate> {
98 let kind = match state_update.kind {
99 StateUpdateKind::Item(item) => Some(parse_item_update(catalog, item)),
100 StateUpdateKind::TemporaryItem(item) => Some(parse_temporary_item_update(catalog, item)),
101 StateUpdateKind::Cluster(cluster) => Some(parse_cluster_update(catalog, cluster)),
102 StateUpdateKind::ClusterReplica(replica) => {
103 Some(parse_cluster_replica_update(catalog, replica))
104 }
105 StateUpdateKind::IntrospectionSourceIndex(isi) => {
106 Some(parse_introspection_source_index_update(isi))
107 }
108 StateUpdateKind::ReplicaSystemConfiguration(durable) => {
109 Some(ParsedStateUpdateKind::ReplicaSystemConfiguration { durable })
110 }
111 _ => {
112 None
117 }
118 };
119
120 kind.map(|kind| ParsedStateUpdate {
121 kind,
122 ts: state_update.ts,
123 diff: state_update.diff,
124 })
125}
126
127fn parse_item_update(
128 catalog: &CatalogState,
129 durable_item: durable::objects::Item,
130) -> ParsedStateUpdateKind {
131 let (parsed_item, connection, parsed_full_name) =
132 parse_item_update_common(catalog, &durable_item.id);
133
134 ParsedStateUpdateKind::Item {
135 durable_item,
136 parsed_item,
137 connection,
138 parsed_full_name,
139 }
140}
141
142fn parse_temporary_item_update(
143 catalog: &CatalogState,
144 durable_item: memory::objects::TemporaryItem,
145) -> ParsedStateUpdateKind {
146 let (parsed_item, connection, parsed_full_name) =
147 parse_item_update_common(catalog, &durable_item.id);
148
149 ParsedStateUpdateKind::TemporaryItem {
150 durable_item,
151 parsed_item,
152 connection,
153 parsed_full_name,
154 }
155}
156
157fn parse_item_update_common(
159 catalog: &CatalogState,
160 item_id: &CatalogItemId,
161) -> (CatalogItem, Option<GenericSourceConnection>, String) {
162 let entry = catalog.get_entry(item_id);
163
164 let parsed_item = entry.item().clone();
165 let parsed_full_name = catalog
166 .resolve_full_name(entry.name(), entry.conn_id())
167 .to_string();
168
169 let connection = match &parsed_item {
170 memory::objects::CatalogItem::Source(source) => {
171 if let DataSourceDesc::Ingestion { desc, .. }
172 | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
173 {
174 Some(desc.connection.clone().into_inline_connection(catalog))
175 } else {
176 None
177 }
178 }
179 _ => None,
180 };
181
182 (parsed_item, connection, parsed_full_name)
183}
184
185fn parse_cluster_update(
186 catalog: &CatalogState,
187 durable_cluster: durable::objects::Cluster,
188) -> ParsedStateUpdateKind {
189 let parsed_cluster = catalog.get_cluster(durable_cluster.id);
190
191 ParsedStateUpdateKind::Cluster {
192 durable_cluster,
193 parsed_cluster: parsed_cluster.clone(),
194 }
195}
196
197fn parse_introspection_source_index_update(
198 isi: durable::objects::IntrospectionSourceIndex,
199) -> ParsedStateUpdateKind {
200 let builtin_log = BUILTIN_LOG_LOOKUP
201 .get(isi.name.as_str())
202 .expect("introspection source index must reference a known log");
203
204 ParsedStateUpdateKind::IntrospectionSourceIndex {
205 cluster_id: isi.cluster_id,
206 log: builtin_log.variant.clone(),
207 index_id: isi.index_id,
208 }
209}
210
211fn parse_cluster_replica_update(
212 catalog: &CatalogState,
213 durable_cluster_replica: durable::objects::ClusterReplica,
214) -> ParsedStateUpdateKind {
215 let parsed_cluster_replica = catalog.get_cluster_replica(
216 durable_cluster_replica.cluster_id,
217 durable_cluster_replica.replica_id,
218 );
219
220 ParsedStateUpdateKind::ClusterReplica {
221 durable_cluster_replica,
222 parsed_cluster_replica: parsed_cluster_replica.clone(),
223 }
224}
225
#[cfg(test)]
mod tests {
    use mz_catalog::durable::objects::ReplicaSystemConfiguration;
    use mz_catalog::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
    use mz_controller_types::ReplicaId;
    use mz_repr::Timestamp;

    use crate::catalog::Catalog;

    use super::{ParsedStateUpdateKind, parse_state_update};

    /// A replica system configuration update must come back from
    /// `parse_state_update` as `Some`, with the matching parsed kind.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn replica_system_configuration_is_parsed() {
        Catalog::with_debug(|catalog| async move {
            // An addition of one replica-scoped setting at the minimum timestamp.
            let update = StateUpdate {
                kind: StateUpdateKind::ReplicaSystemConfiguration(ReplicaSystemConfiguration {
                    replica_id: ReplicaId::User(1),
                    name: "persist_pager".to_string(),
                    value: "on".to_string(),
                }),
                ts: Timestamp::MIN,
                diff: StateDiff::Addition,
            };

            let parsed = parse_state_update(catalog.state(), update)
                .expect("replica system configuration must produce a parsed update");

            assert!(matches!(
                parsed.kind,
                ParsedStateUpdateKind::ReplicaSystemConfiguration { .. }
            ));
        })
        .await
    }
}