mz_compute_client/controller/sequential_hydration.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Sequential dataflow hydration support for replicas.
11//!
12//! Sequential hydration enforces a configurable "hydration concurrency" that limits how many
//! dataflows may be hydrating at the same time. Limiting hydration concurrency can be beneficial
14//! in reducing peak memory usage, cross-dataflow thrashing, and hydration time.
15//!
16//! The configured hydration concurrency is enforced by delaying the delivery of `Schedule` compute
17//! commands to the replica. Those commands are emitted by the controller for collections that
18//! become ready to hydrate (based on availability of input data) and are directly applied by
19//! replicas by unsuspending the corresponding dataflows. Delaying `Schedule` commands allows us to
20//! ensure only a limited number of dataflows can hydrate at the same time.
21//!
22//! Note that a dataflow may export multiple collections. `Schedule` commands are produced per
//! collection but hydration is a dataflow-level mechanism. In practice Materialize today only
//! produces dataflows with a single export and we rely on this assumption here to simplify the
25//! implementation. If the assumption ever ceases to hold, we will need to adjust the code in this
26//! module.
27//!
//! Sequential hydration is enforced by a `SequentialHydration` client that sits between the
29//! controller and the `PartitionedState` client that splits commands across replica processes.
30//! This location is important:
31//!
32//! * It needs to be behind the controller since hydration is a per-replica mechanism. Different
33//! replicas can progress through hydration at different paces.
34//! * It needs to be before the `PartitionedState` client because all replica workers must see
35//! `Schedule` commands in the same order. Otherwise we risk getting stuck when different
36//! workers hydrate different dataflows and wait on each other for progress in these dataflows.
37//! * It also needs to be before the `PartitionedState` client because it needs to be able to
38//! observe all compute commands. Clients behind `PartitionedState` are not guaranteed to do so,
39//! since commands are only forwarded to the first process.
40
41use std::collections::{BTreeMap, VecDeque};
42use std::sync::Arc;
43
44use async_trait::async_trait;
45use mz_compute_types::dyncfgs::HYDRATION_CONCURRENCY;
46use mz_dyncfg::ConfigSet;
47use mz_ore::cast::CastFrom;
48use mz_ore::collections::CollectionExt;
49use mz_ore::soft_assert_eq_or_log;
50use mz_ore::task::AbortOnDropHandle;
51use mz_repr::GlobalId;
52use mz_service::client::GenericClient;
53use timely::PartialOrder;
54use timely::progress::Antichain;
55use tokio::sync::mpsc;
56use tracing::debug;
57
58use crate::controller::ComputeControllerTimestamp;
59use crate::metrics::ReplicaMetrics;
60use crate::protocol::command::ComputeCommand;
61use crate::protocol::response::{ComputeResponse, FrontiersResponse};
62use crate::service::ComputeClient;
63
/// A shareable token.
///
/// `SequentialHydration` keeps one instance and hands a clone to every collection that starts
/// hydrating. The number of outstanding clones (`Arc::strong_count` minus the original) is the
/// number of currently hydrating collections.
type Token = Arc<()>;
66
/// A client enforcing sequential dataflow hydration.
///
/// Sits in front of a wrapped `ComputeClient`. Communication with that client happens
/// through a pair of channels serviced by a spawned forwarder task (`forward_messages`).
#[derive(Debug)]
pub(super) struct SequentialHydration<T> {
    /// A sender for commands to the wrapped client.
    command_tx: mpsc::UnboundedSender<ComputeCommand<T>>,
    /// A receiver for responses from the wrapped client.
    response_rx: mpsc::UnboundedReceiver<Result<ComputeResponse<T>, anyhow::Error>>,
    /// Dynamic system configuration.
    ///
    /// Read on every scheduling pass to obtain the current `HYDRATION_CONCURRENCY`.
    dyncfg: Arc<ConfigSet>,
    /// Tracked metrics.
    metrics: ReplicaMetrics,
    /// Tracked collections.
    ///
    /// Entries are inserted in response to observed `CreateDataflow` commands for
    /// non-transient dataflows.
    /// Entries are removed in response to `Frontiers` responses that report collection
    /// hydration, or in response to `AllowCompaction` commands that specify the empty frontier.
    collections: BTreeMap<GlobalId, Collection<T>>,
    /// A queue of scheduled collections that are awaiting hydration.
    ///
    /// Entries whose collection is no longer tracked are skipped when popped.
    hydration_queue: VecDeque<GlobalId>,
    /// A token held by hydrating collections.
    ///
    /// Useful to efficiently determine how many collections are currently in the process of
    /// hydration, and thus how much capacity is available.
    hydration_token: Token,
    /// Handle to the forwarder task, to abort it when `SequentialHydration` is dropped.
    _forwarder_task: AbortOnDropHandle<()>,
}
94
95impl<T> SequentialHydration<T>
96where
97 T: ComputeControllerTimestamp,
98{
99 /// Create a new `SequentialHydration` client.
100 pub fn new<C>(client: C, dyncfg: Arc<ConfigSet>, metrics: ReplicaMetrics) -> Self
101 where
102 C: ComputeClient<T> + 'static,
103 {
104 let (command_tx, command_rx) = mpsc::unbounded_channel();
105 let (response_tx, response_rx) = mpsc::unbounded_channel();
106 let forwarder = mz_ore::task::spawn(
107 || "sequential_hydration:forwarder",
108 forward_messages(client, command_rx, response_tx),
109 );
110
111 Self {
112 command_tx,
113 response_rx,
114 dyncfg,
115 metrics,
116 collections: Default::default(),
117 hydration_queue: Default::default(),
118 hydration_token: Default::default(),
119 _forwarder_task: forwarder.abort_on_drop(),
120 }
121 }
122
123 /// Return the number of hydrating collections.
124 fn hydration_count(&self) -> usize {
125 Arc::strong_count(&self.hydration_token) - 1
126 }
127
128 /// Absorb a command and send resulting commands to the wrapped client.
129 fn absorb_command(&mut self, cmd: ComputeCommand<T>) -> Result<(), anyhow::Error> {
130 // Whether to forward this command to the wrapped client.
131 let mut forward = true;
132
133 match &cmd {
134 // We enforce sequential hydration only for non-transient dataflows, assuming that
135 // transient dataflows are created for interactive user queries and should always be
136 // scheduled as soon as possible.
137 ComputeCommand::CreateDataflow(dataflow) if !dataflow.is_transient() => {
138 let export_ids: Vec<_> = dataflow.export_ids().collect();
139 let id = export_ids.expect_element(|| "multi-export dataflows are not supported");
140 let as_of = dataflow.as_of.clone().unwrap();
141
142 debug!(%id, ?as_of, "tracking collection");
143 self.collections.insert(id, Collection::new(as_of));
144 }
145 ComputeCommand::Schedule(id) => {
146 if let Some(collection) = self.collections.get_mut(id) {
147 debug!(%id, "enqueuing collection for hydration");
148 self.hydration_queue.push_back(*id);
149 collection.set_scheduled();
150 forward = false;
151 }
152 }
153 ComputeCommand::AllowCompaction { id, frontier } if frontier.is_empty() => {
154 // The collection was dropped by the controller. Remove it from the tracking state
155 // to ensure we don't produce any more commands for it.
156 if self.collections.remove(id).is_some() {
157 debug!(%id, "collection dropped");
158 }
159 }
160 _ => (),
161 }
162
163 if forward {
164 self.command_tx.send(cmd)?;
165 }
166
167 // Schedule collections that are ready now.
168 self.hydrate_collections()
169 }
170
171 /// Observe a response and send resulting commands to the wrapped client.
172 fn observe_response(&mut self, resp: &ComputeResponse<T>) -> Result<(), anyhow::Error> {
173 if let ComputeResponse::Frontiers(
174 id,
175 FrontiersResponse {
176 output_frontier: Some(frontier),
177 ..
178 },
179 ) = resp
180 {
181 if let Some(collection) = self.collections.remove(id) {
182 let hydrated = PartialOrder::less_than(&collection.as_of, frontier);
183 if hydrated || frontier.is_empty() {
184 debug!(%id, "collection hydrated");
185
186 // Note that it is possible to observe hydration even for collections for which
187 // we never sent a `Schedule` command, if the replica decided to not suspend
188 // the dataflow after creation. The compute protocol does not require replicas
189 // to create dataflows in suspended state. It seems like a good idea to still
190 // send a `Schedule` command in this case, rather than swallowing it, to make
191 // the protocol communication more predicatable.
192
193 match collection.state {
194 State::Created => {
195 // We haven't seen a `Schedule` command yet, so no obligations to send
196 // one either.
197 }
198 State::QueuedForHydration => {
199 // We are holding back the `Schedule` command for this collection. Send
200 // it now.
201 self.command_tx.send(ComputeCommand::Schedule(*id))?;
202 }
203 State::Hydrating(token) => {
204 // We freed some hydration capacity and may be able to start hydrating
205 // new collections.
206 drop(token);
207 self.hydrate_collections()?;
208 }
209 }
210 } else {
211 self.collections.insert(*id, collection);
212 }
213 }
214 }
215
216 Ok(())
217 }
218
219 /// Allow hydration based on the available capacity.
220 fn hydrate_collections(&mut self) -> Result<(), anyhow::Error> {
221 let capacity = HYDRATION_CONCURRENCY.get(&self.dyncfg);
222 while self.hydration_count() < capacity {
223 let Some(id) = self.hydration_queue.pop_front() else {
224 // Hydration queue is empty.
225 break;
226 };
227 let Some(collection) = self.collections.get_mut(&id) else {
228 // Collection has already been dropped.
229 continue;
230 };
231
232 debug!(%id, "starting collection hydration");
233 self.command_tx.send(ComputeCommand::Schedule(id))?;
234
235 let token = Arc::clone(&self.hydration_token);
236 collection.set_hydrating(token);
237 }
238
239 let queue_size = u64::cast_from(self.hydration_queue.len());
240 self.metrics.inner.hydration_queue_size.set(queue_size);
241
242 Ok(())
243 }
244}
245
246#[async_trait]
247impl<T> GenericClient<ComputeCommand<T>, ComputeResponse<T>> for SequentialHydration<T>
248where
249 T: ComputeControllerTimestamp,
250{
251 async fn send(&mut self, cmd: ComputeCommand<T>) -> Result<(), anyhow::Error> {
252 self.absorb_command(cmd)
253 }
254
255 /// # Cancel safety
256 ///
257 /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
258 /// statement and some other branch completes first, it is guaranteed that no messages were
259 /// received by this client.
260 async fn recv(&mut self) -> Result<Option<ComputeResponse<T>>, anyhow::Error> {
261 // `mpsc::UnboundedReceiver::recv` is documented as cancel safe.
262 match self.response_rx.recv().await {
263 Some(Ok(response)) => {
264 self.observe_response(&response)?;
265 Ok(Some(response))
266 }
267 Some(Err(error)) => Err(error), // client error
268 None => Ok(None), // client disconnected
269 }
270 }
271}
272
/// Information about a tracked collection.
///
/// One entry exists per non-transient collection observed via `CreateDataflow`, until the
/// collection is reported hydrated or dropped.
#[derive(Debug)]
struct Collection<T> {
    /// The as-of frontier at collection creation.
    ///
    /// The collection counts as hydrated once its output frontier is strictly beyond this.
    as_of: Antichain<T>,
    /// The current state of the collection.
    state: State,
}
281
282impl<T> Collection<T> {
283 /// Create a new `Collection`.
284 fn new(as_of: Antichain<T>) -> Self {
285 Self {
286 as_of,
287 state: State::Created,
288 }
289 }
290
291 /// Advance this collection's state to `Scheduled`.
292 fn set_scheduled(&mut self) {
293 soft_assert_eq_or_log!(self.state, State::Created);
294 self.state = State::QueuedForHydration;
295 }
296
297 fn set_hydrating(&mut self, token: Token) {
298 soft_assert_eq_or_log!(self.state, State::QueuedForHydration);
299 self.state = State::Hydrating(token);
300 }
301}
302
/// The state of a tracked collection.
///
/// Collections move `Created` -> `QueuedForHydration` -> `Hydrating`. A collection may also
/// be observed as hydrated, and then stop being tracked, from any of these states.
#[derive(Debug, PartialEq, Eq)]
enum State {
    /// Collection has been created and is waiting for a `Schedule` command.
    Created,
    /// The collection has received a `Schedule` command and has been added to the hydration queue,
    /// waiting for hydration capacity.
    QueuedForHydration,
    /// Collection is hydrating and waiting for hydration to complete.
    ///
    /// The held token is a clone of the client's hydration token; dropping it releases one unit
    /// of hydration capacity.
    Hydrating(Token),
}
314
315/// Forward messages between a pair of channels and a [`ComputeClient`].
316///
317/// This functions is run in its own task and exists to allow `SequentialHydration::recv` to be
318/// cancel safe even though it needs to send commands to the wrapped client, which isn't cancel
319/// safe.
320async fn forward_messages<C, T>(
321 mut client: C,
322 mut rx: mpsc::UnboundedReceiver<ComputeCommand<T>>,
323 tx: mpsc::UnboundedSender<Result<ComputeResponse<T>, anyhow::Error>>,
324) where
325 C: ComputeClient<T>,
326{
327 loop {
328 tokio::select! {
329 command = rx.recv() => {
330 let Some(command) = command else {
331 break; // `SequentialHydration` dropped
332 };
333 if let Err(error) = client.send(command).await {
334 // Client produced an unrecoverable error.
335 let _ = tx.send(Err(error));
336 break;
337 }
338 }
339 response = client.recv() => {
340 let response = match response {
341 Ok(Some(response)) => response,
342 Ok(None) => {
343 break; // client disconnected
344 }
345 Err(error) => {
346 // Client produced an unrecoverable error.
347 let _ = tx.send(Err(error));
348 break;
349 }
350 };
351 if tx.send(Ok(response)).is_err() {
352 break; // `SequentialHydration` dropped
353 }
354 }
355 }
356 }
357}