Skip to main content

mz_adapter/coord/catalog_implications/
parsed_state_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Utilities for parsing and augmenting "raw" catalog changes
11//! ([StateUpdateKind]), so that we can update in-memory adapter state, apply
12//! implications, and apply derived commands to the controller(s).
13//!
14//! See [parse_state_update] for details.
15
16use mz_catalog::builtin::BUILTIN_LOG_LOOKUP;
17use mz_catalog::memory::objects::{
18    CatalogItem, DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind,
19};
20use mz_catalog::{durable, memory};
21use mz_compute_client::logging::LogVariant;
22use mz_controller_types::ClusterId;
23use mz_ore::instrument;
24use mz_repr::{CatalogItemId, GlobalId, Timestamp};
25use mz_storage_types::connections::inline::IntoInlineConnection;
26use mz_storage_types::sources::GenericSourceConnection;
27
28// DO NOT add any more imports from `crate` outside of `crate::catalog`.
29use crate::catalog::CatalogState;
30
31/// An update that needs to be applied to a controller.
32#[derive(Debug, Clone)]
33pub struct ParsedStateUpdate {
34    pub kind: ParsedStateUpdateKind,
35    pub ts: Timestamp,
36    pub diff: StateDiff,
37}
38
39/// An update that needs to be applied to a controller.
40#[derive(Debug, Clone)]
41pub enum ParsedStateUpdateKind {
42    Item {
43        durable_item: durable::objects::Item,
44        parsed_item: memory::objects::CatalogItem,
45        connection: Option<GenericSourceConnection>,
46        parsed_full_name: String,
47    },
48    TemporaryItem {
49        durable_item: memory::objects::TemporaryItem,
50        parsed_item: memory::objects::CatalogItem,
51        connection: Option<GenericSourceConnection>,
52        parsed_full_name: String,
53    },
54    Cluster {
55        durable_cluster: durable::objects::Cluster,
56        parsed_cluster: memory::objects::Cluster,
57    },
58    ClusterReplica {
59        durable_cluster_replica: durable::objects::ClusterReplica,
60        parsed_cluster_replica: memory::objects::ClusterReplica,
61    },
62    IntrospectionSourceIndex {
63        cluster_id: ClusterId,
64        log: LogVariant,
65        index_id: GlobalId,
66    },
67}
68
69/// Potentially generate a [ParsedStateUpdate] that corresponds to the given
70/// change to the catalog.
71///
72/// This technically doesn't "parse" the given state update but uses the given
73/// in-memory [CatalogState] as a shortcut. It already contains the parsed
74/// representation of the item. In theory, we could re-construct the parsed
75/// items by hand if we're given all the changes that lead to a given catalog
76/// state.
77///
78/// For changes with a positive diff, the given [CatalogState] must reflect the
79/// catalog state _after_ applying the catalog change to the catalog. For
80/// negative changes, the given [CatalogState] must reflect the catalog state
81/// _before_ applying the changes. This is so that we can easily extract the
82/// state of an object before it is removed.
83///
84/// Will return `None` if the given catalog change is purely internal to the
85/// catalog and does not have implications for anything else.
86#[instrument(level = "debug")]
87pub fn parse_state_update(
88    catalog: &CatalogState,
89    state_update: StateUpdate,
90) -> Option<ParsedStateUpdate> {
91    let kind = match state_update.kind {
92        StateUpdateKind::Item(item) => Some(parse_item_update(catalog, item)),
93        StateUpdateKind::TemporaryItem(item) => Some(parse_temporary_item_update(catalog, item)),
94        StateUpdateKind::Cluster(cluster) => Some(parse_cluster_update(catalog, cluster)),
95        StateUpdateKind::ClusterReplica(replica) => {
96            Some(parse_cluster_replica_update(catalog, replica))
97        }
98        StateUpdateKind::IntrospectionSourceIndex(isi) => {
99            Some(parse_introspection_source_index_update(isi))
100        }
101        _ => {
102            // The controllers are currently not interested in other kinds of
103            // changes to the catalog.
104            None
105        }
106    };
107
108    kind.map(|kind| ParsedStateUpdate {
109        kind,
110        ts: state_update.ts,
111        diff: state_update.diff,
112    })
113}
114
115fn parse_item_update(
116    catalog: &CatalogState,
117    durable_item: durable::objects::Item,
118) -> ParsedStateUpdateKind {
119    let (parsed_item, connection, parsed_full_name) =
120        parse_item_update_common(catalog, &durable_item.id);
121
122    ParsedStateUpdateKind::Item {
123        durable_item,
124        parsed_item,
125        connection,
126        parsed_full_name,
127    }
128}
129
130fn parse_temporary_item_update(
131    catalog: &CatalogState,
132    durable_item: memory::objects::TemporaryItem,
133) -> ParsedStateUpdateKind {
134    let (parsed_item, connection, parsed_full_name) =
135        parse_item_update_common(catalog, &durable_item.id);
136
137    ParsedStateUpdateKind::TemporaryItem {
138        durable_item,
139        parsed_item,
140        connection,
141        parsed_full_name,
142    }
143}
144
145// Shared between temporary items and durable items.
146fn parse_item_update_common(
147    catalog: &CatalogState,
148    item_id: &CatalogItemId,
149) -> (CatalogItem, Option<GenericSourceConnection>, String) {
150    let entry = catalog.get_entry(item_id);
151
152    let parsed_item = entry.item().clone();
153    let parsed_full_name = catalog
154        .resolve_full_name(entry.name(), entry.conn_id())
155        .to_string();
156
157    let connection = match &parsed_item {
158        memory::objects::CatalogItem::Source(source) => {
159            if let DataSourceDesc::Ingestion { desc, .. }
160            | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
161            {
162                Some(desc.connection.clone().into_inline_connection(catalog))
163            } else {
164                None
165            }
166        }
167        _ => None,
168    };
169
170    (parsed_item, connection, parsed_full_name)
171}
172
173fn parse_cluster_update(
174    catalog: &CatalogState,
175    durable_cluster: durable::objects::Cluster,
176) -> ParsedStateUpdateKind {
177    let parsed_cluster = catalog.get_cluster(durable_cluster.id);
178
179    ParsedStateUpdateKind::Cluster {
180        durable_cluster,
181        parsed_cluster: parsed_cluster.clone(),
182    }
183}
184
185fn parse_introspection_source_index_update(
186    isi: durable::objects::IntrospectionSourceIndex,
187) -> ParsedStateUpdateKind {
188    let builtin_log = BUILTIN_LOG_LOOKUP
189        .get(isi.name.as_str())
190        .expect("introspection source index must reference a known log");
191
192    ParsedStateUpdateKind::IntrospectionSourceIndex {
193        cluster_id: isi.cluster_id,
194        log: builtin_log.variant.clone(),
195        index_id: isi.index_id,
196    }
197}
198
199fn parse_cluster_replica_update(
200    catalog: &CatalogState,
201    durable_cluster_replica: durable::objects::ClusterReplica,
202) -> ParsedStateUpdateKind {
203    let parsed_cluster_replica = catalog.get_cluster_replica(
204        durable_cluster_replica.cluster_id,
205        durable_cluster_replica.replica_id,
206    );
207
208    ParsedStateUpdateKind::ClusterReplica {
209        durable_cluster_replica,
210        parsed_cluster_replica: parsed_cluster_replica.clone(),
211    }
212}