// mz_compute_client/controller/sequential_hydration.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Sequential dataflow hydration support for replicas.
11//!
12//! Sequential hydration enforces a configurable "hydration concurrency" that limits how many
//! dataflows may be hydrating at the same time. Limiting hydration concurrency can be beneficial
14//! in reducing peak memory usage, cross-dataflow thrashing, and hydration time.
15//!
16//! The configured hydration concurrency is enforced by delaying the delivery of `Schedule` compute
17//! commands to the replica. Those commands are emitted by the controller for collections that
18//! become ready to hydrate (based on availability of input data) and are directly applied by
19//! replicas by unsuspending the corresponding dataflows. Delaying `Schedule` commands allows us to
20//! ensure only a limited number of dataflows can hydrate at the same time.
21//!
22//! Note that a dataflow may export multiple collections. `Schedule` commands are produced per
23//! collection but hydration is a dataflow-level mechanism. In practice Materialize today only
//! produces dataflows with a single export and we rely on this assumption here to simplify the
25//! implementation. If the assumption ever ceases to hold, we will need to adjust the code in this
26//! module.
27//!
//! Sequential hydration is enforced by a `SequentialHydration` client that sits between the
29//! controller and the `PartitionedState` client that splits commands across replica processes.
30//! This location is important:
31//!
32//! * It needs to be behind the controller since hydration is a per-replica mechanism. Different
33//! replicas can progress through hydration at different paces.
34//! * It needs to be before the `PartitionedState` client because all replica workers must see
35//! `Schedule` commands in the same order. Otherwise we risk getting stuck when different
36//! workers hydrate different dataflows and wait on each other for progress in these dataflows.
37//! * It also needs to be before the `PartitionedState` client because it needs to be able to
38//! observe all compute commands. Clients behind `PartitionedState` are not guaranteed to do so,
39//! since commands are only forwarded to the first process.
40
41use std::collections::{BTreeMap, VecDeque};
42use std::sync::Arc;
43
44use async_trait::async_trait;
45use mz_compute_types::dyncfgs::HYDRATION_CONCURRENCY;
46use mz_dyncfg::ConfigSet;
47use mz_ore::cast::CastFrom;
48use mz_ore::collections::CollectionExt;
49use mz_ore::soft_assert_eq_or_log;
50use mz_ore::task::AbortOnDropHandle;
51use mz_repr::{GlobalId, Timestamp};
52use mz_service::client::GenericClient;
53use timely::PartialOrder;
54use timely::progress::Antichain;
55use tokio::sync::mpsc;
56use tracing::debug;
57
58use crate::metrics::ReplicaMetrics;
59use crate::protocol::command::ComputeCommand;
60use crate::protocol::response::{ComputeResponse, FrontiersResponse};
61use crate::service::ComputeClient;
62
/// A shareable token.
///
/// Clones of this token are held by collections in state `Hydrating`; counting the outstanding
/// clones (via `Arc::strong_count`) yields the number of collections currently hydrating.
type Token = Arc<()>;
65
/// A client enforcing sequential dataflow hydration.
///
/// Wraps an inner `ComputeClient` (driven by a forwarder task) and withholds `Schedule`
/// commands for tracked collections until hydration capacity is available.
#[derive(Debug)]
pub(super) struct SequentialHydration {
    /// A sender for commands to the wrapped client.
    command_tx: mpsc::UnboundedSender<ComputeCommand>,
    /// A receiver for responses from the wrapped client.
    response_rx: mpsc::UnboundedReceiver<Result<ComputeResponse, anyhow::Error>>,
    /// Dynamic system configuration, read for the hydration concurrency limit.
    dyncfg: Arc<ConfigSet>,
    /// Tracked metrics (hydration queue size).
    metrics: ReplicaMetrics,
    /// Tracked collections.
    ///
    /// Entries are inserted in response to observed `CreateDataflow` commands.
    /// Entries are removed in response to `Frontiers` commands that report collection
    /// hydration, or in response to `AllowCompaction` commands that specify the empty frontier.
    collections: BTreeMap<GlobalId, Collection>,
    /// A queue of scheduled collections that are awaiting hydration.
    ///
    /// May contain ids of collections that have since been dropped or hydrated; such stale
    /// entries are skipped when the queue is drained.
    hydration_queue: VecDeque<GlobalId>,
    /// A token held by hydrating collections.
    ///
    /// Useful to efficiently determine how many collections are currently in the process of
    /// hydration, and thus how much capacity is available.
    hydration_token: Token,
    /// Handle to the forwarder task, to abort it when `SequentialHydration` is dropped.
    _forwarder_task: AbortOnDropHandle<()>,
}
93
94impl SequentialHydration {
95 /// Create a new `SequentialHydration` client.
96 pub fn new<C>(client: C, dyncfg: Arc<ConfigSet>, metrics: ReplicaMetrics) -> Self
97 where
98 C: ComputeClient + 'static,
99 {
100 let (command_tx, command_rx) = mpsc::unbounded_channel();
101 let (response_tx, response_rx) = mpsc::unbounded_channel();
102 let forwarder = mz_ore::task::spawn(
103 || "sequential_hydration:forwarder",
104 forward_messages(client, command_rx, response_tx),
105 );
106
107 Self {
108 command_tx,
109 response_rx,
110 dyncfg,
111 metrics,
112 collections: Default::default(),
113 hydration_queue: Default::default(),
114 hydration_token: Default::default(),
115 _forwarder_task: forwarder.abort_on_drop(),
116 }
117 }
118
119 /// Return the number of hydrating collections.
120 fn hydration_count(&self) -> usize {
121 Arc::strong_count(&self.hydration_token) - 1
122 }
123
124 /// Absorb a command and send resulting commands to the wrapped client.
125 fn absorb_command(&mut self, cmd: ComputeCommand) -> Result<(), anyhow::Error> {
126 // Whether to forward this command to the wrapped client.
127 let mut forward = true;
128
129 match &cmd {
130 // We enforce sequential hydration only for non-transient dataflows, assuming that
131 // transient dataflows are created for interactive user queries and should always be
132 // scheduled as soon as possible.
133 ComputeCommand::CreateDataflow(dataflow) if !dataflow.is_transient() => {
134 let export_ids: Vec<_> = dataflow.export_ids().collect();
135 let id = export_ids.expect_element(|| "multi-export dataflows are not supported");
136 let as_of = dataflow.as_of.clone().unwrap();
137
138 debug!(%id, ?as_of, "tracking collection");
139 self.collections.insert(id, Collection::new(as_of));
140 }
141 ComputeCommand::Schedule(id) => {
142 if let Some(collection) = self.collections.get_mut(id) {
143 debug!(%id, "enqueuing collection for hydration");
144 self.hydration_queue.push_back(*id);
145 collection.set_scheduled();
146 forward = false;
147 }
148 }
149 ComputeCommand::AllowCompaction { id, frontier } if frontier.is_empty() => {
150 // The collection was dropped by the controller. Remove it from the tracking state
151 // to ensure we don't produce any more commands for it.
152 if self.collections.remove(id).is_some() {
153 debug!(%id, "collection dropped");
154 }
155 }
156 _ => (),
157 }
158
159 if forward {
160 self.command_tx.send(cmd)?;
161 }
162
163 // Schedule collections that are ready now.
164 self.hydrate_collections()
165 }
166
167 /// Observe a response and send resulting commands to the wrapped client.
168 fn observe_response(&mut self, resp: &ComputeResponse) -> Result<(), anyhow::Error> {
169 if let ComputeResponse::Frontiers(
170 id,
171 FrontiersResponse {
172 output_frontier: Some(frontier),
173 ..
174 },
175 ) = resp
176 {
177 if let Some(collection) = self.collections.remove(id) {
178 let hydrated = PartialOrder::less_than(&collection.as_of, frontier);
179 if hydrated || frontier.is_empty() {
180 debug!(%id, "collection hydrated");
181
182 // Note that it is possible to observe hydration even for collections for which
183 // we never sent a `Schedule` command, if the replica decided to not suspend
184 // the dataflow after creation. The compute protocol does not require replicas
185 // to create dataflows in suspended state. It seems like a good idea to still
186 // send a `Schedule` command in this case, rather than swallowing it, to make
187 // the protocol communication more predicatable.
188
189 match collection.state {
190 State::Created => {
191 // We haven't seen a `Schedule` command yet, so no obligations to send
192 // one either.
193 }
194 State::QueuedForHydration => {
195 // We are holding back the `Schedule` command for this collection. Send
196 // it now.
197 self.command_tx.send(ComputeCommand::Schedule(*id))?;
198 }
199 State::Hydrating(token) => {
200 // We freed some hydration capacity and may be able to start hydrating
201 // new collections.
202 drop(token);
203 self.hydrate_collections()?;
204 }
205 }
206 } else {
207 self.collections.insert(*id, collection);
208 }
209 }
210 }
211
212 Ok(())
213 }
214
215 /// Allow hydration based on the available capacity.
216 fn hydrate_collections(&mut self) -> Result<(), anyhow::Error> {
217 let capacity = HYDRATION_CONCURRENCY.get(&self.dyncfg);
218 while self.hydration_count() < capacity {
219 let Some(id) = self.hydration_queue.pop_front() else {
220 // Hydration queue is empty.
221 break;
222 };
223 let Some(collection) = self.collections.get_mut(&id) else {
224 // Collection has already been dropped.
225 continue;
226 };
227
228 debug!(%id, "starting collection hydration");
229 self.command_tx.send(ComputeCommand::Schedule(id))?;
230
231 let token = Arc::clone(&self.hydration_token);
232 collection.set_hydrating(token);
233 }
234
235 let queue_size = u64::cast_from(self.hydration_queue.len());
236 self.metrics.inner.hydration_queue_size.set(queue_size);
237
238 Ok(())
239 }
240}
241
242#[async_trait]
243impl GenericClient<ComputeCommand, ComputeResponse> for SequentialHydration {
244 async fn send(&mut self, cmd: ComputeCommand) -> Result<(), anyhow::Error> {
245 self.absorb_command(cmd)
246 }
247
248 /// # Cancel safety
249 ///
250 /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
251 /// statement and some other branch completes first, it is guaranteed that no messages were
252 /// received by this client.
253 async fn recv(&mut self) -> Result<Option<ComputeResponse>, anyhow::Error> {
254 // `mpsc::UnboundedReceiver::recv` is documented as cancel safe.
255 match self.response_rx.recv().await {
256 Some(Ok(response)) => {
257 self.observe_response(&response)?;
258 Ok(Some(response))
259 }
260 Some(Err(error)) => Err(error), // client error
261 None => Ok(None), // client disconnected
262 }
263 }
264}
265
/// Information about a tracked collection.
#[derive(Debug)]
struct Collection {
    /// The as-of frontier at collection creation.
    ///
    /// The collection is considered hydrated once its output frontier has advanced beyond this
    /// frontier, or has become empty.
    as_of: Antichain<Timestamp>,
    /// The current state of the collection; see [`State`].
    state: State,
}
274
impl Collection {
    /// Create a new `Collection` in state [`State::Created`].
    fn new(as_of: Antichain<Timestamp>) -> Self {
        Self {
            as_of,
            state: State::Created,
        }
    }

