mz_compute/server.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An interactive dataflow server.
11
12use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::convert::Infallible;
15use std::fmt::Debug;
16use std::path::PathBuf;
17use std::rc::Rc;
18use std::sync::{Arc, Mutex};
19use std::thread;
20use std::time::{Duration, Instant};
21
22use anyhow::Error;
23use crossbeam_channel::SendError;
24use mz_cluster::client::{ClusterClient, ClusterSpec};
25use mz_cluster_client::client::TimelyConfig;
26use mz_compute_client::protocol::command::ComputeCommand;
27use mz_compute_client::protocol::history::ComputeCommandHistory;
28use mz_compute_client::protocol::response::ComputeResponse;
29use mz_compute_client::service::ComputeClient;
30use mz_ore::halt;
31use mz_ore::metrics::MetricsRegistry;
32use mz_ore::tracing::TracingHandle;
33use mz_persist_client::cache::PersistClientCache;
34use mz_storage_types::connections::ConnectionContext;
35use mz_txn_wal::operator::TxnsContext;
36use timely::communication::Allocate;
37use timely::progress::Antichain;
38use timely::worker::Worker as TimelyWorker;
39use tokio::sync::mpsc;
40use tracing::{info, trace, warn};
41use uuid::Uuid;
42
43use crate::command_channel;
44use crate::compute_state::{ActiveComputeState, ComputeState, ReportedFrontier};
45use crate::metrics::{ComputeMetrics, WorkerMetrics};
46
/// Caller-provided configuration for compute.
///
/// The context is handed to [`serve`] and cloned into the state of every worker.
#[derive(Clone, Debug)]
pub struct ComputeInstanceContext {
    /// A directory that can be used for scratch work.
    pub scratch_directory: Option<PathBuf>,
    /// Whether to set core affinity for Timely workers.
    ///
    /// When `true`, each worker calls `set_core_affinity` with its worker index at startup.
    pub worker_core_affinity: bool,
    /// Context required to connect to an external sink from compute,
    /// like the `CopyToS3OneshotSink` compute sink.
    pub connection_context: ConnectionContext,
}
58
/// Process-wide configuration of the compute server.
///
/// Assembled by [`serve`] and consulted by every worker on startup through the
/// [`ClusterSpec`] implementation, which clones or shares the fields below into the
/// per-worker state.
#[derive(Debug, Clone)]
struct Config {
    /// `persist` client cache.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// A process-global handle to tracing configuration.
    pub tracing_handle: Arc<TracingHandle>,
    /// Metrics exposed by compute replicas.
    pub metrics: ComputeMetrics,
    /// Other configuration for compute.
    pub context: ComputeInstanceContext,
}
73
74/// Initiates a timely dataflow computation, processing compute commands.
75pub async fn serve(
76 timely_config: TimelyConfig,
77 metrics_registry: &MetricsRegistry,
78 persist_clients: Arc<PersistClientCache>,
79 txns_ctx: TxnsContext,
80 tracing_handle: Arc<TracingHandle>,
81 context: ComputeInstanceContext,
82) -> Result<impl Fn() -> Box<dyn ComputeClient> + use<>, Error> {
83 let config = Config {
84 persist_clients,
85 txns_ctx,
86 tracing_handle,
87 metrics: ComputeMetrics::register_with(metrics_registry),
88 context,
89 };
90 let tokio_executor = tokio::runtime::Handle::current();
91
92 let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
93 let timely_container = Arc::new(Mutex::new(timely_container));
94
95 let client_builder = move || {
96 let client = ClusterClient::new(Arc::clone(&timely_container));
97 let client: Box<dyn ComputeClient> = Box::new(client);
98 client
99 };
100
101 Ok(client_builder)
102}
103
/// Error type returned on connection nonce changes.
///
/// A nonce change informs workers that subsequent commands come from a new client connection
/// and therefore require reconciliation.
///
/// The wrapped value is the new nonce.
struct NonceChange(Uuid);
109
/// Endpoint used by workers to receive compute commands.
///
/// Observes nonce changes in the command stream and converts them into receive errors.
struct CommandReceiver {
    /// The channel supplying commands.
    inner: command_channel::Receiver,
    /// The ID of the Timely worker.
    worker_id: usize,
    /// The nonce identifying the current cluster protocol incarnation.
    ///
    /// `None` until the first command has been received.
    nonce: Option<Uuid>,
    /// A stash to enable peeking the next command, used in `try_recv`.
    ///
    /// Holds the command that arrived together with a new nonce while the nonce change is
    /// reported to the caller; the following `try_recv` call hands it out.
    stashed_command: Option<ComputeCommand>,
}
123
124impl CommandReceiver {
125 fn new(inner: command_channel::Receiver, worker_id: usize) -> Self {
126 Self {
127 inner,
128 worker_id,
129 nonce: None,
130 stashed_command: None,
131 }
132 }
133
134 /// Receive the next pending command, if any.
135 ///
136 /// If the next command has a different nonce, this method instead returns an `Err`
137 /// containing the new nonce.
138 fn try_recv(&mut self) -> Result<Option<ComputeCommand>, NonceChange> {
139 if let Some(command) = self.stashed_command.take() {
140 return Ok(Some(command));
141 }
142 let Some((command, nonce)) = self.inner.try_recv() else {
143 return Ok(None);
144 };
145
146 trace!(worker = self.worker_id, %nonce, ?command, "received command");
147
148 if Some(nonce) == self.nonce {
149 Ok(Some(command))
150 } else {
151 self.nonce = Some(nonce);
152 self.stashed_command = Some(command);
153 Err(NonceChange(nonce))
154 }
155 }
156}
157
/// Endpoint used by workers to send compute responses.
///
/// Tags responses with the current nonce, allowing receivers to filter out responses intended for
/// previous client connections.
pub(crate) struct ResponseSender {
    /// The channel consuming responses.
    inner: crossbeam_channel::Sender<(ComputeResponse, Uuid)>,
    /// The ID of the Timely worker.
    worker_id: usize,
    /// The nonce identifying the current cluster protocol incarnation.
    ///
    /// `None` until `set_nonce` is called for the first time.
    nonce: Option<Uuid>,
}
170
171impl ResponseSender {
172 fn new(inner: crossbeam_channel::Sender<(ComputeResponse, Uuid)>, worker_id: usize) -> Self {
173 Self {
174 inner,
175 worker_id,
176 nonce: None,
177 }
178 }
179
180 /// Set the cluster protocol nonce.
181 fn set_nonce(&mut self, nonce: Uuid) {
182 self.nonce = Some(nonce);
183 }
184
185 /// Send a compute response.
186 pub fn send(&self, response: ComputeResponse) -> Result<(), SendError<ComputeResponse>> {
187 let nonce = self.nonce.expect("nonce must be initialized");
188
189 trace!(worker = self.worker_id, %nonce, ?response, "sending response");
190 self.inner
191 .send((response, nonce))
192 .map_err(|SendError((resp, _))| SendError(resp))
193 }
194}
195
/// State maintained for each worker thread.
///
/// Much of this state can be viewed as local variables for the worker thread,
/// holding state that persists across function calls.
struct Worker<'w, A: Allocate> {
    /// The underlying Timely worker.
    timely_worker: &'w mut TimelyWorker<A>,
    /// The channel over which commands are received.
    command_rx: CommandReceiver,
    /// The channel over which responses are sent.
    response_tx: ResponseSender,
    /// The compute state of this worker.
    ///
    /// `None` until a `CreateInstance` command has been handled.
    compute_state: Option<ComputeState>,
    /// Compute metrics.
    metrics: WorkerMetrics,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers
    persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    txns_ctx: TxnsContext,
    /// A process-global handle to tracing configuration.
    tracing_handle: Arc<TracingHandle>,
    /// Caller-provided compute configuration, passed on to each newly created compute state.
    context: ComputeInstanceContext,
}
219
220impl ClusterSpec for Config {
221 type Command = ComputeCommand;
222 type Response = ComputeResponse;
223
224 fn run_worker<A: Allocate + 'static>(
225 &self,
226 timely_worker: &mut TimelyWorker<A>,
227 client_rx: crossbeam_channel::Receiver<(
228 Uuid,
229 crossbeam_channel::Receiver<ComputeCommand>,
230 mpsc::UnboundedSender<ComputeResponse>,
231 )>,
232 ) {
233 if self.context.worker_core_affinity {
234 set_core_affinity(timely_worker.index());
235 }
236
237 let worker_id = timely_worker.index();
238 let metrics = self.metrics.for_worker(worker_id);
239
240 // Create the command channel that broadcasts commands from worker 0 to other workers. We
241 // reuse this channel between client connections, to avoid bugs where different workers end
242 // up creating incompatible sides of the channel dataflow after reconnects.
243 // See database-issues#8964.
244 let (cmd_tx, cmd_rx) = command_channel::render(timely_worker);
245 let (resp_tx, resp_rx) = crossbeam_channel::unbounded();
246
247 spawn_channel_adapter(client_rx, cmd_tx, resp_rx, worker_id);
248
249 Worker {
250 timely_worker,
251 command_rx: CommandReceiver::new(cmd_rx, worker_id),
252 response_tx: ResponseSender::new(resp_tx, worker_id),
253 metrics,
254 context: self.context.clone(),
255 persist_clients: Arc::clone(&self.persist_clients),
256 txns_ctx: self.txns_ctx.clone(),
257 compute_state: None,
258 tracing_handle: Arc::clone(&self.tracing_handle),
259 }
260 .run()
261 }
262}
263
264/// Set the current thread's core affinity, based on the given `worker_id`.
265#[cfg(not(target_os = "macos"))]
266fn set_core_affinity(worker_id: usize) {
267 use tracing::error;
268
269 let Some(mut core_ids) = core_affinity::get_core_ids() else {
270 error!(worker_id, "unable to get core IDs for setting affinity");
271 return;
272 };
273
274 // The `get_core_ids` docs don't say anything about a guaranteed order of the returned Vec,
275 // so sort it just to be safe.
276 core_ids.sort_unstable_by_key(|i| i.id);
277
278 // On multi-process replicas `worker_id` might be greater than the number of available cores.
279 // However, we assume that we always have at least as many cores as there are local workers.
280 // Violating this assumption is safe but might lead to degraded performance due to skew in core
281 // utilization.
282 let idx = worker_id % core_ids.len();
283 let core_id = core_ids[idx];
284
285 if core_affinity::set_for_current(core_id) {
286 info!(
287 worker_id,
288 core_id = core_id.id,
289 "set core affinity for worker"
290 );
291 } else {
292 error!(
293 worker_id,
294 core_id = core_id.id,
295 "failed to set core affinity for worker"
296 )
297 }
298}
299
/// Set the current thread's core affinity, based on the given `worker_id`.
///
/// macOS variant: leaves the affinity untouched and only logs that the operation is not
/// supported on this platform.
#[cfg(target_os = "macos")]
fn set_core_affinity(_worker_id: usize) {
    // Setting core affinity is known to not work on Apple Silicon:
    // https://github.com/Elzair/core_affinity_rs/issues/22
    info!("setting core affinity is not supported on macOS");
}
307
308impl<'w, A: Allocate + 'static> Worker<'w, A> {
309 /// Runs a compute worker.
310 pub fn run(&mut self) {
311 // The command receiver is initialized without an nonce, so receiving the first command
312 // always triggers a nonce change.
313 let NonceChange(nonce) = self.recv_command().expect_err("change to first nonce");
314 self.set_nonce(nonce);
315
316 loop {
317 let Err(NonceChange(nonce)) = self.run_client();
318 self.set_nonce(nonce);
319 }
320 }
321
    /// Installs `nonce` on the response sender, so that subsequently sent responses are tagged
    /// with it.
    fn set_nonce(&mut self, nonce: Uuid) {
        self.response_tx.set_nonce(nonce);
    }
