// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A channel for broadcasting compute commands from worker 0 to other workers.
//!
//! Compute uses a dataflow to distribute commands between workers. This is to ensure workers
//! retain a consistent dataflow state across reconnects. If each worker received its commands
//! directly from the controller, there would be no guarantee that after a reconnect all workers
//! had seen the same sequence of commands. This is particularly problematic for `CreateDataflow`
//! commands, since Timely requires that all workers render the same dataflows in the same order.
//! So the controller instead sends commands only to worker 0, which then broadcasts them to the
//! other workers through the Timely fabric, taking care of the correct sequencing.
//!
//! Commands in the command channel are tagged with an epoch identifying the incarnation of the
//! compute protocol the command belongs to, allowing workers to recognize client reconnects that
//! require a reconciliation.
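//!
//! A minimal usage sketch (illustrative only; `worker` is assumed to be the local Timely worker,
//! and `command`/`epoch` to come from the controller):
//!
//! ```ignore
//! // Rendered once on every worker; the dataflow spans all of them.
//! let (tx, rx) = render(worker);
//!
//! // Worker 0 forwards controller commands into the channel.
//! tx.send((command, epoch));
//!
//! // Every worker drains the commands broadcast to it.
//! while let Some((command, epoch)) = rx.try_recv() {
//!     // ... reconcile `epoch`, then apply `command` ...
//! }
//! ```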

use std::sync::{Arc, Mutex};

use crossbeam_channel::TryRecvError;
use mz_compute_client::protocol::command::ComputeCommand;
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_ore::cast::CastFrom;
use timely::communication::Allocate;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::generic::source;
use timely::scheduling::{Scheduler, SyncActivator};
use timely::worker::Worker as TimelyWorker;

/// A sender pushing commands onto the command channel.
pub struct Sender {
    tx: crossbeam_channel::Sender<(ComputeCommand, u64)>,
    activator: Arc<Mutex<Option<SyncActivator>>>,
}

impl Sender {
    /// Broadcasts the given command to all workers.
    pub fn send(&self, message: (ComputeCommand, u64)) {
        if self.tx.send(message).is_err() {
            unreachable!("command channel never shuts down");
        }

        self.activator
            .lock()
            .expect("poisoned")
            .as_ref()
            .map(|a| a.activate());
    }
}

/// A receiver reading commands from the command channel.
pub struct Receiver {
    rx: crossbeam_channel::Receiver<(ComputeCommand, u64)>,
}

impl Receiver {
    /// Returns the next available command, if any.
    ///
    /// This returns `None` when there are currently no commands but there might be commands again
    /// in the future.
    pub fn try_recv(&self) -> Option<(ComputeCommand, u64)> {
        match self.rx.try_recv() {
            Ok(msg) => Some(msg),
            Err(TryRecvError::Empty) => None,
            Err(TryRecvError::Disconnected) => {
                unreachable!("command channel never shuts down");
            }
        }
    }
}

/// Render the command channel dataflow.
pub fn render<A: Allocate>(timely_worker: &mut TimelyWorker<A>) -> (Sender, Receiver) {
    let (input_tx, input_rx) = crossbeam_channel::unbounded();
    let (output_tx, output_rx) = crossbeam_channel::unbounded();
    let activator = Arc::new(Mutex::new(None));
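
    // The command channel is a dataflow with a single source, from which only worker 0 emits
    // command parts, connected to a sink that uses an `Exchange` pact to route each part to its
    // target worker, where it is forwarded to the local `output_tx`.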

    // TODO(teskje): This implementation relies on Timely channels preserving the order of their
    // inputs, which is not something they guarantee. We can avoid that by using explicit indexing,
    // like storage's command sequencer does.
    timely_worker.dataflow_named::<u64, _, _>("command_channel", {
        let activator = Arc::clone(&activator);
        move |scope| {
            source(scope, "command_channel::source", |cap, info| {
                let sync_activator = scope.sync_activator_for(info.address.to_vec());
                *activator.lock().expect("poisoned") = Some(sync_activator);

                let worker_id = scope.index();
                let peers = scope.peers();

                // Only worker 0 broadcasts commands; other workers must drop their capability to
                // avoid holding up dataflow progress.
                let mut capability = (worker_id == 0).then_some(cap);

                move |output| {
                    let Some(cap) = &mut capability else {
                        // Non-leader workers will still receive `UpdateConfiguration` commands and
                        // must drain those to avoid leaking memory.
                        while let Ok((cmd, _epoch)) = input_rx.try_recv() {
                            assert_ne!(worker_id, 0);
                            assert!(matches!(cmd, ComputeCommand::UpdateConfiguration(_)));
                        }
                        return;
                    };

                    assert_eq!(worker_id, 0);

                    let input: Vec<_> = input_rx.try_iter().collect();
                    for (cmd, epoch) in input {
                        let worker_cmds =
                            split_command(cmd, peers).map(|(idx, cmd)| (idx, cmd, epoch));
                        output.session(&cap).give_iterator(worker_cmds);
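
                        // Downgrade our capability so that each command is emitted at its own,
                        // strictly increasing timestamp.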
                        cap.downgrade(&(cap.time() + 1));
                    }
                }
            })
            .sink(
                Exchange::new(|(idx, _, _)| u64::cast_from(*idx)),
                "command_channel::sink",
                move |input| {
                    while let Some((_cap, data)) = input.next() {
                        for (_idx, cmd, epoch) in data.drain(..) {
                            let _ = output_tx.send((cmd, epoch));
                        }
                    }
                },
            );
        }
    });

    let tx = Sender {
        tx: input_tx,
        activator,
    };
    let rx = Receiver { rx: output_rx };

    (tx, rx)
}

/// Split the given command into the given number of parts.
///
/// Returns an iterator that produces each command part, along with its part index.
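///
/// `CreateDataflow` commands are split by partitioning their plans among the parts; all other
/// commands are repeated unchanged for every part. A sketch of the latter case, with a
/// hypothetical `command` value:
///
/// ```ignore
/// let parts: Vec<_> = split_command(command, 4).collect();
/// assert_eq!(parts.len(), 4);
/// assert_eq!(parts.last().unwrap().0, 3); // part indices count up from 0
/// ```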
fn split_command(
    command: ComputeCommand,
    parts: usize,
) -> impl Iterator<Item = (usize, ComputeCommand)> {
    use itertools::Either;

    let commands = match command {
        ComputeCommand::CreateDataflow(dataflow) => {
            // A list of descriptions of objects for each part to build.
            let mut builds_parts = vec![Vec::new(); parts];
            // Partition each build description among `parts`.
            for build_desc in dataflow.objects_to_build {
                let build_part = build_desc.plan.partition_among(parts);
                for (plan, objects_to_build) in build_part.into_iter().zip(builds_parts.iter_mut())
                {
                    objects_to_build.push(BuildDesc {
                        id: build_desc.id,
                        plan,
                    });
                }
            }

            // Each list of build descriptions results in a dataflow description.
            let commands = builds_parts
                .into_iter()
                .map(move |objects_to_build| DataflowDescription {
                    source_imports: dataflow.source_imports.clone(),
                    index_imports: dataflow.index_imports.clone(),
                    objects_to_build,
                    index_exports: dataflow.index_exports.clone(),
                    sink_exports: dataflow.sink_exports.clone(),
                    as_of: dataflow.as_of.clone(),
                    until: dataflow.until.clone(),
                    debug_name: dataflow.debug_name.clone(),
                    initial_storage_as_of: dataflow.initial_storage_as_of.clone(),
                    refresh_schedule: dataflow.refresh_schedule.clone(),
                    time_dependence: dataflow.time_dependence.clone(),
                })
                .map(ComputeCommand::CreateDataflow);
            Either::Left(commands)
        }
        command => {
            let commands = std::iter::repeat_n(command, parts);
            Either::Right(commands)
        }
    };

    commands.into_iter().enumerate()
}