mz_compute/server.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An interactive dataflow server.
11
12use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::convert::Infallible;
15use std::fmt::Debug;
16use std::path::PathBuf;
17use std::rc::Rc;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use anyhow::Error;
22use mz_cluster::client::{ClusterClient, ClusterSpec};
23use mz_cluster_client::client::TimelyConfig;
24use mz_compute_client::protocol::command::ComputeCommand;
25use mz_compute_client::protocol::history::ComputeCommandHistory;
26use mz_compute_client::protocol::response::ComputeResponse;
27use mz_compute_client::service::ComputeClient;
28use mz_ore::halt;
29use mz_ore::metrics::MetricsRegistry;
30use mz_ore::tracing::TracingHandle;
31use mz_persist_client::cache::PersistClientCache;
32use mz_storage_types::connections::ConnectionContext;
33use mz_timely_util::capture::EventLink;
34use mz_txn_wal::operator::TxnsContext;
35use timely::logging::TimelyEvent;
36use timely::progress::Antichain;
37use timely::worker::Worker as TimelyWorker;
38use tokio::sync::mpsc;
39use tokio::sync::mpsc::error::SendError;
40use tracing::{info, trace, warn};
41use uuid::Uuid;
42
43use crate::command_channel;
44use crate::compute_state::{ActiveComputeState, ComputeState, ReportedFrontier};
45use crate::metrics::{ComputeMetrics, WorkerMetrics};
46
/// Caller-provided configuration for compute.
///
/// A copy of this context is handed to every worker and to every `ComputeState` the workers
/// create.
#[derive(Clone, Debug)]
pub struct ComputeInstanceContext {
    /// A directory that can be used for scratch work.
    pub scratch_directory: Option<PathBuf>,
    /// Whether to set core affinity for Timely workers.
    ///
    /// When set, each worker pins its thread via `set_core_affinity` when it starts running.
    pub worker_core_affinity: bool,
    /// Context required to connect to an external sink from compute,
    /// like the `CopyToS3OneshotSink` compute sink.
    pub connection_context: ConnectionContext,
}
58
/// Type alias for the storage timely log reader.
///
/// A shared `EventLink` over batches of `(Duration, TimelyEvent)` logging events produced by a
/// storage Timely worker, timestamped with `mz_repr::Timestamp`.
pub(crate) type StorageTimelyLogReader =
    Arc<EventLink<mz_repr::Timestamp, Vec<(Duration, TimelyEvent)>>>;
