// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An interactive cluster server.

use std::fmt;
use std::sync::{Arc, Mutex};
use std::thread::Thread;

use anyhow::{Error, anyhow};
use async_trait::async_trait;
use differential_dataflow::trace::ExertionLogic;
use futures::future;
use mz_cluster_client::client::{TimelyConfig, TryIntoProtocolNonce};
use mz_service::client::{GenericClient, Partitionable, Partitioned};
use mz_service::local::LocalClient;
use timely::WorkerConfig;
use timely::communication::allocator::zero_copy::bytes_slab::BytesRefill;
use timely::communication::initialize::WorkerGuards;
use timely::execute::execute_from;
use timely::worker::Worker as TimelyWorker;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tracing::{info, info_span};
use uuid::Uuid;

use crate::communication::initialize_networking;

/// A client to a set of local Timely workers: one [`LocalClient`] partition per worker.
type PartitionedClient<C, R> = Partitioned<LocalClient<C, R>, C, R>;

37/// A client managing access to the local portion of a Timely cluster
38pub struct ClusterClient<C>
39where
40    C: ClusterSpec,
41    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
42{
43    /// The actual client to talk to the cluster
44    inner: Option<PartitionedClient<C::Command, C::Response>>,
45    /// The running timely instance
46    timely_container: Arc<Mutex<TimelyContainer<C>>>,
47}
48
49/// Metadata about timely workers in this process.
50pub struct TimelyContainer<C: ClusterSpec> {
51    /// Channels over which to send endpoints for wiring up a new Client
52    client_txs: Vec<
53        mpsc::UnboundedSender<(
54            Uuid,
55            mpsc::UnboundedReceiver<C::Command>,
56            mpsc::UnboundedSender<C::Response>,
57        )>,
58    >,
59    /// Thread guards that keep worker threads alive
60    worker_guards: WorkerGuards<()>,
61}
62
63impl<C: ClusterSpec> TimelyContainer<C> {
64    fn worker_threads(&self) -> Vec<Thread> {
65        self.worker_guards
66            .guards()
67            .iter()
68            .map(|h| h.thread().clone())
69            .collect()
70    }
71}
72
73impl<C> ClusterClient<C>
74where
75    C: ClusterSpec,
76    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
77{
78    /// Create a new `ClusterClient`.
79    pub fn new(timely_container: Arc<Mutex<TimelyContainer<C>>>) -> Self {
80        Self {
81            timely_container,
82            inner: None,
83        }
84    }
85
86    /// Connect to the Timely cluster with the given client nonce.
87    fn connect(&mut self, nonce: Uuid) -> Result<(), Error> {
88        let timely = self.timely_container.lock().expect("poisoned");
89
90        let mut command_txs = Vec::new();
91        let mut response_rxs = Vec::new();
92        for client_tx in &timely.client_txs {
93            let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
94            let (resp_tx, resp_rx) = mpsc::unbounded_channel();
95
96            client_tx
97                .send((nonce, cmd_rx, resp_tx))
98                .expect("worker not dropped");
99
100            command_txs.push(cmd_tx);
101            response_rxs.push(resp_rx);
102        }
103
104        self.inner = Some(LocalClient::new_partitioned(
105            response_rxs,
106            command_txs,
107            timely.worker_threads(),
108        ));
109        Ok(())
110    }
111}
112
113impl<C> fmt::Debug for ClusterClient<C>
114where
115    C: ClusterSpec,
116    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
117{
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        f.debug_struct("ClusterClient")
120            .field("inner", &self.inner)
121            .finish_non_exhaustive()
122    }
123}
124
#[async_trait]
impl<C> GenericClient<C::Command, C::Response> for ClusterClient<C>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    async fn send(&mut self, cmd: C::Command) -> Result<(), Error> {
        // Changing this debug statement requires changing the replica-isolation test
        tracing::debug!("ClusterClient send={:?}", &cmd);

        // A command carrying a protocol nonce (re)connects to the Timely workers;
        // every other command is forwarded to the partitioned client.
        match cmd.try_into_protocol_nonce() {
            Ok(nonce) => self.connect(nonce),
            // NOTE(review): the `expect` message implies the protocol delivers a nonce
            // command before any other command, so `inner` is set by then — confirm
            // against the protocol definition.
            Err(cmd) => self.inner.as_mut().expect("initialized").send(cmd).await,
        }
    }

    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
    async fn recv(&mut self) -> Result<Option<C::Response>, Error> {
        if let Some(client) = self.inner.as_mut() {
            // `Partitioned::recv` is documented as cancel safe.
            client.recv().await
        } else {
            // Not connected yet: wait forever instead of erroring, so callers polling
            // in a select loop simply block until a connection is established.
            future::pending().await
        }
    }
}

