mz_cluster/client.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An interactive cluster server.

use std::fmt;
use std::sync::{Arc, Mutex};
use std::thread::Thread;

use anyhow::{Error, anyhow};
use async_trait::async_trait;
use differential_dataflow::trace::ExertionLogic;
use futures::future;
use mz_cluster_client::client::{TimelyConfig, TryIntoProtocolNonce};
use mz_service::client::{GenericClient, Partitionable, Partitioned};
use mz_service::local::LocalClient;
use timely::WorkerConfig;
use timely::communication::Allocate;
use timely::communication::allocator::GenericBuilder;
use timely::communication::allocator::zero_copy::bytes_slab::BytesRefill;
use timely::communication::initialize::WorkerGuards;
use timely::execute::execute_from;
use timely::worker::Worker as TimelyWorker;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tracing::info;
use uuid::Uuid;

use crate::communication::initialize_networking;

/// A [`LocalClient`] partitioned across the local Timely worker threads.
type PartitionedClient<C, R> = Partitioned<LocalClient<C, R>, C, R>;

/// A client managing access to the local portion of a Timely cluster.
pub struct ClusterClient<C>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    /// The actual client used to talk to the cluster.
    inner: Option<PartitionedClient<C::Command, C::Response>>,
    /// The running Timely instance.
    timely_container: Arc<Mutex<TimelyContainer<C>>>,
}

/// Metadata about Timely workers in this process.
pub struct TimelyContainer<C: ClusterSpec> {
    /// Channels over which to send endpoints for wiring up a new client.
    client_txs: Vec<
        crossbeam_channel::Sender<(
            Uuid,
            crossbeam_channel::Receiver<C::Command>,
            mpsc::UnboundedSender<C::Response>,
        )>,
    >,
    /// Thread guards that keep the worker threads alive.
    worker_guards: WorkerGuards<()>,
}

impl<C: ClusterSpec> TimelyContainer<C> {
    fn worker_threads(&self) -> Vec<Thread> {
        self.worker_guards
            .guards()
            .iter()
            .map(|h| h.thread().clone())
            .collect()
    }
}

impl<C> ClusterClient<C>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    /// Create a new `ClusterClient`.
    pub fn new(timely_container: Arc<Mutex<TimelyContainer<C>>>) -> Self {
        Self {
            timely_container,
            inner: None,
        }
    }

    /// Connect to the Timely cluster with the given client nonce.
    fn connect(&mut self, nonce: Uuid) -> Result<(), Error> {
        let timely = self.timely_container.lock().expect("poisoned");

        let mut command_txs = Vec::new();
        let mut response_rxs = Vec::new();
        for client_tx in &timely.client_txs {
            let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded();
            let (resp_tx, resp_rx) = mpsc::unbounded_channel();

            client_tx
                .send((nonce, cmd_rx, resp_tx))
                .expect("worker not dropped");

            command_txs.push(cmd_tx);
            response_rxs.push(resp_rx);
        }

        self.inner = Some(LocalClient::new_partitioned(
            response_rxs,
            command_txs,
            timely.worker_threads(),
        ));
        Ok(())
    }
}

impl<C> fmt::Debug for ClusterClient<C>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ClusterClient")
            .field("inner", &self.inner)
            .finish_non_exhaustive()
    }
}

#[async_trait]
impl<C> GenericClient<C::Command, C::Response> for ClusterClient<C>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    async fn send(&mut self, cmd: C::Command) -> Result<(), Error> {
        // Changing this debug statement requires changing the replica-isolation test.
        tracing::debug!("ClusterClient send={:?}", &cmd);

        // A command carrying a protocol nonce initiates a new connection to the
        // Timely cluster; all other commands are forwarded to the current one.
        match cmd.try_into_protocol_nonce() {
            Ok(nonce) => self.connect(nonce),
            Err(cmd) => self.inner.as_mut().expect("initialized").send(cmd).await,
        }
    }

    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
    async fn recv(&mut self) -> Result<Option<C::Response>, Error> {
        if let Some(client) = self.inner.as_mut() {
            // `Partitioned::recv` is documented as cancel safe.
            client.recv().await
        } else {
            future::pending().await
        }
    }
}
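
// A minimal sketch (not part of this module's API) of how a caller might drive
// `ClusterClient::recv` alongside another event source. Because `recv` is cancel
// safe, losing the race in `tokio::select!` drops no responses. The `shutdown`
// channel here is a hypothetical detail of the caller, not of this crate.
#[allow(dead_code)]
async fn recv_loop_sketch<C>(
    client: &mut ClusterClient<C>,
    mut shutdown: tokio::sync::oneshot::Receiver<()>,
) -> Result<(), Error>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    loop {
        tokio::select! {
            // Cancel safe: if the `shutdown` branch wins, no response is lost.
            response = client.recv() => match response? {
                Some(response) => info!("received response: {response:?}"),
                None => return Ok(()), // response stream ended
            },
            _ = &mut shutdown => return Ok(()),
        }
    }
}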

/// Specification for a Timely cluster to which a [`ClusterClient`] connects.
///
/// This trait is used to make the [`ClusterClient`] generic over the compute and storage cluster
/// implementations.
#[async_trait]
pub trait ClusterSpec: Clone + Send + Sync + 'static {
    /// The cluster command type.
    type Command: fmt::Debug + Send + TryIntoProtocolNonce;
    /// The cluster response type.
    type Response: fmt::Debug + Send;

    /// Run the given Timely worker.
    fn run_worker<A: Allocate + 'static>(
        &self,
        timely_worker: &mut TimelyWorker<A>,
        client_rx: crossbeam_channel::Receiver<(
            Uuid,
            crossbeam_channel::Receiver<Self::Command>,
            mpsc::UnboundedSender<Self::Response>,
        )>,
    );

    /// Build a Timely cluster using the given config.
    async fn build_cluster(
        &self,
        config: TimelyConfig,
        tokio_executor: Handle,
    ) -> Result<TimelyContainer<Self>, Error> {
        info!("Building timely container with config {config:?}");
        let (client_txs, client_rxs): (Vec<_>, Vec<_>) = (0..config.workers)
            .map(|_| crossbeam_channel::unbounded())
            .unzip();
        let client_rxs: Mutex<Vec<_>> = Mutex::new(client_rxs.into_iter().map(Some).collect());

        let refill = if config.enable_zero_copy_lgalloc {
            BytesRefill {
                logic: Arc::new(|size| Box::new(alloc::lgalloc_refill(size))),
                limit: config.zero_copy_limit,
            }
        } else {
            BytesRefill {
                logic: Arc::new(|size| Box::new(vec![0; size])),
                limit: config.zero_copy_limit,
            }
        };

        let (builders, other) = if config.enable_zero_copy {
            use timely::communication::allocator::zero_copy::allocator_process::ProcessBuilder;
            initialize_networking::<ProcessBuilder>(
                config.workers,
                config.process,
                config.addresses.clone(),
                refill,
                GenericBuilder::ZeroCopyBinary,
            )
            .await?
        } else {
            initialize_networking::<timely::communication::allocator::Process>(
                config.workers,
                config.process,
                config.addresses.clone(),
                refill,
                GenericBuilder::ZeroCopy,
            )
            .await?
        };

        let mut worker_config = WorkerConfig::default();

        // We set a custom exertion logic for proportionality > 0. A proportionality value of 0
        // means that no arrangement merge effort is exerted and merging occurs only in response
        // to updates.
        if config.arrangement_exert_proportionality > 0 {
            let merge_effort = Some(1000);

            // `ExertionLogic` defines a function that determines whether a spine is sufficiently
            // tidied. Its argument is a slice of `(index, batch count, batch length)` triples,
            // one per layer, ordered from the largest to the smallest layer.
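            //
            // For example, with `arrangement_exert_proportionality = 8` the proportionality
            // counter below halves once per layer (8, 4, 2, 1, 0, ...), so a non-empty batch
            // up to three layers below the largest occupied layer still triggers
            // `merge_effort`, while smaller batches are left to merge only in response to
            // updates.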

            let arc: ExertionLogic = Arc::new(move |layers| {
                let mut prop = config.arrangement_exert_proportionality;

                // Layers are ordered from largest to smallest.
                // Skip to the largest occupied layer.
                let layers = layers
                    .iter()
                    .copied()
                    .skip_while(|(_idx, count, _len)| *count == 0);

                let mut first = true;
                for (_idx, count, len) in layers {
                    if count > 1 {
                        // Found an in-progress merge that we should continue.
                        return merge_effort;
                    }

                    if !first && prop > 0 && len > 0 {
                        // Found a non-empty batch within `arrangement_exert_proportionality` of
                        // the largest one.
                        return merge_effort;
                    }

                    first = false;
                    prop /= 2;
                }

                None
            });
            worker_config.set::<ExertionLogic>("differential/default_exert_logic".to_string(), arc);
        }

        let spec = self.clone();
        let worker_guards = execute_from(builders, other, worker_config, move |timely_worker| {
            let timely_worker_index = timely_worker.index();
            let _tokio_guard = tokio_executor.enter();
            let client_rx = client_rxs.lock().unwrap()[timely_worker_index % config.workers]
                .take()
                .unwrap();
            spec.run_worker(timely_worker, client_rx);
        })
        .map_err(|e| anyhow!(e))?;

        Ok(TimelyContainer {
            client_txs,
            worker_guards,
        })
    }
}
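
// A minimal sketch of a `ClusterSpec` implementation, shown here for illustration
// only. `EchoSpec` and `EchoCommand` are hypothetical; real implementations (the
// compute and storage clusters) run dataflows between commands. This assumes
// `TryIntoProtocolNonce` has the `self -> Result<Uuid, Self>` shape relied on by
// `ClusterClient::send` above.
#[allow(dead_code)]
#[derive(Clone)]
struct EchoSpec;

#[allow(dead_code)]
#[derive(Debug)]
enum EchoCommand {
    /// Carries the protocol nonce that `ClusterClient::send` uses to connect.
    Hello(Uuid),
    /// A payload to echo back to the client.
    Echo(String),
}

impl TryIntoProtocolNonce for EchoCommand {
    fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
        match self {
            EchoCommand::Hello(nonce) => Ok(nonce),
            other => Err(other),
        }
    }
}

impl ClusterSpec for EchoSpec {
    type Command = EchoCommand;
    type Response = String;

    fn run_worker<A: Allocate + 'static>(
        &self,
        _timely_worker: &mut TimelyWorker<A>,
        client_rx: crossbeam_channel::Receiver<(
            Uuid,
            crossbeam_channel::Receiver<EchoCommand>,
            mpsc::UnboundedSender<String>,
        )>,
    ) {
        // Serve one client connection after another: each `connect` call on the
        // client side delivers a fresh command receiver and response sender.
        while let Ok((_nonce, cmd_rx, resp_tx)) = client_rx.recv() {
            while let Ok(cmd) = cmd_rx.recv() {
                if let EchoCommand::Echo(text) = cmd {
                    let _ = resp_tx.send(text);
                }
            }
        }
    }
}

// Hypothetical wiring: given a `TimelyConfig`, the default `build_cluster`
// implementation above assembles the worker threads and client channels.
#[allow(dead_code)]
async fn echo_cluster_sketch(config: TimelyConfig) -> Result<TimelyContainer<EchoSpec>, Error> {
    EchoSpec.build_cluster(config, Handle::current()).await
}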

mod alloc {
    /// A Timely communication refill function that uses lgalloc.
    ///
    /// Returns a handle to lgalloc'ed memory if lgalloc can handle the request; otherwise
    /// falls back to a heap allocation. In either case, the handle must be dropped to free
    /// the memory.
    pub(crate) fn lgalloc_refill(size: usize) -> LgallocHandle {
        match lgalloc::allocate::<u8>(size) {
            Ok((pointer, capacity, handle)) => {
                let handle = Some(handle);
                LgallocHandle {
                    handle,
                    pointer,
                    capacity,
                }
            }
            Err(_) => {
                // Fall back to a zero-initialized heap allocation.
                let mut alloc = vec![0_u8; size];
                // Ensure that the length matches the capacity.
                alloc.shrink_to_fit();
                // Get a pointer to the allocated memory.
                let pointer = std::ptr::NonNull::new(alloc.as_mut_ptr()).unwrap();
                // Forget the vector to avoid dropping it. We'll free the memory in `drop`.
                std::mem::forget(alloc);
                LgallocHandle {
                    handle: None,
                    pointer,
                    capacity: size,
                }
            }
        }
    }

    /// A handle to allocated memory: either memory allocated by lgalloc or a regular heap
    /// allocation backed by a `Vec`. If `handle` is `Some`, the memory came from lgalloc;
    /// if it is `None`, it is a heap allocation.
    pub(crate) struct LgallocHandle {
        /// Lgalloc handle, set if the memory was allocated by lgalloc.
        handle: Option<lgalloc::Handle>,
        /// Pointer to the allocated memory. Always well-aligned, but can be dangling.
        pointer: std::ptr::NonNull<u8>,
        /// Capacity of the allocated memory in bytes.
        capacity: usize,
    }

    impl std::ops::Deref for LgallocHandle {
        type Target = [u8];
        #[inline(always)]
        fn deref(&self) -> &Self::Target {
            unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) }
        }
    }

    impl std::ops::DerefMut for LgallocHandle {
        #[inline(always)]
        fn deref_mut(&mut self) -> &mut Self::Target {
            unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) }
        }
    }

    impl Drop for LgallocHandle {
        fn drop(&mut self) {
            // If we have a handle, it's lgalloc memory. Otherwise, it's a heap allocation.
            if let Some(handle) = self.handle.take() {
                lgalloc::deallocate(handle);
            } else {
                unsafe { Vec::from_raw_parts(self.pointer.as_ptr(), 0, self.capacity) };
            }
            // Update pointer and capacity such that we don't double-free.
            self.pointer = std::ptr::NonNull::dangling();
            self.capacity = 0;
        }
    }
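
    // A minimal sketch of the handle contract, for illustration: the returned
    // allocation derefs to a byte slice of at least `size` bytes (lgalloc may
    // round the capacity up) and frees itself on drop, regardless of which
    // allocator backed it.
    #[allow(dead_code)]
    fn refill_sketch() {
        let mut buf = lgalloc_refill(1024);
        assert!(buf.len() >= 1024); // lgalloc may return extra capacity
        buf[0] = 42; // writable through `DerefMut`
        drop(buf); // returns the memory to lgalloc or the heap
    }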
}