1use std::collections::{BTreeMap, BTreeSet, btree_map};
14use std::time::{Duration, Instant};
15
16use futures::FutureExt;
17use maplit::btreemap;
18use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
19use mz_controller::ControllerResponse;
20use mz_controller::clusters::{ClusterEvent, ClusterStatus};
21use mz_ore::instrument;
22use mz_ore::now::EpochMillis;
23use mz_ore::option::OptionExt;
24use mz_ore::tracing::OpenTelemetryContext;
25use mz_ore::{soft_assert_or_log, task};
26use mz_persist_client::usage::ShardsUsageReferenced;
27use mz_repr::{Datum, Diff, Row};
28use mz_sql::ast::Statement;
29use mz_sql::pure::PurifiedStatement;
30use mz_storage_client::controller::IntrospectionType;
31use opentelemetry::trace::TraceContextExt;
32use rand::{Rng, SeedableRng, rngs};
33use serde_json::json;
34use tracing::{Instrument, Level, event, info_span, warn};
35use tracing_opentelemetry::OpenTelemetrySpanExt;
36
37use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
38use crate::catalog::{BuiltinTableUpdate, Op};
39use crate::command::Command;
40use crate::coord::{
41 AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
42 CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
43};
44use crate::telemetry::{EventDetails, SegmentClientExt};
45use crate::{AdapterNotice, TimestampContext};
46
47impl Coordinator {
/// Handles a single coordinator [`Message`] by dispatching on its variant.
///
/// Most arms wrap the handler's future in `boxed_local()` before awaiting it.
/// NOTE(review): presumably this moves the (potentially large) handler futures to the heap so
/// that the future returned by this function stays small — confirm before removing any of them.
#[instrument]
pub(crate) async fn handle_message(&mut self, msg: Message) -> () {
    match msg {
        Message::Command(otel_ctx, cmd) => {
            // Make `span` the current span while the command's OpenTelemetry context is
            // attached, then run the command handler inside that same span.
            let span = tracing::info_span!("message_command").or_current();
            span.in_scope(|| otel_ctx.attach_as_parent());
            self.message_command(cmd).instrument(span).await
        }
        Message::ControllerReady { controller: _ } => {
            // Destructure `self` so `controller` and `catalog` can be borrowed at the same time.
            let Coordinator {
                controller,
                catalog,
                ..
            } = self;
            let storage_metadata = catalog.state().storage_metadata();
            // `process` may or may not yield a response; only a yielded response is handled.
            if let Some(m) = controller
                .process(storage_metadata)
                .expect("`process` never returns an error")
            {
                self.message_controller(m).boxed_local().await
            }
        }
        Message::PurifiedStatementReady(ready) => {
            self.message_purified_statement_ready(ready)
                .boxed_local()
                .await
        }
        Message::CreateConnectionValidationReady(ready) => {
            self.message_create_connection_validation_ready(ready)
                .boxed_local()
                .await
        }
        Message::AlterConnectionValidationReady(ready) => {
            self.message_alter_connection_validation_ready(ready)
                .boxed_local()
                .await
        }
        Message::TryDeferred {
            conn_id,
            acquired_lock,
        } => self.try_deferred(conn_id, acquired_lock).await,
        Message::GroupCommitInitiate(span, permit) => {
            // Link the current span to the span carried by the message, then run the group
            // commit inside the carried span.
            tracing::Span::current().add_link(span.context().span().span_context().clone());
            self.try_group_commit(permit)
                .instrument(span)
                .boxed_local()
                .await
        }
        Message::AdvanceTimelines => {
            self.advance_timelines().boxed_local().await;
        }
        Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
        Message::CancelPendingPeeks { conn_id } => {
            self.cancel_pending_peeks(&conn_id);
        }
        Message::LinearizeReads => {
            self.message_linearize_reads().boxed_local().await;
        }
        Message::StagedBatches {
            conn_id,
            table_id,
            batches,
        } => {
            self.commit_staged_batches(conn_id, table_id, batches);
        }
        // The storage usage messages: schedule -> fetch -> update, plus pruning.
        Message::StorageUsageSchedule => {
            self.schedule_storage_usage_collection().boxed_local().await;
        }
        Message::StorageUsageFetch => {
            self.storage_usage_fetch().boxed_local().await;
        }
        Message::StorageUsageUpdate(sizes) => {
            self.storage_usage_update(sizes).boxed_local().await;
        }
        Message::StorageUsagePrune(expired) => {
            self.storage_usage_prune(expired).boxed_local().await;
        }
        Message::RetireExecute {
            otel_ctx,
            data,
            reason,
        } => {
            otel_ctx.attach_as_parent();
            self.retire_execution(reason, data);
        }
        Message::ExecuteSingleStatementTransaction {
            ctx,
            otel_ctx,
            stmt,
            params,
        } => {
            otel_ctx.attach_as_parent();
            self.sequence_execute_single_statement_transaction(ctx, stmt, params)
                .boxed_local()
                .await;
        }
        // Every `*StageReady` message resumes a staged sequencing operation through
        // `sequence_staged`; the arms are kept separate because each binds its own `stage`.
        Message::PeekStageReady { ctx, span, stage } => {
            self.sequence_staged(ctx, span, stage).boxed_local().await;
        }
        Message::CreateIndexStageReady { ctx, span, stage } => {
            self.sequence_staged(ctx, span, stage).boxed_local().await;
        }
        Message::CreateViewStageReady { ctx, span, stage } => {
            self.sequence_staged(ctx, span, stage).boxed_local().await;
        }
        Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
            self.sequence_staged(ctx, span, stage).boxed_local().await;
        }
        Message::SubscribeStageReady { ctx, span, stage } => {
            self.sequence_staged(ctx, span, stage).boxed_local().await;
        }
        // Introspection subscribes carry no `ctx`, so `()` is passed in its place.
        Message::IntrospectionSubscribeStageReady { span, stage } => {
            self.sequence_staged((), span, stage).boxed_local().await;
        }
        Message::ExplainTimestampStageReady { ctx, span, stage } => {
            self.sequence_staged(ctx, span, stage).boxed_local().await;
        }
        Message::SecretStageReady { ctx, span, stage } => {
            self.sequence_staged(ctx, span, stage).boxed_local().await;
        }
        Message::ClusterStageReady { ctx, span, stage } => {
            self.sequence_staged(ctx, span, stage).boxed_local().await;
        }
        Message::DrainStatementLog => {
            self.drain_statement_log();
        }
        Message::PrivateLinkVpcEndpointEvents(events) => {
            // The events are only appended when the controller is not read-only; otherwise
            // they are dropped.
            if !self.controller.read_only() {
                self.controller
                    .storage
                    .append_introspection_updates(
                        mz_storage_client::controller::IntrospectionType::PrivatelinkConnectionStatusHistory,
                        events
                            .into_iter()
                            .map(|e| (mz_repr::Row::from(e), Diff::ONE))
                            .collect(),
                    );
            }
        }
        Message::CheckSchedulingPolicies => {
            self.check_scheduling_policies().boxed_local().await;
        }
        Message::SchedulingDecisions(decisions) => {
            self.handle_scheduling_decisions(decisions)
                .boxed_local()
                .await;
        }
        Message::DeferredStatementReady => {
            self.handle_deferred_statement().boxed_local().await;
        }
    }
}
208
209 #[mz_ore::instrument(level = "debug")]
210 pub async fn storage_usage_fetch(&mut self) {
211 let internal_cmd_tx = self.internal_cmd_tx.clone();
212 let client = self.storage_usage_client.clone();
213
214 let live_shards: BTreeSet<_> = self
216 .controller
217 .storage
218 .active_collection_metadatas()
219 .into_iter()
220 .map(|(_id, m)| m.data_shard)
221 .collect();
222
223 let collection_metric = self
224 .metrics
225 .storage_usage_collection_time_seconds
226 .with_label_values(&[]);
227
228 task::spawn(|| "storage_usage_fetch", async move {
231 let collection_metric_timer = collection_metric.start_timer();
232 let shard_sizes = client.shards_usage_referenced(live_shards).await;
233 collection_metric_timer.observe_duration();
234
235 if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
238 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
239 }
240 });
241 }
242
243 #[mz_ore::instrument(level = "debug")]
244 async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
245 let collection_timestamp = if self.controller.read_only() {
250 self.peek_local_write_ts().await.into()
251 } else {
252 self.get_local_write_ts().await.timestamp.into()
255 };
256
257 let ops = shards_usage
258 .by_shard
259 .into_iter()
260 .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
261 object_id: Some(shard_id.to_string()),
262 size_bytes: shard_usage.size_bytes(),
263 collection_timestamp,
264 })
265 .collect();
266
267 match self.catalog_transact_inner(None, ops).await {
268 Ok(table_updates) => {
269 let internal_cmd_tx = self.internal_cmd_tx.clone();
270 let task_span =
271 info_span!(parent: None, "coord::storage_usage_update::table_updates");
272 OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
273 task::spawn(|| "storage_usage_update_table_updates", async move {
274 table_updates.instrument(task_span).await;
275 if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
277 warn!("internal_cmd_rx dropped before we could send: {e:?}");
278 }
279 });
280 }
281 Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
282 }
283 }
284
285 #[mz_ore::instrument(level = "debug")]
286 async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
287 let (fut, _) = self.builtin_table_update().execute(expired).await;
288 task::spawn(|| "storage_usage_pruning_apply", async move {
289 fut.await;
290 });
291 }
292
293 pub async fn schedule_storage_usage_collection(&self) {
294 const SEED_LEN: usize = 32;
302 let mut seed = [0; SEED_LEN];
303 for (i, byte) in self
304 .catalog()
305 .state()
306 .config()
307 .environment_id
308 .organization_id()
309 .as_bytes()
310 .into_iter()
311 .take(SEED_LEN)
312 .enumerate()
313 {
314 seed[i] = *byte;
315 }
316 let storage_usage_collection_interval_ms: EpochMillis =
317 EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
318 .expect("storage usage collection interval must fit into u64");
319 let offset =
320 rngs::SmallRng::from_seed(seed).gen_range(0..storage_usage_collection_interval_ms);
321 let now_ts: EpochMillis = self.peek_local_write_ts().await.into();
322
323 let previous_collection_ts =
325 (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
326 let next_collection_ts = if previous_collection_ts > now_ts {
327 previous_collection_ts
328 } else {
329 previous_collection_ts + storage_usage_collection_interval_ms
330 };
331 let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);
332
333 let internal_cmd_tx = self.internal_cmd_tx.clone();
335 task::spawn(|| "storage_usage_collection", async move {
336 tokio::time::sleep(next_collection_interval).await;
337 if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
338 }
340 });
341 }
342
/// Handles a [`Command`] by forwarding it to `handle_command`.
///
/// This wrapper adds only the debug-level instrumentation span around that call.
#[mz_ore::instrument(level = "debug")]
async fn message_command(&mut self, cmd: Command) {
    self.handle_command(cmd).await;
}
347
348 #[mz_ore::instrument(level = "debug")]
349 async fn message_controller(&mut self, message: ControllerResponse) {
350 event!(Level::TRACE, message = format!("{:?}", message));
351 match message {
352 ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
353 self.handle_peek_notification(uuid, response, otel_ctx);
354 }
355 ControllerResponse::SubscribeResponse(sink_id, response) => {
356 if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
357 self.active_compute_sinks.get_mut(&sink_id)
358 {
359 let finished = active_subscribe.process_response(response);
360 if finished {
361 self.retire_compute_sinks(btreemap! {
362 sink_id => ActiveComputeSinkRetireReason::Finished,
363 })
364 .await;
365 }
366
367 soft_assert_or_log!(
368 !self.introspection_subscribes.contains_key(&sink_id),
369 "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
370 and `introspection_subscribes`",
371 );
372 } else if self.introspection_subscribes.contains_key(&sink_id) {
373 self.handle_introspection_subscribe_batch(sink_id, response)
374 .await;
375 } else {
376 }
379 }
380 ControllerResponse::CopyToResponse(sink_id, response) => {
381 match self.drop_compute_sink(sink_id).await {
382 Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
383 active_copy_to.retire_with_response(response);
384 }
385 _ => {
386 }
389 }
390 }
391 ControllerResponse::WatchSetFinished(ws_ids) => {
392 let now = self.now();
393 for ws_id in ws_ids {
394 let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
395 continue;
396 };
397 self.connection_watch_sets
398 .get_mut(&conn_id)
399 .expect("corrupted coordinator state: unknown connection id")
400 .remove(&ws_id);
401 if self.connection_watch_sets[&conn_id].is_empty() {
402 self.connection_watch_sets.remove(&conn_id);
403 }
404
405 match rsp {
406 WatchSetResponse::StatementDependenciesReady(id, ev) => {
407 self.record_statement_lifecycle_event(&id, &ev, now);
408 }
409 WatchSetResponse::AlterSinkReady(ctx) => {
410 self.sequence_alter_sink_finish(ctx).await;
411 }
412 }
413 }
414 }
415 }
416 }
417
418 #[mz_ore::instrument(level = "debug")]
419 async fn message_purified_statement_ready(
420 &mut self,
421 PurifiedStatementReady {
422 ctx,
423 result,
424 params,
425 mut plan_validity,
426 original_stmt,
427 otel_ctx,
428 }: PurifiedStatementReady,
429 ) {
430 otel_ctx.attach_as_parent();
431
432 if plan_validity.check(self.catalog()).is_err() {
443 self.handle_execute_inner(original_stmt, params, ctx).await;
444 return;
445 }
446
447 let purified_statement = match result {
448 Ok(ok) => ok,
449 Err(e) => return ctx.retire(Err(e)),
450 };
451
452 let plan = match purified_statement {
453 PurifiedStatement::PurifiedCreateSource {
454 create_progress_subsource_stmt,
455 create_source_stmt,
456 subsources,
457 available_source_references,
458 } => {
459 self.plan_purified_create_source(
460 &ctx,
461 params,
462 create_progress_subsource_stmt,
463 create_source_stmt,
464 subsources,
465 available_source_references,
466 )
467 .await
468 }
469 PurifiedStatement::PurifiedAlterSourceAddSubsources {
470 source_name,
471 options,
472 subsources,
473 } => {
474 self.plan_purified_alter_source_add_subsource(
475 ctx.session(),
476 params,
477 source_name,
478 options,
479 subsources,
480 )
481 .await
482 }
483 PurifiedStatement::PurifiedAlterSourceRefreshReferences {
484 source_name,
485 available_source_references,
486 } => self.plan_purified_alter_source_refresh_references(
487 ctx.session(),
488 params,
489 source_name,
490 available_source_references,
491 ),
492 o @ (PurifiedStatement::PurifiedAlterSource { .. }
493 | PurifiedStatement::PurifiedCreateSink(..)
494 | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
495 let stmt = match o {
497 PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
498 Statement::AlterSource(alter_source_stmt)
499 }
500 PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
501 Statement::CreateTableFromSource(stmt)
502 }
503 PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
504 PurifiedStatement::PurifiedCreateSource { .. }
505 | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
506 | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
507 unreachable!("not part of exterior match stmt")
508 }
509 };
510
511 let catalog = self.catalog().for_session(ctx.session());
514 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
515 self.plan_statement(ctx.session(), stmt, ¶ms, &resolved_ids)
516 .map(|plan| (plan, resolved_ids))
517 }
518 };
519
520 match plan {
521 Ok((plan, resolved_ids)) => self.sequence_plan(ctx, plan, resolved_ids).await,
522 Err(e) => ctx.retire(Err(e)),
523 }
524 }
525
526 #[mz_ore::instrument(level = "debug")]
527 async fn message_create_connection_validation_ready(
528 &mut self,
529 CreateConnectionValidationReady {
530 mut ctx,
531 result,
532 connection_id,
533 connection_gid,
534 mut plan_validity,
535 otel_ctx,
536 resolved_ids,
537 }: CreateConnectionValidationReady,
538 ) {
539 otel_ctx.attach_as_parent();
540
541 if let Err(e) = plan_validity.check(self.catalog()) {
547 let _ = self.secrets_controller.delete(connection_id).await;
548 return ctx.retire(Err(e));
549 }
550
551 let plan = match result {
552 Ok(ok) => ok,
553 Err(e) => {
554 let _ = self.secrets_controller.delete(connection_id).await;
555 return ctx.retire(Err(e));
556 }
557 };
558
559 let result = self
560 .sequence_create_connection_stage_finish(
561 &mut ctx,
562 connection_id,
563 connection_gid,
564 plan,
565 resolved_ids,
566 )
567 .await;
568 ctx.retire(result);
569 }
570
571 #[mz_ore::instrument(level = "debug")]
572 async fn message_alter_connection_validation_ready(
573 &mut self,
574 AlterConnectionValidationReady {
575 mut ctx,
576 result,
577 connection_id,
578 connection_gid: _,
579 mut plan_validity,
580 otel_ctx,
581 resolved_ids: _,
582 }: AlterConnectionValidationReady,
583 ) {
584 otel_ctx.attach_as_parent();
585
586 if let Err(e) = plan_validity.check(self.catalog()) {
592 return ctx.retire(Err(e));
593 }
594
595 let conn = match result {
596 Ok(ok) => ok,
597 Err(e) => {
598 return ctx.retire(Err(e));
599 }
600 };
601
602 let result = self
603 .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
604 .await;
605 ctx.retire(result);
606 }
607
608 #[mz_ore::instrument(level = "debug")]
609 async fn message_cluster_event(&mut self, event: ClusterEvent) {
610 event!(Level::TRACE, event = format!("{:?}", event));
611
612 if let Some(segment_client) = &self.segment_client {
613 let env_id = &self.catalog().config().environment_id;
614 let mut properties = json!({
615 "cluster_id": event.cluster_id.to_string(),
616 "replica_id": event.replica_id.to_string(),
617 "process_id": event.process_id,
618 "status": event.status.as_kebab_case_str(),
619 });
620 match event.status {
621 ClusterStatus::Online => (),
622 ClusterStatus::Offline(reason) => {
623 let properties = match &mut properties {
624 serde_json::Value::Object(map) => map,
625 _ => unreachable!(),
626 };
627 properties.insert(
628 "reason".into(),
629 json!(reason.display_or("unknown").to_string()),
630 );
631 }
632 };
633 segment_client.environment_track(
634 env_id,
635 "Cluster Changed Status",
636 properties,
637 EventDetails {
638 timestamp: Some(event.time),
639 ..Default::default()
640 },
641 );
642 }
643
644 let Some(replica_statues) = self
647 .cluster_replica_statuses
648 .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
649 else {
650 return;
651 };
652
653 if event.status != replica_statues[&event.process_id].status {
654 if !self.controller.read_only() {
655 let offline_reason = match event.status {
656 ClusterStatus::Online => None,
657 ClusterStatus::Offline(None) => None,
658 ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
659 };
660 let row = Row::pack_slice(&[
661 Datum::String(&event.replica_id.to_string()),
662 Datum::UInt64(event.process_id),
663 Datum::String(event.status.as_kebab_case_str()),
664 Datum::from(offline_reason.as_deref()),
665 Datum::TimestampTz(event.time.try_into().expect("must fit")),
666 ]);
667 self.controller.storage.append_introspection_updates(
668 IntrospectionType::ReplicaStatusHistory,
669 vec![(row, Diff::ONE)],
670 );
671 }
672
673 let old_replica_status =
674 ClusterReplicaStatuses::cluster_replica_status(replica_statues);
675
676 let new_process_status = ClusterReplicaProcessStatus {
677 status: event.status,
678 time: event.time,
679 };
680 self.cluster_replica_statuses.ensure_cluster_status(
681 event.cluster_id,
682 event.replica_id,
683 event.process_id,
684 new_process_status,
685 );
686
687 let cluster = self.catalog().get_cluster(event.cluster_id);
688 let replica = cluster.replica(event.replica_id).expect("Replica exists");
689 let new_replica_status = self
690 .cluster_replica_statuses
691 .get_cluster_replica_status(event.cluster_id, event.replica_id);
692
693 if old_replica_status != new_replica_status {
694 let notifier = self.broadcast_notice_tx();
695 let notice = AdapterNotice::ClusterReplicaStatusChanged {
696 cluster: cluster.name.clone(),
697 replica: replica.name.clone(),
698 status: new_replica_status,
699 time: event.time,
700 };
701 notifier(notice);
702 }
703 }
704 }
705
/// Releases pending linearized read transactions whose chosen timestamp has been reached.
///
/// Every pending transaction is taken out of `pending_linearize_read_txns` and is either
/// retired now or put back into the map:
///
/// * Transactions without a timeline timestamp, without an oracle timestamp, or whose chosen
///   timestamp is not ahead of the oracle timestamp they were planned with are retired
///   immediately.
/// * The rest are compared against the timeline's current oracle read timestamp and are
///   retired only if their chosen timestamp is not ahead of it; otherwise they are re-queued.
///
/// If anything was re-queued, a task is spawned that sleeps for the shortest remaining wait
/// (at most one second) and then sends another [`Message::LinearizeReads`].
#[mz_ore::instrument(level = "debug")]
async fn message_linearize_reads(&mut self) {
    let mut shortest_wait = Duration::MAX;
    let mut ready_txns = Vec::new();

    // Per-call cache of oracle read timestamps, keyed by timeline, so `read_ts` is awaited at
    // most once per timeline within this call.
    let mut cached_oracle_ts = BTreeMap::new();

    for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
        if let TimestampContext::TimelineTimestamp {
            timeline,
            chosen_ts,
            oracle_ts,
        } = read_txn.timestamp_context()
        {
            let oracle_ts = match oracle_ts {
                Some(oracle_ts) => oracle_ts,
                None => {
                    // No oracle timestamp was recorded: nothing to wait for.
                    ready_txns.push(read_txn);
                    continue;
                }
            };

            if chosen_ts <= oracle_ts {
                // The chosen timestamp was not ahead of the recorded oracle timestamp.
                ready_txns.push(read_txn);
                continue;
            }

            // Compare against the timeline's current oracle read timestamp, fetching it only
            // on the first use for this timeline.
            let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
            let current_oracle_ts = match current_oracle_ts {
                btree_map::Entry::Vacant(entry) => {
                    let timestamp_oracle = self.get_timestamp_oracle(timeline);
                    let read_ts = timestamp_oracle.read_ts().await;
                    entry.insert(read_ts.clone());
                    read_ts
                }
                btree_map::Entry::Occupied(entry) => entry.get().clone(),
            };

            if *chosen_ts <= current_oracle_ts {
                ready_txns.push(read_txn);
            } else {
                // Still ahead of the oracle: the timestamp difference is interpreted as
                // milliseconds to wait, and the transaction is re-queued.
                let wait =
                    Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
                if wait < shortest_wait {
                    shortest_wait = wait;
                }
                read_txn.num_requeues += 1;
                self.pending_linearize_read_txns.insert(conn_id, read_txn);
            }
        } else {
            // Any other timestamp context is released without waiting.
            ready_txns.push(read_txn);
        }
    }

    if !ready_txns.is_empty() {
        // The OpenTelemetry context of the first ready transaction is attached to a span
        // covering the batch.
        // NOTE(review): this outer `span` is never entered in the code below — confirm
        // whether it is meant to be.
        let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
        let span = tracing::debug_span!("message_linearize_reads");
        otel_ctx.attach_as_parent_to(&span);

        let now = Instant::now();
        for ready_txn in ready_txns {
            // Each transaction is retired inside its own span, parented to its own context.
            let span = tracing::debug_span!("retire_read_results");
            ready_txn.otel_ctx.attach_as_parent_to(&span);
            let _entered = span.enter();
            // Observe the time since the transaction was created. The second label is
            // "true" when the transaction was never re-queued.
            self.metrics
                .linearize_message_seconds
                .with_label_values(&[
                    ready_txn.txn.label(),
                    if ready_txn.num_requeues == 0 {
                        "true"
                    } else {
                        "false"
                    },
                ])
                .observe((now - ready_txn.created).as_secs_f64());
            if let Some((ctx, result)) = ready_txn.txn.finish() {
                ctx.retire(result);
            }
        }
    }

    if !self.pending_linearize_read_txns.is_empty() {
        // Sleep for the shortest remaining wait, capped at one second. Despite its name,
        // `remaining_ms` is a `Duration`.
        let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        task::spawn(|| "deferred_read_txns", async move {
            tokio::time::sleep(remaining_ms).await;
            let result = internal_cmd_tx.send(Message::LinearizeReads);
            // A failed send is only logged.
            if let Err(e) = result {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        });
    }
}
815}