1use std::fmt;
13use std::sync::{Arc, Mutex};
14use std::thread::Thread;
15
16use anyhow::{Error, anyhow};
17use async_trait::async_trait;
18use differential_dataflow::trace::ExertionLogic;
19use futures::future;
20use mz_cluster_client::client::{TimelyConfig, TryIntoProtocolNonce};
21use mz_service::client::{GenericClient, Partitionable, Partitioned};
22use mz_service::local::LocalClient;
23use timely::WorkerConfig;
24use timely::communication::allocator::zero_copy::bytes_slab::BytesRefill;
25use timely::communication::initialize::WorkerGuards;
26use timely::execute::execute_from;
27use timely::worker::Worker as TimelyWorker;
28use tokio::runtime::Handle;
29use tokio::sync::mpsc;
30use tracing::{info, info_span};
31use uuid::Uuid;
32
33use crate::communication::initialize_networking;
34
/// A client whose traffic is partitioned across local clients, one per Timely worker.
type PartitionedClient<C, R> = Partitioned<LocalClient<C, R>, C, R>;
36
/// A client to a cluster hosted in this process.
///
/// The client starts out disconnected (`inner` is `None`). It connects to the
/// cluster's Timely workers when it receives a command carrying a protocol
/// nonce (see `GenericClient::send` below).
pub struct ClusterClient<C>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    /// The partitioned client to the Timely workers, once connected.
    inner: Option<PartitionedClient<C::Command, C::Response>>,
    /// The Timely instance hosting the cluster workers.
    timely_container: Arc<Mutex<TimelyContainer<C>>>,
}
48
/// A running Timely instance, together with the channels used to wire up new
/// client connections to its workers.
pub struct TimelyContainer<C: ClusterSpec> {
    /// Per-worker senders for announcing new client connections.
    ///
    /// Each message carries the connection's nonce, the receiver over which
    /// the worker gets commands, and the sender over which it emits responses.
    client_txs: Vec<
        mpsc::UnboundedSender<(
            Uuid,
            mpsc::UnboundedReceiver<C::Command>,
            mpsc::UnboundedSender<C::Response>,
        )>,
    >,
    /// Guards for the spawned Timely worker threads.
    worker_guards: WorkerGuards<()>,
}
62
63impl<C: ClusterSpec> TimelyContainer<C> {
64 fn worker_threads(&self) -> Vec<Thread> {
65 self.worker_guards
66 .guards()
67 .iter()
68 .map(|h| h.thread().clone())
69 .collect()
70 }
71}
72
73impl<C> ClusterClient<C>
74where
75 C: ClusterSpec,
76 (C::Command, C::Response): Partitionable<C::Command, C::Response>,
77{
78 pub fn new(timely_container: Arc<Mutex<TimelyContainer<C>>>) -> Self {
80 Self {
81 timely_container,
82 inner: None,
83 }
84 }
85
86 fn connect(&mut self, nonce: Uuid) -> Result<(), Error> {
88 let timely = self.timely_container.lock().expect("poisoned");
89
90 let mut command_txs = Vec::new();
91 let mut response_rxs = Vec::new();
92 for client_tx in &timely.client_txs {
93 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
94 let (resp_tx, resp_rx) = mpsc::unbounded_channel();
95
96 client_tx
97 .send((nonce, cmd_rx, resp_tx))
98 .expect("worker not dropped");
99
100 command_txs.push(cmd_tx);
101 response_rxs.push(resp_rx);
102 }
103
104 self.inner = Some(LocalClient::new_partitioned(
105 response_rxs,
106 command_txs,
107 timely.worker_threads(),
108 ));
109 Ok(())
110 }
111}
112
113impl<C> fmt::Debug for ClusterClient<C>
114where
115 C: ClusterSpec,
116 (C::Command, C::Response): Partitionable<C::Command, C::Response>,
117{
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 f.debug_struct("ClusterClient")
120 .field("inner", &self.inner)
121 .finish_non_exhaustive()
122 }
123}
124
125#[async_trait]
126impl<C> GenericClient<C::Command, C::Response> for ClusterClient<C>
127where
128 C: ClusterSpec,
129 (C::Command, C::Response): Partitionable<C::Command, C::Response>,
130{
131 async fn send(&mut self, cmd: C::Command) -> Result<(), Error> {
132 tracing::debug!("ClusterClient send={:?}", &cmd);
134
135 match cmd.try_into_protocol_nonce() {
136 Ok(nonce) => self.connect(nonce),
137 Err(cmd) => self.inner.as_mut().expect("initialized").send(cmd).await,
138 }
139 }
140
141 async fn recv(&mut self) -> Result<Option<C::Response>, Error> {
147 if let Some(client) = self.inner.as_mut() {
148 client.recv().await
150 } else {
151 future::pending().await
152 }
153 }
154}
155
/// A specification for a Timely cluster to which a [`ClusterClient`] can
/// connect.
#[async_trait]
pub trait ClusterSpec: Clone + Send + Sync + 'static {
    /// The cluster command type.
    type Command: fmt::Debug + Send + TryIntoProtocolNonce;
    /// The cluster response type.
    type Response: fmt::Debug + Send;

    /// The cluster name, used for the per-worker tracing span.
    const NAME: &str;

    /// Run the given Timely worker.
    ///
    /// `client_rx` delivers, for each new client connection, the connection's
    /// nonce together with the channel ends the worker should serve.
    fn run_worker(
        &self,
        timely_worker: &mut TimelyWorker,
        client_rx: mpsc::UnboundedReceiver<(
            Uuid,
            mpsc::UnboundedReceiver<Self::Command>,
            mpsc::UnboundedSender<Self::Response>,
        )>,
    );

    /// Build a Timely cluster using the given config.
    ///
    /// Spawns `config.workers` worker threads (wired up to the other cluster
    /// processes via `initialize_networking`) and runs `run_worker` on each.
    async fn build_cluster(
        &self,
        config: TimelyConfig,
        tokio_executor: Handle,
    ) -> Result<TimelyContainer<Self>, Error> {
        info!("Building timely container with config {config:?}");
        // One client-connection channel per worker. The receivers are claimed
        // by the worker closures below, hence the `Option` + `take`.
        let (client_txs, client_rxs): (Vec<_>, Vec<_>) = (0..config.workers)
            .map(|_| mpsc::unbounded_channel())
            .unzip();
        let client_rxs: Mutex<Vec<_>> = Mutex::new(client_rxs.into_iter().map(Some).collect());

        // How the zero-copy networking stack refills its byte buffers: from
        // lgalloc when enabled, otherwise from the heap.
        let refill = if config.enable_zero_copy_lgalloc {
            BytesRefill {
                logic: Arc::new(|size| Box::new(alloc::lgalloc_refill(size))),
                limit: config.zero_copy_limit,
            }
        } else {
            BytesRefill {
                logic: Arc::new(|size| Box::new(vec![0; size])),
                limit: config.zero_copy_limit,
            }
        };

        let (builders, other) = initialize_networking(
            config.workers,
            config.process,
            config.addresses.clone(),
            refill,
            config.enable_zero_copy,
        )
        .await?;

        let mut worker_config = WorkerConfig::default();

        // A proportionality of 0 disables proactive merge effort, so only
        // install custom exertion logic for positive values.
        if config.arrangement_exert_proportionality > 0 {
            let merge_effort = Some(1000);

            // The logic receives the spine's layers as (index, batch count,
            // batch length) triples — presumably ordered from largest layer to
            // smallest (TODO confirm against differential-dataflow) — and
            // returns the maximum effort to expend, or `None` when the spine
            // is sufficiently tidied.
            let arc: ExertionLogic = Arc::new(move |layers| {
                let mut prop = config.arrangement_exert_proportionality;

                // Skip to the first occupied layer.
                let layers = layers
                    .iter()
                    .copied()
                    .skip_while(|(_idx, count, _len)| *count == 0);

                let mut first = true;
                for (_idx, count, len) in layers {
                    if count > 1 {
                        // An in-progress merge exists; continue exerting.
                        return merge_effort;
                    }

                    if !first && prop > 0 && len > 0 {
                        // A non-empty batch exists while `prop` (halved per
                        // layer) is still positive; keep merging.
                        return merge_effort;
                    }

                    first = false;
                    prop /= 2;
                }

                None
            });
            worker_config.set::<ExertionLogic>("differential/default_exert_logic".to_string(), arc);
        }

        let spec = self.clone();
        let worker_guards = execute_from(builders, other, worker_config, move |timely_worker| {
            let worker_idx = timely_worker.index();

            let span = info_span!("timely", name = Self::NAME, worker_id = worker_idx);
            let _span_guard = span.enter();

            // Make the Tokio runtime available to worker code.
            let _tokio_guard = tokio_executor.enter();
            // Claim this worker's connection receiver. `client_rxs` has one
            // slot per local worker; `% config.workers` maps the (global)
            // worker index to its local slot, and each slot is taken once.
            let client_rx = client_rxs.lock().unwrap()[worker_idx % config.workers]
                .take()
                .unwrap();
            spec.run_worker(timely_worker, client_rx);
        })
        .map_err(|e| anyhow!(e))?;

        Ok(TimelyContainer {
            client_txs,
            worker_guards,
        })
    }
}
281
mod alloc {
    /// Allocate a buffer of (at least) `size` bytes from lgalloc, falling
    /// back to the global allocator when the lgalloc allocation fails.
    pub(crate) fn lgalloc_refill(size: usize) -> LgallocHandle {
        match lgalloc::allocate::<u8>(size) {
            Ok((pointer, capacity, handle)) => {
                let handle = Some(handle);
                LgallocHandle {
                    handle,
                    pointer,
                    capacity,
                }
            }
            Err(_) => {
                // Heap fallback: allocate a Vec and take over its buffer.
                let mut alloc = vec![0_u8; size];
                // Ensure `capacity == len == size`, so reconstructing the Vec
                // with capacity `size` in `Drop` is sound.
                alloc.shrink_to_fit();
                let pointer = std::ptr::NonNull::new(alloc.as_mut_ptr()).unwrap();
                // Leak the Vec; ownership of the buffer transfers to the
                // returned handle, which frees it in `Drop`.
                std::mem::forget(alloc);
                LgallocHandle {
                    handle: None,
                    pointer,
                    capacity: size,
                }
            }
        }
    }

