mz_compute/
server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An interactive dataflow server.
11
12use std::cell::RefCell;
13use std::cmp::Ordering;
14use std::collections::{BTreeMap, BTreeSet};
15use std::convert::Infallible;
16use std::fmt::Debug;
17use std::path::PathBuf;
18use std::rc::Rc;
19use std::sync::Arc;
20use std::thread;
21use std::time::{Duration, Instant};
22
23use anyhow::Error;
24use crossbeam_channel::SendError;
25use mz_cluster::client::{ClusterClient, ClusterSpec};
26use mz_compute_client::protocol::command::ComputeCommand;
27use mz_compute_client::protocol::history::ComputeCommandHistory;
28use mz_compute_client::protocol::response::ComputeResponse;
29use mz_compute_client::service::ComputeClient;
30use mz_ore::halt;
31use mz_ore::metrics::MetricsRegistry;
32use mz_ore::tracing::TracingHandle;
33use mz_persist_client::cache::PersistClientCache;
34use mz_storage_types::connections::ConnectionContext;
35use mz_txn_wal::operator::TxnsContext;
36use timely::communication::Allocate;
37use timely::progress::Antichain;
38use timely::worker::Worker as TimelyWorker;
39use tokio::sync::mpsc;
40use tracing::{info, trace, warn};
41
42use crate::command_channel;
43use crate::compute_state::{ActiveComputeState, ComputeState, ReportedFrontier};
44use crate::metrics::{ComputeMetrics, WorkerMetrics};
45
46/// Caller-provided configuration for compute.
47#[derive(Clone, Debug)]
48pub struct ComputeInstanceContext {
49    /// A directory that can be used for scratch work.
50    pub scratch_directory: Option<PathBuf>,
51    /// Whether to set core affinity for Timely workers.
52    pub worker_core_affinity: bool,
53    /// Context required to connect to an external sink from compute,
54    /// like the `CopyToS3OneshotSink` compute sink.
55    pub connection_context: ConnectionContext,
56}
57
58/// Configures the server with compute-specific metrics.
59#[derive(Debug, Clone)]
60struct Config {
61    /// `persist` client cache.
62    pub persist_clients: Arc<PersistClientCache>,
63    /// Context necessary for rendering txn-wal operators.
64    pub txns_ctx: TxnsContext,
65    /// A process-global handle to tracing configuration.
66    pub tracing_handle: Arc<TracingHandle>,
67    /// Metrics exposed by compute replicas.
68    pub metrics: ComputeMetrics,
69    /// Other configuration for compute.
70    pub context: ComputeInstanceContext,
71}
72
73/// Initiates a timely dataflow computation, processing compute commands.
74pub fn serve(
75    metrics_registry: &MetricsRegistry,
76    persist_clients: Arc<PersistClientCache>,
77    txns_ctx: TxnsContext,
78    tracing_handle: Arc<TracingHandle>,
79    context: ComputeInstanceContext,
80) -> Result<impl Fn() -> Box<dyn ComputeClient> + use<>, Error> {
81    let config = Config {
82        persist_clients,
83        txns_ctx,
84        tracing_handle,
85        metrics: ComputeMetrics::register_with(metrics_registry),
86        context,
87    };
88    let tokio_executor = tokio::runtime::Handle::current();
89    let timely_container = Arc::new(tokio::sync::Mutex::new(None));
90
91    let client_builder = move || {
92        let client = ClusterClient::new(
93            Arc::clone(&timely_container),
94            tokio_executor.clone(),
95            config.clone(),
96        );
97        let client: Box<dyn ComputeClient> = Box::new(client);
98        client
99    };
100
101    Ok(client_builder)
102}
103
104/// Error type returned on connection epoch changes.
105///
106/// An epoch change informs workers that subsequent commands come a from a new client connection
107/// and therefore require reconciliation.
108struct EpochChange(u64);
109
110/// Endpoint used by workers to receive compute commands.
111///
112/// Observes epoch changes in the command stream and converts them into receive errors.
113struct CommandReceiver {
114    /// The channel supplying commands.
115    inner: command_channel::Receiver,
116    /// The ID of the Timely worker.
117    worker_id: usize,
118    /// The epoch identifying the current cluster protocol incarnation.
119    epoch: Option<u64>,
120    /// A stash to enable peeking the next command, used in `try_recv`.
121    stashed_command: Option<ComputeCommand>,
122}
123
124impl CommandReceiver {
125    fn new(inner: command_channel::Receiver, worker_id: usize) -> Self {
126        Self {
127            inner,
128            worker_id,
129            epoch: None,
130            stashed_command: None,
131        }
132    }
133
134    /// Receive the next pending command, if any.
135    ///
136    /// If the next command is at a different epoch, this method instead returns an `Err`
137    /// containing the new epoch.
138    fn try_recv(&mut self) -> Result<Option<ComputeCommand>, EpochChange> {
139        if let Some(command) = self.stashed_command.take() {
140            return Ok(Some(command));
141        }
142        let Some((command, epoch)) = self.inner.try_recv() else {
143            return Ok(None);
144        };
145
146        trace!(worker = self.worker_id, %epoch, ?command, "received command");
147
148        match self.epoch.cmp(&Some(epoch)) {
149            Ordering::Less => {
150                self.epoch = Some(epoch);
151                self.stashed_command = Some(command);
152                Err(EpochChange(epoch))
153            }
154            Ordering::Equal => Ok(Some(command)),
155            Ordering::Greater => panic!("epoch regression: {epoch} < {}", self.epoch.unwrap()),
156        }
157    }
158}
159
160/// Endpoint used by workers to send sending compute responses.
161///
162/// Tags responses with the current epoch, allowing receivers to filter out responses intended for
163/// previous client connections.
164pub(crate) struct ResponseSender {
165    /// The channel consuming responses.
166    inner: crossbeam_channel::Sender<(ComputeResponse, u64)>,
167    /// The ID of the Timely worker.
168    worker_id: usize,
169    /// The epoch identifying the current cluster protocol incarnation.
170    epoch: Option<u64>,
171}
172
173impl ResponseSender {
174    fn new(inner: crossbeam_channel::Sender<(ComputeResponse, u64)>, worker_id: usize) -> Self {
175        Self {
176            inner,
177            worker_id,
178            epoch: None,
179        }
180    }
181
182    /// Advance to the given epoch.
183    fn advance_epoch(&mut self, epoch: u64) {
184        assert!(
185            Some(epoch) > self.epoch,
186            "epoch regression: {epoch} <= {}",
187            self.epoch.unwrap(),
188        );
189        self.epoch = Some(epoch);
190    }
191
192    /// Send a compute response.
193    pub fn send(&self, response: ComputeResponse) -> Result<(), SendError<ComputeResponse>> {
194        let epoch = self.epoch.expect("epoch must be initialized");
195
196        trace!(worker = self.worker_id, %epoch, ?response, "sending response");
197        self.inner
198            .send((response, epoch))
199            .map_err(|SendError((resp, _))| SendError(resp))
200    }
201}
202
203/// State maintained for each worker thread.
204///
205/// Much of this state can be viewed as local variables for the worker thread,
206/// holding state that persists across function calls.
207struct Worker<'w, A: Allocate> {
208    /// The underlying Timely worker.
209    timely_worker: &'w mut TimelyWorker<A>,
210    /// The channel over which commands are received.
211    command_rx: CommandReceiver,
212    /// The channel over which responses are sent.
213    response_tx: ResponseSender,
214    compute_state: Option<ComputeState>,
215    /// Compute metrics.
216    metrics: WorkerMetrics,
217    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
218    /// This is intentionally shared between workers
219    persist_clients: Arc<PersistClientCache>,
220    /// Context necessary for rendering txn-wal operators.
221    txns_ctx: TxnsContext,
222    /// A process-global handle to tracing configuration.
223    tracing_handle: Arc<TracingHandle>,
224    context: ComputeInstanceContext,
225}
226
227impl ClusterSpec for Config {
228    type Command = ComputeCommand;
229    type Response = ComputeResponse;
230
231    fn run_worker<A: Allocate + 'static>(
232        &self,
233        timely_worker: &mut TimelyWorker<A>,
234        client_rx: crossbeam_channel::Receiver<(
235            crossbeam_channel::Receiver<ComputeCommand>,
236            mpsc::UnboundedSender<ComputeResponse>,
237        )>,
238    ) {
239        if self.context.worker_core_affinity {
240            set_core_affinity(timely_worker.index());
241        }
242
243        let worker_id = timely_worker.index();
244        let metrics = self.metrics.for_worker(worker_id);
245
246        // Create the command channel that broadcasts commands from worker 0 to other workers. We
247        // reuse this channel between client connections, to avoid bugs where different workers end
248        // up creating incompatible sides of the channel dataflow after reconnects.
249        // See database-issues#8964.
250        let (cmd_tx, cmd_rx) = command_channel::render(timely_worker);
251        let (resp_tx, resp_rx) = crossbeam_channel::unbounded();
252
253        spawn_channel_adapter(client_rx, cmd_tx, resp_rx, worker_id);
254
255        Worker {
256            timely_worker,
257            command_rx: CommandReceiver::new(cmd_rx, worker_id),
258            response_tx: ResponseSender::new(resp_tx, worker_id),
259            metrics,
260            context: self.context.clone(),
261            persist_clients: Arc::clone(&self.persist_clients),
262            txns_ctx: self.txns_ctx.clone(),
263            compute_state: None,
264            tracing_handle: Arc::clone(&self.tracing_handle),
265        }
266        .run()
267    }
268}
269
270/// Set the current thread's core affinity, based on the given `worker_id`.
271#[cfg(not(target_os = "macos"))]
272fn set_core_affinity(worker_id: usize) {
273    use tracing::error;
274
275    let Some(mut core_ids) = core_affinity::get_core_ids() else {
276        error!(worker_id, "unable to get core IDs for setting affinity");
277        return;
278    };
279
280    // The `get_core_ids` docs don't say anything about a guaranteed order of the returned Vec,
281    // so sort it just to be safe.
282    core_ids.sort_unstable_by_key(|i| i.id);
283
284    // On multi-process replicas `worker_id` might be greater than the number of available cores.
285    // However, we assume that we always have at least as many cores as there are local workers.
286    // Violating this assumption is safe but might lead to degraded performance due to skew in core
287    // utilization.
288    let idx = worker_id % core_ids.len();
289    let core_id = core_ids[idx];
290
291    if core_affinity::set_for_current(core_id) {
292        info!(
293            worker_id,
294            core_id = core_id.id,
295            "set core affinity for worker"
296        );
297    } else {
298        error!(
299            worker_id,
300            core_id = core_id.id,
301            "failed to set core affinity for worker"
302        )
303    }
304}
305
306/// Set the current thread's core affinity, based on the given `worker_id`.
307#[cfg(target_os = "macos")]
308fn set_core_affinity(_worker_id: usize) {
309    // Setting core affinity is known to not work on Apple Silicon:
310    // https://github.com/Elzair/core_affinity_rs/issues/22
311    info!("setting core affinity is not supported on macOS");
312}
313
314impl<'w, A: Allocate + 'static> Worker<'w, A> {
315    /// Runs a compute worker.
316    pub fn run(&mut self) {
317        // The command receiver is initialized without an epoch, so receiving the first command
318        // always triggers an epoch change.
319        let EpochChange(epoch) = self.recv_command().expect_err("change to first epoch");
320        self.advance_epoch(epoch);
321
322        loop {
323            let Err(EpochChange(epoch)) = self.run_client();
324            self.advance_epoch(epoch);
325        }
326    }
327
328    fn advance_epoch(&mut self, epoch: u64) {
329        self.response_tx.advance_epoch(epoch);
330    }
331
332    /// Handles commands for a client connection, returns when the epoch changes.
333    fn run_client(&mut self) -> Result<Infallible, EpochChange> {
334        self.reconcile()?;
335
336        // The last time we did periodic maintenance.
337        let mut last_maintenance = Instant::now();
338
339        // Commence normal operation.
340        loop {
341            // Get the maintenance interval, default to zero if we don't have a compute state.
342            let maintenance_interval = self
343                .compute_state
344                .as_ref()
345                .map_or(Duration::ZERO, |state| state.server_maintenance_interval);
346
347            let now = Instant::now();
348            // Determine if we need to perform maintenance, which is true if `maintenance_interval`
349            // time has passed since the last maintenance.
350            let sleep_duration;
351            if now >= last_maintenance + maintenance_interval {
352                last_maintenance = now;
353                sleep_duration = None;
354
355                // Report frontier information back the coordinator.
356                if let Some(mut compute_state) = self.activate_compute() {
357                    compute_state.compute_state.traces.maintenance();
358                    // Report operator hydration before frontiers, as reporting frontiers may
359                    // affect hydration reporting.
360                    compute_state.report_operator_hydration();
361                    compute_state.report_frontiers();
362                    compute_state.report_dropped_collections();
363                    compute_state.report_metrics();
364                    compute_state.check_expiration();
365                }
366
367                self.metrics.record_shared_row_metrics();
368            } else {
369                // We didn't perform maintenance, sleep until the next maintenance interval.
370                let next_maintenance = last_maintenance + maintenance_interval;
371                sleep_duration = Some(next_maintenance.saturating_duration_since(now))
372            };
373
374            // Step the timely worker, recording the time taken.
375            let timer = self.metrics.timely_step_duration_seconds.start_timer();
376            self.timely_worker.step_or_park(sleep_duration);
377            timer.observe_duration();
378
379            self.handle_pending_commands()?;
380
381            if let Some(mut compute_state) = self.activate_compute() {
382                compute_state.process_peeks();
383                compute_state.process_subscribes();
384                compute_state.process_copy_tos();
385            }
386        }
387    }
388
389    fn handle_pending_commands(&mut self) -> Result<(), EpochChange> {
390        while let Some(cmd) = self.command_rx.try_recv()? {
391            self.handle_command(cmd);
392        }
393        Ok(())
394    }
395
396    fn handle_command(&mut self, cmd: ComputeCommand) {
397        match &cmd {
398            ComputeCommand::CreateInstance(_) => {
399                self.compute_state = Some(ComputeState::new(
400                    Arc::clone(&self.persist_clients),
401                    self.txns_ctx.clone(),
402                    self.metrics.clone(),
403                    Arc::clone(&self.tracing_handle),
404                    self.context.clone(),
405                ));
406            }
407            _ => (),
408        }
409        self.activate_compute().unwrap().handle_compute_command(cmd);
410    }
411
412    fn activate_compute(&mut self) -> Option<ActiveComputeState<'_, A>> {
413        if let Some(compute_state) = &mut self.compute_state {
414            Some(ActiveComputeState {
415                timely_worker: &mut *self.timely_worker,
416                compute_state,
417                response_tx: &mut self.response_tx,
418            })
419        } else {
420            None
421        }
422    }
423
424    /// Receive the next compute command.
425    ///
426    /// This method blocks if no command is currently available, but takes care to step the Timely
427    /// worker while doing so.
428    fn recv_command(&mut self) -> Result<ComputeCommand, EpochChange> {
429        loop {
430            if let Some(cmd) = self.command_rx.try_recv()? {
431                return Ok(cmd);
432            }
433
434            let start = Instant::now();
435            self.timely_worker.step_or_park(None);
436            self.metrics
437                .timely_step_duration_seconds
438                .observe(start.elapsed().as_secs_f64());
439        }
440    }
441
442    /// Extract commands until `InitializationComplete`, and make the worker reflect those commands.
443    ///
444    /// This method is meant to be a function of the commands received thus far (as recorded in the
445    /// compute state command history) and the new commands from `command_rx`. It should not be a
446    /// function of other characteristics, like whether the worker has managed to respond to a peek
447    /// or not. Some effort goes in to narrowing our view to only the existing commands we can be sure
448    /// are live at all other workers.
449    ///
450    /// The methodology here is to drain `command_rx` until an `InitializationComplete`, at which point
451    /// the prior commands are "reconciled" in. Reconciliation takes each goal dataflow and looks for an
452    /// existing "compatible" dataflow (per `compatible()`) it can repurpose, with some additional tests
453    /// to be sure that we can cut over from one to the other (no additional compaction, no tails/sinks).
454    /// With any connections established, old orphaned dataflows are allow to compact away, and any new
455    /// dataflows are created from scratch. "Kept" dataflows are allowed to compact up to any new `as_of`.
456    ///
457    /// Some additional tidying happens, cleaning up pending peeks, reported frontiers, and creating a new
458    /// subscribe response buffer. We will need to be vigilant with future modifications to `ComputeState` to
459    /// line up changes there with clean resets here.
460    fn reconcile(&mut self) -> Result<(), EpochChange> {
461        // To initialize the connection, we want to drain all commands until we receive a
462        // `ComputeCommand::InitializationComplete` command to form a target command state.
463        let mut new_commands = Vec::new();
464        loop {
465            match self.recv_command()? {
466                ComputeCommand::InitializationComplete => break,
467                command => new_commands.push(command),
468            }
469        }
470
471        // Commands we will need to apply before entering normal service.
472        // These commands may include dropping existing dataflows, compacting existing dataflows,
473        // and creating new dataflows, in addition to standard peek and compaction commands.
474        // The result should be the same as if dropping all dataflows and running `new_commands`.
475        let mut todo_commands = Vec::new();
476        // We only have a compute history if we are in an initialized state
477        // (i.e. after a `CreateInstance`).
478        // If this is not the case, just copy `new_commands` into `todo_commands`.
479        if let Some(compute_state) = &mut self.compute_state {
480            // Reduce the installed commands.
481            // Importantly, act as if all peeks may have been retired (as we cannot know otherwise).
482            compute_state.command_history.discard_peeks();
483            compute_state.command_history.reduce();
484
485            // At this point, we need to sort out which of the *certainly installed* dataflows are
486            // suitable replacements for the requested dataflows. A dataflow is "certainly installed"
487            // as of a frontier if its compaction allows it to go no further. We ignore peeks for this
488            // reasoning, as we cannot be certain that peeks still exist at any other worker.
489
490            // Having reduced our installed command history retaining no peeks (above), we should be able
491            // to use track down installed dataflows we can use as surrogates for requested dataflows (which
492            // have retained all of their peeks, creating a more demanding `as_of` requirement).
493            // NB: installed dataflows may still be allowed to further compact, and we should double check
494            // this before being too confident. It should be rare without peeks, but could happen with e.g.
495            // multiple outputs of a dataflow.
496
497            // The values with which a prior `CreateInstance` was called, if it was.
498            let mut old_instance_config = None;
499            // Index dataflows by `export_ids().collect()`, as this is a precondition for their compatibility.
500            let mut old_dataflows = BTreeMap::default();
501            // Maintain allowed compaction, in case installed identifiers may have been allowed to compact.
502            let mut old_frontiers = BTreeMap::default();
503            for command in compute_state.command_history.iter() {
504                match command {
505                    ComputeCommand::CreateInstance(config) => {
506                        old_instance_config = Some(config);
507                    }
508                    ComputeCommand::CreateDataflow(dataflow) => {
509                        let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
510                        old_dataflows.insert(export_ids, dataflow);
511                    }
512                    ComputeCommand::AllowCompaction { id, frontier } => {
513                        old_frontiers.insert(id, frontier);
514                    }
515                    _ => {
516                        // Nothing to do in these cases.
517                    }
518                }
519            }
520
521            // Compaction commands that can be applied to existing dataflows.
522            let mut old_compaction = BTreeMap::default();
523            // Exported identifiers from dataflows we retain.
524            let mut retain_ids = BTreeSet::default();
525
526            // Traverse new commands, sorting out what remediation we can do.
527            for command in new_commands.iter() {
528                match command {
529                    ComputeCommand::CreateDataflow(dataflow) => {
530                        // Attempt to find an existing match for the dataflow.
531                        let as_of = dataflow.as_of.as_ref().unwrap();
532                        let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
533
534                        if let Some(old_dataflow) = old_dataflows.get(&export_ids) {
535                            let compatible = old_dataflow.compatible_with(dataflow);
536                            let uncompacted = !export_ids
537                                .iter()
538                                .flat_map(|id| old_frontiers.get(id))
539                                .any(|frontier| {
540                                    !timely::PartialOrder::less_equal(
541                                        *frontier,
542                                        dataflow.as_of.as_ref().unwrap(),
543                                    )
544                                });
545
546                            // We cannot reconcile subscriptions at the moment, because the
547                            // response buffer is shared, and to a first approximation must be
548                            // completely reformed.
549                            let subscribe_free = dataflow.subscribe_ids().next().is_none();
550
551                            // If we have replaced any dependency of this dataflow, we need to
552                            // replace this dataflow, to make it use the replacement.
553                            let dependencies_retained = dataflow
554                                .imported_index_ids()
555                                .all(|id| retain_ids.contains(&id));
556
557                            if compatible && uncompacted && subscribe_free && dependencies_retained
558                            {
559                                // Match found; remove the match from the deletion queue,
560                                // and compact its outputs to the dataflow's `as_of`.
561                                old_dataflows.remove(&export_ids);
562                                for id in export_ids.iter() {
563                                    old_compaction.insert(*id, as_of.clone());
564                                }
565                                retain_ids.extend(export_ids);
566                            } else {
567                                warn!(
568                                    ?export_ids,
569                                    ?compatible,
570                                    ?uncompacted,
571                                    ?subscribe_free,
572                                    ?dependencies_retained,
573                                    old_as_of = ?old_dataflow.as_of,
574                                    new_as_of = ?as_of,
575                                    "dataflow reconciliation failed",
576                                );
577                                todo_commands
578                                    .push(ComputeCommand::CreateDataflow(dataflow.clone()));
579                            }
580
581                            compute_state.metrics.record_dataflow_reconciliation(
582                                compatible,
583                                uncompacted,
584                                subscribe_free,
585                                dependencies_retained,
586                            );
587                        } else {
588                            todo_commands.push(ComputeCommand::CreateDataflow(dataflow.clone()));
589                        }
590                    }
591                    ComputeCommand::CreateInstance(config) => {
592                        // Cluster creation should not be performed again!
593                        if old_instance_config.map_or(false, |old| !old.compatible_with(config)) {
594                            halt!(
595                                "new instance configuration not compatible with existing instance configuration:\n{:?}\nvs\n{:?}",
596                                config,
597                                old_instance_config,
598                            );
599                        }
600                    }
601                    // All other commands we apply as requested.
602                    command => {
603                        todo_commands.push(command.clone());
604                    }
605                }
606            }
607
608            // Issue compaction commands first to reclaim resources.
609            for (_, dataflow) in old_dataflows.iter() {
610                for id in dataflow.export_ids() {
611                    // We want to drop anything that has not yet been dropped,
612                    // and nothing that has already been dropped.
613                    if old_frontiers.get(&id) != Some(&&Antichain::new()) {
614                        old_compaction.insert(id, Antichain::new());
615                    }
616                }
617            }
618            for (&id, frontier) in &old_compaction {
619                let frontier = frontier.clone();
620                todo_commands.insert(0, ComputeCommand::AllowCompaction { id, frontier });
621            }
622
623            // Clean up worker-local state.
624            //
625            // Various aspects of `ComputeState` need to be either uninstalled, or return to a blank slate.
626            // All dropped dataflows should clean up after themselves, as we plan to install new dataflows
627            // re-using the same identifiers.
628            // All re-used dataflows should roll back any believed communicated information (e.g. frontiers)
629            // so that they recommunicate that information as if from scratch.
630
631            // Remove all pending peeks.
632            for (_, peek) in std::mem::take(&mut compute_state.pending_peeks) {
633                // Log dropping the peek request.
634                if let Some(logger) = compute_state.compute_logger.as_mut() {
635                    logger.log(&peek.as_log_event(false));
636                }
637            }
638
639            // Clear the list of dropped collections.
640            // We intended to report their dropping, but the controller does not expect to hear
641            // about them anymore.
642            compute_state.dropped_collections = Default::default();
643
644            for (&id, collection) in compute_state.collections.iter_mut() {
645                // Adjust reported frontiers:
646                //  * For dataflows we continue to use, reset to ensure we report something not
647                //    before the new `as_of` next.
648                //  * For dataflows we drop, set to the empty frontier, to ensure we don't report
649                //    anything for them.
650                let retained = retain_ids.contains(&id);
651                let compaction = old_compaction.remove(&id);
652                let new_reported_frontier = match (retained, compaction) {
653                    (true, Some(new_as_of)) => ReportedFrontier::NotReported { lower: new_as_of },
654                    (true, None) => {
655                        unreachable!("retained dataflows are compacted to the new as_of")
656                    }
657                    (false, Some(new_frontier)) => {
658                        assert!(new_frontier.is_empty());
659                        ReportedFrontier::Reported(new_frontier)
660                    }
661                    (false, None) => {
662                        // Logging dataflows are implicitly retained and don't have a new as_of.
663                        // Reset them to the minimal frontier.
664                        ReportedFrontier::new()
665                    }
666                };
667
668                collection.reset_reported_frontiers(new_reported_frontier);
669
670                // Sink tokens should be retained for retained dataflows, and dropped for dropped
671                // dataflows.
672                //
673                // Dropping the tokens of active subscribes makes them place `DroppedAt` responses
674                // into the subscribe response buffer. We drop that buffer in the next step, which
675                // ensures that we don't send out `DroppedAt` responses for subscribes dropped
676                // during reconciliation.
677                if !retained {
678                    collection.sink_token = None;
679                }
680            }
681
682            // We must drop the subscribe response buffer as it is global across all subscribes.
683            // If it were broken out by `GlobalId` then we could drop only those of dataflows we drop.
684            compute_state.subscribe_response_buffer = Rc::new(RefCell::new(Vec::new()));
685
686            // The controller expects the logging collections to be readable from the minimum time
687            // initially. We cannot recreate the logging arrangements without restarting the
688            // instance, but we can pad the compacted times with empty data. Doing so is sound
689            // because logging collections from different replica incarnations are considered
690            // distinct TVCs, so the controller doesn't expect any historical consistency from
691            // these collections when it reconnects to a replica.
692            //
693            // TODO(database-issues#8152): Consider resolving this with controller-side reconciliation instead.
694            if let Some(config) = old_instance_config {
695                for id in config.logging.index_logs.values() {
696                    let trace = compute_state
697                        .traces
698                        .remove(id)
699                        .expect("logging trace exists");
700                    let padded = trace.into_padded();
701                    compute_state.traces.set(*id, padded);
702                }
703            }
704        } else {
705            todo_commands.clone_from(&new_commands);
706        }
707
708        // Execute the commands to bring us to `new_commands`.
709        for command in todo_commands.into_iter() {
710            self.handle_command(command);
711        }
712
713        // Overwrite `self.command_history` to reflect `new_commands`.
714        // It is possible that there still isn't a compute state yet.
715        if let Some(compute_state) = &mut self.compute_state {
716            let mut command_history = ComputeCommandHistory::new(self.metrics.for_history());
717            for command in new_commands.iter() {
718                command_history.push(command.clone());
719            }
720            compute_state.command_history = command_history;
721        }
722        Ok(())
723    }
724}
725
726/// Spawn a thread to bridge between [`ClusterClient`] and [`Worker`] channels.
727///
728/// The [`Worker`] expects a pair of persistent channels, with punctuation marking reconnects,
729/// while the [`ClusterClient`] provides a new pair of channels on each reconnect.
730fn spawn_channel_adapter(
731    client_rx: crossbeam_channel::Receiver<(
732        crossbeam_channel::Receiver<ComputeCommand>,
733        mpsc::UnboundedSender<ComputeResponse>,
734    )>,
735    command_tx: command_channel::Sender,
736    response_rx: crossbeam_channel::Receiver<(ComputeResponse, u64)>,
737    worker_id: usize,
738) {
739    thread::Builder::new()
740        // "cca" stands for "compute channel adapter". We need to shorten that because Linux has a
741        // 15-character limit for thread names.
742        .name(format!("cca-{worker_id}"))
743        .spawn(move || {
744            // To make workers aware of the individual client connections, we tag forwarded
745            // commands with an epoch that increases on every new client connection. Additionally,
746            // we use the epoch to filter out responses with a different epoch, which were intended
747            // for previous clients.
748            let mut epoch = 0;
749
750            // It's possible that we receive responses with epochs from the future: Worker 0 might
751            // have increased its epoch before us and broadcasted it to our Timely cluster. When we
752            // receive a response with a future epoch, we need to wait with forwarding it until we
753            // have increased our own epoch sufficiently (by observing new client connections). We
754            // need to stash the response in the meantime.
755            let mut stashed_response = None;
756
757            while let Ok((command_rx, response_tx)) = client_rx.recv() {
758                epoch += 1;
759
760                // Wait for a new response while forwarding received commands.
761                let serve_rx_channels = || loop {
762                    crossbeam_channel::select! {
763                        recv(command_rx) -> msg => match msg {
764                            Ok(cmd) => command_tx.send((cmd, epoch)),
765                            Err(_) => return Err(()),
766                        },
767                        recv(response_rx) -> msg => {
768                            return Ok(msg.expect("worker connected"));
769                        }
770                    }
771                };
772
773                // Serve this connection until we see any of the channels disconnect.
774                loop {
775                    let (resp, resp_epoch) = match stashed_response.take() {
776                        Some(stashed) => stashed,
777                        None => match serve_rx_channels() {
778                            Ok(response) => response,
779                            Err(()) => break,
780                        },
781                    };
782
783                    if resp_epoch < epoch {
784                        // Response for a previous connection; discard it.
785                        continue;
786                    } else if resp_epoch > epoch {
787                        // Response for a future connection; stash it and reconnect.
788                        stashed_response = Some((resp, resp_epoch));
789                        break;
790                    } else {
791                        // Response for the current connection; forward it.
792                        if response_tx.send(resp).is_err() {
793                            break;
794                        }
795                    }
796                }
797            }
798        })
799        .unwrap();
800}