62
/// Configures the server with compute-specific metrics.
///
/// `run_worker` receives this config by `&self` (presumably once per Timely worker) and clones
/// out the pieces each worker needs.
#[derive(Clone)]
struct Config {
    /// `persist` client cache.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// A process-global handle to tracing configuration.
    pub tracing_handle: Arc<TracingHandle>,
    /// Metrics exposed by compute replicas.
    pub metrics: ComputeMetrics,
    /// Other configuration for compute.
    pub context: ComputeInstanceContext,
    /// The process-global metrics registry.
    pub metrics_registry: MetricsRegistry,
    /// The number of timely workers per process.
    pub workers_per_process: usize,
    /// A reader for each storage worker in this process.
    ///
    /// Holds exactly one slot per local worker, indexed by local worker index. Each worker
    /// `take`s its own slot in `run_worker`, leaving `None` behind. All slots are `None` when
    /// storage logging is disabled.
    pub storage_log_readers: Arc<Mutex<Vec<Option<StorageTimelyLogReader>>>>,
}
83
84/// Initiates a timely dataflow computation, processing compute commands.
85pub async fn serve(
86 timely_config: TimelyConfig,
87 metrics_registry: &MetricsRegistry,
88 persist_clients: Arc<PersistClientCache>,
89 txns_ctx: TxnsContext,
90 tracing_handle: Arc<TracingHandle>,
91 context: ComputeInstanceContext,
92 storage_log_readers: Vec<StorageTimelyLogReader>,
93) -> Result<impl Fn() -> Box<dyn ComputeClient> + use<>, Error> {
94 let workers_per_process = timely_config.workers;
95 // Normalize the log-reader vec to exactly one slot per local worker. Empty
96 // input means logging is disabled; pad with `None` so index-based access is
97 // always in bounds.
98 let storage_log_readers = if storage_log_readers.is_empty() {
99 (0..workers_per_process).map(|_| None).collect()
100 } else {
101 assert_eq!(storage_log_readers.len(), workers_per_process);
102 storage_log_readers.into_iter().map(Some).collect()
103 };
104 let config = Config {
105 persist_clients,
106 txns_ctx,
107 tracing_handle,
108 metrics: ComputeMetrics::register_with(metrics_registry),
109 context,
110 metrics_registry: metrics_registry.clone(),
111 workers_per_process,
112 storage_log_readers: Arc::new(Mutex::new(storage_log_readers)),
113 };
114 let tokio_executor = tokio::runtime::Handle::current();
115
116 let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
117 let timely_container = Arc::new(Mutex::new(timely_container));
118
119 let client_builder = move || {
120 let client = ClusterClient::new(Arc::clone(&timely_container));
121 let client: Box<dyn ComputeClient> = Box::new(client);
122 client
123 };
124
125 Ok(client_builder)
126}
127
/// Error type returned on connection nonce changes.
///
/// A nonce change informs workers that subsequent commands come from a new client connection
/// and therefore require reconciliation. The wrapped value is the new nonce.
struct NonceChange(Uuid);
133
/// Endpoint used by workers to receive compute commands.
///
/// Observes nonce changes in the command stream and converts them into receive errors.
struct CommandReceiver {
    /// The channel supplying commands, each paired with the nonce of the client that sent it.
    inner: command_channel::Receiver,
    /// The ID of the Timely worker, used in trace output.
    worker_id: usize,
    /// The nonce identifying the current cluster protocol incarnation.
    ///
    /// `None` until the first command has been received.
    nonce: Option<Uuid>,
    /// A stash to enable peeking the next command, used in `try_recv`.
    ///
    /// Holds the first command carrying a new nonce, so that it can be returned after the
    /// corresponding `NonceChange` error has been reported.
    stashed_command: Option<ComputeCommand>,
}
147
148impl CommandReceiver {
149 fn new(inner: command_channel::Receiver, worker_id: usize) -> Self {
150 Self {
151 inner,
152 worker_id,
153 nonce: None,
154 stashed_command: None,
155 }
156 }
157
158 /// Receive the next pending command, if any.
159 ///
160 /// If the next command has a different nonce, this method instead returns an `Err`
161 /// containing the new nonce.
162 fn try_recv(&mut self) -> Result<Option<ComputeCommand>, NonceChange> {
163 if let Some(command) = self.stashed_command.take() {
164 return Ok(Some(command));
165 }
166 let Some((command, nonce)) = self.inner.try_recv() else {
167 return Ok(None);
168 };
169
170 trace!(worker = self.worker_id, %nonce, ?command, "received command");
171
172 if Some(nonce) == self.nonce {
173 Ok(Some(command))
174 } else {
175 self.nonce = Some(nonce);
176 self.stashed_command = Some(command);
177 Err(NonceChange(nonce))
178 }
179 }
180}
181
/// Endpoint used by workers to send compute responses.
///
/// Tags responses with the current nonce, allowing receivers to filter out responses intended for
/// previous client connections.
pub(crate) struct ResponseSender {
    /// The channel consuming responses, each paired with the nonce current at send time.
    inner: mpsc::UnboundedSender<(ComputeResponse, Uuid)>,
    /// The ID of the Timely worker, used in trace output.
    worker_id: usize,
    /// The nonce identifying the current cluster protocol incarnation.
    ///
    /// Must be set via `set_nonce` before the first response is sent.
    nonce: Option<Uuid>,
}
194
195impl ResponseSender {
196 fn new(inner: mpsc::UnboundedSender<(ComputeResponse, Uuid)>, worker_id: usize) -> Self {
197 Self {
198 inner,
199 worker_id,
200 nonce: None,
201 }
202 }
203
204 /// Set the cluster protocol nonce.
205 fn set_nonce(&mut self, nonce: Uuid) {
206 self.nonce = Some(nonce);
207 }
208
209 /// Send a compute response.
210 pub fn send(&self, response: ComputeResponse) -> Result<(), SendError<ComputeResponse>> {
211 let nonce = self.nonce.expect("nonce must be initialized");
212
213 trace!(worker = self.worker_id, %nonce, ?response, "sending response");
214 self.inner
215 .send((response, nonce))
216 .map_err(|SendError((resp, _))| SendError(resp))
217 }
218}
219
/// State maintained for each worker thread.
///
/// Much of this state can be viewed as local variables for the worker thread,
/// holding state that persists across function calls.
struct Worker<'w> {
    /// The underlying Timely worker.
    timely_worker: &'w mut TimelyWorker,
    /// The channel over which commands are received.
    command_rx: CommandReceiver,
    /// The channel over which responses are sent.
    response_tx: ResponseSender,
    /// The compute state, or `None` until a `CreateInstance` command has been handled.
    compute_state: Option<ComputeState>,
    /// Compute metrics.
    metrics: WorkerMetrics,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    txns_ctx: TxnsContext,
    /// A process-global handle to tracing configuration.
    tracing_handle: Arc<TracingHandle>,
    /// Caller-provided compute configuration, passed on to each new `ComputeState`.
    context: ComputeInstanceContext,
    /// The process-global metrics registry.
    metrics_registry: MetricsRegistry,
    /// The number of timely workers per process.
    workers_per_process: usize,
    /// Reader for storage timely logging events.
    ///
    /// Moved into the first `ComputeState` created on this worker, so it is `None` afterwards.
    /// It is also `None` if storage logging is disabled.
    storage_log_reader: Option<StorageTimelyLogReader>,
}
249
250impl ClusterSpec for Config {
251 type Command = ComputeCommand;
252 type Response = ComputeResponse;
253
254 const NAME: &str = "compute";
255
256 fn run_worker(
257 &self,
258 timely_worker: &mut TimelyWorker,
259 client_rx: mpsc::UnboundedReceiver<(
260 Uuid,
261 mpsc::UnboundedReceiver<ComputeCommand>,
262 mpsc::UnboundedSender<ComputeResponse>,
263 )>,
264 ) {
265 if self.context.worker_core_affinity {
266 set_core_affinity(timely_worker.index());
267 }
268
269 let worker_id = timely_worker.index();
270 let metrics = self.metrics.for_worker(worker_id);
271
272 // Take this worker's storage log reader, indexed by local worker index
273 // so compute worker x matches storage worker x.
274 let local_index = worker_id % self.workers_per_process;
275 let storage_log_reader = self.storage_log_readers.lock().unwrap()[local_index].take();
276
277 // Create the command channel that broadcasts commands from worker 0 to other workers. We
278 // reuse this channel between client connections, to avoid bugs where different workers end
279 // up creating incompatible sides of the channel dataflow after reconnects.
280 // See database-issues#8964.
281 let (cmd_tx, cmd_rx) = command_channel::render(timely_worker);
282 let (resp_tx, resp_rx) = mpsc::unbounded_channel();
283
284 spawn_channel_adapter(client_rx, cmd_tx, resp_rx, worker_id);
285
286 Worker {
287 timely_worker,
288 command_rx: CommandReceiver::new(cmd_rx, worker_id),
289 response_tx: ResponseSender::new(resp_tx, worker_id),
290 metrics,
291 context: self.context.clone(),
292 persist_clients: Arc::clone(&self.persist_clients),
293 txns_ctx: self.txns_ctx.clone(),
294 compute_state: None,
295 tracing_handle: Arc::clone(&self.tracing_handle),
296 metrics_registry: self.metrics_registry.clone(),
297 workers_per_process: self.workers_per_process,
298 storage_log_reader,
299 }
300 .run()
301 }
302}
303
304/// Set the current thread's core affinity, based on the given `worker_id`.
305#[cfg(not(target_os = "macos"))]
306fn set_core_affinity(worker_id: usize) {
307 use tracing::error;
308
309 let Some(mut core_ids) = core_affinity::get_core_ids() else {
310 error!(worker_id, "unable to get core IDs for setting affinity");
311 return;
312 };
313
314 // The `get_core_ids` docs don't say anything about a guaranteed order of the returned Vec,
315 // so sort it just to be safe.
316 core_ids.sort_unstable_by_key(|i| i.id);
317
318 // On multi-process replicas `worker_id` might be greater than the number of available cores.
319 // However, we assume that we always have at least as many cores as there are local workers.
320 // Violating this assumption is safe but might lead to degraded performance due to skew in core
321 // utilization.
322 let idx = worker_id % core_ids.len();
323 let core_id = core_ids[idx];
324
325 if core_affinity::set_for_current(core_id) {
326 info!(
327 worker_id,
328 core_id = core_id.id,
329 "set core affinity for worker"
330 );
331 } else {
332 error!(
333 worker_id,
334 core_id = core_id.id,
335 "failed to set core affinity for worker"
336 )
337 }
338}
339
/// No-op replacement for core pinning on macOS. It only logs that the feature is unavailable.
#[cfg(target_os = "macos")]
fn set_core_affinity(_: usize) {
    // Pinning threads to cores is known not to work on Apple Silicon, see
    // https://github.com/Elzair/core_affinity_rs/issues/22
    info!("setting core affinity is not supported on macOS");
}
347
348impl<'w> Worker<'w> {
349 /// Runs a compute worker.
350 pub fn run(&mut self) {
351 // The command receiver is initialized without an nonce, so receiving the first command
352 // always triggers a nonce change.
353 let NonceChange(nonce) = self.recv_command().expect_err("change to first nonce");
354 self.set_nonce(nonce);
355
356 loop {
357 let Err(NonceChange(nonce)) = self.run_client();
358 self.set_nonce(nonce);
359 }
360 }
361
362 fn set_nonce(&mut self, nonce: Uuid) {
363 self.response_tx.set_nonce(nonce);
364 }
365
366 /// Handles commands for a client connection, returns when the nonce changes.
367 fn run_client(&mut self) -> Result<Infallible, NonceChange> {
368 self.reconcile()?;
369
370 // The last time we did periodic maintenance.
371 let mut last_maintenance = Instant::now();
372
373 // Commence normal operation.
374 loop {
375 // Get the maintenance interval, default to zero if we don't have a compute state.
376 let maintenance_interval = self
377 .compute_state
378 .as_ref()
379 .map_or(Duration::ZERO, |state| state.server_maintenance_interval);
380
381 let now = Instant::now();
382 // Determine if we need to perform maintenance, which is true if `maintenance_interval`
383 // time has passed since the last maintenance.
384 let sleep_duration;
385 if now >= last_maintenance + maintenance_interval {
386 last_maintenance = now;
387 sleep_duration = None;
388
389 // Report frontier information back the coordinator.
390 if let Some(mut compute_state) = self.activate_compute() {
391 compute_state.compute_state.traces.maintenance();
392 compute_state.report_frontiers();
393 compute_state.report_metrics();
394 compute_state.check_expiration();
395 }
396
397 self.metrics.record_shared_row_metrics();
398 } else {
399 // We didn't perform maintenance, sleep until the next maintenance interval.
400 let next_maintenance = last_maintenance + maintenance_interval;
401 sleep_duration = Some(next_maintenance.saturating_duration_since(now))
402 };
403
404 // Step the timely worker, recording the time taken.
405 let timer = self.metrics.timely_step_duration_seconds.start_timer();
406 self.timely_worker.step_or_park(sleep_duration);
407 timer.observe_duration();
408
409 self.handle_pending_commands()?;
410
411 if let Some(mut compute_state) = self.activate_compute() {
412 compute_state.process_peeks();
413 compute_state.process_subscribes();
414 compute_state.process_copy_tos();
415 }
416 }
417 }
418
419 fn handle_pending_commands(&mut self) -> Result<(), NonceChange> {
420 while let Some(cmd) = self.command_rx.try_recv()? {
421 self.handle_command(cmd);
422 }
423 Ok(())
424 }
425
426 fn handle_command(&mut self, cmd: ComputeCommand) {
427 if matches!(&cmd, ComputeCommand::CreateInstance(_)) {
428 self.compute_state = Some(ComputeState::new(
429 Arc::clone(&self.persist_clients),
430 self.txns_ctx.clone(),
431 self.metrics.clone(),
432 Arc::clone(&self.tracing_handle),
433 self.context.clone(),
434 self.metrics_registry.clone(),
435 self.workers_per_process,
436 self.storage_log_reader.take(),
437 ));
438 }
439 self.activate_compute().unwrap().handle_compute_command(cmd);
440 }
441
442 fn activate_compute(&mut self) -> Option<ActiveComputeState<'_>> {
443 if let Some(compute_state) = &mut self.compute_state {
444 Some(ActiveComputeState {
445 timely_worker: &mut *self.timely_worker,
446 compute_state,
447 response_tx: &mut self.response_tx,
448 })
449 } else {
450 None
451 }
452 }
453
454 /// Receive the next compute command.
455 ///
456 /// This method blocks if no command is currently available, but takes care to step the Timely
457 /// worker while doing so.
458 fn recv_command(&mut self) -> Result<ComputeCommand, NonceChange> {
459 loop {
460 if let Some(cmd) = self.command_rx.try_recv()? {
461 return Ok(cmd);
462 }
463
464 let start = Instant::now();
465 self.timely_worker.step_or_park(None);
466 self.metrics
467 .timely_step_duration_seconds
468 .observe(start.elapsed().as_secs_f64());
469 }
470 }
471
472 /// Extract commands until `InitializationComplete`, and make the worker reflect those commands.
473 ///
474 /// This method is meant to be a function of the commands received thus far (as recorded in the
475 /// compute state command history) and the new commands from `command_rx`. It should not be a
476 /// function of other characteristics, like whether the worker has managed to respond to a peek
477 /// or not. Some effort goes in to narrowing our view to only the existing commands we can be sure
478 /// are live at all other workers.
479 ///
480 /// The methodology here is to drain `command_rx` until an `InitializationComplete`, at which point
481 /// the prior commands are "reconciled" in. Reconciliation takes each goal dataflow and looks for an
482 /// existing "compatible" dataflow (per `compatible()`) it can repurpose, with some additional tests
483 /// to be sure that we can cut over from one to the other (no additional compaction, no tails/sinks).
484 /// With any connections established, old orphaned dataflows are allow to compact away, and any new
485 /// dataflows are created from scratch. "Kept" dataflows are allowed to compact up to any new `as_of`.
486 ///
487 /// Some additional tidying happens, cleaning up pending peeks, reported frontiers, and creating a new
488 /// subscribe response buffer. We will need to be vigilant with future modifications to `ComputeState` to
489 /// line up changes there with clean resets here.
490 fn reconcile(&mut self) -> Result<(), NonceChange> {
491 // To initialize the connection, we want to drain all commands until we receive a
492 // `ComputeCommand::InitializationComplete` command to form a target command state.
493 let mut new_commands = Vec::new();
494 loop {
495 match self.recv_command()? {
496 ComputeCommand::InitializationComplete => break,
497 command => new_commands.push(command),
498 }
499 }
500
501 // Commands we will need to apply before entering normal service.
502 // These commands may include dropping existing dataflows, compacting existing dataflows,
503 // and creating new dataflows, in addition to standard peek and compaction commands.
504 // The result should be the same as if dropping all dataflows and running `new_commands`.
505 let mut todo_commands = Vec::new();
506 // We only have a compute history if we are in an initialized state
507 // (i.e. after a `CreateInstance`).
508 // If this is not the case, just copy `new_commands` into `todo_commands`.
509 if let Some(compute_state) = &mut self.compute_state {
510 // Reduce the installed commands.
511 // Importantly, act as if all peeks may have been retired (as we cannot know otherwise).
512 compute_state.command_history.discard_peeks();
513 compute_state.command_history.reduce();
514
515 // At this point, we need to sort out which of the *certainly installed* dataflows are
516 // suitable replacements for the requested dataflows. A dataflow is "certainly installed"
517 // as of a frontier if its compaction allows it to go no further. We ignore peeks for this
518 // reasoning, as we cannot be certain that peeks still exist at any other worker.
519
520 // Having reduced our installed command history retaining no peeks (above), we should be able
521 // to use track down installed dataflows we can use as surrogates for requested dataflows (which
522 // have retained all of their peeks, creating a more demanding `as_of` requirement).
523 // NB: installed dataflows may still be allowed to further compact, and we should double check
524 // this before being too confident. It should be rare without peeks, but could happen with e.g.
525 // multiple outputs of a dataflow.
526
527 // The values with which a prior `CreateInstance` was called, if it was.
528 let mut old_instance_config = None;
529 // Index dataflows by `export_ids().collect()`, as this is a precondition for their compatibility.
530 let mut old_dataflows = BTreeMap::default();
531 // Maintain allowed compaction, in case installed identifiers may have been allowed to compact.
532 let mut old_frontiers = BTreeMap::default();
533 for command in compute_state.command_history.iter() {
534 match command {
535 ComputeCommand::CreateInstance(config) => {
536 old_instance_config = Some(config);
537 }
538 ComputeCommand::CreateDataflow(dataflow) => {
539 let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
540 old_dataflows.insert(export_ids, dataflow);
541 }
542 ComputeCommand::AllowCompaction { id, frontier } => {
543 old_frontiers.insert(id, frontier);
544 }
545 _ => {
546 // Nothing to do in these cases.
547 }
548 }
549 }
550
551 // Compaction commands that can be applied to existing dataflows.
552 let mut old_compaction = BTreeMap::default();
553 // Exported identifiers from dataflows we retain.
554 let mut retain_ids = BTreeSet::default();
555
556 // Traverse new commands, sorting out what remediation we can do.
557 for command in new_commands.iter() {
558 match command {
559 ComputeCommand::CreateDataflow(dataflow) => {
560 // Attempt to find an existing match for the dataflow.
561 let as_of = dataflow.as_of.as_ref().unwrap();
562 let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
563
564 if let Some(old_dataflow) = old_dataflows.get(&export_ids) {
565 let compatible = old_dataflow.compatible_with(dataflow);
566 let uncompacted = !export_ids
567 .iter()
568 .flat_map(|id| old_frontiers.get(id))
569 .any(|frontier| {
570 !timely::PartialOrder::less_equal(
571 *frontier,
572 dataflow.as_of.as_ref().unwrap(),
573 )
574 });
575
576 // We cannot reconcile subscribe and copy-to sinks at the moment,
577 // because the response buffer is shared, and to a first approximation
578 // must be completely reformed.
579 let subscribe_free = dataflow.subscribe_ids().next().is_none();
580 let copy_to_free = dataflow.copy_to_ids().next().is_none();
581
582 // If we have replaced any dependency of this dataflow, we need to
583 // replace this dataflow, to make it use the replacement.
584 let dependencies_retained = dataflow
585 .imported_index_ids()
586 .all(|id| retain_ids.contains(&id));
587
588 if compatible
589 && uncompacted
590 && subscribe_free
591 && copy_to_free
592 && dependencies_retained
593 {
594 // Match found; remove the match from the deletion queue,
595 // and compact its outputs to the dataflow's `as_of`.
596 old_dataflows.remove(&export_ids);
597 for id in export_ids.iter() {
598 old_compaction.insert(*id, as_of.clone());
599 }
600 retain_ids.extend(export_ids);
601 } else {
602 warn!(
603 ?export_ids,
604 ?compatible,
605 ?uncompacted,
606 ?subscribe_free,
607 ?copy_to_free,
608 ?dependencies_retained,
609 old_as_of = ?old_dataflow.as_of,
610 new_as_of = ?as_of,
611 "dataflow reconciliation failed",
612 );
613
614 // Dump the full dataflow plans if they are incompatible, to
615 // simplify debugging hard-to-reproduce reconciliation failures.
616 if !compatible {
617 warn!(
618 old = ?old_dataflow,
619 new = ?dataflow,
620 "incompatible dataflows in reconciliation",
621 );
622 }
623
624 todo_commands
625 .push(ComputeCommand::CreateDataflow(dataflow.clone()));
626 }
627
628 compute_state.metrics.record_dataflow_reconciliation(
629 compatible,
630 uncompacted,
631 subscribe_free,
632 copy_to_free,
633 dependencies_retained,
634 );
635 } else {
636 todo_commands.push(ComputeCommand::CreateDataflow(dataflow.clone()));
637 }
638 }
639 ComputeCommand::CreateInstance(config) => {
640 // Cluster creation should not be performed again!
641 if old_instance_config.map_or(false, |old| !old.compatible_with(config)) {
642 halt!(
643 "new instance configuration not compatible with existing instance configuration:\n{:?}\nvs\n{:?}",
644 config,
645 old_instance_config,
646 );
647 }
648 }
649 // All other commands we apply as requested.
650 command => {
651 todo_commands.push(command.clone());
652 }
653 }
654 }
655
656 // Issue compaction commands first to reclaim resources.
657 for (_, dataflow) in old_dataflows.iter() {
658 for id in dataflow.export_ids() {
659 // We want to drop anything that has not yet been dropped,
660 // and nothing that has already been dropped.
661 if old_frontiers.get(&id) != Some(&&Antichain::new()) {
662 old_compaction.insert(id, Antichain::new());
663 }
664 }
665 }
666 for (&id, frontier) in &old_compaction {
667 let frontier = frontier.clone();
668 todo_commands.insert(0, ComputeCommand::AllowCompaction { id, frontier });
669 }
670
671 // Clean up worker-local state.
672 //
673 // Various aspects of `ComputeState` need to be either uninstalled, or return to a blank slate.
674 // All dropped dataflows should clean up after themselves, as we plan to install new dataflows
675 // re-using the same identifiers.
676 // All re-used dataflows should roll back any believed communicated information (e.g. frontiers)
677 // so that they recommunicate that information as if from scratch.
678
679 // Remove all pending peeks.
680 for (_, peek) in std::mem::take(&mut compute_state.pending_peeks) {
681 // Log dropping the peek request.
682 if let Some(logger) = compute_state.compute_logger.as_mut() {
683 logger.log(&peek.as_log_event(false));
684 }
685 }
686
687 for (&id, collection) in compute_state.collections.iter_mut() {
688 // Adjust reported frontiers:
689 // * For dataflows we continue to use, reset to ensure we report something not
690 // before the new `as_of` next.
691 // * For dataflows we drop, set to the empty frontier, to ensure we don't report
692 // anything for them.
693 let retained = retain_ids.contains(&id);
694 let compaction = old_compaction.remove(&id);
695 let new_reported_frontier = match (retained, compaction) {
696 (true, Some(new_as_of)) => ReportedFrontier::NotReported { lower: new_as_of },
697 (true, None) => {
698 unreachable!("retained dataflows are compacted to the new as_of")
699 }
700 (false, Some(new_frontier)) => {
701 assert!(new_frontier.is_empty());
702 ReportedFrontier::Reported(new_frontier)
703 }
704 (false, None) => {
705 // Logging dataflows are implicitly retained and don't have a new as_of.
706 // Reset them to the minimal frontier.
707 ReportedFrontier::new()
708 }
709 };
710
711 collection.reset_reported_frontiers(new_reported_frontier);
712
713 // Sink tokens should be retained for retained dataflows, and dropped for dropped
714 // dataflows.
715 //
716 // Dropping the tokens of active subscribe and copy-tos makes them place
717 // `DroppedAt` responses into the respective response buffer. We drop those buffers
718 // in the next step, which ensures that we don't send out `DroppedAt` responses for
719 // subscribe/copy-tos dropped during reconciliation.
720 if !retained {
721 collection.sink_token = None;
722 }
723 }
724
725 // We must drop the response buffers as they are global across all subscribe/copy-tos.
726 // If they were broken out by `GlobalId` then we could drop only the response buffers
727 // of dataflows we drop.
728 compute_state.subscribe_response_buffer = Rc::new(RefCell::new(Vec::new()));
729 compute_state.copy_to_response_buffer = Rc::new(RefCell::new(Vec::new()));
730
731 // The controller expects the logging collections to be readable from the minimum time
732 // initially. We cannot recreate the logging arrangements without restarting the
733 // instance, but we can pad the compacted times with empty data. Doing so is sound
734 // because logging collections from different replica incarnations are considered
735 // distinct TVCs, so the controller doesn't expect any historical consistency from
736 // these collections when it reconnects to a replica.
737 //
738 // TODO(database-issues#8152): Consider resolving this with controller-side reconciliation instead.
739 if let Some(config) = old_instance_config {
740 for id in config.logging.index_logs.values() {
741 let trace = compute_state
742 .traces
743 .remove(id)
744 .expect("logging trace exists");
745 let padded = trace.into_padded();
746 compute_state.traces.set(*id, padded);
747 }
748 }
749 } else {
750 todo_commands.clone_from(&new_commands);
751 }
752
753 // Execute the commands to bring us to `new_commands`.
754 for command in todo_commands.into_iter() {
755 self.handle_command(command);
756 }
757
758 // Overwrite `self.command_history` to reflect `new_commands`.
759 // It is possible that there still isn't a compute state yet.
760 if let Some(compute_state) = &mut self.compute_state {
761 let mut command_history = ComputeCommandHistory::new(self.metrics.for_history());
762 for command in new_commands.iter() {
763 command_history.push(command.clone());
764 }
765 compute_state.command_history = command_history;
766 }
767 Ok(())
768 }
769}
770
/// Spawn a task to bridge between [`ClusterClient`] and [`Worker`] channels.
///
/// The [`Worker`] expects a pair of persistent channels, with punctuation marking reconnects,
/// while the [`ClusterClient`] provides a new pair of channels on each reconnect.
///
/// Parameters:
/// * `client_rx` yields one `(nonce, command receiver, response sender)` triple per client
///   connection. Connections are served one after another, and the task ends once `client_rx`
///   is closed.
/// * `command_tx` is the worker's persistent command channel. Forwarded commands are paired
///   with the nonce of the connection they came from.
/// * `response_rx` carries the worker's responses, each paired with the nonce the worker's
///   `ResponseSender` had when sending it.
/// * `worker_id` is only used to name the spawned task.
fn spawn_channel_adapter(
    mut client_rx: mpsc::UnboundedReceiver<(
        Uuid,
        mpsc::UnboundedReceiver<ComputeCommand>,
        mpsc::UnboundedSender<ComputeResponse>,
    )>,
    command_tx: command_channel::Sender,
    mut response_rx: mpsc::UnboundedReceiver<(ComputeResponse, Uuid)>,
    worker_id: usize,
) {
    mz_ore::task::spawn(
        || format!("compute-channel-adapter-{worker_id}"),
        async move {
            // To make workers aware of the individual client connections, we tag forwarded
            // commands with the client nonce. Additionally, we use the nonce to filter out
            // responses with a different nonce, which are intended for different client
            // connections.
            //
            // It's possible that we receive responses with nonces from the past but also from the
            // future: Worker 0 might have received a new nonce before us and broadcasted it to our
            // Timely cluster. When we receive a response with a future nonce, we need to wait with
            // forwarding it until we have received the same nonce from a client connection.
            //
            // Nonces are not ordered so we don't know whether a response nonce is from the past or
            // the future. We thus assume that every response with an unknown nonce might be from
            // the future and stash them all. Every time we reconnect, we immediately send all
            // stashed responses with a matching nonce. Every time we receive a new response with a
            // nonce that matches our current one, we can discard the entire response stash as we
            // know that all stashed responses must be from the past.
            let mut stashed_responses = BTreeMap::<Uuid, Vec<ComputeResponse>>::new();

            // One iteration per client connection.
            while let Some((nonce, mut command_rx, response_tx)) = client_rx.recv().await {
                // Send stashed responses for this client. Send errors are ignored here.
                if let Some(resps) = stashed_responses.remove(&nonce) {
                    for resp in resps {
                        let _ = response_tx.send(resp);
                    }
                }

                // Wait for a new response while forwarding received commands.
                //
                // Resolves to `Err(())` once the client's command channel is closed, and to
                // `Ok((response, response_nonce))` as soon as the worker emits any response.
                let mut serve_rx_channels = async || loop {
                    tokio::select! {
                        msg = command_rx.recv() => match msg {
                            Some(cmd) => command_tx.send((cmd, nonce)),
                            None => return Err(()),
                        },
                        msg = response_rx.recv() => {
                            // NOTE(review): panics if the worker side of `response_rx` has been
                            // dropped; assumed not to happen while the worker is running.
                            return Ok(msg.expect("worker connected"));
                        }
                    }
                };

                // Serve this connection until we see any of the channels disconnect.
                loop {
                    let Ok((resp, resp_nonce)) = serve_rx_channels().await else {
                        break;
                    };

                    if resp_nonce == nonce {
                        // Response for the current connection; forward it.
                        stashed_responses.clear();
                        if response_tx.send(resp).is_err() {
                            break;
                        }
                    } else {
                        // Response for a past or future connection; stash it.
                        let stash = stashed_responses.entry(resp_nonce).or_default();
                        stash.push(resp);
                    }
                }
            }
        },
    );
}