mz_environmentd/deployment/
state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Deployment state handling.
11
12use std::future::Future;
13use std::sync::{Arc, Mutex};
14
15use mz_orchestratord::controller::materialize::environmentd::DeploymentStatus;
16use mz_ore::channel::trigger::{self, Trigger};
17
18#[derive(Debug)]
19enum DeploymentStateInner {
20    Initializing,
21    CatchingUp { _skip_trigger: Option<Trigger> },
22    ReadyToPromote { _promote_trigger: Trigger },
23    Promoting,
24    IsLeader,
25}
26
27/// The state of an environment deployment.
28///
29/// This object should be held by the `environmentd` server. It provides methods
30/// to handle state transitions that should be driven by the server itself.
31///
32/// A deployment begins in the `Initializing` state.
33///
34/// If, during initialization, the server realizes that it is taking over from a
35/// failed `environmentd` process of a generation that is already the leader,
36/// the server may proceed directly to the `IsLeader` state, via
37/// [`DeploymentState::set_is_leader`].
38///
39/// Otherwise, the server should leave the deployment state in `Initializing`
40/// while performing initialization activities. Once the server is catching up
41/// its workloads, it should proceeded to the `CatchingUp` state. Once the
42/// environment is ready to take over from the prior generation, the server
43/// should call [`DeploymentState::set_ready_to_promote`]. After this, the
44/// server should *not* call [`DeploymentState::set_is_leader`], as an external
45/// orchestrator will determine when promotion occurs. The future returned by
46/// `set_ready_to_promote` will resolve when promotion has occurred and the
47/// deployment should take over from the prior generation and begin serving
48/// queries.
49#[derive(Clone)]
50pub struct DeploymentState {
51    inner: Arc<Mutex<DeploymentStateInner>>,
52}
53
54impl DeploymentState {
55    /// Creates a new `LeaderState` for a deployment.
56    ///
57    /// Returns the state and a handle to the state.
58    pub fn new() -> (DeploymentState, DeploymentStateHandle) {
59        let inner = Arc::new(Mutex::new(DeploymentStateInner::Initializing));
60        let state = DeploymentState {
61            inner: Arc::clone(&inner),
62        };
63        let handle = DeploymentStateHandle { inner };
64        (state, handle)
65    }
66
67    /// Marks the deployment as catching up.
68    ///
69    /// Returns a future that resolves if the catch up phase should be skipped.
70    pub fn set_catching_up(&self) -> impl Future<Output = ()> {
71        let (skip_trigger, skip_rx) = trigger::channel();
72        {
73            let mut inner = self.inner.lock().expect("lock poisoned");
74            assert!(
75                matches!(*inner, DeploymentStateInner::Initializing),
76                "LeaderState::set_catching_up called on non-initializing state",
77            );
78            *inner = DeploymentStateInner::CatchingUp {
79                _skip_trigger: Some(skip_trigger),
80            };
81        }
82        skip_rx
83    }
84
85    /// Marks the deployment as ready to be promoted to leader.
86    ///
87    /// Returns a future that resolves when the leadership promotion occurs.
88    /// When the function returns, the state will be `ReadyToPromote`. When the
89    /// returned future resolves, the state will be `Promoting`.
90    ///
91    /// Panics if the leader state is not `Initializing`.
92    pub fn set_ready_to_promote(&self) -> impl Future<Output = ()> {
93        let (promote_trigger, promote_trigger_rx) = trigger::channel();
94        {
95            let mut inner = self.inner.lock().expect("lock poisoned");
96            assert!(
97                matches!(
98                    *inner,
99                    DeploymentStateInner::Initializing | DeploymentStateInner::CatchingUp { .. }
100                ),
101                "LeaderState::set_ready_to_promote called on invalid state",
102            );
103            *inner = DeploymentStateInner::ReadyToPromote {
104                _promote_trigger: promote_trigger,
105            };
106        }
107        promote_trigger_rx
108    }
109
110    /// Marks the deployment as the leader.
111    ///
112    /// Panics if the leader state is not `Initializing` or `Promoting`.
113    pub fn set_is_leader(&self) {
114        let mut inner = self.inner.lock().expect("lock poisoned");
115        assert!(
116            matches!(
117                *inner,
118                DeploymentStateInner::Initializing | DeploymentStateInner::Promoting
119            ),
120            "LeaderState::set_is_leader called on non-initializing state",
121        );
122        *inner = DeploymentStateInner::IsLeader;
123    }
124}
125
126/// A cloneable handle to a [`DeploymentState`].
127///
128/// This should be held by modules providing external interfaces to
129/// `environmentd` (e.g., the HTTP server). It provides methods to inspect the
130/// current leadership state, and to promote the deployment to the leader if it
131/// is ready to do so.
132#[derive(Debug, Clone)]
133pub struct DeploymentStateHandle {
134    inner: Arc<Mutex<DeploymentStateInner>>,
135}
136
137impl DeploymentStateHandle {
138    /// Returns the current deployment status.
139    pub fn status(&self) -> DeploymentStatus {
140        let inner = self.inner.lock().expect("lock poisoned");
141        match *inner {
142            DeploymentStateInner::Initializing => DeploymentStatus::Initializing,
143            DeploymentStateInner::CatchingUp { .. } => DeploymentStatus::Initializing,
144            DeploymentStateInner::ReadyToPromote { .. } => DeploymentStatus::ReadyToPromote,
145            DeploymentStateInner::Promoting => DeploymentStatus::Promoting,
146            DeploymentStateInner::IsLeader => DeploymentStatus::IsLeader,
147        }
148    }
149
150    /// Attempts to skip the catchup phase for the deployment.
151    ///
152    /// Deployments in the `Initializing` phase cannot have their catchup phase
153    /// skipped. Deployments in the `ReadyToPromote`, `Promoting`, and
154    /// `IsLeader` states can be promoted (with the latter two cases being
155    /// no-ops).
156    ///
157    /// If skipping the catchup was successful, returns `Ok`. Otherwise, returns
158    /// `Err`.
159    pub fn try_skip_catchup(&self) -> Result<(), ()> {
160        let mut inner = self.inner.lock().expect("lock poisoned");
161        match &mut *inner {
162            DeploymentStateInner::Initializing => Err(()),
163            DeploymentStateInner::CatchingUp { _skip_trigger } => {
164                *_skip_trigger = None;
165                Ok(())
166            }
167            DeploymentStateInner::ReadyToPromote { .. } => Ok(()),
168            DeploymentStateInner::Promoting => Ok(()),
169            DeploymentStateInner::IsLeader => Ok(()),
170        }
171    }
172
173    /// Attempts to promote this deployment to the leader.
174    ///
175    /// Deployments in the `Initializing` or `CatchingUp` state cannot be
176    /// promoted. Deployments in the `ReadyToPromote`, `Promoting`, and
177    /// `IsLeader` states can be promoted (with the latter two cases being
178    /// no-ops).
179    ///
180    /// If the leader was successfully promoted, returns `Ok`. Otherwise,
181    /// returns `Err`.
182    pub fn try_promote(&self) -> Result<(), ()> {
183        let mut inner = self.inner.lock().expect("lock poisoned");
184        match *inner {
185            DeploymentStateInner::Initializing => Err(()),
186            DeploymentStateInner::CatchingUp { .. } => Err(()),
187            DeploymentStateInner::ReadyToPromote { .. } => {
188                *inner = DeploymentStateInner::Promoting;
189                Ok(())
190            }
191            DeploymentStateInner::Promoting => Ok(()),
192            DeploymentStateInner::IsLeader => Ok(()),
193        }
194    }
195}