325
326 /// Handles commands for a client connection, returns when the nonce changes.
327 fn run_client(&mut self) -> Result<Infallible, NonceChange> {
328 self.reconcile()?;
329
330 // The last time we did periodic maintenance.
331 let mut last_maintenance = Instant::now();
332
333 // Commence normal operation.
334 loop {
335 // Get the maintenance interval, default to zero if we don't have a compute state.
336 let maintenance_interval = self
337 .compute_state
338 .as_ref()
339 .map_or(Duration::ZERO, |state| state.server_maintenance_interval);
340
341 let now = Instant::now();
342 // Determine if we need to perform maintenance, which is true if `maintenance_interval`
343 // time has passed since the last maintenance.
344 let sleep_duration;
345 if now >= last_maintenance + maintenance_interval {
346 last_maintenance = now;
347 sleep_duration = None;
348
349 // Report frontier information back the coordinator.
350 if let Some(mut compute_state) = self.activate_compute() {
351 compute_state.compute_state.traces.maintenance();
352 compute_state.report_frontiers();
353 compute_state.report_metrics();
354 compute_state.check_expiration();
355 }
356
357 self.metrics.record_shared_row_metrics();
358 } else {
359 // We didn't perform maintenance, sleep until the next maintenance interval.
360 let next_maintenance = last_maintenance + maintenance_interval;
361 sleep_duration = Some(next_maintenance.saturating_duration_since(now))
362 };
363
364 // Step the timely worker, recording the time taken.
365 let timer = self.metrics.timely_step_duration_seconds.start_timer();
366 self.timely_worker.step_or_park(sleep_duration);
367 timer.observe_duration();
368
369 self.handle_pending_commands()?;
370
371 if let Some(mut compute_state) = self.activate_compute() {
372 compute_state.process_peeks();
373 compute_state.process_subscribes();
374 compute_state.process_copy_tos();
375 }
376 }
377 }
378
379 fn handle_pending_commands(&mut self) -> Result<(), NonceChange> {
380 while let Some(cmd) = self.command_rx.try_recv()? {
381 self.handle_command(cmd);
382 }
383 Ok(())
384 }
385
386 fn handle_command(&mut self, cmd: ComputeCommand) {
387 match &cmd {
388 ComputeCommand::CreateInstance(_) => {
389 self.compute_state = Some(ComputeState::new(
390 Arc::clone(&self.persist_clients),
391 self.txns_ctx.clone(),
392 self.metrics.clone(),
393 Arc::clone(&self.tracing_handle),
394 self.context.clone(),
395 ));
396 }
397 _ => (),
398 }
399 self.activate_compute().unwrap().handle_compute_command(cmd);
400 }
401
402 fn activate_compute(&mut self) -> Option<ActiveComputeState<'_, A>> {
403 if let Some(compute_state) = &mut self.compute_state {
404 Some(ActiveComputeState {
405 timely_worker: &mut *self.timely_worker,
406 compute_state,
407 response_tx: &mut self.response_tx,
408 })
409 } else {
410 None
411 }
412 }
413
414 /// Receive the next compute command.
415 ///
416 /// This method blocks if no command is currently available, but takes care to step the Timely
417 /// worker while doing so.
418 fn recv_command(&mut self) -> Result<ComputeCommand, NonceChange> {
419 loop {
420 if let Some(cmd) = self.command_rx.try_recv()? {
421 return Ok(cmd);
422 }
423
424 let start = Instant::now();
425 self.timely_worker.step_or_park(None);
426 self.metrics
427 .timely_step_duration_seconds
428 .observe(start.elapsed().as_secs_f64());
429 }
430 }
431
432 /// Extract commands until `InitializationComplete`, and make the worker reflect those commands.
433 ///
434 /// This method is meant to be a function of the commands received thus far (as recorded in the
435 /// compute state command history) and the new commands from `command_rx`. It should not be a
436 /// function of other characteristics, like whether the worker has managed to respond to a peek
437 /// or not. Some effort goes in to narrowing our view to only the existing commands we can be sure
438 /// are live at all other workers.
439 ///
440 /// The methodology here is to drain `command_rx` until an `InitializationComplete`, at which point
441 /// the prior commands are "reconciled" in. Reconciliation takes each goal dataflow and looks for an
442 /// existing "compatible" dataflow (per `compatible()`) it can repurpose, with some additional tests
443 /// to be sure that we can cut over from one to the other (no additional compaction, no tails/sinks).
444 /// With any connections established, old orphaned dataflows are allow to compact away, and any new
445 /// dataflows are created from scratch. "Kept" dataflows are allowed to compact up to any new `as_of`.
446 ///
447 /// Some additional tidying happens, cleaning up pending peeks, reported frontiers, and creating a new
448 /// subscribe response buffer. We will need to be vigilant with future modifications to `ComputeState` to
449 /// line up changes there with clean resets here.
450 fn reconcile(&mut self) -> Result<(), NonceChange> {
451 // To initialize the connection, we want to drain all commands until we receive a
452 // `ComputeCommand::InitializationComplete` command to form a target command state.
453 let mut new_commands = Vec::new();
454 loop {
455 match self.recv_command()? {
456 ComputeCommand::InitializationComplete => break,
457 command => new_commands.push(command),
458 }
459 }
460
461 // Commands we will need to apply before entering normal service.
462 // These commands may include dropping existing dataflows, compacting existing dataflows,
463 // and creating new dataflows, in addition to standard peek and compaction commands.
464 // The result should be the same as if dropping all dataflows and running `new_commands`.
465 let mut todo_commands = Vec::new();
466 // We only have a compute history if we are in an initialized state
467 // (i.e. after a `CreateInstance`).
468 // If this is not the case, just copy `new_commands` into `todo_commands`.
469 if let Some(compute_state) = &mut self.compute_state {
470 // Reduce the installed commands.
471 // Importantly, act as if all peeks may have been retired (as we cannot know otherwise).
472 compute_state.command_history.discard_peeks();
473 compute_state.command_history.reduce();
474
475 // At this point, we need to sort out which of the *certainly installed* dataflows are
476 // suitable replacements for the requested dataflows. A dataflow is "certainly installed"
477 // as of a frontier if its compaction allows it to go no further. We ignore peeks for this
478 // reasoning, as we cannot be certain that peeks still exist at any other worker.
479
480 // Having reduced our installed command history retaining no peeks (above), we should be able
481 // to use track down installed dataflows we can use as surrogates for requested dataflows (which
482 // have retained all of their peeks, creating a more demanding `as_of` requirement).
483 // NB: installed dataflows may still be allowed to further compact, and we should double check
484 // this before being too confident. It should be rare without peeks, but could happen with e.g.
485 // multiple outputs of a dataflow.
486
487 // The values with which a prior `CreateInstance` was called, if it was.
488 let mut old_instance_config = None;
489 // Index dataflows by `export_ids().collect()`, as this is a precondition for their compatibility.
490 let mut old_dataflows = BTreeMap::default();
491 // Maintain allowed compaction, in case installed identifiers may have been allowed to compact.
492 let mut old_frontiers = BTreeMap::default();
493 for command in compute_state.command_history.iter() {
494 match command {
495 ComputeCommand::CreateInstance(config) => {
496 old_instance_config = Some(config);
497 }
498 ComputeCommand::CreateDataflow(dataflow) => {
499 let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
500 old_dataflows.insert(export_ids, dataflow);
501 }
502 ComputeCommand::AllowCompaction { id, frontier } => {
503 old_frontiers.insert(id, frontier);
504 }
505 _ => {
506 // Nothing to do in these cases.
507 }
508 }
509 }
510
511 // Compaction commands that can be applied to existing dataflows.
512 let mut old_compaction = BTreeMap::default();
513 // Exported identifiers from dataflows we retain.
514 let mut retain_ids = BTreeSet::default();
515
516 // Traverse new commands, sorting out what remediation we can do.
517 for command in new_commands.iter() {
518 match command {
519 ComputeCommand::CreateDataflow(dataflow) => {
520 // Attempt to find an existing match for the dataflow.
521 let as_of = dataflow.as_of.as_ref().unwrap();
522 let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
523
524 if let Some(old_dataflow) = old_dataflows.get(&export_ids) {
525 let compatible = old_dataflow.compatible_with(dataflow);
526 let uncompacted = !export_ids
527 .iter()
528 .flat_map(|id| old_frontiers.get(id))
529 .any(|frontier| {
530 !timely::PartialOrder::less_equal(
531 *frontier,
532 dataflow.as_of.as_ref().unwrap(),
533 )
534 });
535
536 // We cannot reconcile subscribe and copy-to sinks at the moment,
537 // because the response buffer is shared, and to a first approximation
538 // must be completely reformed.
539 let subscribe_free = dataflow.subscribe_ids().next().is_none();
540 let copy_to_free = dataflow.copy_to_ids().next().is_none();
541
542 // If we have replaced any dependency of this dataflow, we need to
543 // replace this dataflow, to make it use the replacement.
544 let dependencies_retained = dataflow
545 .imported_index_ids()
546 .all(|id| retain_ids.contains(&id));
547
548 if compatible
549 && uncompacted
550 && subscribe_free
551 && copy_to_free
552 && dependencies_retained
553 {
554 // Match found; remove the match from the deletion queue,
555 // and compact its outputs to the dataflow's `as_of`.
556 old_dataflows.remove(&export_ids);
557 for id in export_ids.iter() {
558 old_compaction.insert(*id, as_of.clone());
559 }
560 retain_ids.extend(export_ids);
561 } else {
562 warn!(
563 ?export_ids,
564 ?compatible,
565 ?uncompacted,
566 ?subscribe_free,
567 ?copy_to_free,
568 ?dependencies_retained,
569 old_as_of = ?old_dataflow.as_of,
570 new_as_of = ?as_of,
571 "dataflow reconciliation failed",
572 );
573
574 // Dump the full dataflow plans if they are incompatible, to
575 // simplify debugging hard-to-reproduce reconciliation failures.
576 if !compatible {
577 warn!(
578 old = ?old_dataflow,
579 new = ?dataflow,
580 "incompatible dataflows in reconciliation",
581 );
582 }
583
584 todo_commands
585 .push(ComputeCommand::CreateDataflow(dataflow.clone()));
586 }
587
588 compute_state.metrics.record_dataflow_reconciliation(
589 compatible,
590 uncompacted,
591 subscribe_free,
592 copy_to_free,
593 dependencies_retained,
594 );
595 } else {
596 todo_commands.push(ComputeCommand::CreateDataflow(dataflow.clone()));
597 }
598 }
599 ComputeCommand::CreateInstance(config) => {
600 // Cluster creation should not be performed again!
601 if old_instance_config.map_or(false, |old| !old.compatible_with(config)) {
602 halt!(
603 "new instance configuration not compatible with existing instance configuration:\n{:?}\nvs\n{:?}",
604 config,
605 old_instance_config,
606 );
607 }
608 }
609 // All other commands we apply as requested.
610 command => {
611 todo_commands.push(command.clone());
612 }
613 }
614 }
615
616 // Issue compaction commands first to reclaim resources.
617 for (_, dataflow) in old_dataflows.iter() {
618 for id in dataflow.export_ids() {
619 // We want to drop anything that has not yet been dropped,
620 // and nothing that has already been dropped.
621 if old_frontiers.get(&id) != Some(&&Antichain::new()) {
622 old_compaction.insert(id, Antichain::new());
623 }
624 }
625 }
626 for (&id, frontier) in &old_compaction {
627 let frontier = frontier.clone();
628 todo_commands.insert(0, ComputeCommand::AllowCompaction { id, frontier });
629 }
630
631 // Clean up worker-local state.
632 //
633 // Various aspects of `ComputeState` need to be either uninstalled, or return to a blank slate.
634 // All dropped dataflows should clean up after themselves, as we plan to install new dataflows
635 // re-using the same identifiers.
636 // All re-used dataflows should roll back any believed communicated information (e.g. frontiers)
637 // so that they recommunicate that information as if from scratch.
638
639 // Remove all pending peeks.
640 for (_, peek) in std::mem::take(&mut compute_state.pending_peeks) {
641 // Log dropping the peek request.
642 if let Some(logger) = compute_state.compute_logger.as_mut() {
643 logger.log(&peek.as_log_event(false));
644 }
645 }
646
647 for (&id, collection) in compute_state.collections.iter_mut() {
648 // Adjust reported frontiers:
649 // * For dataflows we continue to use, reset to ensure we report something not
650 // before the new `as_of` next.
651 // * For dataflows we drop, set to the empty frontier, to ensure we don't report
652 // anything for them.
653 let retained = retain_ids.contains(&id);
654 let compaction = old_compaction.remove(&id);
655 let new_reported_frontier = match (retained, compaction) {
656 (true, Some(new_as_of)) => ReportedFrontier::NotReported { lower: new_as_of },
657 (true, None) => {
658 unreachable!("retained dataflows are compacted to the new as_of")
659 }
660 (false, Some(new_frontier)) => {
661 assert!(new_frontier.is_empty());
662 ReportedFrontier::Reported(new_frontier)
663 }
664 (false, None) => {
665 // Logging dataflows are implicitly retained and don't have a new as_of.
666 // Reset them to the minimal frontier.
667 ReportedFrontier::new()
668 }
669 };
670
671 collection.reset_reported_frontiers(new_reported_frontier);
672
673 // Sink tokens should be retained for retained dataflows, and dropped for dropped
674 // dataflows.
675 //
676 // Dropping the tokens of active subscribe and copy-tos makes them place
677 // `DroppedAt` responses into the respective response buffer. We drop those buffers
678 // in the next step, which ensures that we don't send out `DroppedAt` responses for
679 // subscribe/copy-tos dropped during reconciliation.
680 if !retained {
681 collection.sink_token = None;
682 }
683 }
684
685 // We must drop the response buffers as they are global across all subscribe/copy-tos.
686 // If they were broken out by `GlobalId` then we could drop only the response buffers
687 // of dataflows we drop.
688 compute_state.subscribe_response_buffer = Rc::new(RefCell::new(Vec::new()));
689 compute_state.copy_to_response_buffer = Rc::new(RefCell::new(Vec::new()));
690
691 // The controller expects the logging collections to be readable from the minimum time
692 // initially. We cannot recreate the logging arrangements without restarting the
693 // instance, but we can pad the compacted times with empty data. Doing so is sound
694 // because logging collections from different replica incarnations are considered
695 // distinct TVCs, so the controller doesn't expect any historical consistency from
696 // these collections when it reconnects to a replica.
697 //
698 // TODO(database-issues#8152): Consider resolving this with controller-side reconciliation instead.
699 if let Some(config) = old_instance_config {
700 for id in config.logging.index_logs.values() {
701 let trace = compute_state
702 .traces
703 .remove(id)
704 .expect("logging trace exists");
705 let padded = trace.into_padded();
706 compute_state.traces.set(*id, padded);
707 }
708 }
709 } else {
710 todo_commands.clone_from(&new_commands);
711 }
712
713 // Execute the commands to bring us to `new_commands`.
714 for command in todo_commands.into_iter() {
715 self.handle_command(command);
716 }
717
718 // Overwrite `self.command_history` to reflect `new_commands`.
719 // It is possible that there still isn't a compute state yet.
720 if let Some(compute_state) = &mut self.compute_state {
721 let mut command_history = ComputeCommandHistory::new(self.metrics.for_history());
722 for command in new_commands.iter() {
723 command_history.push(command.clone());
724 }
725 compute_state.command_history = command_history;
726 }
727 Ok(())
728 }
729}
730
731/// Spawn a thread to bridge between [`ClusterClient`] and [`Worker`] channels.
732///
733/// The [`Worker`] expects a pair of persistent channels, with punctuation marking reconnects,
734/// while the [`ClusterClient`] provides a new pair of channels on each reconnect.
735fn spawn_channel_adapter(
736 client_rx: crossbeam_channel::Receiver<(
737 Uuid,
738 crossbeam_channel::Receiver<ComputeCommand>,
739 mpsc::UnboundedSender<ComputeResponse>,
740 )>,
741 command_tx: command_channel::Sender,
742 response_rx: crossbeam_channel::Receiver<(ComputeResponse, Uuid)>,
743 worker_id: usize,
744) {
745 thread::Builder::new()
746 // "cca" stands for "compute channel adapter". We need to shorten that because Linux has a
747 // 15-character limit for thread names.
748 .name(format!("cca-{worker_id}"))
749 .spawn(move || {
750 // To make workers aware of the individual client connections, we tag forwarded
751 // commands with the client nonce. Additionally, we use the nonce to filter out
752 // responses with a different nonce, which are intended for different client
753 // connections.
754 //
755 // It's possible that we receive responses with nonces from the past but also from the
756 // future: Worker 0 might have received a new nonce before us and broadcasted it to our
757 // Timely cluster. When we receive a response with a future nonce, we need to wait with
758 // forwarding it until we have received the same nonce from a client connection.
759 //
760 // Nonces are not ordered so we don't know whether a response nonce is from the past or
761 // the future. We thus assume that every response with an unknown nonce might be from
762 // the future and stash them all. Every time we reconnect, we immediately send all
763 // stashed responses with a matching nonce. Every time we receive a new response with a
764 // nonce that matches our current one, we can discard the entire response stash as we
765 // know that all stashed responses must be from the past.
766 let mut stashed_responses = BTreeMap::<Uuid, Vec<ComputeResponse>>::new();
767
768 while let Ok((nonce, command_rx, response_tx)) = client_rx.recv() {
769 // Send stashed responses for this client.
770 if let Some(resps) = stashed_responses.remove(&nonce) {
771 for resp in resps {
772 let _ = response_tx.send(resp);
773 }
774 }
775
776 // Wait for a new response while forwarding received commands.
777 let serve_rx_channels = || loop {
778 crossbeam_channel::select! {
779 recv(command_rx) -> msg => match msg {
780 Ok(cmd) => command_tx.send((cmd, nonce)),
781 Err(_) => return Err(()),
782 },
783 recv(response_rx) -> msg => {
784 return Ok(msg.expect("worker connected"));
785 }
786 }
787 };
788
789 // Serve this connection until we see any of the channels disconnect.
790 loop {
791 let Ok((resp, resp_nonce)) = serve_rx_channels() else {
792 break;
793 };
794
795 if resp_nonce == nonce {
796 // Response for the current connection; forward it.
797 stashed_responses.clear();
798 if response_tx.send(resp).is_err() {
799 break;
800 }
801 } else {
802 // Response for a past or future connection; stash it.
803 let stash = stashed_responses.entry(resp_nonce).or_default();
804 stash.push(resp);
805 }
806 }
807 }
808 })
809 .unwrap();
810}