mz_environmentd/deployment/
state.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Deployment state handling.
11
12use std::future::Future;
13use std::sync::{Arc, Mutex};
14
15use mz_orchestratord::controller::materialize::environmentd::DeploymentStatus;
16use mz_ore::channel::trigger::{self, Trigger};
17
18enum DeploymentStateInner {
19    Initializing,
20    CatchingUp { _skip_trigger: Option<Trigger> },
21    ReadyToPromote { _promote_trigger: Trigger },
22    Promoting,
23    IsLeader,
24}
25
26/// The state of an environment deployment.
27///
28/// This object should be held by the `environmentd` server. It provides methods
29/// to handle state transitions that should be driven by the server itself.
30///
31/// A deployment begins in the `Initializing` state.
32///
33/// If, during initialization, the server realizes that it is taking over from a
34/// failed `environmentd` process of a generation that is already the leader,
35/// the server may proceed directly to the `IsLeader` state, via
36/// [`DeploymentState::set_is_leader`].
37///
38/// Otherwise, the server should leave the deployment state in `Initializing`
39/// while performing initialization activities. Once the server is catching up
40/// its workloads, it should proceeded to the `CatchingUp` state. Once the
41/// environment is ready to take over from the prior generation, the server
42/// should call [`DeploymentState::set_ready_to_promote`]. After this, the
43/// server should *not* call [`DeploymentState::set_is_leader`], as an external
44/// orchestrator will determine when promotion occurs. The future returned by
45/// `set_ready_to_promote` will resolve when promotion has occurred and the
46/// deployment should take over from the prior generation and begin serving
47/// queries.
48#[derive(Clone)]
49pub struct DeploymentState {
50    inner: Arc<Mutex<DeploymentStateInner>>,
51}
52
53impl DeploymentState {
54    /// Creates a new `LeaderState` for a deployment.
55    ///
56    /// Returns the state and a handle to the state.
57    pub fn new() -> (DeploymentState, DeploymentStateHandle) {
58        let inner = Arc::new(Mutex::new(DeploymentStateInner::Initializing));
59        let state = DeploymentState {
60            inner: Arc::clone(&inner),
61        };
62        let handle = DeploymentStateHandle { inner };
63        (state, handle)
64    }
65
66    /// Marks the deployment as catching up.
67    ///
68    /// Returns a future that resolves if the catch up phase should be skipped.
69    pub fn set_catching_up(&self) -> impl Future<Output = ()> {
70        let (skip_trigger, skip_rx) = trigger::channel();
71        {
72            let mut inner = self.inner.lock().expect("lock poisoned");
73            assert!(
74                matches!(*inner, DeploymentStateInner::Initializing),
75                "LeaderState::set_catching_up called on non-initializing state",
76            );
77            *inner = DeploymentStateInner::CatchingUp {
78                _skip_trigger: Some(skip_trigger),
79            };
80        }
81        skip_rx
82    }
83
84    /// Marks the deployment as ready to be promoted to leader.
85    ///
86    /// Returns a future that resolves when the leadership promotion occurs.
87    /// When the function returns, the state will be `ReadyToPromote`. When the
88    /// returned future resolves, the state will be `Promoting`.
89    ///
90    /// Panics if the leader state is not `Initializing`.
91    pub fn set_ready_to_promote(&self) -> impl Future<Output = ()> {
92        let (promote_trigger, promote_trigger_rx) = trigger::channel();
93        {
94            let mut inner = self.inner.lock().expect("lock poisoned");
95            assert!(
96                matches!(
97                    *inner,
98                    DeploymentStateInner::Initializing | DeploymentStateInner::CatchingUp { .. }
99                ),
100                "LeaderState::set_ready_to_promote called on invalid state",
101            );
102            *inner = DeploymentStateInner::ReadyToPromote {
103                _promote_trigger: promote_trigger,
104            };
105        }
106        promote_trigger_rx
107    }
108
109    /// Marks the deployment as the leader.
110    ///
111    /// Panics if the leader state is not `Initializing` or `Promoting`.
112    pub fn set_is_leader(&self) {
113        let mut inner = self.inner.lock().expect("lock poisoned");
114        assert!(
115            matches!(
116                *inner,
117                DeploymentStateInner::Initializing | DeploymentStateInner::Promoting
118            ),
119            "LeaderState::set_is_leader called on non-initializing state",
120        );
121        *inner = DeploymentStateInner::IsLeader;
122    }
123}
124
125/// A cloneable handle to a [`DeploymentState`].
126///
127/// This should be held by modules providing external interfaces to
128/// `environmentd` (e.g., the HTTP server). It provides methods to inspect the
129/// current leadership state, and to promote the deployment to the leader if it
130/// is ready to do so.
131#[derive(Clone)]
132pub struct DeploymentStateHandle {
133    inner: Arc<Mutex<DeploymentStateInner>>,
134}
135
136impl DeploymentStateHandle {
137    /// Returns the current deployment status.
138    pub fn status(&self) -> DeploymentStatus {
139        let inner = self.inner.lock().expect("lock poisoned");
140        match *inner {
141            DeploymentStateInner::Initializing => DeploymentStatus::Initializing,
142            DeploymentStateInner::CatchingUp { .. } => DeploymentStatus::Initializing,
143            DeploymentStateInner::ReadyToPromote { .. } => DeploymentStatus::ReadyToPromote,
144            DeploymentStateInner::Promoting => DeploymentStatus::Promoting,
145            DeploymentStateInner::IsLeader => DeploymentStatus::IsLeader,
146        }
147    }
148
149    /// Attempts to skip the catchup phase for the deployment.
150    ///
151    /// Deployments in the `Initializing` phase cannot have their catchup phase
152    /// skipped. Deployments in the `ReadyToPromote`, `Promoting`, and
153    /// `IsLeader` states can be promoted (with the latter two cases being
154    /// no-ops).
155    ///
156    /// If skipping the catchup was successful, returns `Ok`. Otherwise, returns
157    /// `Err`.
158    pub fn try_skip_catchup(&self) -> Result<(), ()> {
159        let mut inner = self.inner.lock().expect("lock poisoned");
160        match &mut *inner {
161            DeploymentStateInner::Initializing => Err(()),
162            DeploymentStateInner::CatchingUp { _skip_trigger } => {
163                *_skip_trigger = None;
164                Ok(())
165            }
166            DeploymentStateInner::ReadyToPromote { .. } => Ok(()),
167            DeploymentStateInner::Promoting => Ok(()),
168            DeploymentStateInner::IsLeader => Ok(()),
169        }
170    }
171
172    /// Attempts to promote this deployment to the leader.
173    ///
174    /// Deployments in the `Initializing` or `CatchingUp` state cannot be
175    /// promoted. Deployments in the `ReadyToPromote`, `Promoting`, and
176    /// `IsLeader` states can be promoted (with the latter two cases being
177    /// no-ops).
178    ///
179    /// If the leader was successfully promoted, returns `Ok`. Otherwise,
180    /// returns `Err`.
181    pub fn try_promote(&self) -> Result<(), ()> {
182        let mut inner = self.inner.lock().expect("lock poisoned");
183        match *inner {
184            DeploymentStateInner::Initializing => Err(()),
185            DeploymentStateInner::CatchingUp { .. } => Err(()),
186            DeploymentStateInner::ReadyToPromote { .. } => {
187                *inner = DeploymentStateInner::Promoting;
188                Ok(())
189            }
190            DeploymentStateInner::Promoting => Ok(()),
191            DeploymentStateInner::IsLeader => Ok(()),
192        }
193    }
194}