    /// Advance this collection's state to `QueuedForHydration`.
    ///
    /// Soft-asserts that the previous state was `Created`.
    fn set_scheduled(&mut self) {
        soft_assert_eq_or_log!(self.state, State::Created);
        self.state = State::QueuedForHydration;
    }

    /// Advance this collection's state to `Hydrating`, storing the given hydration token.
    ///
    /// Soft-asserts that the previous state was `QueuedForHydration`.
    fn set_hydrating(&mut self, token: Token) {
        soft_assert_eq_or_log!(self.state, State::QueuedForHydration);
        self.state = State::Hydrating(token);
    }
}
295
/// The state of a tracked collection.
///
/// Collections advance linearly through these states:
/// `Created` -> `QueuedForHydration` -> `Hydrating`.
#[derive(Debug, PartialEq, Eq)]
enum State {
    /// Collection has been created and is waiting for a `Schedule` command.
    Created,
    /// The collection has received a `Schedule` command and has been added to the hydration queue,
    /// waiting for hydration capacity.
    QueuedForHydration,
    /// Collection is hydrating and waiting for hydration to complete.
    ///
    /// Holds a clone of the shared hydration token; dropping it frees hydration capacity.
    Hydrating(Token),
}
307
308/// Forward messages between a pair of channels and a [`ComputeClient`].
309///
310/// This functions is run in its own task and exists to allow `SequentialHydration::recv` to be
311/// cancel safe even though it needs to send commands to the wrapped client, which isn't cancel
312/// safe.
313async fn forward_messages<C>(
314 mut client: C,
315 mut rx: mpsc::UnboundedReceiver<ComputeCommand>,
316 tx: mpsc::UnboundedSender<Result<ComputeResponse, anyhow::Error>>,
317) where
318 C: ComputeClient,
319{
320 loop {
321 tokio::select! {
322 command = rx.recv() => {
323 let Some(command) = command else {
324 break; // `SequentialHydration` dropped
325 };
326 if let Err(error) = client.send(command).await {
327 // Client produced an unrecoverable error.
328 let _ = tx.send(Err(error));
329 break;
330 }
331 }
332 response = client.recv() => {
333 let response = match response {
334 Ok(Some(response)) => response,
335 Ok(None) => {
336 break; // client disconnected
337 }
338 Err(error) => {
339 // Client produced an unrecoverable error.
340 let _ = tx.send(Err(error));
341 break;
342 }
343 };
344 if tx.send(Ok(response)).is_err() {
345 break; // `SequentialHydration` dropped
346 }
347 }
348 }
349 }
350}