mz_compute/
command_channel.rs1use std::sync::{Arc, Mutex};
26
27use crossbeam_channel::TryRecvError;
28use itertools::Itertools;
29use mz_compute_client::protocol::command::ComputeCommand;
30use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
31use mz_ore::cast::CastFrom;
32use timely::communication::Allocate;
33use timely::dataflow::channels::pact::Exchange;
34use timely::dataflow::operators::Operator;
35use timely::dataflow::operators::generic::source;
36use timely::scheduling::{Scheduler, SyncActivator};
37use timely::worker::Worker as TimelyWorker;
38use uuid::Uuid;
39
40pub struct Sender {
42 tx: crossbeam_channel::Sender<(ComputeCommand, Uuid)>,
43 activator: Arc<Mutex<Option<SyncActivator>>>,
44}
45
46impl Sender {
47 pub fn send(&self, message: (ComputeCommand, Uuid)) {
49 if self.tx.send(message).is_err() {
50 unreachable!("command channel never shuts down");
51 }
52
53 self.activator
54 .lock()
55 .expect("poisoned")
56 .as_ref()
57 .map(|a| a.activate());
58 }
59}
60
61pub struct Receiver {
63 rx: crossbeam_channel::Receiver<(ComputeCommand, Uuid)>,
64}
65
66impl Receiver {
67 pub fn try_recv(&self) -> Option<(ComputeCommand, Uuid)> {
72 match self.rx.try_recv() {
73 Ok(msg) => Some(msg),
74 Err(TryRecvError::Empty) => None,
75 Err(TryRecvError::Disconnected) => {
76 unreachable!("command channel never shuts down");
77 }
78 }
79 }
80}
81
82pub fn render<A: Allocate>(timely_worker: &mut TimelyWorker<A>) -> (Sender, Receiver) {
84 let (input_tx, input_rx) = crossbeam_channel::unbounded();
85 let (output_tx, output_rx) = crossbeam_channel::unbounded();
86 let activator = Arc::new(Mutex::new(None));
87
88 timely_worker.dataflow_named::<u64, _, _>("command_channel", {
92 let activator = Arc::clone(&activator);
93 move |scope| {
94 source(scope, "command_channel::source", |cap, info| {
95 let sync_activator = scope.sync_activator_for(info.address.to_vec());
96 *activator.lock().expect("poisoned") = Some(sync_activator);
97
98 let worker_id = scope.index();
99 let peers = scope.peers();
100
101 let mut capability = (worker_id == 0).then_some(cap);
104
105 move |output| {
106 let Some(cap) = &mut capability else {
107 while let Ok((cmd, _nonce)) = input_rx.try_recv() {
110 assert_ne!(worker_id, 0);
111 assert!(matches!(cmd, ComputeCommand::UpdateConfiguration(_)));
112 }
113 return;
114 };
115
116 assert_eq!(worker_id, 0);
117
118 let input: Vec<_> = input_rx.try_iter().collect();
119 for (cmd, nonce) in input {
120 let worker_cmds =
121 split_command(cmd, peers).map(|(idx, cmd)| (idx, cmd, nonce));
122 output.session(&cap).give_iterator(worker_cmds);
123
124 cap.downgrade(&(cap.time() + 1));
125 }
126 }
127 })
128 .sink(
129 Exchange::new(|(idx, _, _)| u64::cast_from(*idx)),
130 "command_channel::sink",
131 move |input| {
132 while let Some((_cap, data)) = input.next() {
133 for (_idx, cmd, nonce) in data.drain(..) {
134 let _ = output_tx.send((cmd, nonce));
135 }
136 }
137 },
138 );
139 }
140 });
141
142 let tx = Sender {
143 tx: input_tx,
144 activator,
145 };
146 let rx = Receiver { rx: output_rx };
147
148 (tx, rx)
149}
150
151fn split_command(
155 command: ComputeCommand,
156 parts: usize,
157) -> impl Iterator<Item = (usize, ComputeCommand)> {
158 use itertools::Either;
159
160 let commands = match command {
161 ComputeCommand::CreateDataflow(dataflow) => {
162 let dataflow = *dataflow;
163
164 let mut builds_parts = vec![Vec::new(); parts];
166 for build_desc in dataflow.objects_to_build {
168 let build_part = build_desc.plan.partition_among(parts);
169 for (plan, objects_to_build) in
170 build_part.into_iter().zip_eq(builds_parts.iter_mut())
171 {
172 objects_to_build.push(BuildDesc {
173 id: build_desc.id,
174 plan,
175 });
176 }
177 }
178
179 let commands = builds_parts
181 .into_iter()
182 .map(move |objects_to_build| DataflowDescription {
183 source_imports: dataflow.source_imports.clone(),
184 index_imports: dataflow.index_imports.clone(),
185 objects_to_build,
186 index_exports: dataflow.index_exports.clone(),
187 sink_exports: dataflow.sink_exports.clone(),
188 as_of: dataflow.as_of.clone(),
189 until: dataflow.until.clone(),
190 debug_name: dataflow.debug_name.clone(),
191 initial_storage_as_of: dataflow.initial_storage_as_of.clone(),
192 refresh_schedule: dataflow.refresh_schedule.clone(),
193 time_dependence: dataflow.time_dependence.clone(),
194 })
195 .map(Box::new)
196 .map(ComputeCommand::CreateDataflow);
197 Either::Left(commands)
198 }
199 command => {
200 let commands = std::iter::repeat_n(command, parts);
201 Either::Right(commands)
202 }
203 };
204
205 commands.into_iter().enumerate()
206}