Skip to main content

mz_deploy/client/
models.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Domain models for Materialize catalog objects.
11//!
12//! These types represent objects in the Materialize system catalog and provide
13//! a type-safe interface over raw database rows.
14
15use chrono::{DateTime, Utc};
16use std::fmt;
17use std::str::FromStr;
18
19use crate::project::SchemaQualifier;
20
21/// The type of deployment - either tables-only or full objects.
22#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
23#[serde(rename_all = "snake_case")]
24pub enum DeploymentKind {
25    /// Table creation deployment (apply tables command)
26    Tables,
27    /// Full object deployment (stage, apply commands)
28    Objects,
29    /// Contains sinks
30    Sinks,
31    /// Contains replacement materialized views (drop after apply, not swap)
32    Replacement,
33}
34
35impl fmt::Display for DeploymentKind {
36    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37        match self {
38            DeploymentKind::Tables => write!(f, "tables"),
39            DeploymentKind::Objects => write!(f, "objects"),
40            DeploymentKind::Sinks => write!(f, "sinks"),
41            DeploymentKind::Replacement => write!(f, "replacement"),
42        }
43    }
44}
45
46impl FromStr for DeploymentKind {
47    type Err = String;
48
49    fn from_str(s: &str) -> Result<Self, Self::Err> {
50        match s {
51            "tables" => Ok(DeploymentKind::Tables),
52            "objects" => Ok(DeploymentKind::Objects),
53            "sinks" => Ok(DeploymentKind::Sinks),
54            "replacement" => Ok(DeploymentKind::Replacement),
55            _ => Err(format!("Invalid deployment kind: {}", s)),
56        }
57    }
58}
59
60/// Deployment mode tag stored alongside each deployment.
61///
62/// Currently only `Stage` exists; kept as an enum for schema compatibility
63/// with the `mode` column in `_mz_deploy.tables.deployments`.
64#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
65#[serde(rename_all = "snake_case")]
66pub enum DeploymentMode {
67    /// Full staging deployment that can be promoted to production.
68    Stage,
69}
70
71impl fmt::Display for DeploymentMode {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        match self {
74            DeploymentMode::Stage => write!(f, "stage"),
75        }
76    }
77}
78
79impl FromStr for DeploymentMode {
80    type Err = String;
81
82    fn from_str(s: &str) -> Result<Self, Self::Err> {
83        match s {
84            "stage" => Ok(DeploymentMode::Stage),
85            _ => Err(format!("Invalid deployment mode: {}", s)),
86        }
87    }
88}
89
90/// A compute cluster in Materialize.
91///
92/// Clusters provide the compute resources for materialized views, indexes, and sinks.
93#[derive(Debug, Clone, PartialEq, Eq)]
94pub struct Cluster {
95    /// Materialize's unique identifier for the cluster
96    pub id: String,
97    /// Cluster name (e.g., "quickstart")
98    pub name: String,
99    /// Cluster size (e.g., "M.1-large"), None for unmanaged clusters
100    pub size: Option<String>,
101    /// Number of replicas for fault tolerance (stored as i64 to handle postgres uint4 type)
102    pub replication_factor: Option<i64>,
103}
104
105/// Options for creating a new cluster.
106///
107/// Only size and replication factor are configurable - all other settings
108/// use Materialize defaults.
109#[derive(Debug, Clone, PartialEq, Eq)]
110pub struct ClusterOptions {
111    /// Cluster size (e.g., "M.1-large", "M.1-small")
112    pub size: String,
113    /// Number of replicas (default: 1)
114    pub replication_factor: u32,
115}
116
117impl ClusterOptions {
118    /// Create cluster options from a production cluster configuration.
119    pub fn from_cluster(cluster: &Cluster) -> Result<Self, String> {
120        let size = cluster.size.clone().ok_or_else(|| {
121            format!(
122                "Cluster '{}' has no size (unmanaged cluster?)",
123                cluster.name
124            )
125        })?;
126
127        let replication_factor = cluster
128            .replication_factor
129            .unwrap_or(1)
130            .try_into()
131            .map_err(|_| format!("Invalid replication_factor for cluster '{}'", cluster.name))?;
132
133        Ok(Self {
134            size,
135            replication_factor,
136        })
137    }
138}
139
140/// Configuration for a cluster replica (used for unmanaged clusters).
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct ClusterReplica {
143    /// Replica name (e.g., "r1", "r2")
144    pub name: String,
145    /// Replica size (e.g., "25cc")
146    pub size: String,
147    /// Optional availability zone
148    pub availability_zone: Option<String>,
149}
150
151/// A privilege grant on a cluster.
152#[derive(Debug, Clone, PartialEq, Eq)]
153pub struct ObjectGrant {
154    /// Role name that receives the grant
155    pub grantee: String,
156    /// Privilege type (e.g., "USAGE", "CREATE")
157    pub privilege_type: String,
158}
159
160/// Configuration for creating a cluster (managed or unmanaged).
161///
162/// This captures all the information needed to clone a cluster's configuration
163/// including its replicas (for unmanaged clusters) and privilege grants.
164#[derive(Debug, Clone, PartialEq, Eq)]
165pub enum ClusterConfig {
166    /// Managed cluster with SIZE and REPLICATION FACTOR
167    Managed {
168        /// Cluster options (size, replication factor)
169        options: ClusterOptions,
170        /// Privilege grants on the cluster
171        grants: Vec<ObjectGrant>,
172    },
173    /// Unmanaged cluster with explicit replicas
174    Unmanaged {
175        /// Replica configurations
176        replicas: Vec<ClusterReplica>,
177        /// Privilege grants on the cluster
178        grants: Vec<ObjectGrant>,
179    },
180}
181
182impl ClusterConfig {
183    /// Get the grants for this cluster configuration.
184    pub fn grants(&self) -> &[ObjectGrant] {
185        match self {
186            ClusterConfig::Managed { grants, .. } => grants,
187            ClusterConfig::Unmanaged { grants, .. } => grants,
188        }
189    }
190}
191
192/// A schema deployment record tracking when and how a schema was deployed.
193///
194/// Stored in the `deploy.deployments` table. Schemas are deployed
195/// atomically - all objects in a dirty schema are redeployed together.
196#[derive(Debug, Clone, PartialEq, Eq)]
197pub struct SchemaDeploymentRecord {
198    /// Deploy ID (e.g., `"<init>"` for direct deploy, `"staging"` for staged deploy)
199    pub deploy_id: String,
200    /// Database name (e.g., "materialize")
201    pub database: String,
202    /// Schema name (e.g., "public")
203    pub schema: String,
204    /// When this schema was deployed
205    pub deployed_at: DateTime<Utc>,
206    /// Which Materialize user/role deployed this schema
207    pub deployed_by: String,
208    /// When this schema was promoted to production (NULL for staging, set on promotion)
209    pub promoted_at: Option<DateTime<Utc>>,
210    /// Git commit hash if available
211    pub git_commit: Option<String>,
212    /// Type of deployment (tables or objects)
213    pub kind: DeploymentKind,
214    /// Whether this is a stage or preview deployment
215    pub mode: DeploymentMode,
216}
217
218/// An object deployment record tracking object-level deployment history.
219///
220/// Stored in the `deploy.objects` table (append-only).
221/// Each row records that an object with a specific hash was deployed
222/// to a deployment at a point in time.
223#[derive(Debug, Clone, PartialEq, Eq)]
224pub struct DeploymentObjectRecord {
225    /// Deploy ID (e.g., `"<init>"` for direct deploy, `"staging"` for staged deploy)
226    pub deploy_id: String,
227    /// Database name (e.g., "materialize")
228    pub database: String,
229    /// Schema name (e.g., "public")
230    pub schema: String,
231    /// Object name (e.g., "my_view")
232    pub object: String,
233    /// Hash of the HIR DatabaseObject (semantic content hash)
234    pub object_hash: String,
235    /// When this object was deployed
236    pub deployed_at: DateTime<Utc>,
237}
238
239/// Metadata about a deployment.
240///
241/// Used for validation before operations like apply or abort.
242#[derive(Debug, Clone, PartialEq, Eq)]
243pub struct DeploymentMetadata {
244    /// Deploy ID
245    pub deploy_id: String,
246    /// When this deployment was promoted (NULL if not promoted)
247    pub promoted_at: Option<DateTime<Utc>>,
248    /// Whether this is a stage or preview deployment
249    pub mode: DeploymentMode,
250    /// List of (database, schema) tuples in this deployment
251    pub schemas: Vec<SchemaQualifier>,
252}
253
254/// A conflict record indicating a schema was updated after deployment started.
255///
256/// Used for git-merge-style conflict detection when promoting deployments.
257/// Returned by conflict detection queries that check if production schemas
258/// were modified since the staging deployment began.
259#[derive(Debug, Clone, PartialEq, Eq)]
260pub struct ConflictRecord {
261    /// Database name containing the conflicting schema
262    pub database: String,
263    /// Schema name that has a conflict
264    pub schema: String,
265    /// Deploy ID that last promoted this schema
266    pub deploy_id: String,
267    /// When the schema was last promoted to production
268    pub promoted_at: DateTime<Utc>,
269}
270
271/// A cluster known to host at least one promoted deployment.
272///
273/// Returned by `list_production_clusters()` and used by `dev` to refuse
274/// overlay deployments that would land on production compute. Each record
275/// carries one representative protected deployment so the error message
276/// can explain why the cluster is considered production.
277#[derive(Debug, Clone, PartialEq, Eq)]
278pub struct ProductionClusterRecord {
279    /// Cluster name as it appears in `mz_clusters` (resolved from cluster_id).
280    pub cluster_name: String,
281    /// Database of one promoted deployment hosted on this cluster.
282    pub database: String,
283    /// Schema of one promoted deployment hosted on this cluster.
284    pub schema: String,
285    /// When that deployment was promoted.
286    pub promoted_at: DateTime<Utc>,
287}
288
289/// Details about a specific deployment.
290///
291/// Returned by `get_deployment_details()` for the describe command.
292#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
293pub struct DeploymentDetails {
294    /// When this deployment was created
295    pub deployed_at: DateTime<Utc>,
296    /// When this deployment was promoted (None if still staging)
297    pub promoted_at: Option<DateTime<Utc>>,
298    /// Which Materialize user/role deployed this
299    pub deployed_by: String,
300    /// Git commit hash if available
301    pub git_commit: Option<String>,
302    /// Type of deployment (tables or objects)
303    pub kind: DeploymentKind,
304    /// Whether this is a stage or preview deployment
305    pub mode: DeploymentMode,
306    /// List of (database, schema) tuples in this deployment
307    pub schemas: Vec<SchemaQualifier>,
308}
309
310/// Summary of a staging deployment.
311///
312/// Used by `list_staging_deployments()` for the deployments command.
313#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
314pub struct StagingDeployment {
315    /// When this deployment was created
316    pub deployed_at: DateTime<Utc>,
317    /// Which Materialize user/role deployed this
318    pub deployed_by: String,
319    /// Git commit hash if available
320    pub git_commit: Option<String>,
321    /// Type of deployment (tables or objects)
322    pub kind: DeploymentKind,
323    /// Whether this is a stage or preview deployment
324    pub mode: DeploymentMode,
325    /// List of (database, schema) tuples in this deployment
326    pub schemas: Vec<SchemaQualifier>,
327}
328
329/// A promoted deployment in history.
330///
331/// Returned by `list_deployment_history()` for the history command.
332#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
333pub struct DeploymentHistoryEntry {
334    /// Deploy ID for this deployment
335    pub deploy_id: String,
336    /// When this deployment was promoted
337    pub promoted_at: DateTime<Utc>,
338    /// Which Materialize user/role deployed this
339    pub deployed_by: String,
340    /// Git commit hash if available
341    pub git_commit: Option<String>,
342    /// Type of deployment (tables or objects)
343    pub kind: DeploymentKind,
344    /// List of (database, schema) tuples in this deployment
345    pub schemas: Vec<SchemaQualifier>,
346}
347
348/// State of an apply operation for resumable apply.
349///
350/// This is determined by checking the existence and comments of the
351/// `_mz_deploy.apply_<deploy_id>_pre` and `_mz_deploy.apply_<deploy_id>_post` schemas.
352/// Comments are set when creating the schemas; the swap transaction exchanges which
353/// schema has which comment.
354#[derive(Debug, Clone, Copy, PartialEq, Eq)]
355pub enum ApplyState {
356    /// No apply state schemas exist - fresh apply or completed.
357    NotStarted,
358    /// State schemas exist but swap hasn't happened yet.
359    /// The `_pre` schema has comment 'swapped=false'.
360    PreSwap,
361    /// Swap has completed.
362    /// After the swap, `_pre` schema has comment 'swapped=true' (it was `_post` before).
363    PostSwap,
364}
365
366/// A replacement materialized view record tracking the mapping between
367/// the replacement MV (in staging schema) and its production target.
368///
369/// The replacement MV lives in a staging schema (`target_schema` + staging suffix)
370/// within the same database, and has the same object name as the target.
371///
372/// Stored in `_mz_deploy.public.replacement_mvs` table.
373#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
374pub struct ReplacementMvRecord {
375    /// Deploy ID this replacement belongs to
376    pub deploy_id: String,
377    /// Database of the target (and replacement) MV
378    pub target_database: String,
379    /// Schema of the production target MV
380    pub target_schema: String,
381    /// Name of the target (and replacement) MV
382    pub target_name: String,
383    /// Schema of the replacement MV (staging) — typically `target_schema` + staging suffix
384    pub replacement_schema: String,
385}
386
387/// A pending statement to be executed after the swap.
388///
389/// Used for deferred execution of statements like sinks that cannot
390/// be created in staging (they write to external systems immediately).
391/// Stored in `_mz_deploy.public.pending_statements` table.
392#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
393pub struct PendingStatement {
394    /// Deploy ID this statement belongs to
395    pub deploy_id: String,
396    /// Sequence number for ordering execution
397    pub sequence_num: i32,
398    /// Database containing the object
399    pub database: String,
400    /// Schema containing the object
401    pub schema: String,
402    /// Object name
403    pub object: String,
404    /// Hash of the object definition
405    pub object_hash: String,
406    /// SQL statement to execute
407    pub statement_sql: String,
408    /// Kind of statement (e.g., "sink")
409    pub statement_kind: String,
410    /// When this statement was executed (None if not yet executed)
411    pub executed_at: Option<DateTime<Utc>>,
412}
413
414#[cfg(test)]
415mod tests {
416    use super::*;
417
418    #[mz_ore::test]
419    fn test_deployment_kind_display() {
420        assert_eq!(DeploymentKind::Tables.to_string(), "tables");
421        assert_eq!(DeploymentKind::Objects.to_string(), "objects");
422    }
423
424    #[mz_ore::test]
425    fn test_deployment_kind_from_str_valid() {
426        assert_eq!(
427            "tables".parse::<DeploymentKind>().unwrap(),
428            DeploymentKind::Tables
429        );
430        assert_eq!(
431            "objects".parse::<DeploymentKind>().unwrap(),
432            DeploymentKind::Objects
433        );
434    }
435
436    #[mz_ore::test]
437    fn test_deployment_kind_from_str_invalid() {
438        let result = "invalid".parse::<DeploymentKind>();
439        assert!(result.is_err());
440        assert_eq!(result.unwrap_err(), "Invalid deployment kind: invalid");
441    }
442
443    #[mz_ore::test]
444    fn test_deployment_kind_roundtrip() {
445        // Verify that Display and FromStr are consistent
446        for kind in [
447            DeploymentKind::Tables,
448            DeploymentKind::Objects,
449            DeploymentKind::Sinks,
450            DeploymentKind::Replacement,
451        ] {
452            let s = kind.to_string();
453            let parsed: DeploymentKind = s.parse().unwrap();
454            assert_eq!(kind, parsed);
455        }
456    }
457
458    #[mz_ore::test]
459    fn test_cluster_options_from_cluster_success() {
460        let cluster = Cluster {
461            id: "u1".to_string(),
462            name: "quickstart".to_string(),
463            size: Some("25cc".to_string()),
464            replication_factor: Some(2),
465        };
466
467        let options = ClusterOptions::from_cluster(&cluster).unwrap();
468        assert_eq!(options.size, "25cc");
469        assert_eq!(options.replication_factor, 2);
470    }
471
472    #[mz_ore::test]
473    fn test_cluster_options_from_cluster_default_replication() {
474        let cluster = Cluster {
475            id: "u1".to_string(),
476            name: "quickstart".to_string(),
477            size: Some("25cc".to_string()),
478            replication_factor: None, // Should default to 1
479        };
480
481        let options = ClusterOptions::from_cluster(&cluster).unwrap();
482        assert_eq!(options.size, "25cc");
483        assert_eq!(options.replication_factor, 1);
484    }
485
486    #[mz_ore::test]
487    fn test_cluster_options_from_cluster_no_size() {
488        let cluster = Cluster {
489            id: "u1".to_string(),
490            name: "unmanaged".to_string(),
491            size: None, // Unmanaged cluster
492            replication_factor: Some(1),
493        };
494
495        let result = ClusterOptions::from_cluster(&cluster);
496        assert!(result.is_err());
497        let err_msg = result.unwrap_err();
498        assert!(err_msg.contains("unmanaged"));
499        assert!(err_msg.contains("has no size"));
500    }
501
502    #[mz_ore::test]
503    fn test_cluster_options_from_cluster_negative_replication() {
504        let cluster = Cluster {
505            id: "u1".to_string(),
506            name: "test".to_string(),
507            size: Some("25cc".to_string()),
508            replication_factor: Some(-1), // Invalid negative value
509        };
510
511        let result = ClusterOptions::from_cluster(&cluster);
512        assert!(result.is_err());
513        assert!(result.unwrap_err().contains("Invalid replication_factor"));
514    }
515
516    #[mz_ore::test]
517    fn test_cluster_equality() {
518        let cluster1 = Cluster {
519            id: "u1".to_string(),
520            name: "test".to_string(),
521            size: Some("25cc".to_string()),
522            replication_factor: Some(1),
523        };
524
525        let cluster2 = Cluster {
526            id: "u1".to_string(),
527            name: "test".to_string(),
528            size: Some("25cc".to_string()),
529            replication_factor: Some(1),
530        };
531
532        let cluster3 = Cluster {
533            id: "u2".to_string(), // Different ID
534            name: "test".to_string(),
535            size: Some("25cc".to_string()),
536            replication_factor: Some(1),
537        };
538
539        assert_eq!(cluster1, cluster2);
540        assert_ne!(cluster1, cluster3);
541    }
542
543    #[mz_ore::test]
544    fn test_cluster_options_equality() {
545        let opts1 = ClusterOptions {
546            size: "25cc".to_string(),
547            replication_factor: 2,
548        };
549
550        let opts2 = ClusterOptions {
551            size: "25cc".to_string(),
552            replication_factor: 2,
553        };
554
555        let opts3 = ClusterOptions {
556            size: "50cc".to_string(),
557            replication_factor: 2,
558        };
559
560        assert_eq!(opts1, opts2);
561        assert_ne!(opts1, opts3);
562    }
563
564    #[mz_ore::test]
565    fn test_cluster_replica_equality() {
566        let r1 = ClusterReplica {
567            name: "r1".to_string(),
568            size: "25cc".to_string(),
569            availability_zone: Some("use1-az1".to_string()),
570        };
571
572        let r2 = ClusterReplica {
573            name: "r1".to_string(),
574            size: "25cc".to_string(),
575            availability_zone: Some("use1-az1".to_string()),
576        };
577
578        let r3 = ClusterReplica {
579            name: "r2".to_string(),
580            size: "25cc".to_string(),
581            availability_zone: None,
582        };
583
584        assert_eq!(r1, r2);
585        assert_ne!(r1, r3);
586    }
587
588    #[mz_ore::test]
589    fn test_cluster_grant_equality() {
590        let g1 = ObjectGrant {
591            grantee: "reader".to_string(),
592            privilege_type: "USAGE".to_string(),
593        };
594
595        let g2 = ObjectGrant {
596            grantee: "reader".to_string(),
597            privilege_type: "USAGE".to_string(),
598        };
599
600        let g3 = ObjectGrant {
601            grantee: "writer".to_string(),
602            privilege_type: "CREATE".to_string(),
603        };
604
605        assert_eq!(g1, g2);
606        assert_ne!(g1, g3);
607    }
608
609    #[mz_ore::test]
610    fn test_cluster_config_managed() {
611        let config = ClusterConfig::Managed {
612            options: ClusterOptions {
613                size: "25cc".to_string(),
614                replication_factor: 2,
615            },
616            grants: vec![ObjectGrant {
617                grantee: "reader".to_string(),
618                privilege_type: "USAGE".to_string(),
619            }],
620        };
621
622        assert_eq!(config.grants().len(), 1);
623        assert_eq!(config.grants()[0].grantee, "reader");
624    }
625
626    #[mz_ore::test]
627    fn test_cluster_config_unmanaged() {
628        let config = ClusterConfig::Unmanaged {
629            replicas: vec![
630                ClusterReplica {
631                    name: "r1".to_string(),
632                    size: "25cc".to_string(),
633                    availability_zone: None,
634                },
635                ClusterReplica {
636                    name: "r2".to_string(),
637                    size: "50cc".to_string(),
638                    availability_zone: Some("use1-az1".to_string()),
639                },
640            ],
641            grants: vec![],
642        };
643
644        if let ClusterConfig::Unmanaged { replicas, grants } = &config {
645            assert_eq!(replicas.len(), 2);
646            assert_eq!(replicas[0].name, "r1");
647            assert_eq!(replicas[1].availability_zone, Some("use1-az1".to_string()));
648            assert!(grants.is_empty());
649        } else {
650            panic!("Expected Unmanaged config");
651        }
652    }
653
654    #[mz_ore::test]
655    fn test_cluster_config_unmanaged_empty_replicas() {
656        // Unmanaged clusters with 0 replicas are valid
657        let config = ClusterConfig::Unmanaged {
658            replicas: vec![],
659            grants: vec![ObjectGrant {
660                grantee: "admin".to_string(),
661                privilege_type: "CREATE".to_string(),
662            }],
663        };
664
665        if let ClusterConfig::Unmanaged { replicas, grants } = &config {
666            assert!(replicas.is_empty());
667            assert_eq!(grants.len(), 1);
668        } else {
669            panic!("Expected Unmanaged config");
670        }
671    }
672}