Skip to main content

mz_deploy/client/
type_info.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Column-schema introspection for the data-contract and type-checking systems.
//!
//! Methods on [`TypeInfoClient`] query the Materialize system catalog for
//! external dependencies and `CREATE TABLE FROM SOURCE` tables, returning their
//! column names, types, nullability, object kinds, and comments as a
//! `Types` snapshot.
//!
//! Plain `CREATE TABLE` objects are excluded — their schemas are derived from
//! the SQL AST during type checking and do not need server queries.
//!
//! - **`lock`** uses [`query_types_for_objects`](TypeInfoClient::query_types_for_objects)
//!   to generate `types.lock` for declared dependencies and source tables,
//!   retrieving column types, object kind, and comments from the catalog in a
//!   single query that covers all requested objects.
//! - **`query_external_types`** delegates to `query_types_for_objects`, extracting
//!   object lists from the compiled project graph.
26
27use crate::client::connection::TypeInfoClient;
28use crate::client::errors::ConnectionError;
29use crate::project::ir::object_id::ObjectId;
30use crate::types::{ColumnType, ObjectKind, Types};
31use serde::Deserialize;
32use std::collections::{BTreeMap, BTreeSet};
33
/// Per-object payload returned by the catalog query in `query_types_for_objects`.
///
/// Deserialized from the `data` column, which the query assembles with
/// `jsonb_build_object`. The field names here must match the JSON keys emitted
/// by that query (`object_type`, `object_comment`, `columns`).
#[derive(Deserialize)]
struct CatalogObjectInfo {
    /// Catalog object type (`o.type` in the query), decoded as an `ObjectKind`.
    object_type: ObjectKind,
    /// Object-level comment: the `mz_internal.mz_comments` row for this object
    /// whose `object_sub_id` is NULL. `None` when no such comment exists.
    object_comment: Option<String>,
    /// One entry per catalog column. The query substitutes an empty JSON array
    /// when the object has no columns.
    columns: Vec<CatalogColumnInfo>,
}
41
/// Per-column entry of the `columns` JSON array built by the catalog query in
/// `query_types_for_objects`.
///
/// Field names must match the JSON keys emitted by that query (`name`, `type`,
/// `nullable`, `position`, `comment`).
#[derive(Deserialize)]
struct CatalogColumnInfo {
    /// Column name (`c.name`).
    name: String,
    /// Column type as reported by the catalog (`c.type`), kept as text.
    /// Raw identifier because `type` is a Rust keyword; the JSON key is `type`.
    r#type: String,
    /// Whether the column is nullable (`c.nullable`).
    nullable: bool,
    /// Column position (`c.position`), which the query casts to `int8`.
    position: i64,
    /// Column-level comment (`col_comment.comment`); `None` when absent.
    comment: Option<String>,
}
50
51impl TypeInfoClient<'_> {
52    /// Resolve the column schema, kind, and comments for `objects` plus
53    /// `source_tables` in a single catalog query.
54    ///
55    /// Joins `mz_catalog.mz_columns`, `mz_catalog.mz_objects`,
56    /// `mz_catalog.mz_schemas`, `mz_catalog.mz_databases`, and
57    /// `mz_internal.mz_comments` to retrieve columns, types, nullability,
58    /// object kind, and both object-level and column-level comments. Each input
59    /// triple `(database, schema, object)` is expanded from a single `jsonb`
60    /// parameter via `jsonb_array_elements`, and the per-object metadata is
61    /// returned as a `jsonb` blob deserialized by serde on the client.
62    ///
63    /// Returns `(types, missing)` where `missing` lists any input objects that
64    /// did not exist in the target catalog. The `lock` command surfaces those
65    /// as `DeclaredDependenciesMissing`.
66    ///
67    /// Source tables are always recorded as `ObjectKind::Table` regardless of
68    /// the catalog's `o.type`. Objects without columns (e.g. secrets,
69    /// connections) appear in the result with an empty column map.
70    pub async fn query_types_for_objects(
71        &self,
72        objects: &[ObjectId],
73        source_tables: &[ObjectId],
74    ) -> Result<(Types, Vec<ObjectId>), ConnectionError> {
75        let source_table_set: BTreeSet<&ObjectId> = source_tables.iter().collect();
76        let all_oids: Vec<&ObjectId> = objects.iter().chain(source_tables.iter()).collect();
77
78        if all_oids.is_empty() {
79            return Ok((
80                Types {
81                    version: 1,
82                    tables: BTreeMap::new(),
83                    kinds: BTreeMap::new(),
84                    comments: BTreeMap::new(),
85                },
86                Vec::new(),
87            ));
88        }
89
90        // Pass the (db, schema, obj) triples as a single jsonb array. Materialize's
91        // unnest takes one array, so jsonb_array_elements is the cleanest way to
92        // expand the input into a row per object.
93        let input_json = serde_json::Value::Array(
94            all_oids
95                .iter()
96                .map(|o| {
97                    serde_json::json!({
98                        "db": o.database(),
99                        "sch": o.schema(),
100                        "obj": o.object(),
101                    })
102                })
103                .collect(),
104        );
105
106        let rows = self
107            .client
108            .query(
109                "WITH input AS ( \
110                    SELECT \
111                        elem->>'db' AS db, \
112                        elem->>'sch' AS sch, \
113                        elem->>'obj' AS obj \
114                    FROM jsonb_array_elements($1) AS elem \
115                 ) \
116                 SELECT \
117                    i.db AS db, \
118                    i.sch AS sch, \
119                    i.obj AS obj, \
120                    jsonb_build_object( \
121                        'object_type', o.type, \
122                        'object_comment', obj_comment.comment, \
123                        'columns', COALESCE( \
124                            jsonb_agg(jsonb_build_object( \
125                                'name', c.name, \
126                                'type', c.type, \
127                                'nullable', c.nullable, \
128                                'position', c.position::int8, \
129                                'comment', col_comment.comment \
130                            )) FILTER (WHERE c.id IS NOT NULL), \
131                            '[]'::jsonb \
132                        ) \
133                    )::text AS data \
134                 FROM input i \
135                 JOIN mz_catalog.mz_schemas s ON s.name = i.sch \
136                 LEFT JOIN mz_catalog.mz_databases d ON d.id = s.database_id \
137                 JOIN mz_catalog.mz_objects o \
138                    ON o.schema_id = s.id AND o.name = i.obj \
139                 LEFT JOIN mz_catalog.mz_columns c ON c.id = o.id \
140                 LEFT JOIN mz_internal.mz_comments obj_comment \
141                    ON o.id = obj_comment.id AND obj_comment.object_sub_id IS NULL \
142                 LEFT JOIN mz_internal.mz_comments col_comment \
143                    ON c.id = col_comment.id AND col_comment.object_sub_id = c.position \
144                 WHERE (i.db IS NULL AND s.database_id IS NULL) \
145                    OR (i.db IS NOT NULL AND d.name = i.db) \
146                 GROUP BY i.db, i.sch, i.obj, o.type, obj_comment.comment",
147                &[&input_json],
148            )
149            .await?;
150
151        let mut tables = BTreeMap::new();
152        let mut kinds = BTreeMap::new();
153        let mut comments = BTreeMap::new();
154        let mut found = BTreeSet::new();
155
156        for row in &rows {
157            let db: Option<String> = row.get("db");
158            let sch: String = row.get("sch");
159            let obj: String = row.get("obj");
160            let oid = match db {
161                Some(db) => ObjectId::new(db, sch, obj),
162                None => ObjectId::new_system(sch, obj),
163            };
164            let data: String = row.get("data");
165            let info: CatalogObjectInfo = serde_json::from_str(&data).map_err(|e| {
166                ConnectionError::Message(format!(
167                    "failed to decode catalog metadata for {}: {}",
168                    oid, e
169                ))
170            })?;
171
172            let kind = if source_table_set.contains(&oid) {
173                ObjectKind::Table
174            } else {
175                info.object_type
176            };
177            kinds.insert(oid.clone(), kind);
178
179            if let Some(comment) = info.object_comment {
180                comments.insert(oid.clone(), comment);
181            }
182
183            let mut columns = BTreeMap::new();
184            for col in info.columns {
185                columns.insert(
186                    col.name,
187                    ColumnType {
188                        r#type: col.r#type,
189                        nullable: col.nullable,
190                        position: usize::try_from(col.position).unwrap_or(0),
191                        comment: col.comment,
192                    },
193                );
194            }
195            tables.insert(oid.clone(), columns);
196            found.insert(oid);
197        }
198
199        let missing: Vec<ObjectId> = all_oids
200            .iter()
201            .filter(|o| !found.contains(**o))
202            .map(|o| (*o).clone())
203            .collect();
204
205        Ok((
206            Types {
207                version: 1,
208                tables,
209                kinds,
210                comments,
211            },
212            missing,
213        ))
214    }
215}