mz_compute/
server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An interactive dataflow server.
11
12use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::convert::Infallible;
15use std::fmt::Debug;
16use std::path::PathBuf;
17use std::rc::Rc;
18use std::sync::{Arc, Mutex};
19use std::thread;
20use std::time::{Duration, Instant};
21
22use anyhow::Error;
23use crossbeam_channel::SendError;
24use mz_cluster::client::{ClusterClient, ClusterSpec};
25use mz_cluster_client::client::TimelyConfig;
26use mz_compute_client::protocol::command::ComputeCommand;
27use mz_compute_client::protocol::history::ComputeCommandHistory;
28use mz_compute_client::protocol::response::ComputeResponse;
29use mz_compute_client::service::ComputeClient;
30use mz_ore::halt;
31use mz_ore::metrics::MetricsRegistry;
32use mz_ore::tracing::TracingHandle;
33use mz_persist_client::cache::PersistClientCache;
34use mz_storage_types::connections::ConnectionContext;
35use mz_txn_wal::operator::TxnsContext;
36use timely::communication::Allocate;
37use timely::progress::Antichain;
38use timely::worker::Worker as TimelyWorker;
39use tokio::sync::mpsc;
40use tracing::{info, trace, warn};
41use uuid::Uuid;
42
43use crate::command_channel;
44use crate::compute_state::{ActiveComputeState, ComputeState, ReportedFrontier};
45use crate::metrics::{ComputeMetrics, WorkerMetrics};
46
47/// Caller-provided configuration for compute.
48#[derive(Clone, Debug)]
49pub struct ComputeInstanceContext {
50    /// A directory that can be used for scratch work.
51    pub scratch_directory: Option<PathBuf>,
52    /// Whether to set core affinity for Timely workers.
53    pub worker_core_affinity: bool,
54    /// Context required to connect to an external sink from compute,
55    /// like the `CopyToS3OneshotSink` compute sink.
56    pub connection_context: ConnectionContext,
57}
58
59/// Configures the server with compute-specific metrics.
60#[derive(Debug, Clone)]
61struct Config {
62    /// `persist` client cache.
63    pub persist_clients: Arc<PersistClientCache>,
64    /// Context necessary for rendering txn-wal operators.
65    pub txns_ctx: TxnsContext,
66    /// A process-global handle to tracing configuration.
67    pub tracing_handle: Arc<TracingHandle>,
68    /// Metrics exposed by compute replicas.
69    pub metrics: ComputeMetrics,
70    /// Other configuration for compute.
71    pub context: ComputeInstanceContext,
72}
73
74/// Initiates a timely dataflow computation, processing compute commands.
75pub async fn serve(
76    timely_config: TimelyConfig,
77    metrics_registry: &MetricsRegistry,
78    persist_clients: Arc<PersistClientCache>,
79    txns_ctx: TxnsContext,
80    tracing_handle: Arc<TracingHandle>,
81    context: ComputeInstanceContext,
82) -> Result<impl Fn() -> Box<dyn ComputeClient> + use<>, Error> {
83    let config = Config {
84        persist_clients,
85        txns_ctx,
86        tracing_handle,
87        metrics: ComputeMetrics::register_with(metrics_registry),
88        context,
89    };
90    let tokio_executor = tokio::runtime::Handle::current();
91
92    let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
93    let timely_container = Arc::new(Mutex::new(timely_container));
94
95    let client_builder = move || {
96        let client = ClusterClient::new(Arc::clone(&timely_container));
97        let client: Box<dyn ComputeClient> = Box::new(client);
98        client
99    };
100
101    Ok(client_builder)
102}
103
104/// Error type returned on connection nonce changes.
105///
106/// A nonce change informs workers that subsequent commands come a from a new client connection
107/// and therefore require reconciliation.
108struct NonceChange(Uuid);
109
110/// Endpoint used by workers to receive compute commands.
111///
112/// Observes nonce changes in the command stream and converts them into receive errors.
113struct CommandReceiver {
114    /// The channel supplying commands.
115    inner: command_channel::Receiver,
116    /// The ID of the Timely worker.
117    worker_id: usize,
118    /// The nonce identifying the current cluster protocol incarnation.
119    nonce: Option<Uuid>,
120    /// A stash to enable peeking the next command, used in `try_recv`.
121    stashed_command: Option<ComputeCommand>,
122}
123
124impl CommandReceiver {
125    fn new(inner: command_channel::Receiver, worker_id: usize) -> Self {
126        Self {
127            inner,
128            worker_id,
129            nonce: None,
130            stashed_command: None,
131        }
132    }
133
134    /// Receive the next pending command, if any.
135    ///
136    /// If the next command has a different nonce, this method instead returns an `Err`
137    /// containing the new nonce.
138    fn try_recv(&mut self) -> Result<Option<ComputeCommand>, NonceChange> {
139        if let Some(command) = self.stashed_command.take() {
140            return Ok(Some(command));
141        }
142        let Some((command, nonce)) = self.inner.try_recv() else {
143            return Ok(None);
144        };
145
146        trace!(worker = self.worker_id, %nonce, ?command, "received command");
147
148        if Some(nonce) == self.nonce {
149            Ok(Some(command))
150        } else {
151            self.nonce = Some(nonce);
152            self.stashed_command = Some(command);
153            Err(NonceChange(nonce))
154        }
155    }
156}
157
158/// Endpoint used by workers to send sending compute responses.
159///
160/// Tags responses with the current nonce, allowing receivers to filter out responses intended for
161/// previous client connections.
162pub(crate) struct ResponseSender {
163    /// The channel consuming responses.
164    inner: crossbeam_channel::Sender<(ComputeResponse, Uuid)>,
165    /// The ID of the Timely worker.
166    worker_id: usize,
167    /// The nonce identifying the current cluster protocol incarnation.
168    nonce: Option<Uuid>,
169}
170
171impl ResponseSender {
172    fn new(inner: crossbeam_channel::Sender<(ComputeResponse, Uuid)>, worker_id: usize) -> Self {
173        Self {
174            inner,
175            worker_id,
176            nonce: None,
177        }
178    }
179
180    /// Set the cluster protocol nonce.
181    fn set_nonce(&mut self, nonce: Uuid) {
182        self.nonce = Some(nonce);
183    }
184
185    /// Send a compute response.
186    pub fn send(&self, response: ComputeResponse) -> Result<(), SendError<ComputeResponse>> {
187        let nonce = self.nonce.expect("nonce must be initialized");
188
189        trace!(worker = self.worker_id, %nonce, ?response, "sending response");
190        self.inner
191            .send((response, nonce))
192            .map_err(|SendError((resp, _))| SendError(resp))
193    }
194}
195
196/// State maintained for each worker thread.
197///
198/// Much of this state can be viewed as local variables for the worker thread,
199/// holding state that persists across function calls.
200struct Worker<'w, A: Allocate> {
201    /// The underlying Timely worker.
202    timely_worker: &'w mut TimelyWorker<A>,
203    /// The channel over which commands are received.
204    command_rx: CommandReceiver,
205    /// The channel over which responses are sent.
206    response_tx: ResponseSender,
207    compute_state: Option<ComputeState>,
208    /// Compute metrics.
209    metrics: WorkerMetrics,
210    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
211    /// This is intentionally shared between workers
212    persist_clients: Arc<PersistClientCache>,
213    /// Context necessary for rendering txn-wal operators.
214    txns_ctx: TxnsContext,
215    /// A process-global handle to tracing configuration.
216    tracing_handle: Arc<TracingHandle>,
217    context: ComputeInstanceContext,
218}
219
220impl ClusterSpec for Config {
221    type Command = ComputeCommand;
222    type Response = ComputeResponse;
223
224    fn run_worker<A: Allocate + 'static>(
225        &self,
226        timely_worker: &mut TimelyWorker<A>,
227        client_rx: crossbeam_channel::Receiver<(
228            Uuid,
229            crossbeam_channel::Receiver<ComputeCommand>,
230            mpsc::UnboundedSender<ComputeResponse>,
231        )>,
232    ) {
233        if self.context.worker_core_affinity {
234            set_core_affinity(timely_worker.index());
235        }
236
237        let worker_id = timely_worker.index();
238        let metrics = self.metrics.for_worker(worker_id);
239
240        // Create the command channel that broadcasts commands from worker 0 to other workers. We
241        // reuse this channel between client connections, to avoid bugs where different workers end
242        // up creating incompatible sides of the channel dataflow after reconnects.
243        // See database-issues#8964.
244        let (cmd_tx, cmd_rx) = command_channel::render(timely_worker);
245        let (resp_tx, resp_rx) = crossbeam_channel::unbounded();
246
247        spawn_channel_adapter(client_rx, cmd_tx, resp_rx, worker_id);
248
249        Worker {
250            timely_worker,
251            command_rx: CommandReceiver::new(cmd_rx, worker_id),
252            response_tx: ResponseSender::new(resp_tx, worker_id),
253            metrics,
254            context: self.context.clone(),
255            persist_clients: Arc::clone(&self.persist_clients),
256            txns_ctx: self.txns_ctx.clone(),
257            compute_state: None,
258            tracing_handle: Arc::clone(&self.tracing_handle),
259        }
260        .run()
261    }
262}
263
264/// Set the current thread's core affinity, based on the given `worker_id`.
265#[cfg(not(target_os = "macos"))]
266fn set_core_affinity(worker_id: usize) {
267    use tracing::error;
268
269    let Some(mut core_ids) = core_affinity::get_core_ids() else {
270        error!(worker_id, "unable to get core IDs for setting affinity");
271        return;
272    };
273
274    // The `get_core_ids` docs don't say anything about a guaranteed order of the returned Vec,
275    // so sort it just to be safe.
276    core_ids.sort_unstable_by_key(|i| i.id);
277
278    // On multi-process replicas `worker_id` might be greater than the number of available cores.
279    // However, we assume that we always have at least as many cores as there are local workers.
280    // Violating this assumption is safe but might lead to degraded performance due to skew in core
281    // utilization.
282    let idx = worker_id % core_ids.len();
283    let core_id = core_ids[idx];
284
285    if core_affinity::set_for_current(core_id) {
286        info!(
287            worker_id,
288            core_id = core_id.id,
289            "set core affinity for worker"
290        );
291    } else {
292        error!(
293            worker_id,
294            core_id = core_id.id,
295            "failed to set core affinity for worker"
296        )
297    }
298}
299
/// Set the current thread's core affinity, based on the given `worker_id`.
///
/// On macOS this does nothing beyond emitting an informational log message.
#[cfg(target_os = "macos")]
fn set_core_affinity(_worker_id: usize) {
    // Core pinning is known not to work on Apple Silicon, so it is skipped entirely:
    // https://github.com/Elzair/core_affinity_rs/issues/22
    tracing::info!("setting core affinity is not supported on macOS");
}
307
308impl<'w, A: Allocate + 'static> Worker<'w, A> {
309    /// Runs a compute worker.
310    pub fn run(&mut self) {
311        // The command receiver is initialized without an nonce, so receiving the first command
312        // always triggers a nonce change.
313        let NonceChange(nonce) = self.recv_command().expect_err("change to first nonce");
314        self.set_nonce(nonce);
315
316        loop {
317            let Err(NonceChange(nonce)) = self.run_client();
318            self.set_nonce(nonce);
319        }
320    }
321
322    fn set_nonce(&mut self, nonce: Uuid) {
323        self.response_tx.set_nonce(nonce);
324    }
325
326    /// Handles commands for a client connection, returns when the nonce changes.
327    fn run_client(&mut self) -> Result<Infallible, NonceChange> {
328        self.reconcile()?;
329
330        // The last time we did periodic maintenance.
331        let mut last_maintenance = Instant::now();
332
333        // Commence normal operation.
334        loop {
335            // Get the maintenance interval, default to zero if we don't have a compute state.
336            let maintenance_interval = self
337                .compute_state
338                .as_ref()
339                .map_or(Duration::ZERO, |state| state.server_maintenance_interval);
340
341            let now = Instant::now();
342            // Determine if we need to perform maintenance, which is true if `maintenance_interval`
343            // time has passed since the last maintenance.
344            let sleep_duration;
345            if now >= last_maintenance + maintenance_interval {
346                last_maintenance = now;
347                sleep_duration = None;
348
349                // Report frontier information back the coordinator.
350                if let Some(mut compute_state) = self.activate_compute() {
351                    compute_state.compute_state.traces.maintenance();
352                    // Report operator hydration before frontiers, as reporting frontiers may
353                    // affect hydration reporting.
354                    compute_state.report_operator_hydration();
355                    compute_state.report_frontiers();
356                    compute_state.report_dropped_collections();
357                    compute_state.report_metrics();
358                    compute_state.check_expiration();
359                }
360
361                self.metrics.record_shared_row_metrics();
362            } else {
363                // We didn't perform maintenance, sleep until the next maintenance interval.
364                let next_maintenance = last_maintenance + maintenance_interval;
365                sleep_duration = Some(next_maintenance.saturating_duration_since(now))
366            };
367
368            // Step the timely worker, recording the time taken.
369            let timer = self.metrics.timely_step_duration_seconds.start_timer();
370            self.timely_worker.step_or_park(sleep_duration);
371            timer.observe_duration();
372
373            self.handle_pending_commands()?;
374
375            if let Some(mut compute_state) = self.activate_compute() {
376                compute_state.process_peeks();
377                compute_state.process_subscribes();
378                compute_state.process_copy_tos();
379            }
380        }
381    }
382
383    fn handle_pending_commands(&mut self) -> Result<(), NonceChange> {
384        while let Some(cmd) = self.command_rx.try_recv()? {
385            self.handle_command(cmd);
386        }
387        Ok(())
388    }
389
390    fn handle_command(&mut self, cmd: ComputeCommand) {
391        match &cmd {
392            ComputeCommand::CreateInstance(_) => {
393                self.compute_state = Some(ComputeState::new(
394                    Arc::clone(&self.persist_clients),
395                    self.txns_ctx.clone(),
396                    self.metrics.clone(),
397                    Arc::clone(&self.tracing_handle),
398                    self.context.clone(),
399                ));
400            }
401            _ => (),
402        }
403        self.activate_compute().unwrap().handle_compute_command(cmd);
404    }
405
406    fn activate_compute(&mut self) -> Option<ActiveComputeState<'_, A>> {
407        if let Some(compute_state) = &mut self.compute_state {
408            Some(ActiveComputeState {
409                timely_worker: &mut *self.timely_worker,
410                compute_state,
411                response_tx: &mut self.response_tx,
412            })
413        } else {
414            None
415        }
416    }
417
418    /// Receive the next compute command.
419    ///
420    /// This method blocks if no command is currently available, but takes care to step the Timely
421    /// worker while doing so.
422    fn recv_command(&mut self) -> Result<ComputeCommand, NonceChange> {
423        loop {
424            if let Some(cmd) = self.command_rx.try_recv()? {
425                return Ok(cmd);
426            }
427
428            let start = Instant::now();
429            self.timely_worker.step_or_park(None);
430            self.metrics
431                .timely_step_duration_seconds
432                .observe(start.elapsed().as_secs_f64());
433        }
434    }
435
436    /// Extract commands until `InitializationComplete`, and make the worker reflect those commands.
437    ///
438    /// This method is meant to be a function of the commands received thus far (as recorded in the
439    /// compute state command history) and the new commands from `command_rx`. It should not be a
440    /// function of other characteristics, like whether the worker has managed to respond to a peek
    /// or not. Some effort goes into narrowing our view to only the existing commands we can be sure
