Skip to main content

mz_compute_client/controller/
sequential_hydration.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Sequential dataflow hydration support for replicas.
11//!
//! Sequential hydration enforces a configurable "hydration concurrency" that limits how many
//! dataflows may be hydrating at the same time. Limiting hydration concurrency can be beneficial
//! in reducing peak memory usage, cross-dataflow thrashing, and hydration time.
15//!
16//! The configured hydration concurrency is enforced by delaying the delivery of `Schedule` compute
17//! commands to the replica. Those commands are emitted by the controller for collections that
18//! become ready to hydrate (based on availability of input data) and are directly applied by
19//! replicas by unsuspending the corresponding dataflows. Delaying `Schedule` commands allows us to
20//! ensure only a limited number of dataflows can hydrate at the same time.
21//!
//! Note that a dataflow may export multiple collections. `Schedule` commands are produced per
//! collection but hydration is a dataflow-level mechanism. In practice Materialize today only
//! produces dataflows with a single export and we rely on this assumption here to simplify the
//! implementation. If the assumption ever ceases to hold, we will need to adjust the code in this
//! module.
27//!
28//! Sequential hydration is enforced by a `SequentialHydration` interceptor that sits between the
29//! controller and the `PartitionedState` client that splits commands across replica processes.
30//! This location is important:
31//!
32//!  * It needs to be behind the controller since hydration is a per-replica mechanism. Different
33//!    replicas can progress through hydration at different paces.
34//!  * It needs to be before the `PartitionedState` client because all replica workers must see
35//!    `Schedule` commands in the same order. Otherwise we risk getting stuck when different
36//!    workers hydrate different dataflows and wait on each other for progress in these dataflows.
37//!  * It also needs to be before the `PartitionedState` client because it needs to be able to
38//!    observe all compute commands. Clients behind `PartitionedState` are not guaranteed to do so,
39//!    since commands are only forwarded to the first process.
40//!
41//! `SequentialHydration` is a synchronous interceptor: the replica task feeds it every command it
42//! is about to send and every response it receives, and the interceptor returns the commands that
43//! should actually be sent to the replica. The task is responsible for sending those commands, so
44//! the interceptor holds no client and spawns no task of its own.
45
46use std::collections::{BTreeMap, VecDeque};
47use std::sync::Arc;
48
49use mz_compute_types::dyncfgs::HYDRATION_CONCURRENCY;
50use mz_dyncfg::ConfigSet;
51use mz_ore::cast::CastFrom;
52use mz_ore::collections::CollectionExt;
53use mz_ore::soft_assert_eq_or_log;
54use mz_repr::{GlobalId, Timestamp};
55use timely::PartialOrder;
56use timely::progress::Antichain;
57use tracing::debug;
58
59use crate::metrics::ReplicaMetrics;
60use crate::protocol::command::ComputeCommand;
61use crate::protocol::response::{ComputeResponse, FrontiersResponse};
62
/// A shareable, reference-counted marker that carries no data.
///
/// Clones of the interceptor's token are handed to collections while they hydrate, so the
/// token's strong count reveals how many collections are currently hydrating.
type Token = Arc<()>;
65
66/// An interceptor enforcing sequential dataflow hydration.
67///
68/// The replica task drives this interceptor by feeding it the commands it intends to send (via
69/// [`SequentialHydration::absorb_command`]) and the responses it receives (via
70/// [`SequentialHydration::observe_response`]). Both methods return the commands the task should
71/// send to the replica, with `Schedule` commands held back or released according to the configured
72/// hydration concurrency.
73#[derive(Debug)]
74pub(super) struct SequentialHydration {
75    /// Dynamic system configuration.
76    dyncfg: Arc<ConfigSet>,
77    /// Tracked metrics.
78    metrics: ReplicaMetrics,
79    /// Tracked collections.
80    ///
81    /// Entries are inserted in response to observed `CreateDataflow` commands.
82    /// Entries are removed in response to `Frontiers` commands that report collection
83    /// hydration, or in response to `AllowCompaction` commands that specify the empty frontier.
84    collections: BTreeMap<GlobalId, Collection>,
85    /// A queue of scheduled collections that are awaiting hydration.
86    hydration_queue: VecDeque<GlobalId>,
87    /// A token held by hydrating collections.
88    ///
89    /// Useful to efficiently determine how many collections are currently in the process of
90    /// hydration, and thus how much capacity is available.
91    hydration_token: Token,
92}
93
94impl SequentialHydration {
95    /// Create a new `SequentialHydration` interceptor.
96    pub(super) fn new(dyncfg: Arc<ConfigSet>, metrics: ReplicaMetrics) -> Self {
97        Self {
98            dyncfg,
99            metrics,
100            collections: Default::default(),
101            hydration_queue: Default::default(),
102            hydration_token: Default::default(),
103        }
104    }
105
106    /// Return the number of hydrating collections.
107    fn hydration_count(&self) -> usize {
108        Arc::strong_count(&self.hydration_token) - 1
109    }
110
111    /// Absorb a command the task intends to send, returning the commands it should actually send.
112    pub(super) fn absorb_command(&mut self, cmd: ComputeCommand) -> Vec<ComputeCommand> {
113        // Whether to forward this command to the replica.
114        let mut forward = true;
115
116        match &cmd {
117            // We enforce sequential hydration only for non-transient dataflows, assuming that
118            // transient dataflows are created for interactive user queries and should always be
119            // scheduled as soon as possible.
120            ComputeCommand::CreateDataflow(dataflow) if !dataflow.is_transient() => {
121                let export_ids: Vec<_> = dataflow.export_ids().collect();
122                let id = export_ids.expect_element(|| "multi-export dataflows are not supported");
123                let as_of = dataflow.as_of.clone().unwrap();
124
125                debug!(%id, ?as_of, "tracking collection");
126                self.collections.insert(id, Collection::new(as_of));
127            }
128            ComputeCommand::Schedule(id) => {
129                if let Some(collection) = self.collections.get_mut(id) {
130                    debug!(%id, "enqueuing collection for hydration");
131                    self.hydration_queue.push_back(*id);
132                    collection.set_scheduled();
133                    forward = false;
134                }
135            }
136            ComputeCommand::AllowCompaction { id, frontier } if frontier.is_empty() => {
137                // The collection was dropped by the controller. Remove it from the tracking state
138                // to ensure we don't produce any more commands for it.
139                if self.collections.remove(id).is_some() {
140                    debug!(%id, "collection dropped");
141                }
142            }
143            _ => (),
144        }
145
146        let mut commands = Vec::new();
147        if forward {
148            commands.push(cmd);
149        }
150
151        // Schedule collections that are ready now.
152        commands.extend(self.hydrate_collections());
153        commands
154    }
155
156    /// Observe a response the task received, returning the commands it should send in reaction.
157    pub(super) fn observe_response(&mut self, resp: &ComputeResponse) -> Vec<ComputeCommand> {
158        let mut commands = Vec::new();
159
160        if let ComputeResponse::Frontiers(
161            id,
162            FrontiersResponse {
163                output_frontier: Some(frontier),
164                ..
165            },
166        ) = resp
167        {
168            if let Some(collection) = self.collections.remove(id) {
169                let hydrated = PartialOrder::less_than(&collection.as_of, frontier);
170                if hydrated || frontier.is_empty() {
171                    debug!(%id, "collection hydrated");
172
173                    // Note that it is possible to observe hydration even for collections for which
174                    // we never sent a `Schedule` command, if the replica decided to not suspend
175                    // the dataflow after creation. The compute protocol does not require replicas
176                    // to create dataflows in suspended state. It seems like a good idea to still
177                    // send a `Schedule` command in this case, rather than swallowing it, to make
178                    // the protocol communication more predicatable.
179
180                    match collection.state {
181                        State::Created => {
182                            // We haven't seen a `Schedule` command yet, so no obligations to send
183                            // one either.
184                        }
185                        State::QueuedForHydration => {
186                            // We are holding back the `Schedule` command for this collection. Send
187                            // it now.
188                            commands.push(ComputeCommand::Schedule(*id));
189                        }
190                        State::Hydrating(token) => {
191                            // We freed some hydration capacity and may be able to start hydrating
192                            // new collections.
193                            drop(token);
194                            commands.extend(self.hydrate_collections());
195                        }
196                    }
197                } else {
198                    self.collections.insert(*id, collection);
199                }
200            }
201        }
202
203        commands
204    }
205
206    /// Allow hydration based on the available capacity, returning the `Schedule` commands to send.
207    fn hydrate_collections(&mut self) -> Vec<ComputeCommand> {
208        let mut commands = Vec::new();
209
210        let capacity = HYDRATION_CONCURRENCY.get(&self.dyncfg);
211        while self.hydration_count() < capacity {
212            let Some(id) = self.hydration_queue.pop_front() else {
213                // Hydration queue is empty.
214                break;
215            };
216            let Some(collection) = self.collections.get_mut(&id) else {
217                // Collection has already been dropped.
218                continue;
219            };
220
221            debug!(%id, "starting collection hydration");
222            commands.push(ComputeCommand::Schedule(id));
223
224            let token = Arc::clone(&self.hydration_token);
225            collection.set_hydrating(token);
226        }
227
228        let queue_size = u64::cast_from(self.hydration_queue.len());
229        self.metrics.inner.hydration_queue_size.set(queue_size);
230
231        commands
232    }
233}
234
235/// Information about a tracked collection.
236#[derive(Debug)]
237struct Collection {
238    /// The as-of frontier at collection creation.
239    as_of: Antichain<Timestamp>,
240    /// The current state of the collection.
241    state: State,
242}
243
244impl Collection {
245    /// Create a new `Collection`.
246    fn new(as_of: Antichain<Timestamp>) -> Self {
247        Self {
248            as_of,
249            state: State::Created,
250        }
251    }
252
253    /// Advance this collection's state to `Scheduled`.
254    fn set_scheduled(&mut self) {
255        soft_assert_eq_or_log!(self.state, State::Created);
256        self.state = State::QueuedForHydration;
257    }
258
259    fn set_hydrating(&mut self, token: Token) {
260        soft_assert_eq_or_log!(self.state, State::QueuedForHydration);
261        self.state = State::Hydrating(token);
262    }
263}
264
265/// The state of a tracked collection.
266#[derive(Debug, PartialEq, Eq)]
267enum State {
268    /// Collection has been created and is waiting for a `Schedule` command.
269    Created,
270    /// The collection has received a `Schedule` command and has been added to the hydration queue,
271    /// waiting for hydration capacity.
272    QueuedForHydration,
273    /// Collection is hydrating and waiting for hydration to complete.
274    Hydrating(Token),
275}