Skip to main content

mz_compute/
server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An interactive dataflow server.
11
12use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::convert::Infallible;
15use std::fmt::Debug;
16use std::path::PathBuf;
17use std::rc::Rc;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use anyhow::Error;
22use mz_cluster::client::{ClusterClient, ClusterSpec};
23use mz_cluster_client::client::TimelyConfig;
24use mz_compute_client::protocol::command::ComputeCommand;
25use mz_compute_client::protocol::history::ComputeCommandHistory;
26use mz_compute_client::protocol::response::ComputeResponse;
27use mz_compute_client::service::ComputeClient;
28use mz_ore::halt;
29use mz_ore::metrics::MetricsRegistry;
30use mz_ore::tracing::TracingHandle;
31use mz_persist_client::cache::PersistClientCache;
32use mz_storage_types::connections::ConnectionContext;
33use mz_txn_wal::operator::TxnsContext;
34use timely::communication::Allocate;
35use timely::progress::Antichain;
36use timely::worker::Worker as TimelyWorker;
37use tokio::sync::mpsc;
38use tokio::sync::mpsc::error::SendError;
39use tracing::{info, trace, warn};
40use uuid::Uuid;
41
42use crate::command_channel;
43use crate::compute_state::{ActiveComputeState, ComputeState, ReportedFrontier};
44use crate::metrics::{ComputeMetrics, WorkerMetrics};
45
/// Caller-provided configuration for compute.
#[derive(Clone, Debug)]
pub struct ComputeInstanceContext {
    /// A directory that can be used for scratch work.
    pub scratch_directory: Option<PathBuf>,
    /// Whether to set core affinity for Timely workers.
    pub worker_core_affinity: bool,
    /// Context required to connect to an external sink from compute,
    /// like the `CopyToS3OneshotSink` compute sink.
    pub connection_context: ConnectionContext,
}
57
/// Configures the server with compute-specific metrics.
#[derive(Debug, Clone)]
struct Config {
    /// `persist` client cache, shared between all workers in this process.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// A process-global handle to tracing configuration.
    pub tracing_handle: Arc<TracingHandle>,
    /// Metrics exposed by compute replicas.
    pub metrics: ComputeMetrics,
    /// Other configuration for compute.
    pub context: ComputeInstanceContext,
}
72
73/// Initiates a timely dataflow computation, processing compute commands.
74pub async fn serve(
75    timely_config: TimelyConfig,
76    metrics_registry: &MetricsRegistry,
77    persist_clients: Arc<PersistClientCache>,
78    txns_ctx: TxnsContext,
79    tracing_handle: Arc<TracingHandle>,
80    context: ComputeInstanceContext,
81) -> Result<impl Fn() -> Box<dyn ComputeClient> + use<>, Error> {
82    let config = Config {
83        persist_clients,
84        txns_ctx,
85        tracing_handle,
86        metrics: ComputeMetrics::register_with(metrics_registry),
87        context,
88    };
89    let tokio_executor = tokio::runtime::Handle::current();
90
91    let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
92    let timely_container = Arc::new(Mutex::new(timely_container));
93
94    let client_builder = move || {
95        let client = ClusterClient::new(Arc::clone(&timely_container));
96        let client: Box<dyn ComputeClient> = Box::new(client);
97        client
98    };
99
100    Ok(client_builder)
101}
102
/// Error type returned on connection nonce changes.
///
/// A nonce change informs workers that subsequent commands come from a new client connection
/// and therefore require reconciliation.
struct NonceChange(Uuid);
108
/// Endpoint used by workers to receive compute commands.
///
/// Observes nonce changes in the command stream and converts them into receive errors.
struct CommandReceiver {
    /// The channel supplying commands.
    inner: command_channel::Receiver,
    /// The ID of the Timely worker.
    worker_id: usize,
    /// The nonce identifying the current cluster protocol incarnation.
    nonce: Option<Uuid>,
    /// A stash to enable peeking the next command, used in `try_recv`.
    ///
    /// Holds the first command of a new incarnation, observed when `try_recv`
    /// reported a nonce change; the next `try_recv` call yields it.
    stashed_command: Option<ComputeCommand>,
}
122
123impl CommandReceiver {
124    fn new(inner: command_channel::Receiver, worker_id: usize) -> Self {
125        Self {
126            inner,
127            worker_id,
128            nonce: None,
129            stashed_command: None,
130        }
131    }
132
133    /// Receive the next pending command, if any.
134    ///
135    /// If the next command has a different nonce, this method instead returns an `Err`
136    /// containing the new nonce.
137    fn try_recv(&mut self) -> Result<Option<ComputeCommand>, NonceChange> {
138        if let Some(command) = self.stashed_command.take() {
139            return Ok(Some(command));
140        }
141        let Some((command, nonce)) = self.inner.try_recv() else {
142            return Ok(None);
143        };
144
145        trace!(worker = self.worker_id, %nonce, ?command, "received command");
146
147        if Some(nonce) == self.nonce {
148            Ok(Some(command))
149        } else {
150            self.nonce = Some(nonce);
151            self.stashed_command = Some(command);
152            Err(NonceChange(nonce))
153        }
154    }
155}
156
/// Endpoint used by workers to send compute responses.
///
/// Tags responses with the current nonce, allowing receivers to filter out responses intended for
/// previous client connections.
pub(crate) struct ResponseSender {
    /// The channel consuming responses.
    inner: mpsc::UnboundedSender<(ComputeResponse, Uuid)>,
    /// The ID of the Timely worker.
    worker_id: usize,
    /// The nonce identifying the current cluster protocol incarnation.
    nonce: Option<Uuid>,
}
169
170impl ResponseSender {
171    fn new(inner: mpsc::UnboundedSender<(ComputeResponse, Uuid)>, worker_id: usize) -> Self {
172        Self {
173            inner,
174            worker_id,
175            nonce: None,
176        }
177    }
178
179    /// Set the cluster protocol nonce.
180    fn set_nonce(&mut self, nonce: Uuid) {
181        self.nonce = Some(nonce);
182    }
183
184    /// Send a compute response.
185    pub fn send(&self, response: ComputeResponse) -> Result<(), SendError<ComputeResponse>> {
186        let nonce = self.nonce.expect("nonce must be initialized");
187
188        trace!(worker = self.worker_id, %nonce, ?response, "sending response");
189        self.inner
190            .send((response, nonce))
191            .map_err(|SendError((resp, _))| SendError(resp))
192    }
193}
194
/// State maintained for each worker thread.
///
/// Much of this state can be viewed as local variables for the worker thread,
/// holding state that persists across function calls.
struct Worker<'w, A: Allocate> {
    /// The underlying Timely worker.
    timely_worker: &'w mut TimelyWorker<A>,
    /// The channel over which commands are received.
    command_rx: CommandReceiver,
    /// The channel over which responses are sent.
    response_tx: ResponseSender,
    /// The compute instance state, populated on receipt of a `CreateInstance` command.
    compute_state: Option<ComputeState>,
    /// Compute metrics.
    metrics: WorkerMetrics,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers
    persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    txns_ctx: TxnsContext,
    /// A process-global handle to tracing configuration.
    tracing_handle: Arc<TracingHandle>,
    /// Other caller-provided configuration for compute.
    context: ComputeInstanceContext,
}
218
219impl ClusterSpec for Config {
220    type Command = ComputeCommand;
221    type Response = ComputeResponse;
222
223    fn run_worker<A: Allocate + 'static>(
224        &self,
225        timely_worker: &mut TimelyWorker<A>,
226        client_rx: mpsc::UnboundedReceiver<(
227            Uuid,
228            mpsc::UnboundedReceiver<ComputeCommand>,
229            mpsc::UnboundedSender<ComputeResponse>,
230        )>,
231    ) {
232        if self.context.worker_core_affinity {
233            set_core_affinity(timely_worker.index());
234        }
235
236        let worker_id = timely_worker.index();
237        let metrics = self.metrics.for_worker(worker_id);
238
239        // Create the command channel that broadcasts commands from worker 0 to other workers. We
240        // reuse this channel between client connections, to avoid bugs where different workers end
241        // up creating incompatible sides of the channel dataflow after reconnects.
242        // See database-issues#8964.
243        let (cmd_tx, cmd_rx) = command_channel::render(timely_worker);
244        let (resp_tx, resp_rx) = mpsc::unbounded_channel();
245
246        spawn_channel_adapter(client_rx, cmd_tx, resp_rx, worker_id);
247
248        Worker {
249            timely_worker,
250            command_rx: CommandReceiver::new(cmd_rx, worker_id),
251            response_tx: ResponseSender::new(resp_tx, worker_id),
252            metrics,
253            context: self.context.clone(),
254            persist_clients: Arc::clone(&self.persist_clients),
255            txns_ctx: self.txns_ctx.clone(),
256            compute_state: None,
257            tracing_handle: Arc::clone(&self.tracing_handle),
258        }
259        .run()
260    }
261}
262
263/// Set the current thread's core affinity, based on the given `worker_id`.
264#[cfg(not(target_os = "macos"))]
265fn set_core_affinity(worker_id: usize) {
266    use tracing::error;
267
268    let Some(mut core_ids) = core_affinity::get_core_ids() else {
269        error!(worker_id, "unable to get core IDs for setting affinity");
270        return;
271    };
272
273    // The `get_core_ids` docs don't say anything about a guaranteed order of the returned Vec,
274    // so sort it just to be safe.
275    core_ids.sort_unstable_by_key(|i| i.id);
276
277    // On multi-process replicas `worker_id` might be greater than the number of available cores.
278    // However, we assume that we always have at least as many cores as there are local workers.
279    // Violating this assumption is safe but might lead to degraded performance due to skew in core
280    // utilization.
281    let idx = worker_id % core_ids.len();
282    let core_id = core_ids[idx];
283
284    if core_affinity::set_for_current(core_id) {
285        info!(
286            worker_id,
287            core_id = core_id.id,
288            "set core affinity for worker"
289        );
290    } else {
291        error!(
292            worker_id,
293            core_id = core_id.id,
294            "failed to set core affinity for worker"
295        )
296    }
297}
298
/// Set the current thread's core affinity, based on the given `worker_id`.
///
/// This is a no-op on macOS, where setting core affinity is unsupported.
#[cfg(target_os = "macos")]
fn set_core_affinity(_worker_id: usize) {
    // Setting core affinity is known to not work on Apple Silicon:
    // https://github.com/Elzair/core_affinity_rs/issues/22
    info!("setting core affinity is not supported on macOS");
}
306
307impl<'w, A: Allocate + 'static> Worker<'w, A> {
    /// Runs a compute worker.
    ///
    /// Alternates between serving one client connection (`run_client`) and
    /// installing the new nonce whenever the connection changes. Never returns.
    pub fn run(&mut self) {
        // The command receiver is initialized without a nonce, so receiving the first command
        // always triggers a nonce change.
        let NonceChange(nonce) = self.recv_command().expect_err("change to first nonce");
        self.set_nonce(nonce);

        loop {
            // `run_client` returns `Result<Infallible, _>`, so the `Ok` variant is
            // uninhabited and the `Err` pattern below is irrefutable.
            let Err(NonceChange(nonce)) = self.run_client();
            self.set_nonce(nonce);
        }
    }
