1use std::collections::BTreeSet;
21
22use crate::cli::commands::ObjectRef;
23use crate::cli::error::CliError;
24use crate::client::{Client, quote_identifier};
25use crate::config::Settings;
26use crate::project::SchemaQualifier;
27use crate::project::analysis::changeset::ChangeSet;
28use crate::project::analysis::deployment_snapshot;
29use crate::project::ast::Statement;
30use crate::project::ir::compiled::FullyQualifiedName;
31use crate::project::resolve::normalize::NormalizingVisitor;
32use crate::{info, verbose};
33
34fn overlay_db_name(base_db: &str, profile: &str) -> String {
36 format!("{}__{}", base_db, profile)
37}
38
39async fn refuse_if_targets_production_cluster(
42 client: &Client,
43 cluster: &str,
44) -> Result<(), CliError> {
45 let production = client.deployments().list_production_clusters().await?;
46 if let Some(rec) = production.into_iter().find(|r| r.cluster_name == cluster) {
47 return Err(CliError::DevTargetsProductionCluster { cluster: rec });
48 }
49 Ok(())
50}
51
52pub async fn run(
62 settings: &Settings,
63 cluster: Option<String>,
64 down: bool,
65 dry_run: bool,
66) -> Result<(), CliError> {
67 let profile = settings.connection();
68 let profile_name = settings
70 .profile_name
71 .clone()
72 .expect("dev requires an active profile");
73 let project_name = settings
74 .directory
75 .file_name()
76 .and_then(|s| s.to_str())
77 .filter(|s| !s.is_empty())
78 .ok_or_else(|| CliError::InvalidProjectDirectory {
79 path: settings.directory.display().to_string(),
80 })?
81 .to_string();
82
83 let planned_project = super::compile::run(settings, true).await?;
84
85 let in_project_databases: BTreeSet<String> = planned_project
86 .databases
87 .iter()
88 .map(|db| db.name.clone())
89 .collect();
90
91 let client = Client::connect_with_profile(profile.clone())
92 .await
93 .map_err(CliError::Connection)?;
94
95 crate::cli::commands::setup::verify(&client, settings.emulator()).await?;
96 let role =
97 crate::cli::commands::setup::validate_connection(&client, settings.emulator()).await?;
98 crate::cli::commands::setup::require_developer(role)?;
99
100 if in_project_databases.is_empty() {
101 info!("Project has no databases — nothing to overlay.");
102 return Ok(());
103 }
104
105 let sample_overlay_db = overlay_db_name(
106 in_project_databases.iter().next().expect("non-empty"),
107 &profile_name,
108 );
109 crate::cli::commands::setup::require_createdb(&client, &profile.username, &sample_overlay_db)
110 .await?;
111
112 if down {
113 drop_phase(&client, &profile_name, &project_name, &in_project_databases).await?;
114 info!("Overlay removed.");
115 return Ok(());
116 }
117
118 let target_cluster = cluster.expect("cluster required unless --down");
120 refuse_if_targets_production_cluster(&client, &target_cluster).await?;
121
122 let new_snapshot = deployment_snapshot::build_snapshot_from_planned(&planned_project)?;
123 let production_snapshot = deployment_snapshot::load_from_database(&client, None).await?;
124
125 let change_set = if production_snapshot.objects.is_empty() {
127 verbose!("Full deployment: no production deployment found");
128 None
129 } else {
130 Some(ChangeSet::from_deployment_snapshot_comparison(
131 &production_snapshot,
132 &new_snapshot,
133 &planned_project,
134 &BTreeSet::new(),
135 ))
136 };
137
138 let all_objects = match change_set.as_ref() {
139 Some(cs) if cs.is_empty() => Vec::new(),
140 Some(cs) => {
141 verbose!("{}", cs);
142 planned_project.get_sorted_objects_filtered(&cs.objects_to_deploy)?
143 }
144 None => planned_project.get_sorted_objects()?,
145 };
146
147 let mut skipped = 0usize;
148 let overlay_objects: Vec<ObjectRef<'_>> = all_objects
149 .into_iter()
150 .filter(|(_, typed_obj)| match &typed_obj.stmt {
151 Statement::CreateView(_) | Statement::CreateMaterializedView(_) => true,
152 _ => {
153 skipped += 1;
154 false
155 }
156 })
157 .collect();
158 if skipped > 0 {
159 verbose!(
160 "skipped {} object(s) of unsupported type (tables/sources/sinks)",
161 skipped
162 );
163 }
164
165 let dirty_schemas: BTreeSet<SchemaQualifier> = overlay_objects
166 .iter()
167 .map(|(id, _)| {
168 SchemaQualifier::new(id.expect_database().to_string(), id.schema().to_string())
169 })
170 .collect();
171
172 print_plan(&dirty_schemas, &profile_name);
173
174 if dry_run {
175 return Ok(());
176 }
177
178 drop_phase(&client, &profile_name, &project_name, &in_project_databases).await?;
179
180 if dirty_schemas.is_empty() {
181 info!("Dev overlay ready (nothing to overlay).");
182 return Ok(());
183 }
184
185 create_phase(
186 &client,
187 &profile_name,
188 &project_name,
189 &in_project_databases,
190 &dirty_schemas,
191 &overlay_objects,
192 &target_cluster,
193 )
194 .await?;
195
196 info!("Dev overlay ready.");
197 Ok(())
198}
199
200fn print_plan(dirty_schemas: &BTreeSet<SchemaQualifier>, profile_name: &str) {
201 if dirty_schemas.is_empty() {
202 info!("Dirty set is empty — nothing to overlay.");
203 return;
204 }
205 info!("→ Dirty schemas:");
206 for qual in dirty_schemas {
207 info!(" {}.{}", qual.database, qual.schema);
208 }
209
210 let overlay_dbs: BTreeSet<String> = dirty_schemas
211 .iter()
212 .map(|q| overlay_db_name(&q.database, profile_name))
213 .collect();
214 info!("→ Overlay databases:");
215 for db in &overlay_dbs {
216 info!(" {}", db);
217 }
218}
219
220pub(crate) async fn drop_phase(
225 client: &Client,
226 profile_name: &str,
227 project_name: &str,
228 in_project_databases: &BTreeSet<String>,
229) -> Result<(), CliError> {
230 let overlays = client.dev_overlays();
231
232 let existing: BTreeSet<String> = overlays
233 .list_overlays(profile_name, project_name)
234 .await?
235 .into_iter()
236 .collect();
237 for db in &existing {
238 drop_database(client, db).await?;
239 }
240 overlays.delete_overlays(profile_name, project_name).await?;
241
242 for base_db in in_project_databases {
243 let overlay_db = overlay_db_name(base_db, profile_name);
244 if !existing.contains(&overlay_db) {
245 drop_database(client, &overlay_db).await?;
246 }
247 }
248
249 Ok(())
250}
251
252async fn drop_database(client: &Client, database: &str) -> Result<(), CliError> {
253 let sql = format!(
254 "DROP DATABASE IF EXISTS {} CASCADE",
255 quote_identifier(database),
256 );
257 client.execute(&sql, &[]).await?;
258 Ok(())
259}
260
261pub(crate) async fn create_phase(
269 client: &Client,
270 profile_name: &str,
271 project_name: &str,
272 in_project_databases: &BTreeSet<String>,
273 dirty_schemas: &BTreeSet<SchemaQualifier>,
274 overlay_objects: &[ObjectRef<'_>],
275 target_cluster: &str,
276) -> Result<(), CliError> {
277 let provisioning = client.provisioning();
278 let overlays = client.dev_overlays();
279
280 let mut created_overlay_dbs: BTreeSet<String> = BTreeSet::new();
281 for qualifier in dirty_schemas {
282 let overlay_db = overlay_db_name(&qualifier.database, profile_name);
283 if created_overlay_dbs.insert(overlay_db.clone()) {
284 provisioning.create_database(&overlay_db).await?;
285 overlays
286 .insert_overlay(profile_name, project_name, &overlay_db)
287 .await?;
288 }
289 }
290
291 for qualifier in dirty_schemas {
292 let overlay_db = overlay_db_name(&qualifier.database, profile_name);
293 provisioning
294 .create_schema(&overlay_db, &qualifier.schema)
295 .await?;
296 }
297
298 for (object_id, typed_object) in overlay_objects {
299 let original_fqn: FullyQualifiedName = object_id.clone().into();
300 let mut visitor = NormalizingVisitor::overlay(
301 &original_fqn,
302 profile_name,
303 in_project_databases,
304 dirty_schemas,
305 target_cluster,
306 );
307
308 let stmt = typed_object
309 .stmt
310 .clone()
311 .normalize_name_with(&visitor, &original_fqn.to_item_name())
312 .normalize_dependencies_with(&mut visitor)
313 .normalize_cluster_with(&visitor);
314
315 client.execute(&stmt.to_string(), &[]).await?;
316
317 let mut indexes = typed_object.indexes.clone();
318 visitor.normalize_index_references(&mut indexes);
319 visitor.normalize_index_clusters(&mut indexes);
320 for index in &indexes {
321 client.execute(&index.to_string(), &[]).await?;
322 }
323 }
324
325 Ok(())
326}