mz_compute_client/controller/
replica.rs1use std::sync::Arc;
13use std::sync::atomic::{self, AtomicBool};
14use std::time::{Duration, Instant};
15
16use anyhow::bail;
17use mz_build_info::BuildInfo;
18use mz_cluster_client::client::ClusterReplicaLocation;
19use mz_compute_types::dyncfgs::ENABLE_COMPUTE_REPLICA_EXPIRATION;
20use mz_dyncfg::ConfigSet;
21use mz_ore::channel::InstrumentedUnboundedSender;
22use mz_ore::retry::{Retry, RetryState};
23use mz_ore::task::AbortOnDropHandle;
24use mz_service::client::GenericClient;
25use mz_service::params::GrpcClientParameters;
26use mz_service::transport;
27use tokio::select;
28use tokio::sync::mpsc::error::SendError;
29use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
30use tracing::{debug, info, trace, warn};
31use uuid::Uuid;
32
33use crate::controller::instance::ReplicaResponse;
34use crate::controller::sequential_hydration::SequentialHydration;
35use crate::controller::{ComputeControllerTimestamp, ReplicaId};
36use crate::logging::LoggingConfig;
37use crate::metrics::IntCounter;
38use crate::metrics::ReplicaMetrics;
39use crate::protocol::command::ComputeCommand;
40use crate::protocol::response::ComputeResponse;
41
/// The client the replica task uses to talk to a replica: a transport client wrapped in
/// [`SequentialHydration`].
type Client<T> = SequentialHydration<T>;
43
/// Configuration of a single compute replica, as consumed by its replica task.
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    /// Location of the replica; its `ctl_addrs` are the addresses the replica task connects to.
    pub location: ClusterReplicaLocation,
    /// Logging configuration that is installed into every `CreateInstance` command sent to the
    /// replica.
    pub logging: LoggingConfig,
    /// Client parameters. The connect timeout and the HTTP/2 keep-alive timeout are used when
    /// connecting; unset values fall back to `Duration::MAX`.
    pub grpc_client: GrpcClientParameters,
    /// Expiration offset copied into `CreateInstance` commands, but only while the
    /// `ENABLE_COMPUTE_REPLICA_EXPIRATION` dyncfg is enabled; otherwise the command's own value
    /// is left untouched.
    pub expiration_offset: Option<Duration>,
}
53
/// Controller-side handle to a compute replica.
///
/// Commands are passed over an unbounded channel to a background replica task, which owns the
/// actual connection to the replica and forwards its responses.
#[derive(Debug)]
pub(super) struct ReplicaClient<T> {
    /// Sender for commands destined for the replica task.
    command_tx: UnboundedSender<ComputeCommand<T>>,
    /// Handle to the spawned replica task, created via `abort_on_drop` so that dropping this
    /// client is meant to abort the task.
    task: AbortOnDropHandle<()>,
    /// Metrics for this replica; a clone is also handed to the replica task.
    metrics: ReplicaMetrics,
    /// Flag shared with the replica task, which sets it to `true` once it has connected.
    connected: Arc<AtomicBool>,
}
68
69impl<T> ReplicaClient<T>
70where
71 T: ComputeControllerTimestamp,
72{
73 pub(super) fn spawn(
74 id: ReplicaId,
75 build_info: &'static BuildInfo,
76 config: ReplicaConfig,
77 epoch: u64,
78 metrics: ReplicaMetrics,
79 dyncfg: Arc<ConfigSet>,
80 response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
81 ) -> Self {
82 let (command_tx, command_rx) = unbounded_channel();
86 let connected = Arc::new(AtomicBool::new(false));
87
88 let task = mz_ore::task::spawn(
89 || format!("active-replication-replica-{id}"),
90 ReplicaTask {
91 replica_id: id,
92 build_info,
93 config: config.clone(),
94 command_rx,
95 response_tx,
96 epoch,
97 metrics: metrics.clone(),
98 connected: Arc::clone(&connected),
99 dyncfg,
100 }
101 .run(),
102 );
103
104 Self {
105 command_tx,
106 task: task.abort_on_drop(),
107 metrics,
108 connected,
109 }
110 }
111}
112
113impl<T> ReplicaClient<T> {
114 pub(super) fn send(
116 &self,
117 command: ComputeCommand<T>,
118 ) -> Result<(), SendError<ComputeCommand<T>>> {
119 self.command_tx.send(command).map(|r| {
120 self.metrics.inner.command_queue_size.inc();
121 r
122 })
123 }
124
125 pub(super) fn is_failed(&self) -> bool {
127 self.task.is_finished()
128 }
129
130 pub(super) fn is_connected(&self) -> bool {
132 self.connected.load(atomic::Ordering::Relaxed)
133 }
134}
135
/// Transport client that sends [`ComputeCommand`]s to, and receives [`ComputeResponse`]s from, a
/// replica.
type ComputeCtpClient<T> = transport::Client<ComputeCommand<T>, ComputeResponse<T>>;
137
/// Background task that maintains the connection to a single replica.
///
/// It forwards commands received from the [`ReplicaClient`] to the replica, and forwards the
/// replica's responses on `response_tx`.
struct ReplicaTask<T> {
    /// ID of the replica; used in log messages and to tag forwarded responses.
    replica_id: ReplicaId,
    /// Configuration of the replica.
    config: ReplicaConfig,
    /// Build information; its semver version is passed along when connecting.
    build_info: &'static BuildInfo,
    /// Receiver for commands sent through the `ReplicaClient`.
    command_rx: UnboundedReceiver<ComputeCommand<T>>,
    /// Sender for responses, each tagged with `replica_id` and `epoch`.
    response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
    /// Epoch attached to every forwarded response.
    epoch: u64,
    /// Metrics for this replica.
    metrics: ReplicaMetrics,
    /// Flag shared with the `ReplicaClient`; set to `true` once connected.
    connected: Arc<AtomicBool>,
    /// Dynamic configuration, passed to `SequentialHydration` and consulted for feature flags.
    dyncfg: Arc<ConfigSet>,
}
160
161impl<T> ReplicaTask<T>
162where
163 T: ComputeControllerTimestamp,
164{
165 async fn run(self) {
167 let replica_id = self.replica_id;
168 info!(replica = ?replica_id, "starting replica task");
169
170 let client = self.connect().await;
171 match self.run_message_loop(client).await {
172 Ok(()) => info!(replica = ?replica_id, "stopped replica task"),
173 Err(error) => warn!(replica = ?replica_id, "replica task failed: {error:#}"),
174 }
175 }
176
177 async fn connect(&self) -> Client<T> {
182 let try_connect = async |retry: RetryState| {
183 let version = self.build_info.semver_version();
184 let client_params = &self.config.grpc_client;
185 let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
186 let keepalive_timeout = client_params
187 .http2_keep_alive_timeout
188 .unwrap_or(Duration::MAX);
189
190 let connect_start = Instant::now();
191 let connect_result = ComputeCtpClient::<T>::connect_partitioned(
192 self.config.location.ctl_addrs.clone(),
193 version,
194 connect_timeout,
195 keepalive_timeout,
196 self.metrics.clone(),
197 )
198 .await
199 .map(|client| {
200 let dyncfg = Arc::clone(&self.dyncfg);
201 SequentialHydration::new(client, dyncfg, self.metrics.clone())
202 });
203
204 self.metrics.observe_connect_time(connect_start.elapsed());
205
206 connect_result.inspect_err(|error| {
207 let next_backoff = retry.next_backoff.unwrap();
208 if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
209 info!(
210 replica_id = %self.replica_id, ?next_backoff,
211 "error connecting to replica: {error:#}",
212 );
213 } else {
214 debug!(
215 replica_id = %self.replica_id, ?next_backoff,
216 "error connecting to replica: {error:#}",
217 );
218 }
219 })
220 };
221
222 let client = Retry::default()
223 .clamp_backoff(Duration::from_secs(1))
224 .retry_async(try_connect)
225 .await
226 .expect("retry retries forever");
227
228 self.metrics.observe_connect();
229 self.connected.store(true, atomic::Ordering::Relaxed);
230
231 client
232 }
233
234 async fn run_message_loop(mut self, mut client: Client<T>) -> Result<(), anyhow::Error>
240 where
241 T: ComputeControllerTimestamp,
242 {
243 loop {
244 select! {
245 command = self.command_rx.recv() => {
247 let Some(mut command) = command else {
248 break;
250 };
251
252 self.specialize_command(&mut command);
253 self.observe_command(&command);
254 client.send(command).await?;
255 },
256 response = client.recv() => {
258 let Some(response) = response? else {
259 bail!("replica unexpectedly gracefully terminated connection");
260 };
261
262 self.observe_response(&response);
263
264 if self.response_tx.send((self.replica_id, self.epoch, response)).is_err() {
265 break;
267 }
268 }
269 }
270 }
271
272 Ok(())
273 }
274
275 fn specialize_command(&self, command: &mut ComputeCommand<T>) {
280 match command {
281 ComputeCommand::Hello { nonce } => {
282 *nonce = Uuid::new_v4();
283 }
284 ComputeCommand::CreateInstance(config) => {
285 config.logging = self.config.logging.clone();
286 if ENABLE_COMPUTE_REPLICA_EXPIRATION.get(&self.dyncfg) {
287 config.expiration_offset = self.config.expiration_offset;
288 }
289 }
290 _ => {}
291 }
292 }
293
294 #[mz_ore::instrument(level = "debug")]
296 fn observe_command(&mut self, command: &ComputeCommand<T>) {
297 if let ComputeCommand::Peek(peek) = command {
298 peek.otel_ctx.attach_as_parent();
299 }
300
301 trace!(
302 replica = ?self.replica_id,
303 command = ?command,
304 "sending command to replica",
305 );
306
307 self.metrics.inner.command_queue_size.dec();
308 }
309
310 #[mz_ore::instrument(level = "debug")]
312 fn observe_response(&mut self, response: &ComputeResponse<T>) {
313 if let ComputeResponse::PeekResponse(_, _, otel_ctx) = response {
314 otel_ctx.attach_as_parent();
315 }
316
317 trace!(
318 replica = ?self.replica_id,
319 response = ?response,
320 "received response from replica",
321 );
322 }
323}