320
    /// Set the nonce used to tag outgoing responses.
    fn set_nonce(&mut self, nonce: Uuid) {
        self.response_tx.set_nonce(nonce);
    }
324
    /// Handles commands for a client connection, returns when the nonce changes.
    fn run_client(&mut self) -> Result<Infallible, NonceChange> {
        // Align worker state with the new client's command history before serving it.
        self.reconcile()?;

        // The last time we did periodic maintenance.
        let mut last_maintenance = Instant::now();

        // Commence normal operation.
        loop {
            // Get the maintenance interval, default to zero if we don't have a compute state.
            let maintenance_interval = self
                .compute_state
                .as_ref()
                .map_or(Duration::ZERO, |state| state.server_maintenance_interval);

            let now = Instant::now();
            // Determine if we need to perform maintenance, which is true if `maintenance_interval`
            // time has passed since the last maintenance.
            let sleep_duration;
            if now >= last_maintenance + maintenance_interval {
                last_maintenance = now;
                // Don't park below: we just did maintenance and want to step promptly.
                sleep_duration = None;

                // Report frontier information back the coordinator.
                if let Some(mut compute_state) = self.activate_compute() {
                    compute_state.compute_state.traces.maintenance();
                    compute_state.report_frontiers();
                    compute_state.report_metrics();
                    compute_state.check_expiration();
                }

                self.metrics.record_shared_row_metrics();
            } else {
                // We didn't perform maintenance, sleep until the next maintenance interval.
                let next_maintenance = last_maintenance + maintenance_interval;
                sleep_duration = Some(next_maintenance.saturating_duration_since(now))
            };

            // Step the timely worker, recording the time taken.
            let timer = self.metrics.timely_step_duration_seconds.start_timer();
            self.timely_worker.step_or_park(sleep_duration);
            timer.observe_duration();

            // Apply commands that arrived while stepping; a nonce change propagates
            // to `run`, which starts a new client session.
            self.handle_pending_commands()?;

            if let Some(mut compute_state) = self.activate_compute() {
                compute_state.process_peeks();
                compute_state.process_subscribes();
                compute_state.process_copy_tos();
            }
        }
    }