442    /// are live at all other workers.
443    ///
444    /// The methodology here is to drain `command_rx` until an `InitializationComplete`, at which point
445    /// the prior commands are "reconciled" in. Reconciliation takes each goal dataflow and looks for an
    /// existing "compatible" dataflow (per `compatible_with()`) it can repurpose, with some additional tests
447    /// to be sure that we can cut over from one to the other (no additional compaction, no tails/sinks).
    /// With any connections established, old orphaned dataflows are allowed to compact away, and any new
449    /// dataflows are created from scratch. "Kept" dataflows are allowed to compact up to any new `as_of`.
450    ///
451    /// Some additional tidying happens, cleaning up pending peeks, reported frontiers, and creating a new
452    /// subscribe response buffer. We will need to be vigilant with future modifications to `ComputeState` to
453    /// line up changes there with clean resets here.
454    fn reconcile(&mut self) -> Result<(), NonceChange> {
455        // To initialize the connection, we want to drain all commands until we receive a
456        // `ComputeCommand::InitializationComplete` command to form a target command state.
457        let mut new_commands = Vec::new();
458        loop {
459            match self.recv_command()? {
460                ComputeCommand::InitializationComplete => break,
461                command => new_commands.push(command),
462            }
463        }
464
465        // Commands we will need to apply before entering normal service.
466        // These commands may include dropping existing dataflows, compacting existing dataflows,
467        // and creating new dataflows, in addition to standard peek and compaction commands.
468        // The result should be the same as if dropping all dataflows and running `new_commands`.
469        let mut todo_commands = Vec::new();
470        // We only have a compute history if we are in an initialized state
471        // (i.e. after a `CreateInstance`).
472        // If this is not the case, just copy `new_commands` into `todo_commands`.
473        if let Some(compute_state) = &mut self.compute_state {
474            // Reduce the installed commands.
475            // Importantly, act as if all peeks may have been retired (as we cannot know otherwise).
476            compute_state.command_history.discard_peeks();
477            compute_state.command_history.reduce();
478
479            // At this point, we need to sort out which of the *certainly installed* dataflows are
480            // suitable replacements for the requested dataflows. A dataflow is "certainly installed"
481            // as of a frontier if its compaction allows it to go no further. We ignore peeks for this
482            // reasoning, as we cannot be certain that peeks still exist at any other worker.
483
484            // Having reduced our installed command history retaining no peeks (above), we should be able
485            // to use track down installed dataflows we can use as surrogates for requested dataflows (which
486            // have retained all of their peeks, creating a more demanding `as_of` requirement).
487            // NB: installed dataflows may still be allowed to further compact, and we should double check
488            // this before being too confident. It should be rare without peeks, but could happen with e.g.
489            // multiple outputs of a dataflow.
490
491            // The values with which a prior `CreateInstance` was called, if it was.
492            let mut old_instance_config = None;
493            // Index dataflows by `export_ids().collect()`, as this is a precondition for their compatibility.
494            let mut old_dataflows = BTreeMap::default();
495            // Maintain allowed compaction, in case installed identifiers may have been allowed to compact.
496            let mut old_frontiers = BTreeMap::default();
497            for command in compute_state.command_history.iter() {
498                match command {
499                    ComputeCommand::CreateInstance(config) => {
500                        old_instance_config = Some(config);
501                    }
502                    ComputeCommand::CreateDataflow(dataflow) => {
503                        let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
504                        old_dataflows.insert(export_ids, dataflow);
505                    }
506                    ComputeCommand::AllowCompaction { id, frontier } => {
507                        old_frontiers.insert(id, frontier);
508                    }
509                    _ => {
510                        // Nothing to do in these cases.
511                    }
512                }
513            }
514
515            // Compaction commands that can be applied to existing dataflows.
516            let mut old_compaction = BTreeMap::default();
517            // Exported identifiers from dataflows we retain.
518            let mut retain_ids = BTreeSet::default();
519
520            // Traverse new commands, sorting out what remediation we can do.
521            for command in new_commands.iter() {
522                match command {
523                    ComputeCommand::CreateDataflow(dataflow) => {
524                        // Attempt to find an existing match for the dataflow.
525                        let as_of = dataflow.as_of.as_ref().unwrap();
526                        let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
527
528                        if let Some(old_dataflow) = old_dataflows.get(&export_ids) {
529                            let compatible = old_dataflow.compatible_with(dataflow);
530                            let uncompacted = !export_ids
531                                .iter()
532                                .flat_map(|id| old_frontiers.get(id))
533                                .any(|frontier| {
534                                    !timely::PartialOrder::less_equal(
535                                        *frontier,
536                                        dataflow.as_of.as_ref().unwrap(),
537                                    )
538                                });
539
540                            // We cannot reconcile subscribe and copy-to sinks at the moment,
541                            // because the response buffer is shared, and to a first approximation
542                            // must be completely reformed.
543                            let subscribe_free = dataflow.subscribe_ids().next().is_none();
544                            let copy_to_free = dataflow.copy_to_ids().next().is_none();
545
546                            // If we have replaced any dependency of this dataflow, we need to
547                            // replace this dataflow, to make it use the replacement.
548                            let dependencies_retained = dataflow
549                                .imported_index_ids()
550                                .all(|id| retain_ids.contains(&id));
551
552                            if compatible
553                                && uncompacted
554                                && subscribe_free
555                                && copy_to_free
556                                && dependencies_retained
557                            {
558                                // Match found; remove the match from the deletion queue,
559                                // and compact its outputs to the dataflow's `as_of`.
560                                old_dataflows.remove(&export_ids);
561                                for id in export_ids.iter() {
562                                    old_compaction.insert(*id, as_of.clone());
563                                }
564                                retain_ids.extend(export_ids);
565                            } else {
566                                warn!(
567                                    ?export_ids,
568                                    ?compatible,
569                                    ?uncompacted,
570                                    ?subscribe_free,
571                                    ?copy_to_free,
572                                    ?dependencies_retained,
573                                    old_as_of = ?old_dataflow.as_of,
574                                    new_as_of = ?as_of,
575                                    "dataflow reconciliation failed",
576                                );
577
578                                // Dump the full dataflow plans if they are incompatible, to
579                                // simplify debugging hard-to-reproduce reconciliation failures.
580                                if !compatible {
581                                    warn!(
582                                        old = ?old_dataflow,
583                                        new = ?dataflow,
584                                        "incompatible dataflows in reconciliation",
585                                    );
586                                }
587
588                                todo_commands
589                                    .push(ComputeCommand::CreateDataflow(dataflow.clone()));
590                            }
591
592                            compute_state.metrics.record_dataflow_reconciliation(
593                                compatible,
594                                uncompacted,
595                                subscribe_free,
596                                copy_to_free,
597                                dependencies_retained,
598                            );
599                        } else {
600                            todo_commands.push(ComputeCommand::CreateDataflow(dataflow.clone()));
601                        }
602                    }
603                    ComputeCommand::CreateInstance(config) => {
604                        // Cluster creation should not be performed again!
605                        if old_instance_config.map_or(false, |old| !old.compatible_with(config)) {
606                            halt!(
607                                "new instance configuration not compatible with existing instance configuration:\n{:?}\nvs\n{:?}",
608                                config,
609                                old_instance_config,
610                            );
611                        }
612                    }
613                    // All other commands we apply as requested.
614                    command => {
615                        todo_commands.push(command.clone());
616                    }
617                }
618            }
619
620            // Issue compaction commands first to reclaim resources.
621            for (_, dataflow) in old_dataflows.iter() {
622                for id in dataflow.export_ids() {
623                    // We want to drop anything that has not yet been dropped,
624                    // and nothing that has already been dropped.
625                    if old_frontiers.get(&id) != Some(&&Antichain::new()) {
626                        old_compaction.insert(id, Antichain::new());
627                    }
628                }
629            }
630            for (&id, frontier) in &old_compaction {
631                let frontier = frontier.clone();
632                todo_commands.insert(0, ComputeCommand::AllowCompaction { id, frontier });
633            }
634
635            // Clean up worker-local state.
636            //
637            // Various aspects of `ComputeState` need to be either uninstalled, or return to a blank slate.
638            // All dropped dataflows should clean up after themselves, as we plan to install new dataflows
639            // re-using the same identifiers.
640            // All re-used dataflows should roll back any believed communicated information (e.g. frontiers)
641            // so that they recommunicate that information as if from scratch.
642
643            // Remove all pending peeks.
644            for (_, peek) in std::mem::take(&mut compute_state.pending_peeks) {
645                // Log dropping the peek request.
646                if let Some(logger) = compute_state.compute_logger.as_mut() {
647                    logger.log(&peek.as_log_event(false));
648                }
649            }
650
651            // Clear the list of dropped collections.
652            // We intended to report their dropping, but the controller does not expect to hear
653            // about them anymore.
654            compute_state.dropped_collections = Default::default();
655
656            for (&id, collection) in compute_state.collections.iter_mut() {
657                // Adjust reported frontiers:
658                //  * For dataflows we continue to use, reset to ensure we report something not
659                //    before the new `as_of` next.
660                //  * For dataflows we drop, set to the empty frontier, to ensure we don't report
661                //    anything for them.
662                let retained = retain_ids.contains(&id);
663                let compaction = old_compaction.remove(&id);
664                let new_reported_frontier = match (retained, compaction) {
665                    (true, Some(new_as_of)) => ReportedFrontier::NotReported { lower: new_as_of },
666                    (true, None) => {
667                        unreachable!("retained dataflows are compacted to the new as_of")
668                    }
669                    (false, Some(new_frontier)) => {
670                        assert!(new_frontier.is_empty());
671                        ReportedFrontier::Reported(new_frontier)
672                    }
673                    (false, None) => {
674                        // Logging dataflows are implicitly retained and don't have a new as_of.
675                        // Reset them to the minimal frontier.
676                        ReportedFrontier::new()
677                    }
678                };
679
680                collection.reset_reported_frontiers(new_reported_frontier);
681
682                // Sink tokens should be retained for retained dataflows, and dropped for dropped
683                // dataflows.
684                //
685                // Dropping the tokens of active subscribe and copy-tos makes them place
686                // `DroppedAt` responses into the respective response buffer. We drop those buffers
687                // in the next step, which ensures that we don't send out `DroppedAt` responses for
688                // subscribe/copy-tos dropped during reconciliation.
689                if !retained {
690                    collection.sink_token = None;
691                }
692            }
693
694            // We must drop the response buffers as they are global across all subscribe/copy-tos.
695            // If they were broken out by `GlobalId` then we could drop only the response buffers
696            // of dataflows we drop.
697            compute_state.subscribe_response_buffer = Rc::new(RefCell::new(Vec::new()));
698            compute_state.copy_to_response_buffer = Rc::new(RefCell::new(Vec::new()));
699
700            // The controller expects the logging collections to be readable from the minimum time
701            // initially. We cannot recreate the logging arrangements without restarting the
702            // instance, but we can pad the compacted times with empty data. Doing so is sound
703            // because logging collections from different replica incarnations are considered
704            // distinct TVCs, so the controller doesn't expect any historical consistency from
705            // these collections when it reconnects to a replica.
706            //
707            // TODO(database-issues#8152): Consider resolving this with controller-side reconciliation instead.
708            if let Some(config) = old_instance_config {
709                for id in config.logging.index_logs.values() {
710                    let trace = compute_state
711                        .traces
712                        .remove(id)
713                        .expect("logging trace exists");
714                    let padded = trace.into_padded();
715                    compute_state.traces.set(*id, padded);
716                }
717            }
718        } else {
719            todo_commands.clone_from(&new_commands);
720        }
721
722        // Execute the commands to bring us to `new_commands`.
723        for command in todo_commands.into_iter() {
724            self.handle_command(command);
725        }
726
727        // Overwrite `self.command_history` to reflect `new_commands`.
728        // It is possible that there still isn't a compute state yet.
729        if let Some(compute_state) = &mut self.compute_state {
730            let mut command_history = ComputeCommandHistory::new(self.metrics.for_history());
731            for command in new_commands.iter() {
732                command_history.push(command.clone());
733            }
734            compute_state.command_history = command_history;
735        }
736        Ok(())
737    }
738}
739
740/// Spawn a thread to bridge between [`ClusterClient`] and [`Worker`] channels.
741///
742/// The [`Worker`] expects a pair of persistent channels, with punctuation marking reconnects,
743/// while the [`ClusterClient`] provides a new pair of channels on each reconnect.
744fn spawn_channel_adapter(
745    client_rx: crossbeam_channel::Receiver<(
746        Uuid,
747        crossbeam_channel::Receiver<ComputeCommand>,
748        mpsc::UnboundedSender<ComputeResponse>,
749    )>,
750    command_tx: command_channel::Sender,
751    response_rx: crossbeam_channel::Receiver<(ComputeResponse, Uuid)>,
752    worker_id: usize,
753) {
754    thread::Builder::new()
755        // "cca" stands for "compute channel adapter". We need to shorten that because Linux has a
756        // 15-character limit for thread names.
757        .name(format!("cca-{worker_id}"))
758        .spawn(move || {
759            // To make workers aware of the individual client connections, we tag forwarded
760            // commands with the client nonce. Additionally, we use the nonce to filter out
761            // responses with a different nonce, which are intended for different client
762            // connections.
763            //
764            // It's possible that we receive responses with nonces from the past but also from the
765            // future: Worker 0 might have received a new nonce before us and broadcasted it to our
766            // Timely cluster. When we receive a response with a future nonce, we need to wait with
767            // forwarding it until we have received the same nonce from a client connection.
768            //
769            // Nonces are not ordered so we don't know whether a response nonce is from the past or
770            // the future. We thus assume that every response with an unknown nonce might be from
771            // the future and stash them all. Every time we reconnect, we immediately send all
772            // stashed responses with a matching nonce. Every time we receive a new response with a
773            // nonce that matches our current one, we can discard the entire response stash as we
774            // know that all stashed responses must be from the past.
775            let mut stashed_responses = BTreeMap::<Uuid, Vec<ComputeResponse>>::new();
776
777            while let Ok((nonce, command_rx, response_tx)) = client_rx.recv() {
778                // Send stashed responses for this client.
779                if let Some(resps) = stashed_responses.remove(&nonce) {
780                    for resp in resps {
781                        let _ = response_tx.send(resp);
782                    }
783                }
784
785                // Wait for a new response while forwarding received commands.
786                let serve_rx_channels = || loop {
787                    crossbeam_channel::select! {
788                        recv(command_rx) -> msg => match msg {
789                            Ok(cmd) => command_tx.send((cmd, nonce)),
790                            Err(_) => return Err(()),
791                        },
792                        recv(response_rx) -> msg => {
793                            return Ok(msg.expect("worker connected"));
794                        }
795                    }
796                };
797
798                // Serve this connection until we see any of the channels disconnect.
799                loop {
800                    let Ok((resp, resp_nonce)) = serve_rx_channels() else {
801                        break;
802                    };
803
804                    if resp_nonce == nonce {
805                        // Response for the current connection; forward it.
806                        stashed_responses.clear();
807                        if response_tx.send(resp).is_err() {
808                            break;
809                        }
810                    } else {
811                        // Response for a past or future connection; stash it.
812                        let stash = stashed_responses.entry(resp_nonce).or_default();
813                        stash.push(resp);
814                    }
815                }
816            }
817        })
818        .unwrap();
819}