mz_compute_client/controller/
replica.rs1use std::sync::Arc;
13use std::sync::atomic::{self, AtomicBool};
14use std::time::{Duration, Instant};
15
16use anyhow::bail;
17use mz_build_info::BuildInfo;
18use mz_cluster_client::client::ClusterReplicaLocation;
19use mz_compute_types::dyncfgs::ENABLE_COMPUTE_REPLICA_EXPIRATION;
20use mz_dyncfg::ConfigSet;
21use mz_ore::channel::InstrumentedUnboundedSender;
22use mz_ore::retry::{Retry, RetryState};
23use mz_ore::task::AbortOnDropHandle;
24use mz_service::client::GenericClient;
25use mz_service::params::GrpcClientParameters;
26use mz_service::transport;
27use tokio::select;
28use tokio::sync::mpsc::error::SendError;
29use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
30use tracing::{debug, info, trace, warn};
31use uuid::Uuid;
32
33use crate::controller::instance::ReplicaResponse;
34use crate::controller::sequential_hydration::SequentialHydration;
35use crate::controller::{ComputeControllerTimestamp, ReplicaId};
36use crate::logging::LoggingConfig;
37use crate::metrics::IntCounter;
38use crate::metrics::ReplicaMetrics;
39use crate::protocol::command::ComputeCommand;
40use crate::protocol::response::ComputeResponse;
41use crate::service::{ComputeClient, ComputeGrpcClient};
42
/// The client type through which a replica task communicates with its replica.
///
/// Both the CTP and the gRPC connections established by `ReplicaTask::connect` are wrapped in a
/// [`SequentialHydration`] client.
type Client<T> = SequentialHydration<T>;
44
/// Replica-specific configuration.
///
/// Handed to the task spawned for a replica, which uses it to connect to the replica and to fill
/// in replica-specific fields of outgoing commands.
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    /// Location of the replica; its `ctl_addrs` are the addresses the controller connects to.
    pub location: ClusterReplicaLocation,
    /// Logging configuration injected into `CreateInstance` commands sent to this replica.
    pub logging: LoggingConfig,
    /// Parameters for the gRPC client. Its connect timeout and HTTP/2 keepalive timeout are also
    /// used when connecting via CTP.
    pub grpc_client: GrpcClientParameters,
    /// Expiration offset injected into `CreateInstance` commands when
    /// `ENABLE_COMPUTE_REPLICA_EXPIRATION` is enabled.
    pub expiration_offset: Option<Duration>,
    /// Whether to connect to the replica via the CTP transport instead of gRPC.
    pub enable_ctp: bool,
}
55
/// Handle to the task that maintains the connection to a single compute replica.
///
/// Commands passed to [`ReplicaClient::send`] are queued to the task, which forwards them to
/// the replica.
#[derive(Debug)]
pub(super) struct ReplicaClient<T> {
    /// Sender for commands destined for the replica task.
    command_tx: UnboundedSender<ComputeCommand<T>>,
    /// Handle to the replica task, created via `abort_on_drop`, so dropping this client aborts
    /// the task.
    task: AbortOnDropHandle<()>,
    /// Replica metrics; used here to track the command queue size.
    metrics: ReplicaMetrics,
    /// Set to `true` by the replica task once it has connected to the replica.
    connected: Arc<AtomicBool>,
}
70
71impl<T> ReplicaClient<T>
72where
73 T: ComputeControllerTimestamp,
74 ComputeGrpcClient: ComputeClient<T>,
75{
76 pub(super) fn spawn(
77 id: ReplicaId,
78 build_info: &'static BuildInfo,
79 config: ReplicaConfig,
80 epoch: u64,
81 metrics: ReplicaMetrics,
82 dyncfg: Arc<ConfigSet>,
83 response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
84 ) -> Self {
85 let (command_tx, command_rx) = unbounded_channel();
89 let connected = Arc::new(AtomicBool::new(false));
90
91 let task = mz_ore::task::spawn(
92 || format!("active-replication-replica-{id}"),
93 ReplicaTask {
94 replica_id: id,
95 build_info,
96 config: config.clone(),
97 command_rx,
98 response_tx,
99 epoch,
100 metrics: metrics.clone(),
101 connected: Arc::clone(&connected),
102 dyncfg,
103 }
104 .run(),
105 );
106
107 Self {
108 command_tx,
109 task: task.abort_on_drop(),
110 metrics,
111 connected,
112 }
113 }
114}
115
116impl<T> ReplicaClient<T> {
117 pub(super) fn send(
119 &self,
120 command: ComputeCommand<T>,
121 ) -> Result<(), SendError<ComputeCommand<T>>> {
122 self.command_tx.send(command).map(|r| {
123 self.metrics.inner.command_queue_size.inc();
124 r
125 })
126 }
127
128 pub(super) fn is_failed(&self) -> bool {
130 self.task.is_finished()
131 }
132
133 pub(super) fn is_connected(&self) -> bool {
135 self.connected.load(atomic::Ordering::Relaxed)
136 }
137}
138
/// CTP transport client speaking the compute protocol; used when `enable_ctp` is set.
type ComputeCtpClient<T> = transport::Client<ComputeCommand<T>, ComputeResponse<T>>;
140
/// Configuration and state of the task that maintains the connection to a single replica.
struct ReplicaTask<T> {
    /// The ID of the replica.
    replica_id: ReplicaId,
    /// Replica configuration.
    config: ReplicaConfig,
    /// Build information; its semver version is passed along when connecting.
    build_info: &'static BuildInfo,
    /// Receiver for commands queued via `ReplicaClient::send`.
    command_rx: UnboundedReceiver<ComputeCommand<T>>,
    /// Sender for responses received from the replica, tagged with replica ID and epoch.
    response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// Epoch attached to every forwarded response.
    epoch: u64,
    /// Replica metrics.
    metrics: ReplicaMetrics,
    /// Flag set to `true` once a connection to the replica has been established.
    connected: Arc<AtomicBool>,
    /// Dynamic configuration, consulted e.g. for `ENABLE_COMPUTE_REPLICA_EXPIRATION`.
    dyncfg: Arc<ConfigSet>,
}
163
164impl<T> ReplicaTask<T>
165where
166 T: ComputeControllerTimestamp,
167 ComputeGrpcClient: ComputeClient<T>,
168{
169 async fn run(self) {
171 let replica_id = self.replica_id;
172 info!(replica = ?replica_id, "starting replica task");
173
174 let client = self.connect().await;
175 match self.run_message_loop(client).await {
176 Ok(()) => info!(replica = ?replica_id, "stopped replica task"),
177 Err(error) => warn!(replica = ?replica_id, "replica task failed: {error:#}"),
178 }
179 }
180
181 async fn connect(&self) -> Client<T> {
186 let try_connect = async |retry: RetryState| {
187 let version = self.build_info.semver_version();
188 let client_params = &self.config.grpc_client;
189
190 let connect_start = Instant::now();
191 let connect_result = if self.config.enable_ctp {
192 let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
193 let keepalive_timeout = client_params
194 .http2_keep_alive_timeout
195 .unwrap_or(Duration::MAX);
196
197 ComputeCtpClient::<T>::connect_partitioned(
198 self.config.location.ctl_addrs.clone(),
199 version,
200 connect_timeout,
201 keepalive_timeout,
202 self.metrics.clone(),
203 )
204 .await
205 .map(|client| {
206 let dyncfg = Arc::clone(&self.dyncfg);
207 SequentialHydration::new(client, dyncfg, self.metrics.clone())
208 })
209 } else {
210 let addrs = &self.config.location.ctl_addrs;
211 let dests = addrs
212 .iter()
213 .map(|addr| (addr.clone(), self.metrics.clone()))
214 .collect();
215 ComputeGrpcClient::connect_partitioned(dests, version, client_params)
216 .await
217 .map(|client| {
218 let dyncfg = Arc::clone(&self.dyncfg);
219 SequentialHydration::new(client, dyncfg, self.metrics.clone())
220 })
221 };
222
223 self.metrics.observe_connect_time(connect_start.elapsed());
224
225 connect_result.inspect_err(|error| {
226 let next_backoff = retry.next_backoff.unwrap();
227 if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
228 info!(
229 replica_id = %self.replica_id, ?next_backoff,
230 "error connecting to replica: {error:#}",
231 );
232 } else {
233 debug!(
234 replica_id = %self.replica_id, ?next_backoff,
235 "error connecting to replica: {error:#}",
236 );
237 }
238 })
239 };
240
241 let client = Retry::default()
242 .clamp_backoff(Duration::from_secs(1))
243 .retry_async(try_connect)
244 .await
245 .expect("retry retries forever");
246
247 self.metrics.observe_connect();
248 self.connected.store(true, atomic::Ordering::Relaxed);
249
250 client
251 }
252
253 async fn run_message_loop(mut self, mut client: Client<T>) -> Result<(), anyhow::Error>
259 where
260 T: ComputeControllerTimestamp,
261 ComputeGrpcClient: ComputeClient<T>,
262 {
263 loop {
264 select! {
265 command = self.command_rx.recv() => {
267 let Some(mut command) = command else {
268 break;
270 };
271
272 self.specialize_command(&mut command);
273 self.observe_command(&command);
274 client.send(command).await?;
275 },
276 response = client.recv() => {
278 let Some(response) = response? else {
279 bail!("replica unexpectedly gracefully terminated connection");
280 };
281
282 self.observe_response(&response);
283
284 if self.response_tx.send((self.replica_id, self.epoch, response)).is_err() {
285 break;
287 }
288 }
289 }
290 }
291
292 Ok(())
293 }
294
295 fn specialize_command(&self, command: &mut ComputeCommand<T>) {
300 match command {
301 ComputeCommand::Hello { nonce } => {
302 *nonce = Uuid::new_v4();
303 }
304 ComputeCommand::CreateInstance(config) => {
305 config.logging = self.config.logging.clone();
306 if ENABLE_COMPUTE_REPLICA_EXPIRATION.get(&self.dyncfg) {
307 config.expiration_offset = self.config.expiration_offset;
308 }
309 }
310 _ => {}
311 }
312 }
313
314 #[mz_ore::instrument(level = "debug")]
316 fn observe_command(&mut self, command: &ComputeCommand<T>) {
317 if let ComputeCommand::Peek(peek) = command {
318 peek.otel_ctx.attach_as_parent();
319 }
320
321 trace!(
322 replica = ?self.replica_id,
323 command = ?command,
324 "sending command to replica",
325 );
326
327 self.metrics.inner.command_queue_size.dec();
328 }
329
330 #[mz_ore::instrument(level = "debug")]
332 fn observe_response(&mut self, response: &ComputeResponse<T>) {
333 if let ComputeResponse::PeekResponse(_, _, otel_ctx) = response {
334 otel_ctx.attach_as_parent();
335 }
336
337 trace!(
338 replica = ?self.replica_id,
339 response = ?response,
340 "received response from replica",
341 );
342 }
343}