//! Logic for processing [`Coordinator`] messages. The [`Coordinator`] receives
//! messages from various sources (ex: controller, clients, background tasks, etc).

use std::collections::{BTreeMap, BTreeSet, btree_map};
use std::time::{Duration, Instant};

use futures::FutureExt;
use maplit::btreemap;
use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
use mz_controller::ControllerResponse;
use mz_controller::clusters::{ClusterEvent, ClusterStatus};
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::option::OptionExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, task};
use mz_persist_client::usage::ShardsUsageReferenced;
use mz_repr::{Datum, Diff, Row};
use mz_sql::ast::Statement;
use mz_sql::pure::PurifiedStatement;
use mz_storage_client::controller::IntrospectionType;
use opentelemetry::trace::TraceContextExt;
use rand::{Rng, SeedableRng, rngs};
use serde_json::json;
use tracing::{Instrument, Level, event, info_span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{BuiltinTableUpdate, Op};
use crate::command::Command;
use crate::coord::{
    AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
    CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
};
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::{AdapterNotice, TimestampContext};
impl Coordinator {
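    /// Handles the given [`Message`].
    ///
    /// Note: many of the inner handler futures are explicitly moved to the
    /// heap (`.boxed_local()`) so that this function's own future stays small
    /// on the stack.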
    #[instrument]
    pub(crate) async fn handle_message(&mut self, msg: Message) {
        match msg {
            Message::Command(otel_ctx, cmd) => {
                let span = tracing::info_span!("message_command").or_current();
                span.in_scope(|| otel_ctx.attach_as_parent());
                self.message_command(cmd).instrument(span).await
            }
            Message::ControllerReady { controller: _ } => {
                let Coordinator {
                    controller,
                    catalog,
                    ..
                } = self;
                let storage_metadata = catalog.state().storage_metadata();
                if let Some(m) = controller
                    .process(storage_metadata)
                    .expect("`process` never returns an error")
                {
                    self.message_controller(m).boxed_local().await
                }
            }
            Message::PurifiedStatementReady(ready) => {
                self.message_purified_statement_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::CreateConnectionValidationReady(ready) => {
                self.message_create_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::AlterConnectionValidationReady(ready) => {
                self.message_alter_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::TryDeferred {
                conn_id,
                acquired_lock,
            } => self.try_deferred(conn_id, acquired_lock).await,
            Message::GroupCommitInitiate(span, permit) => {
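                // Add an OpenTelemetry link from the current span to the span
                // that initiated this group commit, so the two traces can be
                // correlated.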
                tracing::Span::current().add_link(span.context().span().span_context().clone());
                self.try_group_commit(permit)
                    .instrument(span)
                    .boxed_local()
                    .await
            }
            Message::AdvanceTimelines => {
                self.advance_timelines().boxed_local().await;
            }
            Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
            Message::CancelPendingPeeks { conn_id } => {
                self.cancel_pending_peeks(&conn_id);
            }
            Message::LinearizeReads => {
                self.message_linearize_reads().boxed_local().await;
            }
            Message::StagedBatches {
                conn_id,
                table_id,
                batches,
            } => {
                self.commit_staged_batches(conn_id, table_id, batches);
            }
            Message::StorageUsageSchedule => {
                self.schedule_storage_usage_collection().boxed_local().await;
            }
            Message::StorageUsageFetch => {
                self.storage_usage_fetch().boxed_local().await;
            }
            Message::StorageUsageUpdate(sizes) => {
                self.storage_usage_update(sizes).boxed_local().await;
            }
            Message::StorageUsagePrune(expired) => {
                self.storage_usage_prune(expired).boxed_local().await;
            }
            Message::RetireExecute {
                otel_ctx,
                data,
                reason,
            } => {
                otel_ctx.attach_as_parent();
                self.retire_execution(reason, data);
            }
            Message::ExecuteSingleStatementTransaction {
                ctx,
                otel_ctx,
                stmt,
                params,
            } => {
                otel_ctx.attach_as_parent();
                self.sequence_execute_single_statement_transaction(ctx, stmt, params)
                    .boxed_local()
                    .await;
            }
            Message::PeekStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateIndexStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SubscribeStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::IntrospectionSubscribeStageReady { span, stage } => {
                self.sequence_staged((), span, stage).boxed_local().await;
            }
            Message::ExplainTimestampStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SecretStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::ClusterStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::DrainStatementLog => {
                self.drain_statement_log();
            }
            Message::PrivateLinkVpcEndpointEvents(events) => {
                if !self.controller.read_only() {
                    self.controller.storage.append_introspection_updates(
                        IntrospectionType::PrivatelinkConnectionStatusHistory,
                        events
                            .into_iter()
                            .map(|e| (mz_repr::Row::from(e), Diff::ONE))
                            .collect(),
                    );
                }
            }
            Message::CheckSchedulingPolicies => {
                self.check_scheduling_policies().boxed_local().await;
            }
            Message::SchedulingDecisions(decisions) => {
                self.handle_scheduling_decisions(decisions)
                    .boxed_local()
                    .await;
            }
            Message::DeferredStatementReady => {
                self.handle_deferred_statement().boxed_local().await;
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub async fn storage_usage_fetch(&self) {
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let client = self.storage_usage_client.clone();

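        // Only shards that back a currently active collection count toward
        // storage usage.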
        let live_shards: BTreeSet<_> = self
            .controller
            .storage
            .active_collection_metadatas()
            .into_iter()
            .map(|(_id, m)| m.data_shard)
            .collect();

        let collection_metric = self.metrics.storage_usage_collection_time_seconds.clone();

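        // Spawn a task to fetch the shard sizes from persist so the
        // coordinator's main loop is not blocked on the collection.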
        task::spawn(|| "storage_usage_fetch", async move {
            let collection_metric_timer = collection_metric.start_timer();
            let shard_sizes = client.shards_usage_referenced(live_shards).await;
            collection_metric_timer.observe_duration();

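            // If the receiver has hung up, the coordinator is shutting down,
            // so there is nobody left to process the result.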
            if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        });
    }

    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
        let collection_timestamp = if self.controller.read_only() {
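            // In read-only mode the coordinator must not mint a new write
            // timestamp, so only peek the current one.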
            self.peek_local_write_ts().await.into()
        } else {
            self.get_local_write_ts().await.timestamp.into()
        };

        let ops = shards_usage
            .by_shard
            .into_iter()
            .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
                object_id: Some(shard_id.to_string()),
                size_bytes: shard_usage.size_bytes(),
                collection_timestamp,
            })
            .collect();

        match self.catalog_transact_inner(None, ops).await {
            Ok((table_updates, catalog_updates)) => {
                assert!(
                    catalog_updates.is_empty(),
                    "applying builtin table updates does not produce catalog implications"
                );

                let internal_cmd_tx = self.internal_cmd_tx.clone();
                let task_span =
                    info_span!(parent: None, "coord::storage_usage_update::table_updates");
                OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
                task::spawn(|| "storage_usage_update_table_updates", async move {
                    table_updates.instrument(task_span).await;
                    if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
                        warn!("internal_cmd_rx dropped before we could send: {e:?}");
                    }
                });
            }
            Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
        let (fut, _) = self.builtin_table_update().execute(expired).await;
        task::spawn(|| "storage_usage_pruning_apply", async move {
            fut.await;
        });
    }

    pub async fn schedule_storage_usage_collection(&self) {
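        // Usage collection happens on a fixed interval, but at an offset
        // derived deterministically from the organization ID, so that
        // collections for different environments are spread out in time.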
294 const SEED_LEN: usize = 32;
302 let mut seed = [0; SEED_LEN];
303 for (i, byte) in self
304 .catalog()
305 .state()
306 .config()
307 .environment_id
308 .organization_id()
309 .as_bytes()
310 .into_iter()
311 .take(SEED_LEN)
312 .enumerate()
313 {
314 seed[i] = *byte;
315 }
316 let storage_usage_collection_interval_ms: EpochMillis =
317 EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
318 .expect("storage usage collection interval must fit into u64");
319 let offset =
320 rngs::SmallRng::from_seed(seed).random_range(0..storage_usage_collection_interval_ms);
321 let now_ts: EpochMillis = self.peek_local_write_ts().await.into();
322
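        // Find the most recent scheduled collection time at or before now. If
        // the random offset pushes that time into the future, collect then;
        // otherwise collect one interval later.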
        let previous_collection_ts =
            (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
        let next_collection_ts = if previous_collection_ts > now_ts {
            previous_collection_ts
        } else {
            previous_collection_ts + storage_usage_collection_interval_ms
        };
        let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);

        let internal_cmd_tx = self.internal_cmd_tx.clone();
        task::spawn(|| "storage_usage_collection", async move {
            tokio::time::sleep(next_collection_interval).await;
            if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
                // If the send fails, the coordinator is shutting down; there
                // is nothing to do.
            }
        });
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_command(&mut self, cmd: Command) {
        self.handle_command(cmd).await;
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_controller(&mut self, message: ControllerResponse) {
        event!(Level::TRACE, message = format!("{:?}", message));
        match message {
            ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
                self.handle_peek_notification(uuid, response, otel_ctx);
            }
            ControllerResponse::SubscribeResponse(sink_id, response) => {
                if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
                    self.active_compute_sinks.get_mut(&sink_id)
                {
                    let finished = active_subscribe.process_response(response);
                    if finished {
                        self.retire_compute_sinks(btreemap! {
                            sink_id => ActiveComputeSinkRetireReason::Finished,
                        })
                        .await;
                    }

                    soft_assert_or_log!(
                        !self.introspection_subscribes.contains_key(&sink_id),
                        "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
                        and `introspection_subscribes`",
                    );
                } else if self.introspection_subscribes.contains_key(&sink_id) {
                    self.handle_introspection_subscribe_batch(sink_id, response)
                        .await;
                } else {
                    // There is no active subscribe for this sink ID; it may
                    // have already been retired (e.g. by cancellation), so
                    // ignore the response.
                }
            }
            ControllerResponse::CopyToResponse(sink_id, response) => {
                match self.drop_compute_sink(sink_id).await {
                    Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
                        active_copy_to.retire_with_response(response);
                    }
                    _ => {
                        // There is no active COPY TO sink for this ID; it may
                        // have already been retired (e.g. by cancellation), so
                        // ignore the response.
                    }
                }
            }
            ControllerResponse::WatchSetFinished(ws_ids) => {
                let now = self.now();
                for ws_id in ws_ids {
                    let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
                        continue;
                    };
                    self.connection_watch_sets
                        .get_mut(&conn_id)
                        .expect("corrupted coordinator state: unknown connection id")
                        .remove(&ws_id);
                    if self.connection_watch_sets[&conn_id].is_empty() {
                        self.connection_watch_sets.remove(&conn_id);
                    }

                    match rsp {
                        WatchSetResponse::StatementDependenciesReady(id, ev) => {
                            self.record_statement_lifecycle_event(&id, &ev, now);
                        }
                        WatchSetResponse::AlterSinkReady(ctx) => {
                            self.sequence_alter_sink_finish(ctx).await;
                        }
                        WatchSetResponse::AlterMaterializedViewReady(ctx) => {
                            self.sequence_alter_materialized_view_apply_replacement_finish(ctx)
                                .await;
                        }
                    }
                }
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_purified_statement_ready(
        &mut self,
        PurifiedStatementReady {
            ctx,
            result,
            params,
            mut plan_validity,
            original_stmt,
            otel_ctx,
        }: PurifiedStatementReady,
    ) {
        otel_ctx.attach_as_parent();

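        // If any object the purified statement depends on has changed since
        // purification began, the result may no longer be valid, so replan the
        // original statement from scratch.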
        if plan_validity.check(self.catalog()).is_err() {
            self.handle_execute_inner(original_stmt, params, ctx).await;
            return;
        }

        let purified_statement = match result {
            Ok(ok) => ok,
            Err(e) => return ctx.retire(Err(e)),
        };

        let plan = match purified_statement {
            PurifiedStatement::PurifiedCreateSource {
                create_progress_subsource_stmt,
                create_source_stmt,
                subsources,
                available_source_references,
            } => {
                self.plan_purified_create_source(
                    &ctx,
                    params,
                    create_progress_subsource_stmt,
                    create_source_stmt,
                    subsources,
                    available_source_references,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceAddSubsources {
                source_name,
                options,
                subsources,
            } => {
                self.plan_purified_alter_source_add_subsource(
                    ctx.session(),
                    params,
                    source_name,
                    options,
                    subsources,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceRefreshReferences {
                source_name,
                available_source_references,
            } => self.plan_purified_alter_source_refresh_references(
                ctx.session(),
                params,
                source_name,
                available_source_references,
            ),
            o @ (PurifiedStatement::PurifiedAlterSource { .. }
            | PurifiedStatement::PurifiedCreateSink(..)
            | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
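                // Unwrap the purified statement back into a plain `Statement`
                // so it can go through the regular planning path.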
                let stmt = match o {
                    PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
                        Statement::AlterSource(alter_source_stmt)
                    }
                    PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
                        Statement::CreateTableFromSource(stmt)
                    }
                    PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
                    PurifiedStatement::PurifiedCreateSource { .. }
                    | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
                    | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
                        unreachable!("not part of exterior match stmt")
                    }
                };

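                // Resolve the statement's dependencies against the current
                // catalog before planning.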
                let catalog = self.catalog().for_session(ctx.session());
                let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
                self.plan_statement(ctx.session(), stmt, &params, &resolved_ids)
                    .map(|plan| (plan, resolved_ids))
            }
        };

        match plan {
            Ok((plan, resolved_ids)) => self.sequence_plan(ctx, plan, resolved_ids).await,
            Err(e) => ctx.retire(Err(e)),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_create_connection_validation_ready(
        &mut self,
        CreateConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid,
            mut plan_validity,
            otel_ctx,
            resolved_ids,
        }: CreateConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

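        // If the connection (or anything it depends on) was dropped while it
        // was being validated, clean up the secret that was created for it and
        // fail the request.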
        if let Err(e) = plan_validity.check(self.catalog()) {
            let _ = self.secrets_controller.delete(connection_id).await;
            return ctx.retire(Err(e));
        }

        let plan = match result {
            Ok(ok) => ok,
            Err(e) => {
                let _ = self.secrets_controller.delete(connection_id).await;
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_create_connection_stage_finish(
                &mut ctx,
                connection_id,
                connection_gid,
                plan,
                resolved_ids,
            )
            .await;
        ctx.retire(result);
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_alter_connection_validation_ready(
        &mut self,
        AlterConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid: _,
            mut plan_validity,
            otel_ctx,
            resolved_ids: _,
        }: AlterConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

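        // If the connection (or anything it depends on) was dropped while it
        // was being validated, fail the request.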
        if let Err(e) = plan_validity.check(self.catalog()) {
            return ctx.retire(Err(e));
        }

        let conn = match result {
            Ok(ok) => ok,
            Err(e) => {
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
            .await;
        ctx.retire(result);
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_cluster_event(&mut self, event: ClusterEvent) {
        event!(Level::TRACE, event = format!("{:?}", event));

        if let Some(segment_client) = &self.segment_client {
            let env_id = &self.catalog().config().environment_id;
            let mut properties = json!({
                "cluster_id": event.cluster_id.to_string(),
                "replica_id": event.replica_id.to_string(),
                "process_id": event.process_id,
                "status": event.status.as_kebab_case_str(),
            });
            match event.status {
                ClusterStatus::Online => (),
                ClusterStatus::Offline(reason) => {
                    let properties = match &mut properties {
                        serde_json::Value::Object(map) => map,
                        _ => unreachable!(),
                    };
                    properties.insert(
                        "reason".into(),
                        json!(reason.display_or("unknown").to_string()),
                    );
                }
            };
            segment_client.environment_track(
                env_id,
                "Cluster Changed Status",
                properties,
                EventDetails {
                    timestamp: Some(event.time),
                    ..Default::default()
                },
            );
        }

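        // The replica may have already been dropped, in which case there is
        // nothing to update.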
        let Some(replica_statuses) = self
            .cluster_replica_statuses
            .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
        else {
            return;
        };

        if event.status != replica_statuses[&event.process_id].status {
            if !self.controller.read_only() {
                let offline_reason = match event.status {
                    ClusterStatus::Online => None,
                    ClusterStatus::Offline(None) => None,
                    ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
                };
                let row = Row::pack_slice(&[
                    Datum::String(&event.replica_id.to_string()),
                    Datum::UInt64(event.process_id),
                    Datum::String(event.status.as_kebab_case_str()),
                    Datum::from(offline_reason.as_deref()),
                    Datum::TimestampTz(event.time.try_into().expect("must fit")),
                ]);
                self.controller.storage.append_introspection_updates(
                    IntrospectionType::ReplicaStatusHistory,
                    vec![(row, Diff::ONE)],
                );
            }

            let old_replica_status =
                ClusterReplicaStatuses::cluster_replica_status(replica_statuses);

            let new_process_status = ClusterReplicaProcessStatus {
                status: event.status,
                time: event.time,
            };
            self.cluster_replica_statuses.ensure_cluster_status(
                event.cluster_id,
                event.replica_id,
                event.process_id,
                new_process_status,
            );

            let cluster = self.catalog().get_cluster(event.cluster_id);
            let replica = cluster.replica(event.replica_id).expect("Replica exists");
            let new_replica_status = self
                .cluster_replica_statuses
                .get_cluster_replica_status(event.cluster_id, event.replica_id);

            if old_replica_status != new_replica_status {
                let notifier = self.broadcast_notice_tx();
                let notice = AdapterNotice::ClusterReplicaStatusChanged {
                    cluster: cluster.name.clone(),
                    replica: replica.name.clone(),
                    status: new_replica_status,
                    time: event.time,
                };
                notifier(notice);
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_linearize_reads(&mut self) {
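        // Check the pending linearized read transactions: retire those whose
        // chosen timestamp the timestamp oracle has caught up to, and requeue
        // the rest.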
        let mut shortest_wait = Duration::MAX;
        let mut ready_txns = Vec::new();

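        // Cache the oracle's `read_ts` per timeline, since fetching it can be
        // expensive and many transactions may share a timeline.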
        let mut cached_oracle_ts = BTreeMap::new();

        for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
            if let TimestampContext::TimelineTimestamp {
                timeline,
                chosen_ts,
                oracle_ts,
            } = read_txn.timestamp_context()
            {
                let oracle_ts = match oracle_ts {
                    Some(oracle_ts) => oracle_ts,
                    None => {
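                        // No oracle timestamp was used when this transaction's
                        // timestamp was chosen, so there is nothing to wait
                        // for.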
                        ready_txns.push(read_txn);
                        continue;
                    }
                };

                if chosen_ts <= oracle_ts {
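                    // The oracle had already passed the chosen timestamp when
                    // it was selected, so the read is immediately
                    // linearizable.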
                    ready_txns.push(read_txn);
                    continue;
                }

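                // Otherwise, consult the oracle's current read timestamp,
                // using the cached value for this timeline if one exists.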
                let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
                let current_oracle_ts = match current_oracle_ts {
                    btree_map::Entry::Vacant(entry) => {
                        let timestamp_oracle = self.get_timestamp_oracle(timeline);
                        let read_ts = timestamp_oracle.read_ts().await;
                        entry.insert(read_ts.clone());
                        read_ts
                    }
                    btree_map::Entry::Occupied(entry) => entry.get().clone(),
                };

                if *chosen_ts <= current_oracle_ts {
                    ready_txns.push(read_txn);
                } else {
                    let wait =
                        Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
                    if wait < shortest_wait {
                        shortest_wait = wait;
                    }
                    read_txn.num_requeues += 1;
                    self.pending_linearize_read_txns.insert(conn_id, read_txn);
                }
            } else {
                ready_txns.push(read_txn);
            }
        }

        if !ready_txns.is_empty() {
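            // Use the first ready transaction's tracing context to parent the
            // span that covers retiring this batch.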
            let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
            let span = tracing::debug_span!("message_linearize_reads");
            otel_ctx.attach_as_parent_to(&span);

            let now = Instant::now();
            for ready_txn in ready_txns {
                let span = tracing::debug_span!("retire_read_results");
                ready_txn.otel_ctx.attach_as_parent_to(&span);
                let _entered = span.enter();
                self.metrics
                    .linearize_message_seconds
                    .with_label_values(&[
                        ready_txn.txn.label(),
                        if ready_txn.num_requeues == 0 {
                            "true"
                        } else {
                            "false"
                        },
                    ])
                    .observe((now - ready_txn.created).as_secs_f64());
                if let Some((ctx, result)) = ready_txn.txn.finish() {
                    ctx.retire(result);
                }
            }
        }

        if !self.pending_linearize_read_txns.is_empty() {
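            // Cap the sleep at one second so requeued transactions are
            // re-checked regularly even if the computed wait is long.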
            let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "deferred_read_txns", async move {
                tokio::time::sleep(remaining_ms).await;
                let result = internal_cmd_tx.send(Message::LinearizeReads);
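                // It is not an error for this task to outlive the command
                // receiver; the coordinator may simply be shutting down.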
                if let Err(e) = result {
                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        }
    }
}