    /// A byte buffer backed either by lgalloc or by the global allocator.
    pub(crate) struct LgallocHandle {
        // Lgalloc handle used for deallocation; `None` for heap-backed buffers.
        handle: Option<lgalloc::Handle>,
        // Pointer to the start of the buffer.
        pointer: std::ptr::NonNull<u8>,
        // Usable size of the buffer, in bytes.
        capacity: usize,
    }

    impl std::ops::Deref for LgallocHandle {
        type Target = [u8];
        #[inline(always)]
        fn deref(&self) -> &Self::Target {
            // SAFETY: `pointer` is valid for `capacity` initialized bytes for
            // the lifetime of `self` (a live lgalloc region or a leaked Vec).
            unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) }
        }
    }

    impl std::ops::DerefMut for LgallocHandle {
        #[inline(always)]
        fn deref_mut(&mut self) -> &mut Self::Target {
            // SAFETY: as in `deref`; `&mut self` guarantees exclusive access.
            unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) }
        }
    }

    impl Drop for LgallocHandle {
        fn drop(&mut self) {
            if let Some(handle) = self.handle.take() {
                // Lgalloc-backed: return the region to lgalloc.
                lgalloc::deallocate(handle);
            } else {
                // Heap-backed: rebuild the leaked Vec (length 0 — `u8` has no
                // drop glue) so its buffer is freed when the Vec drops here.
                // SAFETY: `pointer`/`capacity` came from a Vec shrunk to
                // exactly `capacity` and then forgotten in `lgalloc_refill`.
                unsafe { Vec::from_raw_parts(self.pointer.as_ptr(), 0, self.capacity) };
            }
            // Defensively neuter the handle against accidental reuse.
            self.pointer = std::ptr::NonNull::dangling();
            self.capacity = 0;
        }
    }
}