mz_environmentd/deployment/state.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Deployment state handling.
11
12use std::future::Future;
13use std::sync::{Arc, Mutex};
14
15use mz_orchestratord::controller::materialize::environmentd::DeploymentStatus;
16use mz_ore::channel::trigger::{self, Trigger};
17
/// The state shared between a [`DeploymentState`] and its
/// [`DeploymentStateHandle`]s.
enum DeploymentStateInner {
    /// The state every deployment starts in (see [`DeploymentState::new`]).
    Initializing,
    /// Entered via [`DeploymentState::set_catching_up`].
    ///
    /// Holds the trigger paired with the future returned by
    /// `set_catching_up`. [`DeploymentStateHandle::try_skip_catchup`] replaces
    /// it with `None`, which drops the trigger.
    CatchingUp { _skip_trigger: Option<Trigger> },
    /// Entered via [`DeploymentState::set_ready_to_promote`].
    ///
    /// Holds the trigger paired with the future returned by
    /// `set_ready_to_promote`. The trigger is dropped when
    /// [`DeploymentStateHandle::try_promote`] moves the state to `Promoting`.
    ReadyToPromote { _promote_trigger: Trigger },
    /// Entered via [`DeploymentStateHandle::try_promote`] from
    /// `ReadyToPromote`.
    Promoting,
    /// Entered via [`DeploymentState::set_is_leader`] from `Initializing` or
    /// `Promoting`.
    IsLeader,
}
25
/// The state of an environment deployment.
///
/// This object should be held by the `environmentd` server. It provides methods
/// to handle state transitions that should be driven by the server itself.
///
/// A deployment begins in the `Initializing` state.
///
/// If, during initialization, the server realizes that it is taking over from a
/// failed `environmentd` process of a generation that is already the leader,
/// the server may proceed directly to the `IsLeader` state, via
/// [`DeploymentState::set_is_leader`].
///
/// Otherwise, the server should leave the deployment state in `Initializing`
/// while performing initialization activities. Once the server is catching up
/// its workloads, it should proceed to the `CatchingUp` state via
/// [`DeploymentState::set_catching_up`]. Once the environment is ready to take
/// over from the prior generation, the server should call
/// [`DeploymentState::set_ready_to_promote`]. After this, the server should
/// *not* call [`DeploymentState::set_is_leader`] until promotion has occurred,
/// as an external orchestrator will determine when promotion occurs. The future
/// returned by `set_ready_to_promote` will resolve when promotion has occurred
/// and the deployment should take over from the prior generation and begin
/// serving queries.
#[derive(Clone)]
pub struct DeploymentState {
    // Shared with every `DeploymentStateHandle` created alongside this state.
    inner: Arc<Mutex<DeploymentStateInner>>,
}
52
53impl DeploymentState {
54 /// Creates a new `LeaderState` for a deployment.
55 ///
56 /// Returns the state and a handle to the state.
57 pub fn new() -> (DeploymentState, DeploymentStateHandle) {
58 let inner = Arc::new(Mutex::new(DeploymentStateInner::Initializing));
59 let state = DeploymentState {
60 inner: Arc::clone(&inner),
61 };
62 let handle = DeploymentStateHandle { inner };
63 (state, handle)
64 }
65
66 /// Marks the deployment as catching up.
67 ///
68 /// Returns a future that resolves if the catch up phase should be skipped.
69 pub fn set_catching_up(&self) -> impl Future<Output = ()> {
70 let (skip_trigger, skip_rx) = trigger::channel();
71 {
72 let mut inner = self.inner.lock().expect("lock poisoned");
73 assert!(
74 matches!(*inner, DeploymentStateInner::Initializing),
75 "LeaderState::set_catching_up called on non-initializing state",
76 );
77 *inner = DeploymentStateInner::CatchingUp {
78 _skip_trigger: Some(skip_trigger),
79 };
80 }
81 skip_rx
82 }
83
84 /// Marks the deployment as ready to be promoted to leader.
85 ///
86 /// Returns a future that resolves when the leadership promotion occurs.
87 /// When the function returns, the state will be `ReadyToPromote`. When the
88 /// returned future resolves, the state will be `Promoting`.
89 ///
90 /// Panics if the leader state is not `Initializing`.
91 pub fn set_ready_to_promote(&self) -> impl Future<Output = ()> {
92 let (promote_trigger, promote_trigger_rx) = trigger::channel();
93 {
94 let mut inner = self.inner.lock().expect("lock poisoned");
95 assert!(
96 matches!(
97 *inner,
98 DeploymentStateInner::Initializing | DeploymentStateInner::CatchingUp { .. }
99 ),
100 "LeaderState::set_ready_to_promote called on invalid state",
101 );
102 *inner = DeploymentStateInner::ReadyToPromote {
103 _promote_trigger: promote_trigger,
104 };
105 }
106 promote_trigger_rx
107 }
108
109 /// Marks the deployment as the leader.
110 ///
111 /// Panics if the leader state is not `Initializing` or `Promoting`.
112 pub fn set_is_leader(&self) {
113 let mut inner = self.inner.lock().expect("lock poisoned");
114 assert!(
115 matches!(
116 *inner,
117 DeploymentStateInner::Initializing | DeploymentStateInner::Promoting
118 ),
119 "LeaderState::set_is_leader called on non-initializing state",
120 );
121 *inner = DeploymentStateInner::IsLeader;
122 }
123}
124
/// A cloneable handle to a [`DeploymentState`].
///
/// This should be held by modules providing external interfaces to
/// `environmentd` (e.g., the HTTP server). It provides methods to inspect the
/// current deployment status, to request that the catch-up phase be skipped,
/// and to promote the deployment to the leader if it is ready to do so.
#[derive(Clone)]
pub struct DeploymentStateHandle {
    // Shared with the `DeploymentState` this handle was created alongside.
    inner: Arc<Mutex<DeploymentStateInner>>,
}
135
136impl DeploymentStateHandle {
137 /// Returns the current deployment status.
138 pub fn status(&self) -> DeploymentStatus {
139 let inner = self.inner.lock().expect("lock poisoned");
140 match *inner {
141 DeploymentStateInner::Initializing => DeploymentStatus::Initializing,
142 DeploymentStateInner::CatchingUp { .. } => DeploymentStatus::Initializing,
143 DeploymentStateInner::ReadyToPromote { .. } => DeploymentStatus::ReadyToPromote,
144 DeploymentStateInner::Promoting => DeploymentStatus::Promoting,
145 DeploymentStateInner::IsLeader => DeploymentStatus::IsLeader,
146 }
147 }
148
149 /// Attempts to skip the catchup phase for the deployment.
150 ///
151 /// Deployments in the `Initializing` phase cannot have their catchup phase
152 /// skipped. Deployments in the `ReadyToPromote`, `Promoting`, and
153 /// `IsLeader` states can be promoted (with the latter two cases being
154 /// no-ops).
155 ///
156 /// If skipping the catchup was successful, returns `Ok`. Otherwise, returns
157 /// `Err`.
158 pub fn try_skip_catchup(&self) -> Result<(), ()> {
159 let mut inner = self.inner.lock().expect("lock poisoned");
160 match &mut *inner {
161 DeploymentStateInner::Initializing => Err(()),
162 DeploymentStateInner::CatchingUp { _skip_trigger } => {
163 *_skip_trigger = None;
164 Ok(())
165 }
166 DeploymentStateInner::ReadyToPromote { .. } => Ok(()),
167 DeploymentStateInner::Promoting => Ok(()),
168 DeploymentStateInner::IsLeader => Ok(()),
169 }
170 }
171
172 /// Attempts to promote this deployment to the leader.
173 ///
174 /// Deployments in the `Initializing` or `CatchingUp` state cannot be
175 /// promoted. Deployments in the `ReadyToPromote`, `Promoting`, and
176 /// `IsLeader` states can be promoted (with the latter two cases being
177 /// no-ops).
178 ///
179 /// If the leader was successfully promoted, returns `Ok`. Otherwise,
180 /// returns `Err`.
181 pub fn try_promote(&self) -> Result<(), ()> {
182 let mut inner = self.inner.lock().expect("lock poisoned");
183 match *inner {
184 DeploymentStateInner::Initializing => Err(()),
185 DeploymentStateInner::CatchingUp { .. } => Err(()),
186 DeploymentStateInner::ReadyToPromote { .. } => {
187 *inner = DeploymentStateInner::Promoting;
188 Ok(())
189 }
190 DeploymentStateInner::Promoting => Ok(()),
191 DeploymentStateInner::IsLeader => Ok(()),
192 }
193 }
194}