Skip to main content

mz_compute/
server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An interactive dataflow server.
11
12use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::convert::Infallible;
15use std::fmt::Debug;
16use std::path::PathBuf;
17use std::rc::Rc;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use anyhow::Error;
22use mz_cluster::client::{ClusterClient, ClusterSpec};
23use mz_cluster_client::client::TimelyConfig;
24use mz_compute_client::protocol::command::ComputeCommand;
25use mz_compute_client::protocol::history::ComputeCommandHistory;
26use mz_compute_client::protocol::response::ComputeResponse;
27use mz_compute_client::service::ComputeClient;
28use mz_ore::halt;
29use mz_ore::metrics::MetricsRegistry;
30use mz_ore::tracing::TracingHandle;
31use mz_persist_client::cache::PersistClientCache;
32use mz_storage_types::connections::ConnectionContext;
33use mz_txn_wal::operator::TxnsContext;
34use timely::progress::Antichain;
35use timely::worker::Worker as TimelyWorker;
36use tokio::sync::mpsc;
37use tokio::sync::mpsc::error::SendError;
38use tracing::{info, trace, warn};
39use uuid::Uuid;
40
41use crate::command_channel;
42use crate::compute_state::{ActiveComputeState, ComputeState, ReportedFrontier};
43use crate::metrics::{ComputeMetrics, WorkerMetrics};
44
/// Caller-provided configuration for compute.
///
/// A clone of this context is handed to every worker in `ClusterSpec::run_worker` and passed
/// on to every `ComputeState` a worker creates.
#[derive(Clone, Debug)]
pub struct ComputeInstanceContext {
    /// A directory that can be used for scratch work.
    pub scratch_directory: Option<PathBuf>,
    /// Whether to set core affinity for Timely workers.
    ///
    /// When `true`, each worker pins its thread via `set_core_affinity` as it starts up.
    pub worker_core_affinity: bool,
    /// Context required to connect to an external sink from compute,
    /// like the `CopyToS3OneshotSink` compute sink.
    pub connection_context: ConnectionContext,
}
56
/// Process-wide configuration shared by all compute workers.
///
/// Built once by `serve` and used to build the Timely cluster. Each worker then reads it in
/// `ClusterSpec::run_worker` to assemble its own per-worker state.
#[derive(Debug, Clone)]
struct Config {
    /// `persist` client cache.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// A process-global handle to tracing configuration.
    pub tracing_handle: Arc<TracingHandle>,
    /// Metrics exposed by compute replicas; per-worker metrics are derived from these via
    /// `for_worker`.
    pub metrics: ComputeMetrics,
    /// Other configuration for compute.
    pub context: ComputeInstanceContext,
    /// The process-global metrics registry.
    pub metrics_registry: MetricsRegistry,
    /// The number of timely workers per process.
    pub workers_per_process: usize,
}
75
76/// Initiates a timely dataflow computation, processing compute commands.
77pub async fn serve(
78    timely_config: TimelyConfig,
79    metrics_registry: &MetricsRegistry,
80    persist_clients: Arc<PersistClientCache>,
81    txns_ctx: TxnsContext,
82    tracing_handle: Arc<TracingHandle>,
83    context: ComputeInstanceContext,
84) -> Result<impl Fn() -> Box<dyn ComputeClient> + use<>, Error> {
85    let config = Config {
86        persist_clients,
87        txns_ctx,
88        tracing_handle,
89        metrics: ComputeMetrics::register_with(metrics_registry),
90        context,
91        metrics_registry: metrics_registry.clone(),
92        workers_per_process: timely_config.workers,
93    };
94    let tokio_executor = tokio::runtime::Handle::current();
95
96    let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
97    let timely_container = Arc::new(Mutex::new(timely_container));
98
99    let client_builder = move || {
100        let client = ClusterClient::new(Arc::clone(&timely_container));
101        let client: Box<dyn ComputeClient> = Box::new(client);
102        client
103    };
104
105    Ok(client_builder)
106}
107
/// Error type returned on connection nonce changes.
///
/// A nonce change informs workers that subsequent commands come from a new client connection
/// and therefore require reconciliation. The wrapped value is the new nonce.
struct NonceChange(Uuid);
113
/// Endpoint used by workers to receive compute commands.
///
/// Observes nonce changes in the command stream and converts them into receive errors.
struct CommandReceiver {
    /// The channel supplying commands, each paired with the nonce it was sent under.
    inner: command_channel::Receiver,
    /// The ID of the Timely worker.
    worker_id: usize,
    /// The nonce identifying the current cluster protocol incarnation; `None` until the first
    /// command has been received.
    nonce: Option<Uuid>,
    /// A stash to enable peeking the next command, used in `try_recv`.
    ///
    /// Holds the command whose new nonce was just reported as a `NonceChange`, so that the
    /// following `try_recv` call returns it instead of losing it.
    stashed_command: Option<ComputeCommand>,
}
127
128impl CommandReceiver {
129    fn new(inner: command_channel::Receiver, worker_id: usize) -> Self {
130        Self {
131            inner,
132            worker_id,
133            nonce: None,
134            stashed_command: None,
135        }
136    }
137
138    /// Receive the next pending command, if any.
139    ///
140    /// If the next command has a different nonce, this method instead returns an `Err`
141    /// containing the new nonce.
142    fn try_recv(&mut self) -> Result<Option<ComputeCommand>, NonceChange> {
143        if let Some(command) = self.stashed_command.take() {
144            return Ok(Some(command));
145        }
146        let Some((command, nonce)) = self.inner.try_recv() else {
147            return Ok(None);
148        };
149
150        trace!(worker = self.worker_id, %nonce, ?command, "received command");
151
152        if Some(nonce) == self.nonce {
153            Ok(Some(command))
154        } else {
155            self.nonce = Some(nonce);
156            self.stashed_command = Some(command);
157            Err(NonceChange(nonce))
158        }
159    }
160}
161
/// Endpoint used by workers to send compute responses.
///
/// Tags responses with the current nonce, allowing receivers to filter out responses intended for
/// previous client connections.
pub(crate) struct ResponseSender {
    /// The channel consuming responses.
    inner: mpsc::UnboundedSender<(ComputeResponse, Uuid)>,
    /// The ID of the Timely worker.
    worker_id: usize,
    /// The nonce identifying the current cluster protocol incarnation; `None` until
    /// `set_nonce` is first called.
    nonce: Option<Uuid>,
}
174
175impl ResponseSender {
176    fn new(inner: mpsc::UnboundedSender<(ComputeResponse, Uuid)>, worker_id: usize) -> Self {
177        Self {
178            inner,
179            worker_id,
180            nonce: None,
181        }
182    }
183
184    /// Set the cluster protocol nonce.
185    fn set_nonce(&mut self, nonce: Uuid) {
186        self.nonce = Some(nonce);
187    }
188
189    /// Send a compute response.
190    pub fn send(&self, response: ComputeResponse) -> Result<(), SendError<ComputeResponse>> {
191        let nonce = self.nonce.expect("nonce must be initialized");
192
193        trace!(worker = self.worker_id, %nonce, ?response, "sending response");
194        self.inner
195            .send((response, nonce))
196            .map_err(|SendError((resp, _))| SendError(resp))
197    }
198}
199
/// State maintained for each worker thread.
///
/// Much of this state can be viewed as local variables for the worker thread,
/// holding state that persists across function calls.
struct Worker<'w> {
    /// The underlying Timely worker.
    timely_worker: &'w mut TimelyWorker,
    /// The channel over which commands are received.
    command_rx: CommandReceiver,
    /// The channel over which responses are sent.
    response_tx: ResponseSender,
    /// The compute state; starts out as `None` and is (re)created whenever a
    /// `CreateInstance` command is handled.
    compute_state: Option<ComputeState>,
    /// Compute metrics.
    metrics: WorkerMetrics,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    txns_ctx: TxnsContext,
    /// A process-global handle to tracing configuration.
    tracing_handle: Arc<TracingHandle>,
    /// Caller-provided compute configuration, passed on to each newly created `ComputeState`.
    context: ComputeInstanceContext,
    /// The process-global metrics registry.
    metrics_registry: MetricsRegistry,
    /// The number of timely workers per process.
    workers_per_process: usize,
}
227
228impl ClusterSpec for Config {
229    type Command = ComputeCommand;
230    type Response = ComputeResponse;
231
232    const NAME: &str = "compute";
233
234    fn run_worker(
235        &self,
236        timely_worker: &mut TimelyWorker,
237        client_rx: mpsc::UnboundedReceiver<(
238            Uuid,
239            mpsc::UnboundedReceiver<ComputeCommand>,
240            mpsc::UnboundedSender<ComputeResponse>,
241        )>,
242    ) {
243        if self.context.worker_core_affinity {
244            set_core_affinity(timely_worker.index());
245        }
246
247        let worker_id = timely_worker.index();
248        let metrics = self.metrics.for_worker(worker_id);
249
250        // Create the command channel that broadcasts commands from worker 0 to other workers. We
251        // reuse this channel between client connections, to avoid bugs where different workers end
252        // up creating incompatible sides of the channel dataflow after reconnects.
253        // See database-issues#8964.
254        let (cmd_tx, cmd_rx) = command_channel::render(timely_worker);
255        let (resp_tx, resp_rx) = mpsc::unbounded_channel();
256
257        spawn_channel_adapter(client_rx, cmd_tx, resp_rx, worker_id);
258
259        Worker {
260            timely_worker,
261            command_rx: CommandReceiver::new(cmd_rx, worker_id),
262            response_tx: ResponseSender::new(resp_tx, worker_id),
263            metrics,
264            context: self.context.clone(),
265            persist_clients: Arc::clone(&self.persist_clients),
266            txns_ctx: self.txns_ctx.clone(),
267            compute_state: None,
268            tracing_handle: Arc::clone(&self.tracing_handle),
269            metrics_registry: self.metrics_registry.clone(),
270            workers_per_process: self.workers_per_process,
271        }
272        .run()
273    }
274}
275
276/// Set the current thread's core affinity, based on the given `worker_id`.
277#[cfg(not(target_os = "macos"))]
278fn set_core_affinity(worker_id: usize) {
279    use tracing::error;
280
281    let Some(mut core_ids) = core_affinity::get_core_ids() else {
282        error!(worker_id, "unable to get core IDs for setting affinity");
283        return;
284    };
285
286    // The `get_core_ids` docs don't say anything about a guaranteed order of the returned Vec,
287    // so sort it just to be safe.
288    core_ids.sort_unstable_by_key(|i| i.id);
289
290    // On multi-process replicas `worker_id` might be greater than the number of available cores.
291    // However, we assume that we always have at least as many cores as there are local workers.
292    // Violating this assumption is safe but might lead to degraded performance due to skew in core
293    // utilization.
294    let idx = worker_id % core_ids.len();
295    let core_id = core_ids[idx];
296
297    if core_affinity::set_for_current(core_id) {
298        info!(
299            worker_id,
300            core_id = core_id.id,
301            "set core affinity for worker"
302        );
303    } else {
304        error!(
305            worker_id,
306            core_id = core_id.id,
307            "failed to set core affinity for worker"
308        )
309    }
310}
311
/// Set the current thread's core affinity, based on the given `worker_id`.
///
/// On macOS this does not set any affinity; it only logs that the feature is unsupported.
#[cfg(target_os = "macos")]
fn set_core_affinity(_worker_id: usize) {
    // Setting core affinity is known to not work on Apple Silicon:
    // https://github.com/Elzair/core_affinity_rs/issues/22
    info!("setting core affinity is not supported on macOS");
}
319
320impl<'w> Worker<'w> {
321    /// Runs a compute worker.
322    pub fn run(&mut self) {
323        // The command receiver is initialized without an nonce, so receiving the first command
324        // always triggers a nonce change.
325        let NonceChange(nonce) = self.recv_command().expect_err("change to first nonce");
326        self.set_nonce(nonce);
327
328        loop {
329            let Err(NonceChange(nonce)) = self.run_client();
330            self.set_nonce(nonce);
331        }
332    }
333
    /// Switches the worker to a new client connection by making the response sender tag all
    /// subsequently sent responses with `nonce`.
    fn set_nonce(&mut self, nonce: Uuid) {
        self.response_tx.set_nonce(nonce);
    }
337
338    /// Handles commands for a client connection, returns when the nonce changes.
339    fn run_client(&mut self) -> Result<Infallible, NonceChange> {
340        self.reconcile()?;
341
342        // The last time we did periodic maintenance.
343        let mut last_maintenance = Instant::now();
344
345        // Commence normal operation.
346        loop {
347            // Get the maintenance interval, default to zero if we don't have a compute state.
348            let maintenance_interval = self
349                .compute_state
350                .as_ref()
351                .map_or(Duration::ZERO, |state| state.server_maintenance_interval);
352
353            let now = Instant::now();
354            // Determine if we need to perform maintenance, which is true if `maintenance_interval`
355            // time has passed since the last maintenance.
356            let sleep_duration;
357            if now >= last_maintenance + maintenance_interval {
358                last_maintenance = now;
359                sleep_duration = None;
360
361                // Report frontier information back the coordinator.
362                if let Some(mut compute_state) = self.activate_compute() {
363                    compute_state.compute_state.traces.maintenance();
364                    compute_state.report_frontiers();
365                    compute_state.report_metrics();
366                    compute_state.check_expiration();
367                }
368
369                self.metrics.record_shared_row_metrics();
370            } else {
371                // We didn't perform maintenance, sleep until the next maintenance interval.
372                let next_maintenance = last_maintenance + maintenance_interval;
373                sleep_duration = Some(next_maintenance.saturating_duration_since(now))
374            };
375
376            // Step the timely worker, recording the time taken.
377            let timer = self.metrics.timely_step_duration_seconds.start_timer();
378            self.timely_worker.step_or_park(sleep_duration);
379            timer.observe_duration();
380
381            self.handle_pending_commands()?;
382
383            if let Some(mut compute_state) = self.activate_compute() {
384                compute_state.process_peeks();
385                compute_state.process_subscribes();
386                compute_state.process_copy_tos();
387            }
388        }
389    }
390
391    fn handle_pending_commands(&mut self) -> Result<(), NonceChange> {
392        while let Some(cmd) = self.command_rx.try_recv()? {
393            self.handle_command(cmd);
394        }
395        Ok(())
396    }
397
398    fn handle_command(&mut self, cmd: ComputeCommand) {
399        match &cmd {
400            ComputeCommand::CreateInstance(_) => {
401                self.compute_state = Some(ComputeState::new(
402                    Arc::clone(&self.persist_clients),
403                    self.txns_ctx.clone(),
404                    self.metrics.clone(),
405                    Arc::clone(&self.tracing_handle),
406                    self.context.clone(),
407                    self.metrics_registry.clone(),
408                    self.workers_per_process,
409                ));
410            }
411            _ => (),
412        }
413        self.activate_compute().unwrap().handle_compute_command(cmd);
414    }
415
416    fn activate_compute(&mut self) -> Option<ActiveComputeState<'_>> {
417        if let Some(compute_state) = &mut self.compute_state {
418            Some(ActiveComputeState {
419                timely_worker: &mut *self.timely_worker,
420                compute_state,
421                response_tx: &mut self.response_tx,
422            })
423        } else {
424            None
425        }
426    }
427
428    /// Receive the next compute command.
429    ///
430    /// This method blocks if no command is currently available, but takes care to step the Timely
431    /// worker while doing so.
432    fn recv_command(&mut self) -> Result<ComputeCommand, NonceChange> {
433        loop {
434            if let Some(cmd) = self.command_rx.try_recv()? {
435                return Ok(cmd);
436            }
437
438            let start = Instant::now();
439            self.timely_worker.step_or_park(None);
440            self.metrics
441                .timely_step_duration_seconds
442                .observe(start.elapsed().as_secs_f64());
443        }
444    }
445
446    /// Extract commands until `InitializationComplete`, and make the worker reflect those commands.
447    ///
448    /// This method is meant to be a function of the commands received thus far (as recorded in the
449    /// compute state command history) and the new commands from `command_rx`. It should not be a
450    /// function of other characteristics, like whether the worker has managed to respond to a peek
451    /// or not. Some effort goes in to narrowing our view to only the existing commands we can be sure
452    /// are live at all other workers.
453    ///
454    /// The methodology here is to drain `command_rx` until an `InitializationComplete`, at which point
455    /// the prior commands are "reconciled" in. Reconciliation takes each goal dataflow and looks for an
456    /// existing "compatible" dataflow (per `compatible()`) it can repurpose, with some additional tests
457    /// to be sure that we can cut over from one to the other (no additional compaction, no tails/sinks).
458    /// With any connections established, old orphaned dataflows are allow to compact away, and any new
459    /// dataflows are created from scratch. "Kept" dataflows are allowed to compact up to any new `as_of`.
460    ///
461    /// Some additional tidying happens, cleaning up pending peeks, reported frontiers, and creating a new
462    /// subscribe response buffer. We will need to be vigilant with future modifications to `ComputeState` to
463    /// line up changes there with clean resets here.
464    fn reconcile(&mut self) -> Result<(), NonceChange> {
465        // To initialize the connection, we want to drain all commands until we receive a
466        // `ComputeCommand::InitializationComplete` command to form a target command state.
467        let mut new_commands = Vec::new();
468        loop {
469            match self.recv_command()? {
470                ComputeCommand::InitializationComplete => break,
471                command => new_commands.push(command),
472            }
473        }
474
475        // Commands we will need to apply before entering normal service.
476        // These commands may include dropping existing dataflows, compacting existing dataflows,
477        // and creating new dataflows, in addition to standard peek and compaction commands.
478        // The result should be the same as if dropping all dataflows and running `new_commands`.
479        let mut todo_commands = Vec::new();
480        // We only have a compute history if we are in an initialized state
481        // (i.e. after a `CreateInstance`).
482        // If this is not the case, just copy `new_commands` into `todo_commands`.
483        if let Some(compute_state) = &mut self.compute_state {
484            // Reduce the installed commands.
485            // Importantly, act as if all peeks may have been retired (as we cannot know otherwise).
486            compute_state.command_history.discard_peeks();
487            compute_state.command_history.reduce();
488
489            // At this point, we need to sort out which of the *certainly installed* dataflows are
490            // suitable replacements for the requested dataflows. A dataflow is "certainly installed"
491            // as of a frontier if its compaction allows it to go no further. We ignore peeks for this
492            // reasoning, as we cannot be certain that peeks still exist at any other worker.
493
494            // Having reduced our installed command history retaining no peeks (above), we should be able
495            // to use track down installed dataflows we can use as surrogates for requested dataflows (which
496            // have retained all of their peeks, creating a more demanding `as_of` requirement).
497            // NB: installed dataflows may still be allowed to further compact, and we should double check
498            // this before being too confident. It should be rare without peeks, but could happen with e.g.
499            // multiple outputs of a dataflow.
500
501            // The values with which a prior `CreateInstance` was called, if it was.
502            let mut old_instance_config = None;
503            // Index dataflows by `export_ids().collect()`, as this is a precondition for their compatibility.
504            let mut old_dataflows = BTreeMap::default();
505            // Maintain allowed compaction, in case installed identifiers may have been allowed to compact.
506            let mut old_frontiers = BTreeMap::default();
507            for command in compute_state.command_history.iter() {
508                match command {
509                    ComputeCommand::CreateInstance(config) => {
510                        old_instance_config = Some(config);
511                    }
512                    ComputeCommand::CreateDataflow(dataflow) => {
513                        let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
514                        old_dataflows.insert(export_ids, dataflow);
515                    }
516                    ComputeCommand::AllowCompaction { id, frontier } => {
517                        old_frontiers.insert(id, frontier);
518                    }
519                    _ => {
520                        // Nothing to do in these cases.
521                    }
522                }
523            }
524
525            // Compaction commands that can be applied to existing dataflows.
526            let mut old_compaction = BTreeMap::default();
527            // Exported identifiers from dataflows we retain.
528            let mut retain_ids = BTreeSet::default();
529
530            // Traverse new commands, sorting out what remediation we can do.
531            for command in new_commands.iter() {
532                match command {
533                    ComputeCommand::CreateDataflow(dataflow) => {
534                        // Attempt to find an existing match for the dataflow.
535                        let as_of = dataflow.as_of.as_ref().unwrap();
536                        let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
537
538                        if let Some(old_dataflow) = old_dataflows.get(&export_ids) {
539                            let compatible = old_dataflow.compatible_with(dataflow);
540                            let uncompacted = !export_ids
541                                .iter()
542                                .flat_map(|id| old_frontiers.get(id))
543                                .any(|frontier| {
544                                    !timely::PartialOrder::less_equal(
545                                        *frontier,
546                                        dataflow.as_of.as_ref().unwrap(),
547                                    )
548                                });
549
550                            // We cannot reconcile subscribe and copy-to sinks at the moment,
551                            // because the response buffer is shared, and to a first approximation
552                            // must be completely reformed.
553                            let subscribe_free = dataflow.subscribe_ids().next().is_none();
554                            let copy_to_free = dataflow.copy_to_ids().next().is_none();
555
556                            // If we have replaced any dependency of this dataflow, we need to
557                            // replace this dataflow, to make it use the replacement.
558                            let dependencies_retained = dataflow
559                                .imported_index_ids()
560                                .all(|id| retain_ids.contains(&id));
561
562                            if compatible
563                                && uncompacted
564                                && subscribe_free
565                                && copy_to_free
566                                && dependencies_retained
567                            {
568                                // Match found; remove the match from the deletion queue,
569                                // and compact its outputs to the dataflow's `as_of`.
570                                old_dataflows.remove(&export_ids);
571                                for id in export_ids.iter() {
572                                    old_compaction.insert(*id, as_of.clone());
573                                }
574                                retain_ids.extend(export_ids);
575                            } else {
576                                warn!(
577                                    ?export_ids,
578                                    ?compatible,
579                                    ?uncompacted,
580                                    ?subscribe_free,
581                                    ?copy_to_free,
582                                    ?dependencies_retained,
583                                    old_as_of = ?old_dataflow.as_of,
584                                    new_as_of = ?as_of,
585                                    "dataflow reconciliation failed",
586                                );
587
588                                // Dump the full dataflow plans if they are incompatible, to
589                                // simplify debugging hard-to-reproduce reconciliation failures.
590                                if !compatible {
591                                    warn!(
592                                        old = ?old_dataflow,
593                                        new = ?dataflow,
594                                        "incompatible dataflows in reconciliation",
595                                    );
596                                }
597
598                                todo_commands
599                                    .push(ComputeCommand::CreateDataflow(dataflow.clone()));
600                            }
601
602                            compute_state.metrics.record_dataflow_reconciliation(
603                                compatible,
604                                uncompacted,
605                                subscribe_free,
606                                copy_to_free,
607                                dependencies_retained,
608                            );
609                        } else {
610                            todo_commands.push(ComputeCommand::CreateDataflow(dataflow.clone()));
611                        }
612                    }
613                    ComputeCommand::CreateInstance(config) => {
614                        // Cluster creation should not be performed again!
615                        if old_instance_config.map_or(false, |old| !old.compatible_with(config)) {
616                            halt!(
617                                "new instance configuration not compatible with existing instance configuration:\n{:?}\nvs\n{:?}",
618                                config,
619                                old_instance_config,
620                            );
621                        }
622                    }
623                    // All other commands we apply as requested.
624                    command => {
625                        todo_commands.push(command.clone());
626                    }
627                }
628            }
629
630            // Issue compaction commands first to reclaim resources.
631            for (_, dataflow) in old_dataflows.iter() {
632                for id in dataflow.export_ids() {
633                    // We want to drop anything that has not yet been dropped,
634                    // and nothing that has already been dropped.
635                    if old_frontiers.get(&id) != Some(&&Antichain::new()) {
636                        old_compaction.insert(id, Antichain::new());
637                    }
638                }
639            }
640            for (&id, frontier) in &old_compaction {
641                let frontier = frontier.clone();
642                todo_commands.insert(0, ComputeCommand::AllowCompaction { id, frontier });
643            }
644
645            // Clean up worker-local state.
646            //
647            // Various aspects of `ComputeState` need to be either uninstalled, or return to a blank slate.
648            // All dropped dataflows should clean up after themselves, as we plan to install new dataflows
649            // re-using the same identifiers.
650            // All re-used dataflows should roll back any believed communicated information (e.g. frontiers)
651            // so that they recommunicate that information as if from scratch.
652
653            // Remove all pending peeks.
654            for (_, peek) in std::mem::take(&mut compute_state.pending_peeks) {
655                // Log dropping the peek request.
656                if let Some(logger) = compute_state.compute_logger.as_mut() {
657                    logger.log(&peek.as_log_event(false));
658                }
659            }
660
661            for (&id, collection) in compute_state.collections.iter_mut() {
662                // Adjust reported frontiers:
663                //  * For dataflows we continue to use, reset to ensure we report something not
664                //    before the new `as_of` next.
665                //  * For dataflows we drop, set to the empty frontier, to ensure we don't report
666                //    anything for them.
667                let retained = retain_ids.contains(&id);
668                let compaction = old_compaction.remove(&id);
669                let new_reported_frontier = match (retained, compaction) {
670                    (true, Some(new_as_of)) => ReportedFrontier::NotReported { lower: new_as_of },
671                    (true, None) => {
672                        unreachable!("retained dataflows are compacted to the new as_of")
673                    }
674                    (false, Some(new_frontier)) => {
675                        assert!(new_frontier.is_empty());
676                        ReportedFrontier::Reported(new_frontier)
677                    }
678                    (false, None) => {
679                        // Logging dataflows are implicitly retained and don't have a new as_of.
680                        // Reset them to the minimal frontier.
681                        ReportedFrontier::new()
682                    }
683                };
684
685                collection.reset_reported_frontiers(new_reported_frontier);
686
687                // Sink tokens should be retained for retained dataflows, and dropped for dropped
688                // dataflows.
689                //
690                // Dropping the tokens of active subscribe and copy-tos makes them place
691                // `DroppedAt` responses into the respective response buffer. We drop those buffers
692                // in the next step, which ensures that we don't send out `DroppedAt` responses for
693                // subscribe/copy-tos dropped during reconciliation.
694                if !retained {
695                    collection.sink_token = None;
696                }
697            }
698
699            // We must drop the response buffers as they are global across all subscribe/copy-tos.
700            // If they were broken out by `GlobalId` then we could drop only the response buffers
701            // of dataflows we drop.
702            compute_state.subscribe_response_buffer = Rc::new(RefCell::new(Vec::new()));
703            compute_state.copy_to_response_buffer = Rc::new(RefCell::new(Vec::new()));
704
705            // The controller expects the logging collections to be readable from the minimum time
706            // initially. We cannot recreate the logging arrangements without restarting the
707            // instance, but we can pad the compacted times with empty data. Doing so is sound
708            // because logging collections from different replica incarnations are considered
709            // distinct TVCs, so the controller doesn't expect any historical consistency from
710            // these collections when it reconnects to a replica.
711            //
712            // TODO(database-issues#8152): Consider resolving this with controller-side reconciliation instead.
713            if let Some(config) = old_instance_config {
714                for id in config.logging.index_logs.values() {
715                    let trace = compute_state
716                        .traces
717                        .remove(id)
718                        .expect("logging trace exists");
719                    let padded = trace.into_padded();
720                    compute_state.traces.set(*id, padded);
721                }
722            }
723        } else {
724            todo_commands.clone_from(&new_commands);
725        }
726
727        // Execute the commands to bring us to `new_commands`.
728        for command in todo_commands.into_iter() {
729            self.handle_command(command);
730        }
731
732        // Overwrite `self.command_history` to reflect `new_commands`.
733        // It is possible that there still isn't a compute state yet.
734        if let Some(compute_state) = &mut self.compute_state {
735            let mut command_history = ComputeCommandHistory::new(self.metrics.for_history());
736            for command in new_commands.iter() {
737                command_history.push(command.clone());
738            }
739            compute_state.command_history = command_history;
740        }
741        Ok(())
742    }
743}
744
745/// Spawn a task to bridge between [`ClusterClient`] and [`Worker`] channels.
746///
747/// The [`Worker`] expects a pair of persistent channels, with punctuation marking reconnects,
748/// while the [`ClusterClient`] provides a new pair of channels on each reconnect.
749fn spawn_channel_adapter(
750    mut client_rx: mpsc::UnboundedReceiver<(
751        Uuid,
752        mpsc::UnboundedReceiver<ComputeCommand>,
753        mpsc::UnboundedSender<ComputeResponse>,
754    )>,
755    command_tx: command_channel::Sender,
756    mut response_rx: mpsc::UnboundedReceiver<(ComputeResponse, Uuid)>,
757    worker_id: usize,
758) {
759    mz_ore::task::spawn(
760        || format!("compute-channel-adapter-{worker_id}"),
761        async move {
762            // To make workers aware of the individual client connections, we tag forwarded
763            // commands with the client nonce. Additionally, we use the nonce to filter out
764            // responses with a different nonce, which are intended for different client
765            // connections.
766            //
767            // It's possible that we receive responses with nonces from the past but also from the
768            // future: Worker 0 might have received a new nonce before us and broadcasted it to our
769            // Timely cluster. When we receive a response with a future nonce, we need to wait with
770            // forwarding it until we have received the same nonce from a client connection.
771            //
772            // Nonces are not ordered so we don't know whether a response nonce is from the past or
773            // the future. We thus assume that every response with an unknown nonce might be from
774            // the future and stash them all. Every time we reconnect, we immediately send all
775            // stashed responses with a matching nonce. Every time we receive a new response with a
776            // nonce that matches our current one, we can discard the entire response stash as we
777            // know that all stashed responses must be from the past.
778            let mut stashed_responses = BTreeMap::<Uuid, Vec<ComputeResponse>>::new();
779
780            while let Some((nonce, mut command_rx, response_tx)) = client_rx.recv().await {
781                // Send stashed responses for this client.
782                if let Some(resps) = stashed_responses.remove(&nonce) {
783                    for resp in resps {
784                        let _ = response_tx.send(resp);
785                    }
786                }
787
788                // Wait for a new response while forwarding received commands.
789                let mut serve_rx_channels = async || loop {
790                    tokio::select! {
791                        msg = command_rx.recv() => match msg {
792                            Some(cmd) => command_tx.send((cmd, nonce)),
793                            None => return Err(()),
794                        },
795                        msg = response_rx.recv() => {
796                            return Ok(msg.expect("worker connected"));
797                        }
798                    }
799                };
800
801                // Serve this connection until we see any of the channels disconnect.
802                loop {
803                    let Ok((resp, resp_nonce)) = serve_rx_channels().await else {
804                        break;
805                    };
806
807                    if resp_nonce == nonce {
808                        // Response for the current connection; forward it.
809                        stashed_responses.clear();
810                        if response_tx.send(resp).is_err() {
811                            break;
812                        }
813                    } else {
814                        // Response for a past or future connection; stash it.
815                        let stash = stashed_responses.entry(resp_nonce).or_default();
816                        stash.push(resp);
817                    }
818                }
819            }
820        },
821    );
822}