// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! A client backed by multiple replicas.
//!
//! This client accepts commands and responds as would a correctly implemented client.
//! Its implementation is wrapped around clients that may fail at any point, and restart.
//! To accommodate this, it records the commands it accepts, and should a client restart
//! the commands are replayed at it, with some modification. As the clients respond, the
//! wrapper client tracks the responses and ensures that they are "logically deduplicated",
//! so that the receiver need not be aware of the replication and restarting.
//!
//! This tactic requires that dataflows be restartable, which they generally are not, due
//! to allowed compaction of their source data. This client must correctly observe commands
//! that allow for compaction of its assets, and only attempt to rebuild them as of those
//! compacted frontiers, as the underlying resources to rebuild them any earlier may not
//! exist any longer.
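//!
//! # Example
//!
//! A minimal usage sketch (illustrative only, not a compiled doctest;
//! `replica_id`, `replica_client`, and `cmd` stand in for a replica identifier,
//! any `ComputeClient` implementation, and any `ComputeCommand`):
//!
//! ```ignore
//! let mut client = ActiveReplication::default();
//! client.add_replica(replica_id, replica_client);
//! client.send(cmd).await?;
//! // Responses from all replicas are logically deduplicated, so the
//! // receiver observes each logical response exactly once.
//! while let Some(response) = client.recv().await? {
//!     // Handle the `ActiveReplicationResponse`.
//! }
//! ```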
use std::collections::{HashMap, VecDeque};
use chrono::Utc;
use timely::progress::frontier::MutableAntichain;
use timely::progress::Antichain;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use mz_ore::tracing::OpenTelemetryContext;
use mz_repr::GlobalId;
use super::{ActiveReplicationResponse, ReplicaId};
use super::{ComputeClient, GenericClient};
use super::{ComputeCommand, ComputeResponse};
use super::{Peek, PeekResponse};
/// Spawns a task that repeatedly sends messages back and forth
/// between a client and its owner, and returns channels to communicate with it.
///
/// This can be useful because sending to an `mpsc` is synchronous, eliminating
/// cancelation-unsafety in some cases.
///
/// For this to be useful, `Client::recv` must itself be cancelation-safe.
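///
/// # Example
///
/// A hedged sketch (illustrative only; `my_client` stands in for any
/// `GenericClient` implementation and `cmd` for a command it accepts):
///
/// ```ignore
/// let (cmd_tx, mut response_rx) = spawn_client_task(my_client, || "message pump");
/// // Sending is synchronous, and hence usable where cancelation safety matters.
/// cmd_tx.send(cmd).expect("client task alive");
/// // Responses (or errors) arrive as `Result<R, anyhow::Error>`.
/// if let Some(response) = response_rx.recv().await {
///     // Handle the response.
/// }
/// ```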
pub fn spawn_client_task<
C: Send + 'static,
R: Send + 'static,
Client: GenericClient<C, R> + 'static,
Name: AsRef<str>,
NameClosure: FnOnce() -> Name,
>(
mut client: Client,
nc: NameClosure,
) -> (
UnboundedSender<C>,
UnboundedReceiver<Result<R, anyhow::Error>>,
) {
let (cmd_tx, mut cmd_rx) = unbounded_channel();
let (response_tx, response_rx) = unbounded_channel();
mz_ore::task::spawn(nc, async move {
loop {
tokio::select! {
m = cmd_rx.recv() => {
match m {
Some(c) => {
// Issues should be detected, and
// reconnect attempted, on the `client.recv` path.
let _ = client.send(c).await;
},
None => break,
}
},
m = client.recv() => {
match m.transpose() {
Some(m) => {
if response_tx.send(m).is_err() {
break;
}
}
None => break,
}
}
}
}
});
(cmd_tx, response_rx)
}
/// Additional information to store with pending peeks.
#[derive(Debug)]
pub struct PendingPeek {
/// The OpenTelemetry context for this peek.
otel_ctx: OpenTelemetryContext,
}
/// A client backed by multiple replicas.
#[derive(Debug)]
pub struct ActiveReplication<T> {
/// Handles to the replicas themselves.
replicas: HashMap<
ReplicaId,
(
UnboundedSender<ComputeCommand<T>>,
UnboundedReceiverStream<Result<ComputeResponse<T>, anyhow::Error>>,
),
>,
/// Outstanding peek identifiers, to guide responses (and which to suppress).
peeks: HashMap<uuid::Uuid, PendingPeek>,
/// Reported frontier of each in-progress tail.
tails: HashMap<GlobalId, Antichain<T>>,
/// Frontier information, both unioned across all replicas and from each individual replica.
uppers: HashMap<GlobalId, (Antichain<T>, HashMap<ReplicaId, MutableAntichain<T>>)>,
/// The command history, used when introducing new replicas or restarting existing replicas.
history: crate::client::ComputeCommandHistory<T>,
/// Most recent count of the volume of unpacked commands (e.g. dataflows in `CreateDataflows`).
last_command_count: usize,
/// Responses that should be emitted on the next `recv` call.
///
/// This is introduced to produce peek cancelation responses eagerly, without awaiting a replica
/// responding with the response itself, which allows us to compact away the peek in `self.history`.
pending_response: VecDeque<ActiveReplicationResponse<T>>,
}
impl<T> Default for ActiveReplication<T> {
fn default() -> Self {
Self {
replicas: Default::default(),
peeks: Default::default(),
tails: Default::default(),
uppers: Default::default(),
history: Default::default(),
last_command_count: 0,
pending_response: Default::default(),
}
}
}
impl<T> ActiveReplication<T>
where
T: timely::progress::Timestamp,
{
/// Introduce a new replica, and catch it up to the commands of other replicas.
///
/// It is not yet clear under which circumstances a replica can be removed.
pub fn add_replica<C: ComputeClient<T> + 'static>(&mut self, id: ReplicaId, client: C) {
for (_, frontiers) in self.uppers.values_mut() {
frontiers.insert(id, {
let mut frontier = timely::progress::frontier::MutableAntichain::new();
frontier.update_iter(Some((T::minimum(), 1)));
frontier
});
}
let (cmd_tx, resp_rx) =
spawn_client_task(client, || "ActiveReplication client message pump");
self.replicas.insert(id, (cmd_tx, resp_rx.into()));
self.hydrate_replica(id);
}
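/// Returns the identifiers of all replicas currently attached to this client.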
pub fn get_replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_ {
self.replicas.keys().copied()
}
/// Remove a replica by its identifier.
pub fn remove_replica(&mut self, id: ReplicaId) {
self.replicas.remove(&id);
for (_, frontiers) in self.uppers.values_mut() {
frontiers.remove(&id);
}
}
/// Pipes a command stream at the indicated replica, introducing new dataflow identifiers.
fn hydrate_replica(&mut self, replica_id: ReplicaId) {
// Zero out frontiers maintained by this replica.
for (_id, (_, frontiers)) in self.uppers.iter_mut() {
let frontier = frontiers.get_mut(&replica_id).unwrap();
*frontier = timely::progress::frontier::MutableAntichain::new();
frontier.update_iter(Some((T::minimum(), 1)));
}
// Take this opportunity to clean up the history we should present.
self.last_command_count = self.history.reduce(&self.peeks);
// Replay the commands at the client, creating new dataflow identifiers.
let (cmd_tx, _) = self.replicas.get_mut(&replica_id).unwrap();
for command in self.history.iter() {
let mut command = command.clone();
specialize_command(&mut command, replica_id);
cmd_tx
.send(command)
.expect("Channel to client has gone away!")
}
}
}
#[async_trait::async_trait]
impl<T> GenericClient<ComputeCommand<T>, ActiveReplicationResponse<T>> for ActiveReplication<T>
where
T: timely::progress::Timestamp + differential_dataflow::lattice::Lattice + std::fmt::Debug,
{
/// The ADAPTER layer's isolation from COMPUTE depends on the fact that this
/// function is essentially non-blocking, i.e. that the ADAPTER can blindly
/// await calls to this function. This lets the ADAPTER continue operating
/// even in the face of unhealthy or absent replicas.
///
/// If this function ever becomes blocking (e.g. by making networking calls),
/// the ADAPTER must amend its contract with COMPUTE.
async fn send(&mut self, cmd: ComputeCommand<T>) -> Result<(), anyhow::Error> {
// Update our tracking of peek commands.
match &cmd {
ComputeCommand::Peek(Peek { uuid, otel_ctx, .. }) => {
self.peeks.insert(
*uuid,
PendingPeek {
// TODO(guswynn): can we just hold the `tracing::Span`
// here instead?
otel_ctx: otel_ctx.clone(),
},
);
}
ComputeCommand::CancelPeeks { uuids } => {
// Enqueue the response to the cancelation.
self.pending_response.extend(uuids.iter().map(|uuid| {
// Canceled peeks should not be further responded to.
let otel_ctx = self
.peeks
.remove(uuid)
.map(|pending| pending.otel_ctx)
.unwrap_or_else(|| {
tracing::warn!("did not find pending peek for {}", uuid);
OpenTelemetryContext::empty()
});
ActiveReplicationResponse::ComputeResponse(ComputeResponse::PeekResponse(
*uuid,
PeekResponse::Canceled,
otel_ctx,
))
}));
}
_ => {}
}
// Initialize any necessary frontier tracking.
let mut start = Vec::new();
let mut cease = Vec::new();
cmd.frontier_tracking(&mut start, &mut cease);
for id in start.into_iter() {
let frontier = timely::progress::Antichain::from_elem(T::minimum());
let frontiers = self
.replicas
.keys()
.map(|id| {
let mut frontier = timely::progress::frontier::MutableAntichain::new();
frontier.update_iter(Some((T::minimum(), 1)));
(id.clone(), frontier)
})
.collect();
let previous = self.uppers.insert(id, (frontier, frontiers));
assert!(previous.is_none());
}
for id in cease.into_iter() {
let previous = self.uppers.remove(&id);
assert!(previous.is_some());
}
// Record the command so that new replicas can be brought up to speed.
self.history.push(cmd.clone());
// If we have reached a point that justifies history reduction, do that.
if self.history.len() > 2 * self.last_command_count {
self.last_command_count = self.history.reduce(&self.peeks);
}
// Clone the command for each active replica.
for (id, (tx, _)) in self.replicas.iter_mut() {
let mut command = cmd.clone();
specialize_command(&mut command, *id);
// Errors are suppressed by this client, which awaits a reconnection
// in `recv` and will rehydrate the client when that happens.
//
// NOTE: Broadcasting commands to replicas irrespective of their
// presence or health is part of the isolation contract between
// ADAPTER and COMPUTE. If this changes (e.g. awaiting responses
// from replicas), ADAPTER needs to handle its interactions with
// COMPUTE differently.
let _ = tx.send(command);
}
Ok(())
}
async fn recv(&mut self) -> Result<Option<ActiveReplicationResponse<T>>, anyhow::Error> {
// If we have a pending response, we should send it immediately.
if let Some(response) = self.pending_response.pop_front() {
return Ok(Some(response));
}
if self.replicas.is_empty() {
// With no replicas, no response can ever arrive; pend forever to
// communicate that the result is not ready.
futures::future::pending().await
} else {
// We may need to iterate, if a replica needs rehydration.
let mut clean_recv = false;
while !clean_recv {
let mut errored_replica = None;
// Receive responses from any of the replicas, and take appropriate action.
let mut stream: tokio_stream::StreamMap<_, _> = self
.replicas
.iter_mut()
.map(|(id, (_, rx))| (id.clone(), rx))
.collect();
use futures::StreamExt;
while let Some((replica_id, message)) = stream.next().await {
self.pending_response
.push_front(ActiveReplicationResponse::ReplicaHeartbeat(
replica_id,
Utc::now(),
));
match message {
Ok(ComputeResponse::PeekResponse(uuid, response, otel_ctx)) => {
// If this is the first response, forward it; otherwise do not.
// TODO: we could collect the other responses to assert equivalence?
// Trades resources (memory) for reassurance; it is unclear which is best.
//
// NOTE: we use the `otel_ctx` from the response, not the
// pending peek, because we currently want the parent
// to be whatever the compute worker did with this peek.
//
// Additionally, we just use the `otel_ctx` from the first worker to
// respond.
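// For example, with two replicas racing to answer the same peek, only
// the first response observed here still finds the uuid in `self.peeks`
// and is forwarded; the slower replica's response finds no entry and is
// dropped.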
if self.peeks.remove(&uuid).is_some() {
return Ok(Some(ActiveReplicationResponse::ComputeResponse(
ComputeResponse::PeekResponse(uuid, response, otel_ctx),
)));
}
}
Ok(ComputeResponse::FrontierUppers(mut list)) => {
for (id, changes) in list.iter_mut() {
if let Some((frontier, frontiers)) = self.uppers.get_mut(id) {
// Apply changes to replica `replica_id`
frontiers
.get_mut(&replica_id)
.unwrap()
.update_iter(changes.drain());
// Move the old `frontier` into `changes` (negated), rebuild `frontier`
// as the pointwise join of the old frontier with this replica's new
// frontier, and then add the new frontier back into `changes`, so that
// `changes` describes the net change to the unioned frontier.
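// For example (hypothetical integer timestamps): if the unioned
// `frontier` is {3} and this replica's frontier advances to {5}, the
// new unioned frontier is the pointwise join {5} and `changes` ends up
// as {3: -1, 5: +1}. If instead this replica lags at {2}, the join
// remains {3}, `changes` compacts to empty, and no progress is
// announced downstream.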
changes.extend(frontier.iter().map(|t| (t.clone(), -1)));
frontier.clear();
for (time1, _neg_one) in changes.iter() {
for time2 in frontiers[&replica_id].frontier().iter() {
frontier.insert(time1.join(time2));
}
}
changes.extend(frontier.iter().map(|t| (t.clone(), 1)));
changes.compact();
}
}
if !list.is_empty() {
return Ok(Some(ActiveReplicationResponse::ComputeResponse(
ComputeResponse::FrontierUppers(list),
)));
}
}
Ok(ComputeResponse::TailResponse(id, response)) => {
use crate::{TailBatch, TailResponse};
match response {
TailResponse::Batch(TailBatch {
lower: _,
upper,
mut updates,
}) => {
// It suffices to compare `upper` against the last reported frontier for `id`:
// if `upper` is not less than or equal to that frontier, some progress has happened.
// If so, we retain only the updates at times greater than or equal to that last
// reported frontier, and announce a batch from that frontier to its join with `upper`.
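// For example (hypothetical integer timestamps): if the last reported
// frontier for `id` is {5} and some replica sends a batch with
// `upper = {7}`, we announce a batch over [{5}, {7}) and keep only the
// updates at times >= 5; when another replica later sends the same
// batch, `new_upper` equals `entry` and the duplicate is suppressed.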
// Ensure that we have a recorded frontier ready to go.
let entry = self
.tails
.entry(id)
.or_insert_with(|| Antichain::from_elem(T::minimum()));
// If the upper frontier has changed, we have a statement to make.
// This happens if there is any element of `entry` that is not greater
// than or equal to some element of `upper`.
use differential_dataflow::lattice::Lattice;
let new_upper = entry.join(&upper);
if &new_upper != entry {
let new_lower = entry.clone();
entry.clone_from(&new_upper);
updates.retain(|(time, _data, _diff)| {
new_lower.less_equal(time)
});
return Ok(Some(
ActiveReplicationResponse::ComputeResponse(
ComputeResponse::TailResponse(
id,
TailResponse::Batch(TailBatch {
lower: new_lower,
upper: new_upper,
updates,
}),
),
),
));
}
}
TailResponse::DroppedAt(frontier) => {
// Introduce a new terminal frontier to suppress all future responses.
// We cannot simply remove the entry, as we currently create new entries in response
// to observed responses; if we pre-load the entries in response to commands we can
// clean up the state here.
self.tails.insert(id, Antichain::new());
return Ok(Some(ActiveReplicationResponse::ComputeResponse(
ComputeResponse::TailResponse(
id,
TailResponse::DroppedAt(frontier),
),
)));
}
}
}
Err(_error) => {
errored_replica = Some(replica_id);
break;
}
}
}
drop(stream);
if let Some(replica_id) = errored_replica {
tracing::warn!("Rehydrating replica {:?}", replica_id);
self.hydrate_replica(replica_id);
}
clean_recv = errored_replica.is_none();
}
// Indicate completion of the communication.
Ok(None)
}
}
}
/// Specialize a command for the given `ReplicaId`.
///
/// Most `ComputeCommand`s are independent of the target replica, but some
/// contain replica-specific fields that must be adjusted before sending.
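///
/// A hedged sketch of the effect (illustrative only):
///
/// ```ignore
/// let mut cmd = ComputeCommand::CreateInstance(config);
/// specialize_command(&mut cmd, replica_id);
/// // The embedded config's `replica_id` now names the target replica; a
/// // `CreateDataflows` command would instead receive fresh dataflow ids.
/// ```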
fn specialize_command<T>(command: &mut ComputeCommand<T>, replica_id: ReplicaId) {
// Tell new instances their replica ID.
if let ComputeCommand::CreateInstance(config) = command {
config.replica_id = replica_id;
}
// Replace dataflow identifiers with new unique ids.
if let ComputeCommand::CreateDataflows(dataflows) = command {
for dataflow in dataflows.iter_mut() {
dataflow.id = uuid::Uuid::new_v4();
}
}
}