1use std::fmt;
13use std::sync::{Arc, Mutex};
14use std::thread::Thread;
15
16use anyhow::{Error, anyhow};
17use async_trait::async_trait;
18use differential_dataflow::trace::ExertionLogic;
19use futures::future;
20use mz_cluster_client::client::{TimelyConfig, TryIntoProtocolNonce};
21use mz_service::client::{GenericClient, Partitionable, Partitioned};
22use mz_service::local::LocalClient;
23use timely::WorkerConfig;
24use timely::communication::Allocate;
25use timely::communication::allocator::GenericBuilder;
26use timely::communication::allocator::zero_copy::bytes_slab::BytesRefill;
27use timely::communication::initialize::WorkerGuards;
28use timely::execute::execute_from;
29use timely::worker::Worker as TimelyWorker;
30use tokio::runtime::Handle;
31use tokio::sync::mpsc;
32use tracing::{info, info_span};
33use uuid::Uuid;
34
35use crate::communication::initialize_networking;
36
/// A `LocalClient` partitioned across all local Timely worker threads.
type PartitionedClient<C, R> = Partitioned<LocalClient<C, R>, C, R>;
38
/// A client to a cluster hosted in this process.
pub struct ClusterClient<C>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    /// The partitioned client talking to the Timely workers.
    /// `None` until `connect` has been invoked with a protocol nonce.
    inner: Option<PartitionedClient<C::Command, C::Response>>,
    /// The Timely instance this client attaches to; shared so multiple
    /// connections can be established against the same cluster.
    timely_container: Arc<Mutex<TimelyContainer<C>>>,
}
50
/// A running Timely instance plus the channels used to hand new client
/// connections to its workers.
pub struct TimelyContainer<C: ClusterSpec> {
    /// One sender per worker. A new connection pushes its nonce together
    /// with a command receiver and a response sender, which the worker
    /// then services.
    client_txs: Vec<
        mpsc::UnboundedSender<(
            Uuid,
            mpsc::UnboundedReceiver<C::Command>,
            mpsc::UnboundedSender<C::Response>,
        )>,
    >,
    /// Guards for the spawned Timely worker threads.
    worker_guards: WorkerGuards<()>,
}
64
65impl<C: ClusterSpec> TimelyContainer<C> {
66 fn worker_threads(&self) -> Vec<Thread> {
67 self.worker_guards
68 .guards()
69 .iter()
70 .map(|h| h.thread().clone())
71 .collect()
72 }
73}
74
75impl<C> ClusterClient<C>
76where
77 C: ClusterSpec,
78 (C::Command, C::Response): Partitionable<C::Command, C::Response>,
79{
80 pub fn new(timely_container: Arc<Mutex<TimelyContainer<C>>>) -> Self {
82 Self {
83 timely_container,
84 inner: None,
85 }
86 }
87
88 fn connect(&mut self, nonce: Uuid) -> Result<(), Error> {
90 let timely = self.timely_container.lock().expect("poisoned");
91
92 let mut command_txs = Vec::new();
93 let mut response_rxs = Vec::new();
94 for client_tx in &timely.client_txs {
95 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
96 let (resp_tx, resp_rx) = mpsc::unbounded_channel();
97
98 client_tx
99 .send((nonce, cmd_rx, resp_tx))
100 .expect("worker not dropped");
101
102 command_txs.push(cmd_tx);
103 response_rxs.push(resp_rx);
104 }
105
106 self.inner = Some(LocalClient::new_partitioned(
107 response_rxs,
108 command_txs,
109 timely.worker_threads(),
110 ));
111 Ok(())
112 }
113}
114
115impl<C> fmt::Debug for ClusterClient<C>
116where
117 C: ClusterSpec,
118 (C::Command, C::Response): Partitionable<C::Command, C::Response>,
119{
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 f.debug_struct("ClusterClient")
122 .field("inner", &self.inner)
123 .finish_non_exhaustive()
124 }
125}
126
127#[async_trait]
128impl<C> GenericClient<C::Command, C::Response> for ClusterClient<C>
129where
130 C: ClusterSpec,
131 (C::Command, C::Response): Partitionable<C::Command, C::Response>,
132{
133 async fn send(&mut self, cmd: C::Command) -> Result<(), Error> {
134 tracing::debug!("ClusterClient send={:?}", &cmd);
136
137 match cmd.try_into_protocol_nonce() {
138 Ok(nonce) => self.connect(nonce),
139 Err(cmd) => self.inner.as_mut().expect("initialized").send(cmd).await,
140 }
141 }
142
143 async fn recv(&mut self) -> Result<Option<C::Response>, Error> {
149 if let Some(client) = self.inner.as_mut() {
150 client.recv().await
152 } else {
153 future::pending().await
154 }
155 }
156}
157
/// The specification for a cluster process: the command/response protocol
/// plus the per-worker dataflow logic.
#[async_trait]
pub trait ClusterSpec: Clone + Send + Sync + 'static {
    /// The command type accepted by cluster workers.
    type Command: fmt::Debug + Send + TryIntoProtocolNonce;
    /// The response type produced by cluster workers.
    type Response: fmt::Debug + Send;

    /// A name for this cluster type, used for the per-worker tracing span.
    const NAME: &str;

    /// Runs the given Timely worker. `client_rx` delivers, per connection,
    /// the nonce plus the channel halves over which this worker receives
    /// commands and sends responses.
    fn run_worker<A: Allocate + 'static>(
        &self,
        timely_worker: &mut TimelyWorker<A>,
        client_rx: mpsc::UnboundedReceiver<(
            Uuid,
            mpsc::UnboundedReceiver<Self::Command>,
            mpsc::UnboundedSender<Self::Response>,
        )>,
    );

    /// Builds a Timely cluster: sets up inter-process networking, spawns
    /// one worker thread per `config.workers`, and returns the container
    /// holding the worker guards and client attachment channels.
    async fn build_cluster(
        &self,
        config: TimelyConfig,
        tokio_executor: Handle,
    ) -> Result<TimelyContainer<Self>, Error> {
        info!("Building timely container with config {config:?}");
        // One client-attachment channel per worker. The receivers are
        // wrapped in `Option` inside a `Mutex` so each worker closure can
        // `take()` exactly its own receiver.
        let (client_txs, client_rxs): (Vec<_>, Vec<_>) = (0..config.workers)
            .map(|_| mpsc::unbounded_channel())
            .unzip();
        let client_rxs: Mutex<Vec<_>> = Mutex::new(client_rxs.into_iter().map(Some).collect());

        // Choose how timely's zero-copy byte buffers are refilled: from
        // lgalloc when enabled, otherwise from plain heap allocations.
        let refill = if config.enable_zero_copy_lgalloc {
            BytesRefill {
                logic: Arc::new(|size| Box::new(alloc::lgalloc_refill(size))),
                limit: config.zero_copy_limit,
            }
        } else {
            BytesRefill {
                logic: Arc::new(|size| Box::new(vec![0; size])),
                limit: config.zero_copy_limit,
            }
        };

        // Initialize networking with either the zero-copy allocator or the
        // default process allocator, per configuration.
        let (builders, other) = if config.enable_zero_copy {
            use timely::communication::allocator::zero_copy::allocator_process::ProcessBuilder;
            initialize_networking::<ProcessBuilder>(
                config.workers,
                config.process,
                config.addresses.clone(),
                refill,
                GenericBuilder::ZeroCopyBinary,
            )
            .await?
        } else {
            initialize_networking::<timely::communication::allocator::Process>(
                config.workers,
                config.process,
                config.addresses.clone(),
                refill,
                GenericBuilder::ZeroCopy,
            )
            .await?
        };

        let mut worker_config = WorkerConfig::default();

        if config.arrangement_exert_proportionality > 0 {
            let merge_effort = Some(1000);

            // Custom exertion logic for differential arrangements: decide
            // whether a trace's batch layers warrant spending merge effort.
            let arc: ExertionLogic = Arc::new(move |layers| {
                let mut prop = config.arrangement_exert_proportionality;

                // Skip leading empty layers; `layers` entries appear to be
                // (index, batch count, length) tuples — see differential's
                // ExertionLogic docs.
                let layers = layers
                    .iter()
                    .copied()
                    .skip_while(|(_idx, count, _len)| *count == 0);

                let mut first = true;
                for (_idx, count, len) in layers {
                    // More than one batch in a layer: merging is due.
                    if count > 1 {
                        return merge_effort;
                    }

                    // Past the first non-empty layer, exert while the
                    // (halving) proportionality budget remains.
                    if !first && prop > 0 && len > 0 {
                        return merge_effort;
                    }

                    first = false;
                    prop /= 2;
                }

                None
            });
            worker_config.set::<ExertionLogic>("differential/default_exert_logic".to_string(), arc);
        }

        let spec = self.clone();
        let worker_guards = execute_from(builders, other, worker_config, move |timely_worker| {
            let worker_idx = timely_worker.index();

            let span = info_span!("timely", name = Self::NAME, worker_id = worker_idx);
            let _span_guard = span.enter();

            // Make the tokio runtime available to worker code.
            let _tokio_guard = tokio_executor.enter();
            // `index()` is the global worker index; modulo maps it to this
            // process's local receiver slot. Each slot is taken exactly once.
            let client_rx = client_rxs.lock().unwrap()[worker_idx % config.workers]
                .take()
                .unwrap();
            spec.run_worker(timely_worker, client_rx);
        })
        .map_err(|e| anyhow!(e))?;

        Ok(TimelyContainer {
            client_txs,
            worker_guards,
        })
    }
}
295
mod alloc {
    /// Allocates a `size`-byte region backed by lgalloc, falling back to a
    /// regular heap allocation when lgalloc cannot serve the request.
    pub(crate) fn lgalloc_refill(size: usize) -> LgallocHandle {
        match lgalloc::allocate::<u8>(size) {
            Ok((pointer, capacity, handle)) => {
                let handle = Some(handle);
                LgallocHandle {
                    handle,
                    pointer,
                    capacity,
                }
            }
            Err(_) => {
                // Fallback: allocate on the heap and take over the Vec's
                // buffer manually; Drop reconstructs and frees it.
                let mut alloc = vec![0_u8; size];
                alloc.shrink_to_fit();
                // A Vec's data pointer is never null (it is dangling but
                // non-null even for zero-sized allocations).
                let pointer = std::ptr::NonNull::new(alloc.as_mut_ptr()).unwrap();
                // Leak the Vec; ownership of the buffer moves into the
                // returned handle.
                std::mem::forget(alloc);
                LgallocHandle {
                    handle: None,
                    pointer,
                    capacity: size,
                }
            }
        }
    }

