mz_compute_client/controller/replica.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A client for replicas of a compute instance.
11
12use std::sync::Arc;
13use std::sync::atomic::{self, AtomicBool};
14use std::time::{Duration, Instant};
15
16use anyhow::bail;
17use mz_build_info::BuildInfo;
18use mz_cluster_client::client::ClusterReplicaLocation;
19use mz_compute_types::dyncfgs::ENABLE_COMPUTE_REPLICA_EXPIRATION;
20use mz_dyncfg::ConfigSet;
21use mz_ore::channel::InstrumentedUnboundedSender;
22use mz_ore::retry::{Retry, RetryState};
23use mz_ore::task::AbortOnDropHandle;
24use mz_service::client::GenericClient;
25use mz_service::params::GrpcClientParameters;
26use mz_service::transport;
27use tokio::select;
28use tokio::sync::mpsc::error::SendError;
29use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
30use tracing::{debug, info, trace, warn};
31use uuid::Uuid;
32
33use crate::controller::ReplicaId;
34use crate::controller::instance::ReplicaResponse;
35use crate::controller::sequential_hydration::SequentialHydration;
36use crate::logging::LoggingConfig;
37use crate::metrics::IntCounter;
38use crate::metrics::ReplicaMetrics;
39use crate::protocol::command::ComputeCommand;
40use crate::protocol::response::ComputeResponse;
41
/// The client type used for communication with a replica: the transport client wrapped in
/// sequential-hydration handling.
type Client = SequentialHydration;
43
/// Replica-specific configuration.
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    /// The network location of the replica; `ctl_addrs` is used to connect.
    pub location: ClusterReplicaLocation,
    /// The logging configuration to install on the replica via `CreateInstance`.
    pub logging: LoggingConfig,
    /// Parameters (e.g. connect and keepalive timeouts) for the client connection.
    pub grpc_client: GrpcClientParameters,
    /// The offset to use for replica expiration, if any.
    pub expiration_offset: Option<Duration>,
}
53
/// A client for a replica task.
///
/// Dropping this client aborts the associated replica task.
#[derive(Debug)]
pub(super) struct ReplicaClient {
    /// A sender for commands for the replica.
    command_tx: UnboundedSender<ComputeCommand>,
    /// A handle to the task that aborts it when the replica is dropped.
    ///
    /// If the task is finished, the replica has failed and needs rehydration.
    task: AbortOnDropHandle<()>,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// Flag reporting whether the replica connection has been established.
    connected: Arc<AtomicBool>,
}
68
69impl ReplicaClient {
70    pub(super) fn spawn(
71        id: ReplicaId,
72        build_info: &'static BuildInfo,
73        config: ReplicaConfig,
74        epoch: u64,
75        metrics: ReplicaMetrics,
76        dyncfg: Arc<ConfigSet>,
77        response_tx: InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
78    ) -> Self {
79        // Launch a task to handle communication with the replica
80        // asynchronously. This isolates the main controller thread from
81        // the replica.
82        let (command_tx, command_rx) = unbounded_channel();
83        let connected = Arc::new(AtomicBool::new(false));
84
85        let task = mz_ore::task::spawn(
86            || format!("active-replication-replica-{id}"),
87            ReplicaTask {
88                replica_id: id,
89                build_info,
90                config: config.clone(),
91                command_rx,
92                response_tx,
93                epoch,
94                metrics: metrics.clone(),
95                connected: Arc::clone(&connected),
96                dyncfg,
97            }
98            .run(),
99        );
100
101        Self {
102            command_tx,
103            task: task.abort_on_drop(),
104            metrics,
105            connected,
106        }
107    }
108}
109
110impl ReplicaClient {
111    /// Sends a command to this replica.
112    pub(super) fn send(&self, command: ComputeCommand) -> Result<(), SendError<ComputeCommand>> {
113        self.command_tx.send(command).map(|r| {
114            self.metrics.inner.command_queue_size.inc();
115            r
116        })
117    }
118
119    /// Determine if the replica task has failed.
120    pub(super) fn is_failed(&self) -> bool {
121        self.task.is_finished()
122    }
123
124    /// Determine if the replica connection has been established.
125    pub(super) fn is_connected(&self) -> bool {
126        self.connected.load(atomic::Ordering::Relaxed)
127    }
128}
129
/// The transport client used to exchange compute commands and responses with a replica.
type ComputeCtpClient = transport::Client<ComputeCommand, ComputeResponse>;
131
/// State of a task handling communication with a single replica.
struct ReplicaTask {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// Replica configuration.
    config: ReplicaConfig,
    /// The build information for this process.
    build_info: &'static BuildInfo,
    /// A channel upon which commands intended for the replica are delivered.
    command_rx: UnboundedReceiver<ComputeCommand>,
    /// A channel upon which responses from the replica are delivered.
    response_tx: InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
    /// A number identifying this incarnation of the replica.
    /// The semantics of this don't matter, except that it must strictly increase.
    epoch: u64,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// Flag to report successful replica connection.
    connected: Arc<AtomicBool>,
    /// Dynamic system configuration.
    dyncfg: Arc<ConfigSet>,
}
154
impl ReplicaTask {
    /// Asynchronously forwards commands to and responses from a single replica.
    ///
    /// Runs until the controller drops the command channel, the replica
    /// connection fails, or the task itself is dropped.
    async fn run(self) {
        let replica_id = self.replica_id;
        info!(replica = ?replica_id, "starting replica task");

        // Blocks until a connection is established; `connect` retries forever.
        let client = self.connect().await;
        match self.run_message_loop(client).await {
            Ok(()) => info!(replica = ?replica_id, "stopped replica task"),
            Err(error) => warn!(replica = ?replica_id, "replica task failed: {error:#}"),
        }
    }

