1use std::collections::BTreeSet;
21
22use crate::cli::commands::ObjectRef;
23use crate::cli::error::CliError;
24use crate::client::{Client, quote_identifier};
25use crate::config::Settings;
26use crate::project::SchemaQualifier;
27use crate::project::analysis::changeset::ChangeSet;
28use crate::project::analysis::deployment_snapshot;
29use crate::project::ast::Statement;
30use crate::project::ir::compiled::FullyQualifiedName;
31use crate::project::resolve::normalize::NormalizingVisitor;
32use crate::{info, verbose};
33
/// Builds the name of the overlay database derived from `base_db` for `profile`.
///
/// The two parts are joined with a double underscore, so
/// `("analytics", "alice")` yields `"analytics__alice"`.
fn overlay_db_name(base_db: &str, profile: &str) -> String {
    [base_db, profile].join("__")
}
38
39async fn refuse_if_targets_production_cluster(
42 client: &Client,
43 cluster: &str,
44) -> Result<(), CliError> {
45 let production = client.deployments().list_production_clusters().await?;
46 if let Some(rec) = production.into_iter().find(|r| r.cluster_name == cluster) {
47 return Err(CliError::DevTargetsProductionCluster { cluster: rec });
48 }
49 Ok(())
50}
51
52pub async fn run(
62 settings: &Settings,
63 cluster: Option<String>,
64 down: bool,
65 dry_run: bool,
66) -> Result<(), CliError> {
67 let profile = settings.connection();
68 let profile_name = settings
70 .profile_name
71 .clone()
72 .expect("dev requires an active profile");
73 let project_name = settings
74 .directory
75 .file_name()
76 .and_then(|s| s.to_str())
77 .filter(|s| !s.is_empty())
78 .ok_or_else(|| CliError::InvalidProjectDirectory {
79 path: settings.directory.display().to_string(),
80 })?
81 .to_string();
82
83 let planned_project = super::compile::run(settings, true).await?;
84
85 let in_project_databases: BTreeSet<String> = planned_project
86 .databases
87 .iter()
88 .map(|db| db.name.clone())
89 .collect();
90
91 let client = Client::connect_with_profile(profile.clone())
92 .await
93 .map_err(CliError::Connection)?;
94
95 crate::cli::commands::setup::verify(&client, settings.emulator()).await?;
96 let role =
97 crate::cli::commands::setup::validate_connection(&client, settings.emulator()).await?;
98 crate::cli::commands::setup::require_developer(role)?;
99
100 if in_project_databases.is_empty() {
101 info!("Project has no databases — nothing to overlay.");
102 return Ok(());
103 }
104
105 let sample_overlay_db = overlay_db_name(
106 in_project_databases.iter().next().expect("non-empty"),
107 &profile_name,
108 );
109 crate::cli::commands::setup::require_createdb(&client, &profile.username, &sample_overlay_db)
110 .await?;
111
112 if down {
113 drop_phase(&client, &profile_name, &project_name, &in_project_databases).await?;
114 info!("Overlay removed.");
115 return Ok(());
116 }
117
118 let target_cluster = cluster.expect("cluster required unless --down");
120 refuse_if_targets_production_cluster(&client, &target_cluster).await?;
121
122 let new_snapshot = deployment_snapshot::build_snapshot_from_planned(&planned_project)?;
123 let production_snapshot = deployment_snapshot::load_from_database(&client, None).await?;
124
125 let change_set = if production_snapshot.objects.is_empty() {
127 verbose!("Full deployment: no production deployment found");
128 None
129 } else {
130 Some(ChangeSet::from_deployment_snapshot_comparison(
131 &production_snapshot,
132 &new_snapshot,
133 &planned_project,
134 ))
135 };
136
137 let all_objects = match change_set.as_ref() {
138 Some(cs) if cs.is_empty() => Vec::new(),
139 Some(cs) => {
140 verbose!("{}", cs);
141 planned_project.get_sorted_objects_filtered(&cs.objects_to_deploy)?
142 }
143 None => planned_project.get_sorted_objects()?,
144 };
145
146 let mut skipped = 0usize;
147 let overlay_objects: Vec<ObjectRef<'_>> = all_objects
148 .into_iter()
149 .filter(|(_, typed_obj)| match &typed_obj.stmt {
150 Statement::CreateView(_) | Statement::CreateMaterializedView(_) => true,
151 _ => {
152 skipped += 1;
153 false
154 }
155 })
156 .collect();
157 if skipped > 0 {
158 verbose!(
159 "skipped {} object(s) of unsupported type (tables/sources/sinks)",
160 skipped
161 );
162 }
163
164 let dirty_schemas: BTreeSet<SchemaQualifier> = overlay_objects
165 .iter()
166 .map(|(id, _)| {
167 SchemaQualifier::new(id.expect_database().to_string(), id.schema().to_string())
168 })
169 .collect();
170
171 print_plan(&dirty_schemas, &profile_name);
172
173 if dry_run {
174 return Ok(());
175 }
176
177 drop_phase(&client, &profile_name, &project_name, &in_project_databases).await?;
178
179 if dirty_schemas.is_empty() {
180 info!("Dev overlay ready (nothing to overlay).");
181 return Ok(());
182 }
183
184 create_phase(
185 &client,
186 &profile_name,
187 &project_name,
188 &in_project_databases,
189 &dirty_schemas,
190 &overlay_objects,
191 &target_cluster,
192 )
193 .await?;
194
195 info!("Dev overlay ready.");
196 Ok(())
197}
198
199fn print_plan(dirty_schemas: &BTreeSet<SchemaQualifier>, profile_name: &str) {
200 if dirty_schemas.is_empty() {
201 info!("Dirty set is empty — nothing to overlay.");
202 return;
203 }
204 info!("→ Dirty schemas:");
205 for qual in dirty_schemas {
206 info!(" {}.{}", qual.database, qual.schema);
207 }
208
209 let overlay_dbs: BTreeSet<String> = dirty_schemas
210 .iter()
211 .map(|q| overlay_db_name(&q.database, profile_name))
212 .collect();
213 info!("→ Overlay databases:");
214 for db in &overlay_dbs {
215 info!(" {}", db);
216 }
217}
218
219pub(crate) async fn drop_phase(
224 client: &Client,
225 profile_name: &str,
226 project_name: &str,
227 in_project_databases: &BTreeSet<String>,
228) -> Result<(), CliError> {
229 let overlays = client.dev_overlays();
230
231 let existing: BTreeSet<String> = overlays
232 .list_overlays(profile_name, project_name)
233 .await?
234 .into_iter()
235 .collect();
236 for db in &existing {
237 drop_database(client, db).await?;
238 }
239 overlays.delete_overlays(profile_name, project_name).await?;
240
241 for base_db in in_project_databases {
242 let overlay_db = overlay_db_name(base_db, profile_name);
243 if !existing.contains(&overlay_db) {
244 drop_database(client, &overlay_db).await?;
245 }
246 }
247
248 Ok(())
249}
250
251async fn drop_database(client: &Client, database: &str) -> Result<(), CliError> {
252 let sql = format!(
253 "DROP DATABASE IF EXISTS {} CASCADE",
254 quote_identifier(database),
255 );
256 client.execute(&sql, &[]).await?;
257 Ok(())
258}
259
260pub(crate) async fn create_phase(
268 client: &Client,
269 profile_name: &str,
270 project_name: &str,
271 in_project_databases: &BTreeSet<String>,
272 dirty_schemas: &BTreeSet<SchemaQualifier>,
273 overlay_objects: &[ObjectRef<'_>],
274 target_cluster: &str,
275) -> Result<(), CliError> {
276 let provisioning = client.provisioning();
277 let overlays = client.dev_overlays();
278
279 let mut created_overlay_dbs: BTreeSet<String> = BTreeSet::new();
280 for qualifier in dirty_schemas {
281 let overlay_db = overlay_db_name(&qualifier.database, profile_name);
282 if created_overlay_dbs.insert(overlay_db.clone()) {
283 provisioning.create_database(&overlay_db).await?;
284 overlays
285 .insert_overlay(profile_name, project_name, &overlay_db)
286 .await?;
287 }
288 }
289
290 for qualifier in dirty_schemas {
291 let overlay_db = overlay_db_name(&qualifier.database, profile_name);
292 provisioning
293 .create_schema(&overlay_db, &qualifier.schema)
294 .await?;
295 }
296
297 for (object_id, typed_object) in overlay_objects {
298 let original_fqn: FullyQualifiedName = object_id.clone().into();
299 let mut visitor = NormalizingVisitor::overlay(
300 &original_fqn,
301 profile_name,
302 in_project_databases,
303 dirty_schemas,
304 target_cluster,
305 );
306
307 let stmt = typed_object
308 .stmt
309 .clone()
310 .normalize_name_with(&visitor, &original_fqn.to_item_name())
311 .normalize_dependencies_with(&mut visitor)
312 .normalize_cluster_with(&visitor);
313
314 client.execute(&stmt.to_string(), &[]).await?;
315
316 let mut indexes = typed_object.indexes.clone();
317 visitor.normalize_index_references(&mut indexes);
318 visitor.normalize_index_clusters(&mut indexes);
319 for index in &indexes {
320 client.execute(&index.to_string(), &[]).await?;
321 }
322 }
323
324 Ok(())
325}