    /// An owned byte buffer: either an lgalloc region (`handle` is `Some`)
    /// or a leaked heap allocation (`handle` is `None`).
    pub(crate) struct LgallocHandle {
        // `Some` for lgalloc-backed memory; `None` for the heap fallback.
        handle: Option<lgalloc::Handle>,
        // Start of the buffer; dangling after `drop`.
        pointer: std::ptr::NonNull<u8>,
        // Length (== capacity) of the buffer in bytes.
        capacity: usize,
    }

    impl std::ops::Deref for LgallocHandle {
        type Target = [u8];
        #[inline(always)]
        fn deref(&self) -> &Self::Target {
            // SAFETY: `pointer` and `capacity` describe a live allocation
            // owned by `self` (established in `lgalloc_refill`).
            unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) }
        }
    }

    impl std::ops::DerefMut for LgallocHandle {
        #[inline(always)]
        fn deref_mut(&mut self) -> &mut Self::Target {
            // SAFETY: as in `deref`, plus `&mut self` guarantees exclusive
            // access to the buffer.
            unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) }
        }
    }

    impl Drop for LgallocHandle {
        fn drop(&mut self) {
            if let Some(handle) = self.handle.take() {
                // lgalloc-backed: return the region to lgalloc.
                lgalloc::deallocate(handle);
            } else {
                // Heap fallback: rebuild the Vec (length 0 — contents need
                // no dropping for u8) so its buffer is freed.
                // SAFETY: `pointer`/`capacity` came from a forgotten
                // `Vec<u8>` with exactly this capacity (see `lgalloc_refill`).
                unsafe { Vec::from_raw_parts(self.pointer.as_ptr(), 0, self.capacity) };
            }
            // Defensively neutralize the fields to prevent any further use.
            self.pointer = std::ptr::NonNull::dangling();
            self.capacity = 0;
        }
    }
}