1use std::collections::{BTreeMap, BTreeSet, btree_map};
14use std::time::{Duration, Instant};
15
16use futures::FutureExt;
17use maplit::btreemap;
18use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
19use mz_controller::ControllerResponse;
20use mz_controller::clusters::{ClusterEvent, ClusterStatus};
21use mz_ore::instrument;
22use mz_ore::now::EpochMillis;
23use mz_ore::option::OptionExt;
24use mz_ore::tracing::OpenTelemetryContext;
25use mz_ore::{soft_assert_or_log, task};
26use mz_persist_client::usage::ShardsUsageReferenced;
27use mz_repr::{Datum, Diff, Row};
28use mz_sql::ast::Statement;
29use mz_sql::pure::PurifiedStatement;
30use mz_storage_client::controller::IntrospectionType;
31use opentelemetry::trace::TraceContextExt;
32use rand::{Rng, SeedableRng, rngs};
33use serde_json::json;
34use tracing::{Instrument, Level, event, info_span, warn};
35use tracing_opentelemetry::OpenTelemetrySpanExt;
36
37use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
38use crate::catalog::{BuiltinTableUpdate, Op};
39use crate::command::Command;
40use crate::coord::{
41 AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
42 CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
43};
44use crate::telemetry::{EventDetails, SegmentClientExt};
45use crate::{AdapterNotice, TimestampContext};
46
47impl Coordinator {
48 #[instrument]
53 pub(crate) async fn handle_message(&mut self, msg: Message) -> () {
54 match msg {
55 Message::Command(otel_ctx, cmd) => {
56 let span = tracing::info_span!("message_command").or_current();
60 span.in_scope(|| otel_ctx.attach_as_parent());
61 self.message_command(cmd).instrument(span).await
62 }
63 Message::ControllerReady { controller: _ } => {
64 let Coordinator {
65 controller,
66 catalog,
67 ..
68 } = self;
69 let storage_metadata = catalog.state().storage_metadata();
70 if let Some(m) = controller
71 .process(storage_metadata)
72 .expect("`process` never returns an error")
73 {
74 self.message_controller(m).boxed_local().await
75 }
76 }
77 Message::PurifiedStatementReady(ready) => {
78 self.message_purified_statement_ready(ready)
79 .boxed_local()
80 .await
81 }
82 Message::CreateConnectionValidationReady(ready) => {
83 self.message_create_connection_validation_ready(ready)
84 .boxed_local()
85 .await
86 }
87 Message::AlterConnectionValidationReady(ready) => {
88 self.message_alter_connection_validation_ready(ready)
89 .boxed_local()
90 .await
91 }
92 Message::TryDeferred {
93 conn_id,
94 acquired_lock,
95 } => self.try_deferred(conn_id, acquired_lock).await,
96 Message::GroupCommitInitiate(span, permit) => {
97 tracing::Span::current().add_link(span.context().span().span_context().clone());
99 self.try_group_commit(permit)
100 .instrument(span)
101 .boxed_local()
102 .await
103 }
104 Message::AdvanceTimelines => {
105 self.advance_timelines().boxed_local().await;
106 }
107 Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
108 Message::CancelPendingPeeks { conn_id } => {
109 self.cancel_pending_peeks(&conn_id);
110 }
111 Message::LinearizeReads => {
112 self.message_linearize_reads().boxed_local().await;
113 }
114 Message::StagedBatches {
115 conn_id,
116 table_id,
117 batches,
118 } => {
119 self.commit_staged_batches(conn_id, table_id, batches);
120 }
121 Message::StorageUsageSchedule => {
122 self.schedule_storage_usage_collection().boxed_local().await;
123 }
124 Message::StorageUsageFetch => {
125 self.storage_usage_fetch().boxed_local().await;
126 }
127 Message::StorageUsageUpdate(sizes) => {
128 self.storage_usage_update(sizes).boxed_local().await;
129 }
130 Message::StorageUsagePrune(expired) => {
131 self.storage_usage_prune(expired).boxed_local().await;
132 }
133 Message::RetireExecute {
134 otel_ctx,
135 data,
136 reason,
137 } => {
138 otel_ctx.attach_as_parent();
139 self.retire_execution(reason, data);
140 }
141 Message::ExecuteSingleStatementTransaction {
142 ctx,
143 otel_ctx,
144 stmt,
145 params,
146 } => {
147 otel_ctx.attach_as_parent();
148 self.sequence_execute_single_statement_transaction(ctx, stmt, params)
149 .boxed_local()
150 .await;
151 }
152 Message::PeekStageReady { ctx, span, stage } => {
153 self.sequence_staged(ctx, span, stage).boxed_local().await;
154 }
155 Message::CreateIndexStageReady { ctx, span, stage } => {
156 self.sequence_staged(ctx, span, stage).boxed_local().await;
157 }
158 Message::CreateViewStageReady { ctx, span, stage } => {
159 self.sequence_staged(ctx, span, stage).boxed_local().await;
160 }
161 Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
162 self.sequence_staged(ctx, span, stage).boxed_local().await;
163 }
164 Message::SubscribeStageReady { ctx, span, stage } => {
165 self.sequence_staged(ctx, span, stage).boxed_local().await;
166 }
167 Message::IntrospectionSubscribeStageReady { span, stage } => {
168 self.sequence_staged((), span, stage).boxed_local().await;
169 }
170 Message::ExplainTimestampStageReady { ctx, span, stage } => {
171 self.sequence_staged(ctx, span, stage).boxed_local().await;
172 }
173 Message::SecretStageReady { ctx, span, stage } => {
174 self.sequence_staged(ctx, span, stage).boxed_local().await;
175 }
176 Message::ClusterStageReady { ctx, span, stage } => {
177 self.sequence_staged(ctx, span, stage).boxed_local().await;
178 }
179 Message::DrainStatementLog => {
180 self.drain_statement_log();
181 }
182 Message::PrivateLinkVpcEndpointEvents(events) => {
183 if !self.controller.read_only() {
184 self.controller
185 .storage
186 .append_introspection_updates(
187 mz_storage_client::controller::IntrospectionType::PrivatelinkConnectionStatusHistory,
188 events
189 .into_iter()
190 .map(|e| (mz_repr::Row::from(e), Diff::ONE))
191 .collect(),
192 );
193 }
194 }
195 Message::CheckSchedulingPolicies => {
196 self.check_scheduling_policies().boxed_local().await;
197 }
198 Message::SchedulingDecisions(decisions) => {
199 self.handle_scheduling_decisions(decisions)
200 .boxed_local()
201 .await;
202 }
203 Message::DeferredStatementReady => {
204 self.handle_deferred_statement().boxed_local().await;
205 }
206 }
207 }
208
209 #[mz_ore::instrument(level = "debug")]
210 pub async fn storage_usage_fetch(&mut self) {
211 let internal_cmd_tx = self.internal_cmd_tx.clone();
212 let client = self.storage_usage_client.clone();
213
214 let live_shards: BTreeSet<_> = self
216 .controller
217 .storage
218 .active_collection_metadatas()
219 .into_iter()
220 .map(|(_id, m)| m.data_shard)
221 .collect();
222
223 let collection_metric = self.metrics.storage_usage_collection_time_seconds.clone();
224
225 task::spawn(|| "storage_usage_fetch", async move {
228 let collection_metric_timer = collection_metric.start_timer();
229 let shard_sizes = client.shards_usage_referenced(live_shards).await;
230 collection_metric_timer.observe_duration();
231
232 if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
235 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
236 }
237 });
238 }
239
240 #[mz_ore::instrument(level = "debug")]
241 async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
242 let collection_timestamp = if self.controller.read_only() {
247 self.peek_local_write_ts().await.into()
248 } else {
249 self.get_local_write_ts().await.timestamp.into()
252 };
253
254 let ops = shards_usage
255 .by_shard
256 .into_iter()
257 .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
258 object_id: Some(shard_id.to_string()),
259 size_bytes: shard_usage.size_bytes(),
260 collection_timestamp,
261 })
262 .collect();
263
264 match self.catalog_transact_inner(None, ops).await {
265 Ok(table_updates) => {
266 let internal_cmd_tx = self.internal_cmd_tx.clone();
267 let task_span =
268 info_span!(parent: None, "coord::storage_usage_update::table_updates");
269 OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
270 task::spawn(|| "storage_usage_update_table_updates", async move {
271 table_updates.instrument(task_span).await;
272 if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
274 warn!("internal_cmd_rx dropped before we could send: {e:?}");
275 }
276 });
277 }
278 Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
279 }
280 }
281
282 #[mz_ore::instrument(level = "debug")]
283 async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
284 let (fut, _) = self.builtin_table_update().execute(expired).await;
285 task::spawn(|| "storage_usage_pruning_apply", async move {
286 fut.await;
287 });
288 }
289
290 pub async fn schedule_storage_usage_collection(&self) {
291 const SEED_LEN: usize = 32;
299 let mut seed = [0; SEED_LEN];
300 for (i, byte) in self
301 .catalog()
302 .state()
303 .config()
304 .environment_id
305 .organization_id()
306 .as_bytes()
307 .into_iter()
308 .take(SEED_LEN)
309 .enumerate()
310 {
311 seed[i] = *byte;
312 }
313 let storage_usage_collection_interval_ms: EpochMillis =
314 EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
315 .expect("storage usage collection interval must fit into u64");
316 let offset =
317 rngs::SmallRng::from_seed(seed).gen_range(0..storage_usage_collection_interval_ms);
318 let now_ts: EpochMillis = self.peek_local_write_ts().await.into();
319
320 let previous_collection_ts =
322 (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
323 let next_collection_ts = if previous_collection_ts > now_ts {
324 previous_collection_ts
325 } else {
326 previous_collection_ts + storage_usage_collection_interval_ms
327 };
328 let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);
329
330 let internal_cmd_tx = self.internal_cmd_tx.clone();
332 task::spawn(|| "storage_usage_collection", async move {
333 tokio::time::sleep(next_collection_interval).await;
334 if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
335 }
337 });
338 }
339
340 #[mz_ore::instrument(level = "debug")]
341 async fn message_command(&mut self, cmd: Command) {
342 self.handle_command(cmd).await;
343 }
344
345 #[mz_ore::instrument(level = "debug")]
346 async fn message_controller(&mut self, message: ControllerResponse) {
347 event!(Level::TRACE, message = format!("{:?}", message));
348 match message {
349 ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
350 self.handle_peek_notification(uuid, response, otel_ctx);
351 }
352 ControllerResponse::SubscribeResponse(sink_id, response) => {
353 if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
354 self.active_compute_sinks.get_mut(&sink_id)
355 {
356 let finished = active_subscribe.process_response(response);
357 if finished {
358 self.retire_compute_sinks(btreemap! {
359 sink_id => ActiveComputeSinkRetireReason::Finished,
360 })
361 .await;
362 }
363
364 soft_assert_or_log!(
365 !self.introspection_subscribes.contains_key(&sink_id),
366 "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
367 and `introspection_subscribes`",
368 );
369 } else if self.introspection_subscribes.contains_key(&sink_id) {
370 self.handle_introspection_subscribe_batch(sink_id, response)
371 .await;
372 } else {
373 }
376 }
377 ControllerResponse::CopyToResponse(sink_id, response) => {
378 match self.drop_compute_sink(sink_id).await {
379 Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
380 active_copy_to.retire_with_response(response);
381 }
382 _ => {
383 }
386 }
387 }
388 ControllerResponse::WatchSetFinished(ws_ids) => {
389 let now = self.now();
390 for ws_id in ws_ids {
391 let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
392 continue;
393 };
394 self.connection_watch_sets
395 .get_mut(&conn_id)
396 .expect("corrupted coordinator state: unknown connection id")
397 .remove(&ws_id);
398 if self.connection_watch_sets[&conn_id].is_empty() {
399 self.connection_watch_sets.remove(&conn_id);
400 }
401
402 match rsp {
403 WatchSetResponse::StatementDependenciesReady(id, ev) => {
404 self.record_statement_lifecycle_event(&id, &ev, now);
405 }
406 WatchSetResponse::AlterSinkReady(ctx) => {
407 self.sequence_alter_sink_finish(ctx).await;
408 }
409 }
410 }
411 }
412 }
413 }
414
415 #[mz_ore::instrument(level = "debug")]
416 async fn message_purified_statement_ready(
417 &mut self,
418 PurifiedStatementReady {
419 ctx,
420 result,
421 params,
422 mut plan_validity,
423 original_stmt,
424 otel_ctx,
425 }: PurifiedStatementReady,
426 ) {
427 otel_ctx.attach_as_parent();
428
429 if plan_validity.check(self.catalog()).is_err() {
440 self.handle_execute_inner(original_stmt, params, ctx).await;
441 return;
442 }
443
444 let purified_statement = match result {
445 Ok(ok) => ok,
446 Err(e) => return ctx.retire(Err(e)),
447 };
448
449 let plan = match purified_statement {
450 PurifiedStatement::PurifiedCreateSource {
451 create_progress_subsource_stmt,
452 create_source_stmt,
453 subsources,
454 available_source_references,
455 } => {
456 self.plan_purified_create_source(
457 &ctx,
458 params,
459 create_progress_subsource_stmt,
460 create_source_stmt,
461 subsources,
462 available_source_references,
463 )
464 .await
465 }
466 PurifiedStatement::PurifiedAlterSourceAddSubsources {
467 source_name,
468 options,
469 subsources,
470 } => {
471 self.plan_purified_alter_source_add_subsource(
472 ctx.session(),
473 params,
474 source_name,
475 options,
476 subsources,
477 )
478 .await
479 }
480 PurifiedStatement::PurifiedAlterSourceRefreshReferences {
481 source_name,
482 available_source_references,
483 } => self.plan_purified_alter_source_refresh_references(
484 ctx.session(),
485 params,
486 source_name,
487 available_source_references,
488 ),
489 o @ (PurifiedStatement::PurifiedAlterSource { .. }
490 | PurifiedStatement::PurifiedCreateSink(..)
491 | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
492 let stmt = match o {
494 PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
495 Statement::AlterSource(alter_source_stmt)
496 }
497 PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
498 Statement::CreateTableFromSource(stmt)
499 }
500 PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
501 PurifiedStatement::PurifiedCreateSource { .. }
502 | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
503 | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
504 unreachable!("not part of exterior match stmt")
505 }
506 };
507
508 let catalog = self.catalog().for_session(ctx.session());
511 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
512 self.plan_statement(ctx.session(), stmt, ¶ms, &resolved_ids)
513 .map(|plan| (plan, resolved_ids))
514 }
515 };
516
517 match plan {
518 Ok((plan, resolved_ids)) => self.sequence_plan(ctx, plan, resolved_ids).await,
519 Err(e) => ctx.retire(Err(e)),
520 }
521 }
522
523 #[mz_ore::instrument(level = "debug")]
524 async fn message_create_connection_validation_ready(
525 &mut self,
526 CreateConnectionValidationReady {
527 mut ctx,
528 result,
529 connection_id,
530 connection_gid,
531 mut plan_validity,
532 otel_ctx,
533 resolved_ids,
534 }: CreateConnectionValidationReady,
535 ) {
536 otel_ctx.attach_as_parent();
537
538 if let Err(e) = plan_validity.check(self.catalog()) {
544 let _ = self.secrets_controller.delete(connection_id).await;
545 return ctx.retire(Err(e));
546 }
547
548 let plan = match result {
549 Ok(ok) => ok,
550 Err(e) => {
551 let _ = self.secrets_controller.delete(connection_id).await;
552 return ctx.retire(Err(e));
553 }
554 };
555
556 let result = self
557 .sequence_create_connection_stage_finish(
558 &mut ctx,
559 connection_id,
560 connection_gid,
561 plan,
562 resolved_ids,
563 )
564 .await;
565 ctx.retire(result);
566 }
567
568 #[mz_ore::instrument(level = "debug")]
569 async fn message_alter_connection_validation_ready(
570 &mut self,
571 AlterConnectionValidationReady {
572 mut ctx,
573 result,
574 connection_id,
575 connection_gid: _,
576 mut plan_validity,
577 otel_ctx,
578 resolved_ids: _,
579 }: AlterConnectionValidationReady,
580 ) {
581 otel_ctx.attach_as_parent();
582
583 if let Err(e) = plan_validity.check(self.catalog()) {
589 return ctx.retire(Err(e));
590 }
591
592 let conn = match result {
593 Ok(ok) => ok,
594 Err(e) => {
595 return ctx.retire(Err(e));
596 }
597 };
598
599 let result = self
600 .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
601 .await;
602 ctx.retire(result);
603 }
604
605 #[mz_ore::instrument(level = "debug")]
606 async fn message_cluster_event(&mut self, event: ClusterEvent) {
607 event!(Level::TRACE, event = format!("{:?}", event));
608
609 if let Some(segment_client) = &self.segment_client {
610 let env_id = &self.catalog().config().environment_id;
611 let mut properties = json!({
612 "cluster_id": event.cluster_id.to_string(),
613 "replica_id": event.replica_id.to_string(),
614 "process_id": event.process_id,
615 "status": event.status.as_kebab_case_str(),
616 });
617 match event.status {
618 ClusterStatus::Online => (),
619 ClusterStatus::Offline(reason) => {
620 let properties = match &mut properties {
621 serde_json::Value::Object(map) => map,
622 _ => unreachable!(),
623 };
624 properties.insert(
625 "reason".into(),
626 json!(reason.display_or("unknown").to_string()),
627 );
628 }
629 };
630 segment_client.environment_track(
631 env_id,
632 "Cluster Changed Status",
633 properties,
634 EventDetails {
635 timestamp: Some(event.time),
636 ..Default::default()
637 },
638 );
639 }
640
641 let Some(replica_statues) = self
644 .cluster_replica_statuses
645 .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
646 else {
647 return;
648 };
649
650 if event.status != replica_statues[&event.process_id].status {
651 if !self.controller.read_only() {
652 let offline_reason = match event.status {
653 ClusterStatus::Online => None,
654 ClusterStatus::Offline(None) => None,
655 ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
656 };
657 let row = Row::pack_slice(&[
658 Datum::String(&event.replica_id.to_string()),
659 Datum::UInt64(event.process_id),
660 Datum::String(event.status.as_kebab_case_str()),
661 Datum::from(offline_reason.as_deref()),
662 Datum::TimestampTz(event.time.try_into().expect("must fit")),
663 ]);
664 self.controller.storage.append_introspection_updates(
665 IntrospectionType::ReplicaStatusHistory,
666 vec![(row, Diff::ONE)],
667 );
668 }
669
670 let old_replica_status =
671 ClusterReplicaStatuses::cluster_replica_status(replica_statues);
672
673 let new_process_status = ClusterReplicaProcessStatus {
674 status: event.status,
675 time: event.time,
676 };
677 self.cluster_replica_statuses.ensure_cluster_status(
678 event.cluster_id,
679 event.replica_id,
680 event.process_id,
681 new_process_status,
682 );
683
684 let cluster = self.catalog().get_cluster(event.cluster_id);
685 let replica = cluster.replica(event.replica_id).expect("Replica exists");
686 let new_replica_status = self
687 .cluster_replica_statuses
688 .get_cluster_replica_status(event.cluster_id, event.replica_id);
689
690 if old_replica_status != new_replica_status {
691 let notifier = self.broadcast_notice_tx();
692 let notice = AdapterNotice::ClusterReplicaStatusChanged {
693 cluster: cluster.name.clone(),
694 replica: replica.name.clone(),
695 status: new_replica_status,
696 time: event.time,
697 };
698 notifier(notice);
699 }
700 }
701 }
702
703 #[mz_ore::instrument(level = "debug")]
704 async fn message_linearize_reads(&mut self) {
709 let mut shortest_wait = Duration::MAX;
710 let mut ready_txns = Vec::new();
711
712 let mut cached_oracle_ts = BTreeMap::new();
717
718 for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
719 if let TimestampContext::TimelineTimestamp {
720 timeline,
721 chosen_ts,
722 oracle_ts,
723 } = read_txn.timestamp_context()
724 {
725 let oracle_ts = match oracle_ts {
726 Some(oracle_ts) => oracle_ts,
727 None => {
728 ready_txns.push(read_txn);
730 continue;
731 }
732 };
733
734 if chosen_ts <= oracle_ts {
735 ready_txns.push(read_txn);
738 continue;
739 }
740
741 let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
743 let current_oracle_ts = match current_oracle_ts {
744 btree_map::Entry::Vacant(entry) => {
745 let timestamp_oracle = self.get_timestamp_oracle(timeline);
746 let read_ts = timestamp_oracle.read_ts().await;
747 entry.insert(read_ts.clone());
748 read_ts
749 }
750 btree_map::Entry::Occupied(entry) => entry.get().clone(),
751 };
752
753 if *chosen_ts <= current_oracle_ts {
754 ready_txns.push(read_txn);
755 } else {
756 let wait =
757 Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
758 if wait < shortest_wait {
759 shortest_wait = wait;
760 }
761 read_txn.num_requeues += 1;
762 self.pending_linearize_read_txns.insert(conn_id, read_txn);
763 }
764 } else {
765 ready_txns.push(read_txn);
766 }
767 }
768
769 if !ready_txns.is_empty() {
770 let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
773 let span = tracing::debug_span!("message_linearize_reads");
774 otel_ctx.attach_as_parent_to(&span);
775
776 let now = Instant::now();
777 for ready_txn in ready_txns {
778 let span = tracing::debug_span!("retire_read_results");
779 ready_txn.otel_ctx.attach_as_parent_to(&span);
780 let _entered = span.enter();
781 self.metrics
782 .linearize_message_seconds
783 .with_label_values(&[
784 ready_txn.txn.label(),
785 if ready_txn.num_requeues == 0 {
786 "true"
787 } else {
788 "false"
789 },
790 ])
791 .observe((now - ready_txn.created).as_secs_f64());
792 if let Some((ctx, result)) = ready_txn.txn.finish() {
793 ctx.retire(result);
794 }
795 }
796 }
797
798 if !self.pending_linearize_read_txns.is_empty() {
799 let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
801 let internal_cmd_tx = self.internal_cmd_tx.clone();
802 task::spawn(|| "deferred_read_txns", async move {
803 tokio::time::sleep(remaining_ms).await;
804 let result = internal_cmd_tx.send(Message::LinearizeReads);
806 if let Err(e) = result {
807 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
808 }
809 });
810 }
811 }
812}