Skip to main content

mz_compute/
server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An interactive dataflow server.
11
12use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::convert::Infallible;
15use std::fmt::Debug;
16use std::path::PathBuf;
17use std::rc::Rc;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use anyhow::Error;
22use mz_cluster::client::{ClusterClient, ClusterSpec};
23use mz_cluster_client::client::TimelyConfig;
24use mz_compute_client::protocol::command::ComputeCommand;
25use mz_compute_client::protocol::history::ComputeCommandHistory;
26use mz_compute_client::protocol::response::ComputeResponse;
27use mz_compute_client::service::ComputeClient;
28use mz_ore::halt;
29use mz_ore::metrics::MetricsRegistry;
30use mz_ore::tracing::TracingHandle;
31use mz_persist_client::cache::PersistClientCache;
32use mz_storage_types::connections::ConnectionContext;
33use mz_timely_util::capture::EventLink;
34use mz_txn_wal::operator::TxnsContext;
35use timely::logging::TimelyEvent;
36use timely::progress::Antichain;
37use timely::worker::Worker as TimelyWorker;
38use tokio::sync::mpsc;
39use tokio::sync::mpsc::error::SendError;
40use tracing::{info, trace, warn};
41use uuid::Uuid;
42
43use crate::command_channel;
44use crate::compute_state::{ActiveComputeState, ComputeState, ReportedFrontier};
45use crate::metrics::{ComputeMetrics, WorkerMetrics};
46
47/// Caller-provided configuration for compute.
48#[derive(Clone, Debug)]
49pub struct ComputeInstanceContext {
50    /// A directory that can be used for scratch work.
51    pub scratch_directory: Option<PathBuf>,
52    /// Whether to set core affinity for Timely workers.
53    pub worker_core_affinity: bool,
54    /// Context required to connect to an external sink from compute,
55    /// like the `CopyToS3OneshotSink` compute sink.
56    pub connection_context: ConnectionContext,
57}
58
59/// Type alias for the storage timely log reader.
60pub(crate) type StorageTimelyLogReader =
61    Arc<EventLink<mz_repr::Timestamp, Vec<(Duration, TimelyEvent)>>>;
62
63/// Configures the server with compute-specific metrics.
64#[derive(Clone)]
65struct Config {
66    /// `persist` client cache.
67    pub persist_clients: Arc<PersistClientCache>,
68    /// Context necessary for rendering txn-wal operators.
69    pub txns_ctx: TxnsContext,
70    /// A process-global handle to tracing configuration.
71    pub tracing_handle: Arc<TracingHandle>,
72    /// Metrics exposed by compute replicas.
73    pub metrics: ComputeMetrics,
74    /// Other configuration for compute.
75    pub context: ComputeInstanceContext,
76    /// The process-global metrics registry.
77    pub metrics_registry: MetricsRegistry,
78    /// The number of timely workers per process.
79    pub workers_per_process: usize,
80    /// A reader for each storage worker in this process.
81    pub storage_log_readers: Arc<Mutex<Vec<Option<StorageTimelyLogReader>>>>,
82}
83
84/// Initiates a timely dataflow computation, processing compute commands.
85pub async fn serve(
86    timely_config: TimelyConfig,
87    metrics_registry: &MetricsRegistry,
88    persist_clients: Arc<PersistClientCache>,
89    txns_ctx: TxnsContext,
90    tracing_handle: Arc<TracingHandle>,
91    context: ComputeInstanceContext,
92    storage_log_readers: Vec<StorageTimelyLogReader>,
93) -> Result<impl Fn() -> Box<dyn ComputeClient> + use<>, Error> {
94    let workers_per_process = timely_config.workers;
95    // Normalize the log-reader vec to exactly one slot per local worker. Empty
96    // input means logging is disabled; pad with `None` so index-based access is
97    // always in bounds.
98    let storage_log_readers = if storage_log_readers.is_empty() {
99        (0..workers_per_process).map(|_| None).collect()
100    } else {
101        assert_eq!(storage_log_readers.len(), workers_per_process);
102        storage_log_readers.into_iter().map(Some).collect()
103    };
104    mz_timely_util::column_pager::metrics::register(
105        metrics_registry,
106        mz_timely_util::column_pager::tiered_policy(),
107    );
108
109    let config = Config {
110        persist_clients,
111        txns_ctx,
112        tracing_handle,
113        metrics: ComputeMetrics::register_with(metrics_registry),
114        context,
115        metrics_registry: metrics_registry.clone(),
116        workers_per_process,
117        storage_log_readers: Arc::new(Mutex::new(storage_log_readers)),
118    };
119    let tokio_executor = tokio::runtime::Handle::current();
120
121    let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
122    let timely_container = Arc::new(Mutex::new(timely_container));
123
124    let client_builder = move || {
125        let client = ClusterClient::new(Arc::clone(&timely_container));
126        let client: Box<dyn ComputeClient> = Box::new(client);
127        client
128    };
129
130    Ok(client_builder)
131}
132
133/// Error type returned on connection nonce changes.
134///
135/// A nonce change informs workers that subsequent commands come a from a new client connection
136/// and therefore require reconciliation.
137struct NonceChange(Uuid);
138
139/// Endpoint used by workers to receive compute commands.
140///
141/// Observes nonce changes in the command stream and converts them into receive errors.
142struct CommandReceiver {
143    /// The channel supplying commands.
144    inner: command_channel::Receiver,
145    /// The ID of the Timely worker.
146    worker_id: usize,
147    /// The nonce identifying the current cluster protocol incarnation.
148    nonce: Option<Uuid>,
149    /// A stash to enable peeking the next command, used in `try_recv`.
150    stashed_command: Option<ComputeCommand>,
151}
152
153impl CommandReceiver {
154    fn new(inner: command_channel::Receiver, worker_id: usize) -> Self {
155        Self {
156            inner,
157            worker_id,
158            nonce: None,
159            stashed_command: None,
160        }
161    }
162
163    /// Receive the next pending command, if any.
164    ///
165    /// If the next command has a different nonce, this method instead returns an `Err`
166    /// containing the new nonce.
167    fn try_recv(&mut self) -> Result<Option<ComputeCommand>, NonceChange> {
168        if let Some(command) = self.stashed_command.take() {
169            return Ok(Some(command));
170        }
171        let Some((command, nonce)) = self.inner.try_recv() else {
172            return Ok(None);
173        };
174
175        trace!(worker = self.worker_id, %nonce, ?command, "received command");
176
177        if Some(nonce) == self.nonce {
178            Ok(Some(command))
179        } else {
180            self.nonce = Some(nonce);
181            self.stashed_command = Some(command);
182            Err(NonceChange(nonce))
183        }
184    }
185}
186
187/// Endpoint used by workers to send sending compute responses.
188///
189/// Tags responses with the current nonce, allowing receivers to filter out responses intended for
190/// previous client connections.
191pub(crate) struct ResponseSender {
192    /// The channel consuming responses.
193    inner: mpsc::UnboundedSender<(ComputeResponse, Uuid)>,
194    /// The ID of the Timely worker.
195    worker_id: usize,
196    /// The nonce identifying the current cluster protocol incarnation.
197    nonce: Option<Uuid>,
198}
199
200impl ResponseSender {
201    fn new(inner: mpsc::UnboundedSender<(ComputeResponse, Uuid)>, worker_id: usize) -> Self {
202        Self {
203            inner,
204            worker_id,
205            nonce: None,
206        }
207    }
208
209    /// Set the cluster protocol nonce.
210    fn set_nonce(&mut self, nonce: Uuid) {
211        self.nonce = Some(nonce);
212    }
213
214    /// Send a compute response.
215    pub fn send(&self, response: ComputeResponse) -> Result<(), SendError<ComputeResponse>> {
216        let nonce = self.nonce.expect("nonce must be initialized");
217
218        trace!(worker = self.worker_id, %nonce, ?response, "sending response");
219        self.inner
220            .send((response, nonce))
221            .map_err(|SendError((resp, _))| SendError(resp))
222    }
223}
224
225/// State maintained for each worker thread.
226///
227/// Much of this state can be viewed as local variables for the worker thread,
228/// holding state that persists across function calls.
229struct Worker<'w> {
230    /// The underlying Timely worker.
231    timely_worker: &'w mut TimelyWorker,
232    /// The channel over which commands are received.
233    command_rx: CommandReceiver,
234    /// The channel over which responses are sent.
235    response_tx: ResponseSender,
236    compute_state: Option<ComputeState>,
237    /// Compute metrics.
238    metrics: WorkerMetrics,
239    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
240    /// This is intentionally shared between workers
241    persist_clients: Arc<PersistClientCache>,
242    /// Context necessary for rendering txn-wal operators.
243    txns_ctx: TxnsContext,
244    /// A process-global handle to tracing configuration.
245    tracing_handle: Arc<TracingHandle>,
246    context: ComputeInstanceContext,
247    /// The process-global metrics registry.
248    metrics_registry: MetricsRegistry,
249    /// The number of timely workers per process.
250    workers_per_process: usize,
251    /// Reader for storage timely logging events.
252    storage_log_reader: Option<StorageTimelyLogReader>,
253}
254
255impl ClusterSpec for Config {
256    type Command = ComputeCommand;
257    type Response = ComputeResponse;
258
259    const NAME: &str = "compute";
260
261    fn run_worker(
262        &self,
263        timely_worker: &mut TimelyWorker,
264        client_rx: mpsc::UnboundedReceiver<(
265            Uuid,
266            mpsc::UnboundedReceiver<ComputeCommand>,
267            mpsc::UnboundedSender<ComputeResponse>,
268        )>,
269    ) {
270        if self.context.worker_core_affinity {
271            set_core_affinity(timely_worker.index());
272        }
273
274        let worker_id = timely_worker.index();
275        let metrics = self.metrics.for_worker(worker_id);
276
277        // Take this worker's storage log reader, indexed by local worker index
278        // so compute worker x matches storage worker x.
279        let local_index = worker_id % self.workers_per_process;
280        let storage_log_reader = self.storage_log_readers.lock().unwrap()[local_index].take();
281
282        // Create the command channel that broadcasts commands from worker 0 to other workers. We
283        // reuse this channel between client connections, to avoid bugs where different workers end
284        // up creating incompatible sides of the channel dataflow after reconnects.
285        // See database-issues#8964.
286        let (cmd_tx, cmd_rx) = command_channel::render(timely_worker);
287        let (resp_tx, resp_rx) = mpsc::unbounded_channel();
288
289        spawn_channel_adapter(client_rx, cmd_tx, resp_rx, worker_id);
290
291        Worker {
292            timely_worker,
293            command_rx: CommandReceiver::new(cmd_rx, worker_id),
294            response_tx: ResponseSender::new(resp_tx, worker_id),
295            metrics,
296            context: self.context.clone(),
297            persist_clients: Arc::clone(&self.persist_clients),
298            txns_ctx: self.txns_ctx.clone(),
299            compute_state: None,
300            tracing_handle: Arc::clone(&self.tracing_handle),
301            metrics_registry: self.metrics_registry.clone(),
302            workers_per_process: self.workers_per_process,
303            storage_log_reader,
304        }
305        .run()
306    }
307}
308
309/// Set the current thread's core affinity, based on the given `worker_id`.
310#[cfg(not(target_os = "macos"))]
311fn set_core_affinity(worker_id: usize) {
312    use tracing::error;
313
314    let Some(mut core_ids) = core_affinity::get_core_ids() else {
315        error!(worker_id, "unable to get core IDs for setting affinity");
316        return;
317    };
318
319    // The `get_core_ids` docs don't say anything about a guaranteed order of the returned Vec,
320    // so sort it just to be safe.
321    core_ids.sort_unstable_by_key(|i| i.id);
322
323    // On multi-process replicas `worker_id` might be greater than the number of available cores.
324    // However, we assume that we always have at least as many cores as there are local workers.
325    // Violating this assumption is safe but might lead to degraded performance due to skew in core
326    // utilization.
327    let idx = worker_id % core_ids.len();
328    let core_id = core_ids[idx];
329
330    if core_affinity::set_for_current(core_id) {
331        info!(
332            worker_id,
333            core_id = core_id.id,
334            "set core affinity for worker"
335        );
336    } else {
337        error!(
338            worker_id,
339            core_id = core_id.id,
340            "failed to set core affinity for worker"
341        )
342    }
343}
344
/// Set the current thread's core affinity, based on the given `worker_id`.
///
/// On macOS this does nothing except log that the feature is unsupported.
#[cfg(target_os = "macos")]
fn set_core_affinity(_worker_id: usize) {
    // Core pinning is known not to work on Apple Silicon, so it is skipped entirely.
    // See https://github.com/Elzair/core_affinity_rs/issues/22.
    info!("setting core affinity is not supported on macOS");
}
352
353impl<'w> Worker<'w> {
354    /// Runs a compute worker.
355    pub fn run(&mut self) {
356        // The command receiver is initialized without an nonce, so receiving the first command
357        // always triggers a nonce change.
358        let NonceChange(nonce) = self.recv_command().expect_err("change to first nonce");
359        self.set_nonce(nonce);
360
361        loop {
362            let Err(NonceChange(nonce)) = self.run_client();
363            self.set_nonce(nonce);
364        }
365    }
366
367    fn set_nonce(&mut self, nonce: Uuid) {
368        self.response_tx.set_nonce(nonce);
369    }
370
371    /// Handles commands for a client connection, returns when the nonce changes.
372    fn run_client(&mut self) -> Result<Infallible, NonceChange> {
373        self.reconcile()?;
374
375        // The last time we did periodic maintenance.
376        let mut last_maintenance = Instant::now();
377
378        // Commence normal operation.
379        loop {
380            // Get the maintenance interval, default to zero if we don't have a compute state.
381            let maintenance_interval = self
382                .compute_state
383                .as_ref()
384                .map_or(Duration::ZERO, |state| state.server_maintenance_interval);
385
386            let now = Instant::now();
387            // Determine if we need to perform maintenance, which is true if `maintenance_interval`
388            // time has passed since the last maintenance.
389            let sleep_duration;
390            if now >= last_maintenance + maintenance_interval {
391                last_maintenance = now;
392                sleep_duration = None;
393
394                // Report frontier information back the coordinator.
395                if let Some(mut compute_state) = self.activate_compute() {
396                    compute_state.compute_state.traces.maintenance();
397                    compute_state.report_frontiers();
398                    compute_state.report_metrics();
399                    compute_state.check_expiration();
400                }
401
402                self.metrics.record_shared_row_metrics();
403            } else {
404                // We didn't perform maintenance, sleep until the next maintenance interval.
405                let next_maintenance = last_maintenance + maintenance_interval;
406                sleep_duration = Some(next_maintenance.saturating_duration_since(now))
407            };
408
409            // Step the timely worker, recording the time taken.
410            let timer = self.metrics.timely_step_duration_seconds.start_timer();
411            self.timely_worker.step_or_park(sleep_duration);
412            timer.observe_duration();
413
414            self.handle_pending_commands()?;
415
416            if let Some(mut compute_state) = self.activate_compute() {
417                compute_state.process_peeks();
418                compute_state.process_subscribes();
419                compute_state.process_copy_tos();
420            }
421        }
422    }
423
424    fn handle_pending_commands(&mut self) -> Result<(), NonceChange> {
425        while let Some(cmd) = self.command_rx.try_recv()? {
426            self.handle_command(cmd);
427        }
428        Ok(())
429    }
430
431    fn handle_command(&mut self, cmd: ComputeCommand) {
432        if matches!(&cmd, ComputeCommand::CreateInstance(_)) {
433            self.compute_state = Some(ComputeState::new(
434                Arc::clone(&self.persist_clients),
435                self.txns_ctx.clone(),
436                self.metrics.clone(),
437                Arc::clone(&self.tracing_handle),
438                self.context.clone(),
439                self.metrics_registry.clone(),
440                self.workers_per_process,
441                self.storage_log_reader.take(),
442            ));
443        }
444        self.activate_compute().unwrap().handle_compute_command(cmd);
445    }
446
447    fn activate_compute(&mut self) -> Option<ActiveComputeState<'_>> {
448        if let Some(compute_state) = &mut self.compute_state {
449            Some(ActiveComputeState {
450                timely_worker: &mut *self.timely_worker,
451                compute_state,
452                response_tx: &mut self.response_tx,
453            })
454        } else {
455            None
456        }
457    }
458
459    /// Receive the next compute command.
460    ///
461    /// This method blocks if no command is currently available, but takes care to step the Timely
462    /// worker while doing so.
463    fn recv_command(&mut self) -> Result<ComputeCommand, NonceChange> {
464        loop {
465            if let Some(cmd) = self.command_rx.try_recv()? {
466                return Ok(cmd);
467            }
468
469            let start = Instant::now();
470            self.timely_worker.step_or_park(None);
471            self.metrics
472                .timely_step_duration_seconds
473                .observe(start.elapsed().as_secs_f64());
474        }
475    }
476
477    /// Extract commands until `InitializationComplete`, and make the worker reflect those commands.
478    ///
479    /// This method is meant to be a function of the commands received thus far (as recorded in the
480    /// compute state command history) and the new commands from `command_rx`. It should not be a
481    /// function of other characteristics, like whether the worker has managed to respond to a peek
482    /// or not. Some effort goes in to narrowing our view to only the existing commands we can be sure
483    /// are live at all other workers.
484    ///
485    /// The methodology here is to drain `command_rx` until an `InitializationComplete`, at which point
486    /// the prior commands are "reconciled" in. Reconciliation takes each goal dataflow and looks for an
487    /// existing "compatible" dataflow (per `compatible()`) it can repurpose, with some additional tests
488    /// to be sure that we can cut over from one to the other (no additional compaction, no tails/sinks).
489    /// With any connections established, old orphaned dataflows are allow to compact away, and any new
490    /// dataflows are created from scratch. "Kept" dataflows are allowed to compact up to any new `as_of`.
491    ///
492    /// Some additional tidying happens, cleaning up pending peeks, reported frontiers, and creating a new
493    /// subscribe response buffer. We will need to be vigilant with future modifications to `ComputeState` to
494    /// line up changes there with clean resets here.
495    fn reconcile(&mut self) -> Result<(), NonceChange> {
496        // To initialize the connection, we want to drain all commands until we receive a
497        // `ComputeCommand::InitializationComplete` command to form a target command state.
498        let mut new_commands = Vec::new();
499        loop {
500            match self.recv_command()? {
501                ComputeCommand::InitializationComplete => break,
502                command => new_commands.push(command),
503            }
504        }
505
506        // Commands we will need to apply before entering normal service.
507        // These commands may include dropping existing dataflows, compacting existing dataflows,
508        // and creating new dataflows, in addition to standard peek and compaction commands.
509        // The result should be the same as if dropping all dataflows and running `new_commands`.
510        let mut todo_commands = Vec::new();
511        // We only have a compute history if we are in an initialized state
512        // (i.e. after a `CreateInstance`).
513        // If this is not the case, just copy `new_commands` into `todo_commands`.
514        if let Some(compute_state) = &mut self.compute_state {
515            // Reduce the installed commands.
516            // Importantly, act as if all peeks may have been retired (as we cannot know otherwise).
517            compute_state.command_history.discard_peeks();
518            compute_state.command_history.reduce();
519
520            // At this point, we need to sort out which of the *certainly installed* dataflows are
521            // suitable replacements for the requested dataflows. A dataflow is "certainly installed"
522            // as of a frontier if its compaction allows it to go no further. We ignore peeks for this
523            // reasoning, as we cannot be certain that peeks still exist at any other worker.
524
525            // Having reduced our installed command history retaining no peeks (above), we should be able
526            // to use track down installed dataflows we can use as surrogates for requested dataflows (which
527            // have retained all of their peeks, creating a more demanding `as_of` requirement).
528            // NB: installed dataflows may still be allowed to further compact, and we should double check
529            // this before being too confident. It should be rare without peeks, but could happen with e.g.
530            // multiple outputs of a dataflow.
531
532            // The values with which a prior `CreateInstance` was called, if it was.
533            let mut old_instance_config = None;
534            // Index dataflows by `export_ids().collect()`, as this is a precondition for their compatibility.
535            let mut old_dataflows = BTreeMap::default();
536            // Maintain allowed compaction, in case installed identifiers may have been allowed to compact.
537            let mut old_frontiers = BTreeMap::default();
538            for command in compute_state.command_history.iter() {
539                match command {
540                    ComputeCommand::CreateInstance(config) => {
541                        old_instance_config = Some(config);
542                    }
543                    ComputeCommand::CreateDataflow(dataflow) => {
544                        let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
545                        old_dataflows.insert(export_ids, dataflow);
546                    }
547                    ComputeCommand::AllowCompaction { id, frontier } => {
548                        old_frontiers.insert(id, frontier);
549                    }
550                    _ => {
551                        // Nothing to do in these cases.
552                    }
553                }
554            }
555
556            // Compaction commands that can be applied to existing dataflows.
557            let mut old_compaction = BTreeMap::default();
558            // Exported identifiers from dataflows we retain.
559            let mut retain_ids = BTreeSet::default();
560
561            // Traverse new commands, sorting out what remediation we can do.
562            for command in new_commands.iter() {
563                match command {
564                    ComputeCommand::CreateDataflow(dataflow) => {
565                        // Attempt to find an existing match for the dataflow.
566                        let as_of = dataflow.as_of.as_ref().unwrap();
567                        let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
568
569                        if let Some(old_dataflow) = old_dataflows.get(&export_ids) {
570                            let compatible = old_dataflow.compatible_with(dataflow);
571                            let uncompacted = !export_ids
572                                .iter()
573                                .flat_map(|id| old_frontiers.get(id))
574                                .any(|frontier| {
575                                    !timely::PartialOrder::less_equal(
576                                        *frontier,
577                                        dataflow.as_of.as_ref().unwrap(),
578                                    )
579                                });
580
581                            // We cannot reconcile subscribe and copy-to sinks at the moment,
582                            // because the response buffer is shared, and to a first approximation
583                            // must be completely reformed.
584                            let subscribe_free = dataflow.subscribe_ids().next().is_none();
585                            let copy_to_free = dataflow.copy_to_ids().next().is_none();
586
587                            // If we have replaced any dependency of this dataflow, we need to
588                            // replace this dataflow, to make it use the replacement.
589                            let dependencies_retained = dataflow
590                                .imported_index_ids()
591                                .all(|id| retain_ids.contains(&id));
592
593                            if compatible
594                                && uncompacted
595                                && subscribe_free
596                                && copy_to_free
597                                && dependencies_retained
598                            {
599                                // Match found; remove the match from the deletion queue,
600                                // and compact its outputs to the dataflow's `as_of`.
601                                old_dataflows.remove(&export_ids);
602                                for id in export_ids.iter() {
603                                    old_compaction.insert(*id, as_of.clone());
604                                }
605                                retain_ids.extend(export_ids);
606                            } else {
607                                warn!(
608                                    ?export_ids,
609                                    ?compatible,
610                                    ?uncompacted,
611                                    ?subscribe_free,
612                                    ?copy_to_free,
613                                    ?dependencies_retained,
614                                    old_as_of = ?old_dataflow.as_of,
615                                    new_as_of = ?as_of,
616                                    "dataflow reconciliation failed",
617                                );
618
619                                // Dump the full dataflow plans if they are incompatible, to
620                                // simplify debugging hard-to-reproduce reconciliation failures.
621                                if !compatible {
622                                    warn!(
623                                        old = ?old_dataflow,
624                                        new = ?dataflow,
625                                        "incompatible dataflows in reconciliation",
626                                    );
627                                }
628
629                                todo_commands
630                                    .push(ComputeCommand::CreateDataflow(dataflow.clone()));
631                            }
632
633                            compute_state.metrics.record_dataflow_reconciliation(
634                                compatible,
635                                uncompacted,
636                                subscribe_free,
637                                copy_to_free,
638                                dependencies_retained,
639                            );
640                        } else {
641                            todo_commands.push(ComputeCommand::CreateDataflow(dataflow.clone()));
642                        }
643                    }
644                    ComputeCommand::CreateInstance(config) => {
645                        // Cluster creation should not be performed again!
646                        if old_instance_config.map_or(false, |old| !old.compatible_with(config)) {
647                            halt!(
648                                "new instance configuration not compatible with existing instance configuration:\n{:?}\nvs\n{:?}",
649                                config,
650                                old_instance_config,
651                            );
652                        }
653                    }
654                    // All other commands we apply as requested.
655                    command => {
656                        todo_commands.push(command.clone());
657                    }
658                }
659            }
660
661            // Issue compaction commands first to reclaim resources.
662            for (_, dataflow) in old_dataflows.iter() {
663                for id in dataflow.export_ids() {
664                    // We want to drop anything that has not yet been dropped,
665                    // and nothing that has already been dropped.
666                    if old_frontiers.get(&id) != Some(&&Antichain::new()) {
667                        old_compaction.insert(id, Antichain::new());
668                    }
669                }
670            }
671            for (&id, frontier) in &old_compaction {
672                let frontier = frontier.clone();
673                todo_commands.insert(0, ComputeCommand::AllowCompaction { id, frontier });
674            }
675
676            // Clean up worker-local state.
677            //
678            // Various aspects of `ComputeState` need to be either uninstalled, or return to a blank slate.
679            // All dropped dataflows should clean up after themselves, as we plan to install new dataflows
680            // re-using the same identifiers.
681            // All re-used dataflows should roll back any believed communicated information (e.g. frontiers)
682            // so that they recommunicate that information as if from scratch.
683
684            // Remove all pending peeks.
685            for (_, peek) in std::mem::take(&mut compute_state.pending_peeks) {
686                // Log dropping the peek request.
687                if let Some(logger) = compute_state.compute_logger.as_mut() {
688                    logger.log(&peek.as_log_event(false));
689                }
690            }
691
692            for (&id, collection) in compute_state.collections.iter_mut() {
693                // Adjust reported frontiers:
694                //  * For dataflows we continue to use, reset to ensure we report something not
695                //    before the new `as_of` next.
696                //  * For dataflows we drop, set to the empty frontier, to ensure we don't report
697                //    anything for them.
698                let retained = retain_ids.contains(&id);
699                let compaction = old_compaction.remove(&id);
700                let new_reported_frontier = match (retained, compaction) {
701                    (true, Some(new_as_of)) => ReportedFrontier::NotReported { lower: new_as_of },
702                    (true, None) => {
703                        unreachable!("retained dataflows are compacted to the new as_of")
704                    }
705                    (false, Some(new_frontier)) => {
706                        assert!(new_frontier.is_empty());
707                        ReportedFrontier::Reported(new_frontier)
708                    }
709                    (false, None) => {
710                        // Logging dataflows are implicitly retained and don't have a new as_of.
711                        // Reset them to the minimal frontier.
712                        ReportedFrontier::new()
713                    }
714                };
715
716                collection.reset_reported_frontiers(new_reported_frontier);
717
718                // Sink tokens should be retained for retained dataflows, and dropped for dropped
719                // dataflows.
720                //
721                // Dropping the tokens of active subscribe and copy-tos makes them place
722                // `DroppedAt` responses into the respective response buffer. We drop those buffers
723                // in the next step, which ensures that we don't send out `DroppedAt` responses for
724                // subscribe/copy-tos dropped during reconciliation.
725                if !retained {
726                    collection.sink_token = None;
727                }
728            }
729
730            // We must drop the response buffers as they are global across all subscribe/copy-tos.
731            // If they were broken out by `GlobalId` then we could drop only the response buffers
732            // of dataflows we drop.
733            compute_state.subscribe_response_buffer = Rc::new(RefCell::new(Vec::new()));
734            compute_state.copy_to_response_buffer = Rc::new(RefCell::new(Vec::new()));
735
736            // The controller expects the logging collections to be readable from the minimum time
737            // initially. We cannot recreate the logging arrangements without restarting the
738            // instance, but we can pad the compacted times with empty data. Doing so is sound
739            // because logging collections from different replica incarnations are considered
740            // distinct TVCs, so the controller doesn't expect any historical consistency from
741            // these collections when it reconnects to a replica.
742            //
743            // TODO(database-issues#8152): Consider resolving this with controller-side reconciliation instead.
744            if let Some(config) = old_instance_config {
745                for id in config.logging.index_logs.values() {
746                    let trace = compute_state
747                        .traces
748                        .remove(id)
749                        .expect("logging trace exists");
750                    let padded = trace.into_padded();
751                    compute_state.traces.set(*id, padded);
752                }
753            }
754        } else {
755            todo_commands.clone_from(&new_commands);
756        }
757
758        // Execute the commands to bring us to `new_commands`.
759        for command in todo_commands.into_iter() {
760            self.handle_command(command);
761        }
762
763        // Overwrite `self.command_history` to reflect `new_commands`.
764        // It is possible that there still isn't a compute state yet.
765        if let Some(compute_state) = &mut self.compute_state {
766            let mut command_history = ComputeCommandHistory::new(self.metrics.for_history());
767            for command in new_commands.iter() {
768                command_history.push(command.clone());
769            }
770            compute_state.command_history = command_history;
771        }
772        Ok(())
773    }
774}
775
776/// Spawn a task to bridge between [`ClusterClient`] and [`Worker`] channels.
777///
778/// The [`Worker`] expects a pair of persistent channels, with punctuation marking reconnects,
779/// while the [`ClusterClient`] provides a new pair of channels on each reconnect.
780fn spawn_channel_adapter(
781    mut client_rx: mpsc::UnboundedReceiver<(
782        Uuid,
783        mpsc::UnboundedReceiver<ComputeCommand>,
784        mpsc::UnboundedSender<ComputeResponse>,
785    )>,
786    command_tx: command_channel::Sender,
787    mut response_rx: mpsc::UnboundedReceiver<(ComputeResponse, Uuid)>,
788    worker_id: usize,
789) {
790    mz_ore::task::spawn(
791        || format!("compute-channel-adapter-{worker_id}"),
792        async move {
793            // To make workers aware of the individual client connections, we tag forwarded
794            // commands with the client nonce. Additionally, we use the nonce to filter out
795            // responses with a different nonce, which are intended for different client
796            // connections.
797            //
798            // It's possible that we receive responses with nonces from the past but also from the
799            // future: Worker 0 might have received a new nonce before us and broadcasted it to our
800            // Timely cluster. When we receive a response with a future nonce, we need to wait with
801            // forwarding it until we have received the same nonce from a client connection.
802            //
803            // Nonces are not ordered so we don't know whether a response nonce is from the past or
804            // the future. We thus assume that every response with an unknown nonce might be from
805            // the future and stash them all. Every time we reconnect, we immediately send all
806            // stashed responses with a matching nonce. Every time we receive a new response with a
807            // nonce that matches our current one, we can discard the entire response stash as we
808            // know that all stashed responses must be from the past.
809            let mut stashed_responses = BTreeMap::<Uuid, Vec<ComputeResponse>>::new();
810
811            while let Some((nonce, mut command_rx, response_tx)) = client_rx.recv().await {
812                // Send stashed responses for this client.
813                if let Some(resps) = stashed_responses.remove(&nonce) {
814                    for resp in resps {
815                        let _ = response_tx.send(resp);
816                    }
817                }
818
819                // Wait for a new response while forwarding received commands.
820                let mut serve_rx_channels = async || loop {
821                    tokio::select! {
822                        msg = command_rx.recv() => match msg {
823                            Some(cmd) => command_tx.send((cmd, nonce)),
824                            None => return Err(()),
825                        },
826                        msg = response_rx.recv() => {
827                            return Ok(msg.expect("worker connected"));
828                        }
829                    }
830                };
831
832                // Serve this connection until we see any of the channels disconnect.
833                loop {
834                    let Ok((resp, resp_nonce)) = serve_rx_channels().await else {
835                        break;
836                    };
837
838                    if resp_nonce == nonce {
839                        // Response for the current connection; forward it.
840                        stashed_responses.clear();
841                        if response_tx.send(resp).is_err() {
842                            break;
843                        }
844                    } else {
845                        // Response for a past or future connection; stash it.
846                        let stash = stashed_responses.entry(resp_nonce).or_default();
847                        stash.push(resp);
848                    }
849                }
850            }
851        },
852    );
853}