mz_adapter/coord/catalog_implications/
parsed_state_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Utilities for parsing and augmenting "raw" catalog changes
11//! ([StateUpdateKind]), so that we can update in-memory adapter state, apply
12//! implications, and apply derived commands to the controller(s).
13//!
14//! See [parse_state_update] for details.
15
16use mz_catalog::memory::objects::{
17    CatalogItem, DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind,
18};
19use mz_catalog::{durable, memory};
20use mz_ore::instrument;
21use mz_repr::{CatalogItemId, Timestamp};
22use mz_storage_types::connections::inline::IntoInlineConnection;
23use mz_storage_types::sources::GenericSourceConnection;
24
25// DO NOT add any more imports from `crate` outside of `crate::catalog`.
26use crate::catalog::CatalogState;
27
28/// An update that needs to be applied to a controller.
29#[derive(Debug, Clone)]
30pub struct ParsedStateUpdate {
31    pub kind: ParsedStateUpdateKind,
32    pub ts: Timestamp,
33    pub diff: StateDiff,
34}
35
36/// An update that needs to be applied to a controller.
37#[derive(Debug, Clone)]
38pub enum ParsedStateUpdateKind {
39    Item {
40        durable_item: durable::objects::Item,
41        parsed_item: memory::objects::CatalogItem,
42        connection: Option<GenericSourceConnection>,
43        parsed_full_name: String,
44    },
45    TemporaryItem {
46        durable_item: memory::objects::TemporaryItem,
47        parsed_item: memory::objects::CatalogItem,
48        connection: Option<GenericSourceConnection>,
49        parsed_full_name: String,
50    },
51    Cluster {
52        durable_cluster: durable::objects::Cluster,
53        parsed_cluster: memory::objects::Cluster,
54    },
55    ClusterReplica {
56        durable_cluster_replica: durable::objects::ClusterReplica,
57        parsed_cluster_replica: memory::objects::ClusterReplica,
58    },
59}
60
61/// Potentially generate a [ParsedStateUpdate] that corresponds to the given
62/// change to the catalog.
63///
64/// This technically doesn't "parse" the given state update but uses the given
65/// in-memory [CatalogState] as a shortcut. It already contains the parsed
66/// representation of the item. In theory, we could re-construct the parsed
67/// items by hand if we're given all the changes that lead to a given catalog
68/// state.
69///
70/// For changes with a positive diff, the given [CatalogState] must reflect the
71/// catalog state _after_ applying the catalog change to the catalog. For
72/// negative changes, the given [CatalogState] must reflect the catalog state
73/// _before_ applying the changes. This is so that we can easily extract the
74/// state of an object before it is removed.
75///
76/// Will return `None` if the given catalog change is purely internal to the
77/// catalog and does not have implications for anything else.
78#[instrument(level = "debug")]
79pub fn parse_state_update(
80    catalog: &CatalogState,
81    state_update: StateUpdate,
82) -> Option<ParsedStateUpdate> {
83    let kind = match state_update.kind {
84        StateUpdateKind::Item(item) => Some(parse_item_update(catalog, item)),
85        StateUpdateKind::TemporaryItem(item) => Some(parse_temporary_item_update(catalog, item)),
86        StateUpdateKind::Cluster(cluster) => Some(parse_cluster_update(catalog, cluster)),
87        StateUpdateKind::ClusterReplica(replica) => {
88            Some(parse_cluster_replica_update(catalog, replica))
89        }
90        _ => {
91            // The controllers are currently not interested in other kinds of
92            // changes to the catalog.
93            None
94        }
95    };
96
97    kind.map(|kind| ParsedStateUpdate {
98        kind,
99        ts: state_update.ts,
100        diff: state_update.diff,
101    })
102}
103
104fn parse_item_update(
105    catalog: &CatalogState,
106    durable_item: durable::objects::Item,
107) -> ParsedStateUpdateKind {
108    let (parsed_item, connection, parsed_full_name) =
109        parse_item_update_common(catalog, &durable_item.id);
110
111    ParsedStateUpdateKind::Item {
112        durable_item,
113        parsed_item,
114        connection,
115        parsed_full_name,
116    }
117}
118
119fn parse_temporary_item_update(
120    catalog: &CatalogState,
121    durable_item: memory::objects::TemporaryItem,
122) -> ParsedStateUpdateKind {
123    let (parsed_item, connection, parsed_full_name) =
124        parse_item_update_common(catalog, &durable_item.id);
125
126    ParsedStateUpdateKind::TemporaryItem {
127        durable_item,
128        parsed_item,
129        connection,
130        parsed_full_name,
131    }
132}
133
134// Shared between temporary items and durable items.
135fn parse_item_update_common(
136    catalog: &CatalogState,
137    item_id: &CatalogItemId,
138) -> (CatalogItem, Option<GenericSourceConnection>, String) {
139    let entry = catalog.get_entry(item_id);
140
141    let parsed_item = entry.item().clone();
142    let parsed_full_name = catalog
143        .resolve_full_name(entry.name(), entry.conn_id())
144        .to_string();
145
146    let connection = match &parsed_item {
147        memory::objects::CatalogItem::Source(source) => {
148            if let DataSourceDesc::Ingestion { desc, .. }
149            | DataSourceDesc::OldSyntaxIngestion { desc, .. } = &source.data_source
150            {
151                Some(desc.connection.clone().into_inline_connection(catalog))
152            } else {
153                None
154            }
155        }
156        _ => None,
157    };
158
159    (parsed_item, connection, parsed_full_name)
160}
161
162fn parse_cluster_update(
163    catalog: &CatalogState,
164    durable_cluster: durable::objects::Cluster,
165) -> ParsedStateUpdateKind {
166    let parsed_cluster = catalog.get_cluster(durable_cluster.id);
167
168    ParsedStateUpdateKind::Cluster {
169        durable_cluster,
170        parsed_cluster: parsed_cluster.clone(),
171    }
172}
173
174fn parse_cluster_replica_update(
175    catalog: &CatalogState,
176    durable_cluster_replica: durable::objects::ClusterReplica,
177) -> ParsedStateUpdateKind {
178    let parsed_cluster_replica = catalog.get_cluster_replica(
179        durable_cluster_replica.cluster_id,
180        durable_cluster_replica.replica_id,
181    );
182
183    ParsedStateUpdateKind::ClusterReplica {
184        durable_cluster_replica,
185        parsed_cluster_replica: parsed_cluster_replica.clone(),
186    }
187}