mz_compute_client/controller/
replica.rs
1use std::sync::Arc;
13use std::sync::atomic::{self, AtomicBool};
14use std::time::{Duration, Instant};
15
16use anyhow::bail;
17use mz_build_info::BuildInfo;
18use mz_cluster_client::client::{ClusterReplicaLocation, ClusterStartupEpoch, TimelyConfig};
19use mz_compute_types::dyncfgs::ENABLE_COMPUTE_REPLICA_EXPIRATION;
20use mz_controller_types::dyncfgs::ENABLE_CREATE_SOCKETS_V2;
21use mz_dyncfg::ConfigSet;
22use mz_ore::channel::InstrumentedUnboundedSender;
23use mz_ore::retry::{Retry, RetryState};
24use mz_ore::task::AbortOnDropHandle;
25use mz_service::client::GenericClient;
26use mz_service::params::GrpcClientParameters;
27use tokio::select;
28use tokio::sync::mpsc::error::SendError;
29use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
30use tracing::{debug, info, trace, warn};
31
32use crate::controller::instance::ReplicaResponse;
33use crate::controller::sequential_hydration::SequentialHydration;
34use crate::controller::{ComputeControllerTimestamp, ReplicaId};
35use crate::logging::LoggingConfig;
36use crate::metrics::IntCounter;
37use crate::metrics::ReplicaMetrics;
38use crate::protocol::command::{ComputeCommand, InitialComputeParameters};
39use crate::protocol::response::ComputeResponse;
40use crate::service::{ComputeClient, ComputeGrpcClient};
41
/// The client type the replica task uses to talk to a replica.
///
/// `ReplicaTask::connect` builds it by wrapping the connected gRPC client in a
/// [`SequentialHydration`].
type Client<T> = SequentialHydration<T>;
43
/// Replica-specific configuration handed to the replica task.
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    /// Where the replica lives: its `ctl_addrs` are used to connect to it, and its
    /// `workers` and `dataflow_addrs` are written into `CreateTimely` commands.
    pub location: ClusterReplicaLocation,
    /// Logging configuration written into `CreateInstance` commands before they are
    /// sent to the replica.
    pub logging: LoggingConfig,
    /// gRPC client parameters used when connecting to the replica.
    pub grpc_client: GrpcClientParameters,
    /// Expiration offset written into `CreateInstance` commands, but only while the
    /// `ENABLE_COMPUTE_REPLICA_EXPIRATION` dyncfg is enabled.
    pub expiration_offset: Option<Duration>,
    /// Initial compute parameters used to fill in the `TimelyConfig` of
    /// `CreateTimely` commands.
    pub initial_config: InitialComputeParameters,
}
54
/// A handle through which the controller talks to a single replica.
///
/// The actual communication happens in a task spawned by `ReplicaClient::spawn`.
#[derive(Debug)]
pub(super) struct ReplicaClient<T> {
    /// Sending half of the channel that feeds commands to the replica task.
    command_tx: UnboundedSender<ComputeCommand<T>>,
    /// Handle to the replica task. `is_failed` reports whether it has finished.
    // NOTE(review): presumably aborts the task when this handle is dropped, as the
    // type name suggests; confirm against `mz_ore::task::AbortOnDropHandle`.
    task: AbortOnDropHandle<()>,
    /// Replica metrics; `send` uses them to track the command queue size.
    metrics: ReplicaMetrics,
    /// Flag shared with the replica task, which sets it to `true` once the connection
    /// to the replica has been established.
    connected: Arc<AtomicBool>,
}
69
70impl<T> ReplicaClient<T>
71where
72 T: ComputeControllerTimestamp,
73 ComputeGrpcClient: ComputeClient<T>,
74{
75 pub(super) fn spawn(
76 id: ReplicaId,
77 build_info: &'static BuildInfo,
78 config: ReplicaConfig,
79 epoch: ClusterStartupEpoch,
80 metrics: ReplicaMetrics,
81 dyncfg: Arc<ConfigSet>,
82 response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
83 ) -> Self {
84 let (command_tx, command_rx) = unbounded_channel();
88 let connected = Arc::new(AtomicBool::new(false));
89
90 let task = mz_ore::task::spawn(
91 || format!("active-replication-replica-{id}"),
92 ReplicaTask {
93 replica_id: id,
94 build_info,
95 config: config.clone(),
96 command_rx,
97 response_tx,
98 epoch,
99 metrics: metrics.clone(),
100 connected: Arc::clone(&connected),
101 dyncfg,
102 }
103 .run(),
104 );
105
106 Self {
107 command_tx,
108 task: task.abort_on_drop(),
109 metrics,
110 connected,
111 }
112 }
113}
114
115impl<T> ReplicaClient<T> {
116 pub(super) fn send(
118 &self,
119 command: ComputeCommand<T>,
120 ) -> Result<(), SendError<ComputeCommand<T>>> {
121 self.command_tx.send(command).map(|r| {
122 self.metrics.inner.command_queue_size.inc();
123 r
124 })
125 }
126
127 pub(super) fn is_failed(&self) -> bool {
129 self.task.is_finished()
130 }
131
132 pub(super) fn is_connected(&self) -> bool {
134 self.connected.load(atomic::Ordering::Relaxed)
135 }
136}
137
/// State of the task that manages communication with a single replica.
struct ReplicaTask<T> {
    /// The ID of the replica; used in log messages and to tag forwarded responses.
    replica_id: ReplicaId,
    /// Replica configuration: connection addresses, gRPC parameters, and the values
    /// written into replica-specific commands.
    config: ReplicaConfig,
    /// Build information; its semver version is passed along when connecting.
    build_info: &'static BuildInfo,
    /// Receiving half of the channel on which commands for the replica arrive.
    command_rx: UnboundedReceiver<ComputeCommand<T>>,
    /// Channel on which responses received from the replica are forwarded.
    response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// Epoch written into `CreateTimely` commands. Its replica part is attached to
    /// every forwarded response.
    epoch: ClusterStartupEpoch,
    /// Replica metrics: connect times, successful connects, and command queue size.
    metrics: ReplicaMetrics,
    /// Flag set to `true` once the connection to the replica has been established.
    connected: Arc<AtomicBool>,
    /// Dynamic configuration consulted when specializing commands and handed to the
    /// `SequentialHydration` client.
    dyncfg: Arc<ConfigSet>,
}
160
161impl<T> ReplicaTask<T>
162where
163 T: ComputeControllerTimestamp,
164 ComputeGrpcClient: ComputeClient<T>,
165{
166 async fn run(self) {
168 let replica_id = self.replica_id;
169 info!(replica = ?replica_id, "starting replica task");
170
171 let client = self.connect().await;
172 match self.run_message_loop(client).await {
173 Ok(()) => info!(replica = ?replica_id, "stopped replica task"),
174 Err(error) => warn!(replica = ?replica_id, "replica task failed: {error:#}"),
175 }
176 }
177
178 async fn connect(&self) -> Client<T> {
183 let try_connect = |retry: RetryState| {
184 let addrs = &self.config.location.ctl_addrs;
185 let dests = addrs
186 .iter()
187 .map(|addr| (addr.clone(), self.metrics.clone()))
188 .collect();
189 let version = self.build_info.semver_version();
190 let client_params = &self.config.grpc_client;
191
192 async move {
193 let connect_start = Instant::now();
194 let connect_result =
195 ComputeGrpcClient::connect_partitioned(dests, version, client_params).await;
196 self.metrics.observe_connect_time(connect_start.elapsed());
197
198 connect_result.inspect_err(|error| {
199 let next_backoff = retry.next_backoff.unwrap();
200 if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
201 info!(
202 replica_id = %self.replica_id, ?next_backoff,
203 "error connecting to replica: {error:#}",
204 );
205 } else {
206 debug!(
207 replica_id = %self.replica_id, ?next_backoff,
208 "error connecting to replica: {error:#}",
209 );
210 }
211 })
212 }
213 };
214
215 let client = Retry::default()
216 .clamp_backoff(Duration::from_secs(1))
217 .retry_async(try_connect)
218 .await
219 .expect("retry retries forever");
220
221 self.metrics.observe_connect();
222 self.connected.store(true, atomic::Ordering::Relaxed);
223
224 let dyncfg = Arc::clone(&self.dyncfg);
225 let metrics = self.metrics.clone();
226 SequentialHydration::new(client, dyncfg, metrics)
227 }
228
229 async fn run_message_loop(mut self, mut client: Client<T>) -> Result<(), anyhow::Error>
235 where
236 T: ComputeControllerTimestamp,
237 ComputeGrpcClient: ComputeClient<T>,
238 {
239 let id = self.replica_id;
240 let incarnation = self.epoch.replica();
241 loop {
242 select! {
243 command = self.command_rx.recv() => {
245 let Some(mut command) = command else {
246 break;
248 };
249
250 self.specialize_command(&mut command);
251 self.observe_command(&command);
252 client.send(command).await?;
253 },
254 response = client.recv() => {
256 let Some(response) = response? else {
257 bail!("replica unexpectedly gracefully terminated connection");
258 };
259
260 self.observe_response(&response);
261
262 if self.response_tx.send((id, incarnation, response)).is_err() {
263 break;
265 }
266 }
267 }
268 }
269
270 Ok(())
271 }
272
273 fn specialize_command(&self, command: &mut ComputeCommand<T>) {
278 match command {
279 ComputeCommand::CreateTimely { config, epoch } => {
280 **config = TimelyConfig {
281 workers: self.config.location.workers,
282 process: 0,
283 addresses: self.config.location.dataflow_addrs.clone(),
284 arrangement_exert_proportionality: self
285 .config
286 .initial_config
287 .arrangement_exert_proportionality,
288 enable_zero_copy: self.config.initial_config.enable_zero_copy,
289 enable_zero_copy_lgalloc: self.config.initial_config.enable_zero_copy_lgalloc,
290 zero_copy_limit: self.config.initial_config.zero_copy_limit,
291 enable_create_sockets_v2: ENABLE_CREATE_SOCKETS_V2.get(&self.dyncfg),
292 };
293 *epoch = self.epoch;
294 }
295 ComputeCommand::CreateInstance(config) => {
296 config.logging = self.config.logging.clone();
297 if ENABLE_COMPUTE_REPLICA_EXPIRATION.get(&self.dyncfg) {
298 config.expiration_offset = self.config.expiration_offset;
299 }
300 }
301 _ => {}
302 }
303 }
304
305 #[mz_ore::instrument(level = "debug")]
307 fn observe_command(&mut self, command: &ComputeCommand<T>) {
308 if let ComputeCommand::Peek(peek) = command {
309 peek.otel_ctx.attach_as_parent();
310 }
311
312 trace!(
313 replica = ?self.replica_id,
314 command = ?command,
315 "sending command to replica",
316 );
317
318 self.metrics.inner.command_queue_size.dec();
319 }
320
321 #[mz_ore::instrument(level = "debug")]
323 fn observe_response(&mut self, response: &ComputeResponse<T>) {
324 if let ComputeResponse::PeekResponse(_, _, otel_ctx) = response {
325 otel_ctx.attach_as_parent();
326 }
327
328 trace!(
329 replica = ?self.replica_id,
330 response = ?response,
331 "received response from replica",
332 );
333 }
334}