mz_compute_client/controller/
replica.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A client for replicas of a compute instance.
11
12use std::sync::Arc;
13use std::sync::atomic::{self, AtomicBool};
14use std::time::{Duration, Instant};
15
16use anyhow::bail;
17use mz_build_info::BuildInfo;
18use mz_cluster_client::client::ClusterReplicaLocation;
19use mz_compute_types::dyncfgs::ENABLE_COMPUTE_REPLICA_EXPIRATION;
20use mz_dyncfg::ConfigSet;
21use mz_ore::channel::InstrumentedUnboundedSender;
22use mz_ore::retry::{Retry, RetryState};
23use mz_ore::task::AbortOnDropHandle;
24use mz_service::client::GenericClient;
25use mz_service::params::GrpcClientParameters;
26use mz_service::transport;
27use tokio::select;
28use tokio::sync::mpsc::error::SendError;
29use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
30use tracing::{debug, info, trace, warn};
31use uuid::Uuid;
32
33use crate::controller::instance::ReplicaResponse;
34use crate::controller::sequential_hydration::SequentialHydration;
35use crate::controller::{ComputeControllerTimestamp, ReplicaId};
36use crate::logging::LoggingConfig;
37use crate::metrics::IntCounter;
38use crate::metrics::ReplicaMetrics;
39use crate::protocol::command::ComputeCommand;
40use crate::protocol::response::ComputeResponse;
41use crate::service::{ComputeClient, ComputeGrpcClient};
42
/// The client type a replica task uses to talk to its replica: the underlying connection
/// wrapped in [`SequentialHydration`].
type Client<T> = SequentialHydration<T>;
44
/// Replica-specific configuration.
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    /// Location of the replica; its `ctl_addrs` are the addresses the replica task connects to.
    pub location: ClusterReplicaLocation,
    /// Logging configuration written into every `CreateInstance` command sent to this replica.
    pub logging: LoggingConfig,
    /// Client parameters used when connecting, including the connect and keep-alive timeouts.
    pub grpc_client: GrpcClientParameters,
    /// The offset to use for replica expiration, if any.
    ///
    /// Only written into `CreateInstance` commands when the
    /// `ENABLE_COMPUTE_REPLICA_EXPIRATION` dyncfg is enabled.
    pub expiration_offset: Option<Duration>,
    /// Whether to connect through the `transport` client (`ComputeCtpClient`) instead of the
    /// gRPC client.
    pub enable_ctp: bool,
}
55
/// A client for a replica task.
///
/// This is the controller-side handle to a task started by `ReplicaClient::spawn`: it owns the
/// sending half of the command channel, the task handle, and state shared with the task.
#[derive(Debug)]
pub(super) struct ReplicaClient<T> {
    /// A sender for commands for the replica.
    command_tx: UnboundedSender<ComputeCommand<T>>,
    /// A handle to the task that aborts it when the replica is dropped.
    ///
    /// If the task is finished, the replica has failed and needs rehydration.
    task: AbortOnDropHandle<()>,
    /// Replica metrics.
    ///
    /// `send` increments the `command_queue_size` metric for each command it enqueues.
    metrics: ReplicaMetrics,
    /// Flag reporting whether the replica connection has been established.
    ///
    /// Shared with the replica task, which sets it after connecting succeeds.
    connected: Arc<AtomicBool>,
}
70
71impl<T> ReplicaClient<T>
72where
73    T: ComputeControllerTimestamp,
74    ComputeGrpcClient: ComputeClient<T>,
75{
76    pub(super) fn spawn(
77        id: ReplicaId,
78        build_info: &'static BuildInfo,
79        config: ReplicaConfig,
80        epoch: u64,
81        metrics: ReplicaMetrics,
82        dyncfg: Arc<ConfigSet>,
83        response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
84    ) -> Self {
85        // Launch a task to handle communication with the replica
86        // asynchronously. This isolates the main controller thread from
87        // the replica.
88        let (command_tx, command_rx) = unbounded_channel();
89        let connected = Arc::new(AtomicBool::new(false));
90
91        let task = mz_ore::task::spawn(
92            || format!("active-replication-replica-{id}"),
93            ReplicaTask {
94                replica_id: id,
95                build_info,
96                config: config.clone(),
97                command_rx,
98                response_tx,
99                epoch,
100                metrics: metrics.clone(),
101                connected: Arc::clone(&connected),
102                dyncfg,
103            }
104            .run(),
105        );
106
107        Self {
108            command_tx,
109            task: task.abort_on_drop(),
110            metrics,
111            connected,
112        }
113    }
114}
115
116impl<T> ReplicaClient<T> {
117    /// Sends a command to this replica.
118    pub(super) fn send(
119        &self,
120        command: ComputeCommand<T>,
121    ) -> Result<(), SendError<ComputeCommand<T>>> {
122        self.command_tx.send(command).map(|r| {
123            self.metrics.inner.command_queue_size.inc();
124            r
125        })
126    }
127
128    /// Determine if the replica task has failed.
129    pub(super) fn is_failed(&self) -> bool {
130        self.task.is_finished()
131    }
132
133    /// Determine if the replica connection has been established.
134    pub(super) fn is_connected(&self) -> bool {
135        self.connected.load(atomic::Ordering::Relaxed)
136    }
137}
138
/// A [`transport::Client`] that sends [`ComputeCommand`]s and receives [`ComputeResponse`]s.
type ComputeCtpClient<T> = transport::Client<ComputeCommand<T>, ComputeResponse<T>>;
140
/// State of a replica task.
///
/// The task itself is the future returned by `ReplicaTask::run`, which consumes this struct.
struct ReplicaTask<T> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// Replica configuration.
    config: ReplicaConfig,
    /// The build information for this process.
    build_info: &'static BuildInfo,
    /// A channel upon which commands intended for the replica are delivered.
    command_rx: UnboundedReceiver<ComputeCommand<T>>,
    /// A channel upon which responses from the replica are delivered.
    ///
    /// Each response is sent together with the replica ID and the epoch.
    response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// A number identifying this incarnation of the replica.
    /// The semantics of this don't matter, except that it must strictly increase.
    epoch: u64,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// Flag to report successful replica connection.
    connected: Arc<AtomicBool>,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,
}
163
164impl<T> ReplicaTask<T>
165where
166    T: ComputeControllerTimestamp,
167    ComputeGrpcClient: ComputeClient<T>,
168{
169    /// Asynchronously forwards commands to and responses from a single replica.
170    async fn run(self) {
171        let replica_id = self.replica_id;
172        info!(replica = ?replica_id, "starting replica task");
173
174        let client = self.connect().await;
175        match self.run_message_loop(client).await {
176            Ok(()) => info!(replica = ?replica_id, "stopped replica task"),
177            Err(error) => warn!(replica = ?replica_id, "replica task failed: {error:#}"),
178        }
179    }
180
181    /// Connects to the replica.
182    ///
183    /// The connection is retried forever (with backoff) and this method returns only after
184    /// a connection was successfully established.
185    async fn connect(&self) -> Client<T> {
186        let try_connect = async |retry: RetryState| {
187            let version = self.build_info.semver_version();
188            let client_params = &self.config.grpc_client;
189
190            let connect_start = Instant::now();
191            let connect_result = if self.config.enable_ctp {
192                let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
193                let keepalive_timeout = client_params
194                    .http2_keep_alive_timeout
195                    .unwrap_or(Duration::MAX);
196
197                ComputeCtpClient::<T>::connect_partitioned(
198                    self.config.location.ctl_addrs.clone(),
199                    version,
200                    connect_timeout,
201                    keepalive_timeout,
202                    self.metrics.clone(),
203                )
204                .await
205                .map(|client| {
206                    let dyncfg = Arc::clone(&self.dyncfg);
207                    SequentialHydration::new(client, dyncfg, self.metrics.clone())
208                })
209            } else {
210                let addrs = &self.config.location.ctl_addrs;
211                let dests = addrs
212                    .iter()
213                    .map(|addr| (addr.clone(), self.metrics.clone()))
214                    .collect();
215                ComputeGrpcClient::connect_partitioned(dests, version, client_params)
216                    .await
217                    .map(|client| {
218                        let dyncfg = Arc::clone(&self.dyncfg);
219                        SequentialHydration::new(client, dyncfg, self.metrics.clone())
220                    })
221            };
222
223            self.metrics.observe_connect_time(connect_start.elapsed());
224
225            connect_result.inspect_err(|error| {
226                let next_backoff = retry.next_backoff.unwrap();
227                if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
228                    info!(
229                        replica_id = %self.replica_id, ?next_backoff,
230                        "error connecting to replica: {error:#}",
231                    );
232                } else {
233                    debug!(
234                        replica_id = %self.replica_id, ?next_backoff,
235                        "error connecting to replica: {error:#}",
236                    );
237                }
238            })
239        };
240
241        let client = Retry::default()
242            .clamp_backoff(Duration::from_secs(1))
243            .retry_async(try_connect)
244            .await
245            .expect("retry retries forever");
246
247        self.metrics.observe_connect();
248        self.connected.store(true, atomic::Ordering::Relaxed);
249
250        client
251    }
252
253    /// Runs the message loop.
254    ///
255    /// Returns (with an `Err`) if it encounters an error condition (e.g. the replica disconnects).
256    /// If no error condition is encountered, the task runs until the controller disconnects from
257    /// the command channel, or the task is dropped.
258    async fn run_message_loop(mut self, mut client: Client<T>) -> Result<(), anyhow::Error>
259    where
260        T: ComputeControllerTimestamp,
261        ComputeGrpcClient: ComputeClient<T>,
262    {
263        loop {
264            select! {
265                // Command from controller to forward to replica.
266                command = self.command_rx.recv() => {
267                    let Some(mut command) = command else {
268                        // Controller is no longer interested in this replica. Shut down.
269                        break;
270                    };
271
272                    self.specialize_command(&mut command);
273                    self.observe_command(&command);
274                    client.send(command).await?;
275                },
276                // Response from replica to forward to controller.
277                response = client.recv() => {
278                    let Some(response) = response? else {
279                        bail!("replica unexpectedly gracefully terminated connection");
280                    };
281
282                    self.observe_response(&response);
283
284                    if self.response_tx.send((self.replica_id, self.epoch, response)).is_err() {
285                        // Controller is no longer interested in this replica. Shut down.
286                        break;
287                    }
288                }
289            }
290        }
291
292        Ok(())
293    }
294
295    /// Specialize a command for the given replica configuration.
296    ///
297    /// Most `ComputeCommand`s are independent of the target replica, but some
298    /// contain replica-specific fields that must be adjusted before sending.
299    fn specialize_command(&self, command: &mut ComputeCommand<T>) {
300        match command {
301            ComputeCommand::Hello { nonce } => {
302                *nonce = Uuid::new_v4();
303            }
304            ComputeCommand::CreateInstance(config) => {
305                config.logging = self.config.logging.clone();
306                if ENABLE_COMPUTE_REPLICA_EXPIRATION.get(&self.dyncfg) {
307                    config.expiration_offset = self.config.expiration_offset;
308                }
309            }
310            _ => {}
311        }
312    }
313
314    /// Update task state according to an observed command.
315    #[mz_ore::instrument(level = "debug")]
316    fn observe_command(&mut self, command: &ComputeCommand<T>) {
317        if let ComputeCommand::Peek(peek) = command {
318            peek.otel_ctx.attach_as_parent();
319        }
320
321        trace!(
322            replica = ?self.replica_id,
323            command = ?command,
324            "sending command to replica",
325        );
326
327        self.metrics.inner.command_queue_size.dec();
328    }
329
330    /// Update task state according to an observed response.
331    #[mz_ore::instrument(level = "debug")]
332    fn observe_response(&mut self, response: &ComputeResponse<T>) {
333        if let ComputeResponse::PeekResponse(_, _, otel_ctx) = response {
334            otel_ctx.attach_as_parent();
335        }
336
337        trace!(
338            replica = ?self.replica_id,
339            response = ?response,
340            "received response from replica",
341        );
342    }
343}