mz_compute_client/controller/
sequential_hydration.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Sequential dataflow hydration support for replicas.
11//!
//! Sequential hydration enforces a configurable "hydration concurrency" that limits how many
//! dataflows may be hydrating at the same time. Limiting hydration concurrency can be beneficial
//! in reducing peak memory usage, cross-dataflow thrashing, and hydration time.
15//!
16//! The configured hydration concurrency is enforced by delaying the delivery of `Schedule` compute
17//! commands to the replica. Those commands are emitted by the controller for collections that
18//! become ready to hydrate (based on availability of input data) and are directly applied by
19//! replicas by unsuspending the corresponding dataflows. Delaying `Schedule` commands allows us to
20//! ensure only a limited number of dataflows can hydrate at the same time.
21//!
//! Note that a dataflow may export multiple collections. `Schedule` commands are produced per
//! collection but hydration is a dataflow-level mechanism. In practice, Materialize today only
//! produces dataflows with a single export, and we rely on this assumption here to simplify the
//! implementation. If the assumption ever ceases to hold, we will need to adjust the code in this
//! module.
27//!
//! Sequential hydration is enforced by a `SequentialHydration` client that sits between the
//! controller and the `PartitionedState` client that splits commands across replica processes.
//! This location is important:
31//!
32//!  * It needs to be behind the controller since hydration is a per-replica mechanism. Different
33//!    replicas can progress through hydration at different paces.
34//!  * It needs to be before the `PartitionedState` client because all replica workers must see
35//!    `Schedule` commands in the same order. Otherwise we risk getting stuck when different
36//!    workers hydrate different dataflows and wait on each other for progress in these dataflows.
37//!  * It also needs to be before the `PartitionedState` client because it needs to be able to
38//!    observe all compute commands. Clients behind `PartitionedState` are not guaranteed to do so,
39//!    since commands are only forwarded to the first process.
40
41use std::collections::{BTreeMap, VecDeque};
42use std::sync::Arc;
43
44use async_trait::async_trait;
45use mz_compute_types::dyncfgs::HYDRATION_CONCURRENCY;
46use mz_dyncfg::ConfigSet;
47use mz_ore::cast::CastFrom;
48use mz_ore::collections::CollectionExt;
49use mz_ore::soft_assert_eq_or_log;
50use mz_ore::task::AbortOnDropHandle;
51use mz_repr::GlobalId;
52use mz_service::client::GenericClient;
53use timely::PartialOrder;
54use timely::progress::Antichain;
55use tokio::sync::mpsc;
56use tracing::debug;
57
58use crate::controller::ComputeControllerTimestamp;
59use crate::metrics::ReplicaMetrics;
60use crate::protocol::command::ComputeCommand;
61use crate::protocol::response::{ComputeResponse, FrontiersResponse};
62use crate::service::ComputeClient;
63
64/// A shareable token.
65type Token = Arc<()>;
66
67/// A client enforcing sequential dataflow hydration.
68#[derive(Debug)]
69pub(super) struct SequentialHydration<T> {
70    /// A sender for commands to the wrapped client.
71    command_tx: mpsc::UnboundedSender<ComputeCommand<T>>,
72    /// A receiver for responses from the wrapped client.
73    response_rx: mpsc::UnboundedReceiver<Result<ComputeResponse<T>, anyhow::Error>>,
74    /// Dynamic system configuration.
75    dyncfg: Arc<ConfigSet>,
76    /// Tracked metrics.
77    metrics: ReplicaMetrics,
78    /// Tracked collections.
79    ///
80    /// Entries are inserted in response to observed `CreateDataflow` commands.
81    /// Entries are removed in response to `Frontiers` commands that report collection
82    /// hydration, or in response to `AllowCompaction` commands that specify the empty frontier.
83    collections: BTreeMap<GlobalId, Collection<T>>,
84    /// A queue of scheduled collections that are awaiting hydration.
85    hydration_queue: VecDeque<GlobalId>,
86    /// A token held by hydrating collections.
87    ///
88    /// Useful to efficiently determine how many collections are currently in the process of
89    /// hydration, and thus how much capacity is available.
90    hydration_token: Token,
91    /// Handle to the forwarder task, to abort it when `SequentialHydration` is dropped.
92    _forwarder_task: AbortOnDropHandle<()>,
93}
94
95impl<T> SequentialHydration<T>
96where
97    T: ComputeControllerTimestamp,
98{
99    /// Create a new `SequentialHydration` client.
100    pub fn new<C>(client: C, dyncfg: Arc<ConfigSet>, metrics: ReplicaMetrics) -> Self
101    where
102        C: ComputeClient<T> + 'static,
103    {
104        let (command_tx, command_rx) = mpsc::unbounded_channel();
105        let (response_tx, response_rx) = mpsc::unbounded_channel();
106        let forwarder = mz_ore::task::spawn(
107            || "sequential_hydration:forwarder",
108            forward_messages(client, command_rx, response_tx),
109        );
110
111        Self {
112            command_tx,
113            response_rx,
114            dyncfg,
115            metrics,
116            collections: Default::default(),
117            hydration_queue: Default::default(),
118            hydration_token: Default::default(),
119            _forwarder_task: forwarder.abort_on_drop(),
120        }
121    }
122
123    /// Return the number of hydrating collections.
124    fn hydration_count(&self) -> usize {
125        Arc::strong_count(&self.hydration_token) - 1
126    }
127
128    /// Absorb a command and send resulting commands to the wrapped client.
129    fn absorb_command(&mut self, cmd: ComputeCommand<T>) -> Result<(), anyhow::Error> {
130        // Whether to forward this command to the wrapped client.
131        let mut forward = true;
132
133        match &cmd {
134            // We enforce sequential hydration only for non-transient dataflows, assuming that
135            // transient dataflows are created for interactive user queries and should always be
136            // scheduled as soon as possible.
137            ComputeCommand::CreateDataflow(dataflow) if !dataflow.is_transient() => {
138                let export_ids: Vec<_> = dataflow.export_ids().collect();
139                let id = export_ids.expect_element(|| "multi-export dataflows are not supported");
140                let as_of = dataflow.as_of.clone().unwrap();
141
142                debug!(%id, ?as_of, "tracking collection");
143                self.collections.insert(id, Collection::new(as_of));
144            }
145            ComputeCommand::Schedule(id) => {
146                if let Some(collection) = self.collections.get_mut(id) {
147                    debug!(%id, "enqueuing collection for hydration");
148                    self.hydration_queue.push_back(*id);
149                    collection.set_scheduled();
150                    forward = false;
151                }
152            }
153            ComputeCommand::AllowCompaction { id, frontier } if frontier.is_empty() => {
154                // The collection was dropped by the controller. Remove it from the tracking state
155                // to ensure we don't produce any more commands for it.
156                if self.collections.remove(id).is_some() {
157                    debug!(%id, "collection dropped");
158                }
159            }
160            _ => (),
161        }
162
163        if forward {
164            self.command_tx.send(cmd)?;
165        }
166
167        // Schedule collections that are ready now.
168        self.hydrate_collections()
169    }
170
171    /// Observe a response and send resulting commands to the wrapped client.
172    fn observe_response(&mut self, resp: &ComputeResponse<T>) -> Result<(), anyhow::Error> {
173        if let ComputeResponse::Frontiers(
174            id,
175            FrontiersResponse {
176                output_frontier: Some(frontier),
177                ..
178            },
179        ) = resp
180        {
181            if let Some(collection) = self.collections.remove(id) {
182                let hydrated = PartialOrder::less_than(&collection.as_of, frontier);
183                if hydrated || frontier.is_empty() {
184                    debug!(%id, "collection hydrated");
185
186                    // Note that it is possible to observe hydration even for collections for which
187                    // we never sent a `Schedule` command, if the replica decided to not suspend
188                    // the dataflow after creation. The compute protocol does not require replicas
189                    // to create dataflows in suspended state. It seems like a good idea to still
190                    // send a `Schedule` command in this case, rather than swallowing it, to make
191                    // the protocol communication more predicatable.
192
193                    match collection.state {
194                        State::Created => {
195                            // We haven't seen a `Schedule` command yet, so no obligations to send
196                            // one either.
197                        }
198                        State::QueuedForHydration => {
199                            // We are holding back the `Schedule` command for this collection. Send
200                            // it now.
201                            self.command_tx.send(ComputeCommand::Schedule(*id))?;
202                        }
203                        State::Hydrating(token) => {
204                            // We freed some hydration capacity and may be able to start hydrating
205                            // new collections.
206                            drop(token);
207                            self.hydrate_collections()?;
208                        }
209                    }
210                } else {
211                    self.collections.insert(*id, collection);
212                }
213            }
214        }
215
216        Ok(())
217    }
218
219    /// Allow hydration based on the available capacity.
220    fn hydrate_collections(&mut self) -> Result<(), anyhow::Error> {
221        let capacity = HYDRATION_CONCURRENCY.get(&self.dyncfg);
222        while self.hydration_count() < capacity {
223            let Some(id) = self.hydration_queue.pop_front() else {
224                // Hydration queue is empty.
225                break;
226            };
227            let Some(collection) = self.collections.get_mut(&id) else {
228                // Collection has already been dropped.
229                continue;
230            };
231
232            debug!(%id, "starting collection hydration");
233            self.command_tx.send(ComputeCommand::Schedule(id))?;
234
235            let token = Arc::clone(&self.hydration_token);
236            collection.set_hydrating(token);
237        }
238
239        let queue_size = u64::cast_from(self.hydration_queue.len());
240        self.metrics.inner.hydration_queue_size.set(queue_size);
241
242        Ok(())
243    }
244}
245
246#[async_trait]
247impl<T> GenericClient<ComputeCommand<T>, ComputeResponse<T>> for SequentialHydration<T>
248where
249    T: ComputeControllerTimestamp,
250{
251    async fn send(&mut self, cmd: ComputeCommand<T>) -> Result<(), anyhow::Error> {
252        self.absorb_command(cmd)
253    }
254
255    /// # Cancel safety
256    ///
257    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
258    /// statement and some other branch completes first, it is guaranteed that no messages were
259    /// received by this client.
260    async fn recv(&mut self) -> Result<Option<ComputeResponse<T>>, anyhow::Error> {
261        // `mpsc::UnboundedReceiver::recv` is documented as cancel safe.
262        match self.response_rx.recv().await {
263            Some(Ok(response)) => {
264                self.observe_response(&response)?;
265                Ok(Some(response))
266            }
267            Some(Err(error)) => Err(error), // client error
268            None => Ok(None),               // client disconnected
269        }
270    }
271}
272
273/// Information about a tracked collection.
274#[derive(Debug)]
275struct Collection<T> {
276    /// The as-of frontier at collection creation.
277    as_of: Antichain<T>,
278    /// The current state of the collection.
279    state: State,
280}
281
282impl<T> Collection<T> {
283    /// Create a new `Collection`.
284    fn new(as_of: Antichain<T>) -> Self {
285        Self {
286            as_of,
287            state: State::Created,
288        }
289    }
290
291    /// Advance this collection's state to `Scheduled`.
292    fn set_scheduled(&mut self) {
293        soft_assert_eq_or_log!(self.state, State::Created);
294        self.state = State::QueuedForHydration;
295    }
296
297    fn set_hydrating(&mut self, token: Token) {
298        soft_assert_eq_or_log!(self.state, State::QueuedForHydration);
299        self.state = State::Hydrating(token);
300    }
301}
302
303/// The state of a tracked collection.
304#[derive(Debug, PartialEq, Eq)]
305enum State {
306    /// Collection has been created and is waiting for a `Schedule` command.
307    Created,
308    /// The collection has received a `Schedule` command and has been added to the hydration queue,
309    /// waiting for hydration capacity.
310    QueuedForHydration,
311    /// Collection is hydrating and waiting for hydration to complete.
312    Hydrating(Token),
313}
314
315/// Forward messages between a pair of channels and a [`ComputeClient`].
316///
317/// This functions is run in its own task and exists to allow `SequentialHydration::recv` to be
318/// cancel safe even though it needs to send commands to the wrapped client, which isn't cancel
319/// safe.
320async fn forward_messages<C, T>(
321    mut client: C,
322    mut rx: mpsc::UnboundedReceiver<ComputeCommand<T>>,
323    tx: mpsc::UnboundedSender<Result<ComputeResponse<T>, anyhow::Error>>,
324) where
325    C: ComputeClient<T>,
326{
327    loop {
328        tokio::select! {
329            command = rx.recv() => {
330                let Some(command) = command else {
331                    break; // `SequentialHydration` dropped
332                };
333                if let Err(error) = client.send(command).await {
334                    // Client produced an unrecoverable error.
335                    let _ = tx.send(Err(error));
336                    break;
337                }
338            }
339            response = client.recv() => {
340                let response = match response {
341                    Ok(Some(response)) => response,
342                    Ok(None) => {
343                        break; // client disconnected
344                    }
345                    Err(error) => {
346                        // Client produced an unrecoverable error.
347                        let _ = tx.send(Err(error));
348                        break;
349                    }
350                };
351                if tx.send(Ok(response)).is_err() {
352                    break; // `SequentialHydration` dropped
353                }
354            }
355        }
356    }
357}