1use chrono::{DateTime, Utc};
16use std::fmt;
17use std::str::FromStr;
18
19use crate::project::SchemaQualifier;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
23#[serde(rename_all = "snake_case")]
24pub enum DeploymentKind {
25 Tables,
27 Objects,
29 Sinks,
31 Replacement,
33}
34
35impl fmt::Display for DeploymentKind {
36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37 match self {
38 DeploymentKind::Tables => write!(f, "tables"),
39 DeploymentKind::Objects => write!(f, "objects"),
40 DeploymentKind::Sinks => write!(f, "sinks"),
41 DeploymentKind::Replacement => write!(f, "replacement"),
42 }
43 }
44}
45
46impl FromStr for DeploymentKind {
47 type Err = String;
48
49 fn from_str(s: &str) -> Result<Self, Self::Err> {
50 match s {
51 "tables" => Ok(DeploymentKind::Tables),
52 "objects" => Ok(DeploymentKind::Objects),
53 "sinks" => Ok(DeploymentKind::Sinks),
54 "replacement" => Ok(DeploymentKind::Replacement),
55 _ => Err(format!("Invalid deployment kind: {}", s)),
56 }
57 }
58}
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
65#[serde(rename_all = "snake_case")]
66pub enum DeploymentMode {
67 Stage,
69}
70
71impl fmt::Display for DeploymentMode {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 match self {
74 DeploymentMode::Stage => write!(f, "stage"),
75 }
76 }
77}
78
79impl FromStr for DeploymentMode {
80 type Err = String;
81
82 fn from_str(s: &str) -> Result<Self, Self::Err> {
83 match s {
84 "stage" => Ok(DeploymentMode::Stage),
85 _ => Err(format!("Invalid deployment mode: {}", s)),
86 }
87 }
88}
89
90#[derive(Debug, Clone, PartialEq, Eq)]
94pub struct Cluster {
95 pub id: String,
97 pub name: String,
99 pub size: Option<String>,
101 pub replication_factor: Option<i64>,
103}
104
105#[derive(Debug, Clone, PartialEq, Eq)]
110pub struct ClusterOptions {
111 pub size: String,
113 pub replication_factor: u32,
115}
116
117impl ClusterOptions {
118 pub fn from_cluster(cluster: &Cluster) -> Result<Self, String> {
120 let size = cluster.size.clone().ok_or_else(|| {
121 format!(
122 "Cluster '{}' has no size (unmanaged cluster?)",
123 cluster.name
124 )
125 })?;
126
127 let replication_factor = cluster
128 .replication_factor
129 .unwrap_or(1)
130 .try_into()
131 .map_err(|_| format!("Invalid replication_factor for cluster '{}'", cluster.name))?;
132
133 Ok(Self {
134 size,
135 replication_factor,
136 })
137 }
138}
139
140#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct ClusterReplica {
143 pub name: String,
145 pub size: String,
147 pub availability_zone: Option<String>,
149}
150
151#[derive(Debug, Clone, PartialEq, Eq)]
153pub struct ObjectGrant {
154 pub grantee: String,
156 pub privilege_type: String,
158}
159
160#[derive(Debug, Clone, PartialEq, Eq)]
165pub enum ClusterConfig {
166 Managed {
168 options: ClusterOptions,
170 grants: Vec<ObjectGrant>,
172 },
173 Unmanaged {
175 replicas: Vec<ClusterReplica>,
177 grants: Vec<ObjectGrant>,
179 },
180}
181
182impl ClusterConfig {
183 pub fn grants(&self) -> &[ObjectGrant] {
185 match self {
186 ClusterConfig::Managed { grants, .. } => grants,
187 ClusterConfig::Unmanaged { grants, .. } => grants,
188 }
189 }
190}
191
192#[derive(Debug, Clone, PartialEq, Eq)]
197pub struct SchemaDeploymentRecord {
198 pub deploy_id: String,
200 pub database: String,
202 pub schema: String,
204 pub deployed_at: DateTime<Utc>,
206 pub deployed_by: String,
208 pub promoted_at: Option<DateTime<Utc>>,
210 pub git_commit: Option<String>,
212 pub kind: DeploymentKind,
214 pub mode: DeploymentMode,
216}
217
218#[derive(Debug, Clone, PartialEq, Eq)]
224pub struct DeploymentObjectRecord {
225 pub deploy_id: String,
227 pub database: String,
229 pub schema: String,
231 pub object: String,
233 pub object_hash: String,
235 pub deployed_at: DateTime<Utc>,
237}
238
239#[derive(Debug, Clone, PartialEq, Eq)]
243pub struct DeploymentMetadata {
244 pub deploy_id: String,
246 pub promoted_at: Option<DateTime<Utc>>,
248 pub mode: DeploymentMode,
250 pub schemas: Vec<SchemaQualifier>,
252}
253
254#[derive(Debug, Clone, PartialEq, Eq)]
260pub struct ConflictRecord {
261 pub database: String,
263 pub schema: String,
265 pub deploy_id: String,
267 pub promoted_at: DateTime<Utc>,
269}
270
271#[derive(Debug, Clone, PartialEq, Eq)]
278pub struct ProductionClusterRecord {
279 pub cluster_name: String,
281 pub database: String,
283 pub schema: String,
285 pub promoted_at: DateTime<Utc>,
287}
288
289#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
293pub struct DeploymentDetails {
294 pub deployed_at: DateTime<Utc>,
296 pub promoted_at: Option<DateTime<Utc>>,
298 pub deployed_by: String,
300 pub git_commit: Option<String>,
302 pub kind: DeploymentKind,
304 pub mode: DeploymentMode,
306 pub schemas: Vec<SchemaQualifier>,
308}
309
310#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
314pub struct StagingDeployment {
315 pub deployed_at: DateTime<Utc>,
317 pub deployed_by: String,
319 pub git_commit: Option<String>,
321 pub kind: DeploymentKind,
323 pub mode: DeploymentMode,
325 pub schemas: Vec<SchemaQualifier>,
327}
328
329#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
333pub struct DeploymentHistoryEntry {
334 pub deploy_id: String,
336 pub promoted_at: DateTime<Utc>,
338 pub deployed_by: String,
340 pub git_commit: Option<String>,
342 pub kind: DeploymentKind,
344 pub schemas: Vec<SchemaQualifier>,
346}
347
348#[derive(Debug, Clone, Copy, PartialEq, Eq)]
355pub enum ApplyState {
356 NotStarted,
358 PreSwap,
361 PostSwap,
364}
365
366#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
374pub struct ReplacementMvRecord {
375 pub deploy_id: String,
377 pub target_database: String,
379 pub target_schema: String,
381 pub target_name: String,
383 pub replacement_schema: String,
385}
386
387#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
393pub struct PendingStatement {
394 pub deploy_id: String,
396 pub sequence_num: i32,
398 pub database: String,
400 pub schema: String,
402 pub object: String,
404 pub object_hash: String,
406 pub statement_sql: String,
408 pub statement_kind: String,
410 pub executed_at: Option<DateTime<Utc>>,
412}
413
414#[cfg(test)]
415mod tests {
416 use super::*;
417
418 #[mz_ore::test]
419 fn test_deployment_kind_display() {
420 assert_eq!(DeploymentKind::Tables.to_string(), "tables");
421 assert_eq!(DeploymentKind::Objects.to_string(), "objects");
422 }
423
424 #[mz_ore::test]
425 fn test_deployment_kind_from_str_valid() {
426 assert_eq!(
427 "tables".parse::<DeploymentKind>().unwrap(),
428 DeploymentKind::Tables
429 );
430 assert_eq!(
431 "objects".parse::<DeploymentKind>().unwrap(),
432 DeploymentKind::Objects
433 );
434 }
435
436 #[mz_ore::test]
437 fn test_deployment_kind_from_str_invalid() {
438 let result = "invalid".parse::<DeploymentKind>();
439 assert!(result.is_err());
440 assert_eq!(result.unwrap_err(), "Invalid deployment kind: invalid");
441 }
442
443 #[mz_ore::test]
444 fn test_deployment_kind_roundtrip() {
445 for kind in [
447 DeploymentKind::Tables,
448 DeploymentKind::Objects,
449 DeploymentKind::Sinks,
450 DeploymentKind::Replacement,
451 ] {
452 let s = kind.to_string();
453 let parsed: DeploymentKind = s.parse().unwrap();
454 assert_eq!(kind, parsed);
455 }
456 }
457
458 #[mz_ore::test]
459 fn test_cluster_options_from_cluster_success() {
460 let cluster = Cluster {
461 id: "u1".to_string(),
462 name: "quickstart".to_string(),
463 size: Some("25cc".to_string()),
464 replication_factor: Some(2),
465 };
466
467 let options = ClusterOptions::from_cluster(&cluster).unwrap();
468 assert_eq!(options.size, "25cc");
469 assert_eq!(options.replication_factor, 2);
470 }
471
472 #[mz_ore::test]
473 fn test_cluster_options_from_cluster_default_replication() {
474 let cluster = Cluster {
475 id: "u1".to_string(),
476 name: "quickstart".to_string(),
477 size: Some("25cc".to_string()),
478 replication_factor: None, };
480
481 let options = ClusterOptions::from_cluster(&cluster).unwrap();
482 assert_eq!(options.size, "25cc");
483 assert_eq!(options.replication_factor, 1);
484 }
485
486 #[mz_ore::test]
487 fn test_cluster_options_from_cluster_no_size() {
488 let cluster = Cluster {
489 id: "u1".to_string(),
490 name: "unmanaged".to_string(),
491 size: None, replication_factor: Some(1),
493 };
494
495 let result = ClusterOptions::from_cluster(&cluster);
496 assert!(result.is_err());
497 let err_msg = result.unwrap_err();
498 assert!(err_msg.contains("unmanaged"));
499 assert!(err_msg.contains("has no size"));
500 }
501
502 #[mz_ore::test]
503 fn test_cluster_options_from_cluster_negative_replication() {
504 let cluster = Cluster {
505 id: "u1".to_string(),
506 name: "test".to_string(),
507 size: Some("25cc".to_string()),
508 replication_factor: Some(-1), };
510
511 let result = ClusterOptions::from_cluster(&cluster);
512 assert!(result.is_err());
513 assert!(result.unwrap_err().contains("Invalid replication_factor"));
514 }
515
516 #[mz_ore::test]
517 fn test_cluster_equality() {
518 let cluster1 = Cluster {
519 id: "u1".to_string(),
520 name: "test".to_string(),
521 size: Some("25cc".to_string()),
522 replication_factor: Some(1),
523 };
524
525 let cluster2 = Cluster {
526 id: "u1".to_string(),
527 name: "test".to_string(),
528 size: Some("25cc".to_string()),
529 replication_factor: Some(1),
530 };
531
532 let cluster3 = Cluster {
533 id: "u2".to_string(), name: "test".to_string(),
535 size: Some("25cc".to_string()),
536 replication_factor: Some(1),
537 };
538
539 assert_eq!(cluster1, cluster2);
540 assert_ne!(cluster1, cluster3);
541 }
542
543 #[mz_ore::test]
544 fn test_cluster_options_equality() {
545 let opts1 = ClusterOptions {
546 size: "25cc".to_string(),
547 replication_factor: 2,
548 };
549
550 let opts2 = ClusterOptions {
551 size: "25cc".to_string(),
552 replication_factor: 2,
553 };
554
555 let opts3 = ClusterOptions {
556 size: "50cc".to_string(),
557 replication_factor: 2,
558 };
559
560 assert_eq!(opts1, opts2);
561 assert_ne!(opts1, opts3);
562 }
563
564 #[mz_ore::test]
565 fn test_cluster_replica_equality() {
566 let r1 = ClusterReplica {
567 name: "r1".to_string(),
568 size: "25cc".to_string(),
569 availability_zone: Some("use1-az1".to_string()),
570 };
571
572 let r2 = ClusterReplica {
573 name: "r1".to_string(),
574 size: "25cc".to_string(),
575 availability_zone: Some("use1-az1".to_string()),
576 };
577
578 let r3 = ClusterReplica {
579 name: "r2".to_string(),
580 size: "25cc".to_string(),
581 availability_zone: None,
582 };
583
584 assert_eq!(r1, r2);
585 assert_ne!(r1, r3);
586 }
587
588 #[mz_ore::test]
589 fn test_cluster_grant_equality() {
590 let g1 = ObjectGrant {
591 grantee: "reader".to_string(),
592 privilege_type: "USAGE".to_string(),
593 };
594
595 let g2 = ObjectGrant {
596 grantee: "reader".to_string(),
597 privilege_type: "USAGE".to_string(),
598 };
599
600 let g3 = ObjectGrant {
601 grantee: "writer".to_string(),
602 privilege_type: "CREATE".to_string(),
603 };
604
605 assert_eq!(g1, g2);
606 assert_ne!(g1, g3);
607 }
608
609 #[mz_ore::test]
610 fn test_cluster_config_managed() {
611 let config = ClusterConfig::Managed {
612 options: ClusterOptions {
613 size: "25cc".to_string(),
614 replication_factor: 2,
615 },
616 grants: vec![ObjectGrant {
617 grantee: "reader".to_string(),
618 privilege_type: "USAGE".to_string(),
619 }],
620 };
621
622 assert_eq!(config.grants().len(), 1);
623 assert_eq!(config.grants()[0].grantee, "reader");
624 }
625
626 #[mz_ore::test]
627 fn test_cluster_config_unmanaged() {
628 let config = ClusterConfig::Unmanaged {
629 replicas: vec![
630 ClusterReplica {
631 name: "r1".to_string(),
632 size: "25cc".to_string(),
633 availability_zone: None,
634 },
635 ClusterReplica {
636 name: "r2".to_string(),
637 size: "50cc".to_string(),
638 availability_zone: Some("use1-az1".to_string()),
639 },
640 ],
641 grants: vec![],
642 };
643
644 if let ClusterConfig::Unmanaged { replicas, grants } = &config {
645 assert_eq!(replicas.len(), 2);
646 assert_eq!(replicas[0].name, "r1");
647 assert_eq!(replicas[1].availability_zone, Some("use1-az1".to_string()));
648 assert!(grants.is_empty());
649 } else {
650 panic!("Expected Unmanaged config");
651 }
652 }
653
654 #[mz_ore::test]
655 fn test_cluster_config_unmanaged_empty_replicas() {
656 let config = ClusterConfig::Unmanaged {
658 replicas: vec![],
659 grants: vec![ObjectGrant {
660 grantee: "admin".to_string(),
661 privilege_type: "CREATE".to_string(),
662 }],
663 };
664
665 if let ClusterConfig::Unmanaged { replicas, grants } = &config {
666 assert!(replicas.is_empty());
667 assert_eq!(grants.len(), 1);
668 } else {
669 panic!("Expected Unmanaged config");
670 }
671 }
672}