// mz_compute_client/controller/sequential_hydration.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Sequential dataflow hydration support for replicas.
11//!
12//! Sequential hydration enforces a configurable "hydration concurrency" that limits how many
//! dataflows may be hydrating at the same time. Limiting hydration concurrency can be beneficial
//! in reducing peak memory usage, cross-dataflow thrashing, and hydration time.
15//!
16//! The configured hydration concurrency is enforced by delaying the delivery of `Schedule` compute
17//! commands to the replica. Those commands are emitted by the controller for collections that
18//! become ready to hydrate (based on availability of input data) and are directly applied by
19//! replicas by unsuspending the corresponding dataflows. Delaying `Schedule` commands allows us to
20//! ensure only a limited number of dataflows can hydrate at the same time.
21//!
22//! Note that a dataflow may export multiple collections. `Schedule` commands are produced per
23//! collection but hydration is a dataflow-level mechanism. In practice Materialize today only
//! produces dataflows with a single export and we rely on this assumption here to simplify the
25//! implementation. If the assumption ever ceases to hold, we will need to adjust the code in this
26//! module.
27//!
//! Sequential hydration is enforced by a `SequentialHydration` client that sits between the
29//! controller and the `PartitionedState` client that splits commands across replica processes.
30//! This location is important:
31//!
32//!  * It needs to be behind the controller since hydration is a per-replica mechanism. Different
33//!    replicas can progress through hydration at different paces.
34//!  * It needs to be before the `PartitionedState` client because all replica workers must see
35//!    `Schedule` commands in the same order. Otherwise we risk getting stuck when different
36//!    workers hydrate different dataflows and wait on each other for progress in these dataflows.
37//!  * It also needs to be before the `PartitionedState` client because it needs to be able to
38//!    observe all compute commands. Clients behind `PartitionedState` are not guaranteed to do so,
39//!    since commands are only forwarded to the first process.
40
41use std::collections::{BTreeMap, VecDeque};
42use std::sync::Arc;
43
44use async_trait::async_trait;
45use mz_compute_types::dyncfgs::HYDRATION_CONCURRENCY;
46use mz_dyncfg::ConfigSet;
47use mz_ore::cast::CastFrom;
48use mz_ore::collections::CollectionExt;
49use mz_ore::soft_assert_eq_or_log;
50use mz_ore::task::AbortOnDropHandle;
51use mz_repr::{GlobalId, Timestamp};
52use mz_service::client::GenericClient;
53use timely::PartialOrder;
54use timely::progress::Antichain;
55use tokio::sync::mpsc;
56use tracing::debug;
57
58use crate::metrics::ReplicaMetrics;
59use crate::protocol::command::ComputeCommand;
60use crate::protocol::response::{ComputeResponse, FrontiersResponse};
61use crate::service::ComputeClient;
62
/// A shareable token.
///
/// Clones of this token are held by hydrating collections so that the number of in-flight
/// hydrations can be derived from the token's `Arc::strong_count`.
type Token = Arc<()>;
65
/// A client enforcing sequential dataflow hydration.
///
/// Sits between the compute controller and the `PartitionedState` client, delaying `Schedule`
/// commands to limit the number of concurrently hydrating dataflows (see the module docs).
#[derive(Debug)]
pub(super) struct SequentialHydration {
    /// A sender for commands to the wrapped client, serviced by the forwarder task.
    command_tx: mpsc::UnboundedSender<ComputeCommand>,
    /// A receiver for responses from the wrapped client, fed by the forwarder task.
    response_rx: mpsc::UnboundedReceiver<Result<ComputeResponse, anyhow::Error>>,
    /// Dynamic system configuration, read for the hydration concurrency limit.
    dyncfg: Arc<ConfigSet>,
    /// Tracked metrics; reports the current hydration queue size.
    metrics: ReplicaMetrics,
    /// Tracked collections.
    ///
    /// Entries are inserted in response to observed `CreateDataflow` commands.
    /// Entries are removed in response to `Frontiers` responses that report collection
    /// hydration, or in response to `AllowCompaction` commands that specify the empty frontier.
    collections: BTreeMap<GlobalId, Collection>,
    /// A queue of scheduled collections that are awaiting hydration.
    hydration_queue: VecDeque<GlobalId>,
    /// A token held by hydrating collections.
    ///
    /// Useful to efficiently determine how many collections are currently in the process of
    /// hydration, and thus how much capacity is available.
    hydration_token: Token,
    /// Handle to the forwarder task, to abort it when `SequentialHydration` is dropped.
    _forwarder_task: AbortOnDropHandle<()>,
}
93
94impl SequentialHydration {
95    /// Create a new `SequentialHydration` client.
96    pub fn new<C>(client: C, dyncfg: Arc<ConfigSet>, metrics: ReplicaMetrics) -> Self
97    where
98        C: ComputeClient + 'static,
99    {
100        let (command_tx, command_rx) = mpsc::unbounded_channel();
101        let (response_tx, response_rx) = mpsc::unbounded_channel();
102        let forwarder = mz_ore::task::spawn(
103            || "sequential_hydration:forwarder",
104            forward_messages(client, command_rx, response_tx),
105        );
106
107        Self {
108            command_tx,
109            response_rx,
110            dyncfg,
111            metrics,
112            collections: Default::default(),
113            hydration_queue: Default::default(),
114            hydration_token: Default::default(),
115            _forwarder_task: forwarder.abort_on_drop(),
116        }
117    }
118
119    /// Return the number of hydrating collections.
120    fn hydration_count(&self) -> usize {
121        Arc::strong_count(&self.hydration_token) - 1
122    }
123
124    /// Absorb a command and send resulting commands to the wrapped client.
125    fn absorb_command(&mut self, cmd: ComputeCommand) -> Result<(), anyhow::Error> {
126        // Whether to forward this command to the wrapped client.
127        let mut forward = true;
128
129        match &cmd {
130            // We enforce sequential hydration only for non-transient dataflows, assuming that
131            // transient dataflows are created for interactive user queries and should always be
132            // scheduled as soon as possible.
133            ComputeCommand::CreateDataflow(dataflow) if !dataflow.is_transient() => {
134                let export_ids: Vec<_> = dataflow.export_ids().collect();
135                let id = export_ids.expect_element(|| "multi-export dataflows are not supported");
136                let as_of = dataflow.as_of.clone().unwrap();
137
138                debug!(%id, ?as_of, "tracking collection");
139                self.collections.insert(id, Collection::new(as_of));
140            }
141            ComputeCommand::Schedule(id) => {
142                if let Some(collection) = self.collections.get_mut(id) {
143                    debug!(%id, "enqueuing collection for hydration");
144                    self.hydration_queue.push_back(*id);
145                    collection.set_scheduled();
146                    forward = false;
147                }
148            }
149            ComputeCommand::AllowCompaction { id, frontier } if frontier.is_empty() => {
150                // The collection was dropped by the controller. Remove it from the tracking state
151                // to ensure we don't produce any more commands for it.
152                if self.collections.remove(id).is_some() {
153                    debug!(%id, "collection dropped");
154                }
155            }
156            _ => (),
157        }
158
159        if forward {
160            self.command_tx.send(cmd)?;
161        }
162
163        // Schedule collections that are ready now.
164        self.hydrate_collections()
165    }
166
167    /// Observe a response and send resulting commands to the wrapped client.
168    fn observe_response(&mut self, resp: &ComputeResponse) -> Result<(), anyhow::Error> {
169        if let ComputeResponse::Frontiers(
170            id,
171            FrontiersResponse {
172                output_frontier: Some(frontier),
173                ..
174            },
175        ) = resp
176        {
177            if let Some(collection) = self.collections.remove(id) {
178                let hydrated = PartialOrder::less_than(&collection.as_of, frontier);
179                if hydrated || frontier.is_empty() {
180                    debug!(%id, "collection hydrated");
181
182                    // Note that it is possible to observe hydration even for collections for which
183                    // we never sent a `Schedule` command, if the replica decided to not suspend
184                    // the dataflow after creation. The compute protocol does not require replicas
185                    // to create dataflows in suspended state. It seems like a good idea to still
186                    // send a `Schedule` command in this case, rather than swallowing it, to make
187                    // the protocol communication more predicatable.
188
189                    match collection.state {
190                        State::Created => {
191                            // We haven't seen a `Schedule` command yet, so no obligations to send
192                            // one either.
193                        }
194                        State::QueuedForHydration => {
195                            // We are holding back the `Schedule` command for this collection. Send
196                            // it now.
197                            self.command_tx.send(ComputeCommand::Schedule(*id))?;
198                        }
199                        State::Hydrating(token) => {
200                            // We freed some hydration capacity and may be able to start hydrating
201                            // new collections.
202                            drop(token);
203                            self.hydrate_collections()?;
204                        }
205                    }
206                } else {
207                    self.collections.insert(*id, collection);
208                }
209            }
210        }
211
212        Ok(())
213    }
214
215    /// Allow hydration based on the available capacity.
216    fn hydrate_collections(&mut self) -> Result<(), anyhow::Error> {
217        let capacity = HYDRATION_CONCURRENCY.get(&self.dyncfg);
218        while self.hydration_count() < capacity {
219            let Some(id) = self.hydration_queue.pop_front() else {
220                // Hydration queue is empty.
221                break;
222            };
223            let Some(collection) = self.collections.get_mut(&id) else {
224                // Collection has already been dropped.
225                continue;
226            };
227
228            debug!(%id, "starting collection hydration");
229            self.command_tx.send(ComputeCommand::Schedule(id))?;
230
231            let token = Arc::clone(&self.hydration_token);
232            collection.set_hydrating(token);
233        }
234
235        let queue_size = u64::cast_from(self.hydration_queue.len());
236        self.metrics.inner.hydration_queue_size.set(queue_size);
237
238        Ok(())
239    }
240}
241
242#[async_trait]
243impl GenericClient<ComputeCommand, ComputeResponse> for SequentialHydration {
244    async fn send(&mut self, cmd: ComputeCommand) -> Result<(), anyhow::Error> {
245        self.absorb_command(cmd)
246    }
247
248    /// # Cancel safety
249    ///
250    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
251    /// statement and some other branch completes first, it is guaranteed that no messages were
252    /// received by this client.
253    async fn recv(&mut self) -> Result<Option<ComputeResponse>, anyhow::Error> {
254        // `mpsc::UnboundedReceiver::recv` is documented as cancel safe.
255        match self.response_rx.recv().await {
256            Some(Ok(response)) => {
257                self.observe_response(&response)?;
258                Ok(Some(response))
259            }
260            Some(Err(error)) => Err(error), // client error
261            None => Ok(None),               // client disconnected
262        }
263    }
264}
265
/// Information about a tracked collection.
#[derive(Debug)]
struct Collection {
    /// The as-of frontier at collection creation.
    ///
    /// The collection counts as hydrated once its output frontier has advanced beyond this
    /// frontier (or has become empty).
    as_of: Antichain<Timestamp>,
    /// The current state of the collection.
    state: State,
}
274
275impl Collection {
276    /// Create a new `Collection`.
277    fn new(as_of: Antichain<Timestamp>) -> Self {
278        Self {
279            as_of,
280            state: State::Created,
281        }
282    }
283
284    /// Advance this collection's state to `Scheduled`.
285    fn set_scheduled(&mut self) {
286        soft_assert_eq_or_log!(self.state, State::Created);
287        self.state = State::QueuedForHydration;
288    }
289
290    fn set_hydrating(&mut self, token: Token) {
291        soft_assert_eq_or_log!(self.state, State::QueuedForHydration);
292        self.state = State::Hydrating(token);
293    }
294}
295
/// The state of a tracked collection.
#[derive(Debug, PartialEq, Eq)]
enum State {
    /// Collection has been created and is waiting for a `Schedule` command.
    Created,
    /// The collection has received a `Schedule` command and has been added to the hydration queue,
    /// waiting for hydration capacity.
    QueuedForHydration,
    /// Collection is hydrating and waiting for hydration to complete.
    ///
    /// The held `Token` is a clone of `SequentialHydration::hydration_token`; dropping it frees
    /// one unit of hydration capacity.
    Hydrating(Token),
}
307
308/// Forward messages between a pair of channels and a [`ComputeClient`].
309///
310/// This functions is run in its own task and exists to allow `SequentialHydration::recv` to be
311/// cancel safe even though it needs to send commands to the wrapped client, which isn't cancel
312/// safe.
313async fn forward_messages<C>(
314    mut client: C,
315    mut rx: mpsc::UnboundedReceiver<ComputeCommand>,
316    tx: mpsc::UnboundedSender<Result<ComputeResponse, anyhow::Error>>,
317) where
318    C: ComputeClient,
319{
320    loop {
321        tokio::select! {
322            command = rx.recv() => {
323                let Some(command) = command else {
324                    break; // `SequentialHydration` dropped
325                };
326                if let Err(error) = client.send(command).await {
327                    // Client produced an unrecoverable error.
328                    let _ = tx.send(Err(error));
329                    break;
330                }
331            }
332            response = client.recv() => {
333                let response = match response {
334                    Ok(Some(response)) => response,
335                    Ok(None) => {
336                        break; // client disconnected
337                    }
338                    Err(error) => {
339                        // Client produced an unrecoverable error.
340                        let _ = tx.send(Err(error));
341                        break;
342                    }
343                };
344                if tx.send(Ok(response)).is_err() {
345                    break; // `SequentialHydration` dropped
346                }
347            }
348        }
349    }
350}