1use std::fmt;
13use std::sync::{Arc, Mutex};
14use std::thread::Thread;
15
16use anyhow::{Error, anyhow};
17use async_trait::async_trait;
18use differential_dataflow::trace::ExertionLogic;
19use futures::future;
20use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
21use mz_ore::error::ErrorExt;
22use mz_ore::halt;
23use mz_service::client::{GenericClient, Partitionable, Partitioned};
24use mz_service::local::LocalClient;
25use timely::WorkerConfig;
26use timely::communication::Allocate;
27use timely::communication::allocator::GenericBuilder;
28use timely::communication::allocator::zero_copy::bytes_slab::BytesRefill;
29use timely::communication::initialize::WorkerGuards;
30use timely::execute::execute_from;
31use timely::worker::Worker as TimelyWorker;
32use tokio::runtime::Handle;
33use tokio::sync::mpsc;
34use tracing::{info, warn};
35
36use crate::communication::initialize_networking;
37
/// A [`Partitioned`] client whose parts are [`LocalClient`]s, for commands `C` and responses `R`.
type PartitionedClient<C, R> = Partitioned<LocalClient<C, R>, C, R>;
39
/// A client for the Timely cluster described by a [`ClusterSpec`].
///
/// A freshly created client is not connected to any workers (`inner` is `None`). The `build`
/// method creates or re-uses the shared [`TimelyContainer`] and installs the partitioned client
/// that is subsequently used to exchange commands and responses with the workers.
pub struct ClusterClient<C>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    /// The client used to talk to the workers; `None` until `build` has completed successfully.
    inner: Option<PartitionedClient<C::Command, C::Response>>,
    /// Shared slot holding the Timely container, if one has been built already.
    timely_container: TimelyContainerRef<C>,
    /// Handle to the Tokio runtime; passed on to [`ClusterSpec::build_cluster`].
    tokio_handle: tokio::runtime::Handle,
    /// The specification used to build the Timely cluster.
    cluster_spec: C,
}
54
/// State of the Timely workers created by [`ClusterSpec::build_cluster`].
pub struct TimelyContainer<C: ClusterSpec> {
    /// The configuration the workers were built with.
    config: TimelyConfig,
    /// One sender per configured worker. Each is used to hand that worker the channel endpoints
    /// (command receiver, response sender) of a newly connected client.
    client_txs: Vec<
        crossbeam_channel::Sender<(
            crossbeam_channel::Receiver<C::Command>,
            mpsc::UnboundedSender<C::Response>,
        )>,
    >,
    /// Guards for the worker threads, as returned by `execute_from`.
    worker_guards: WorkerGuards<()>,
}
69
70impl<C: ClusterSpec> TimelyContainer<C> {
71 fn worker_threads(&self) -> Vec<Thread> {
72 self.worker_guards
73 .guards()
74 .iter()
75 .map(|h| h.thread().clone())
76 .collect()
77 }
78}
79
impl<C: ClusterSpec> Drop for TimelyContainer<C> {
    /// Panics unconditionally: dropping a `TimelyContainer` is treated as a bug.
    ///
    /// # Panics
    ///
    /// Always.
    // NOTE(review): presumably this guards against dropping `worker_guards`, which would wait on
    // worker threads that are never shut down -- confirm against the timely `WorkerGuards` docs.
    fn drop(&mut self) {
        panic!("Timely container must never drop");
    }
}
85
/// Shared, asynchronously lockable slot that holds the [`TimelyContainer`] once it has been built.
type TimelyContainerRef<C> = Arc<tokio::sync::Mutex<Option<TimelyContainer<C>>>>;
88
89impl<C> ClusterClient<C>
90where
91 C: ClusterSpec,
92 (C::Command, C::Response): Partitionable<C::Command, C::Response>,
93{
94 pub fn new(
96 timely_container: TimelyContainerRef<C>,
97 tokio_handle: tokio::runtime::Handle,
98 cluster_spec: C,
99 ) -> Self {
100 Self {
101 timely_container,
102 inner: None,
103 tokio_handle,
104 cluster_spec,
105 }
106 }
107
108 async fn build(
109 &mut self,
110 mut config: TimelyConfig,
111 epoch: ClusterStartupEpoch,
112 ) -> Result<(), Error> {
113 let workers = config.workers;
114
115 let mut timely_container = self.timely_container.lock().await;
122 match &*timely_container {
123 Some(existing) => {
124 config.enable_create_sockets_v2 = existing.config.enable_create_sockets_v2;
127
128 if config != existing.config {
129 info!(new = ?config, old = ?existing.config, "TimelyConfig mismatch");
130 halt!("new timely configuration does not match existing timely configuration");
131 }
132 info!("Timely already initialized; re-using.",);
133 }
134 None => {
135 let timely = self
136 .cluster_spec
137 .build_cluster(config, epoch, self.tokio_handle.clone())
138 .await
139 .inspect_err(|e| {
140 warn!("timely initialization failed: {}", e.display_with_causes())
141 })?;
142
143 *timely_container = Some(timely);
144 }
145 };
146
147 let timely = timely_container.as_ref().expect("set above");
148
149 let mut command_txs = Vec::with_capacity(workers);
150 let mut response_rxs = Vec::with_capacity(workers);
151 for client_tx in &timely.client_txs {
152 let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded();
153 let (resp_tx, resp_rx) = mpsc::unbounded_channel();
154
155 client_tx
156 .send((cmd_rx, resp_tx))
157 .expect("worker not dropped");
158
159 command_txs.push(cmd_tx);
160 response_rxs.push(resp_rx);
161 }
162
163 self.inner = Some(LocalClient::new_partitioned(
164 response_rxs,
165 command_txs,
166 timely.worker_threads(),
167 ));
168 Ok(())
169 }
170}
171
172impl<C> fmt::Debug for ClusterClient<C>
173where
174 C: ClusterSpec,
175 (C::Command, C::Response): Partitionable<C::Command, C::Response>,
176{
177 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178 f.debug_struct("ClusterClient")
179 .field("inner", &self.inner)
180 .finish_non_exhaustive()
181 }
182}
183
184#[async_trait]
185impl<C> GenericClient<C::Command, C::Response> for ClusterClient<C>
186where
187 C: ClusterSpec,
188 (C::Command, C::Response): Partitionable<C::Command, C::Response>,
189{
190 async fn send(&mut self, cmd: C::Command) -> Result<(), Error> {
191 tracing::debug!("ClusterClient send={:?}", &cmd);
193 match cmd.try_into_timely_config() {
194 Ok((config, epoch)) => self.build(config, epoch).await,
195 Err(cmd) => self.inner.as_mut().expect("initialized").send(cmd).await,
196 }
197 }
198
199 async fn recv(&mut self) -> Result<Option<C::Response>, Error> {
205 if let Some(client) = self.inner.as_mut() {
206 client.recv().await
208 } else {
209 future::pending().await
210 }
211 }
212}
213
214#[async_trait]
219pub trait ClusterSpec: Clone + Send + Sync + 'static {
220 type Command: fmt::Debug + Send + TryIntoTimelyConfig;
222 type Response: fmt::Debug + Send;
224
225 fn run_worker<A: Allocate + 'static>(
227 &self,
228 timely_worker: &mut TimelyWorker<A>,
229 client_rx: crossbeam_channel::Receiver<(
230 crossbeam_channel::Receiver<Self::Command>,
231 mpsc::UnboundedSender<Self::Response>,
232 )>,
233 );
234
235 async fn build_cluster(
237 &self,
238 config: TimelyConfig,
239 epoch: ClusterStartupEpoch,
240 tokio_executor: Handle,
241 ) -> Result<TimelyContainer<Self>, Error> {
242 info!("Building timely container with config {config:?}");
243 let (client_txs, client_rxs): (Vec<_>, Vec<_>) = (0..config.workers)
244 .map(|_| crossbeam_channel::unbounded())
245 .unzip();
246 let client_rxs: Mutex<Vec<_>> = Mutex::new(client_rxs.into_iter().map(Some).collect());
247
248 let refill = if config.enable_zero_copy_lgalloc {
249 BytesRefill {
250 logic: Arc::new(|size| Box::new(alloc::lgalloc_refill(size))),
251 limit: config.zero_copy_limit,
252 }
253 } else {
254 BytesRefill {
255 logic: Arc::new(|size| Box::new(vec![0; size])),
256 limit: config.zero_copy_limit,
257 }
258 };
259
260 let (builders, other) = if config.enable_zero_copy {
261 use timely::communication::allocator::zero_copy::allocator_process::ProcessBuilder;
262 initialize_networking::<ProcessBuilder>(
263 config.workers,
264 config.process,
265 config.addresses.clone(),
266 epoch,
267 refill,
268 GenericBuilder::ZeroCopyBinary,
269 config.enable_create_sockets_v2,
270 )
271 .await?
272 } else {
273 initialize_networking::<timely::communication::allocator::Process>(
274 config.workers,
275 config.process,
276 config.addresses.clone(),
277 epoch,
278 refill,
279 GenericBuilder::ZeroCopy,
280 config.enable_create_sockets_v2,
281 )
282 .await?
283 };
284
285 let mut worker_config = WorkerConfig::default();
286
287 if config.arrangement_exert_proportionality > 0 {
291 let merge_effort = Some(1000);
292
293 let arc: ExertionLogic = Arc::new(move |layers| {
299 let mut prop = config.arrangement_exert_proportionality;
300
301 let layers = layers
304 .iter()
305 .copied()
306 .skip_while(|(_idx, count, _len)| *count == 0);
307
308 let mut first = true;
309 for (_idx, count, len) in layers {
310 if count > 1 {
311 return merge_effort;
313 }
314
315 if !first && prop > 0 && len > 0 {
316 return merge_effort;
319 }
320
321 first = false;
322 prop /= 2;
323 }
324
325 None
326 });
327 worker_config.set::<ExertionLogic>("differential/default_exert_logic".to_string(), arc);
328 }
329
330 let spec = self.clone();
331 let worker_guards = execute_from(builders, other, worker_config, move |timely_worker| {
332 let timely_worker_index = timely_worker.index();
333 let _tokio_guard = tokio_executor.enter();
334 let client_rx = client_rxs.lock().unwrap()[timely_worker_index % config.workers]
335 .take()
336 .unwrap();
337 spec.run_worker(timely_worker, client_rx);
338 })
339 .map_err(|e| anyhow!(e))?;
340
341 Ok(TimelyContainer {
342 config,
343 client_txs,
344 worker_guards,
345 })
346 }
347}
348
349mod alloc {
350 pub(crate) fn lgalloc_refill(size: usize) -> LgallocHandle {
356 match lgalloc::allocate::<u8>(size) {
357 Ok((pointer, capacity, handle)) => {
358 let handle = Some(handle);
359 LgallocHandle {
360 handle,
361 pointer,
362 capacity,
363 }
364 }
365 Err(_) => {
366 let mut alloc = vec![0_u8; size];
368 alloc.shrink_to_fit();
370 let pointer = std::ptr::NonNull::new(alloc.as_mut_ptr()).unwrap();
372 std::mem::forget(alloc);
374 LgallocHandle {
375 handle: None,
376 pointer,
377 capacity: size,
378 }
379 }
380 }
381 }
382
383 pub(crate) struct LgallocHandle {
387 handle: Option<lgalloc::Handle>,
389 pointer: std::ptr::NonNull<u8>,
391 capacity: usize,
393 }
394
395 impl std::ops::Deref for LgallocHandle {
396 type Target = [u8];
397 #[inline(always)]
398 fn deref(&self) -> &Self::Target {
399 unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) }
400 }
401 }
402
403 impl std::ops::DerefMut for LgallocHandle {
404 #[inline(always)]
405 fn deref_mut(&mut self) -> &mut Self::Target {
406 unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) }
407 }
408 }
409
410 impl Drop for LgallocHandle {
411 fn drop(&mut self) {
412 if let Some(handle) = self.handle.take() {
414 lgalloc::deallocate(handle);
415 } else {
416 unsafe { Vec::from_raw_parts(self.pointer.as_ptr(), 0, self.capacity) };
417 }
418 self.pointer = std::ptr::NonNull::dangling();
420 self.capacity = 0;
421 }
422 }
423}