Skip to main content

mz_adapter/coord/catalog_implications/
parsed_state_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Utilities for parsing and augmenting "raw" catalog changes
11//! ([StateUpdateKind]), so that we can update in-memory adapter state, apply
12//! implications, and apply derived commands to the controller(s).
13//!
14//! See [parse_state_update] for details.
15
16use mz_catalog::builtin::BUILTIN_LOG_LOOKUP;
17use mz_catalog::memory::objects::{
18    CatalogItem, DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind,
19};
20use mz_catalog::{durable, memory};
21use mz_compute_client::logging::LogVariant;
22use mz_controller_types::ClusterId;
23use mz_ore::instrument;
24use mz_repr::{CatalogItemId, GlobalId, Timestamp};
25use mz_storage_types::connections::inline::IntoInlineConnection;
26use mz_storage_types::sources::GenericSourceConnection;
27
28// DO NOT add any more imports from `crate` outside of `crate::catalog`.
29use crate::catalog::CatalogState;
30
/// A catalog change paired with the parsed, in-memory representation of the
/// object it concerns, ready to be applied to a controller.
///
/// Values of this type are produced by [parse_state_update].
#[derive(Debug, Clone)]
pub struct ParsedStateUpdate {
    /// What changed, including the parsed representation of the affected
    /// object.
    pub kind: ParsedStateUpdateKind,
    /// The timestamp of the originating [StateUpdate], carried over unchanged.
    pub ts: Timestamp,
    /// The diff of the originating [StateUpdate], carried over unchanged.
    pub diff: StateDiff,
}
38
/// The kind-specific payload of a [ParsedStateUpdate].
///
/// Each variant corresponds to one [StateUpdateKind] that
/// [parse_state_update] does not discard. Where applicable, a variant carries
/// both the row taken from the state update and the matching in-memory object
/// cloned out of the [CatalogState].
#[derive(Debug, Clone)]
pub enum ParsedStateUpdateKind {
    /// A change to a durable catalog item.
    Item {
        /// The item row taken from the state update.
        durable_item: durable::objects::Item,
        /// The in-memory item, cloned from the catalog entry for
        /// `durable_item.id`.
        parsed_item: memory::objects::CatalogItem,
        /// The inlined source connection. Only `Some` when the item is a
        /// source whose data source is an ingestion (new or old syntax).
        connection: Option<GenericSourceConnection>,
        /// The item's resolved full name, rendered as a string.
        parsed_full_name: String,
    },
    /// A change to a temporary catalog item.
    TemporaryItem {
        /// The temporary item taken from the state update.
        durable_item: memory::objects::TemporaryItem,
        /// The in-memory item, cloned from the catalog entry for
        /// `durable_item.id`.
        parsed_item: memory::objects::CatalogItem,
        /// The inlined source connection. Only `Some` when the item is a
        /// source whose data source is an ingestion (new or old syntax).
        connection: Option<GenericSourceConnection>,
        /// The item's resolved full name, rendered as a string.
        parsed_full_name: String,
    },
    /// A change to a cluster.
    Cluster {
        /// The cluster row taken from the state update.
        durable_cluster: durable::objects::Cluster,
        /// The in-memory cluster, cloned from the catalog by
        /// `durable_cluster.id`.
        parsed_cluster: memory::objects::Cluster,
    },
    /// A change to a cluster replica.
    ClusterReplica {
        /// The replica row taken from the state update.
        durable_cluster_replica: durable::objects::ClusterReplica,
        /// The in-memory replica, cloned from the catalog by the row's
        /// cluster ID and replica ID.
        parsed_cluster_replica: memory::objects::ClusterReplica,
    },
    /// A change to an introspection source index.
    IntrospectionSourceIndex {
        /// The cluster ID recorded on the durable row.
        cluster_id: ClusterId,
        /// The variant of the builtin log that the durable row's name maps to
        /// in [BUILTIN_LOG_LOOKUP].
        log: LogVariant,
        /// The index ID recorded on the durable row.
        index_id: GlobalId,
    },
    /// A replica-scoped system-parameter override changed. The implication
    /// re-pushes the complete per-replica dyncfg layer from the catalog working
    /// copy, so it does not consume `durable`. We keep the row only so it shows
    /// up in the `tracing::trace!` of the parsed update.
    ReplicaSystemConfiguration {
        /// The row taken from the state update, passed through untouched.
        durable: durable::objects::ReplicaSystemConfiguration,
    },
}
75
76/// Potentially generate a [ParsedStateUpdate] that corresponds to the given
77/// change to the catalog.
78///
79/// This technically doesn't "parse" the given state update but uses the given
80/// in-memory [CatalogState] as a shortcut. It already contains the parsed
81/// representation of the item. In theory, we could re-construct the parsed
82/// items by hand if we're given all the changes that lead to a given catalog
83/// state.
84///
85/// For changes with a positive diff, the given [CatalogState] must reflect the
86/// catalog state _after_ applying the catalog change to the catalog. For
87/// negative changes, the given [CatalogState] must reflect the catalog state
88/// _before_ applying the changes. This is so that we can easily extract the
89/// state of an object before it is removed.
90///
91/// Will return `None` if the given catalog change is purely internal to the
92/// catalog and does not have implications for anything else.
93#[instrument(level = "debug")]
94pub fn parse_state_update(
95    catalog: &CatalogState,
96    state_update: StateUpdate,
97) -> Option<ParsedStateUpdate> {
98    let kind = match state_update.kind {
99        StateUpdateKind::Item(item) => Some(parse_item_update(catalog, item)),
100        StateUpdateKind::TemporaryItem(item) => Some(parse_temporary_item_update(catalog, item)),
101        StateUpdateKind::Cluster(cluster) => Some(parse_cluster_update(catalog, cluster)),
102        StateUpdateKind::ClusterReplica(replica) => {
103            Some(parse_cluster_replica_update(catalog, replica))
104        }
105        StateUpdateKind::IntrospectionSourceIndex(isi) => {
106            Some(parse_introspection_source_index_update(isi))
107        }
108        StateUpdateKind::ReplicaSystemConfiguration(durable) => {
109            Some(ParsedStateUpdateKind::ReplicaSystemConfiguration { durable })
110        }
111        _ => {
112            // The controllers are currently not interested in other kinds of
113            // changes to the catalog. Cluster-scoped system-parameter overrides
114            // are read at plan time and have no controller effect, so they stay
115            // here too.
116            None
117        }
118    };
119
120    kind.map(|kind| ParsedStateUpdate {
121        kind,
122        ts: state_update.ts,
123        diff: state_update.diff,
124    })
125}
126
127fn parse_item_update(
128    catalog: &CatalogState,
129    durable_item: durable::objects::Item,
130) -> ParsedStateUpdateKind {
131    let (parsed_item, connection, parsed_full_name) =
132        parse_item_update_common(catalog, &durable_item.id);
133
134    ParsedStateUpdateKind::Item {
135        durable_item,
136        parsed_item,
137        connection,
138        parsed_full_name,
139    }
140}
141
142fn parse_temporary_item_update(
143    catalog: &CatalogState,
144    durable_item: memory::objects::TemporaryItem,
145) -> ParsedStateUpdateKind {
146    let (parsed_item, connection, parsed_full_name) =
147        parse_item_update_common(catalog, &durable_item.id);
148
149    ParsedStateUpdateKind::TemporaryItem {
150        durable_item,
151        parsed_item,
152        connection,
153        parsed_full_name,
154    }
155}
156
157// Shared between temporary items and durable items.
158fn parse_item_update_common(
159    catalog: &CatalogState,
160    item_id: &CatalogItemId,
161) -> (CatalogItem, Option<GenericSourceConnection>, String) {
162    let entry = catalog.get_entry(item_id);
163
164    let parsed_item = entry.item().clone();
165    let parsed_full_name = catalog
166        .resolve_full_name(entry.name(), entry.conn_id())
167        .to_string();
168
169    let connection = match &parsed_item {
170        memory::objects::CatalogItem::Source(source) => {
171            if let DataSourceDesc::Ingestion { desc, .. }
172            | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
173            {
174                Some(desc.connection.clone().into_inline_connection(catalog))
175            } else {
176                None
177            }
178        }
179        _ => None,
180    };
181
182    (parsed_item, connection, parsed_full_name)
183}
184
185fn parse_cluster_update(
186    catalog: &CatalogState,
187    durable_cluster: durable::objects::Cluster,
188) -> ParsedStateUpdateKind {
189    let parsed_cluster = catalog.get_cluster(durable_cluster.id);
190
191    ParsedStateUpdateKind::Cluster {
192        durable_cluster,
193        parsed_cluster: parsed_cluster.clone(),
194    }
195}
196
197fn parse_introspection_source_index_update(
198    isi: durable::objects::IntrospectionSourceIndex,
199) -> ParsedStateUpdateKind {
200    let builtin_log = BUILTIN_LOG_LOOKUP
201        .get(isi.name.as_str())
202        .expect("introspection source index must reference a known log");
203
204    ParsedStateUpdateKind::IntrospectionSourceIndex {
205        cluster_id: isi.cluster_id,
206        log: builtin_log.variant.clone(),
207        index_id: isi.index_id,
208    }
209}
210
211fn parse_cluster_replica_update(
212    catalog: &CatalogState,
213    durable_cluster_replica: durable::objects::ClusterReplica,
214) -> ParsedStateUpdateKind {
215    let parsed_cluster_replica = catalog.get_cluster_replica(
216        durable_cluster_replica.cluster_id,
217        durable_cluster_replica.replica_id,
218    );
219
220    ParsedStateUpdateKind::ClusterReplica {
221        durable_cluster_replica,
222        parsed_cluster_replica: parsed_cluster_replica.clone(),
223    }
224}
225
#[cfg(test)]
mod tests {
    use mz_catalog::durable::objects::ReplicaSystemConfiguration;
    use mz_catalog::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
    use mz_controller_types::ReplicaId;
    use mz_repr::Timestamp;

    use crate::catalog::Catalog;

    use super::{ParsedStateUpdateKind, parse_state_update};

    /// Regression test: a replica-scoped system-parameter change has to yield
    /// a parsed update, otherwise the controller push never fires. Such
    /// changes used to be discarded as something the controllers were not
    /// interested in.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function on OS `linux`
    async fn replica_system_configuration_is_parsed() {
        Catalog::with_debug(|catalog| async move {
            let update = StateUpdate {
                kind: StateUpdateKind::ReplicaSystemConfiguration(ReplicaSystemConfiguration {
                    replica_id: ReplicaId::User(1),
                    name: "persist_pager".to_string(),
                    value: "on".to_string(),
                }),
                ts: Timestamp::MIN,
                diff: StateDiff::Addition,
            };

            let kind = parse_state_update(catalog.state(), update)
                .expect("replica system configuration must produce a parsed update")
                .kind;

            assert!(matches!(
                kind,
                ParsedStateUpdateKind::ReplicaSystemConfiguration { .. }
            ));
        })
        .await
    }
}