mz_environmentd/deployment/
preflight.rs1use std::pin::pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use mz_adapter::ResultExt;
17use mz_catalog::durable::{BootstrapArgs, CatalogError, Metrics, OpenableDurableCatalogState};
18use mz_ore::channel::trigger;
19use mz_ore::exit;
20use mz_ore::halt;
21use mz_persist_client::PersistClient;
22use mz_repr::Timestamp;
23use mz_sql::catalog::EnvironmentId;
24use tracing::info;
25
26use crate::BUILD_INFO;
27use crate::deployment::state::DeploymentState;
28
29pub struct PreflightInput {
31 pub boot_ts: Timestamp,
32 pub environment_id: EnvironmentId,
33 pub persist_client: PersistClient,
34 pub deploy_generation: u64,
35 pub deployment_state: DeploymentState,
36 pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
37 pub catalog_metrics: Arc<Metrics>,
38 pub caught_up_max_wait: Duration,
39 pub ddl_check_interval: Duration,
40 pub panic_after_timeout: bool,
41 pub bootstrap_args: BootstrapArgs,
42}
43
44pub struct PreflightOutput {
46 pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
47 pub read_only: bool,
48 pub caught_up_trigger: Option<trigger::Trigger>,
49}
50
51pub async fn preflight_0dt(
56 PreflightInput {
57 boot_ts,
58 environment_id,
59 persist_client,
60 deploy_generation,
61 deployment_state,
62 mut openable_adapter_storage,
63 catalog_metrics,
64 caught_up_max_wait,
65 ddl_check_interval,
66 panic_after_timeout,
67 bootstrap_args,
68 }: PreflightInput,
69) -> Result<PreflightOutput, CatalogError> {
70 info!(%deploy_generation, ?caught_up_max_wait, "performing 0dt preflight checks");
71
72 if !openable_adapter_storage.is_initialized().await? {
73 info!("catalog not initialized; booting with writes allowed");
74 return Ok(PreflightOutput {
75 openable_adapter_storage,
76 read_only: false,
77 caught_up_trigger: None,
78 });
79 }
80
81 let catalog_generation = openable_adapter_storage.get_deployment_generation().await?;
82 info!(%catalog_generation, "catalog initialized");
83 if catalog_generation < deploy_generation {
84 info!("this deployment is a new generation; booting in read only mode");
85
86 let (caught_up_trigger, mut caught_up_receiver) = trigger::channel();
87
88 mz_ore::task::spawn(|| "preflight_0dt", async move {
90 let (initial_next_user_item_id, initial_next_replica_id) = get_next_ids(
91 boot_ts,
92 persist_client.clone(),
93 environment_id.clone(),
94 deploy_generation,
95 Arc::clone(&catalog_metrics),
96 bootstrap_args.clone(),
97 )
98 .await;
99
100 info!(
101 %initial_next_user_item_id,
102 %initial_next_replica_id,
103 "waiting for deployment to be caught up");
104
105 let caught_up_max_wait_fut = async {
106 tokio::time::sleep(caught_up_max_wait).await;
107 ()
108 };
109 let mut caught_up_max_wait_fut = pin!(caught_up_max_wait_fut);
110
111 let mut skip_catchup = deployment_state.set_catching_up();
112
113 let mut check_ddl_changes_interval = tokio::time::interval(ddl_check_interval);
114 check_ddl_changes_interval
115 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
116
117 let mut should_skip_catchup = false;
118 loop {
119 tokio::select! {
120 biased;
121
122 () = &mut skip_catchup => {
123 info!("skipping waiting for deployment to catch up due to administrator request");
124 should_skip_catchup = true;
125 break;
126 }
127 () = &mut caught_up_receiver => {
128 info!("deployment caught up");
129 break;
130 }
131 () = &mut caught_up_max_wait_fut => {
132 if panic_after_timeout {
133 panic!("not caught up within {:?}", caught_up_max_wait);
134 }
135 info!("not caught up within {:?}, proceeding now", caught_up_max_wait);
136 break;
137 }
138 _ = check_ddl_changes_interval.tick() => {
139 check_ddl_changes(
140 boot_ts,
141 persist_client.clone(),
142 environment_id.clone(),
143 deploy_generation,
144 Arc::clone(&catalog_metrics),
145 bootstrap_args.clone(),
146 initial_next_user_item_id,
147 initial_next_replica_id,
148 )
149 .await;
150 }
151 }
152 }
153
154 if !should_skip_catchup {
157 check_ddl_changes(
158 boot_ts,
159 persist_client.clone(),
160 environment_id.clone(),
161 deploy_generation,
162 Arc::clone(&catalog_metrics),
163 bootstrap_args.clone(),
164 initial_next_user_item_id,
165 initial_next_replica_id,
166 )
167 .await;
168 }
169
170 let promoted = deployment_state.set_ready_to_promote();
172 info!("announced as ready to promote; waiting for promotion");
173 promoted.await;
174
175 info!("promoted; attempting takeover");
177
178 let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
192 persist_client.clone(),
193 environment_id.organization_id(),
194 BUILD_INFO.semver_version(),
195 Some(deploy_generation),
196 Arc::clone(&catalog_metrics),
197 )
198 .await
199 .expect("incompatible catalog/persist version");
200
201 let (_catalog, _audit_logs) = openable_adapter_storage
202 .open(boot_ts, &bootstrap_args)
203 .await
204 .unwrap_or_terminate("unexpected error while fencing out old deployment");
205
206 halt!("fenced out old deployment; rebooting as leader")
208 });
209
210 Ok(PreflightOutput {
211 openable_adapter_storage,
212 read_only: true,
213 caught_up_trigger: Some(caught_up_trigger),
214 })
215 } else if catalog_generation == deploy_generation {
216 info!("this deployment is the current generation; booting with writes allowed");
217 Ok(PreflightOutput {
218 openable_adapter_storage,
219 read_only: false,
220 caught_up_trigger: None,
221 })
222 } else {
223 exit!(0, "this deployment has been fenced out");
224 }
225}
226
227async fn check_ddl_changes(
231 boot_ts: Timestamp,
232 persist_client: PersistClient,
233 environment_id: EnvironmentId,
234 deploy_generation: u64,
235 catalog_metrics: Arc<Metrics>,
236 bootstrap_args: BootstrapArgs,
237 initial_next_user_item_id: u64,
238 initial_next_replica_id: u64,
239) {
240 let (next_user_item_id, next_replica_id) = get_next_ids(
241 boot_ts,
242 persist_client.clone(),
243 environment_id.clone(),
244 deploy_generation,
245 Arc::clone(&catalog_metrics),
246 bootstrap_args.clone(),
247 )
248 .await;
249
250 tracing::info!(
251 %initial_next_user_item_id,
252 %initial_next_replica_id,
253 %next_user_item_id,
254 %next_replica_id,
255 "checking if there was any relevant DDL");
256
257 if next_user_item_id > initial_next_user_item_id || next_replica_id > initial_next_replica_id {
258 halt!("there have been DDL that we need to react to; rebooting in read-only mode")
259 }
260}
261
262async fn get_next_ids(
265 boot_ts: Timestamp,
266 persist_client: PersistClient,
267 environment_id: EnvironmentId,
268 deploy_generation: u64,
269 catalog_metrics: Arc<Metrics>,
270 bootstrap_args: BootstrapArgs,
271) -> (u64, u64) {
272 let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
273 persist_client,
274 environment_id.organization_id(),
275 BUILD_INFO.semver_version(),
276 Some(deploy_generation),
277 catalog_metrics,
278 )
279 .await
280 .expect("incompatible catalog/persist version");
281
282 let (mut catalog, _audit_logs) = openable_adapter_storage
283 .open_savepoint(boot_ts, &bootstrap_args)
284 .await
285 .unwrap_or_terminate("can open in savepoint mode");
286
287 let next_user_item_id = catalog
288 .get_next_user_item_id()
289 .await
290 .expect("can access catalog");
291 let next_replica_item_id = catalog
292 .get_next_user_replica_id()
293 .await
294 .expect("can access catalog");
295
296 (next_user_item_id, next_replica_item_id)
297}