mz_environmentd/deployment/state.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Deployment state handling.
11
12use std::future::Future;
13use std::sync::{Arc, Mutex};
14
15use mz_orchestratord::controller::materialize::environmentd::DeploymentStatus;
16use mz_ore::channel::trigger::{self, Trigger};
17
/// The internal state machine shared by a [`DeploymentState`] and its
/// [`DeploymentStateHandle`]s.
#[derive(Debug)]
enum DeploymentStateInner {
    /// The state installed by `DeploymentState::new`.
    Initializing,
    /// Entered via `DeploymentState::set_catching_up`.
    ///
    /// `_skip_trigger` is the sending half of the channel whose receiving half
    /// is returned by `set_catching_up`. `DeploymentStateHandle::try_skip_catchup`
    /// replaces it with `None`.
    // NOTE(review): presumably dropping the trigger is what resolves the
    // paired future -- confirm against `mz_ore::channel::trigger`.
    CatchingUp { _skip_trigger: Option<Trigger> },
    /// Entered via `DeploymentState::set_ready_to_promote`.
    ///
    /// `_promote_trigger` is the sending half of the channel whose receiving
    /// half is returned by `set_ready_to_promote`. It is dropped when
    /// `DeploymentStateHandle::try_promote` overwrites this state with
    /// `Promoting`.
    ReadyToPromote { _promote_trigger: Trigger },
    /// Entered via `DeploymentStateHandle::try_promote`; one of the two states
    /// from which `DeploymentState::set_is_leader` may be called.
    Promoting,
    /// Entered via `DeploymentState::set_is_leader`. No method in this file
    /// transitions out of this state.
    IsLeader,
}
26
27/// The state of an environment deployment.
28///
29/// This object should be held by the `environmentd` server. It provides methods
30/// to handle state transitions that should be driven by the server itself.
31///
32/// A deployment begins in the `Initializing` state.
33///
34/// If, during initialization, the server realizes that it is taking over from a
35/// failed `environmentd` process of a generation that is already the leader,
36/// the server may proceed directly to the `IsLeader` state, via
37/// [`DeploymentState::set_is_leader`].
38///
39/// Otherwise, the server should leave the deployment state in `Initializing`
40/// while performing initialization activities. Once the server is catching up
41/// its workloads, it should proceeded to the `CatchingUp` state. Once the
42/// environment is ready to take over from the prior generation, the server
43/// should call [`DeploymentState::set_ready_to_promote`]. After this, the
44/// server should *not* call [`DeploymentState::set_is_leader`], as an external
45/// orchestrator will determine when promotion occurs. The future returned by
46/// `set_ready_to_promote` will resolve when promotion has occurred and the
47/// deployment should take over from the prior generation and begin serving
48/// queries.
49#[derive(Clone)]
50pub struct DeploymentState {
51 inner: Arc<Mutex<DeploymentStateInner>>,
52}
53
54impl DeploymentState {
55 /// Creates a new `LeaderState` for a deployment.
56 ///
57 /// Returns the state and a handle to the state.
58 pub fn new() -> (DeploymentState, DeploymentStateHandle) {
59 let inner = Arc::new(Mutex::new(DeploymentStateInner::Initializing));
60 let state = DeploymentState {
61 inner: Arc::clone(&inner),
62 };
63 let handle = DeploymentStateHandle { inner };
64 (state, handle)
65 }
66
67 /// Marks the deployment as catching up.
68 ///
69 /// Returns a future that resolves if the catch up phase should be skipped.
70 pub fn set_catching_up(&self) -> impl Future<Output = ()> {
71 let (skip_trigger, skip_rx) = trigger::channel();
72 {
73 let mut inner = self.inner.lock().expect("lock poisoned");
74 assert!(
75 matches!(*inner, DeploymentStateInner::Initializing),
76 "LeaderState::set_catching_up called on non-initializing state",
77 );
78 *inner = DeploymentStateInner::CatchingUp {
79 _skip_trigger: Some(skip_trigger),
80 };
81 }
82 skip_rx
83 }
84
85 /// Marks the deployment as ready to be promoted to leader.
86 ///
87 /// Returns a future that resolves when the leadership promotion occurs.
88 /// When the function returns, the state will be `ReadyToPromote`. When the
89 /// returned future resolves, the state will be `Promoting`.
90 ///
91 /// Panics if the leader state is not `Initializing`.
92 pub fn set_ready_to_promote(&self) -> impl Future<Output = ()> {
93 let (promote_trigger, promote_trigger_rx) = trigger::channel();
94 {
95 let mut inner = self.inner.lock().expect("lock poisoned");
96 assert!(
97 matches!(
98 *inner,
99 DeploymentStateInner::Initializing | DeploymentStateInner::CatchingUp { .. }
100 ),
101 "LeaderState::set_ready_to_promote called on invalid state",
102 );
103 *inner = DeploymentStateInner::ReadyToPromote {
104 _promote_trigger: promote_trigger,
105 };
106 }
107 promote_trigger_rx
108 }
109
110 /// Marks the deployment as the leader.
111 ///
112 /// Panics if the leader state is not `Initializing` or `Promoting`.
113 pub fn set_is_leader(&self) {
114 let mut inner = self.inner.lock().expect("lock poisoned");
115 assert!(
116 matches!(
117 *inner,
118 DeploymentStateInner::Initializing | DeploymentStateInner::Promoting
119 ),
120 "LeaderState::set_is_leader called on non-initializing state",
121 );
122 *inner = DeploymentStateInner::IsLeader;
123 }
124}
125
/// A cloneable handle to a [`DeploymentState`].
///
/// This should be held by modules providing external interfaces to
/// `environmentd` (e.g., the HTTP server). It provides methods to inspect the
/// current leadership state, to skip the catch up phase, and to promote the
/// deployment to the leader if it is ready to do so.
#[derive(Debug, Clone)]
pub struct DeploymentStateHandle {
    // The state machine shared with the `DeploymentState` this handle was
    // created alongside in `DeploymentState::new`.
    inner: Arc<Mutex<DeploymentStateInner>>,
}
136
137impl DeploymentStateHandle {
138 /// Returns the current deployment status.
139 pub fn status(&self) -> DeploymentStatus {
140 let inner = self.inner.lock().expect("lock poisoned");
141 match *inner {
142 DeploymentStateInner::Initializing => DeploymentStatus::Initializing,
143 DeploymentStateInner::CatchingUp { .. } => DeploymentStatus::Initializing,
144 DeploymentStateInner::ReadyToPromote { .. } => DeploymentStatus::ReadyToPromote,
145 DeploymentStateInner::Promoting => DeploymentStatus::Promoting,
146 DeploymentStateInner::IsLeader => DeploymentStatus::IsLeader,
147 }
148 }
149
150 /// Attempts to skip the catchup phase for the deployment.
151 ///
152 /// Deployments in the `Initializing` phase cannot have their catchup phase
153 /// skipped. Deployments in the `ReadyToPromote`, `Promoting`, and
154 /// `IsLeader` states can be promoted (with the latter two cases being
155 /// no-ops).
156 ///
157 /// If skipping the catchup was successful, returns `Ok`. Otherwise, returns
158 /// `Err`.
159 pub fn try_skip_catchup(&self) -> Result<(), ()> {
160 let mut inner = self.inner.lock().expect("lock poisoned");
161 match &mut *inner {
162 DeploymentStateInner::Initializing => Err(()),
163 DeploymentStateInner::CatchingUp { _skip_trigger } => {
164 *_skip_trigger = None;
165 Ok(())
166 }
167 DeploymentStateInner::ReadyToPromote { .. } => Ok(()),
168 DeploymentStateInner::Promoting => Ok(()),
169 DeploymentStateInner::IsLeader => Ok(()),
170 }
171 }
172
173 /// Attempts to promote this deployment to the leader.
174 ///
175 /// Deployments in the `Initializing` or `CatchingUp` state cannot be
176 /// promoted. Deployments in the `ReadyToPromote`, `Promoting`, and
177 /// `IsLeader` states can be promoted (with the latter two cases being
178 /// no-ops).
179 ///
180 /// If the leader was successfully promoted, returns `Ok`. Otherwise,
181 /// returns `Err`.
182 pub fn try_promote(&self) -> Result<(), ()> {
183 let mut inner = self.inner.lock().expect("lock poisoned");
184 match *inner {
185 DeploymentStateInner::Initializing => Err(()),
186 DeploymentStateInner::CatchingUp { .. } => Err(()),
187 DeploymentStateInner::ReadyToPromote { .. } => {
188 *inner = DeploymentStateInner::Promoting;
189 Ok(())
190 }
191 DeploymentStateInner::Promoting => Ok(()),
192 DeploymentStateInner::IsLeader => Ok(()),
193 }
194 }
195}