mz_adapter/coord/catalog_implications/
parsed_state_updates.rs1use mz_catalog::builtin::BUILTIN_LOG_LOOKUP;
17use mz_catalog::memory::objects::{
18 CatalogItem, DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind,
19};
20use mz_catalog::{durable, memory};
21use mz_compute_client::logging::LogVariant;
22use mz_controller_types::ClusterId;
23use mz_ore::instrument;
24use mz_repr::{CatalogItemId, GlobalId, Timestamp};
25use mz_storage_types::connections::inline::IntoInlineConnection;
26use mz_storage_types::sources::GenericSourceConnection;
27
28use crate::catalog::CatalogState;
30
31#[derive(Debug, Clone)]
33pub struct ParsedStateUpdate {
34 pub kind: ParsedStateUpdateKind,
35 pub ts: Timestamp,
36 pub diff: StateDiff,
37}
38
39#[derive(Debug, Clone)]
41pub enum ParsedStateUpdateKind {
42 Item {
43 durable_item: durable::objects::Item,
44 parsed_item: memory::objects::CatalogItem,
45 connection: Option<GenericSourceConnection>,
46 parsed_full_name: String,
47 },
48 TemporaryItem {
49 durable_item: memory::objects::TemporaryItem,
50 parsed_item: memory::objects::CatalogItem,
51 connection: Option<GenericSourceConnection>,
52 parsed_full_name: String,
53 },
54 Cluster {
55 durable_cluster: durable::objects::Cluster,
56 parsed_cluster: memory::objects::Cluster,
57 },
58 ClusterReplica {
59 durable_cluster_replica: durable::objects::ClusterReplica,
60 parsed_cluster_replica: memory::objects::ClusterReplica,
61 },
62 IntrospectionSourceIndex {
63 cluster_id: ClusterId,
64 log: LogVariant,
65 index_id: GlobalId,
66 },
67}
68
69#[instrument(level = "debug")]
87pub fn parse_state_update(
88 catalog: &CatalogState,
89 state_update: StateUpdate,
90) -> Option<ParsedStateUpdate> {
91 let kind = match state_update.kind {
92 StateUpdateKind::Item(item) => Some(parse_item_update(catalog, item)),
93 StateUpdateKind::TemporaryItem(item) => Some(parse_temporary_item_update(catalog, item)),
94 StateUpdateKind::Cluster(cluster) => Some(parse_cluster_update(catalog, cluster)),
95 StateUpdateKind::ClusterReplica(replica) => {
96 Some(parse_cluster_replica_update(catalog, replica))
97 }
98 StateUpdateKind::IntrospectionSourceIndex(isi) => {
99 Some(parse_introspection_source_index_update(isi))
100 }
101 _ => {
102 None
105 }
106 };
107
108 kind.map(|kind| ParsedStateUpdate {
109 kind,
110 ts: state_update.ts,
111 diff: state_update.diff,
112 })
113}
114
115fn parse_item_update(
116 catalog: &CatalogState,
117 durable_item: durable::objects::Item,
118) -> ParsedStateUpdateKind {
119 let (parsed_item, connection, parsed_full_name) =
120 parse_item_update_common(catalog, &durable_item.id);
121
122 ParsedStateUpdateKind::Item {
123 durable_item,
124 parsed_item,
125 connection,
126 parsed_full_name,
127 }
128}
129
130fn parse_temporary_item_update(
131 catalog: &CatalogState,
132 durable_item: memory::objects::TemporaryItem,
133) -> ParsedStateUpdateKind {
134 let (parsed_item, connection, parsed_full_name) =
135 parse_item_update_common(catalog, &durable_item.id);
136
137 ParsedStateUpdateKind::TemporaryItem {
138 durable_item,
139 parsed_item,
140 connection,
141 parsed_full_name,
142 }
143}
144
145fn parse_item_update_common(
147 catalog: &CatalogState,
148 item_id: &CatalogItemId,
149) -> (CatalogItem, Option<GenericSourceConnection>, String) {
150 let entry = catalog.get_entry(item_id);
151
152 let parsed_item = entry.item().clone();
153 let parsed_full_name = catalog
154 .resolve_full_name(entry.name(), entry.conn_id())
155 .to_string();
156
157 let connection = match &parsed_item {
158 memory::objects::CatalogItem::Source(source) => {
159 if let DataSourceDesc::Ingestion { desc, .. }
160 | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
161 {
162 Some(desc.connection.clone().into_inline_connection(catalog))
163 } else {
164 None
165 }
166 }
167 _ => None,
168 };
169
170 (parsed_item, connection, parsed_full_name)
171}
172
173fn parse_cluster_update(
174 catalog: &CatalogState,
175 durable_cluster: durable::objects::Cluster,
176) -> ParsedStateUpdateKind {
177 let parsed_cluster = catalog.get_cluster(durable_cluster.id);
178
179 ParsedStateUpdateKind::Cluster {
180 durable_cluster,
181 parsed_cluster: parsed_cluster.clone(),
182 }
183}
184
185fn parse_introspection_source_index_update(
186 isi: durable::objects::IntrospectionSourceIndex,
187) -> ParsedStateUpdateKind {
188 let builtin_log = BUILTIN_LOG_LOOKUP
189 .get(isi.name.as_str())
190 .expect("introspection source index must reference a known log");
191
192 ParsedStateUpdateKind::IntrospectionSourceIndex {
193 cluster_id: isi.cluster_id,
194 log: builtin_log.variant.clone(),
195 index_id: isi.index_id,
196 }
197}
198
199fn parse_cluster_replica_update(
200 catalog: &CatalogState,
201 durable_cluster_replica: durable::objects::ClusterReplica,
202) -> ParsedStateUpdateKind {
203 let parsed_cluster_replica = catalog.get_cluster_replica(
204 durable_cluster_replica.cluster_id,
205 durable_cluster_replica.replica_id,
206 );
207
208 ParsedStateUpdateKind::ClusterReplica {
209 durable_cluster_replica,
210 parsed_cluster_replica: parsed_cluster_replica.clone(),
211 }
212}