mz_cluster/client.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An interactive cluster server.

use std::fmt;
use std::sync::{Arc, Mutex};
use std::thread::Thread;

use anyhow::{Error, anyhow};
use async_trait::async_trait;
use differential_dataflow::trace::ExertionLogic;
use futures::future;
use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
use mz_ore::error::ErrorExt;
use mz_ore::halt;
use mz_service::client::{GenericClient, Partitionable, Partitioned};
use mz_service::local::LocalClient;
use timely::WorkerConfig;
use timely::communication::Allocate;
use timely::communication::allocator::GenericBuilder;
use timely::communication::allocator::zero_copy::bytes_slab::BytesRefill;
use timely::communication::initialize::WorkerGuards;
use timely::execute::execute_from;
use timely::worker::Worker as TimelyWorker;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tracing::{info, warn};

use crate::communication::initialize_networking;

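/// A [`LocalClient`] for each local Timely worker, wrapped in a [`Partitioned`] that splits
/// commands across the workers and reassembles their responses.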
type PartitionedClient<C, R> = Partitioned<LocalClient<C, R>, C, R>;

/// A client managing access to the local portion of a Timely cluster.
pub struct ClusterClient<C>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    /// The actual client to talk to the cluster.
    inner: Option<PartitionedClient<C::Command, C::Response>>,
    /// The running Timely instance.
    timely_container: TimelyContainerRef<C>,
    /// The handle to the Tokio runtime.
    tokio_handle: tokio::runtime::Handle,
    /// The specification for the cluster this client connects to.
    cluster_spec: C,
}

/// Metadata about the Timely workers running in this process.
pub struct TimelyContainer<C: ClusterSpec> {
    /// The Timely config currently in use.
    config: TimelyConfig,
    /// Channels over which to send endpoints for wiring up a new client.
    client_txs: Vec<
        crossbeam_channel::Sender<(
            crossbeam_channel::Receiver<C::Command>,
            mpsc::UnboundedSender<C::Response>,
        )>,
    >,
    /// Thread guards that keep the worker threads alive.
    worker_guards: WorkerGuards<()>,
}

impl<C: ClusterSpec> TimelyContainer<C> {
    fn worker_threads(&self) -> Vec<Thread> {
        self.worker_guards
            .guards()
            .iter()
            .map(|h| h.thread().clone())
            .collect()
    }
}

impl<C: ClusterSpec> Drop for TimelyContainer<C> {
    fn drop(&mut self) {
        panic!("Timely container must never drop");
    }
}

/// Thread-safe reference to an optional [`TimelyContainer`].
type TimelyContainerRef<C> = Arc<tokio::sync::Mutex<Option<TimelyContainer<C>>>>;

impl<C> ClusterClient<C>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    /// Create a new `ClusterClient`.
    pub fn new(
        timely_container: TimelyContainerRef<C>,
        tokio_handle: tokio::runtime::Handle,
        cluster_spec: C,
    ) -> Self {
        Self {
            timely_container,
            inner: None,
            tokio_handle,
            cluster_spec,
        }
    }

    async fn build(
        &mut self,
        mut config: TimelyConfig,
        epoch: ClusterStartupEpoch,
    ) -> Result<(), Error> {
        let workers = config.workers;

        // Check whether we can reuse the existing Timely instance. We currently do not support
        // reinstantiating Timely; we simply panic if a different config is requested. This code
        // must panic before dropping the worker guards contained in `timely_container`: since we
        // never terminate Timely workers, joining their threads would hang forever, likely
        // creating a fair share of confusion in the orchestrator.

        let mut timely_container = self.timely_container.lock().await;
        match &*timely_container {
            Some(existing) => {
                // Ignore changes to `enable_create_sockets_v2`. Once the Timely processes are
                // connected, we don't care which connection protocol was used.
                config.enable_create_sockets_v2 = existing.config.enable_create_sockets_v2;

                if config != existing.config {
                    info!(new = ?config, old = ?existing.config, "TimelyConfig mismatch");
                    halt!("new timely configuration does not match existing timely configuration");
                }
                info!("Timely already initialized; re-using.");
            }
            None => {
                let timely = self
                    .cluster_spec
                    .build_cluster(config, epoch, self.tokio_handle.clone())
                    .await
                    .inspect_err(|e| {
                        warn!("timely initialization failed: {}", e.display_with_causes())
                    })?;

                *timely_container = Some(timely);
            }
        };

        let timely = timely_container.as_ref().expect("set above");

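        // Hand each worker a dedicated command receiver and response sender over its client
        // channel.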
        let mut command_txs = Vec::with_capacity(workers);
        let mut response_rxs = Vec::with_capacity(workers);
        for client_tx in &timely.client_txs {
            let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded();
            let (resp_tx, resp_rx) = mpsc::unbounded_channel();

            client_tx
                .send((cmd_rx, resp_tx))
                .expect("worker not dropped");

            command_txs.push(cmd_tx);
            response_rxs.push(resp_rx);
        }

        self.inner = Some(LocalClient::new_partitioned(
            response_rxs,
            command_txs,
            timely.worker_threads(),
        ));
        Ok(())
    }
}

impl<C> fmt::Debug for ClusterClient<C>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ClusterClient")
            .field("inner", &self.inner)
            .finish_non_exhaustive()
    }
}

#[async_trait]
impl<C> GenericClient<C::Command, C::Response> for ClusterClient<C>
where
    C: ClusterSpec,
    (C::Command, C::Response): Partitionable<C::Command, C::Response>,
{
    async fn send(&mut self, cmd: C::Command) -> Result<(), Error> {
        // Changing this debug statement requires changing the replica-isolation test.
        tracing::debug!("ClusterClient send={:?}", &cmd);
        match cmd.try_into_timely_config() {
            Ok((config, epoch)) => self.build(config, epoch).await,
            Err(cmd) => self.inner.as_mut().expect("initialized").send(cmd).await,
        }
    }

    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
    async fn recv(&mut self) -> Result<Option<C::Response>, Error> {
        if let Some(client) = self.inner.as_mut() {
            // `Partitioned::recv` is documented as cancel safe.
            client.recv().await
        } else {
            future::pending().await
        }
    }
}

/// Specification for a Timely cluster to which a [`ClusterClient`] connects.
///
/// This trait is used to make the [`ClusterClient`] generic over the compute and storage cluster
/// implementations.
#[async_trait]
pub trait ClusterSpec: Clone + Send + Sync + 'static {
    /// The cluster command type.
    type Command: fmt::Debug + Send + TryIntoTimelyConfig;
    /// The cluster response type.
    type Response: fmt::Debug + Send;

    /// Run the given Timely worker.
    fn run_worker<A: Allocate + 'static>(
        &self,
        timely_worker: &mut TimelyWorker<A>,
        client_rx: crossbeam_channel::Receiver<(
            crossbeam_channel::Receiver<Self::Command>,
            mpsc::UnboundedSender<Self::Response>,
        )>,
    );

    /// Build a Timely cluster using the given config.
    async fn build_cluster(
        &self,
        config: TimelyConfig,
        epoch: ClusterStartupEpoch,
        tokio_executor: Handle,
    ) -> Result<TimelyContainer<Self>, Error> {
        info!("Building timely container with config {config:?}");
        let (client_txs, client_rxs): (Vec<_>, Vec<_>) = (0..config.workers)
            .map(|_| crossbeam_channel::unbounded())
            .unzip();
        let client_rxs: Mutex<Vec<_>> = Mutex::new(client_rxs.into_iter().map(Some).collect());

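        // Choose how zero-copy communication buffers are refilled: from lgalloc-backed memory
        // when enabled, from plain heap allocations otherwise.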
        let refill = if config.enable_zero_copy_lgalloc {
            BytesRefill {
                logic: Arc::new(|size| Box::new(alloc::lgalloc_refill(size))),
                limit: config.zero_copy_limit,
            }
        } else {
            BytesRefill {
                logic: Arc::new(|size| Box::new(vec![0; size])),
                limit: config.zero_copy_limit,
            }
        };

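        // Initialize networking to the other cluster processes, using either the zero-copy
        // process allocator or the default process allocator for the workers in this process.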
        let (builders, other) = if config.enable_zero_copy {
            use timely::communication::allocator::zero_copy::allocator_process::ProcessBuilder;
            initialize_networking::<ProcessBuilder>(
                config.workers,
                config.process,
                config.addresses.clone(),
                epoch,
                refill,
                GenericBuilder::ZeroCopyBinary,
                config.enable_create_sockets_v2,
            )
            .await?
        } else {
            initialize_networking::<timely::communication::allocator::Process>(
                config.workers,
                config.process,
                config.addresses.clone(),
                epoch,
                refill,
                GenericBuilder::ZeroCopy,
                config.enable_create_sockets_v2,
            )
            .await?
        };

        let mut worker_config = WorkerConfig::default();

        // We set a custom exertion logic for proportionality > 0. A proportionality value of 0
        // means that no arrangement merge effort is exerted and merging occurs only in response to
        // updates.
        if config.arrangement_exert_proportionality > 0 {
            let merge_effort = Some(1000);

            // `ExertionLogic` defines a function that determines whether a spine is sufficiently
            // tidied. Its argument is a list of (layer index, count of batches in the layer,
            // total length of batches in the layer) triples, ordered from the largest to the
            // smallest layer.

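            // For example, with a proportionality of 4, effort is exerted whenever some layer
            // holds more than one batch (an in-progress merge), or whenever either of the two
            // layers below the largest occupied layer holds a non-empty batch (the proportion
            // halves per layer: 4 -> 2 -> 1 -> 0).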
            let arc: ExertionLogic = Arc::new(move |layers| {
                let mut prop = config.arrangement_exert_proportionality;

                // Layers are ordered from largest to smallest.
                // Skip to the largest occupied layer.
                let layers = layers
                    .iter()
                    .copied()
                    .skip_while(|(_idx, count, _len)| *count == 0);

                let mut first = true;
                for (_idx, count, len) in layers {
                    if count > 1 {
                        // Found an in-progress merge that we should continue.
                        return merge_effort;
                    }

                    if !first && prop > 0 && len > 0 {
                        // Found a non-empty batch within `arrangement_exert_proportionality` of
                        // the largest one.
                        return merge_effort;
                    }

                    first = false;
                    prop /= 2;
                }

                None
            });
            worker_config.set::<ExertionLogic>("differential/default_exert_logic".to_string(), arc);
        }

        let spec = self.clone();
        let worker_guards = execute_from(builders, other, worker_config, move |timely_worker| {
            let timely_worker_index = timely_worker.index();
            let _tokio_guard = tokio_executor.enter();
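            // Each worker takes ownership of its dedicated client channel, keyed by its worker
            // index within this process.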
            let client_rx = client_rxs.lock().unwrap()[timely_worker_index % config.workers]
                .take()
                .unwrap();
            spec.run_worker(timely_worker, client_rx);
        })
        .map_err(|e| anyhow!(e))?;

        Ok(TimelyContainer {
            config,
            client_txs,
            worker_guards,
        })
    }
}

mod alloc {
    /// A Timely communication refill function that uses lgalloc.
    ///
    /// Returns a handle to lgalloc'ed memory if lgalloc can handle the request; otherwise falls
    /// back to a heap allocation. In either case, the handle must be dropped to free the memory.
    pub(crate) fn lgalloc_refill(size: usize) -> LgallocHandle {
        match lgalloc::allocate::<u8>(size) {
            Ok((pointer, capacity, handle)) => {
                let handle = Some(handle);
                LgallocHandle {
                    handle,
                    pointer,
                    capacity,
                }
            }
            Err(_) => {
                // Fall back to a regular heap allocation.
                let mut alloc = vec![0_u8; size];
                // Ensure that the length matches the capacity.
                alloc.shrink_to_fit();
                // Get a pointer to the allocated memory.
                let pointer = std::ptr::NonNull::new(alloc.as_mut_ptr()).unwrap();
                // Forget the vector to avoid dropping it. We'll free the memory in `drop`.
                std::mem::forget(alloc);
                LgallocHandle {
                    handle: None,
                    pointer,
                    capacity: size,
                }
            }
        }
    }

    /// A handle to memory allocated either by lgalloc or by `Vec`. If `handle` is `Some`, the
    /// memory was allocated by lgalloc; if it is `None`, it's a regular heap allocation.
    pub(crate) struct LgallocHandle {
        /// Lgalloc handle, set if the memory was allocated by lgalloc.
        handle: Option<lgalloc::Handle>,
        /// Pointer to the allocated memory. Always well-aligned, but can be dangling.
        pointer: std::ptr::NonNull<u8>,
        /// Capacity of the allocated memory in bytes.
        capacity: usize,
    }

    impl std::ops::Deref for LgallocHandle {
        type Target = [u8];
        #[inline(always)]
        fn deref(&self) -> &Self::Target {
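            // SAFETY: `pointer` is valid for `capacity` bytes (backed by lgalloc or by a leaked
            // `Vec<u8>`), and is dangling only once `capacity` is 0, which is valid for
            // zero-length slices.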
            unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) }
        }
    }

    impl std::ops::DerefMut for LgallocHandle {
        #[inline(always)]
        fn deref_mut(&mut self) -> &mut Self::Target {
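            // SAFETY: as for `Deref`; the `&mut self` receiver guarantees exclusive access.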
            unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) }
        }
    }

    impl Drop for LgallocHandle {
        fn drop(&mut self) {
            // If we have a handle, it's lgalloc memory. Otherwise, it's a heap allocation.
            if let Some(handle) = self.handle.take() {
                lgalloc::deallocate(handle);
            } else {
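                // SAFETY: `pointer` and `capacity` were taken from a `Vec<u8>` that was shrunk
                // to fit and then leaked in `lgalloc_refill`; rebuilding it with length 0 frees
                // the allocation without reading its contents.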
                unsafe { Vec::from_raw_parts(self.pointer.as_ptr(), 0, self.capacity) };
            }
            // Update pointer and capacity such that we don't double-free.
            self.pointer = std::ptr::NonNull::dangling();
            self.capacity = 0;
        }
    }
}