/// Specification for a Timely cluster to which a [`ClusterClient`] connects.
///
/// This trait is used to make the [`ClusterClient`] generic over the compute and storage cluster
/// implementations.
#[async_trait]
pub trait ClusterSpec: Clone + Send + Sync + 'static {
    /// The cluster command type.
    type Command: fmt::Debug + Send + TryIntoProtocolNonce;
    /// The cluster response type.
    type Response: fmt::Debug + Send;

    /// The name of this cluster ("compute" or "storage").
    const NAME: &str;

    /// Run the given Timely worker.
    ///
    /// `client_rx` delivers, for each new client connection, the client's nonce together with
    /// the command receiver and response sender the worker should serve.
    fn run_worker(
        &self,
        timely_worker: &mut TimelyWorker,
        client_rx: mpsc::UnboundedReceiver<(
            Uuid,
            mpsc::UnboundedReceiver<Self::Command>,
            mpsc::UnboundedSender<Self::Response>,
        )>,
    );

    /// Build a Timely cluster using the given config.
    ///
    /// Initializes inter-process networking, configures worker settings, and spawns
    /// `config.workers` worker threads, each running [`ClusterSpec::run_worker`].
    async fn build_cluster(
        &self,
        config: TimelyConfig,
        tokio_executor: Handle,
    ) -> Result<TimelyContainer<Self>, Error> {
        info!("Building timely container with config {config:?}");
        // One client channel per worker: the senders are returned in the
        // `TimelyContainer`, the receivers are claimed by the worker closures below.
        let (client_txs, client_rxs): (Vec<_>, Vec<_>) = (0..config.workers)
            .map(|_| mpsc::unbounded_channel())
            .unzip();
        // Wrapped in `Mutex<Vec<Option<_>>>` so each worker closure can `take()` its
        // own receiver out of the shared vector.
        let client_rxs: Mutex<Vec<_>> = Mutex::new(client_rxs.into_iter().map(Some).collect());

        // Choose how Timely's zero-copy allocator refills its byte buffers: from
        // lgalloc, or from plain zeroed heap allocations.
        let refill = if config.enable_zero_copy_lgalloc {
            BytesRefill {
                logic: Arc::new(|size| Box::new(alloc::lgalloc_refill(size))),
                limit: config.zero_copy_limit,
            }
        } else {
            BytesRefill {
                logic: Arc::new(|size| Box::new(vec![0; size])),
                limit: config.zero_copy_limit,
            }
        };

        let (builders, other) = initialize_networking(
            config.workers,
            config.process,
            config.addresses.clone(),
            refill,
            config.enable_zero_copy,
        )
        .await?;

        let mut worker_config = WorkerConfig::default();

        // We set a custom exertion logic for proportionality > 0. A proportionality value of 0
        // means that no arrangement merge effort is exerted and merging occurs only in response to
        // updates.
        if config.arrangement_exert_proportionality > 0 {
            let merge_effort = Some(1000);

            // ExertionLogic defines a function to determine if a spine is sufficiently tidied.
            // Its arguments are an iterator over the index of a layer, the count of batches in the
            // layer and the length of batches at the layer. The iterator enumerates layers from the
            // largest to the smallest layer.

            let arc: ExertionLogic = Arc::new(move |layers| {
                let mut prop = config.arrangement_exert_proportionality;

                // Layers are ordered from largest to smallest.
                // Skip to the largest occupied layer.
                let layers = layers
                    .iter()
                    .copied()
                    .skip_while(|(_idx, count, _len)| *count == 0);

                let mut first = true;
                for (_idx, count, len) in layers {
                    if count > 1 {
                        // Found an in-progress merge that we should continue.
                        return merge_effort;
                    }

                    if !first && prop > 0 && len > 0 {
                        // Found a non-empty batch within `arrangement_exert_proportionality` of
                        // the largest one.
                        return merge_effort;
                    }

                    first = false;
                    // `prop` halves per layer, so only batches within
                    // log2(proportionality) layers of the largest trigger effort.
                    prop /= 2;
                }

                None
            });
            worker_config.set::<ExertionLogic>("differential/default_exert_logic".to_string(), arc);
        }

        let spec = self.clone();
        let worker_guards = execute_from(builders, other, worker_config, move |timely_worker| {
            let worker_idx = timely_worker.index();

            // Per worker tracing span, lets us identify Timely clusters and workers in the logs.
            let span = info_span!("timely", name = Self::NAME, worker_id = worker_idx);
            let _span_guard = span.enter();

            // Make the Tokio runtime available to work spawned by the worker.
            let _tokio_guard = tokio_executor.enter();
            // `index()` is the worker index across all processes; `% config.workers`
            // maps it to this process's local channel slot (assumes `config.workers`
            // is the per-process worker count — TODO confirm).
            let client_rx = client_rxs.lock().unwrap()[worker_idx % config.workers]
                .take()
                .unwrap();
            spec.run_worker(timely_worker, client_rx);
        })
        .map_err(|e| anyhow!(e))?;

        Ok(TimelyContainer {
            client_txs,
            worker_guards,
        })
    }
}

