mz_adapter/coord/catalog_implications/
parsed_state_updates.rs1use mz_catalog::memory::objects::{
17 CatalogItem, DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind,
18};
19use mz_catalog::{durable, memory};
20use mz_ore::instrument;
21use mz_repr::{CatalogItemId, Timestamp};
22use mz_storage_types::connections::inline::IntoInlineConnection;
23use mz_storage_types::sources::GenericSourceConnection;
24
25use crate::catalog::CatalogState;
27
28#[derive(Debug, Clone)]
30pub struct ParsedStateUpdate {
31 pub kind: ParsedStateUpdateKind,
32 pub ts: Timestamp,
33 pub diff: StateDiff,
34}
35
36#[derive(Debug, Clone)]
38pub enum ParsedStateUpdateKind {
39 Item {
40 durable_item: durable::objects::Item,
41 parsed_item: memory::objects::CatalogItem,
42 connection: Option<GenericSourceConnection>,
43 parsed_full_name: String,
44 },
45 TemporaryItem {
46 durable_item: memory::objects::TemporaryItem,
47 parsed_item: memory::objects::CatalogItem,
48 connection: Option<GenericSourceConnection>,
49 parsed_full_name: String,
50 },
51 Cluster {
52 durable_cluster: durable::objects::Cluster,
53 parsed_cluster: memory::objects::Cluster,
54 },
55 ClusterReplica {
56 durable_cluster_replica: durable::objects::ClusterReplica,
57 parsed_cluster_replica: memory::objects::ClusterReplica,
58 },
59}
60
61#[instrument(level = "debug")]
79pub fn parse_state_update(
80 catalog: &CatalogState,
81 state_update: StateUpdate,
82) -> Option<ParsedStateUpdate> {
83 let kind = match state_update.kind {
84 StateUpdateKind::Item(item) => Some(parse_item_update(catalog, item)),
85 StateUpdateKind::TemporaryItem(item) => Some(parse_temporary_item_update(catalog, item)),
86 StateUpdateKind::Cluster(cluster) => Some(parse_cluster_update(catalog, cluster)),
87 StateUpdateKind::ClusterReplica(replica) => {
88 Some(parse_cluster_replica_update(catalog, replica))
89 }
90 _ => {
91 None
94 }
95 };
96
97 kind.map(|kind| ParsedStateUpdate {
98 kind,
99 ts: state_update.ts,
100 diff: state_update.diff,
101 })
102}
103
104fn parse_item_update(
105 catalog: &CatalogState,
106 durable_item: durable::objects::Item,
107) -> ParsedStateUpdateKind {
108 let (parsed_item, connection, parsed_full_name) =
109 parse_item_update_common(catalog, &durable_item.id);
110
111 ParsedStateUpdateKind::Item {
112 durable_item,
113 parsed_item,
114 connection,
115 parsed_full_name,
116 }
117}
118
119fn parse_temporary_item_update(
120 catalog: &CatalogState,
121 durable_item: memory::objects::TemporaryItem,
122) -> ParsedStateUpdateKind {
123 let (parsed_item, connection, parsed_full_name) =
124 parse_item_update_common(catalog, &durable_item.id);
125
126 ParsedStateUpdateKind::TemporaryItem {
127 durable_item,
128 parsed_item,
129 connection,
130 parsed_full_name,
131 }
132}
133
134fn parse_item_update_common(
136 catalog: &CatalogState,
137 item_id: &CatalogItemId,
138) -> (CatalogItem, Option<GenericSourceConnection>, String) {
139 let entry = catalog.get_entry(item_id);
140
141 let parsed_item = entry.item().clone();
142 let parsed_full_name = catalog
143 .resolve_full_name(entry.name(), entry.conn_id())
144 .to_string();
145
146 let connection = match &parsed_item {
147 memory::objects::CatalogItem::Source(source) => {
148 if let DataSourceDesc::Ingestion { desc, .. }
149 | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
150 {
151 Some(desc.connection.clone().into_inline_connection(catalog))
152 } else {
153 None
154 }
155 }
156 _ => None,
157 };
158
159 (parsed_item, connection, parsed_full_name)
160}
161
162fn parse_cluster_update(
163 catalog: &CatalogState,
164 durable_cluster: durable::objects::Cluster,
165) -> ParsedStateUpdateKind {
166 let parsed_cluster = catalog.get_cluster(durable_cluster.id);
167
168 ParsedStateUpdateKind::Cluster {
169 durable_cluster,
170 parsed_cluster: parsed_cluster.clone(),
171 }
172}
173
174fn parse_cluster_replica_update(
175 catalog: &CatalogState,
176 durable_cluster_replica: durable::objects::ClusterReplica,
177) -> ParsedStateUpdateKind {
178 let parsed_cluster_replica = catalog.get_cluster_replica(
179 durable_cluster_replica.cluster_id,
180 durable_cluster_replica.replica_id,
181 );
182
183 ParsedStateUpdateKind::ClusterReplica {
184 durable_cluster_replica,
185 parsed_cluster_replica: parsed_cluster_replica.clone(),
186 }
187}