1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Sequential dataflow hydration support for replicas.
//!
//! Sequential hydration enforces a configurable "hydration concurrency" that limits how many
//! dataflows may be hydrating at the same time. Limiting hydrating concurrency can be beneficial
//! in reducing peak memory usage, cross-dataflow thrashing, and hydration time.
//!
//! The configured hydration concurrency is enforced by delaying the delivery of `Schedule` compute
//! commands to the replica. Those commands are emitted by the controller for collections that
//! become ready to hydrate (based on availability of input data) and are directly applied by
//! replicas by unsuspending the corresponding dataflows. Delaying `Schedule` commands allows us to
//! ensure only a limited number of dataflows can hydrate at the same time.
//!
//! Note that a dataflow may export multiple collections. `Schedule` commands are produced per
//! collection but hydration is a dataflow-level mechanism. In practice Materialize today only
//! produces dataflow with a single export and we rely on this assumption here to simplify the
//! implementation. If the assumption ever ceases to hold, we will need to adjust the code in this
//! module.
//!
//! Sequential hydration is enforeced by a `SequentialHydration` client that sits between the
//! controller and the `PartitionedState` client that splits commands across replica processes.
//! This location is important:
//!
//! * It needs to be behind the controller since hydration is a per-replica mechanism. Different
//! replicas can progress through hydration at different paces.
//! * It needs to be before the `PartitionedState` client because all replica workers must see
//! `Schedule` commands in the same order. Otherwise we risk getting stuck when different
//! workers hydrate different dataflows and wait on each other for progress in these dataflows.
//! * It also needs to be before the `PartitionedState` client because it needs to be able to
//! observe all compute commands. Clients behind `PartitionedState` are not guaranteed to do so,
//! since commands are only forwarded to the first process.
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
use async_trait::async_trait;
use mz_compute_types::dyncfgs::HYDRATION_CONCURRENCY;
use mz_dyncfg::ConfigSet;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_assert_eq_or_log;
use mz_ore::task::AbortOnDropHandle;
use mz_repr::GlobalId;
use mz_service::client::GenericClient;
use timely::progress::Antichain;
use timely::PartialOrder;
use tokio::sync::mpsc;
use tracing::debug;
use crate::controller::ComputeControllerTimestamp;
use crate::metrics::ReplicaMetrics;
use crate::protocol::command::ComputeCommand;
use crate::protocol::response::{ComputeResponse, FrontiersResponse};
use crate::service::ComputeClient;
/// A shareable token.
type Token = Arc<()>;
/// A client enforcing sequential dataflow hydration.
#[derive(Debug)]
pub(super) struct SequentialHydration<T> {
/// A sender for commands to the wrapped client.
command_tx: mpsc::UnboundedSender<ComputeCommand<T>>,
/// A receiver for responses from the wrapped client.
response_rx: mpsc::UnboundedReceiver<Result<ComputeResponse<T>, anyhow::Error>>,
/// Dynamic system configuration.
dyncfg: Arc<ConfigSet>,
/// Tracked metrics.
metrics: ReplicaMetrics,
/// Tracked collections.
///
/// Entries are inserted in response to observed `CreateDataflow` commands.
/// Entries are removed in response to `Frontiers` commands that report collection
/// hydration, or in response to `AllowCompaction` commands that specify the empty frontier.
collections: BTreeMap<GlobalId, Collection<T>>,
/// A queue of scheduled collections that are awaiting hydration.
hydration_queue: VecDeque<GlobalId>,
/// A token held by hydrating collections.
///
/// Useful to efficiently determine how many collections are currently in the process of
/// hydration, and thus how much capacity is available.
hydration_token: Token,
/// Handle to the forwarder task, to abort it when `SequentialHydration` is dropped.
_forwarder_task: AbortOnDropHandle<()>,
}
impl<T> SequentialHydration<T>
where
T: ComputeControllerTimestamp,
{
/// Create a new `SequentialHydration` client.
pub fn new<C>(client: C, dyncfg: Arc<ConfigSet>, metrics: ReplicaMetrics) -> Self
where
C: ComputeClient<T> + 'static,
{
let (command_tx, command_rx) = mpsc::unbounded_channel();
let (response_tx, response_rx) = mpsc::unbounded_channel();
let forwarder = mz_ore::task::spawn(
|| "sequential_hydration:forwarder",
forward_messages(client, command_rx, response_tx),
);
Self {
command_tx,
response_rx,
dyncfg,
metrics,
collections: Default::default(),
hydration_queue: Default::default(),
hydration_token: Default::default(),
_forwarder_task: forwarder.abort_on_drop(),
}
}
/// Return the number of hydrating collections.
fn hydration_count(&self) -> usize {
Arc::strong_count(&self.hydration_token) - 1
}
/// Absorb a command and send resulting commands to the wrapped client.
fn absorb_command(&mut self, cmd: ComputeCommand<T>) -> Result<(), anyhow::Error> {
// Whether to forward this command to the wrapped client.
let mut forward = true;
match &cmd {
// We enforce sequential hydration only for non-transient dataflows, assuming that
// transient dataflows are created for interactive user queries and should always be
// scheduled as soon as possible.
ComputeCommand::CreateDataflow(dataflow) if !dataflow.is_transient() => {
let export_ids: Vec<_> = dataflow.export_ids().collect();
let id = export_ids.expect_element(|| "multi-export dataflows are not supported");
let as_of = dataflow.as_of.clone().unwrap();
debug!(%id, ?as_of, "tracking collection");
self.collections.insert(id, Collection::new(as_of));
}
ComputeCommand::Schedule(id) => {
if let Some(collection) = self.collections.get_mut(id) {
debug!(%id, "enqueuing collection for hydration");
self.hydration_queue.push_back(*id);
collection.set_scheduled();
forward = false;
}
}
ComputeCommand::AllowCompaction { id, frontier } if frontier.is_empty() => {
// The collection was dropped by the controller. Remove it from the tracking state
// to ensure we don't produce any more commands for it.
if self.collections.remove(id).is_some() {
debug!(%id, "collection dropped");
}
}
_ => (),
}
if forward {
self.command_tx.send(cmd)?;
}
// Schedule collections that are ready now.
self.hydrate_collections()
}
/// Observe a response and send resulting commands to the wrapped client.
fn observe_response(&mut self, resp: &ComputeResponse<T>) -> Result<(), anyhow::Error> {
if let ComputeResponse::Frontiers(
id,
FrontiersResponse {
output_frontier: Some(frontier),
..
},
) = resp
{
if let Some(collection) = self.collections.remove(id) {
let hydrated = PartialOrder::less_than(&collection.as_of, frontier);
if hydrated || frontier.is_empty() {
debug!(%id, "collection hydrated");
// Note that it is possible to observe hydration even for collections for which
// we never sent a `Schedule` command, if the replica decided to not suspend
// the dataflow after creation. The compute protocol does not require replicas
// to create dataflows in suspended state. It seems like a good idea to still
// send a `Schedule` command in this case, rather than swallowing it, to make
// the protocol communication more predicatable.
match collection.state {
State::Created => {
// We haven't seen a `Schedule` command yet, so no obligations to send
// one either.
}
State::QueuedForHydration => {
// We are holding back the `Schedule` command for this collection. Send
// it now.
self.command_tx.send(ComputeCommand::Schedule(*id))?;
}
State::Hydrating(token) => {
// We freed some hydration capacity and may be able to start hydrating
// new collections.
drop(token);
self.hydrate_collections()?;
}
}
} else {
self.collections.insert(*id, collection);
}
}
}
Ok(())
}
/// Allow hydration based on the available capacity.
fn hydrate_collections(&mut self) -> Result<(), anyhow::Error> {
let capacity = HYDRATION_CONCURRENCY.get(&self.dyncfg);
while self.hydration_count() < capacity {
let Some(id) = self.hydration_queue.pop_front() else {
// Hydration queue is empty.
break;
};
let Some(collection) = self.collections.get_mut(&id) else {
// Collection has already been dropped.
continue;
};
debug!(%id, "starting collection hydration");
self.command_tx.send(ComputeCommand::Schedule(id))?;
let token = Arc::clone(&self.hydration_token);
collection.set_hydrating(token);
}
let queue_size = u64::cast_from(self.hydration_queue.len());
self.metrics.inner.hydration_queue_size.set(queue_size);
Ok(())
}
}
#[async_trait]
impl<T> GenericClient<ComputeCommand<T>, ComputeResponse<T>> for SequentialHydration<T>
where
T: ComputeControllerTimestamp,
{
async fn send(&mut self, cmd: ComputeCommand<T>) -> Result<(), anyhow::Error> {
self.absorb_command(cmd)
}
/// # Cancel safety
///
/// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
/// statement and some other branch completes first, it is guaranteed that no messages were
/// received by this client.
async fn recv(&mut self) -> Result<Option<ComputeResponse<T>>, anyhow::Error> {
// `mpsc::UnboundedReceiver::recv` is documented as cancel safe.
match self.response_rx.recv().await {
Some(Ok(response)) => {
self.observe_response(&response)?;
Ok(Some(response))
}
Some(Err(error)) => Err(error), // client error
None => Ok(None), // client disconnected
}
}
}
/// Information about a tracked collection.
#[derive(Debug)]
struct Collection<T> {
/// The as-of frontier at collection creation.
as_of: Antichain<T>,
/// The current state of the collection.
state: State,
}
impl<T> Collection<T> {
/// Create a new `Collection`.
fn new(as_of: Antichain<T>) -> Self {
Self {
as_of,
state: State::Created,
}
}
/// Advance this collection's state to `Scheduled`.
fn set_scheduled(&mut self) {
soft_assert_eq_or_log!(self.state, State::Created);
self.state = State::QueuedForHydration;
}
fn set_hydrating(&mut self, token: Token) {
soft_assert_eq_or_log!(self.state, State::QueuedForHydration);
self.state = State::Hydrating(token);
}
}
/// The state of a tracked collection.
#[derive(Debug, PartialEq, Eq)]
enum State {
/// Collection has been created and is waiting for a `Schedule` command.
Created,
/// The collection has received a `Schedule` command and has been added to the hydration queue,
/// waiting for hydration capacity.
QueuedForHydration,
/// Collection is hydrating and waiting for hydration to complete.
Hydrating(Token),
}
/// Forward messages between a pair of channels and a [`ComputeClient`].
///
/// This functions is run in its own task and exists to allow `SequentialHydration::recv` to be
/// cancel safe even though it needs to send commands to the wrapped client, which isn't cancel
/// safe.
async fn forward_messages<C, T>(
mut client: C,
mut rx: mpsc::UnboundedReceiver<ComputeCommand<T>>,
tx: mpsc::UnboundedSender<Result<ComputeResponse<T>, anyhow::Error>>,
) where
C: ComputeClient<T>,
{
loop {
tokio::select! {
command = rx.recv() => {
let Some(command) = command else {
break; // `SequentialHydration` dropped
};
if let Err(error) = client.send(command).await {
// Client produced an unrecoverable error.
let _ = tx.send(Err(error));
break;
}
}
response = client.recv() => {
let response = match response {
Ok(Some(response)) => response,
Ok(None) => {
break; // client disconnected
}
Err(error) => {
// Client produced an unrecoverable error.
let _ = tx.send(Err(error));
break;
}
};
if tx.send(Ok(response)).is_err() {
break; // `SequentialHydration` dropped
}
}
}
}
}