mz_compute/server.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An interactive dataflow server.
11
12use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::convert::Infallible;
15use std::fmt::Debug;
16use std::path::PathBuf;
17use std::rc::Rc;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use anyhow::Error;
22use mz_cluster::client::{ClusterClient, ClusterSpec};
23use mz_cluster_client::client::TimelyConfig;
24use mz_compute_client::protocol::command::ComputeCommand;
25use mz_compute_client::protocol::history::ComputeCommandHistory;
26use mz_compute_client::protocol::response::ComputeResponse;
27use mz_compute_client::service::ComputeClient;
28use mz_ore::halt;
29use mz_ore::metrics::MetricsRegistry;
30use mz_ore::tracing::TracingHandle;
31use mz_persist_client::cache::PersistClientCache;
32use mz_storage_types::connections::ConnectionContext;
33use mz_txn_wal::operator::TxnsContext;
34use timely::communication::Allocate;
35use timely::progress::Antichain;
36use timely::worker::Worker as TimelyWorker;
37use tokio::sync::mpsc;
38use tokio::sync::mpsc::error::SendError;
39use tracing::{info, trace, warn};
40use uuid::Uuid;
41
42use crate::command_channel;
43use crate::compute_state::{ActiveComputeState, ComputeState, ReportedFrontier};
44use crate::metrics::{ComputeMetrics, WorkerMetrics};
45
46/// Caller-provided configuration for compute.
47#[derive(Clone, Debug)]
48pub struct ComputeInstanceContext {
49 /// A directory that can be used for scratch work.
50 pub scratch_directory: Option<PathBuf>,
51 /// Whether to set core affinity for Timely workers.
52 pub worker_core_affinity: bool,
53 /// Context required to connect to an external sink from compute,
54 /// like the `CopyToS3OneshotSink` compute sink.
55 pub connection_context: ConnectionContext,
56}
57
58/// Configures the server with compute-specific metrics.
59#[derive(Debug, Clone)]
60struct Config {
61 /// `persist` client cache.
62 pub persist_clients: Arc<PersistClientCache>,
63 /// Context necessary for rendering txn-wal operators.
64 pub txns_ctx: TxnsContext,
65 /// A process-global handle to tracing configuration.
66 pub tracing_handle: Arc<TracingHandle>,
67 /// Metrics exposed by compute replicas.
68 pub metrics: ComputeMetrics,
69 /// Other configuration for compute.
70 pub context: ComputeInstanceContext,
71}
72
73/// Initiates a timely dataflow computation, processing compute commands.
74pub async fn serve(
75 timely_config: TimelyConfig,
76 metrics_registry: &MetricsRegistry,
77 persist_clients: Arc<PersistClientCache>,
78 txns_ctx: TxnsContext,
79 tracing_handle: Arc<TracingHandle>,
80 context: ComputeInstanceContext,
81) -> Result<impl Fn() -> Box<dyn ComputeClient> + use<>, Error> {
82 let config = Config {
83 persist_clients,
84 txns_ctx,
85 tracing_handle,
86 metrics: ComputeMetrics::register_with(metrics_registry),
87 context,
88 };
89 let tokio_executor = tokio::runtime::Handle::current();
90
91 let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
92 let timely_container = Arc::new(Mutex::new(timely_container));
93
94 let client_builder = move || {
95 let client = ClusterClient::new(Arc::clone(&timely_container));
96 let client: Box<dyn ComputeClient> = Box::new(client);
97 client
98 };
99
100 Ok(client_builder)
101}
102
103/// Error type returned on connection nonce changes.
104///
105/// A nonce change informs workers that subsequent commands come a from a new client connection
106/// and therefore require reconciliation.
107struct NonceChange(Uuid);
108
109/// Endpoint used by workers to receive compute commands.
110///
111/// Observes nonce changes in the command stream and converts them into receive errors.
112struct CommandReceiver {
113 /// The channel supplying commands.
114 inner: command_channel::Receiver,
115 /// The ID of the Timely worker.
116 worker_id: usize,
117 /// The nonce identifying the current cluster protocol incarnation.
118 nonce: Option<Uuid>,
119 /// A stash to enable peeking the next command, used in `try_recv`.
120 stashed_command: Option<ComputeCommand>,
121}
122
123impl CommandReceiver {
124 fn new(inner: command_channel::Receiver, worker_id: usize) -> Self {
125 Self {
126 inner,
127 worker_id,
128 nonce: None,
129 stashed_command: None,
130 }
131 }
132
133 /// Receive the next pending command, if any.
134 ///
135 /// If the next command has a different nonce, this method instead returns an `Err`
136 /// containing the new nonce.
137 fn try_recv(&mut self) -> Result<Option<ComputeCommand>, NonceChange> {
138 if let Some(command) = self.stashed_command.take() {
139 return Ok(Some(command));
140 }
141 let Some((command, nonce)) = self.inner.try_recv() else {
142 return Ok(None);
143 };
144
145 trace!(worker = self.worker_id, %nonce, ?command, "received command");
146
147 if Some(nonce) == self.nonce {
148 Ok(Some(command))
149 } else {
150 self.nonce = Some(nonce);
151 self.stashed_command = Some(command);
152 Err(NonceChange(nonce))
153 }
154 }
155}
156
157/// Endpoint used by workers to send sending compute responses.
158///
159/// Tags responses with the current nonce, allowing receivers to filter out responses intended for
160/// previous client connections.
161pub(crate) struct ResponseSender {
162 /// The channel consuming responses.
163 inner: mpsc::UnboundedSender<(ComputeResponse, Uuid)>,
164 /// The ID of the Timely worker.
165 worker_id: usize,
166 /// The nonce identifying the current cluster protocol incarnation.
167 nonce: Option<Uuid>,
168}
169
170impl ResponseSender {
171 fn new(inner: mpsc::UnboundedSender<(ComputeResponse, Uuid)>, worker_id: usize) -> Self {
172 Self {
173 inner,
174 worker_id,
175 nonce: None,
176 }
177 }
178
179 /// Set the cluster protocol nonce.
180 fn set_nonce(&mut self, nonce: Uuid) {
181 self.nonce = Some(nonce);
182 }
183
184 /// Send a compute response.
185 pub fn send(&self, response: ComputeResponse) -> Result<(), SendError<ComputeResponse>> {
186 let nonce = self.nonce.expect("nonce must be initialized");
187
188 trace!(worker = self.worker_id, %nonce, ?response, "sending response");
189 self.inner
190 .send((response, nonce))
191 .map_err(|SendError((resp, _))| SendError(resp))
192 }
193}
194
195/// State maintained for each worker thread.
196///
197/// Much of this state can be viewed as local variables for the worker thread,
198/// holding state that persists across function calls.
199struct Worker<'w, A: Allocate> {
200 /// The underlying Timely worker.
201 timely_worker: &'w mut TimelyWorker<A>,
202 /// The channel over which commands are received.
203 command_rx: CommandReceiver,
204 /// The channel over which responses are sent.
205 response_tx: ResponseSender,
206 compute_state: Option<ComputeState>,
207 /// Compute metrics.
208 metrics: WorkerMetrics,
209 /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
210 /// This is intentionally shared between workers
211 persist_clients: Arc<PersistClientCache>,
212 /// Context necessary for rendering txn-wal operators.
213 txns_ctx: TxnsContext,
214 /// A process-global handle to tracing configuration.
215 tracing_handle: Arc<TracingHandle>,
216 context: ComputeInstanceContext,
217}
218
219impl ClusterSpec for Config {
220 type Command = ComputeCommand;
221 type Response = ComputeResponse;
222
223 const NAME: &str = "compute";
224
225 fn run_worker<A: Allocate + 'static>(
226 &self,
227 timely_worker: &mut TimelyWorker<A>,
228 client_rx: mpsc::UnboundedReceiver<(
229 Uuid,
230 mpsc::UnboundedReceiver<ComputeCommand>,
231 mpsc::UnboundedSender<ComputeResponse>,
232 )>,
233 ) {
234 if self.context.worker_core_affinity {
235 set_core_affinity(timely_worker.index());
236 }
237
238 let worker_id = timely_worker.index();
239 let metrics = self.metrics.for_worker(worker_id);
240
241 // Create the command channel that broadcasts commands from worker 0 to other workers. We
242 // reuse this channel between client connections, to avoid bugs where different workers end
243 // up creating incompatible sides of the channel dataflow after reconnects.
244 // See database-issues#8964.
245 let (cmd_tx, cmd_rx) = command_channel::render(timely_worker);
246 let (resp_tx, resp_rx) = mpsc::unbounded_channel();
247
248 spawn_channel_adapter(client_rx, cmd_tx, resp_rx, worker_id);
249
250 Worker {
251 timely_worker,
252 command_rx: CommandReceiver::new(cmd_rx, worker_id),
253 response_tx: ResponseSender::new(resp_tx, worker_id),
254 metrics,
255 context: self.context.clone(),
256 persist_clients: Arc::clone(&self.persist_clients),
257 txns_ctx: self.txns_ctx.clone(),
258 compute_state: None,
259 tracing_handle: Arc::clone(&self.tracing_handle),
260 }
261 .run()
262 }
263}
264
265/// Set the current thread's core affinity, based on the given `worker_id`.
266#[cfg(not(target_os = "macos"))]
267fn set_core_affinity(worker_id: usize) {
268 use tracing::error;
269
270 let Some(mut core_ids) = core_affinity::get_core_ids() else {
271 error!(worker_id, "unable to get core IDs for setting affinity");
272 return;
273 };
274
275 // The `get_core_ids` docs don't say anything about a guaranteed order of the returned Vec,
276 // so sort it just to be safe.
277 core_ids.sort_unstable_by_key(|i| i.id);
278
279 // On multi-process replicas `worker_id` might be greater than the number of available cores.
280 // However, we assume that we always have at least as many cores as there are local workers.
281 // Violating this assumption is safe but might lead to degraded performance due to skew in core
282 // utilization.
283 let idx = worker_id % core_ids.len();
284 let core_id = core_ids[idx];
285
286 if core_affinity::set_for_current(core_id) {
287 info!(
288 worker_id,
289 core_id = core_id.id,
290 "set core affinity for worker"
291 );
292 } else {
293 error!(
294 worker_id,
295 core_id = core_id.id,
296 "failed to set core affinity for worker"
297 )
298 }
299}
300
/// macOS stand-in for setting the current thread's core affinity.
///
/// The worker ID is ignored; the function only logs that the feature is unavailable.
#[cfg(target_os = "macos")]
fn set_core_affinity(_worker_id: usize) {
    // Setting core affinity is known to not work on Apple Silicon:
    // https://github.com/Elzair/core_affinity_rs/issues/22
    info!("setting core affinity is not supported on macOS");
}
308
309impl<'w, A: Allocate + 'static> Worker<'w, A> {
310 /// Runs a compute worker.
311 pub fn run(&mut self) {
312 // The command receiver is initialized without an nonce, so receiving the first command
313 // always triggers a nonce change.
314 let NonceChange(nonce) = self.recv_command().expect_err("change to first nonce");
315 self.set_nonce(nonce);
316
317 loop {
318 let Err(NonceChange(nonce)) = self.run_client();
319 self.set_nonce(nonce);
320 }
321 }
322
323 fn set_nonce(&mut self, nonce: Uuid) {
324 self.response_tx.set_nonce(nonce);
325 }
326
327 /// Handles commands for a client connection, returns when the nonce changes.
328 fn run_client(&mut self) -> Result<Infallible, NonceChange> {
329 self.reconcile()?;
330
331 // The last time we did periodic maintenance.
332 let mut last_maintenance = Instant::now();
333
334 // Commence normal operation.
335 loop {
336 // Get the maintenance interval, default to zero if we don't have a compute state.
337 let maintenance_interval = self
338 .compute_state
339 .as_ref()
340 .map_or(Duration::ZERO, |state| state.server_maintenance_interval);
341
342 let now = Instant::now();
343 // Determine if we need to perform maintenance, which is true if `maintenance_interval`
344 // time has passed since the last maintenance.
345 let sleep_duration;
346 if now >= last_maintenance + maintenance_interval {
347 last_maintenance = now;
348 sleep_duration = None;
349
350 // Report frontier information back the coordinator.
351 if let Some(mut compute_state) = self.activate_compute() {
352 compute_state.compute_state.traces.maintenance();
353 compute_state.report_frontiers();
354 compute_state.report_metrics();
355 compute_state.check_expiration();
356 }
357
358 self.metrics.record_shared_row_metrics();
359 } else {
360 // We didn't perform maintenance, sleep until the next maintenance interval.
361 let next_maintenance = last_maintenance + maintenance_interval;
362 sleep_duration = Some(next_maintenance.saturating_duration_since(now))
363 };
364
365 // Step the timely worker, recording the time taken.
366 let timer = self.metrics.timely_step_duration_seconds.start_timer();
367 self.timely_worker.step_or_park(sleep_duration);
368 timer.observe_duration();
369
370 self.handle_pending_commands()?;
371
372 if let Some(mut compute_state) = self.activate_compute() {
373 compute_state.process_peeks();
374 compute_state.process_subscribes();
375 compute_state.process_copy_tos();
376 }
377 }
378 }
379
380 fn handle_pending_commands(&mut self) -> Result<(), NonceChange> {
381 while let Some(cmd) = self.command_rx.try_recv()? {
382 self.handle_command(cmd);
383 }
384 Ok(())
385 }
386
387 fn handle_command(&mut self, cmd: ComputeCommand) {
388 match &cmd {
389 ComputeCommand::CreateInstance(_) => {
390 self.compute_state = Some(ComputeState::new(
391 Arc::clone(&self.persist_clients),
392 self.txns_ctx.clone(),
393 self.metrics.clone(),
394 Arc::clone(&self.tracing_handle),
395 self.context.clone(),
396 ));
397 }
398 _ => (),
399 }
400 self.activate_compute().unwrap().handle_compute_command(cmd);
401 }
402
403 fn activate_compute(&mut self) -> Option<ActiveComputeState<'_, A>> {
404 if let Some(compute_state) = &mut self.compute_state {
405 Some(ActiveComputeState {
406 timely_worker: &mut *self.timely_worker,
407 compute_state,
408 response_tx: &mut self.response_tx,
409 })
410 } else {
411 None
412 }
413 }
414
415 /// Receive the next compute command.
416 ///
417 /// This method blocks if no command is currently available, but takes care to step the Timely
418 /// worker while doing so.
419 fn recv_command(&mut self) -> Result<ComputeCommand, NonceChange> {
420 loop {
421 if let Some(cmd) = self.command_rx.try_recv()? {
422 return Ok(cmd);
423 }
424
425 let start = Instant::now();
426 self.timely_worker.step_or_park(None);
427 self.metrics
428 .timely_step_duration_seconds
429 .observe(start.elapsed().as_secs_f64());
430 }
431 }
432
433 /// Extract commands until `InitializationComplete`, and make the worker reflect those commands.
434 ///
435 /// This method is meant to be a function of the commands received thus far (as recorded in the
436 /// compute state command history) and the new commands from `command_rx`. It should not be a
437 /// function of other characteristics, like whether the worker has managed to respond to a peek
438 /// or not. Some effort goes in to narrowing our view to only the existing commands we can be sure
439 /// are live at all other workers.
440 ///
441 /// The methodology here is to drain `command_rx` until an `InitializationComplete`, at which point
442 /// the prior commands are "reconciled" in. Reconciliation takes each goal dataflow and looks for an
443 /// existing "compatible" dataflow (per `compatible()`) it can repurpose, with some additional tests
444 /// to be sure that we can cut over from one to the other (no additional compaction, no tails/sinks).
445 /// With any connections established, old orphaned dataflows are allow to compact away, and any new
446 /// dataflows are created from scratch. "Kept" dataflows are allowed to compact up to any new `as_of`.
447 ///
448 /// Some additional tidying happens, cleaning up pending peeks, reported frontiers, and creating a new
449 /// subscribe response buffer. We will need to be vigilant with future modifications to `ComputeState` to
450 /// line up changes there with clean resets here.
451 fn reconcile(&mut self) -> Result<(), NonceChange> {
452 // To initialize the connection, we want to drain all commands until we receive a
453 // `ComputeCommand::InitializationComplete` command to form a target command state.
454 let mut new_commands = Vec::new();
455 loop {
456 match self.recv_command()? {
457 ComputeCommand::InitializationComplete => break,
458 command => new_commands.push(command),
459 }
460 }
461
462 // Commands we will need to apply before entering normal service.
463 // These commands may include dropping existing dataflows, compacting existing dataflows,
464 // and creating new dataflows, in addition to standard peek and compaction commands.
465 // The result should be the same as if dropping all dataflows and running `new_commands`.
466 let mut todo_commands = Vec::new();
467 // We only have a compute history if we are in an initialized state
468 // (i.e. after a `CreateInstance`).
469 // If this is not the case, just copy `new_commands` into `todo_commands`.
470 if let Some(compute_state) = &mut self.compute_state {
471 // Reduce the installed commands.
472 // Importantly, act as if all peeks may have been retired (as we cannot know otherwise).
473 compute_state.command_history.discard_peeks();
474 compute_state.command_history.reduce();
475
476 // At this point, we need to sort out which of the *certainly installed* dataflows are
477 // suitable replacements for the requested dataflows. A dataflow is "certainly installed"
478 // as of a frontier if its compaction allows it to go no further. We ignore peeks for this
479 // reasoning, as we cannot be certain that peeks still exist at any other worker.
480
481 // Having reduced our installed command history retaining no peeks (above), we should be able
482 // to use track down installed dataflows we can use as surrogates for requested dataflows (which
483 // have retained all of their peeks, creating a more demanding `as_of` requirement).
484 // NB: installed dataflows may still be allowed to further compact, and we should double check
485 // this before being too confident. It should be rare without peeks, but could happen with e.g.
486 // multiple outputs of a dataflow.
487
488 // The values with which a prior `CreateInstance` was called, if it was.
489 let mut old_instance_config = None;
490 // Index dataflows by `export_ids().collect()`, as this is a precondition for their compatibility.
491 let mut old_dataflows = BTreeMap::default();
492 // Maintain allowed compaction, in case installed identifiers may have been allowed to compact.
493 let mut old_frontiers = BTreeMap::default();
494 for command in compute_state.command_history.iter() {
495 match command {
496 ComputeCommand::CreateInstance(config) => {
497 old_instance_config = Some(config);
498 }
499 ComputeCommand::CreateDataflow(dataflow) => {
500 let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
501 old_dataflows.insert(export_ids, dataflow);
502 }
503 ComputeCommand::AllowCompaction { id, frontier } => {
504 old_frontiers.insert(id, frontier);
505 }
506 _ => {
507 // Nothing to do in these cases.
508 }
509 }
510 }
511
512 // Compaction commands that can be applied to existing dataflows.
513 let mut old_compaction = BTreeMap::default();
514 // Exported identifiers from dataflows we retain.
515 let mut retain_ids = BTreeSet::default();
516
517 // Traverse new commands, sorting out what remediation we can do.
518 for command in new_commands.iter() {
519 match command {
520 ComputeCommand::CreateDataflow(dataflow) => {
521 // Attempt to find an existing match for the dataflow.
522 let as_of = dataflow.as_of.as_ref().unwrap();
523 let export_ids = dataflow.export_ids().collect::<BTreeSet<_>>();
524
525 if let Some(old_dataflow) = old_dataflows.get(&export_ids) {
526 let compatible = old_dataflow.compatible_with(dataflow);
527 let uncompacted = !export_ids
528 .iter()
529 .flat_map(|id| old_frontiers.get(id))
530 .any(|frontier| {
531 !timely::PartialOrder::less_equal(
532 *frontier,
533 dataflow.as_of.as_ref().unwrap(),
534 )
535 });
536
537 // We cannot reconcile subscribe and copy-to sinks at the moment,
538 // because the response buffer is shared, and to a first approximation
539 // must be completely reformed.
540 let subscribe_free = dataflow.subscribe_ids().next().is_none();
541 let copy_to_free = dataflow.copy_to_ids().next().is_none();
542
543 // If we have replaced any dependency of this dataflow, we need to
544 // replace this dataflow, to make it use the replacement.
545 let dependencies_retained = dataflow
546 .imported_index_ids()
547 .all(|id| retain_ids.contains(&id));
548
549 if compatible
550 && uncompacted
551 && subscribe_free
552 && copy_to_free
553 && dependencies_retained
554 {
555 // Match found; remove the match from the deletion queue,
556 // and compact its outputs to the dataflow's `as_of`.
557 old_dataflows.remove(&export_ids);
558 for id in export_ids.iter() {
559 old_compaction.insert(*id, as_of.clone());
560 }
561 retain_ids.extend(export_ids);
562 } else {
563 warn!(
564 ?export_ids,
565 ?compatible,
566 ?uncompacted,
567 ?subscribe_free,
568 ?copy_to_free,
569 ?dependencies_retained,
570 old_as_of = ?old_dataflow.as_of,
571 new_as_of = ?as_of,
572 "dataflow reconciliation failed",
573 );
574
575 // Dump the full dataflow plans if they are incompatible, to
576 // simplify debugging hard-to-reproduce reconciliation failures.
577 if !compatible {
578 warn!(
579 old = ?old_dataflow,
580 new = ?dataflow,
581 "incompatible dataflows in reconciliation",
582 );
583 }
584
585 todo_commands
586 .push(ComputeCommand::CreateDataflow(dataflow.clone()));
587 }
588
589 compute_state.metrics.record_dataflow_reconciliation(
590 compatible,
591 uncompacted,
592 subscribe_free,
593 copy_to_free,
594 dependencies_retained,
595 );
596 } else {
597 todo_commands.push(ComputeCommand::CreateDataflow(dataflow.clone()));
598 }
599 }
600 ComputeCommand::CreateInstance(config) => {
601 // Cluster creation should not be performed again!
602 if old_instance_config.map_or(false, |old| !old.compatible_with(config)) {
603 halt!(
604 "new instance configuration not compatible with existing instance configuration:\n{:?}\nvs\n{:?}",
605 config,
606 old_instance_config,
607 );
608 }
609 }
610 // All other commands we apply as requested.
611 command => {
612 todo_commands.push(command.clone());
613 }
614 }
615 }
616
617 // Issue compaction commands first to reclaim resources.
618 for (_, dataflow) in old_dataflows.iter() {
619 for id in dataflow.export_ids() {
620 // We want to drop anything that has not yet been dropped,
621 // and nothing that has already been dropped.
622 if old_frontiers.get(&id) != Some(&&Antichain::new()) {
623 old_compaction.insert(id, Antichain::new());
624 }
625 }
626 }
627 for (&id, frontier) in &old_compaction {
628 let frontier = frontier.clone();
629 todo_commands.insert(0, ComputeCommand::AllowCompaction { id, frontier });
630 }
631
632 // Clean up worker-local state.
633 //
634 // Various aspects of `ComputeState` need to be either uninstalled, or return to a blank slate.
635 // All dropped dataflows should clean up after themselves, as we plan to install new dataflows
636 // re-using the same identifiers.
637 // All re-used dataflows should roll back any believed communicated information (e.g. frontiers)
638 // so that they recommunicate that information as if from scratch.
639
640 // Remove all pending peeks.
641 for (_, peek) in std::mem::take(&mut compute_state.pending_peeks) {
642 // Log dropping the peek request.
643 if let Some(logger) = compute_state.compute_logger.as_mut() {
644 logger.log(&peek.as_log_event(false));
645 }
646 }
647
648 for (&id, collection) in compute_state.collections.iter_mut() {
649 // Adjust reported frontiers:
650 // * For dataflows we continue to use, reset to ensure we report something not
651 // before the new `as_of` next.
652 // * For dataflows we drop, set to the empty frontier, to ensure we don't report
653 // anything for them.
654 let retained = retain_ids.contains(&id);
655 let compaction = old_compaction.remove(&id);
656 let new_reported_frontier = match (retained, compaction) {
657 (true, Some(new_as_of)) => ReportedFrontier::NotReported { lower: new_as_of },
658 (true, None) => {
659 unreachable!("retained dataflows are compacted to the new as_of")
660 }
661 (false, Some(new_frontier)) => {
662 assert!(new_frontier.is_empty());
663 ReportedFrontier::Reported(new_frontier)
664 }
665 (false, None) => {
666 // Logging dataflows are implicitly retained and don't have a new as_of.
667 // Reset them to the minimal frontier.
668 ReportedFrontier::new()
669 }
670 };
671
672 collection.reset_reported_frontiers(new_reported_frontier);
673
674 // Sink tokens should be retained for retained dataflows, and dropped for dropped
675 // dataflows.
676 //
677 // Dropping the tokens of active subscribe and copy-tos makes them place
678 // `DroppedAt` responses into the respective response buffer. We drop those buffers
679 // in the next step, which ensures that we don't send out `DroppedAt` responses for
680 // subscribe/copy-tos dropped during reconciliation.
681 if !retained {
682 collection.sink_token = None;
683 }
684 }
685
686 // We must drop the response buffers as they are global across all subscribe/copy-tos.
687 // If they were broken out by `GlobalId` then we could drop only the response buffers
688 // of dataflows we drop.
689 compute_state.subscribe_response_buffer = Rc::new(RefCell::new(Vec::new()));
690 compute_state.copy_to_response_buffer = Rc::new(RefCell::new(Vec::new()));
691
692 // The controller expects the logging collections to be readable from the minimum time
693 // initially. We cannot recreate the logging arrangements without restarting the
694 // instance, but we can pad the compacted times with empty data. Doing so is sound
695 // because logging collections from different replica incarnations are considered
696 // distinct TVCs, so the controller doesn't expect any historical consistency from
697 // these collections when it reconnects to a replica.
698 //
699 // TODO(database-issues#8152): Consider resolving this with controller-side reconciliation instead.
700 if let Some(config) = old_instance_config {
701 for id in config.logging.index_logs.values() {
702 let trace = compute_state
703 .traces
704 .remove(id)
705 .expect("logging trace exists");
706 let padded = trace.into_padded();
707 compute_state.traces.set(*id, padded);
708 }
709 }
710 } else {
711 todo_commands.clone_from(&new_commands);
712 }
713
714 // Execute the commands to bring us to `new_commands`.
715 for command in todo_commands.into_iter() {
716 self.handle_command(command);
717 }
718
719 // Overwrite `self.command_history` to reflect `new_commands`.
720 // It is possible that there still isn't a compute state yet.
721 if let Some(compute_state) = &mut self.compute_state {
722 let mut command_history = ComputeCommandHistory::new(self.metrics.for_history());
723 for command in new_commands.iter() {
724 command_history.push(command.clone());
725 }
726 compute_state.command_history = command_history;
727 }
728 Ok(())
729 }
730}
731
732/// Spawn a task to bridge between [`ClusterClient`] and [`Worker`] channels.
733///
734/// The [`Worker`] expects a pair of persistent channels, with punctuation marking reconnects,
735/// while the [`ClusterClient`] provides a new pair of channels on each reconnect.
736fn spawn_channel_adapter(
737 mut client_rx: mpsc::UnboundedReceiver<(
738 Uuid,
739 mpsc::UnboundedReceiver<ComputeCommand>,
740 mpsc::UnboundedSender<ComputeResponse>,
741 )>,
742 command_tx: command_channel::Sender,
743 mut response_rx: mpsc::UnboundedReceiver<(ComputeResponse, Uuid)>,
744 worker_id: usize,
745) {
746 mz_ore::task::spawn(
747 || format!("compute-channel-adapter-{worker_id}"),
748 async move {
749 // To make workers aware of the individual client connections, we tag forwarded
750 // commands with the client nonce. Additionally, we use the nonce to filter out
751 // responses with a different nonce, which are intended for different client
752 // connections.
753 //
754 // It's possible that we receive responses with nonces from the past but also from the
755 // future: Worker 0 might have received a new nonce before us and broadcasted it to our
756 // Timely cluster. When we receive a response with a future nonce, we need to wait with
757 // forwarding it until we have received the same nonce from a client connection.
758 //
759 // Nonces are not ordered so we don't know whether a response nonce is from the past or
760 // the future. We thus assume that every response with an unknown nonce might be from
761 // the future and stash them all. Every time we reconnect, we immediately send all
762 // stashed responses with a matching nonce. Every time we receive a new response with a
763 // nonce that matches our current one, we can discard the entire response stash as we
764 // know that all stashed responses must be from the past.
765 let mut stashed_responses = BTreeMap::<Uuid, Vec<ComputeResponse>>::new();
766
767 while let Some((nonce, mut command_rx, response_tx)) = client_rx.recv().await {
768 // Send stashed responses for this client.
769 if let Some(resps) = stashed_responses.remove(&nonce) {
770 for resp in resps {
771 let _ = response_tx.send(resp);
772 }
773 }
774
775 // Wait for a new response while forwarding received commands.
776 let mut serve_rx_channels = async || loop {
777 tokio::select! {
778 msg = command_rx.recv() => match msg {
779 Some(cmd) => command_tx.send((cmd, nonce)),
780 None => return Err(()),
781 },
782 msg = response_rx.recv() => {
783 return Ok(msg.expect("worker connected"));
784 }
785 }
786 };
787
788 // Serve this connection until we see any of the channels disconnect.
789 loop {
790 let Ok((resp, resp_nonce)) = serve_rx_channels().await else {
791 break;
792 };
793
794 if resp_nonce == nonce {
795 // Response for the current connection; forward it.
796 stashed_responses.clear();
797 if response_tx.send(resp).is_err() {
798 break;
799 }
800 } else {
801 // Response for a past or future connection; stash it.
802 let stash = stashed_responses.entry(resp_nonce).or_default();
803 stash.push(resp);
804 }
805 }
806 }
807 },
808 );
809}