mz_compute/
command_channel.rs1use std::sync::mpsc::{self, TryRecvError};
26use std::sync::{Arc, Mutex};
27
28use itertools::Itertools;
29use mz_compute_client::protocol::command::ComputeCommand;
30use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
31use mz_ore::cast::CastFrom;
32use mz_timely_util::scope_label::ScopeExt;
33use timely::communication::Allocate;
34use timely::dataflow::channels::pact::Exchange;
35use timely::dataflow::operators::Operator;
36use timely::dataflow::operators::generic::source;
37use timely::scheduling::{Scheduler, SyncActivator};
38use timely::worker::{AsWorker, Worker as TimelyWorker};
39use uuid::Uuid;
40
41pub struct Sender {
43 tx: mpsc::Sender<(ComputeCommand, Uuid)>,
44 activator: Arc<Mutex<Option<SyncActivator>>>,
45}
46
47impl Sender {
48 pub fn send(&self, message: (ComputeCommand, Uuid)) {
50 if self.tx.send(message).is_err() {
51 unreachable!("command channel never shuts down");
52 }
53
54 self.activator
55 .lock()
56 .expect("poisoned")
57 .as_ref()
58 .map(|a| a.activate());
59 }
60}
61
62pub struct Receiver {
64 rx: mpsc::Receiver<(ComputeCommand, Uuid)>,
65}
66
67impl Receiver {
68 pub fn try_recv(&self) -> Option<(ComputeCommand, Uuid)> {
73 match self.rx.try_recv() {
74 Ok(msg) => Some(msg),
75 Err(TryRecvError::Empty) => None,
76 Err(TryRecvError::Disconnected) => {
77 unreachable!("command channel never shuts down");
78 }
79 }
80 }
81}
82
83pub fn render<A: Allocate>(timely_worker: &mut TimelyWorker<A>) -> (Sender, Receiver) {
85 let (input_tx, input_rx) = mpsc::channel();
86 let (output_tx, output_rx) = mpsc::channel();
87 let activator = Arc::new(Mutex::new(None));
88
89 timely_worker.dataflow_named::<u64, _, _>("command_channel", {
93 let activator = Arc::clone(&activator);
94 move |scope| {
95 let scope = &mut scope.with_label();
96
97 source(scope, "command_channel::source", |cap, info| {
98 let sync_activator = scope.sync_activator_for(info.address.to_vec());
99 *activator.lock().expect("poisoned") = Some(sync_activator);
100
101 let worker_id = scope.index();
102 let peers = scope.peers();
103
104 let mut capability = (worker_id == 0).then_some(cap);
107
108 move |output| {
109 let Some(cap) = &mut capability else {
110 while let Ok((cmd, _nonce)) = input_rx.try_recv() {
113 assert_ne!(worker_id, 0);
114 assert!(matches!(cmd, ComputeCommand::UpdateConfiguration(_)));
115 }
116 return;
117 };
118
119 assert_eq!(worker_id, 0);
120
121 let input: Vec<_> = input_rx.try_iter().collect();
122 for (cmd, nonce) in input {
123 let worker_cmds =
124 split_command(cmd, peers).map(|(idx, cmd)| (idx, cmd, nonce));
125 output.session(&cap).give_iterator(worker_cmds);
126
127 cap.downgrade(&(cap.time() + 1));
128 }
129 }
130 })
131 .sink(
132 Exchange::new(|(idx, _, _)| u64::cast_from(*idx)),
133 "command_channel::sink",
134 move |(input, _)| {
135 input.for_each(|_time, data| {
136 for (_idx, cmd, nonce) in data.drain(..) {
137 let _ = output_tx.send((cmd, nonce));
138 }
139 });
140 },
141 );
142 }
143 });
144
145 let tx = Sender {
146 tx: input_tx,
147 activator,
148 };
149 let rx = Receiver { rx: output_rx };
150
151 (tx, rx)
152}
153
154fn split_command(
158 command: ComputeCommand,
159 parts: usize,
160) -> impl Iterator<Item = (usize, ComputeCommand)> {
161 use itertools::Either;
162
163 let commands = match command {
164 ComputeCommand::CreateDataflow(dataflow) => {
165 let dataflow = *dataflow;
166
167 let mut builds_parts = vec![Vec::new(); parts];
169 for build_desc in dataflow.objects_to_build {
171 let build_part = build_desc.plan.partition_among(parts);
172 for (plan, objects_to_build) in
173 build_part.into_iter().zip_eq(builds_parts.iter_mut())
174 {
175 objects_to_build.push(BuildDesc {
176 id: build_desc.id,
177 plan,
178 });
179 }
180 }
181
182 let commands = builds_parts
184 .into_iter()
185 .map(move |objects_to_build| DataflowDescription {
186 source_imports: dataflow.source_imports.clone(),
187 index_imports: dataflow.index_imports.clone(),
188 objects_to_build,
189 index_exports: dataflow.index_exports.clone(),
190 sink_exports: dataflow.sink_exports.clone(),
191 as_of: dataflow.as_of.clone(),
192 until: dataflow.until.clone(),
193 debug_name: dataflow.debug_name.clone(),
194 initial_storage_as_of: dataflow.initial_storage_as_of.clone(),
195 refresh_schedule: dataflow.refresh_schedule.clone(),
196 time_dependence: dataflow.time_dependence.clone(),
197 })
198 .map(Box::new)
199 .map(ComputeCommand::CreateDataflow);
200 Either::Left(commands)
201 }
202 command => {
203 let commands = std::iter::repeat_n(command, parts);
204 Either::Right(commands)
205 }
206 };
207
208 commands.into_iter().enumerate()
209}