mz_compute/
command_channel.rs1use std::sync::mpsc::{self, TryRecvError};
26use std::sync::{Arc, Mutex};
27
28use itertools::Itertools;
29use mz_compute_client::protocol::command::ComputeCommand;
30use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
31use mz_ore::cast::CastFrom;
32use mz_timely_util::scope_label::ScopeExt;
33use timely::dataflow::channels::pact::Exchange;
34use timely::dataflow::operators::Operator;
35use timely::dataflow::operators::generic::source;
36use timely::scheduling::SyncActivator;
37use timely::worker::Worker as TimelyWorker;
38use uuid::Uuid;
39
40pub struct Sender {
42 tx: mpsc::Sender<(ComputeCommand, Uuid)>,
43 activator: Arc<Mutex<Option<SyncActivator>>>,
44}
45
46impl Sender {
47 pub fn send(&self, message: (ComputeCommand, Uuid)) {
49 if self.tx.send(message).is_err() {
50 unreachable!("command channel never shuts down");
51 }
52
53 self.activator
54 .lock()
55 .expect("poisoned")
56 .as_ref()
57 .map(|a| a.activate());
58 }
59}
60
61pub struct Receiver {
63 rx: mpsc::Receiver<(ComputeCommand, Uuid)>,
64}
65
66impl Receiver {
67 pub fn try_recv(&self) -> Option<(ComputeCommand, Uuid)> {
72 match self.rx.try_recv() {
73 Ok(msg) => Some(msg),
74 Err(TryRecvError::Empty) => None,
75 Err(TryRecvError::Disconnected) => {
76 unreachable!("command channel never shuts down");
77 }
78 }
79 }
80}
81
82pub fn render(timely_worker: &mut TimelyWorker) -> (Sender, Receiver) {
84 let (input_tx, input_rx) = mpsc::channel();
85 let (output_tx, output_rx) = mpsc::channel();
86 let activator = Arc::new(Mutex::new(None));
87
88 timely_worker.dataflow_named::<u64, _, _>("command_channel", {
92 let activator = Arc::clone(&activator);
93 move |scope| {
94 let scope = scope.with_label();
95
96 source(scope, "command_channel::source", |cap, info| {
97 let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
98 *activator.lock().expect("poisoned") = Some(sync_activator);
99
100 let worker_id = scope.index();
101 let peers = scope.peers();
102
103 let mut capability = (worker_id == 0).then_some(cap);
106
107 move |output| {
108 let Some(cap) = &mut capability else {
109 while let Ok((cmd, _nonce)) = input_rx.try_recv() {
112 assert_ne!(worker_id, 0);
113 assert!(matches!(cmd, ComputeCommand::UpdateConfiguration(_)));
114 }
115 return;
116 };
117
118 assert_eq!(worker_id, 0);
119
120 let input: Vec<_> = input_rx.try_iter().collect();
121 for (cmd, nonce) in input {
122 let worker_cmds =
123 split_command(cmd, peers).map(|(idx, cmd)| (idx, cmd, nonce));
124 output.session(&cap).give_iterator(worker_cmds);
125
126 cap.downgrade(&(cap.time() + 1));
127 }
128 }
129 })
130 .sink(
131 Exchange::new(|(idx, _, _)| u64::cast_from(*idx)),
132 "command_channel::sink",
133 move |(input, _)| {
134 input.for_each(|_time, data| {
135 for (_idx, cmd, nonce) in data.drain(..) {
136 let _ = output_tx.send((cmd, nonce));
137 }
138 });
139 },
140 );
141 }
142 });
143
144 let tx = Sender {
145 tx: input_tx,
146 activator,
147 };
148 let rx = Receiver { rx: output_rx };
149
150 (tx, rx)
151}
152
153fn split_command(
157 command: ComputeCommand,
158 parts: usize,
159) -> impl Iterator<Item = (usize, ComputeCommand)> {
160 use itertools::Either;
161
162 let commands = match command {
163 ComputeCommand::CreateDataflow(dataflow) => {
164 let dataflow = *dataflow;
165
166 let mut builds_parts = vec![Vec::new(); parts];
168 for build_desc in dataflow.objects_to_build {
170 let build_part = build_desc.plan.partition_among(parts);
171 for (plan, objects_to_build) in
172 build_part.into_iter().zip_eq(builds_parts.iter_mut())
173 {
174 objects_to_build.push(BuildDesc {
175 id: build_desc.id,
176 plan,
177 });
178 }
179 }
180
181 let commands = builds_parts
183 .into_iter()
184 .map(move |objects_to_build| DataflowDescription {
185 source_imports: dataflow.source_imports.clone(),
186 index_imports: dataflow.index_imports.clone(),
187 objects_to_build,
188 index_exports: dataflow.index_exports.clone(),
189 sink_exports: dataflow.sink_exports.clone(),
190 as_of: dataflow.as_of.clone(),
191 until: dataflow.until.clone(),
192 debug_name: dataflow.debug_name.clone(),
193 initial_storage_as_of: dataflow.initial_storage_as_of.clone(),
194 refresh_schedule: dataflow.refresh_schedule.clone(),
195 time_dependence: dataflow.time_dependence.clone(),
196 })
197 .map(Box::new)
198 .map(ComputeCommand::CreateDataflow);
199 Either::Left(commands)
200 }
201 command => {
202 let commands = std::iter::repeat_n(command, parts);
203 Either::Right(commands)
204 }
205 };
206
207 commands.into_iter().enumerate()
208}