mz_compute_client/controller/
replica.rs1use std::sync::Arc;
13use std::sync::atomic::{self, AtomicBool};
14use std::time::{Duration, Instant};
15
16use anyhow::bail;
17use mz_build_info::BuildInfo;
18use mz_cluster_client::client::ClusterReplicaLocation;
19use mz_compute_types::dyncfgs::ENABLE_COMPUTE_REPLICA_EXPIRATION;
20use mz_dyncfg::ConfigSet;
21use mz_ore::channel::InstrumentedUnboundedSender;
22use mz_ore::retry::{Retry, RetryState};
23use mz_ore::task::AbortOnDropHandle;
24use mz_service::client::{GenericClient, Partitioned};
25use mz_service::params::GrpcClientParameters;
26use mz_service::transport;
27use tokio::select;
28use tokio::sync::mpsc::error::SendError;
29use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
30use tracing::{debug, info, trace, warn};
31use uuid::Uuid;
32
33use crate::controller::ReplicaId;
34use crate::controller::instance::ReplicaResponse;
35use crate::controller::sequential_hydration::SequentialHydration;
36use crate::logging::LoggingConfig;
37use crate::metrics::IntCounter;
38use crate::metrics::ReplicaMetrics;
39use crate::protocol::command::ComputeCommand;
40use crate::protocol::response::ComputeResponse;
41
42type Client = Partitioned<ComputeCtpClient, ComputeCommand, ComputeResponse>;
43
44#[derive(Clone, Debug)]
46pub(super) struct ReplicaConfig {
47 pub location: ClusterReplicaLocation,
48 pub logging: LoggingConfig,
49 pub grpc_client: GrpcClientParameters,
50 pub expiration_offset: Option<Duration>,
52 pub arrangement_dictionary_compression: bool,
54}
55
56#[derive(Debug)]
58pub(super) struct ReplicaClient {
59 command_tx: UnboundedSender<ComputeCommand>,
61 task: AbortOnDropHandle<()>,
65 metrics: ReplicaMetrics,
67 connected: Arc<AtomicBool>,
69}
70
71impl ReplicaClient {
72 pub(super) fn spawn(
73 id: ReplicaId,
74 build_info: &'static BuildInfo,
75 config: ReplicaConfig,
76 epoch: u64,
77 metrics: ReplicaMetrics,
78 dyncfg: Arc<ConfigSet>,
79 response_tx: InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
80 ) -> Self {
81 let (command_tx, command_rx) = unbounded_channel();
85 let connected = Arc::new(AtomicBool::new(false));
86
87 let task = mz_ore::task::spawn(
88 || format!("active-replication-replica-{id}"),
89 ReplicaTask {
90 replica_id: id,
91 build_info,
92 config: config.clone(),
93 command_rx,
94 response_tx,
95 epoch,
96 metrics: metrics.clone(),
97 connected: Arc::clone(&connected),
98 dyncfg,
99 }
100 .run(),
101 );
102
103 Self {
104 command_tx,
105 task: task.abort_on_drop(),
106 metrics,
107 connected,
108 }
109 }
110}
111
112impl ReplicaClient {
113 pub(super) fn send(&self, command: ComputeCommand) -> Result<(), SendError<ComputeCommand>> {
115 self.command_tx.send(command).map(|r| {
116 self.metrics.inner.command_queue_size.inc();
117 r
118 })
119 }
120
121 pub(super) fn is_failed(&self) -> bool {
123 self.task.is_finished()
124 }
125
126 pub(super) fn is_connected(&self) -> bool {
128 self.connected.load(atomic::Ordering::Relaxed)
129 }
130}
131
132type ComputeCtpClient = transport::Client<ComputeCommand, ComputeResponse>;
133
134struct ReplicaTask {
136 replica_id: ReplicaId,
138 config: ReplicaConfig,
140 build_info: &'static BuildInfo,
142 command_rx: UnboundedReceiver<ComputeCommand>,
144 response_tx: InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
146 epoch: u64,
149 metrics: ReplicaMetrics,
151 connected: Arc<AtomicBool>,
153 dyncfg: Arc<ConfigSet>,
155}
156
157impl ReplicaTask {
158 async fn run(self) {
160 let replica_id = self.replica_id;
161 info!(replica = ?replica_id, "starting replica task");
162
163 let client = self.connect().await;
164 match self.run_message_loop(client).await {
165 Ok(()) => info!(replica = ?replica_id, "stopped replica task"),
166 Err(error) => warn!(replica = ?replica_id, "replica task failed: {error:#}"),
167 }
168 }
169
170 async fn connect(&self) -> Client {
175 let try_connect = async |retry: RetryState| {
176 let version = self.build_info.semver_version();
177 let client_params = &self.config.grpc_client;
178 let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
179 let keepalive_timeout = client_params
180 .http2_keep_alive_timeout
181 .unwrap_or(Duration::MAX);
182
183 let connect_start = Instant::now();
184 let connect_result = ComputeCtpClient::connect_partitioned(
185 self.config.location.ctl_addrs.clone(),
186 version,
187 connect_timeout,
188 keepalive_timeout,
189 self.metrics.clone(),
190 )
191 .await;
192
193 self.metrics.observe_connect_time(connect_start.elapsed());
194
195 connect_result.inspect_err(|error| {
196 let next_backoff = retry.next_backoff.unwrap();
197 if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
198 info!(
199 replica_id = %self.replica_id, ?next_backoff,
200 "error connecting to replica: {error:#}",
201 );
202 } else {
203 debug!(
204 replica_id = %self.replica_id, ?next_backoff,
205 "error connecting to replica: {error:#}",
206 );
207 }
208 })
209 };
210
211 let client = Retry::default()
212 .clamp_backoff(Duration::from_secs(1))
213 .retry_async(try_connect)
214 .await
215 .expect("retry retries forever");
216
217 self.metrics.observe_connect();
218 self.connected.store(true, atomic::Ordering::Relaxed);
219
220 client
221 }
222
223 async fn run_message_loop(mut self, mut client: Client) -> Result<(), anyhow::Error> {
229 let mut hydration =
233 SequentialHydration::new(Arc::clone(&self.dyncfg), self.metrics.clone());
234
235 loop {
236 select! {
237 command = self.command_rx.recv() => {
239 let Some(mut command) = command else {
240 break;
242 };
243
244 self.specialize_command(&mut command);
245 self.observe_command(&command);
246 for command in hydration.absorb_command(command) {
247 client.send(command).await?;
248 }
249 },
250 response = client.recv() => {
252 let Some(response) = response? else {
253 bail!("replica unexpectedly gracefully terminated connection");
254 };
255
256 self.observe_response(&response);
257
258 for command in hydration.observe_response(&response) {
259 client.send(command).await?;
260 }
261
262 if self.response_tx.send((self.replica_id, self.epoch, response)).is_err() {
263 break;
265 }
266 }
267 }
268 }
269
270 Ok(())
271 }
272
273 fn specialize_command(&self, command: &mut ComputeCommand) {
278 match command {
279 ComputeCommand::Hello { nonce } => {
280 *nonce = Uuid::new_v4();
281 }
282 ComputeCommand::CreateInstance(config) => {
283 config.logging = self.config.logging.clone();
284 if ENABLE_COMPUTE_REPLICA_EXPIRATION.get(&self.dyncfg) {
285 config.expiration_offset = self.config.expiration_offset;
286 }
287 config.arrangement_dictionary_compression =
288 self.config.arrangement_dictionary_compression;
289 }
290 _ => {}
291 }
292 }
293
294 #[mz_ore::instrument(level = "debug")]
296 fn observe_command(&self, command: &ComputeCommand) {
297 if let ComputeCommand::Peek(peek) = command {
298 peek.otel_ctx.attach_as_parent();
299 }
300
301 trace!(
302 replica = ?self.replica_id,
303 command = ?command,
304 "sending command to replica",
305 );
306
307 self.metrics.inner.command_queue_size.dec();
308 }
309
310 #[mz_ore::instrument(level = "debug")]
312 fn observe_response(&self, response: &ComputeResponse) {
313 if let ComputeResponse::PeekResponse(_, _, otel_ctx) = response {
314 otel_ctx.attach_as_parent();
315 }
316
317 trace!(
318 replica = ?self.replica_id,
319 response = ?response,
320 "received response from replica",
321 );
322 }
323}