Skip to main content

mz_deploy/cli/commands/
setup_schema.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! DDL statements that materialize the `_mz_deploy` tracking database.
11//!
12//! Each entry is executed as its own statement by [`super::setup::setup`].
13//! Executing them individually (rather than as one multi-statement batch via
14//! `batch_execute`) avoids Materialize's rejection of DDL inside the implicit
15//! transaction block that a simple multi-statement query creates.
16//!
17//! Every statement is idempotent — `setup` is the only command that writes
18//! to `_mz_deploy`, and it can be re-run any number of times to bring an
19//! existing installation up to the current set of objects. The initial
20//! `tables.version` row is seeded separately by [`super::setup::setup`] with
21//! a pre-check, since there is no `INSERT IF NOT EXISTS` form in Materialize.
22//!
23//! Order matters: tables must exist before the indexes and views that
24//! reference them.
25//!
26//! [`EXPECTED_OBJECTS`] MUST stay in sync with the `CREATE` statements here;
27//! a unit test in this module guards the invariant.
28
29/// All DDL statements required to initialize `_mz_deploy` from a clean
30/// database, and safe to re-run against an existing one. The `_mz_deploy`
31/// database itself is created separately by [`super::setup::setup`]
32/// immediately before iterating these.
33pub(super) const SETUP_STATEMENTS: &[&str] = &[
34    "CREATE SCHEMA IF NOT EXISTS _mz_deploy.tables",
35    r#"CREATE TABLE IF NOT EXISTS _mz_deploy.tables.deployments (
36        deploy_id   TEXT NOT NULL,
37        deployed_at TIMESTAMPTZ NOT NULL,
38        promoted_at TIMESTAMPTZ,
39        database    TEXT NOT NULL,
40        schema      TEXT NOT NULL,
41        deployed_by TEXT NOT NULL,
42        commit      TEXT,
43        kind        TEXT NOT NULL,
44        mode        TEXT NOT NULL
45    )"#,
46    r#"CREATE INDEX IF NOT EXISTS deployments_deploy_id_idx
47        IN CLUSTER _mz_deploy_server
48        ON _mz_deploy.tables.deployments (deploy_id)"#,
49    r#"CREATE TABLE IF NOT EXISTS _mz_deploy.tables.objects (
50        deploy_id TEXT NOT NULL,
51        database  TEXT NOT NULL,
52        schema    TEXT NOT NULL,
53        object    TEXT NOT NULL,
54        hash      TEXT NOT NULL
55    )"#,
56    r#"CREATE INDEX IF NOT EXISTS objects_deploy_id_idx
57        IN CLUSTER _mz_deploy_server
58        ON _mz_deploy.tables.objects (deploy_id)"#,
59    r#"CREATE TABLE IF NOT EXISTS _mz_deploy.tables.clusters (
60        deploy_id  TEXT NOT NULL,
61        cluster_id TEXT NOT NULL
62    )"#,
63    r#"CREATE TABLE IF NOT EXISTS _mz_deploy.tables.pending_statements (
64        deploy_id      TEXT NOT NULL,
65        sequence_num   INT NOT NULL,
66        database       TEXT NOT NULL,
67        schema         TEXT NOT NULL,
68        object         TEXT NOT NULL,
69        object_hash    TEXT NOT NULL,
70        statement_sql  TEXT NOT NULL,
71        statement_kind TEXT NOT NULL,
72        executed_at    TIMESTAMPTZ
73    )"#,
74    r#"CREATE INDEX IF NOT EXISTS pending_statements_deploy_id_idx
75        IN CLUSTER _mz_deploy_server
76        ON _mz_deploy.tables.pending_statements (deploy_id)"#,
77    r#"CREATE TABLE IF NOT EXISTS _mz_deploy.tables.replacement_mvs (
78        deploy_id          TEXT NOT NULL,
79        target_database    TEXT NOT NULL,
80        target_schema      TEXT NOT NULL,
81        target_name        TEXT NOT NULL,
82        replacement_schema TEXT NOT NULL
83    )"#,
84    r#"CREATE TABLE IF NOT EXISTS _mz_deploy.tables.version (
85        version BIGINT NOT NULL
86    )"#,
87    r#"CREATE INDEX IF NOT EXISTS version_idx
88        IN CLUSTER _mz_deploy_server
89        ON _mz_deploy.tables.version (version)"#,
90    // Per-developer overlay database manifest.
91    r#"CREATE TABLE IF NOT EXISTS _mz_deploy.tables.dev_overlays (
92        profile       TEXT NOT NULL,
93        project       TEXT NOT NULL,
94        overlay_db    TEXT NOT NULL,
95        created_at    TIMESTAMPTZ NOT NULL
96    )"#,
97    r#"CREATE INDEX IF NOT EXISTS dev_overlays_profile_project_idx
98        IN CLUSTER _mz_deploy_server
99        ON _mz_deploy.tables.dev_overlays (profile, project)"#,
100    r#"CREATE VIEW IF NOT EXISTS _mz_deploy.public.production AS
101    WITH candidates AS (
102        SELECT DISTINCT ON (database, schema)
103            database, schema, deploy_id, promoted_at, commit, kind
104        FROM _mz_deploy.tables.deployments
105        WHERE promoted_at IS NOT NULL
106        ORDER BY database, schema, promoted_at DESC
107    )
108    SELECT c.database, c.schema, c.deploy_id, c.promoted_at, c.commit, c.kind
109    FROM candidates c
110    JOIN mz_schemas s ON c.schema = s.name
111    JOIN mz_databases d ON c.database = d.name"#,
112    r#"CREATE INDEX IF NOT EXISTS production_database_schema_idx
113        IN CLUSTER _mz_deploy_server
114        ON _mz_deploy.public.production (database, schema)"#,
115    r#"CREATE VIEW IF NOT EXISTS _mz_deploy.public.staging_deployments AS
116    SELECT deploy_id, deployed_at, database, schema, deployed_by, commit, kind, mode
117    FROM _mz_deploy.tables.deployments
118    WHERE promoted_at IS NULL"#,
119    r#"CREATE INDEX IF NOT EXISTS staging_deployments_deploy_id_idx
120        IN CLUSTER _mz_deploy_server
121        ON _mz_deploy.public.staging_deployments (deploy_id)"#,
122    r#"CREATE VIEW IF NOT EXISTS _mz_deploy.public.deployment_clusters AS
123    SELECT dc.deploy_id, c.name
124    FROM _mz_deploy.tables.clusters dc
125    JOIN mz_catalog.mz_clusters c ON dc.cluster_id = c.id"#,
126    r#"CREATE INDEX IF NOT EXISTS deployment_clusters_deploy_id_idx
127        IN CLUSTER _mz_deploy_server
128        ON _mz_deploy.public.deployment_clusters (deploy_id)"#,
129    r#"CREATE VIEW IF NOT EXISTS _mz_deploy.public.missing_clusters AS
130    SELECT d.deploy_id, dc.cluster_id
131    FROM _mz_deploy.tables.deployments d
132    JOIN _mz_deploy.tables.clusters dc USING (deploy_id)
133    LEFT JOIN mz_catalog.mz_clusters c ON dc.cluster_id = c.id
134    WHERE d.promoted_at IS NULL AND c.id IS NULL"#,
135    r#"CREATE INDEX IF NOT EXISTS missing_clusters_deploy_id_idx
136        IN CLUSTER _mz_deploy_server
137        ON _mz_deploy.public.missing_clusters (deploy_id)"#,
138    r#"CREATE VIEW IF NOT EXISTS _mz_deploy.public.deployments AS
139    SELECT deploy_id, deployed_at, promoted_at, database, schema, deployed_by,
140           commit, kind, mode
141    FROM _mz_deploy.tables.deployments"#,
142    r#"CREATE VIEW IF NOT EXISTS _mz_deploy.public.objects AS
143    SELECT deploy_id, database, schema, object, hash
144    FROM _mz_deploy.tables.objects"#,
145    r#"CREATE VIEW IF NOT EXISTS _mz_deploy.public.pending_statements AS
146    SELECT deploy_id, sequence_num, database, schema, object, object_hash,
147           statement_sql, statement_kind, executed_at
148    FROM _mz_deploy.tables.pending_statements"#,
149    r#"CREATE VIEW IF NOT EXISTS _mz_deploy.public.replacement_mvs AS
150    SELECT deploy_id, target_database, target_schema, target_name,
151           replacement_schema
152    FROM _mz_deploy.tables.replacement_mvs"#,
153    r#"CREATE VIEW IF NOT EXISTS _mz_deploy.public.version AS
154    SELECT version
155    FROM _mz_deploy.tables.version"#,
156];
157
158/// The known set of objects that `_mz_deploy` contains after a successful
159/// `setup`. Used by `verify()` to check whether setup has been run.
160///
161/// Each entry is `(schema, object_name, kind)` where `kind` is the value
162/// `mz_objects.type` uses: `"table"`, `"view"`, `"index"`.
163///
164/// This list MUST stay in sync with `SETUP_STATEMENTS` above. Any object
165/// created by a statement must appear here, and vice versa.
166pub(super) const EXPECTED_OBJECTS: &[(&str, &str, &str)] = &[
167    ("tables", "deployments", "table"),
168    ("tables", "deployments_deploy_id_idx", "index"),
169    ("tables", "objects", "table"),
170    ("tables", "objects_deploy_id_idx", "index"),
171    ("tables", "clusters", "table"),
172    ("tables", "pending_statements", "table"),
173    ("tables", "pending_statements_deploy_id_idx", "index"),
174    ("tables", "replacement_mvs", "table"),
175    ("tables", "version", "table"),
176    ("tables", "version_idx", "index"),
177    ("tables", "dev_overlays", "table"),
178    ("tables", "dev_overlays_profile_project_idx", "index"),
179    ("public", "production", "view"),
180    ("public", "production_database_schema_idx", "index"),
181    ("public", "staging_deployments", "view"),
182    ("public", "staging_deployments_deploy_id_idx", "index"),
183    ("public", "deployment_clusters", "view"),
184    ("public", "deployment_clusters_deploy_id_idx", "index"),
185    ("public", "missing_clusters", "view"),
186    ("public", "missing_clusters_deploy_id_idx", "index"),
187    ("public", "deployments", "view"),
188    ("public", "objects", "view"),
189    ("public", "pending_statements", "view"),
190    ("public", "replacement_mvs", "view"),
191    ("public", "version", "view"),
192];
193
194#[cfg(test)]
195mod tests {
196    use super::*;
197    use std::collections::BTreeSet;
198
199    /// Extract `(schema, name, kind)` triples from a single CREATE DDL
200    /// statement in `SETUP_STATEMENTS`. Returns `None` for statements we
201    /// don't track in `EXPECTED_OBJECTS` (currently: `CREATE SCHEMA`).
202    fn parsed_object(stmt: &str) -> Option<(String, String, &'static str)> {
203        // Normalize whitespace so we can work with tokens.
204        let collapsed: String = stmt.split_whitespace().collect::<Vec<_>>().join(" ");
205        let tokens: Vec<&str> = collapsed.split(' ').collect();
206
207        // Pattern prefix check — every tracked stmt starts with
208        // "CREATE {TABLE|INDEX|VIEW} IF NOT EXISTS".
209        if tokens.len() < 6 || tokens[0] != "CREATE" || tokens[2..5] != ["IF", "NOT", "EXISTS"] {
210            return None;
211        }
212
213        match tokens[1] {
214            "TABLE" | "VIEW" => {
215                // token 5: `_mz_deploy.<schema>.<name>` (may have trailing `(` for TABLE).
216                let fqn = tokens[5].trim_end_matches('(');
217                let (schema, name) = split_db_schema_name(fqn)?;
218                let kind = if tokens[1] == "TABLE" {
219                    "table"
220                } else {
221                    "view"
222                };
223                Some((schema, name, kind))
224            }
225            "INDEX" => {
226                // token 5: index name. Schema is the target's schema, found
227                // after the `ON` keyword.
228                let name = tokens[5].to_string();
229                let on_pos = tokens.iter().position(|t| *t == "ON")?;
230                let target = tokens.get(on_pos + 1)?.trim_end_matches('(');
231                let (target_schema, _target) = split_db_schema_name(target)?;
232                Some((target_schema, name, "index"))
233            }
234            _ => None,
235        }
236    }
237
238    /// Split `_mz_deploy.<schema>.<name>` into `(schema, name)`.
239    fn split_db_schema_name(fqn: &str) -> Option<(String, String)> {
240        let rest = fqn.strip_prefix("_mz_deploy.")?;
241        let (schema, name) = rest.split_once('.')?;
242        Some((schema.to_string(), name.to_string()))
243    }
244
245    /// Guards that every object created by `SETUP_STATEMENTS` has a
246    /// corresponding `EXPECTED_OBJECTS` entry and vice versa — so `verify`
247    /// catches exactly the set of objects `setup` installs. If this fails,
248    /// you added a CREATE without a matching EXPECTED_OBJECTS row (or the
249    /// other way around).
250    #[mz_ore::test]
251    fn expected_objects_match_setup_statements() {
252        let parsed: BTreeSet<(String, String, &'static str)> = SETUP_STATEMENTS
253            .iter()
254            .filter_map(|stmt| parsed_object(stmt))
255            .collect();
256
257        let expected: BTreeSet<(String, String, &'static str)> = EXPECTED_OBJECTS
258            .iter()
259            .map(|(s, n, k)| (s.to_string(), n.to_string(), *k))
260            .collect();
261
262        let missing_from_expected: Vec<_> = parsed.difference(&expected).collect();
263        let missing_from_statements: Vec<_> = expected.difference(&parsed).collect();
264
265        assert!(
266            missing_from_expected.is_empty() && missing_from_statements.is_empty(),
267            "SETUP_STATEMENTS and EXPECTED_OBJECTS are out of sync.\n\
268             In SETUP_STATEMENTS but not in EXPECTED_OBJECTS: {:?}\n\
269             In EXPECTED_OBJECTS but not in SETUP_STATEMENTS: {:?}",
270            missing_from_expected,
271            missing_from_statements,
272        );
273    }
274}