mz_compute_client/controller/
replica.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A client for replicas of a compute instance.
11
12use std::sync::Arc;
13use std::sync::atomic::{self, AtomicBool};
14use std::time::{Duration, Instant};
15
16use anyhow::bail;
17use mz_build_info::BuildInfo;
18use mz_cluster_client::client::ClusterReplicaLocation;
19use mz_compute_types::dyncfgs::ENABLE_COMPUTE_REPLICA_EXPIRATION;
20use mz_dyncfg::ConfigSet;
21use mz_ore::channel::InstrumentedUnboundedSender;
22use mz_ore::retry::{Retry, RetryState};
23use mz_ore::task::AbortOnDropHandle;
24use mz_service::client::GenericClient;
25use mz_service::params::GrpcClientParameters;
26use mz_service::transport;
27use tokio::select;
28use tokio::sync::mpsc::error::SendError;
29use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
30use tracing::{debug, info, trace, warn};
31use uuid::Uuid;
32
33use crate::controller::instance::ReplicaResponse;
34use crate::controller::sequential_hydration::SequentialHydration;
35use crate::controller::{ComputeControllerTimestamp, ReplicaId};
36use crate::logging::LoggingConfig;
37use crate::metrics::IntCounter;
38use crate::metrics::ReplicaMetrics;
39use crate::protocol::command::ComputeCommand;
40use crate::protocol::response::ComputeResponse;
41
42type Client<T> = SequentialHydration<T>;
43
44/// Replica-specific configuration.
45#[derive(Clone, Debug)]
46pub(super) struct ReplicaConfig {
47    pub location: ClusterReplicaLocation,
48    pub logging: LoggingConfig,
49    pub grpc_client: GrpcClientParameters,
50    /// The offset to use for replica expiration, if any.
51    pub expiration_offset: Option<Duration>,
52}
53
54/// A client for a replica task.
55#[derive(Debug)]
56pub(super) struct ReplicaClient<T> {
57    /// A sender for commands for the replica.
58    command_tx: UnboundedSender<ComputeCommand<T>>,
59    /// A handle to the task that aborts it when the replica is dropped.
60    ///
61    /// If the task is finished, the replica has failed and needs rehydration.
62    task: AbortOnDropHandle<()>,
63    /// Replica metrics.
64    metrics: ReplicaMetrics,
65    /// Flag reporting whether the replica connection has been established.
66    connected: Arc<AtomicBool>,
67}
68
69impl<T> ReplicaClient<T>
70where
71    T: ComputeControllerTimestamp,
72{
73    pub(super) fn spawn(
74        id: ReplicaId,
75        build_info: &'static BuildInfo,
76        config: ReplicaConfig,
77        epoch: u64,
78        metrics: ReplicaMetrics,
79        dyncfg: Arc<ConfigSet>,
80        response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
81    ) -> Self {
82        // Launch a task to handle communication with the replica
83        // asynchronously. This isolates the main controller thread from
84        // the replica.
85        let (command_tx, command_rx) = unbounded_channel();
86        let connected = Arc::new(AtomicBool::new(false));
87
88        let task = mz_ore::task::spawn(
89            || format!("active-replication-replica-{id}"),
90            ReplicaTask {
91                replica_id: id,
92                build_info,
93                config: config.clone(),
94                command_rx,
95                response_tx,
96                epoch,
97                metrics: metrics.clone(),
98                connected: Arc::clone(&connected),
99                dyncfg,
100            }
101            .run(),
102        );
103
104        Self {
105            command_tx,
106            task: task.abort_on_drop(),
107            metrics,
108            connected,
109        }
110    }
111}
112
113impl<T> ReplicaClient<T> {
114    /// Sends a command to this replica.
115    pub(super) fn send(
116        &self,
117        command: ComputeCommand<T>,
118    ) -> Result<(), SendError<ComputeCommand<T>>> {
119        self.command_tx.send(command).map(|r| {
120            self.metrics.inner.command_queue_size.inc();
121            r
122        })
123    }
124
125    /// Determine if the replica task has failed.
126    pub(super) fn is_failed(&self) -> bool {
127        self.task.is_finished()
128    }
129
130    /// Determine if the replica connection has been established.
131    pub(super) fn is_connected(&self) -> bool {
132        self.connected.load(atomic::Ordering::Relaxed)
133    }
134}
135
136type ComputeCtpClient<T> = transport::Client<ComputeCommand<T>, ComputeResponse<T>>;
137
138/// Configuration for `replica_task`.
139struct ReplicaTask<T> {
140    /// The ID of the replica.
141    replica_id: ReplicaId,
142    /// Replica configuration.
143    config: ReplicaConfig,
144    /// The build information for this process.
145    build_info: &'static BuildInfo,
146    /// A channel upon which commands intended for the replica are delivered.
147    command_rx: UnboundedReceiver<ComputeCommand<T>>,
148    /// A channel upon which responses from the replica are delivered.
149    response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
150    /// A number identifying this incarnation of the replica.
151    /// The semantics of this don't matter, except that it must strictly increase.
152    epoch: u64,
153    /// Replica metrics.
154    metrics: ReplicaMetrics,
155    /// Flag to report successful replica connection.
156    connected: Arc<AtomicBool>,
157    /// Dynamic system configuration.
158    dyncfg: Arc<ConfigSet>,
159}
160
161impl<T> ReplicaTask<T>
162where
163    T: ComputeControllerTimestamp,
164{
165    /// Asynchronously forwards commands to and responses from a single replica.
166    async fn run(self) {
167        let replica_id = self.replica_id;
168        info!(replica = ?replica_id, "starting replica task");
169
170        let client = self.connect().await;
171        match self.run_message_loop(client).await {
172            Ok(()) => info!(replica = ?replica_id, "stopped replica task"),
173            Err(error) => warn!(replica = ?replica_id, "replica task failed: {error:#}"),
174        }
175    }
176
177    /// Connects to the replica.
178    ///
179    /// The connection is retried forever (with backoff) and this method returns only after
180    /// a connection was successfully established.
181    async fn connect(&self) -> Client<T> {
182        let try_connect = async |retry: RetryState| {
183            let version = self.build_info.semver_version();
184            let client_params = &self.config.grpc_client;
185            let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
186            let keepalive_timeout = client_params
187                .http2_keep_alive_timeout
188                .unwrap_or(Duration::MAX);
189
190            let connect_start = Instant::now();
191            let connect_result = ComputeCtpClient::<T>::connect_partitioned(
192                self.config.location.ctl_addrs.clone(),
193                version,
194                connect_timeout,
195                keepalive_timeout,
196                self.metrics.clone(),
197            )
198            .await
199            .map(|client| {
200                let dyncfg = Arc::clone(&self.dyncfg);
201                SequentialHydration::new(client, dyncfg, self.metrics.clone())
202            });
203
204            self.metrics.observe_connect_time(connect_start.elapsed());
205
206            connect_result.inspect_err(|error| {
207                let next_backoff = retry.next_backoff.unwrap();
208                if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
209                    info!(
210                        replica_id = %self.replica_id, ?next_backoff,
211                        "error connecting to replica: {error:#}",
212                    );
213                } else {
214                    debug!(
215                        replica_id = %self.replica_id, ?next_backoff,
216                        "error connecting to replica: {error:#}",
217                    );
218                }
219            })
220        };
221
222        let client = Retry::default()
223            .clamp_backoff(Duration::from_secs(1))
224            .retry_async(try_connect)
225            .await
226            .expect("retry retries forever");
227
228        self.metrics.observe_connect();
229        self.connected.store(true, atomic::Ordering::Relaxed);
230
231        client
232    }
233
234    /// Runs the message loop.
235    ///
236    /// Returns (with an `Err`) if it encounters an error condition (e.g. the replica disconnects).
237    /// If no error condition is encountered, the task runs until the controller disconnects from
238    /// the command channel, or the task is dropped.
239    async fn run_message_loop(mut self, mut client: Client<T>) -> Result<(), anyhow::Error>
240    where
241        T: ComputeControllerTimestamp,
242    {
243        loop {
244            select! {
245                // Command from controller to forward to replica.
246                command = self.command_rx.recv() => {
247                    let Some(mut command) = command else {
248                        // Controller is no longer interested in this replica. Shut down.
249                        break;
250                    };
251
252                    self.specialize_command(&mut command);
253                    self.observe_command(&command);
254                    client.send(command).await?;
255                },
256                // Response from replica to forward to controller.
257                response = client.recv() => {
258                    let Some(response) = response? else {
259                        bail!("replica unexpectedly gracefully terminated connection");
260                    };
261
262                    self.observe_response(&response);
263
264                    if self.response_tx.send((self.replica_id, self.epoch, response)).is_err() {
265                        // Controller is no longer interested in this replica. Shut down.
266                        break;
267                    }
268                }
269            }
270        }
271
272        Ok(())
273    }
274
275    /// Specialize a command for the given replica configuration.
276    ///
277    /// Most `ComputeCommand`s are independent of the target replica, but some
278    /// contain replica-specific fields that must be adjusted before sending.
279    fn specialize_command(&self, command: &mut ComputeCommand<T>) {
280        match command {
281            ComputeCommand::Hello { nonce } => {
282                *nonce = Uuid::new_v4();
283            }
284            ComputeCommand::CreateInstance(config) => {
285                config.logging = self.config.logging.clone();
286                if ENABLE_COMPUTE_REPLICA_EXPIRATION.get(&self.dyncfg) {
287                    config.expiration_offset = self.config.expiration_offset;
288                }
289            }
290            _ => {}
291        }
292    }
293
294    /// Update task state according to an observed command.
295    #[mz_ore::instrument(level = "debug")]
296    fn observe_command(&mut self, command: &ComputeCommand<T>) {
297        if let ComputeCommand::Peek(peek) = command {
298            peek.otel_ctx.attach_as_parent();
299        }
300
301        trace!(
302            replica = ?self.replica_id,
303            command = ?command,
304            "sending command to replica",
305        );
306
307        self.metrics.inner.command_queue_size.dec();
308    }
309
310    /// Update task state according to an observed response.
311    #[mz_ore::instrument(level = "debug")]
312    fn observe_response(&mut self, response: &ComputeResponse<T>) {
313        if let ComputeResponse::PeekResponse(_, _, otel_ctx) = response {
314            otel_ctx.attach_as_parent();
315        }
316
317        trace!(
318            replica = ?self.replica_id,
319            response = ?response,
320            "received response from replica",
321        );
322    }
323}