377
378    fn handle_pending_commands(&mut self) -> Result<(), NonceChange> {
379        while let Some(cmd) = self.command_rx.try_recv()? {
380            self.handle_command(cmd);
381        }
382        Ok(())
383    }
384
385    fn handle_command(&mut self, cmd: ComputeCommand) {
386        match &cmd {
387            ComputeCommand::CreateInstance(_) => {
388                self.compute_state = Some(ComputeState::new(
389                    Arc::clone(&self.persist_clients),
390                    self.txns_ctx.clone(),
391                    self.metrics.clone(),
392                    Arc::clone(&self.tracing_handle),
393                    self.context.clone(),
394                ));
395            }
396            _ => (),
397        }
398        self.activate_compute().unwrap().handle_compute_command(cmd);
399    }
400
401    fn activate_compute(&mut self) -> Option<ActiveComputeState<'_, A>> {
402        if let Some(compute_state) = &mut self.compute_state {
403            Some(ActiveComputeState {
404                timely_worker: &mut *self.timely_worker,
405                compute_state,
406                response_tx: &mut self.response_tx,
407            })
408        } else {
409            None
410        }
411    }
412
413    /// Receive the next compute command.
414    ///
415    /// This method blocks if no command is currently available, but takes care to step the Timely
416    /// worker while doing so.
417    fn recv_command(&mut self) -> Result<ComputeCommand, NonceChange> {
418        loop {
419            if let Some(cmd) = self.command_rx.try_recv()? {
420                return Ok(cmd);
421            }
422
423            let start = Instant::now();
424            self.timely_worker.step_or_park(None);
425            self.metrics
426                .timely_step_duration_seconds
427                .observe(start.elapsed().as_secs_f64());
428        }
429    }
430
431    /// Extract commands until `InitializationComplete`, and make the worker reflect those commands.
432    ///
433    /// This method is meant to be a function of the commands received thus far (as recorded in the
434    /// compute state command history) and the new commands from `command_rx`. It should not be a
435    /// function of other characteristics, like whether the worker has managed to respond to a peek
436    /// or not. Some effort goes in to narrowing our view to only the existing commands we can be sure
437    /// are live at all other workers.
438    ///
439    /// The methodology here is to drain `command_rx` until an `InitializationComplete`, at which point
440    /// the prior commands are "reconciled" in. Reconciliation takes each goal dataflow and looks for an
441    /// existing "compatible" dataflow (per `compatible()`) it can repurpose, with some additional tests
442    /// to be sure that we can cut over from one to the other (no additional compaction, no tails/sinks).
443    /// With any connections established, old orphaned dataflows are allow to compact away, and any new
444    /// dataflows are created from scratch. "Kept" dataflows are allowed to compact up to any new `as_of`.
445    ///
446    /// Some additional tidying happens, cleaning up pending peeks, reported frontiers, and creating a new
447    /// subscribe response buffer. We will need to be vigilant with future modifications to `ComputeState` to
448    /// line up changes there with clean resets here.
449    fn reconcile(&mut self) -> Result<(), NonceChange> {
450        // To initialize the connection, we want to drain all commands until we receive a
451        // `ComputeCommand::InitializationComplete` command to form a target command state.
452        let mut new_commands = Vec::new();
453        loop {
454            match self.recv_command()? {
455                ComputeCommand::InitializationComplete => break,
456                command => new_commands.push(command),
457            }
458        }
459
460        // Commands we will need to apply before entering normal service.
461        // These commands may include dropping existing dataflows, compacting existing dataflows,
462        // and creating new dataflows, in addition to standard peek and compaction commands.
463        // The result should be the same as if dropping all dataflows and running `new_commands`.
464        let mut todo_commands = Vec::new();
465        // We only have a compute history if we are in an initialized state
466        // (i.e. after a `CreateInstance`).
467        // If this is not the case, just copy `new_commands` into `todo_commands`.
468        if let Some(compute_state) = &mut self.compute_state {
469            // Reduce the installed commands.
470            // Importantly, act as if all peeks may have been retired (as we cannot know otherwise).
471            compute_state.command_history.discard_peeks();
472            compute_state.command_history.reduce();
473
474            // At this point, we need to sort out which of the *certainly installed* dataflows are
475            // suitable replacements for the requested dataflows. A dataflow is "certainly installed"
476            // as of a frontier if its compaction allows it to go no further. We ignore peeks for this
477            // reasoning, as we cannot be certain that peeks still exist at any other worker.
478
479            // Having reduced our installed command history retaining no peeks (above), we should be able
480            // to use track down installed dataflows we can use as surrogates for requested dataflows (which
481            // have retained all of their peeks, creating a more demanding `as_of` requirement).
482            // NB: installed dataflows may still be allowed to further compact, and we should double check
483            // this before being too confident. It should be rare without peeks, but could happen with e.g.
484            // multiple outputs of a dataflow.
485
486            // The values with which a prior `CreateInstance` was called, if it was.
487            let mut old_instance_config = None;
488            // Index dataflows by `export_ids().collect()`, as this is a precondition for their compatibility.
489            let mut old_dataflows = BTreeMap::default();
490            // Maintain allowed compaction, in case installed identifiers may have been allowed to compact.
491            let mut old_frontiers = BTreeMap::default();
492            for command in compute_state.command_history.iter() {
493                match command {
494                    ComputeCommand::CreateInstance(config) => {
495                        old_instance_config = Some(config);
496                    }
497                    ComputeCommand::CreateDataflow(dataflow) => {
498                        let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
499                        old_dataflows.insert(export_ids, dataflow);
500                    }
501                    ComputeCommand::AllowCompaction { id, frontier } => {
502                        old_frontiers.insert(id, frontier);
503                    }
504                    _ => {
505                        // Nothing to do in these cases.
506                    }
507                }
508            }
509
510            // Compaction commands that can be applied to existing dataflows.
511            let mut old_compaction = BTreeMap::default();
512            // Exported identifiers from dataflows we retain.
513            let mut retain_ids = BTreeSet::default();
514
515            // Traverse new commands, sorting out what remediation we can do.
516            for command in new_commands.iter() {
517                match command {
518                    ComputeCommand::CreateDataflow(dataflow) => {
519                        // Attempt to find an existing match for the dataflow.
520                        let as_of = dataflow.as_of.as_ref().unwrap();
521                        let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
522
523                        if let Some(old_dataflow) = old_dataflows.get(&export_ids) {
524                            let compatible = old_dataflow.compatible_with(dataflow);
525                            let uncompacted = !export_ids
526                                .iter()
527                                .flat_map(|id| old_frontiers.get(id))
528                                .any(|frontier| {
529                                    !timely::PartialOrder::less_equal(
530                                        *frontier,
531                                        dataflow.as_of.as_ref().unwrap(),
532                                    )
533                                });
534
535                            // We cannot reconcile subscribe and copy-to sinks at the moment,
536                            // because the response buffer is shared, and to a first approximation
537                            // must be completely reformed.
538                            let subscribe_free = dataflow.subscribe_ids().next().is_none();
539                            let copy_to_free = dataflow.copy_to_ids().next().is_none();
540
541                            // If we have replaced any dependency of this dataflow, we need to
542                            // replace this dataflow, to make it use the replacement.
543                            let dependencies_retained = dataflow
544                                .imported_index_ids()
545                                .all(|id| retain_ids.contains(&id));
546
547                            if compatible
548                                && uncompacted
549                                && subscribe_free
550                                && copy_to_free
551                                && dependencies_retained
552                            {
553                                // Match found; remove the match from the deletion queue,
554                                // and compact its outputs to the dataflow's `as_of`.
555                                old_dataflows.remove(&export_ids);
556                                for id in export_ids.iter() {
557                                    old_compaction.insert(*id, as_of.clone());
558                                }
559                                retain_ids.extend(export_ids);
560                            } else {
561                                warn!(
562                                    ?export_ids,
563                                    ?compatible,
564                                    ?uncompacted,
565                                    ?subscribe_free,
566                                    ?copy_to_free,
567                                    ?dependencies_retained,
568                                    old_as_of = ?old_dataflow.as_of,
569                                    new_as_of = ?as_of,
570                                    "dataflow reconciliation failed",
571                                );
572
573                                // Dump the full dataflow plans if they are incompatible, to
574                                // simplify debugging hard-to-reproduce reconciliation failures.
575                                if !compatible {
576                                    warn!(
577                                        old = ?old_dataflow,
578                                        new = ?dataflow,
579                                        "incompatible dataflows in reconciliation",
580                                    );
581                                }
582
583                                todo_commands
584                                    .push(ComputeCommand::CreateDataflow(dataflow.clone()));
585                            }
586
587                            compute_state.metrics.record_dataflow_reconciliation(
588                                compatible,
589                                uncompacted,
590                                subscribe_free,
591                                copy_to_free,
592                                dependencies_retained,
593                            );
594                        } else {
595                            todo_commands.push(ComputeCommand::CreateDataflow(dataflow.clone()));
596                        }
597                    }
598                    ComputeCommand::CreateInstance(config) => {
599                        // Cluster creation should not be performed again!
600                        if old_instance_config.map_or(false, |old| !old.compatible_with(config)) {
601                            halt!(
602                                "new instance configuration not compatible with existing instance configuration:\n{:?}\nvs\n{:?}",
603                                config,
604                                old_instance_config,
605                            );
606                        }
607                    }
608                    // All other commands we apply as requested.
609                    command => {
610                        todo_commands.push(command.clone());
611                    }
612                }
613            }
614
615            // Issue compaction commands first to reclaim resources.
616            for (_, dataflow) in old_dataflows.iter() {
617                for id in dataflow.export_ids() {
618                    // We want to drop anything that has not yet been dropped,
619                    // and nothing that has already been dropped.
620                    if old_frontiers.get(&id) != Some(&&Antichain::new()) {
621                        old_compaction.insert(id, Antichain::new());
622                    }
623                }
624            }
625            for (&id, frontier) in &old_compaction {
626                let frontier = frontier.clone();
627                todo_commands.insert(0, ComputeCommand::AllowCompaction { id, frontier });
628            }
629
630            // Clean up worker-local state.
631            //
632            // Various aspects of `ComputeState` need to be either uninstalled, or return to a blank slate.
633            // All dropped dataflows should clean up after themselves, as we plan to install new dataflows
634            // re-using the same identifiers.
635            // All re-used dataflows should roll back any believed communicated information (e.g. frontiers)
636            // so that they recommunicate that information as if from scratch.
637
638            // Remove all pending peeks.
639            for (_, peek) in std::mem::take(&mut compute_state.pending_peeks) {
640                // Log dropping the peek request.
641                if let Some(logger) = compute_state.compute_logger.as_mut() {
642                    logger.log(&peek.as_log_event(false));
643                }
644            }
645
646            for (&id, collection) in compute_state.collections.iter_mut() {
647                // Adjust reported frontiers:
648                //  * For dataflows we continue to use, reset to ensure we report something not
649                //    before the new `as_of` next.
650                //  * For dataflows we drop, set to the empty frontier, to ensure we don't report
651                //    anything for them.
652                let retained = retain_ids.contains(&id);
653                let compaction = old_compaction.remove(&id);
654                let new_reported_frontier = match (retained, compaction) {
655                    (true, Some(new_as_of)) => ReportedFrontier::NotReported { lower: new_as_of },
656                    (true, None) => {
657                        unreachable!("retained dataflows are compacted to the new as_of")
658                    }
659                    (false, Some(new_frontier)) => {
660                        assert!(new_frontier.is_empty());
661                        ReportedFrontier::Reported(new_frontier)
662                    }
663                    (false, None) => {
664                        // Logging dataflows are implicitly retained and don't have a new as_of.
665                        // Reset them to the minimal frontier.
666                        ReportedFrontier::new()
667                    }
668                };
669
670                collection.reset_reported_frontiers(new_reported_frontier);
671
672                // Sink tokens should be retained for retained dataflows, and dropped for dropped
673                // dataflows.
674                //
675                // Dropping the tokens of active subscribe and copy-tos makes them place
676                // `DroppedAt` responses into the respective response buffer. We drop those buffers
677                // in the next step, which ensures that we don't send out `DroppedAt` responses for
678                // subscribe/copy-tos dropped during reconciliation.
679                if !retained {
680                    collection.sink_token = None;
681                }
682            }
683
684            // We must drop the response buffers as they are global across all subscribe/copy-tos.
685            // If they were broken out by `GlobalId` then we could drop only the response buffers
686            // of dataflows we drop.
687            compute_state.subscribe_response_buffer = Rc::new(RefCell::new(Vec::new()));
688            compute_state.copy_to_response_buffer = Rc::new(RefCell::new(Vec::new()));
689
690            // The controller expects the logging collections to be readable from the minimum time
691            // initially. We cannot recreate the logging arrangements without restarting the
692            // instance, but we can pad the compacted times with empty data. Doing so is sound
693            // because logging collections from different replica incarnations are considered
694            // distinct TVCs, so the controller doesn't expect any historical consistency from
695            // these collections when it reconnects to a replica.
696            //
697            // TODO(database-issues#8152): Consider resolving this with controller-side reconciliation instead.
698            if let Some(config) = old_instance_config {
699                for id in config.logging.index_logs.values() {
700                    let trace = compute_state
701                        .traces
702                        .remove(id)
703                        .expect("logging trace exists");
704                    let padded = trace.into_padded();
705                    compute_state.traces.set(*id, padded);
706                }
707            }
708        } else {
709            todo_commands.clone_from(&new_commands);
710        }
711
712        // Execute the commands to bring us to `new_commands`.
713        for command in todo_commands.into_iter() {
714            self.handle_command(command);
715        }
716
717        // Overwrite `self.command_history` to reflect `new_commands`.
718        // It is possible that there still isn't a compute state yet.
719        if let Some(compute_state) = &mut self.compute_state {
720            let mut command_history = ComputeCommandHistory::new(self.metrics.for_history());
721            for command in new_commands.iter() {
722                command_history.push(command.clone());
723            }
724            compute_state.command_history = command_history;
725        }
726        Ok(())
727    }
728}
729
730/// Spawn a task to bridge between [`ClusterClient`] and [`Worker`] channels.
731///
732/// The [`Worker`] expects a pair of persistent channels, with punctuation marking reconnects,
733/// while the [`ClusterClient`] provides a new pair of channels on each reconnect.
734fn spawn_channel_adapter(
735    mut client_rx: mpsc::UnboundedReceiver<(
736        Uuid,
737        mpsc::UnboundedReceiver<ComputeCommand>,
738        mpsc::UnboundedSender<ComputeResponse>,
739    )>,
740    command_tx: command_channel::Sender,
741    mut response_rx: mpsc::UnboundedReceiver<(ComputeResponse, Uuid)>,
742    worker_id: usize,
743) {
744    mz_ore::task::spawn(
745        || format!("compute-channel-adapter-{worker_id}"),
746        async move {
747            // To make workers aware of the individual client connections, we tag forwarded
748            // commands with the client nonce. Additionally, we use the nonce to filter out
749            // responses with a different nonce, which are intended for different client
750            // connections.
751            //
752            // It's possible that we receive responses with nonces from the past but also from the
753            // future: Worker 0 might have received a new nonce before us and broadcasted it to our
754            // Timely cluster. When we receive a response with a future nonce, we need to wait with
755            // forwarding it until we have received the same nonce from a client connection.
756            //
757            // Nonces are not ordered so we don't know whether a response nonce is from the past or
758            // the future. We thus assume that every response with an unknown nonce might be from
759            // the future and stash them all. Every time we reconnect, we immediately send all
760            // stashed responses with a matching nonce. Every time we receive a new response with a
761            // nonce that matches our current one, we can discard the entire response stash as we
762            // know that all stashed responses must be from the past.
763            let mut stashed_responses = BTreeMap::<Uuid, Vec<ComputeResponse>>::new();
764
765            while let Some((nonce, mut command_rx, response_tx)) = client_rx.recv().await {
766                // Send stashed responses for this client.
767                if let Some(resps) = stashed_responses.remove(&nonce) {
768                    for resp in resps {
769                        let _ = response_tx.send(resp);
770                    }
771                }
772
773                // Wait for a new response while forwarding received commands.
774                let mut serve_rx_channels = async || loop {
775                    tokio::select! {
776                        msg = command_rx.recv() => match msg {
777                            Some(cmd) => command_tx.send((cmd, nonce)),
778                            None => return Err(()),
779                        },
780                        msg = response_rx.recv() => {
781                            return Ok(msg.expect("worker connected"));
782                        }
783                    }
784                };
785
786                // Serve this connection until we see any of the channels disconnect.
787                loop {
788                    let Ok((resp, resp_nonce)) = serve_rx_channels().await else {
789                        break;
790                    };
791
792                    if resp_nonce == nonce {
793                        // Response for the current connection; forward it.
794                        stashed_responses.clear();
795                        if response_tx.send(resp).is_err() {
796                            break;
797                        }
798                    } else {
799                        // Response for a past or future connection; stash it.
800                        let stash = stashed_responses.entry(resp_nonce).or_default();
801                        stash.push(resp);
802                    }
803                }
804            }
805        },
806    );
807}