// mz_compute_client/controller/replica.rs
use std::sync::Arc;
use std::sync::atomic::{self, AtomicBool};
use std::time::{Duration, Instant};

use anyhow::bail;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_compute_types::dyncfgs::ENABLE_COMPUTE_REPLICA_EXPIRATION;
use mz_dyncfg::ConfigSet;
use mz_ore::channel::InstrumentedUnboundedSender;
use mz_ore::retry::{Retry, RetryState};
use mz_ore::task::AbortOnDropHandle;
use mz_service::client::GenericClient;
use mz_service::params::GrpcClientParameters;
use mz_service::transport;
use tokio::select;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tracing::{debug, info, trace, warn};
use uuid::Uuid;

use crate::controller::ReplicaId;
use crate::controller::instance::ReplicaResponse;
use crate::controller::sequential_hydration::SequentialHydration;
use crate::logging::LoggingConfig;
use crate::metrics::IntCounter;
use crate::metrics::ReplicaMetrics;
use crate::protocol::command::ComputeCommand;
use crate::protocol::response::ComputeResponse;
41
42type Client = SequentialHydration;
43
44#[derive(Clone, Debug)]
46pub(super) struct ReplicaConfig {
47 pub location: ClusterReplicaLocation,
48 pub logging: LoggingConfig,
49 pub grpc_client: GrpcClientParameters,
50 pub expiration_offset: Option<Duration>,
52}
53
54#[derive(Debug)]
56pub(super) struct ReplicaClient {
57 command_tx: UnboundedSender<ComputeCommand>,
59 task: AbortOnDropHandle<()>,
63 metrics: ReplicaMetrics,
65 connected: Arc<AtomicBool>,
67}
68
69impl ReplicaClient {
70 pub(super) fn spawn(
71 id: ReplicaId,
72 build_info: &'static BuildInfo,
73 config: ReplicaConfig,
74 epoch: u64,
75 metrics: ReplicaMetrics,
76 dyncfg: Arc<ConfigSet>,
77 response_tx: InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
78 ) -> Self {
79 let (command_tx, command_rx) = unbounded_channel();
83 let connected = Arc::new(AtomicBool::new(false));
84
85 let task = mz_ore::task::spawn(
86 || format!("active-replication-replica-{id}"),
87 ReplicaTask {
88 replica_id: id,
89 build_info,
90 config: config.clone(),
91 command_rx,
92 response_tx,
93 epoch,
94 metrics: metrics.clone(),
95 connected: Arc::clone(&connected),
96 dyncfg,
97 }
98 .run(),
99 );
100
101 Self {
102 command_tx,
103 task: task.abort_on_drop(),
104 metrics,
105 connected,
106 }
107 }
108}
109
110impl ReplicaClient {
111 pub(super) fn send(&self, command: ComputeCommand) -> Result<(), SendError<ComputeCommand>> {
113 self.command_tx.send(command).map(|r| {
114 self.metrics.inner.command_queue_size.inc();
115 r
116 })
117 }
118
119 pub(super) fn is_failed(&self) -> bool {
121 self.task.is_finished()
122 }
123
124 pub(super) fn is_connected(&self) -> bool {
126 self.connected.load(atomic::Ordering::Relaxed)
127 }
128}
129
130type ComputeCtpClient = transport::Client<ComputeCommand, ComputeResponse>;
131
132struct ReplicaTask {
134 replica_id: ReplicaId,
136 config: ReplicaConfig,
138 build_info: &'static BuildInfo,
140 command_rx: UnboundedReceiver<ComputeCommand>,
142 response_tx: InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
144 epoch: u64,
147 metrics: ReplicaMetrics,
149 connected: Arc<AtomicBool>,
151 dyncfg: Arc<ConfigSet>,
153}
154
155impl ReplicaTask {
156 async fn run(self) {
158 let replica_id = self.replica_id;
159 info!(replica = ?replica_id, "starting replica task");
160
161 let client = self.connect().await;
162 match self.run_message_loop(client).await {
163 Ok(()) => info!(replica = ?replica_id, "stopped replica task"),
164 Err(error) => warn!(replica = ?replica_id, "replica task failed: {error:#}"),
165 }
166 }
167
168 async fn connect(&self) -> Client {
173 let try_connect = async |retry: RetryState| {
174 let version = self.build_info.semver_version();
175 let client_params = &self.config.grpc_client;
176 let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
177 let keepalive_timeout = client_params
178 .http2_keep_alive_timeout
179 .unwrap_or(Duration::MAX);
180
181 let connect_start = Instant::now();
182 let connect_result = ComputeCtpClient::connect_partitioned(
183 self.config.location.ctl_addrs.clone(),
184 version,
185 connect_timeout,
186 keepalive_timeout,
187 self.metrics.clone(),
188 )
189 .await
190 .map(|client| {
191 let dyncfg = Arc::clone(&self.dyncfg);
192 SequentialHydration::new(client, dyncfg, self.metrics.clone())
193 });
194
195 self.metrics.observe_connect_time(connect_start.elapsed());
196
197 connect_result.inspect_err(|error| {
198 let next_backoff = retry.next_backoff.unwrap();
199 if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
200 info!(
201 replica_id = %self.replica_id, ?next_backoff,
202 "error connecting to replica: {error:#}",
203 );
204 } else {
205 debug!(
206 replica_id = %self.replica_id, ?next_backoff,
207 "error connecting to replica: {error:#}",
208 );
209 }
210 })
211 };
212
213 let client = Retry::default()
214 .clamp_backoff(Duration::from_secs(1))
215 .retry_async(try_connect)
216 .await
217 .expect("retry retries forever");
218
219 self.metrics.observe_connect();
220 self.connected.store(true, atomic::Ordering::Relaxed);
221
222 client
223 }
224
225 async fn run_message_loop(mut self, mut client: Client) -> Result<(), anyhow::Error> {
231 loop {
232 select! {
233 command = self.command_rx.recv() => {
235 let Some(mut command) = command else {
236 break;
238 };
239
240 self.specialize_command(&mut command);
241 self.observe_command(&command);
242 client.send(command).await?;
243 },
244 response = client.recv() => {
246 let Some(response) = response? else {
247 bail!("replica unexpectedly gracefully terminated connection");
248 };
249
250 self.observe_response(&response);
251
252 if self.response_tx.send((self.replica_id, self.epoch, response)).is_err() {
253 break;
255 }
256 }
257 }
258 }
259
260 Ok(())
261 }
262
263 fn specialize_command(&self, command: &mut ComputeCommand) {
268 match command {
269 ComputeCommand::Hello { nonce } => {
270 *nonce = Uuid::new_v4();
271 }
272 ComputeCommand::CreateInstance(config) => {
273 config.logging = self.config.logging.clone();
274 if ENABLE_COMPUTE_REPLICA_EXPIRATION.get(&self.dyncfg) {
275 config.expiration_offset = self.config.expiration_offset;
276 }
277 }
278 _ => {}
279 }
280 }
281
282 #[mz_ore::instrument(level = "debug")]
284 fn observe_command(&self, command: &ComputeCommand) {
285 if let ComputeCommand::Peek(peek) = command {
286 peek.otel_ctx.attach_as_parent();
287 }
288
289 trace!(
290 replica = ?self.replica_id,
291 command = ?command,
292 "sending command to replica",
293 );
294
295 self.metrics.inner.command_queue_size.dec();
296 }
297
298 #[mz_ore::instrument(level = "debug")]
300 fn observe_response(&self, response: &ComputeResponse) {
301 if let ComputeResponse::PeekResponse(_, _, otel_ctx) = response {
302 otel_ctx.attach_as_parent();
303 }
304
305 trace!(
306 replica = ?self.replica_id,
307 response = ?response,
308 "received response from replica",
309 );
310 }
311}