Skip to main content

mz_compute_client/controller/
replica.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A client for replicas of a compute instance.
11
12use std::sync::Arc;
13use std::sync::atomic::{self, AtomicBool};
14use std::time::{Duration, Instant};
15
16use anyhow::bail;
17use mz_build_info::BuildInfo;
18use mz_cluster_client::client::ClusterReplicaLocation;
19use mz_compute_types::dyncfgs::ENABLE_COMPUTE_REPLICA_EXPIRATION;
20use mz_dyncfg::ConfigSet;
21use mz_ore::channel::InstrumentedUnboundedSender;
22use mz_ore::retry::{Retry, RetryState};
23use mz_ore::task::AbortOnDropHandle;
24use mz_service::client::{GenericClient, Partitioned};
25use mz_service::params::GrpcClientParameters;
26use mz_service::transport;
27use tokio::select;
28use tokio::sync::mpsc::error::SendError;
29use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
30use tracing::{debug, info, trace, warn};
31use uuid::Uuid;
32
33use crate::controller::ReplicaId;
34use crate::controller::instance::ReplicaResponse;
35use crate::controller::sequential_hydration::SequentialHydration;
36use crate::logging::LoggingConfig;
37use crate::metrics::IntCounter;
38use crate::metrics::ReplicaMetrics;
39use crate::protocol::command::ComputeCommand;
40use crate::protocol::response::ComputeResponse;
41
42type Client = Partitioned<ComputeCtpClient, ComputeCommand, ComputeResponse>;
43
44/// Replica-specific configuration.
45#[derive(Clone, Debug)]
46pub(super) struct ReplicaConfig {
47    pub location: ClusterReplicaLocation,
48    pub logging: LoggingConfig,
49    pub grpc_client: GrpcClientParameters,
50    /// The offset to use for replica expiration, if any.
51    pub expiration_offset: Option<Duration>,
52    /// Whether arrangements on this replica use dictionary compression, captured at creation.
53    pub arrangement_dictionary_compression: bool,
54}
55
56/// A client for a replica task.
57#[derive(Debug)]
58pub(super) struct ReplicaClient {
59    /// A sender for commands for the replica.
60    command_tx: UnboundedSender<ComputeCommand>,
61    /// A handle to the task that aborts it when the replica is dropped.
62    ///
63    /// If the task is finished, the replica has failed and needs rehydration.
64    task: AbortOnDropHandle<()>,
65    /// Replica metrics.
66    metrics: ReplicaMetrics,
67    /// Flag reporting whether the replica connection has been established.
68    connected: Arc<AtomicBool>,
69}
70
71impl ReplicaClient {
72    pub(super) fn spawn(
73        id: ReplicaId,
74        build_info: &'static BuildInfo,
75        config: ReplicaConfig,
76        epoch: u64,
77        metrics: ReplicaMetrics,
78        dyncfg: Arc<ConfigSet>,
79        response_tx: InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
80    ) -> Self {
81        // Launch a task to handle communication with the replica
82        // asynchronously. This isolates the main controller thread from
83        // the replica.
84        let (command_tx, command_rx) = unbounded_channel();
85        let connected = Arc::new(AtomicBool::new(false));
86
87        let task = mz_ore::task::spawn(
88            || format!("active-replication-replica-{id}"),
89            ReplicaTask {
90                replica_id: id,
91                build_info,
92                config: config.clone(),
93                command_rx,
94                response_tx,
95                epoch,
96                metrics: metrics.clone(),
97                connected: Arc::clone(&connected),
98                dyncfg,
99            }
100            .run(),
101        );
102
103        Self {
104            command_tx,
105            task: task.abort_on_drop(),
106            metrics,
107            connected,
108        }
109    }
110}
111
112impl ReplicaClient {
113    /// Sends a command to this replica.
114    pub(super) fn send(&self, command: ComputeCommand) -> Result<(), SendError<ComputeCommand>> {
115        self.command_tx.send(command).map(|r| {
116            self.metrics.inner.command_queue_size.inc();
117            r
118        })
119    }
120
121    /// Determine if the replica task has failed.
122    pub(super) fn is_failed(&self) -> bool {
123        self.task.is_finished()
124    }
125
126    /// Determine if the replica connection has been established.
127    pub(super) fn is_connected(&self) -> bool {
128        self.connected.load(atomic::Ordering::Relaxed)
129    }
130}
131
132type ComputeCtpClient = transport::Client<ComputeCommand, ComputeResponse>;
133
134/// Configuration for `replica_task`.
135struct ReplicaTask {
136    /// The ID of the replica.
137    replica_id: ReplicaId,
138    /// Replica configuration.
139    config: ReplicaConfig,
140    /// The build information for this process.
141    build_info: &'static BuildInfo,
142    /// A channel upon which commands intended for the replica are delivered.
143    command_rx: UnboundedReceiver<ComputeCommand>,
144    /// A channel upon which responses from the replica are delivered.
145    response_tx: InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
146    /// A number identifying this incarnation of the replica.
147    /// The semantics of this don't matter, except that it must strictly increase.
148    epoch: u64,
149    /// Replica metrics.
150    metrics: ReplicaMetrics,
151    /// Flag to report successful replica connection.
152    connected: Arc<AtomicBool>,
153    /// Dynamic system configuration.
154    dyncfg: Arc<ConfigSet>,
155}
156
157impl ReplicaTask {
158    /// Asynchronously forwards commands to and responses from a single replica.
159    async fn run(self) {
160        let replica_id = self.replica_id;
161        info!(replica = ?replica_id, "starting replica task");
162
163        let client = self.connect().await;
164        match self.run_message_loop(client).await {
165            Ok(()) => info!(replica = ?replica_id, "stopped replica task"),
166            Err(error) => warn!(replica = ?replica_id, "replica task failed: {error:#}"),
167        }
168    }
169
170    /// Connects to the replica.
171    ///
172    /// The connection is retried forever (with backoff) and this method returns only after
173    /// a connection was successfully established.
174    async fn connect(&self) -> Client {
175        let try_connect = async |retry: RetryState| {
176            let version = self.build_info.semver_version();
177            let client_params = &self.config.grpc_client;
178            let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
179            let keepalive_timeout = client_params
180                .http2_keep_alive_timeout
181                .unwrap_or(Duration::MAX);
182
183            let connect_start = Instant::now();
184            let connect_result = ComputeCtpClient::connect_partitioned(
185                self.config.location.ctl_addrs.clone(),
186                version,
187                connect_timeout,
188                keepalive_timeout,
189                self.metrics.clone(),
190            )
191            .await;
192
193            self.metrics.observe_connect_time(connect_start.elapsed());
194
195            connect_result.inspect_err(|error| {
196                let next_backoff = retry.next_backoff.unwrap();
197                if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
198                    info!(
199                        replica_id = %self.replica_id, ?next_backoff,
200                        "error connecting to replica: {error:#}",
201                    );
202                } else {
203                    debug!(
204                        replica_id = %self.replica_id, ?next_backoff,
205                        "error connecting to replica: {error:#}",
206                    );
207                }
208            })
209        };
210
211        let client = Retry::default()
212            .clamp_backoff(Duration::from_secs(1))
213            .retry_async(try_connect)
214            .await
215            .expect("retry retries forever");
216
217        self.metrics.observe_connect();
218        self.connected.store(true, atomic::Ordering::Relaxed);
219
220        client
221    }
222
223    /// Runs the message loop.
224    ///
225    /// Returns (with an `Err`) if it encounters an error condition (e.g. the replica disconnects).
226    /// If no error condition is encountered, the task runs until the controller disconnects from
227    /// the command channel, or the task is dropped.
228    async fn run_message_loop(mut self, mut client: Client) -> Result<(), anyhow::Error> {
229        // The sequential hydration interceptor holds back `Schedule` commands and releases them as
230        // hydration capacity frees up. It is recreated per incarnation, matching the lifetime of
231        // the connection: any in-flight hydration state is reset when we reconnect.
232        let mut hydration =
233            SequentialHydration::new(Arc::clone(&self.dyncfg), self.metrics.clone());
234
235        loop {
236            select! {
237                // Command from controller to forward to replica.
238                command = self.command_rx.recv() => {
239                    let Some(mut command) = command else {
240                        // Controller is no longer interested in this replica. Shut down.
241                        break;
242                    };
243
244                    self.specialize_command(&mut command);
245                    self.observe_command(&command);
246                    for command in hydration.absorb_command(command) {
247                        client.send(command).await?;
248                    }
249                },
250                // Response from replica to forward to controller.
251                response = client.recv() => {
252                    let Some(response) = response? else {
253                        bail!("replica unexpectedly gracefully terminated connection");
254                    };
255
256                    self.observe_response(&response);
257
258                    for command in hydration.observe_response(&response) {
259                        client.send(command).await?;
260                    }
261
262                    if self.response_tx.send((self.replica_id, self.epoch, response)).is_err() {
263                        // Controller is no longer interested in this replica. Shut down.
264                        break;
265                    }
266                }
267            }
268        }
269
270        Ok(())
271    }
272
273    /// Specialize a command for the given replica configuration.
274    ///
275    /// Most `ComputeCommand`s are independent of the target replica, but some
276    /// contain replica-specific fields that must be adjusted before sending.
277    fn specialize_command(&self, command: &mut ComputeCommand) {
278        match command {
279            ComputeCommand::Hello { nonce } => {
280                *nonce = Uuid::new_v4();
281            }
282            ComputeCommand::CreateInstance(config) => {
283                config.logging = self.config.logging.clone();
284                if ENABLE_COMPUTE_REPLICA_EXPIRATION.get(&self.dyncfg) {
285                    config.expiration_offset = self.config.expiration_offset;
286                }
287                config.arrangement_dictionary_compression =
288                    self.config.arrangement_dictionary_compression;
289            }
290            _ => {}
291        }
292    }
293
294    /// Update task state according to an observed command.
295    #[mz_ore::instrument(level = "debug")]
296    fn observe_command(&self, command: &ComputeCommand) {
297        if let ComputeCommand::Peek(peek) = command {
298            peek.otel_ctx.attach_as_parent();
299        }
300
301        trace!(
302            replica = ?self.replica_id,
303            command = ?command,
304            "sending command to replica",
305        );
306
307        self.metrics.inner.command_queue_size.dec();
308    }
309
310    /// Update task state according to an observed response.
311    #[mz_ore::instrument(level = "debug")]
312    fn observe_response(&self, response: &ComputeResponse) {
313        if let ComputeResponse::PeekResponse(_, _, otel_ctx) = response {
314            otel_ctx.attach_as_parent();
315        }
316
317        trace!(
318            replica = ?self.replica_id,
319            response = ?response,
320            "received response from replica",
321        );
322    }
323}