    /// Connects to the replica.
    ///
    /// The connection is retried forever (with backoff) and this method returns only after
    /// a connection was successfully established.
    async fn connect(&self) -> Client {
        let try_connect = async |retry: RetryState| {
            let version = self.build_info.semver_version();
            let client_params = &self.config.grpc_client;
            // Unset timeouts are treated as effectively infinite.
            let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
            let keepalive_timeout = client_params
                .http2_keep_alive_timeout
                .unwrap_or(Duration::MAX);

            let connect_start = Instant::now();
            let connect_result = ComputeCtpClient::connect_partitioned(
                self.config.location.ctl_addrs.clone(),
                version,
                connect_timeout,
                keepalive_timeout,
                self.metrics.clone(),
            )
            .await
            .map(|client| {
                // Wrap the transport client in sequential-hydration handling.
                let dyncfg = Arc::clone(&self.dyncfg);
                SequentialHydration::new(client, dyncfg, self.metrics.clone())
            });

            // Record the connect duration for successful and failed attempts alike.
            self.metrics.observe_connect_time(connect_start.elapsed());

            connect_result.inspect_err(|error| {
                // `next_backoff` is always `Some` because the retry is unbounded.
                let next_backoff = retry.next_backoff.unwrap();
                // Log the first few failures quietly to avoid noise on
                // transient connection issues, then escalate to `info`.
                if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
                    info!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                } else {
                    debug!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                }
            })
        };

        let client = Retry::default()
            .clamp_backoff(Duration::from_secs(1))
            .retry_async(try_connect)
            .await
            .expect("retry retries forever");

        self.metrics.observe_connect();
        // Signal to the controller that the connection is up.
        self.connected.store(true, atomic::Ordering::Relaxed);

        client
    }

    /// Runs the message loop.
    ///
    /// Returns (with an `Err`) if it encounters an error condition (e.g. the replica disconnects).
    /// If no error condition is encountered, the task runs until the controller disconnects from
    /// the command channel, or the task is dropped.
    async fn run_message_loop(mut self, mut client: Client) -> Result<(), anyhow::Error> {
        loop {
            select! {
                // Command from controller to forward to replica.
                command = self.command_rx.recv() => {
                    let Some(mut command) = command else {
                        // Controller is no longer interested in this replica. Shut down.
                        break;
                    };

                    self.specialize_command(&mut command);
                    self.observe_command(&command);
                    client.send(command).await?;
                },
                // Response from replica to forward to controller.
                response = client.recv() => {
                    // A transport error surfaces as `Err`; a graceful close as `None`.
                    let Some(response) = response? else {
                        bail!("replica unexpectedly gracefully terminated connection");
                    };

                    self.observe_response(&response);

                    // Responses are tagged with replica ID and epoch so the controller
                    // can discard messages from stale incarnations.
                    if self.response_tx.send((self.replica_id, self.epoch, response)).is_err() {
                        // Controller is no longer interested in this replica. Shut down.
                        break;
                    }
                }
            }
        }

        Ok(())
    }

    /// Specialize a command for the given replica configuration.
    ///
    /// Most `ComputeCommand`s are independent of the target replica, but some
    /// contain replica-specific fields that must be adjusted before sending.
    fn specialize_command(&self, command: &mut ComputeCommand) {
        match command {
            ComputeCommand::Hello { nonce } => {
                // Each replica connection gets a fresh nonce.
                *nonce = Uuid::new_v4();
            }
            ComputeCommand::CreateInstance(config) => {
                // Install this replica's logging configuration.
                config.logging = self.config.logging.clone();
                // Only propagate the expiration offset while the feature is enabled.
                if ENABLE_COMPUTE_REPLICA_EXPIRATION.get(&self.dyncfg) {
                    config.expiration_offset = self.config.expiration_offset;
                }
            }
            _ => {}
        }
    }

    /// Update task state according to an observed command.
    #[mz_ore::instrument(level = "debug")]
    fn observe_command(&self, command: &ComputeCommand) {
        // Attach the peek's tracing context so spans connect the controller
        // side with the replica side.
        if let ComputeCommand::Peek(peek) = command {
            peek.otel_ctx.attach_as_parent();
        }

        trace!(
            replica = ?self.replica_id,
            command = ?command,
            "sending command to replica",
        );

        // The command leaves the queue here, undoing the `inc` performed when
        // the controller enqueued it.
        self.metrics.inner.command_queue_size.dec();
    }

    /// Update task state according to an observed response.
    #[mz_ore::instrument(level = "debug")]
    fn observe_response(&self, response: &ComputeResponse) {
        // Attach the peek's tracing context so spans connect the replica side
        // with the controller side.
        if let ComputeResponse::PeekResponse(_, _, otel_ctx) = response {
            otel_ctx.attach_as_parent();
        }

        trace!(
            replica = ?self.replica_id,
            response = ?response,
            "received response from replica",
        );
    }
}