// mz_environmentd/deployment/preflight.rs

use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use mz_adapter::ResultExt;
use mz_catalog::durable::{BootstrapArgs, CatalogError, Metrics, OpenableDurableCatalogState};
use mz_ore::channel::trigger;
use mz_ore::exit;
use mz_ore::halt;
use mz_persist_client::PersistClient;
use mz_repr::Timestamp;
use mz_sql::catalog::EnvironmentId;
use tracing::info;

use crate::BUILD_INFO;
use crate::deployment::state::DeploymentState;

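/// Input to the preflight checks: the catalog handle to inspect, the deploy
/// generation this process was started with, and the knobs that control the
/// zero-downtime catch-up wait.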
pub struct PreflightInput {
    pub boot_ts: Timestamp,
    pub environment_id: EnvironmentId,
    pub persist_client: PersistClient,
    pub deploy_generation: u64,
    pub deployment_state: DeploymentState,
    pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
    pub catalog_metrics: Arc<Metrics>,
    pub caught_up_max_wait: Duration,
    pub ddl_check_interval: Duration,
    pub panic_after_timeout: bool,
    pub bootstrap_args: BootstrapArgs,
}

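/// Output of the preflight checks: the catalog handle to boot with, whether
/// to boot in read-only mode, and an optional trigger to fire once the new
/// deployment has caught up.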
pub struct PreflightOutput {
    pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
    pub read_only: bool,
    pub caught_up_trigger: Option<trigger::Trigger>,
}

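/// Runs the legacy (non-zero-downtime) preflight checks. If the requested
/// deploy generation is newer than the catalog's, the catalog upgrade is
/// dry-run in savepoint mode and this process then waits to be promoted to
/// leader before returning a catalog handle at the new generation.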
pub async fn preflight_legacy(
    PreflightInput {
        boot_ts,
        environment_id,
        persist_client,
        deploy_generation,
        deployment_state,
        mut openable_adapter_storage,
        catalog_metrics,
        bootstrap_args,
        caught_up_max_wait: _,
        ddl_check_interval: _,
        panic_after_timeout: _,
    }: PreflightInput,
) -> Result<Box<dyn OpenableDurableCatalogState>, CatalogError> {
    tracing::info!("Requested deploy generation {deploy_generation}");

    if !openable_adapter_storage.is_initialized().await? {
        tracing::info!(
            "Catalog storage doesn't exist so there's no current deploy generation. We won't wait to be leader"
        );
        return Ok(openable_adapter_storage);
    }
    let catalog_generation = openable_adapter_storage.get_deployment_generation().await?;
    tracing::info!("Found catalog generation {catalog_generation:?}");
    if catalog_generation < deploy_generation {
        tracing::info!(
            "Catalog generation {catalog_generation:?} is less than deploy generation {deploy_generation}. Performing pre-flight checks"
        );
        match openable_adapter_storage
            .open_savepoint(boot_ts.clone(), &bootstrap_args)
            .await
        {
            Ok((adapter_storage, _)) => Box::new(adapter_storage).expire().await,
            Err(CatalogError::Durable(e)) if e.can_recover_with_write_mode() => {
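                // The catalog can't be opened in savepoint mode to dry-run the
                // upgrade (the target state is uninitialized), but a write-mode
                // open would recover, so skip the upgrade test and hand back a
                // catalog handle at the requested generation.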
                tracing::warn!(
                    "Unable to perform upgrade test because the target implementation is uninitialized"
                );
                return Ok(mz_catalog::durable::persist_backed_catalog_state(
                    persist_client,
                    environment_id.organization_id(),
                    BUILD_INFO.semver_version(),
                    Some(deploy_generation),
                    Arc::clone(&catalog_metrics),
                )
                .await?);
            }
            Err(e) => {
                tracing::warn!(error = %e, "catalog upgrade would have failed");
                return Err(e);
            }
        }

        let promoted = deployment_state.set_ready_to_promote();

        tracing::info!("Waiting for user to promote this envd to leader");
        promoted.await;

        Ok(mz_catalog::durable::persist_backed_catalog_state(
            persist_client,
            environment_id.organization_id(),
            BUILD_INFO.semver_version(),
            Some(deploy_generation),
            Arc::clone(&catalog_metrics),
        )
        .await?)
    } else if catalog_generation == deploy_generation {
        tracing::info!(
            "Server requested generation {deploy_generation} which is equal to catalog's generation"
        );
        Ok(openable_adapter_storage)
    } else {
        mz_ore::halt!(
            "Server started with requested generation {deploy_generation} but catalog was already at {catalog_generation:?}. Deploy generations must increase monotonically"
        );
    }
}

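/// Runs the zero-downtime (0dt) preflight checks. A deployment with a newer
/// generation boots in read-only mode and, in a background task, waits until
/// it has caught up (or the wait times out, or an administrator skips it),
/// announces itself as ready to promote, and, once promoted, fences out the
/// old deployment and halts so it can reboot as the leader.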
pub async fn preflight_0dt(
    PreflightInput {
        boot_ts,
        environment_id,
        persist_client,
        deploy_generation,
        deployment_state,
        mut openable_adapter_storage,
        catalog_metrics,
        caught_up_max_wait,
        ddl_check_interval,
        panic_after_timeout,
        bootstrap_args,
    }: PreflightInput,
) -> Result<PreflightOutput, CatalogError> {
    info!(%deploy_generation, ?caught_up_max_wait, "performing 0dt preflight checks");

    if !openable_adapter_storage.is_initialized().await? {
        info!("catalog not initialized; booting with writes allowed");
        return Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: false,
            caught_up_trigger: None,
        });
    }

    let catalog_generation = openable_adapter_storage.get_deployment_generation().await?;
    info!(%catalog_generation, "catalog initialized");
    if catalog_generation < deploy_generation {
        info!("this deployment is a new generation; booting in read only mode");

        let (caught_up_trigger, mut caught_up_receiver) = trigger::channel();

        mz_ore::task::spawn(|| "preflight_0dt", async move {
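            // Snapshot the ID watermarks of the current generation so we can
            // later detect whether any user DDL ran while we were catching up.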
            let (initial_next_user_item_id, initial_next_replica_id) = get_next_ids(
                boot_ts,
                persist_client.clone(),
                environment_id.clone(),
                deploy_generation,
                Arc::clone(&catalog_metrics),
                bootstrap_args.clone(),
            )
            .await;

            info!(
                %initial_next_user_item_id,
                %initial_next_replica_id,
                "waiting for deployment to be caught up");

            let caught_up_max_wait_fut = async {
                tokio::time::sleep(caught_up_max_wait).await;
                ()
            };
            let mut caught_up_max_wait_fut = pin!(caught_up_max_wait_fut);

            let mut skip_catchup = deployment_state.set_catching_up();

            let mut check_ddl_changes_interval = tokio::time::interval(ddl_check_interval);
            check_ddl_changes_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            let mut should_skip_catchup = false;
            loop {
                tokio::select! {
                    biased;

                    () = &mut skip_catchup => {
                        info!("skipping waiting for deployment to catch up due to administrator request");
                        should_skip_catchup = true;
                        break;
                    }
                    () = &mut caught_up_receiver => {
                        info!("deployment caught up");
                        break;
                    }
                    () = &mut caught_up_max_wait_fut => {
                        if panic_after_timeout {
                            panic!("not caught up within {:?}", caught_up_max_wait);
                        }
                        info!("not caught up within {:?}, proceeding now", caught_up_max_wait);
                        break;
                    }
                    _ = check_ddl_changes_interval.tick() => {
                        check_ddl_changes(
                            boot_ts,
                            persist_client.clone(),
                            environment_id.clone(),
                            deploy_generation,
                            Arc::clone(&catalog_metrics),
                            bootstrap_args.clone(),
                            initial_next_user_item_id,
                            initial_next_replica_id,
                        )
                        .await;
                    }
                }
            }

            if !should_skip_catchup {
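                // Do one final DDL check before announcing ourselves as ready
                // to promote, in case something changed since the last tick.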
                check_ddl_changes(
                    boot_ts,
                    persist_client.clone(),
                    environment_id.clone(),
                    deploy_generation,
                    Arc::clone(&catalog_metrics),
                    bootstrap_args.clone(),
                    initial_next_user_item_id,
                    initial_next_replica_id,
                )
                .await;
            }

            let promoted = deployment_state.set_ready_to_promote();
            info!("announced as ready to promote; waiting for promotion");
            promoted.await;

            info!("promoted; attempting takeover");

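            // Take over the catalog: opening it in write mode at the new
            // deploy generation fences out the old deployment, after which we
            // halt and reboot as the leader.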
            let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
                persist_client.clone(),
                environment_id.organization_id(),
                BUILD_INFO.semver_version(),
                Some(deploy_generation),
                Arc::clone(&catalog_metrics),
            )
            .await
            .expect("incompatible catalog/persist version");

            let (_catalog, _audit_logs) = openable_adapter_storage
                .open(boot_ts, &bootstrap_args)
                .await
                .unwrap_or_terminate("unexpected error while fencing out old deployment");

            halt!("fenced out old deployment; rebooting as leader")
        });

        Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: true,
            caught_up_trigger: Some(caught_up_trigger),
        })
    } else if catalog_generation == deploy_generation {
        info!("this deployment is the current generation; booting with writes allowed");
        Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: false,
            caught_up_trigger: None,
        })
    } else {
        exit!(0, "this deployment has been fenced out");
    }
}

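/// Compares the current ID watermarks against the initial snapshot and halts
/// (so the process reboots in read-only mode) if any user items or replicas
/// have been created in the meantime, i.e. if relevant DDL has run.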
async fn check_ddl_changes(
    boot_ts: Timestamp,
    persist_client: PersistClient,
    environment_id: EnvironmentId,
    deploy_generation: u64,
    catalog_metrics: Arc<Metrics>,
    bootstrap_args: BootstrapArgs,
    initial_next_user_item_id: u64,
    initial_next_replica_id: u64,
) {
    let (next_user_item_id, next_replica_id) = get_next_ids(
        boot_ts,
        persist_client.clone(),
        environment_id.clone(),
        deploy_generation,
        Arc::clone(&catalog_metrics),
        bootstrap_args.clone(),
    )
    .await;

    tracing::info!(
        %initial_next_user_item_id,
        %initial_next_replica_id,
        %next_user_item_id,
        %next_replica_id,
        "checking if there was any relevant DDL");

    if next_user_item_id > initial_next_user_item_id || next_replica_id > initial_next_replica_id {
        halt!("there have been DDL that we need to react to; rebooting in read-only mode")
    }
}

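/// Opens the catalog in savepoint mode and returns the next user item ID and
/// next user replica ID, which together serve as a watermark for detecting
/// DDL against the active deployment.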
async fn get_next_ids(
    boot_ts: Timestamp,
    persist_client: PersistClient,
    environment_id: EnvironmentId,
    deploy_generation: u64,
    catalog_metrics: Arc<Metrics>,
    bootstrap_args: BootstrapArgs,
) -> (u64, u64) {
    let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
        persist_client,
        environment_id.organization_id(),
        BUILD_INFO.semver_version(),
        Some(deploy_generation),
        catalog_metrics,
    )
    .await
    .expect("incompatible catalog/persist version");

    let (mut catalog, _audit_logs) = openable_adapter_storage
        .open_savepoint(boot_ts, &bootstrap_args)
        .await
        .unwrap_or_terminate("can open in savepoint mode");

    let next_user_item_id = catalog
        .get_next_user_item_id()
        .await
        .expect("can access catalog");
    let next_replica_item_id = catalog
        .get_next_user_replica_id()
        .await
        .expect("can access catalog");

    (next_user_item_id, next_replica_item_id)
}