mz_compute_client/controller/sequential_hydration.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Sequential dataflow hydration support for replicas.
11//!
12//! Sequential hydration enforces a configurable "hydration concurrency" that limits how many
//! dataflows may be hydrating at the same time. Limiting hydration concurrency can be beneficial
14//! in reducing peak memory usage, cross-dataflow thrashing, and hydration time.
15//!
16//! The configured hydration concurrency is enforced by delaying the delivery of `Schedule` compute
17//! commands to the replica. Those commands are emitted by the controller for collections that
18//! become ready to hydrate (based on availability of input data) and are directly applied by
19//! replicas by unsuspending the corresponding dataflows. Delaying `Schedule` commands allows us to
20//! ensure only a limited number of dataflows can hydrate at the same time.
21//!
22//! Note that a dataflow may export multiple collections. `Schedule` commands are produced per
23//! collection but hydration is a dataflow-level mechanism. In practice Materialize today only
//! produces dataflows with a single export and we rely on this assumption here to simplify the
25//! implementation. If the assumption ever ceases to hold, we will need to adjust the code in this
26//! module.
27//!
28//! Sequential hydration is enforced by a `SequentialHydration` interceptor that sits between the
29//! controller and the `PartitionedState` client that splits commands across replica processes.
30//! This location is important:
31//!
32//! * It needs to be behind the controller since hydration is a per-replica mechanism. Different
33//! replicas can progress through hydration at different paces.
34//! * It needs to be before the `PartitionedState` client because all replica workers must see
35//! `Schedule` commands in the same order. Otherwise we risk getting stuck when different
36//! workers hydrate different dataflows and wait on each other for progress in these dataflows.
37//! * It also needs to be before the `PartitionedState` client because it needs to be able to
38//! observe all compute commands. Clients behind `PartitionedState` are not guaranteed to do so,
39//! since commands are only forwarded to the first process.
40//!
41//! `SequentialHydration` is a synchronous interceptor: the replica task feeds it every command it
42//! is about to send and every response it receives, and the interceptor returns the commands that
43//! should actually be sent to the replica. The task is responsible for sending those commands, so
44//! the interceptor holds no client and spawns no task of its own.
45
46use std::collections::{BTreeMap, VecDeque};
47use std::sync::Arc;
48
49use mz_compute_types::dyncfgs::HYDRATION_CONCURRENCY;
50use mz_dyncfg::ConfigSet;
51use mz_ore::cast::CastFrom;
52use mz_ore::collections::CollectionExt;
53use mz_ore::soft_assert_eq_or_log;
54use mz_repr::{GlobalId, Timestamp};
55use timely::PartialOrder;
56use timely::progress::Antichain;
57use tracing::debug;
58
59use crate::metrics::ReplicaMetrics;
60use crate::protocol::command::ComputeCommand;
61use crate::protocol::response::{ComputeResponse, FrontiersResponse};
62
63/// A shareable token.
64type Token = Arc<()>;
65
66/// An interceptor enforcing sequential dataflow hydration.
67///
68/// The replica task drives this interceptor by feeding it the commands it intends to send (via
69/// [`SequentialHydration::absorb_command`]) and the responses it receives (via
70/// [`SequentialHydration::observe_response`]). Both methods return the commands the task should
71/// send to the replica, with `Schedule` commands held back or released according to the configured
72/// hydration concurrency.
73#[derive(Debug)]
74pub(super) struct SequentialHydration {
75 /// Dynamic system configuration.
76 dyncfg: Arc<ConfigSet>,
77 /// Tracked metrics.
78 metrics: ReplicaMetrics,
79 /// Tracked collections.
80 ///
81 /// Entries are inserted in response to observed `CreateDataflow` commands.
82 /// Entries are removed in response to `Frontiers` commands that report collection
83 /// hydration, or in response to `AllowCompaction` commands that specify the empty frontier.
84 collections: BTreeMap<GlobalId, Collection>,
85 /// A queue of scheduled collections that are awaiting hydration.
86 hydration_queue: VecDeque<GlobalId>,
87 /// A token held by hydrating collections.
88 ///
89 /// Useful to efficiently determine how many collections are currently in the process of
90 /// hydration, and thus how much capacity is available.
91 hydration_token: Token,
92}
93
94impl SequentialHydration {
95 /// Create a new `SequentialHydration` interceptor.
96 pub(super) fn new(dyncfg: Arc<ConfigSet>, metrics: ReplicaMetrics) -> Self {
97 Self {
98 dyncfg,
99 metrics,
100 collections: Default::default(),
101 hydration_queue: Default::default(),
102 hydration_token: Default::default(),
103 }
104 }
105
106 /// Return the number of hydrating collections.
107 fn hydration_count(&self) -> usize {
108 Arc::strong_count(&self.hydration_token) - 1
109 }
110
111 /// Absorb a command the task intends to send, returning the commands it should actually send.
112 pub(super) fn absorb_command(&mut self, cmd: ComputeCommand) -> Vec<ComputeCommand> {
113 // Whether to forward this command to the replica.
114 let mut forward = true;
115
116 match &cmd {
117 // We enforce sequential hydration only for non-transient dataflows, assuming that
118 // transient dataflows are created for interactive user queries and should always be
119 // scheduled as soon as possible.
120 ComputeCommand::CreateDataflow(dataflow) if !dataflow.is_transient() => {
121 let export_ids: Vec<_> = dataflow.export_ids().collect();
122 let id = export_ids.expect_element(|| "multi-export dataflows are not supported");
123 let as_of = dataflow.as_of.clone().unwrap();
124
125 debug!(%id, ?as_of, "tracking collection");
126 self.collections.insert(id, Collection::new(as_of));
127 }
128 ComputeCommand::Schedule(id) => {
129 if let Some(collection) = self.collections.get_mut(id) {
130 debug!(%id, "enqueuing collection for hydration");
131 self.hydration_queue.push_back(*id);
132 collection.set_scheduled();
133 forward = false;
134 }
135 }
136 ComputeCommand::AllowCompaction { id, frontier } if frontier.is_empty() => {
137 // The collection was dropped by the controller. Remove it from the tracking state
138 // to ensure we don't produce any more commands for it.
139 if self.collections.remove(id).is_some() {
140 debug!(%id, "collection dropped");
141 }
142 }
143 _ => (),
144 }
145
146 let mut commands = Vec::new();
147 if forward {
148 commands.push(cmd);
149 }
150
151 // Schedule collections that are ready now.
152 commands.extend(self.hydrate_collections());
153 commands
154 }
155
156 /// Observe a response the task received, returning the commands it should send in reaction.
157 pub(super) fn observe_response(&mut self, resp: &ComputeResponse) -> Vec<ComputeCommand> {
158 let mut commands = Vec::new();
159
160 if let ComputeResponse::Frontiers(
161 id,
162 FrontiersResponse {
163 output_frontier: Some(frontier),
164 ..
165 },
166 ) = resp
167 {
168 if let Some(collection) = self.collections.remove(id) {
169 let hydrated = PartialOrder::less_than(&collection.as_of, frontier);
170 if hydrated || frontier.is_empty() {
171 debug!(%id, "collection hydrated");
172
173 // Note that it is possible to observe hydration even for collections for which
174 // we never sent a `Schedule` command, if the replica decided to not suspend
175 // the dataflow after creation. The compute protocol does not require replicas
176 // to create dataflows in suspended state. It seems like a good idea to still
177 // send a `Schedule` command in this case, rather than swallowing it, to make
178 // the protocol communication more predicatable.
179
180 match collection.state {
181 State::Created => {
182 // We haven't seen a `Schedule` command yet, so no obligations to send
183 // one either.
184 }
185 State::QueuedForHydration => {
186 // We are holding back the `Schedule` command for this collection. Send
187 // it now.
188 commands.push(ComputeCommand::Schedule(*id));
189 }
190 State::Hydrating(token) => {
191 // We freed some hydration capacity and may be able to start hydrating
192 // new collections.
193 drop(token);
194 commands.extend(self.hydrate_collections());
195 }
196 }
197 } else {
198 self.collections.insert(*id, collection);
199 }
200 }
201 }
202
203 commands
204 }
205
206 /// Allow hydration based on the available capacity, returning the `Schedule` commands to send.
207 fn hydrate_collections(&mut self) -> Vec<ComputeCommand> {
208 let mut commands = Vec::new();
209
210 let capacity = HYDRATION_CONCURRENCY.get(&self.dyncfg);
211 while self.hydration_count() < capacity {
212 let Some(id) = self.hydration_queue.pop_front() else {
213 // Hydration queue is empty.
214 break;
215 };
216 let Some(collection) = self.collections.get_mut(&id) else {
217 // Collection has already been dropped.
218 continue;
219 };
220
221 debug!(%id, "starting collection hydration");
222 commands.push(ComputeCommand::Schedule(id));
223
224 let token = Arc::clone(&self.hydration_token);
225 collection.set_hydrating(token);
226 }
227
228 let queue_size = u64::cast_from(self.hydration_queue.len());
229 self.metrics.inner.hydration_queue_size.set(queue_size);
230
231 commands
232 }
233}
234
235/// Information about a tracked collection.
236#[derive(Debug)]
237struct Collection {
238 /// The as-of frontier at collection creation.
239 as_of: Antichain<Timestamp>,
240 /// The current state of the collection.
241 state: State,
242}
243
244impl Collection {
245 /// Create a new `Collection`.
246 fn new(as_of: Antichain<Timestamp>) -> Self {
247 Self {
248 as_of,
249 state: State::Created,
250 }
251 }
252
253 /// Advance this collection's state to `Scheduled`.
254 fn set_scheduled(&mut self) {
255 soft_assert_eq_or_log!(self.state, State::Created);
256 self.state = State::QueuedForHydration;
257 }
258
259 fn set_hydrating(&mut self, token: Token) {
260 soft_assert_eq_or_log!(self.state, State::QueuedForHydration);
261 self.state = State::Hydrating(token);
262 }
263}
264
265/// The state of a tracked collection.
266#[derive(Debug, PartialEq, Eq)]
267enum State {
268 /// Collection has been created and is waiting for a `Schedule` command.
269 Created,
270 /// The collection has received a `Schedule` command and has been added to the hydration queue,
271 /// waiting for hydration capacity.
272 QueuedForHydration,
273 /// Collection is hydrating and waiting for hydration to complete.
274 Hydrating(Token),
275}