// mz_environmentd/deployment/preflight.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Preflight checks for deployments.
11
12use std::pin::pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use mz_adapter::ResultExt;
17use mz_catalog::durable::{BootstrapArgs, CatalogError, Metrics, OpenableDurableCatalogState};
18use mz_ore::channel::trigger;
19use mz_ore::exit;
20use mz_ore::halt;
21use mz_persist_client::PersistClient;
22use mz_repr::Timestamp;
23use mz_sql::catalog::EnvironmentId;
24use tracing::info;
25
26use crate::BUILD_INFO;
27use crate::deployment::state::DeploymentState;
28
/// The necessary input for preflight checks.
pub struct PreflightInput {
    // Timestamp used when opening the durable catalog (savepoint or write
    // mode) during the checks.
    pub boot_ts: Timestamp,
    // Identifies the environment; its organization ID selects the
    // persist-backed catalog state.
    pub environment_id: EnvironmentId,
    // Client used to (re)open persist-backed catalog state.
    pub persist_client: PersistClient,
    // The generation this deployment was started with; compared against the
    // generation recorded in the catalog.
    pub deploy_generation: u64,
    // Shared deployment state used to signal catching-up / ready-to-promote
    // and to receive skip/promote signals.
    pub deployment_state: DeploymentState,
    // Catalog storage handle the checks inspect and (usually) hand back.
    pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
    // Metrics handle required to construct new catalog state instances.
    pub catalog_metrics: Arc<Metrics>,
    // 0dt only: maximum time to wait for the new deployment to catch up.
    pub caught_up_max_wait: Duration,
    // 0dt only: how often to re-check the old generation for new DDL.
    pub ddl_check_interval: Duration,
    // 0dt only: if true, panic (rather than proceed) when the catch-up
    // timeout elapses.
    pub panic_after_timeout: bool,
    // Arguments needed to open the durable catalog.
    pub bootstrap_args: BootstrapArgs,
}
43
/// Output of preflight checks.
pub struct PreflightOutput {
    // The catalog storage handle to continue booting with.
    pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
    // Whether the environment must boot in read-only mode.
    pub read_only: bool,
    // Trigger to fire once the deployment has caught up; `Some` only when a
    // 0dt preflight decided to boot a new generation in read-only mode.
    pub caught_up_trigger: Option<trigger::Trigger>,
}
50
51/// Perform a legacy (non-0dt) preflight check.
52pub async fn preflight_legacy(
53    PreflightInput {
54        boot_ts,
55        environment_id,
56        persist_client,
57        deploy_generation,
58        deployment_state,
59        mut openable_adapter_storage,
60        catalog_metrics,
61        bootstrap_args,
62        caught_up_max_wait: _,
63        ddl_check_interval: _,
64        panic_after_timeout: _,
65    }: PreflightInput,
66) -> Result<Box<dyn OpenableDurableCatalogState>, CatalogError> {
67    tracing::info!("Requested deploy generation {deploy_generation}");
68
69    if !openable_adapter_storage.is_initialized().await? {
70        tracing::info!(
71            "Catalog storage doesn't exist so there's no current deploy generation. We won't wait to be leader"
72        );
73        return Ok(openable_adapter_storage);
74    }
75    let catalog_generation = openable_adapter_storage.get_deployment_generation().await?;
76    tracing::info!("Found catalog generation {catalog_generation:?}");
77    if catalog_generation < deploy_generation {
78        tracing::info!(
79            "Catalog generation {catalog_generation:?} is less than deploy generation {deploy_generation}. Performing pre-flight checks"
80        );
81        match openable_adapter_storage
82            .open_savepoint(boot_ts.clone(), &bootstrap_args)
83            .await
84        {
85            Ok((adapter_storage, _)) => Box::new(adapter_storage).expire().await,
86            Err(CatalogError::Durable(e)) if e.can_recover_with_write_mode() => {
87                // This is theoretically possible if catalog implementation A is
88                // initialized, implementation B is uninitialized, and we are going to
89                // migrate from A to B. The current code avoids this by always
90                // initializing all implementations, regardless of the target
91                // implementation. Still it's easy to protect against this and worth it in
92                // case things change in the future.
93                tracing::warn!(
94                    "Unable to perform upgrade test because the target implementation is uninitialized"
95                );
96                return Ok(mz_catalog::durable::persist_backed_catalog_state(
97                    persist_client,
98                    environment_id.organization_id(),
99                    BUILD_INFO.semver_version(),
100                    Some(deploy_generation),
101                    Arc::clone(&catalog_metrics),
102                )
103                .await?);
104            }
105            Err(e) => {
106                tracing::warn!(error = %e, "catalog upgrade would have failed");
107                return Err(e);
108            }
109        }
110
111        let promoted = deployment_state.set_ready_to_promote();
112
113        tracing::info!("Waiting for user to promote this envd to leader");
114        promoted.await;
115
116        Ok(mz_catalog::durable::persist_backed_catalog_state(
117            persist_client,
118            environment_id.organization_id(),
119            BUILD_INFO.semver_version(),
120            Some(deploy_generation),
121            Arc::clone(&catalog_metrics),
122        )
123        .await?)
124    } else if catalog_generation == deploy_generation {
125        tracing::info!(
126            "Server requested generation {deploy_generation} which is equal to catalog's generation"
127        );
128        Ok(openable_adapter_storage)
129    } else {
130        mz_ore::halt!(
131            "Server started with requested generation {deploy_generation} but catalog was already at {catalog_generation:?}. Deploy generations must increase monotonically"
132        );
133    }
134}
135
/// Perform a 0dt preflight check.
///
/// Returns the openable adapter storage to use and whether or not to boot in
/// read only mode.
///
/// If the catalog is uninitialized or already at `deploy_generation`, the
/// deployment boots with writes allowed. If the catalog is at an older
/// generation, the deployment boots read-only and a background task waits for
/// catch-up, then takes over the catalog (fencing out the old generation) and
/// halts so the process reboots as leader. If the catalog is at a *newer*
/// generation, this deployment has itself been fenced out and exits.
pub async fn preflight_0dt(
    PreflightInput {
        boot_ts,
        environment_id,
        persist_client,
        deploy_generation,
        deployment_state,
        mut openable_adapter_storage,
        catalog_metrics,
        caught_up_max_wait,
        ddl_check_interval,
        panic_after_timeout,
        bootstrap_args,
    }: PreflightInput,
) -> Result<PreflightOutput, CatalogError> {
    info!(%deploy_generation, ?caught_up_max_wait, "performing 0dt preflight checks");

    // A brand new environment: there is no old generation to take over from,
    // so boot directly with writes allowed.
    if !openable_adapter_storage.is_initialized().await? {
        info!("catalog not initialized; booting with writes allowed");
        return Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: false,
            caught_up_trigger: None,
        });
    }

    let catalog_generation = openable_adapter_storage.get_deployment_generation().await?;
    info!(%catalog_generation, "catalog initialized");
    if catalog_generation < deploy_generation {
        info!("this deployment is a new generation; booting in read only mode");

        // Fired elsewhere once the read-only deployment has caught up;
        // received by the background task below.
        let (caught_up_trigger, mut caught_up_receiver) = trigger::channel();

        // Spawn a background task to handle promotion to leader.
        mz_ore::task::spawn(|| "preflight_0dt", async move {
            // Snapshot the old generation's ID counters so we can later
            // detect whether any relevant DDL happened while catching up.
            let (initial_next_user_item_id, initial_next_replica_id) = get_next_ids(
                boot_ts,
                persist_client.clone(),
                environment_id.clone(),
                deploy_generation,
                Arc::clone(&catalog_metrics),
                bootstrap_args.clone(),
            )
            .await;

            info!(
                %initial_next_user_item_id,
                %initial_next_replica_id,
                "waiting for deployment to be caught up");

            // Upper bound on how long we wait for catch-up before proceeding
            // (or panicking, per `panic_after_timeout`).
            let caught_up_max_wait_fut = async {
                tokio::time::sleep(caught_up_max_wait).await;
                ()
            };
            let mut caught_up_max_wait_fut = pin!(caught_up_max_wait_fut);

            // Resolves if an administrator requests skipping the catch-up.
            let mut skip_catchup = deployment_state.set_catching_up();

            let mut check_ddl_changes_interval = tokio::time::interval(ddl_check_interval);
            // Skip (rather than burst) ticks missed while a check was running.
            check_ddl_changes_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            let mut should_skip_catchup = false;
            loop {
                tokio::select! {
                    // `biased` gives the administrator's skip request priority
                    // over simultaneously-ready branches.
                    biased;

                    () = &mut skip_catchup => {
                        info!("skipping waiting for deployment to catch up due to administrator request");
                        should_skip_catchup = true;
                        break;
                    }
                    () = &mut caught_up_receiver => {
                        info!("deployment caught up");
                        break;
                    }
                    () = &mut caught_up_max_wait_fut => {
                        if panic_after_timeout {
                            panic!("not caught up within {:?}", caught_up_max_wait);
                        }
                        info!("not caught up within {:?}, proceeding now", caught_up_max_wait);
                        break;
                    }
                    // Periodically re-check the old generation for new DDL;
                    // `check_ddl_changes` halts this process if it finds any.
                    _ = check_ddl_changes_interval.tick() => {
                        check_ddl_changes(
                            boot_ts,
                            persist_client.clone(),
                            environment_id.clone(),
                            deploy_generation,
                            Arc::clone(&catalog_metrics),
                            bootstrap_args.clone(),
                            initial_next_user_item_id,
                            initial_next_replica_id,
                        )
                        .await;
                    }
                }
            }

            // Check for DDL changes one last time before announcing as ready to
            // promote.
            if !should_skip_catchup {
                check_ddl_changes(
                    boot_ts,
                    persist_client.clone(),
                    environment_id.clone(),
                    deploy_generation,
                    Arc::clone(&catalog_metrics),
                    bootstrap_args.clone(),
                    initial_next_user_item_id,
                    initial_next_replica_id,
                )
                .await;
            }

            // Announce that we're ready to promote.
            let promoted = deployment_state.set_ready_to_promote();
            info!("announced as ready to promote; waiting for promotion");
            promoted.await;

            // Take over the catalog.
            info!("promoted; attempting takeover");

            // NOTE: There _is_ a window where DDL can happen in the old
            // environment, between checking above, us announcing as ready to
            // promote, and cloud giving us the go-ahead signal. Its size
            // depends on how quickly cloud will trigger promotion once we
            // report as ready.
            //
            // We could add another check here, right before cutting over, but I
            // think this requires changes in Cloud: with this additional check,
            // it can now happen that cloud gives us the promote signal but we
            // then notice there were changes and restart. Cloud would have to
            // notice this and give us the promote signal again, once we're
            // ready again.

            // Re-open the catalog stamped with the new deploy generation.
            let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
                persist_client.clone(),
                environment_id.organization_id(),
                BUILD_INFO.semver_version(),
                Some(deploy_generation),
                Arc::clone(&catalog_metrics),
            )
            .await
            .expect("incompatible catalog/persist version");

            // Opening in write mode fences out the old deployment.
            let (_catalog, _audit_logs) = openable_adapter_storage
                .open(boot_ts, &bootstrap_args)
                .await
                .unwrap_or_terminate("unexpected error while fencing out old deployment");

            // Reboot as the leader.
            halt!("fenced out old deployment; rebooting as leader")
        });

        Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: true,
            caught_up_trigger: Some(caught_up_trigger),
        })
    } else if catalog_generation == deploy_generation {
        info!("this deployment is the current generation; booting with writes allowed");
        Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: false,
            caught_up_trigger: None,
        })
    } else {
        // The catalog generation is *newer* than ours: some other deployment
        // has already taken over, so exit cleanly.
        exit!(0, "this deployment has been fenced out");
    }
}
311
312/// Check if there have been any DDL that create new collections or replicas,
313/// restart in read-only mode if so, in order to pick up those new items and
314/// start hydrating them before cutting over.
315async fn check_ddl_changes(
316    boot_ts: Timestamp,
317    persist_client: PersistClient,
318    environment_id: EnvironmentId,
319    deploy_generation: u64,
320    catalog_metrics: Arc<Metrics>,
321    bootstrap_args: BootstrapArgs,
322    initial_next_user_item_id: u64,
323    initial_next_replica_id: u64,
324) {
325    let (next_user_item_id, next_replica_id) = get_next_ids(
326        boot_ts,
327        persist_client.clone(),
328        environment_id.clone(),
329        deploy_generation,
330        Arc::clone(&catalog_metrics),
331        bootstrap_args.clone(),
332    )
333    .await;
334
335    tracing::info!(
336        %initial_next_user_item_id,
337        %initial_next_replica_id,
338        %next_user_item_id,
339        %next_replica_id,
340        "checking if there was any relevant DDL");
341
342    if next_user_item_id > initial_next_user_item_id || next_replica_id > initial_next_replica_id {
343        halt!("there have been DDL that we need to react to; rebooting in read-only mode")
344    }
345}
346
347/// Gets and returns the next user item ID and user replica ID that would be
348/// allocated as of the current catalog state.
349async fn get_next_ids(
350    boot_ts: Timestamp,
351    persist_client: PersistClient,
352    environment_id: EnvironmentId,
353    deploy_generation: u64,
354    catalog_metrics: Arc<Metrics>,
355    bootstrap_args: BootstrapArgs,
356) -> (u64, u64) {
357    let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
358        persist_client,
359        environment_id.organization_id(),
360        BUILD_INFO.semver_version(),
361        Some(deploy_generation),
362        catalog_metrics,
363    )
364    .await
365    .expect("incompatible catalog/persist version");
366
367    let (mut catalog, _audit_logs) = openable_adapter_storage
368        .open_savepoint(boot_ts, &bootstrap_args)
369        .await
370        .unwrap_or_terminate("can open in savepoint mode");
371
372    let next_user_item_id = catalog
373        .get_next_user_item_id()
374        .await
375        .expect("can access catalog");
376    let next_replica_item_id = catalog
377        .get_next_user_replica_id()
378        .await
379        .expect("can access catalog");
380
381    (next_user_item_id, next_replica_item_id)
382}