1use std::fmt;
13use std::sync::{Arc, Mutex};
14use std::thread::Thread;
15
16use anyhow::{Error, anyhow};
17use async_trait::async_trait;
18use differential_dataflow::trace::ExertionLogic;
19use futures::future;
20use mz_cluster_client::client::{TimelyConfig, TryIntoProtocolNonce};
21use mz_service::client::{GenericClient, Partitionable, Partitioned};
22use mz_service::local::LocalClient;
23use timely::WorkerConfig;
24use timely::communication::Allocate;
25use timely::communication::allocator::GenericBuilder;
26use timely::communication::allocator::zero_copy::bytes_slab::BytesRefill;
27use timely::communication::initialize::WorkerGuards;
28use timely::execute::execute_from;
29use timely::worker::Worker as TimelyWorker;
30use tokio::runtime::Handle;
31use tokio::sync::mpsc;
32use tracing::info;
33use uuid::Uuid;
34
35use crate::communication::initialize_networking;
36
/// A [`Partitioned`] client whose parts are [`LocalClient`]s, used by [`ClusterClient`] to talk
/// to the cluster's workers through in-process channels.
type PartitionedClient<C, R> = Partitioned<LocalClient<C, R>, C, R>;
38
/// A client for a Timely cluster, such as one returned by [`ClusterSpec::build_cluster`].
///
/// The client starts out disconnected. A command that converts into a protocol nonce (see
/// [`TryIntoProtocolNonce`]) (re)connects it to every worker in the shared [`TimelyContainer`];
/// all other commands are forwarded over the current connection.
pub struct ClusterClient<C>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    /// The partitioned connection to the workers; `None` until the first nonce command.
    inner: Option<PartitionedClient<C::Command, C::Response>>,
    /// The Timely cluster this client connects to; locked while (re)connecting.
    timely_container: Arc<Mutex<TimelyContainer<C>>>,
}
50
/// Handles to a running Timely cluster, as produced by [`ClusterSpec::build_cluster`].
pub struct TimelyContainer<C: ClusterSpec> {
    /// One sender per worker (`config.workers` of them, see `build_cluster`). Each time a
    /// [`ClusterClient`] connects, it sends every worker a fresh
    /// `(nonce, command receiver, response sender)` triple over these channels.
    client_txs: Vec<
        crossbeam_channel::Sender<(
            Uuid,
            crossbeam_channel::Receiver<C::Command>,
            mpsc::UnboundedSender<C::Response>,
        )>,
    >,
    /// Guards for the Timely worker threads spawned by `execute_from`.
    worker_guards: WorkerGuards<()>,
}
64
65impl<C: ClusterSpec> TimelyContainer<C> {
66 fn worker_threads(&self) -> Vec<Thread> {
67 self.worker_guards
68 .guards()
69 .iter()
70 .map(|h| h.thread().clone())
71 .collect()
72 }
73}
74
75impl<C> ClusterClient<C>
76where
77 C: ClusterSpec,
78 (C::Command, C::Response): Partitionable<C::Command, C::Response>,
79{
80 pub fn new(timely_container: Arc<Mutex<TimelyContainer<C>>>) -> Self {
82 Self {
83 timely_container,
84 inner: None,
85 }
86 }
87
88 fn connect(&mut self, nonce: Uuid) -> Result<(), Error> {
90 let timely = self.timely_container.lock().expect("poisoned");
91
92 let mut command_txs = Vec::new();
93 let mut response_rxs = Vec::new();
94 for client_tx in &timely.client_txs {
95 let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded();
96 let (resp_tx, resp_rx) = mpsc::unbounded_channel();
97
98 client_tx
99 .send((nonce, cmd_rx, resp_tx))
100 .expect("worker not dropped");
101
102 command_txs.push(cmd_tx);
103 response_rxs.push(resp_rx);
104 }
105
106 self.inner = Some(LocalClient::new_partitioned(
107 response_rxs,
108 command_txs,
109 timely.worker_threads(),
110 ));
111 Ok(())
112 }
113}
114
115impl<C> fmt::Debug for ClusterClient<C>
116where
117 C: ClusterSpec,
118 (C::Command, C::Response): Partitionable<C::Command, C::Response>,
119{
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 f.debug_struct("ClusterClient")
122 .field("inner", &self.inner)
123 .finish_non_exhaustive()
124 }
125}
126
127#[async_trait]
128impl<C> GenericClient<C::Command, C::Response> for ClusterClient<C>
129where
130 C: ClusterSpec,
131 (C::Command, C::Response): Partitionable<C::Command, C::Response>,
132{
133 async fn send(&mut self, cmd: C::Command) -> Result<(), Error> {
134 tracing::debug!("ClusterClient send={:?}", &cmd);
136
137 match cmd.try_into_protocol_nonce() {
138 Ok(nonce) => self.connect(nonce),
139 Err(cmd) => self.inner.as_mut().expect("initialized").send(cmd).await,
140 }
141 }
142
143 async fn recv(&mut self) -> Result<Option<C::Response>, Error> {
149 if let Some(client) = self.inner.as_mut() {
150 client.recv().await
152 } else {
153 future::pending().await
154 }
155 }
156}
157
/// Describes how to build and run a Timely cluster that a [`ClusterClient`] can drive.
///
/// Implementors supply the per-worker logic in [`ClusterSpec::run_worker`]. The provided
/// [`ClusterSpec::build_cluster`] sets up networking and spawns the workers.
#[async_trait]
pub trait ClusterSpec: Clone + Send + Sync + 'static {
    /// Commands sent to the cluster. A command that converts into a protocol nonce makes a
    /// [`ClusterClient`] (re)connect instead of being forwarded to the workers.
    type Command: fmt::Debug + Send + TryIntoProtocolNonce;
    /// Responses sent back from the cluster.
    type Response: fmt::Debug + Send;

    /// Runs the cluster logic on a single Timely worker.
    ///
    /// `client_rx` yields a `(nonce, command receiver, response sender)` triple every time a
    /// [`ClusterClient`] connects.
    ///
    /// NOTE(review): presumably the worker should switch to the most recent triple and abandon
    /// earlier ones; confirm against implementors.
    fn run_worker<A: Allocate + 'static>(
        &self,
        timely_worker: &mut TimelyWorker<A>,
        client_rx: crossbeam_channel::Receiver<(
            Uuid,
            crossbeam_channel::Receiver<Self::Command>,
            mpsc::UnboundedSender<Self::Response>,
        )>,
    );

    /// Builds a Timely cluster from `config` and runs [`ClusterSpec::run_worker`] on each worker.
    ///
    /// Each worker enters `tokio_executor` before calling `run_worker`, so it can use Tokio APIs
    /// that need a runtime context.
    ///
    /// # Errors
    ///
    /// Returns an error if networking initialization fails or the Timely workers cannot be
    /// started.
    async fn build_cluster(
        &self,
        config: TimelyConfig,
        tokio_executor: Handle,
    ) -> Result<TimelyContainer<Self>, Error> {
        info!("Building timely container with config {config:?}");
        // One client channel per worker. The senders go into the returned container. Each
        // receiver sits in an `Option` slot so exactly one worker can take it below.
        let (client_txs, client_rxs): (Vec<_>, Vec<_>) = (0..config.workers)
            .map(|_| crossbeam_channel::unbounded())
            .unzip();
        let client_rxs: Mutex<Vec<_>> = Mutex::new(client_rxs.into_iter().map(Some).collect());

        // Buffers for zero-copy byte slabs come from lgalloc (which itself falls back to the
        // heap on failure), or from plain zeroed heap vectors.
        let refill = if config.enable_zero_copy_lgalloc {
            BytesRefill {
                logic: Arc::new(|size| Box::new(alloc::lgalloc_refill(size))),
                limit: config.zero_copy_limit,
            }
        } else {
            BytesRefill {
                logic: Arc::new(|size| Box::new(vec![0; size])),
                limit: config.zero_copy_limit,
            }
        };

        // `enable_zero_copy` selects zero-copy's `ProcessBuilder` with `ZeroCopyBinary`;
        // otherwise the plain `Process` allocator is used with `ZeroCopy`.
        let (builders, other) = if config.enable_zero_copy {
            use timely::communication::allocator::zero_copy::allocator_process::ProcessBuilder;
            initialize_networking::<ProcessBuilder>(
                config.workers,
                config.process,
                config.addresses.clone(),
                refill,
                GenericBuilder::ZeroCopyBinary,
            )
            .await?
        } else {
            initialize_networking::<timely::communication::allocator::Process>(
                config.workers,
                config.process,
                config.addresses.clone(),
                refill,
                GenericBuilder::ZeroCopy,
            )
            .await?
        };

        let mut worker_config = WorkerConfig::default();

        // A zero proportionality leaves differential's default exertion logic in place.
        if config.arrangement_exert_proportionality > 0 {
            // Effort requested whenever the logic below decides more merging is wanted.
            let merge_effort = Some(1000);

            // Given per-layer `(index, batch count, length)` triples, return `merge_effort` when
            // a spine should keep merging and `None` otherwise.
            // NOTE(review): assumes differential presents layers largest-first; confirm.
            let arc: ExertionLogic = Arc::new(move |layers| {
                // Halved after every layer, so only layers within roughly
                // log2(proportionality) of the first occupied one are checked for content.
                let mut prop = config.arrangement_exert_proportionality;

                // Ignore leading layers that hold no batches.
                let layers = layers
                    .iter()
                    .copied()
                    .skip_while(|(_idx, count, _len)| *count == 0);

                let mut first = true;
                for (_idx, count, len) in layers {
                    if count > 1 {
                        // More than one batch in a layer: keep merging.
                        return merge_effort;
                    }

                    if !first && prop > 0 && len > 0 {
                        // A non-empty layer close enough (per `prop`) to the first occupied one.
                        return merge_effort;
                    }

                    first = false;
                    prop /= 2;
                }

                None
            });
            worker_config.set::<ExertionLogic>("differential/default_exert_logic".to_string(), arc);
        }

        let spec = self.clone();
        // Start the workers; the closure runs once per worker.
        let worker_guards = execute_from(builders, other, worker_config, move |timely_worker| {
            let timely_worker_index = timely_worker.index();
            // Keep the Tokio runtime entered for the whole time this worker runs.
            let _tokio_guard = tokio_executor.enter();
            // Panics if two workers map to the same slot.
            // NOTE(review): assumes `index()` is global (`process * workers + local`), so the
            // modulo yields this process's local slot; confirm.
            let client_rx = client_rxs.lock().unwrap()[timely_worker_index % config.workers]
                .take()
                .unwrap();
            spec.run_worker(timely_worker, client_rx);
        })
        .map_err(|e| anyhow!(e))?;

        Ok(TimelyContainer {
            client_txs,
            worker_guards,
        })
    }
}
287
288mod alloc {
289 pub(crate) fn lgalloc_refill(size: usize) -> LgallocHandle {
295 match lgalloc::allocate::<u8>(size) {
296 Ok((pointer, capacity, handle)) => {
297 let handle = Some(handle);
298 LgallocHandle {
299 handle,
300 pointer,
301 capacity,
302 }
303 }
304 Err(_) => {
305 let mut alloc = vec![0_u8; size];
307 alloc.shrink_to_fit();
309 let pointer = std::ptr::NonNull::new(alloc.as_mut_ptr()).unwrap();
311 std::mem::forget(alloc);
313 LgallocHandle {
314 handle: None,
315 pointer,
316 capacity: size,
317 }
318 }
319 }
320 }
321
322 pub(crate) struct LgallocHandle {
326 handle: Option<lgalloc::Handle>,
328 pointer: std::ptr::NonNull<u8>,
330 capacity: usize,
332 }
333
334 impl std::ops::Deref for LgallocHandle {
335 type Target = [u8];
336 #[inline(always)]
337 fn deref(&self) -> &Self::Target {
338 unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) }
339 }
340 }
341
342 impl std::ops::DerefMut for LgallocHandle {
343 #[inline(always)]
344 fn deref_mut(&mut self) -> &mut Self::Target {
345 unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) }
346 }
347 }
348
349 impl Drop for LgallocHandle {
350 fn drop(&mut self) {
351 if let Some(handle) = self.handle.take() {
353 lgalloc::deallocate(handle);
354 } else {
355 unsafe { Vec::from_raw_parts(self.pointer.as_ptr(), 0, self.capacity) };
356 }
357 self.pointer = std::ptr::NonNull::dangling();
359 self.capacity = 0;
360 }
361 }
362}