mz_environmentd/deployment/
preflight.rs1use std::pin::pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use mz_adapter::ResultExt;
17use mz_catalog::durable::{BootstrapArgs, CatalogError, Metrics, OpenableDurableCatalogState};
18use mz_controller_types::ReplicaId;
19use mz_ore::channel::trigger;
20use mz_ore::exit;
21use mz_ore::halt;
22use mz_ore::str::separated;
23use mz_persist_client::PersistClient;
24use mz_repr::{CatalogItemId, Timestamp};
25use mz_sql::catalog::EnvironmentId;
26use tracing::info;
27
28use crate::BUILD_INFO;
29use crate::deployment::state::DeploymentState;
30
31pub struct PreflightInput {
33 pub boot_ts: Timestamp,
34 pub environment_id: EnvironmentId,
35 pub persist_client: PersistClient,
36 pub deploy_generation: u64,
37 pub deployment_state: DeploymentState,
38 pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
39 pub catalog_metrics: Arc<Metrics>,
40 pub caught_up_max_wait: Duration,
41 pub ddl_check_interval: Duration,
42 pub panic_after_timeout: bool,
43 pub bootstrap_args: BootstrapArgs,
44}
45
46pub struct PreflightOutput {
48 pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
49 pub read_only: bool,
50 pub caught_up_trigger: Option<trigger::Trigger>,
51}
52
53pub async fn preflight_0dt(
58 PreflightInput {
59 boot_ts,
60 environment_id,
61 persist_client,
62 deploy_generation,
63 deployment_state,
64 mut openable_adapter_storage,
65 catalog_metrics,
66 caught_up_max_wait,
67 ddl_check_interval,
68 panic_after_timeout,
69 bootstrap_args,
70 }: PreflightInput,
71) -> Result<PreflightOutput, CatalogError> {
72 info!(%deploy_generation, ?caught_up_max_wait, "performing 0dt preflight checks");
73
74 if !openable_adapter_storage.is_initialized().await? {
75 info!("catalog not initialized; booting with writes allowed");
76 return Ok(PreflightOutput {
77 openable_adapter_storage,
78 read_only: false,
79 caught_up_trigger: None,
80 });
81 }
82
83 let catalog_generation = openable_adapter_storage.get_deployment_generation().await?;
84 info!(%catalog_generation, "catalog initialized");
85 if catalog_generation < deploy_generation {
86 info!("this deployment is a new generation; booting in read only mode");
87
88 let (caught_up_trigger, mut caught_up_receiver) = trigger::channel();
89
90 mz_ore::task::spawn(|| "preflight_0dt", async move {
92 let (initial_next_user_item_id, initial_next_replica_id) = get_next_ids(
93 boot_ts,
94 persist_client.clone(),
95 environment_id.clone(),
96 deploy_generation,
97 Arc::clone(&catalog_metrics),
98 bootstrap_args.clone(),
99 )
100 .await;
101
102 info!(
103 %initial_next_user_item_id,
104 %initial_next_replica_id,
105 "waiting for deployment to be caught up");
106
107 let caught_up_max_wait_fut = async {
108 tokio::time::sleep(caught_up_max_wait).await;
109 ()
110 };
111 let mut caught_up_max_wait_fut = pin!(caught_up_max_wait_fut);
112
113 let mut skip_catchup = deployment_state.set_catching_up();
114
115 let mut check_ddl_changes_interval = tokio::time::interval(ddl_check_interval);
116 check_ddl_changes_interval
117 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
118
119 let mut should_skip_catchup = false;
120 loop {
121 tokio::select! {
122 biased;
123
124 () = &mut skip_catchup => {
125 info!("skipping waiting for deployment to catch up due to administrator request");
126 should_skip_catchup = true;
127 break;
128 }
129 () = &mut caught_up_receiver => {
130 info!("deployment caught up");
131 break;
132 }
133 () = &mut caught_up_max_wait_fut => {
134 if panic_after_timeout {
135 panic!("not caught up within {:?}", caught_up_max_wait);
136 }
137 info!("not caught up within {:?}, proceeding now", caught_up_max_wait);
138 break;
139 }
140 _ = check_ddl_changes_interval.tick() => {
141 check_ddl_changes(
142 boot_ts,
143 persist_client.clone(),
144 environment_id.clone(),
145 deploy_generation,
146 Arc::clone(&catalog_metrics),
147 bootstrap_args.clone(),
148 initial_next_user_item_id,
149 initial_next_replica_id,
150 )
151 .await;
152 }
153 }
154 }
155
156 if !should_skip_catchup {
159 check_ddl_changes(
160 boot_ts,
161 persist_client.clone(),
162 environment_id.clone(),
163 deploy_generation,
164 Arc::clone(&catalog_metrics),
165 bootstrap_args.clone(),
166 initial_next_user_item_id,
167 initial_next_replica_id,
168 )
169 .await;
170 }
171
172 let promoted = deployment_state.set_ready_to_promote();
174 info!("announced as ready to promote; waiting for promotion");
175 promoted.await;
176
177 info!("promoted; attempting takeover");
179
180 let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
194 persist_client.clone(),
195 environment_id.organization_id(),
196 BUILD_INFO.semver_version(),
197 Some(deploy_generation),
198 Arc::clone(&catalog_metrics),
199 )
200 .await
201 .expect("incompatible catalog/persist version");
202
203 let (_catalog, _audit_logs) = openable_adapter_storage
204 .open(boot_ts, &bootstrap_args)
205 .await
206 .unwrap_or_terminate("unexpected error while fencing out old deployment");
207
208 halt!("fenced out old deployment; rebooting as leader")
210 });
211
212 Ok(PreflightOutput {
213 openable_adapter_storage,
214 read_only: true,
215 caught_up_trigger: Some(caught_up_trigger),
216 })
217 } else if catalog_generation == deploy_generation {
218 info!("this deployment is the current generation; booting with writes allowed");
219 Ok(PreflightOutput {
220 openable_adapter_storage,
221 read_only: false,
222 caught_up_trigger: None,
223 })
224 } else {
225 exit!(0, "this deployment has been fenced out");
226 }
227}
228
229async fn check_ddl_changes(
236 boot_ts: Timestamp,
237 persist_client: PersistClient,
238 environment_id: EnvironmentId,
239 deploy_generation: u64,
240 catalog_metrics: Arc<Metrics>,
241 bootstrap_args: BootstrapArgs,
242 initial_next_user_item_id: u64,
243 initial_next_replica_id: u64,
244) {
245 let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
246 persist_client,
247 environment_id.organization_id(),
248 BUILD_INFO.semver_version(),
249 Some(deploy_generation),
250 catalog_metrics,
251 )
252 .await
253 .expect("incompatible catalog/persist version");
254
255 let (mut catalog, _audit_logs) = openable_adapter_storage
256 .open_savepoint(boot_ts, &bootstrap_args)
257 .await
258 .unwrap_or_terminate("can open in savepoint mode");
259
260 let tx = catalog
261 .transaction()
262 .await
263 .unwrap_or_terminate("unexpected error while getting transaction");
264
265 let new_replicas = tx
270 .get_cluster_replicas()
271 .filter_map(|replica| match replica.replica_id {
272 ReplicaId::User(id) if id >= initial_next_replica_id => Some(replica),
273 _ => None,
274 })
275 .collect::<Vec<_>>();
276
277 let new_objects = tx
278 .get_items()
279 .filter_map(|item| match item.id {
280 CatalogItemId::User(id) if id >= initial_next_user_item_id => Some(item),
281 _ => None,
282 })
283 .collect::<Vec<_>>();
284
285 if new_replicas.is_empty() && new_objects.is_empty() {
286 return;
287 }
288
289 let mut info_parts = Vec::new();
290
291 if !new_replicas.is_empty() {
292 let replicas = new_replicas.iter().map(|r| {
293 format!(
294 "{{replica_id: {}, replica_name: {}, cluster_id: {}}}",
295 r.replica_id, r.name, r.cluster_id
296 )
297 });
298 info_parts.push(format!("New replicas: [{}]", separated(", ", replicas)));
299 }
300
301 if !new_objects.is_empty() {
302 let objects = new_objects
303 .iter()
304 .map(|o| format!("{{object_id: {}, object_name: {}}}", o.id, o.name));
305 info_parts.push(format!("New objects: [{}]", separated(", ", objects)));
306 }
307
308 let extra_info = separated(". ", info_parts);
309
310 halt!(
311 "there have been DDL that we need to react to; rebooting in read-only mode. {}",
312 extra_info
313 )
314}
315
316async fn get_next_ids(
319 boot_ts: Timestamp,
320 persist_client: PersistClient,
321 environment_id: EnvironmentId,
322 deploy_generation: u64,
323 catalog_metrics: Arc<Metrics>,
324 bootstrap_args: BootstrapArgs,
325) -> (u64, u64) {
326 let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
327 persist_client,
328 environment_id.organization_id(),
329 BUILD_INFO.semver_version(),
330 Some(deploy_generation),
331 catalog_metrics,
332 )
333 .await
334 .expect("incompatible catalog/persist version");
335
336 let (mut catalog, _audit_logs) = openable_adapter_storage
337 .open_savepoint(boot_ts, &bootstrap_args)
338 .await
339 .unwrap_or_terminate("can open in savepoint mode");
340
341 let next_user_item_id = catalog
342 .get_next_user_item_id()
343 .await
344 .expect("can access catalog");
345 let next_replica_item_id = catalog
346 .get_next_user_replica_id()
347 .await
348 .expect("can access catalog");
349
350 (next_user_item_id, next_replica_item_id)
351}