1use std::collections::{BTreeMap, BTreeSet, btree_map};
14use std::time::{Duration, Instant};
15
16use futures::FutureExt;
17use maplit::btreemap;
18use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
19use mz_controller::ControllerResponse;
20use mz_controller::clusters::{ClusterEvent, ClusterStatus};
21use mz_ore::instrument;
22use mz_ore::now::EpochMillis;
23use mz_ore::option::OptionExt;
24use mz_ore::tracing::OpenTelemetryContext;
25use mz_ore::{soft_assert_or_log, task};
26use mz_persist_client::usage::ShardsUsageReferenced;
27use mz_repr::{Datum, Diff, Row};
28use mz_sql::ast::Statement;
29use mz_sql::pure::PurifiedStatement;
30use mz_storage_client::controller::IntrospectionType;
31use opentelemetry::trace::TraceContextExt;
32use rand::{Rng, SeedableRng, rngs};
33use serde_json::json;
34use tracing::{Instrument, Level, event, info_span, warn};
35use tracing_opentelemetry::OpenTelemetrySpanExt;
36
37use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
38use crate::catalog::{BuiltinTableUpdate, Op};
39use crate::command::Command;
40use crate::coord::{
41 AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
42 CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
43};
44use crate::telemetry::{EventDetails, SegmentClientExt};
45use crate::{AdapterNotice, TimestampContext};
46
47impl Coordinator {
48 #[instrument]
53 pub(crate) async fn handle_message(&mut self, msg: Message) -> () {
54 match msg {
55 Message::Command(otel_ctx, cmd) => {
56 let span = tracing::info_span!("message_command").or_current();
60 span.in_scope(|| otel_ctx.attach_as_parent());
61 self.message_command(cmd).instrument(span).await
62 }
63 Message::ControllerReady { controller: _ } => {
64 let Coordinator {
65 controller,
66 catalog,
67 ..
68 } = self;
69 let storage_metadata = catalog.state().storage_metadata();
70 if let Some(m) = controller
71 .process(storage_metadata)
72 .expect("`process` never returns an error")
73 {
74 self.message_controller(m).boxed_local().await
75 }
76 }
77 Message::PurifiedStatementReady(ready) => {
78 self.message_purified_statement_ready(ready)
79 .boxed_local()
80 .await
81 }
82 Message::CreateConnectionValidationReady(ready) => {
83 self.message_create_connection_validation_ready(ready)
84 .boxed_local()
85 .await
86 }
87 Message::AlterConnectionValidationReady(ready) => {
88 self.message_alter_connection_validation_ready(ready)
89 .boxed_local()
90 .await
91 }
92 Message::TryDeferred {
93 conn_id,
94 acquired_lock,
95 } => self.try_deferred(conn_id, acquired_lock).await,
96 Message::GroupCommitInitiate(span, permit) => {
97 tracing::Span::current().add_link(span.context().span().span_context().clone());
99 self.try_group_commit(permit)
100 .instrument(span)
101 .boxed_local()
102 .await
103 }
104 Message::AdvanceTimelines => {
105 self.advance_timelines().boxed_local().await;
106 }
107 Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
108 Message::CancelPendingPeeks { conn_id } => {
109 self.cancel_pending_peeks(&conn_id);
110 }
111 Message::LinearizeReads => {
112 self.message_linearize_reads().boxed_local().await;
113 }
114 Message::StagedBatches {
115 conn_id,
116 table_id,
117 batches,
118 } => {
119 self.commit_staged_batches(conn_id, table_id, batches);
120 }
121 Message::StorageUsageSchedule => {
122 self.schedule_storage_usage_collection().boxed_local().await;
123 }
124 Message::StorageUsageFetch => {
125 self.storage_usage_fetch().boxed_local().await;
126 }
127 Message::StorageUsageUpdate(sizes) => {
128 self.storage_usage_update(sizes).boxed_local().await;
129 }
130 Message::StorageUsagePrune(expired) => {
131 self.storage_usage_prune(expired).boxed_local().await;
132 }
133 Message::RetireExecute {
134 otel_ctx,
135 data,
136 reason,
137 } => {
138 otel_ctx.attach_as_parent();
139 self.retire_execution(reason, data);
140 }
141 Message::ExecuteSingleStatementTransaction {
142 ctx,
143 otel_ctx,
144 stmt,
145 params,
146 } => {
147 otel_ctx.attach_as_parent();
148 self.sequence_execute_single_statement_transaction(ctx, stmt, params)
149 .boxed_local()
150 .await;
151 }
152 Message::PeekStageReady { ctx, span, stage } => {
153 self.sequence_staged(ctx, span, stage).boxed_local().await;
154 }
155 Message::CreateIndexStageReady { ctx, span, stage } => {
156 self.sequence_staged(ctx, span, stage).boxed_local().await;
157 }
158 Message::CreateViewStageReady { ctx, span, stage } => {
159 self.sequence_staged(ctx, span, stage).boxed_local().await;
160 }
161 Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
162 self.sequence_staged(ctx, span, stage).boxed_local().await;
163 }
164 Message::SubscribeStageReady { ctx, span, stage } => {
165 self.sequence_staged(ctx, span, stage).boxed_local().await;
166 }
167 Message::IntrospectionSubscribeStageReady { span, stage } => {
168 self.sequence_staged((), span, stage).boxed_local().await;
169 }
170 Message::ExplainTimestampStageReady { ctx, span, stage } => {
171 self.sequence_staged(ctx, span, stage).boxed_local().await;
172 }
173 Message::SecretStageReady { ctx, span, stage } => {
174 self.sequence_staged(ctx, span, stage).boxed_local().await;
175 }
176 Message::ClusterStageReady { ctx, span, stage } => {
177 self.sequence_staged(ctx, span, stage).boxed_local().await;
178 }
179 Message::DrainStatementLog => {
180 self.drain_statement_log();
181 }
182 Message::PrivateLinkVpcEndpointEvents(events) => {
183 if !self.controller.read_only() {
184 self.controller
185 .storage
186 .append_introspection_updates(
187 mz_storage_client::controller::IntrospectionType::PrivatelinkConnectionStatusHistory,
188 events
189 .into_iter()
190 .map(|e| (mz_repr::Row::from(e), Diff::ONE))
191 .collect(),
192 );
193 }
194 }
195 Message::CheckSchedulingPolicies => {
196 self.check_scheduling_policies().boxed_local().await;
197 }
198 Message::SchedulingDecisions(decisions) => {
199 self.handle_scheduling_decisions(decisions)
200 .boxed_local()
201 .await;
202 }
203 Message::DeferredStatementReady => {
204 self.handle_deferred_statement().boxed_local().await;
205 }
206 }
207 }
208
209 #[mz_ore::instrument(level = "debug")]
210 pub async fn storage_usage_fetch(&mut self) {
211 let internal_cmd_tx = self.internal_cmd_tx.clone();
212 let client = self.storage_usage_client.clone();
213
214 let live_shards: BTreeSet<_> = self
216 .controller
217 .storage
218 .active_collection_metadatas()
219 .into_iter()
220 .map(|(_id, m)| m.data_shard)
221 .collect();
222
223 let collection_metric = self.metrics.storage_usage_collection_time_seconds.clone();
224
225 task::spawn(|| "storage_usage_fetch", async move {
228 let collection_metric_timer = collection_metric.start_timer();
229 let shard_sizes = client.shards_usage_referenced(live_shards).await;
230 collection_metric_timer.observe_duration();
231
232 if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
235 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
236 }
237 });
238 }
239
240 #[mz_ore::instrument(level = "debug")]
241 async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
242 let collection_timestamp = if self.controller.read_only() {
247 self.peek_local_write_ts().await.into()
248 } else {
249 self.get_local_write_ts().await.timestamp.into()
252 };
253
254 let ops = shards_usage
255 .by_shard
256 .into_iter()
257 .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
258 object_id: Some(shard_id.to_string()),
259 size_bytes: shard_usage.size_bytes(),
260 collection_timestamp,
261 })
262 .collect();
263
264 match self.catalog_transact_inner(None, ops).await {
265 Ok((table_updates, catalog_updates)) => {
266 assert!(
267 catalog_updates.is_empty(),
268 "applying builtin table updates does not produce catalog implications"
269 );
270
271 let internal_cmd_tx = self.internal_cmd_tx.clone();
272 let task_span =
273 info_span!(parent: None, "coord::storage_usage_update::table_updates");
274 OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
275 task::spawn(|| "storage_usage_update_table_updates", async move {
276 table_updates.instrument(task_span).await;
277 if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
279 warn!("internal_cmd_rx dropped before we could send: {e:?}");
280 }
281 });
282 }
283 Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
284 }
285 }
286
287 #[mz_ore::instrument(level = "debug")]
288 async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
289 let (fut, _) = self.builtin_table_update().execute(expired).await;
290 task::spawn(|| "storage_usage_pruning_apply", async move {
291 fut.await;
292 });
293 }
294
295 pub async fn schedule_storage_usage_collection(&self) {
296 const SEED_LEN: usize = 32;
304 let mut seed = [0; SEED_LEN];
305 for (i, byte) in self
306 .catalog()
307 .state()
308 .config()
309 .environment_id
310 .organization_id()
311 .as_bytes()
312 .into_iter()
313 .take(SEED_LEN)
314 .enumerate()
315 {
316 seed[i] = *byte;
317 }
318 let storage_usage_collection_interval_ms: EpochMillis =
319 EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
320 .expect("storage usage collection interval must fit into u64");
321 let offset =
322 rngs::SmallRng::from_seed(seed).random_range(0..storage_usage_collection_interval_ms);
323 let now_ts: EpochMillis = self.peek_local_write_ts().await.into();
324
325 let previous_collection_ts =
327 (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
328 let next_collection_ts = if previous_collection_ts > now_ts {
329 previous_collection_ts
330 } else {
331 previous_collection_ts + storage_usage_collection_interval_ms
332 };
333 let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);
334
335 let internal_cmd_tx = self.internal_cmd_tx.clone();
337 task::spawn(|| "storage_usage_collection", async move {
338 tokio::time::sleep(next_collection_interval).await;
339 if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
340 }
342 });
343 }
344
345 #[mz_ore::instrument(level = "debug")]
346 async fn message_command(&mut self, cmd: Command) {
347 self.handle_command(cmd).await;
348 }
349
350 #[mz_ore::instrument(level = "debug")]
351 async fn message_controller(&mut self, message: ControllerResponse) {
352 event!(Level::TRACE, message = format!("{:?}", message));
353 match message {
354 ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
355 self.handle_peek_notification(uuid, response, otel_ctx);
356 }
357 ControllerResponse::SubscribeResponse(sink_id, response) => {
358 if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
359 self.active_compute_sinks.get_mut(&sink_id)
360 {
361 let finished = active_subscribe.process_response(response);
362 if finished {
363 self.retire_compute_sinks(btreemap! {
364 sink_id => ActiveComputeSinkRetireReason::Finished,
365 })
366 .await;
367 }
368
369 soft_assert_or_log!(
370 !self.introspection_subscribes.contains_key(&sink_id),
371 "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
372 and `introspection_subscribes`",
373 );
374 } else if self.introspection_subscribes.contains_key(&sink_id) {
375 self.handle_introspection_subscribe_batch(sink_id, response)
376 .await;
377 } else {
378 }
381 }
382 ControllerResponse::CopyToResponse(sink_id, response) => {
383 match self.drop_compute_sink(sink_id).await {
384 Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
385 active_copy_to.retire_with_response(response);
386 }
387 _ => {
388 }
391 }
392 }
393 ControllerResponse::WatchSetFinished(ws_ids) => {
394 let now = self.now();
395 for ws_id in ws_ids {
396 let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
397 continue;
398 };
399 self.connection_watch_sets
400 .get_mut(&conn_id)
401 .expect("corrupted coordinator state: unknown connection id")
402 .remove(&ws_id);
403 if self.connection_watch_sets[&conn_id].is_empty() {
404 self.connection_watch_sets.remove(&conn_id);
405 }
406
407 match rsp {
408 WatchSetResponse::StatementDependenciesReady(id, ev) => {
409 self.record_statement_lifecycle_event(&id, &ev, now);
410 }
411 WatchSetResponse::AlterSinkReady(ctx) => {
412 self.sequence_alter_sink_finish(ctx).await;
413 }
414 }
415 }
416 }
417 }
418 }
419
420 #[mz_ore::instrument(level = "debug")]
421 async fn message_purified_statement_ready(
422 &mut self,
423 PurifiedStatementReady {
424 ctx,
425 result,
426 params,
427 mut plan_validity,
428 original_stmt,
429 otel_ctx,
430 }: PurifiedStatementReady,
431 ) {
432 otel_ctx.attach_as_parent();
433
434 if plan_validity.check(self.catalog()).is_err() {
445 self.handle_execute_inner(original_stmt, params, ctx).await;
446 return;
447 }
448
449 let purified_statement = match result {
450 Ok(ok) => ok,
451 Err(e) => return ctx.retire(Err(e)),
452 };
453
454 let plan = match purified_statement {
455 PurifiedStatement::PurifiedCreateSource {
456 create_progress_subsource_stmt,
457 create_source_stmt,
458 subsources,
459 available_source_references,
460 } => {
461 self.plan_purified_create_source(
462 &ctx,
463 params,
464 create_progress_subsource_stmt,
465 create_source_stmt,
466 subsources,
467 available_source_references,
468 )
469 .await
470 }
471 PurifiedStatement::PurifiedAlterSourceAddSubsources {
472 source_name,
473 options,
474 subsources,
475 } => {
476 self.plan_purified_alter_source_add_subsource(
477 ctx.session(),
478 params,
479 source_name,
480 options,
481 subsources,
482 )
483 .await
484 }
485 PurifiedStatement::PurifiedAlterSourceRefreshReferences {
486 source_name,
487 available_source_references,
488 } => self.plan_purified_alter_source_refresh_references(
489 ctx.session(),
490 params,
491 source_name,
492 available_source_references,
493 ),
494 o @ (PurifiedStatement::PurifiedAlterSource { .. }
495 | PurifiedStatement::PurifiedCreateSink(..)
496 | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
497 let stmt = match o {
499 PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
500 Statement::AlterSource(alter_source_stmt)
501 }
502 PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
503 Statement::CreateTableFromSource(stmt)
504 }
505 PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
506 PurifiedStatement::PurifiedCreateSource { .. }
507 | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
508 | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
509 unreachable!("not part of exterior match stmt")
510 }
511 };
512
513 let catalog = self.catalog().for_session(ctx.session());
516 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
517 self.plan_statement(ctx.session(), stmt, ¶ms, &resolved_ids)
518 .map(|plan| (plan, resolved_ids))
519 }
520 };
521
522 match plan {
523 Ok((plan, resolved_ids)) => self.sequence_plan(ctx, plan, resolved_ids).await,
524 Err(e) => ctx.retire(Err(e)),
525 }
526 }
527
528 #[mz_ore::instrument(level = "debug")]
529 async fn message_create_connection_validation_ready(
530 &mut self,
531 CreateConnectionValidationReady {
532 mut ctx,
533 result,
534 connection_id,
535 connection_gid,
536 mut plan_validity,
537 otel_ctx,
538 resolved_ids,
539 }: CreateConnectionValidationReady,
540 ) {
541 otel_ctx.attach_as_parent();
542
543 if let Err(e) = plan_validity.check(self.catalog()) {
549 let _ = self.secrets_controller.delete(connection_id).await;
550 return ctx.retire(Err(e));
551 }
552
553 let plan = match result {
554 Ok(ok) => ok,
555 Err(e) => {
556 let _ = self.secrets_controller.delete(connection_id).await;
557 return ctx.retire(Err(e));
558 }
559 };
560
561 let result = self
562 .sequence_create_connection_stage_finish(
563 &mut ctx,
564 connection_id,
565 connection_gid,
566 plan,
567 resolved_ids,
568 )
569 .await;
570 ctx.retire(result);
571 }
572
573 #[mz_ore::instrument(level = "debug")]
574 async fn message_alter_connection_validation_ready(
575 &mut self,
576 AlterConnectionValidationReady {
577 mut ctx,
578 result,
579 connection_id,
580 connection_gid: _,
581 mut plan_validity,
582 otel_ctx,
583 resolved_ids: _,
584 }: AlterConnectionValidationReady,
585 ) {
586 otel_ctx.attach_as_parent();
587
588 if let Err(e) = plan_validity.check(self.catalog()) {
594 return ctx.retire(Err(e));
595 }
596
597 let conn = match result {
598 Ok(ok) => ok,
599 Err(e) => {
600 return ctx.retire(Err(e));
601 }
602 };
603
604 let result = self
605 .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
606 .await;
607 ctx.retire(result);
608 }
609
610 #[mz_ore::instrument(level = "debug")]
611 async fn message_cluster_event(&mut self, event: ClusterEvent) {
612 event!(Level::TRACE, event = format!("{:?}", event));
613
614 if let Some(segment_client) = &self.segment_client {
615 let env_id = &self.catalog().config().environment_id;
616 let mut properties = json!({
617 "cluster_id": event.cluster_id.to_string(),
618 "replica_id": event.replica_id.to_string(),
619 "process_id": event.process_id,
620 "status": event.status.as_kebab_case_str(),
621 });
622 match event.status {
623 ClusterStatus::Online => (),
624 ClusterStatus::Offline(reason) => {
625 let properties = match &mut properties {
626 serde_json::Value::Object(map) => map,
627 _ => unreachable!(),
628 };
629 properties.insert(
630 "reason".into(),
631 json!(reason.display_or("unknown").to_string()),
632 );
633 }
634 };
635 segment_client.environment_track(
636 env_id,
637 "Cluster Changed Status",
638 properties,
639 EventDetails {
640 timestamp: Some(event.time),
641 ..Default::default()
642 },
643 );
644 }
645
646 let Some(replica_statues) = self
649 .cluster_replica_statuses
650 .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
651 else {
652 return;
653 };
654
655 if event.status != replica_statues[&event.process_id].status {
656 if !self.controller.read_only() {
657 let offline_reason = match event.status {
658 ClusterStatus::Online => None,
659 ClusterStatus::Offline(None) => None,
660 ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
661 };
662 let row = Row::pack_slice(&[
663 Datum::String(&event.replica_id.to_string()),
664 Datum::UInt64(event.process_id),
665 Datum::String(event.status.as_kebab_case_str()),
666 Datum::from(offline_reason.as_deref()),
667 Datum::TimestampTz(event.time.try_into().expect("must fit")),
668 ]);
669 self.controller.storage.append_introspection_updates(
670 IntrospectionType::ReplicaStatusHistory,
671 vec![(row, Diff::ONE)],
672 );
673 }
674
675 let old_replica_status =
676 ClusterReplicaStatuses::cluster_replica_status(replica_statues);
677
678 let new_process_status = ClusterReplicaProcessStatus {
679 status: event.status,
680 time: event.time,
681 };
682 self.cluster_replica_statuses.ensure_cluster_status(
683 event.cluster_id,
684 event.replica_id,
685 event.process_id,
686 new_process_status,
687 );
688
689 let cluster = self.catalog().get_cluster(event.cluster_id);
690 let replica = cluster.replica(event.replica_id).expect("Replica exists");
691 let new_replica_status = self
692 .cluster_replica_statuses
693 .get_cluster_replica_status(event.cluster_id, event.replica_id);
694
695 if old_replica_status != new_replica_status {
696 let notifier = self.broadcast_notice_tx();
697 let notice = AdapterNotice::ClusterReplicaStatusChanged {
698 cluster: cluster.name.clone(),
699 replica: replica.name.clone(),
700 status: new_replica_status,
701 time: event.time,
702 };
703 notifier(notice);
704 }
705 }
706 }
707
708 #[mz_ore::instrument(level = "debug")]
709 async fn message_linearize_reads(&mut self) {
714 let mut shortest_wait = Duration::MAX;
715 let mut ready_txns = Vec::new();
716
717 let mut cached_oracle_ts = BTreeMap::new();
722
723 for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
724 if let TimestampContext::TimelineTimestamp {
725 timeline,
726 chosen_ts,
727 oracle_ts,
728 } = read_txn.timestamp_context()
729 {
730 let oracle_ts = match oracle_ts {
731 Some(oracle_ts) => oracle_ts,
732 None => {
733 ready_txns.push(read_txn);
735 continue;
736 }
737 };
738
739 if chosen_ts <= oracle_ts {
740 ready_txns.push(read_txn);
743 continue;
744 }
745
746 let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
748 let current_oracle_ts = match current_oracle_ts {
749 btree_map::Entry::Vacant(entry) => {
750 let timestamp_oracle = self.get_timestamp_oracle(timeline);
751 let read_ts = timestamp_oracle.read_ts().await;
752 entry.insert(read_ts.clone());
753 read_ts
754 }
755 btree_map::Entry::Occupied(entry) => entry.get().clone(),
756 };
757
758 if *chosen_ts <= current_oracle_ts {
759 ready_txns.push(read_txn);
760 } else {
761 let wait =
762 Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
763 if wait < shortest_wait {
764 shortest_wait = wait;
765 }
766 read_txn.num_requeues += 1;
767 self.pending_linearize_read_txns.insert(conn_id, read_txn);
768 }
769 } else {
770 ready_txns.push(read_txn);
771 }
772 }
773
774 if !ready_txns.is_empty() {
775 let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
778 let span = tracing::debug_span!("message_linearize_reads");
779 otel_ctx.attach_as_parent_to(&span);
780
781 let now = Instant::now();
782 for ready_txn in ready_txns {
783 let span = tracing::debug_span!("retire_read_results");
784 ready_txn.otel_ctx.attach_as_parent_to(&span);
785 let _entered = span.enter();
786 self.metrics
787 .linearize_message_seconds
788 .with_label_values(&[
789 ready_txn.txn.label(),
790 if ready_txn.num_requeues == 0 {
791 "true"
792 } else {
793 "false"
794 },
795 ])
796 .observe((now - ready_txn.created).as_secs_f64());
797 if let Some((ctx, result)) = ready_txn.txn.finish() {
798 ctx.retire(result);
799 }
800 }
801 }
802
803 if !self.pending_linearize_read_txns.is_empty() {
804 let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
806 let internal_cmd_tx = self.internal_cmd_tx.clone();
807 task::spawn(|| "deferred_read_txns", async move {
808 tokio::time::sleep(remaining_ms).await;
809 let result = internal_cmd_tx.send(Message::LinearizeReads);
811 if let Err(e) = result {
812 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
813 }
814 });
815 }
816 }
817}