mz_compute/server.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An interactive dataflow server.

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::fmt::Debug;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use anyhow::Error;
use mz_cluster::client::{ClusterClient, ClusterSpec};
use mz_cluster_client::client::TimelyConfig;
use mz_compute_client::protocol::command::ComputeCommand;
use mz_compute_client::protocol::history::ComputeCommandHistory;
use mz_compute_client::protocol::response::ComputeResponse;
use mz_compute_client::service::ComputeClient;
use mz_ore::halt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::tracing::TracingHandle;
use mz_persist_client::cache::PersistClientCache;
use mz_storage_types::connections::ConnectionContext;
use mz_txn_wal::operator::TxnsContext;
use timely::communication::Allocate;
use timely::progress::Antichain;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::SendError;
use tracing::{info, trace, warn};
use uuid::Uuid;

use crate::command_channel;
use crate::compute_state::{ActiveComputeState, ComputeState, ReportedFrontier};
use crate::metrics::{ComputeMetrics, WorkerMetrics};

/// Caller-provided configuration for compute.
#[derive(Clone, Debug)]
pub struct ComputeInstanceContext {
    /// A directory that can be used for scratch work.
    pub scratch_directory: Option<PathBuf>,
    /// Whether to set core affinity for Timely workers.
    pub worker_core_affinity: bool,
    /// Context required to connect to an external sink from compute,
    /// like the `CopyToS3OneshotSink` compute sink.
    pub connection_context: ConnectionContext,
}

/// Configuration for the compute server.
#[derive(Debug, Clone)]
struct Config {
    /// `persist` client cache.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// A process-global handle to tracing configuration.
    pub tracing_handle: Arc<TracingHandle>,
    /// Metrics exposed by compute replicas.
    pub metrics: ComputeMetrics,
    /// Other configuration for compute.
    pub context: ComputeInstanceContext,
}

/// Initiates a timely dataflow computation, processing compute commands.
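///
/// Returns a client builder: each call yields a new boxed [`ComputeClient`], and all clients
/// drive the same underlying Timely cluster. A sketch of the intended call pattern
/// (hypothetical caller-side names):
///
/// ```ignore
/// let build_client = serve(timely_config, &registry, persist, txns, tracing, ctx).await?;
/// let client: Box<dyn ComputeClient> = build_client(); // one per client connection
/// ```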
pub async fn serve(
    timely_config: TimelyConfig,
    metrics_registry: &MetricsRegistry,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: TxnsContext,
    tracing_handle: Arc<TracingHandle>,
    context: ComputeInstanceContext,
) -> Result<impl Fn() -> Box<dyn ComputeClient> + use<>, Error> {
    let config = Config {
        persist_clients,
        txns_ctx,
        tracing_handle,
        metrics: ComputeMetrics::register_with(metrics_registry),
        context,
    };
    let tokio_executor = tokio::runtime::Handle::current();

    let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
    let timely_container = Arc::new(Mutex::new(timely_container));

    let client_builder = move || {
        let client = ClusterClient::new(Arc::clone(&timely_container));
        let client: Box<dyn ComputeClient> = Box::new(client);
        client
    };

    Ok(client_builder)
}

/// Error type returned on connection nonce changes.
///
/// A nonce change informs workers that subsequent commands come from a new client connection
/// and therefore require reconciliation.
struct NonceChange(Uuid);

/// Endpoint used by workers to receive compute commands.
///
/// Observes nonce changes in the command stream and converts them into receive errors.
struct CommandReceiver {
    /// The channel supplying commands.
    inner: command_channel::Receiver,
    /// The ID of the Timely worker.
    worker_id: usize,
    /// The nonce identifying the current cluster protocol incarnation.
    nonce: Option<Uuid>,
    /// A stash to enable peeking the next command, used in `try_recv`.
    stashed_command: Option<ComputeCommand>,
}

impl CommandReceiver {
    fn new(inner: command_channel::Receiver, worker_id: usize) -> Self {
        Self {
            inner,
            worker_id,
            nonce: None,
            stashed_command: None,
        }
    }

    /// Receive the next pending command, if any.
    ///
    /// If the next command has a different nonce, this method instead returns an `Err`
    /// containing the new nonce.
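    ///
    /// A sketch of the intended polling loop (`handle` and `reconnect` are hypothetical
    /// caller-side helpers):
    ///
    /// ```ignore
    /// loop {
    ///     match receiver.try_recv() {
    ///         Ok(Some(command)) => handle(command),        // process the command
    ///         Ok(None) => break,                           // drained; go step the Timely worker
    ///         Err(NonceChange(nonce)) => reconnect(nonce), // new client connection
    ///     }
    /// }
    /// ```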
    fn try_recv(&mut self) -> Result<Option<ComputeCommand>, NonceChange> {
        if let Some(command) = self.stashed_command.take() {
            return Ok(Some(command));
        }
        let Some((command, nonce)) = self.inner.try_recv() else {
            return Ok(None);
        };

        trace!(worker = self.worker_id, %nonce, ?command, "received command");

        if Some(nonce) == self.nonce {
            Ok(Some(command))
        } else {
            self.nonce = Some(nonce);
            self.stashed_command = Some(command);
            Err(NonceChange(nonce))
        }
    }
}

/// Endpoint used by workers to send compute responses.
///
/// Tags responses with the current nonce, allowing receivers to filter out responses intended for
/// previous client connections.
pub(crate) struct ResponseSender {
    /// The channel consuming responses.
    inner: mpsc::UnboundedSender<(ComputeResponse, Uuid)>,
    /// The ID of the Timely worker.
    worker_id: usize,
    /// The nonce identifying the current cluster protocol incarnation.
    nonce: Option<Uuid>,
}

impl ResponseSender {
    fn new(inner: mpsc::UnboundedSender<(ComputeResponse, Uuid)>, worker_id: usize) -> Self {
        Self {
            inner,
            worker_id,
            nonce: None,
        }
    }

    /// Set the cluster protocol nonce.
    fn set_nonce(&mut self, nonce: Uuid) {
        self.nonce = Some(nonce);
    }

    /// Send a compute response.
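    ///
    /// Panics if [`ResponseSender::set_nonce`] has not been called yet. A sketch of the expected
    /// call order (hypothetical `connection_nonce` and `response` values):
    ///
    /// ```ignore
    /// response_tx.set_nonce(connection_nonce); // once per client connection
    /// response_tx.send(response)?;             // tagged with `connection_nonce`
    /// ```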
    pub fn send(&self, response: ComputeResponse) -> Result<(), SendError<ComputeResponse>> {
        let nonce = self.nonce.expect("nonce must be initialized");

        trace!(worker = self.worker_id, %nonce, ?response, "sending response");
        self.inner
            .send((response, nonce))
            .map_err(|SendError((resp, _))| SendError(resp))
    }
}

/// State maintained for each worker thread.
///
/// Much of this state can be viewed as local variables for the worker thread,
/// holding state that persists across function calls.
struct Worker<'w, A: Allocate> {
    /// The underlying Timely worker.
    timely_worker: &'w mut TimelyWorker<A>,
    /// The channel over which commands are received.
    command_rx: CommandReceiver,
    /// The channel over which responses are sent.
    response_tx: ResponseSender,
    /// The compute state, if initialized (by a `CreateInstance` command).
    compute_state: Option<ComputeState>,
    /// Compute metrics.
    metrics: WorkerMetrics,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    txns_ctx: TxnsContext,
    /// A process-global handle to tracing configuration.
    tracing_handle: Arc<TracingHandle>,
    /// Other configuration for compute.
    context: ComputeInstanceContext,
}

impl ClusterSpec for Config {
    type Command = ComputeCommand;
    type Response = ComputeResponse;

    fn run_worker<A: Allocate + 'static>(
        &self,
        timely_worker: &mut TimelyWorker<A>,
        client_rx: mpsc::UnboundedReceiver<(
            Uuid,
            mpsc::UnboundedReceiver<ComputeCommand>,
            mpsc::UnboundedSender<ComputeResponse>,
        )>,
    ) {
        if self.context.worker_core_affinity {
            set_core_affinity(timely_worker.index());
        }

        let worker_id = timely_worker.index();
        let metrics = self.metrics.for_worker(worker_id);

        // Create the command channel that broadcasts commands from worker 0 to other workers. We
        // reuse this channel between client connections, to avoid bugs where different workers end
        // up creating incompatible sides of the channel dataflow after reconnects.
        // See database-issues#8964.
        let (cmd_tx, cmd_rx) = command_channel::render(timely_worker);
        let (resp_tx, resp_rx) = mpsc::unbounded_channel();

        spawn_channel_adapter(client_rx, cmd_tx, resp_rx, worker_id);

        Worker {
            timely_worker,
            command_rx: CommandReceiver::new(cmd_rx, worker_id),
            response_tx: ResponseSender::new(resp_tx, worker_id),
            metrics,
            context: self.context.clone(),
            persist_clients: Arc::clone(&self.persist_clients),
            txns_ctx: self.txns_ctx.clone(),
            compute_state: None,
            tracing_handle: Arc::clone(&self.tracing_handle),
        }
        .run()
    }
}

/// Set the current thread's core affinity, based on the given `worker_id`.
#[cfg(not(target_os = "macos"))]
fn set_core_affinity(worker_id: usize) {
    use tracing::error;

    let Some(mut core_ids) = core_affinity::get_core_ids() else {
        error!(worker_id, "unable to get core IDs for setting affinity");
        return;
    };

    // The `get_core_ids` docs don't say anything about a guaranteed order of the returned Vec,
    // so sort it just to be safe.
    core_ids.sort_unstable_by_key(|i| i.id);

    // On multi-process replicas `worker_id` might be greater than the number of available cores.
    // However, we assume that we always have at least as many cores as there are local workers.
    // Violating this assumption is safe but might lead to degraded performance due to skew in core
    // utilization.
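    // For example (hypothetical numbers): with 8 cores and 16 local workers, workers 0 and 8
    // would both be pinned to core 0.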
    let idx = worker_id % core_ids.len();
    let core_id = core_ids[idx];

    if core_affinity::set_for_current(core_id) {
        info!(
            worker_id,
            core_id = core_id.id,
            "set core affinity for worker"
        );
    } else {
        error!(
            worker_id,
            core_id = core_id.id,
            "failed to set core affinity for worker"
        )
    }
}

/// Set the current thread's core affinity, based on the given `worker_id`.
#[cfg(target_os = "macos")]
fn set_core_affinity(_worker_id: usize) {
    // Setting core affinity is known to not work on Apple Silicon:
    // https://github.com/Elzair/core_affinity_rs/issues/22
    info!("setting core affinity is not supported on macOS");
}

impl<'w, A: Allocate + 'static> Worker<'w, A> {
    /// Runs a compute worker.
    pub fn run(&mut self) {
        // The command receiver is initialized without a nonce, so receiving the first command
        // always triggers a nonce change.
        let NonceChange(nonce) = self.recv_command().expect_err("change to first nonce");
        self.set_nonce(nonce);

        loop {
            let Err(NonceChange(nonce)) = self.run_client();
            self.set_nonce(nonce);
        }
    }

    fn set_nonce(&mut self, nonce: Uuid) {
        self.response_tx.set_nonce(nonce);
    }

    /// Handles commands for a client connection; returns when the nonce changes.
    fn run_client(&mut self) -> Result<Infallible, NonceChange> {
        self.reconcile()?;

        // The last time we did periodic maintenance.
        let mut last_maintenance = Instant::now();

        // Commence normal operation.
        loop {
            // Get the maintenance interval, defaulting to zero if we don't have a compute state.
            let maintenance_interval = self
                .compute_state
                .as_ref()
                .map_or(Duration::ZERO, |state| state.server_maintenance_interval);

            let now = Instant::now();
            // Determine if we need to perform maintenance, which is true if `maintenance_interval`
            // time has passed since the last maintenance.
            let sleep_duration;
            if now >= last_maintenance + maintenance_interval {
                last_maintenance = now;
                sleep_duration = None;

                // Report frontier information back to the coordinator.
                if let Some(mut compute_state) = self.activate_compute() {
                    compute_state.compute_state.traces.maintenance();
                    compute_state.report_frontiers();
                    compute_state.report_metrics();
                    compute_state.check_expiration();
                }

                self.metrics.record_shared_row_metrics();
            } else {
                // We didn't perform maintenance; sleep until the next maintenance interval.
                let next_maintenance = last_maintenance + maintenance_interval;
                sleep_duration = Some(next_maintenance.saturating_duration_since(now))
            };

            // Step the timely worker, recording the time taken.
            let timer = self.metrics.timely_step_duration_seconds.start_timer();
            self.timely_worker.step_or_park(sleep_duration);
            timer.observe_duration();

            self.handle_pending_commands()?;

            if let Some(mut compute_state) = self.activate_compute() {
                compute_state.process_peeks();
                compute_state.process_subscribes();
                compute_state.process_copy_tos();
            }
        }
    }

    fn handle_pending_commands(&mut self) -> Result<(), NonceChange> {
        while let Some(cmd) = self.command_rx.try_recv()? {
            self.handle_command(cmd);
        }
        Ok(())
    }

    fn handle_command(&mut self, cmd: ComputeCommand) {
        match &cmd {
            ComputeCommand::CreateInstance(_) => {
                self.compute_state = Some(ComputeState::new(
                    Arc::clone(&self.persist_clients),
                    self.txns_ctx.clone(),
                    self.metrics.clone(),
                    Arc::clone(&self.tracing_handle),
                    self.context.clone(),
                ));
            }
            _ => (),
        }
        self.activate_compute().unwrap().handle_compute_command(cmd);
    }

    fn activate_compute(&mut self) -> Option<ActiveComputeState<'_, A>> {
        if let Some(compute_state) = &mut self.compute_state {
            Some(ActiveComputeState {
                timely_worker: &mut *self.timely_worker,
                compute_state,
                response_tx: &mut self.response_tx,
            })
        } else {
            None
        }
    }

    /// Receive the next compute command.
    ///
    /// This method blocks if no command is currently available, but takes care to step the Timely
    /// worker while doing so.
    fn recv_command(&mut self) -> Result<ComputeCommand, NonceChange> {
        loop {
            if let Some(cmd) = self.command_rx.try_recv()? {
                return Ok(cmd);
            }

            let start = Instant::now();
            self.timely_worker.step_or_park(None);
            self.metrics
                .timely_step_duration_seconds
                .observe(start.elapsed().as_secs_f64());
        }
    }

    /// Extract commands until `InitializationComplete`, and make the worker reflect those commands.
    ///
    /// This method is meant to be a function of the commands received thus far (as recorded in the
    /// compute state command history) and the new commands from `command_rx`. It should not be a
    /// function of other characteristics, like whether the worker has managed to respond to a peek
    /// or not. Some effort goes into narrowing our view to only the existing commands we can be sure
    /// are live at all other workers.
    ///
    /// The methodology here is to drain `command_rx` until an `InitializationComplete`, at which point
    /// the prior commands are "reconciled" in. Reconciliation takes each goal dataflow and looks for an
    /// existing "compatible" dataflow (per `compatible()`) it can repurpose, with some additional tests
    /// to be sure that we can cut over from one to the other (no additional compaction, no tails/sinks).
    /// With any connections established, old orphaned dataflows are allowed to compact away, and any new
    /// dataflows are created from scratch. "Kept" dataflows are allowed to compact up to any new `as_of`.
    ///
    /// Some additional tidying happens, cleaning up pending peeks, reported frontiers, and creating a new
    /// subscribe response buffer. We will need to be vigilant with future modifications to `ComputeState` to
    /// line up changes there with clean resets here.
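    ///
    /// A sketch of the per-dataflow reuse test applied below (`old`, `new`, `frontier_ok`, and
    /// `retained(..)` are hypothetical shorthands for the values and checks spelled out in the
    /// body):
    ///
    /// ```ignore
    /// let reuse = old.compatible_with(new)                     // same plan, modulo `as_of`
    ///     && frontier_ok                                       // not compacted past the new `as_of`
    ///     && new.subscribe_ids().next().is_none()              // no subscribe sinks
    ///     && new.copy_to_ids().next().is_none()                // no copy-to sinks
    ///     && new.imported_index_ids().all(|id| retained(id));  // dependencies also kept
    /// ```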
    fn reconcile(&mut self) -> Result<(), NonceChange> {
        // To initialize the connection, we want to drain all commands until we receive a
        // `ComputeCommand::InitializationComplete` command to form a target command state.
        let mut new_commands = Vec::new();
        loop {
            match self.recv_command()? {
                ComputeCommand::InitializationComplete => break,
                command => new_commands.push(command),
            }
        }

        // Commands we will need to apply before entering normal service.
        // These commands may include dropping existing dataflows, compacting existing dataflows,
        // and creating new dataflows, in addition to standard peek and compaction commands.
        // The result should be the same as if dropping all dataflows and running `new_commands`.
        let mut todo_commands = Vec::new();
        // We only have a compute history if we are in an initialized state
        // (i.e. after a `CreateInstance`).
        // If this is not the case, just copy `new_commands` into `todo_commands`.
        if let Some(compute_state) = &mut self.compute_state {
            // Reduce the installed commands.
            // Importantly, act as if all peeks may have been retired (as we cannot know otherwise).
            compute_state.command_history.discard_peeks();
            compute_state.command_history.reduce();

            // At this point, we need to sort out which of the *certainly installed* dataflows are
            // suitable replacements for the requested dataflows. A dataflow is "certainly installed"
            // as of a frontier if its compaction allows it to go no further. We ignore peeks for this
            // reasoning, as we cannot be certain that peeks still exist at any other worker.

            // Having reduced our installed command history to retain no peeks (above), we should be
            // able to track down installed dataflows we can use as surrogates for requested dataflows
            // (which have retained all of their peeks, creating a more demanding `as_of` requirement).
            // NB: installed dataflows may still be allowed to further compact, and we should double check
            // this before being too confident. It should be rare without peeks, but could happen with e.g.
            // multiple outputs of a dataflow.

            // The values with which a prior `CreateInstance` was called, if it was.
            let mut old_instance_config = None;
            // Index dataflows by `export_ids().collect()`, as this is a precondition for their compatibility.
            let mut old_dataflows = BTreeMap::default();
            // Maintain allowed compaction, in case installed identifiers may have been allowed to compact.
            let mut old_frontiers = BTreeMap::default();
            for command in compute_state.command_history.iter() {
                match command {
                    ComputeCommand::CreateInstance(config) => {
                        old_instance_config = Some(config);
                    }
                    ComputeCommand::CreateDataflow(dataflow) => {
                        let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
                        old_dataflows.insert(export_ids, dataflow);
                    }
                    ComputeCommand::AllowCompaction { id, frontier } => {
                        old_frontiers.insert(id, frontier);
                    }
                    _ => {
                        // Nothing to do in these cases.
                    }
                }
            }

            // Compaction commands that can be applied to existing dataflows.
            let mut old_compaction = BTreeMap::default();
            // Exported identifiers from dataflows we retain.
            let mut retain_ids = BTreeSet::default();

            // Traverse new commands, sorting out what remediation we can do.
            for command in new_commands.iter() {
                match command {
                    ComputeCommand::CreateDataflow(dataflow) => {
                        // Attempt to find an existing match for the dataflow.
                        let as_of = dataflow.as_of.as_ref().unwrap();
                        let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();

                        if let Some(old_dataflow) = old_dataflows.get(&export_ids) {
                            let compatible = old_dataflow.compatible_with(dataflow);
                            let uncompacted = !export_ids
                                .iter()
                                .flat_map(|id| old_frontiers.get(id))
                                .any(|frontier| {
                                    !timely::PartialOrder::less_equal(
                                        *frontier,
                                        dataflow.as_of.as_ref().unwrap(),
                                    )
                                });

                            // We cannot reconcile subscribe and copy-to sinks at the moment,
                            // because the response buffer is shared, and to a first approximation
                            // must be completely reformed.
                            let subscribe_free = dataflow.subscribe_ids().next().is_none();
                            let copy_to_free = dataflow.copy_to_ids().next().is_none();

                            // If we have replaced any dependency of this dataflow, we need to
                            // replace this dataflow, to make it use the replacement.
                            let dependencies_retained = dataflow
                                .imported_index_ids()
                                .all(|id| retain_ids.contains(&id));

                            if compatible
                                && uncompacted
                                && subscribe_free
                                && copy_to_free
                                && dependencies_retained
                            {
                                // Match found; remove the match from the deletion queue,
                                // and compact its outputs to the dataflow's `as_of`.
                                old_dataflows.remove(&export_ids);
                                for id in export_ids.iter() {
                                    old_compaction.insert(*id, as_of.clone());
                                }
                                retain_ids.extend(export_ids);
                            } else {
                                warn!(
                                    ?export_ids,
                                    ?compatible,
                                    ?uncompacted,
                                    ?subscribe_free,
                                    ?copy_to_free,
                                    ?dependencies_retained,
                                    old_as_of = ?old_dataflow.as_of,
                                    new_as_of = ?as_of,
                                    "dataflow reconciliation failed",
                                );

                                // Dump the full dataflow plans if they are incompatible, to
                                // simplify debugging hard-to-reproduce reconciliation failures.
                                if !compatible {
                                    warn!(
                                        old = ?old_dataflow,
                                        new = ?dataflow,
                                        "incompatible dataflows in reconciliation",
                                    );
                                }

                                todo_commands
                                    .push(ComputeCommand::CreateDataflow(dataflow.clone()));
                            }

                            compute_state.metrics.record_dataflow_reconciliation(
                                compatible,
                                uncompacted,
                                subscribe_free,
                                copy_to_free,
                                dependencies_retained,
                            );
                        } else {
                            todo_commands.push(ComputeCommand::CreateDataflow(dataflow.clone()));
                        }
                    }
                    ComputeCommand::CreateInstance(config) => {
                        // Cluster creation should not be performed again!
                        if old_instance_config.map_or(false, |old| !old.compatible_with(config)) {
                            halt!(
                                "new instance configuration not compatible with existing instance configuration:\n{:?}\nvs\n{:?}",
                                config,
                                old_instance_config,
                            );
                        }
                    }
                    // All other commands we apply as requested.
                    command => {
                        todo_commands.push(command.clone());
                    }
                }
            }

            // Issue compaction commands first to reclaim resources.
            for (_, dataflow) in old_dataflows.iter() {
                for id in dataflow.export_ids() {
                    // We want to drop anything that has not yet been dropped,
                    // and nothing that has already been dropped.
                    if old_frontiers.get(&id) != Some(&&Antichain::new()) {
                        old_compaction.insert(id, Antichain::new());
                    }
                }
            }
            for (&id, frontier) in &old_compaction {
                let frontier = frontier.clone();
                todo_commands.insert(0, ComputeCommand::AllowCompaction { id, frontier });
            }

            // Clean up worker-local state.
            //
            // Various aspects of `ComputeState` need to be either uninstalled, or return to a blank slate.
            // All dropped dataflows should clean up after themselves, as we plan to install new dataflows
            // re-using the same identifiers.
            // All re-used dataflows should roll back any believed communicated information (e.g. frontiers)
            // so that they recommunicate that information as if from scratch.

            // Remove all pending peeks.
            for (_, peek) in std::mem::take(&mut compute_state.pending_peeks) {
                // Log dropping the peek request.
                if let Some(logger) = compute_state.compute_logger.as_mut() {
                    logger.log(&peek.as_log_event(false));
                }
            }

            for (&id, collection) in compute_state.collections.iter_mut() {
                // Adjust reported frontiers:
                //  * For dataflows we continue to use, reset to ensure we report something not
                //    before the new `as_of` next.
                //  * For dataflows we drop, set to the empty frontier, to ensure we don't report
                //    anything for them.
                let retained = retain_ids.contains(&id);
                let compaction = old_compaction.remove(&id);
                let new_reported_frontier = match (retained, compaction) {
                    (true, Some(new_as_of)) => ReportedFrontier::NotReported { lower: new_as_of },
                    (true, None) => {
                        unreachable!("retained dataflows are compacted to the new as_of")
                    }
                    (false, Some(new_frontier)) => {
                        assert!(new_frontier.is_empty());
                        ReportedFrontier::Reported(new_frontier)
                    }
                    (false, None) => {
                        // Logging dataflows are implicitly retained and don't have a new as_of.
                        // Reset them to the minimal frontier.
                        ReportedFrontier::new()
                    }
                };

                collection.reset_reported_frontiers(new_reported_frontier);

                // Sink tokens should be retained for retained dataflows, and dropped for dropped
                // dataflows.
                //
                // Dropping the tokens of active subscribes and copy-tos makes them place
                // `DroppedAt` responses into the respective response buffer. We drop those buffers
                // in the next step, which ensures that we don't send out `DroppedAt` responses for
                // subscribes/copy-tos dropped during reconciliation.
                if !retained {
                    collection.sink_token = None;
                }
            }

            // We must drop the response buffers as they are global across all subscribes/copy-tos.
            // If they were broken out by `GlobalId` then we could drop only the response buffers
            // of dataflows we drop.
            compute_state.subscribe_response_buffer = Rc::new(RefCell::new(Vec::new()));
            compute_state.copy_to_response_buffer = Rc::new(RefCell::new(Vec::new()));

            // The controller expects the logging collections to be readable from the minimum time
            // initially. We cannot recreate the logging arrangements without restarting the
            // instance, but we can pad the compacted times with empty data. Doing so is sound
            // because logging collections from different replica incarnations are considered
            // distinct TVCs, so the controller doesn't expect any historical consistency from
            // these collections when it reconnects to a replica.
            //
            // TODO(database-issues#8152): Consider resolving this with controller-side reconciliation instead.
            if let Some(config) = old_instance_config {
                for id in config.logging.index_logs.values() {
                    let trace = compute_state
                        .traces
                        .remove(id)
                        .expect("logging trace exists");
                    let padded = trace.into_padded();
                    compute_state.traces.set(*id, padded);
                }
            }
        } else {
            todo_commands.clone_from(&new_commands);
        }

        // Execute the commands to bring us to `new_commands`.
        for command in todo_commands.into_iter() {
            self.handle_command(command);
        }

        // Overwrite `self.command_history` to reflect `new_commands`.
        // It is possible that there is still no compute state (e.g., before a `CreateInstance`).
        if let Some(compute_state) = &mut self.compute_state {
            let mut command_history = ComputeCommandHistory::new(self.metrics.for_history());
            for command in new_commands.iter() {
                command_history.push(command.clone());
            }
            compute_state.command_history = command_history;
        }
        Ok(())
    }
}

/// Spawn a task to bridge between [`ClusterClient`] and [`Worker`] channels.
///
/// The [`Worker`] expects a pair of persistent channels, with punctuation marking reconnects,
/// while the [`ClusterClient`] provides a new pair of channels on each reconnect.
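///
/// A rough sketch of the bridged data flow (names as in the body below):
///
/// ```text
/// client_rx: yields one (nonce, command_rx, response_tx) triple per (re)connect
///   commands:  command_rx -> command_tx, each tagged with the connection nonce
///   responses: response_rx (tagged) -> filter/stash by nonce -> response_tx
/// ```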
fn spawn_channel_adapter(
    mut client_rx: mpsc::UnboundedReceiver<(
        Uuid,
        mpsc::UnboundedReceiver<ComputeCommand>,
        mpsc::UnboundedSender<ComputeResponse>,
    )>,
    command_tx: command_channel::Sender,
    mut response_rx: mpsc::UnboundedReceiver<(ComputeResponse, Uuid)>,
    worker_id: usize,
) {
    mz_ore::task::spawn(
        || format!("compute-channel-adapter-{worker_id}"),
        async move {
            // To make workers aware of the individual client connections, we tag forwarded
            // commands with the client nonce. Additionally, we use the nonce to filter out
            // responses with a different nonce, which are intended for different client
            // connections.
            //
            // It's possible that we receive responses with nonces from the past but also from the
            // future: Worker 0 might have received a new nonce before us and broadcast it to our
            // Timely cluster. When we receive a response with a future nonce, we need to wait with
            // forwarding it until we have received the same nonce from a client connection.
            //
            // Nonces are not ordered, so we don't know whether a response nonce is from the past
            // or the future. We thus assume that every response with an unknown nonce might be
            // from the future and stash them all. Every time we reconnect, we immediately send all
            // stashed responses with a matching nonce. Every time we receive a new response with a
            // nonce that matches our current one, we can discard the entire response stash, as we
            // know that all stashed responses must be from the past.
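            //
            // For example (hypothetical nonces): connections A then B arrive. Worker 0 may
            // broadcast B-tagged commands before our own `client_rx` yields B, so B-tagged
            // responses observed while serving A are stashed, and flushed once B connects.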
            let mut stashed_responses = BTreeMap::<Uuid, Vec<ComputeResponse>>::new();

            while let Some((nonce, mut command_rx, response_tx)) = client_rx.recv().await {
                // Send stashed responses for this client.
                if let Some(resps) = stashed_responses.remove(&nonce) {
                    for resp in resps {
                        let _ = response_tx.send(resp);
                    }
                }

                // Wait for a new response while forwarding received commands.
                let mut serve_rx_channels = async || loop {
                    tokio::select! {
                        msg = command_rx.recv() => match msg {
                            Some(cmd) => command_tx.send((cmd, nonce)),
                            None => return Err(()),
                        },
                        msg = response_rx.recv() => {
                            return Ok(msg.expect("worker connected"));
                        }
                    }
                };

                // Serve this connection until we see any of the channels disconnect.
                loop {
                    let Ok((resp, resp_nonce)) = serve_rx_channels().await else {
                        break;
                    };

                    if resp_nonce == nonce {
                        // Response for the current connection; forward it.
                        stashed_responses.clear();
                        if response_tx.send(resp).is_err() {
                            break;
                        }
                    } else {
                        // Response for a past or future connection; stash it.
                        let stash = stashed_responses.entry(resp_nonce).or_default();
                        stash.push(resp);
                    }
                }
            }
        },
    );
}