mod alloc {
    /// A Timely communication refill function that uses lgalloc.
    ///
    /// Returns a handle to lgalloc'ed memory if lgalloc can handle
    /// the request, otherwise we fall back to a heap allocation. In either case, the handle must
    /// be dropped to free the memory.
    pub(crate) fn lgalloc_refill(size: usize) -> LgallocHandle {
        match lgalloc::allocate::<u8>(size) {
            Ok((pointer, capacity, handle)) => {
                let handle = Some(handle);
                LgallocHandle {
                    handle,
                    pointer,
                    capacity,
                }
            }
            Err(_) => {
                // lgalloc could not serve the request; fall back to a
                // zero-initialized heap allocation.
                let mut alloc = vec![0_u8; size];
                // Ensure that the length matches the capacity.
                alloc.shrink_to_fit();
                // Get a pointer to the allocated memory.
                let pointer = std::ptr::NonNull::new(alloc.as_mut_ptr()).unwrap();
                // Forget the vector to avoid dropping it. We'll free the memory in `drop`.
                std::mem::forget(alloc);
                LgallocHandle {
                    handle: None,
                    pointer,
                    capacity: size,
                }
            }
        }
    }

    /// A handle to memory allocated by lgalloc. This can either be memory allocated by lgalloc or
    /// memory allocated by Vec. If the handle is set, it's lgalloc memory. If the handle is None,
    /// it's a regular heap allocation.
    pub(crate) struct LgallocHandle {
        /// Lgalloc handle, set if the memory was allocated by lgalloc.
        handle: Option<lgalloc::Handle>,
        /// Pointer to the allocated memory. Always well-aligned, but can be dangling.
        pointer: std::ptr::NonNull<u8>,
        /// Capacity of the allocated memory in bytes.
        capacity: usize,
    }

    impl std::ops::Deref for LgallocHandle {
        type Target = [u8];
        #[inline(always)]
        fn deref(&self) -> &Self::Target {
            // SAFETY: `pointer` is valid for `capacity` bytes for the lifetime of `self`
            // (backed by an lgalloc region or a leaked `Vec` allocation). After `drop`,
            // `capacity` is 0, and the dangling pointer is valid for an empty slice.
            // NOTE(review): assumes lgalloc-provided memory is initialized (the `Vec`
            // fallback is zeroed) — confirm against lgalloc's contract.
            unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) }
        }
    }

    impl std::ops::DerefMut for LgallocHandle {
        #[inline(always)]
        fn deref_mut(&mut self) -> &mut Self::Target {
            // SAFETY: Same invariants as `deref`; additionally `&mut self` guarantees
            // exclusive access, so a unique mutable slice is sound.
            unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) }
        }
    }

    impl Drop for LgallocHandle {
        fn drop(&mut self) {
            // If we have a handle, it's lgalloc memory. Otherwise, it's a heap allocation.
            if let Some(handle) = self.handle.take() {
                lgalloc::deallocate(handle);
            } else {
                // SAFETY: `pointer`/`capacity` come from a `Vec<u8>` that was
                // `mem::forget`-ed in `lgalloc_refill` (after `shrink_to_fit`, so
                // capacity matches); reconstructing it with length 0 and dropping frees
                // that allocation exactly once without reading its contents.
                unsafe { Vec::from_raw_parts(self.pointer.as_ptr(), 0, self.capacity) };
            }
            // Update pointer and capacity such that we don't double-free.
            self.pointer = std::ptr::NonNull::dangling();
            self.capacity = 0;
        }
    }
}