mz_compute_client/controller/replica.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A client for replicas of a compute instance.

use std::sync::Arc;
use std::sync::atomic::{self, AtomicBool};
use std::time::{Duration, Instant};

use anyhow::bail;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::{ClusterReplicaLocation, ClusterStartupEpoch, TimelyConfig};
use mz_compute_types::dyncfgs::ENABLE_COMPUTE_REPLICA_EXPIRATION;
use mz_controller_types::dyncfgs::ENABLE_CREATE_SOCKETS_V2;
use mz_dyncfg::ConfigSet;
use mz_ore::channel::InstrumentedUnboundedSender;
use mz_ore::retry::{Retry, RetryState};
use mz_ore::task::AbortOnDropHandle;
use mz_service::client::GenericClient;
use mz_service::params::GrpcClientParameters;
use tokio::select;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tracing::{debug, info, trace, warn};

use crate::controller::instance::ReplicaResponse;
use crate::controller::sequential_hydration::SequentialHydration;
use crate::controller::{ComputeControllerTimestamp, ReplicaId};
use crate::logging::LoggingConfig;
use crate::metrics::{IntCounter, ReplicaMetrics};
use crate::protocol::command::{ComputeCommand, InitialComputeParameters};
use crate::protocol::response::ComputeResponse;
use crate::service::{ComputeClient, ComputeGrpcClient};

/// A client to a replica, wrapped in the `SequentialHydration` adapter.
type Client<T> = SequentialHydration<T>;

/// Replica-specific configuration.
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    /// The location of the replica's processes.
    pub location: ClusterReplicaLocation,
    /// Logging configuration for the replica.
    pub logging: LoggingConfig,
    /// Parameters for the gRPC client connecting to the replica.
    pub grpc_client: GrpcClientParameters,
    /// The offset to use for replica expiration, if any.
    pub expiration_offset: Option<Duration>,
    /// Initial values for dynamically configurable parameters.
    pub initial_config: InitialComputeParameters,
}
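
// Illustrative only: how a `ReplicaConfig` might be assembled by the caller.
// The addresses and values below are hypothetical, and some
// `ClusterReplicaLocation` fields are elided (`..`):
//
//     let config = ReplicaConfig {
//         location: ClusterReplicaLocation {
//             ctl_addrs: vec!["replica-0:2101".into()],
//             dataflow_addrs: vec!["replica-0:2102".into()],
//             workers: 4,
//             ..
//         },
//         logging,
//         grpc_client: GrpcClientParameters::default(),
//         expiration_offset: None,
//         initial_config,
//     };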
54
/// A client for a replica task.
#[derive(Debug)]
pub(super) struct ReplicaClient<T> {
    /// A sender for commands for the replica.
    command_tx: UnboundedSender<ComputeCommand<T>>,
    /// A handle to the replica task. Dropping the handle aborts the task.
    ///
    /// If the task is finished, the replica has failed and needs rehydration.
    task: AbortOnDropHandle<()>,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// Flag reporting whether the replica connection has been established.
    connected: Arc<AtomicBool>,
}

impl<T> ReplicaClient<T>
where
    T: ComputeControllerTimestamp,
    ComputeGrpcClient: ComputeClient<T>,
{
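    /// Spawn a task communicating with the given replica and return a client
    /// to it.
    ///
    /// A usage sketch (not compiled; all argument values are assumed to be
    /// supplied by the surrounding controller):
    ///
    /// ```ignore
    /// let replica = ReplicaClient::spawn(
    ///     replica_id, build_info, config, epoch, metrics, dyncfg, response_tx,
    /// );
    /// replica.send(command)?;
    /// ```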
    pub(super) fn spawn(
        id: ReplicaId,
        build_info: &'static BuildInfo,
        config: ReplicaConfig,
        epoch: ClusterStartupEpoch,
        metrics: ReplicaMetrics,
        dyncfg: Arc<ConfigSet>,
        response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    ) -> Self {
        // Launch a task to handle communication with the replica
        // asynchronously. This isolates the main controller thread from
        // the replica.
        let (command_tx, command_rx) = unbounded_channel();
        let connected = Arc::new(AtomicBool::new(false));

        let task = mz_ore::task::spawn(
            || format!("active-replication-replica-{id}"),
            ReplicaTask {
                replica_id: id,
                build_info,
                config: config.clone(),
                command_rx,
                response_tx,
                epoch,
                metrics: metrics.clone(),
                connected: Arc::clone(&connected),
                dyncfg,
            }
            .run(),
        );

        Self {
            command_tx,
            task: task.abort_on_drop(),
            metrics,
            connected,
        }
    }
}

impl<T> ReplicaClient<T> {
    /// Sends a command to this replica, tracking the command queue size
    /// metric.
    pub(super) fn send(
        &self,
        command: ComputeCommand<T>,
    ) -> Result<(), SendError<ComputeCommand<T>>> {
        self.command_tx.send(command).map(|r| {
            self.metrics.inner.command_queue_size.inc();
            r
        })
    }

    /// Determine whether the replica task has failed.
    pub(super) fn is_failed(&self) -> bool {
        self.task.is_finished()
    }

    /// Determine whether the replica connection has been established.
    pub(super) fn is_connected(&self) -> bool {
        self.connected.load(atomic::Ordering::Relaxed)
    }
}
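
// Illustrative only: a controller holding a `ReplicaClient` might poll
// `is_failed` to detect a crashed replica task and schedule rehydration.
// The `instance.rehydrate` call below is hypothetical:
//
//     if replica.is_failed() {
//         instance.rehydrate(replica_id);
//     }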

/// Configuration for a replica task.
struct ReplicaTask<T> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// Replica configuration.
    config: ReplicaConfig,
    /// The build information for this process.
    build_info: &'static BuildInfo,
    /// A channel upon which commands intended for the replica are delivered.
    command_rx: UnboundedReceiver<ComputeCommand<T>>,
    /// A channel upon which responses from the replica are delivered.
    response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// A number (technically, a pair of numbers) identifying this incarnation of the replica.
    /// The semantics of this don't matter, except that it must strictly increase.
    epoch: ClusterStartupEpoch,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// Flag to report successful replica connection.
    connected: Arc<AtomicBool>,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,
}

impl<T> ReplicaTask<T>
where
    T: ComputeControllerTimestamp,
    ComputeGrpcClient: ComputeClient<T>,
{
    /// Asynchronously forwards commands to and responses from a single replica.
    async fn run(self) {
        let replica_id = self.replica_id;
        info!(replica = ?replica_id, "starting replica task");

        let client = self.connect().await;
        match self.run_message_loop(client).await {
            Ok(()) => info!(replica = ?replica_id, "stopped replica task"),
            Err(error) => warn!(replica = ?replica_id, "replica task failed: {error:#}"),
        }
    }

    /// Connects to the replica.
    ///
    /// The connection is retried forever (with backoff), and this method returns only after
    /// a connection has been successfully established.
    async fn connect(&self) -> Client<T> {
        let try_connect = |retry: RetryState| {
            let addrs = &self.config.location.ctl_addrs;
            let dests = addrs
                .iter()
                .map(|addr| (addr.clone(), self.metrics.clone()))
                .collect();
            let version = self.build_info.semver_version();
            let client_params = &self.config.grpc_client;

            async move {
                let connect_start = Instant::now();
                let connect_result =
                    ComputeGrpcClient::connect_partitioned(dests, version, client_params).await;
                self.metrics.observe_connect_time(connect_start.elapsed());

                connect_result.inspect_err(|error| {
                    let next_backoff = retry.next_backoff.unwrap();
                    if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
                        info!(
                            replica_id = %self.replica_id, ?next_backoff,
                            "error connecting to replica: {error:#}",
                        );
                    } else {
                        debug!(
                            replica_id = %self.replica_id, ?next_backoff,
                            "error connecting to replica: {error:#}",
                        );
                    }
                })
            }
        };

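        // `Retry::default()` imposes no retry limit, and `clamp_backoff` caps
        // the exponential backoff at one second, so attempts continue roughly
        // once per second until the replica accepts the connection.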
        let client = Retry::default()
            .clamp_backoff(Duration::from_secs(1))
            .retry_async(try_connect)
            .await
            .expect("retry retries forever");

        self.metrics.observe_connect();
        self.connected.store(true, atomic::Ordering::Relaxed);

        let dyncfg = Arc::clone(&self.dyncfg);
        let metrics = self.metrics.clone();
        SequentialHydration::new(client, dyncfg, metrics)
    }

    /// Runs the message loop.
    ///
    /// Returns (with an `Err`) if it encounters an error condition (e.g. the replica disconnects).
    /// If no error condition is encountered, the task runs until the controller disconnects from
    /// the command channel, or the task is dropped.
    async fn run_message_loop(mut self, mut client: Client<T>) -> Result<(), anyhow::Error>
    where
        T: ComputeControllerTimestamp,
        ComputeGrpcClient: ComputeClient<T>,
    {
        let id = self.replica_id;
        let incarnation = self.epoch.replica();
        loop {
            select! {
                // Command from controller to forward to replica.
                command = self.command_rx.recv() => {
                    let Some(mut command) = command else {
                        // Controller is no longer interested in this replica. Shut down.
                        break;
                    };

                    self.specialize_command(&mut command);
                    self.observe_command(&command);
                    client.send(command).await?;
                },
                // Response from replica to forward to controller.
                response = client.recv() => {
                    let Some(response) = response? else {
                        bail!("replica unexpectedly terminated connection gracefully");
                    };

                    self.observe_response(&response);

                    if self.response_tx.send((id, incarnation, response)).is_err() {
                        // Controller is no longer interested in this replica. Shut down.
                        break;
                    }
                }
            }
        }

        Ok(())
    }

    /// Specialize a command for the given replica configuration.
    ///
    /// Most `ComputeCommand`s are independent of the target replica, but some
    /// contain replica-specific fields that must be adjusted before sending.
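    ///
    /// For example, `CreateTimely` carries the Timely cluster topology
    /// (worker count and process addresses), which only this task knows for
    /// its replica, as well as the startup epoch of this incarnation.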
    fn specialize_command(&self, command: &mut ComputeCommand<T>) {
        match command {
            ComputeCommand::CreateTimely { config, epoch } => {
                **config = TimelyConfig {
                    workers: self.config.location.workers,
                    process: 0,
                    addresses: self.config.location.dataflow_addrs.clone(),
                    arrangement_exert_proportionality: self
                        .config
                        .initial_config
                        .arrangement_exert_proportionality,
                    enable_zero_copy: self.config.initial_config.enable_zero_copy,
                    enable_zero_copy_lgalloc: self.config.initial_config.enable_zero_copy_lgalloc,
                    zero_copy_limit: self.config.initial_config.zero_copy_limit,
                    enable_create_sockets_v2: ENABLE_CREATE_SOCKETS_V2.get(&self.dyncfg),
                };
                *epoch = self.epoch;
            }
            ComputeCommand::CreateInstance(config) => {
                config.logging = self.config.logging.clone();
                if ENABLE_COMPUTE_REPLICA_EXPIRATION.get(&self.dyncfg) {
                    config.expiration_offset = self.config.expiration_offset;
                }
            }
            _ => {}
        }
    }

    /// Update task state according to an observed command.
    #[mz_ore::instrument(level = "debug")]
    fn observe_command(&mut self, command: &ComputeCommand<T>) {
        if let ComputeCommand::Peek(peek) = command {
            peek.otel_ctx.attach_as_parent();
        }

        trace!(
            replica = ?self.replica_id,
            command = ?command,
            "sending command to replica",
        );

        self.metrics.inner.command_queue_size.dec();
    }

    /// Update task state according to an observed response.
    #[mz_ore::instrument(level = "debug")]
    fn observe_response(&mut self, response: &ComputeResponse<T>) {
        if let ComputeResponse::PeekResponse(_, _, otel_ctx) = response {
            otel_ctx.attach_as_parent();
        }

        trace!(
            replica = ?self.replica_id,
            response = ?response,
            "received response from replica",
        );
    }
}