use std::collections::{BTreeMap, BTreeSet, btree_map};
use std::time::{Duration, Instant};

use futures::FutureExt;
use maplit::btreemap;
use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
use mz_controller::ControllerResponse;
use mz_controller::clusters::{ClusterEvent, ClusterStatus};
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::option::OptionExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, task};
use mz_persist_client::usage::ShardsUsageReferenced;
use mz_repr::{Datum, Diff, Row};
use mz_sql::ast::Statement;
use mz_sql::pure::PurifiedStatement;
use mz_storage_client::controller::IntrospectionType;
use opentelemetry::trace::TraceContextExt;
use rand::{Rng, SeedableRng, rngs};
use serde_json::json;
use tracing::{Instrument, Level, event, info_span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{BuiltinTableUpdate, Op};
use crate::command::Command;
use crate::coord::{
    AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
    CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
};
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::{AdapterNotice, TimestampContext};

impl Coordinator {
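    /// Handles the given internal coordinator [`Message`].
    ///
    /// Most arms box their futures via `boxed_local` to keep this future
    /// small, since it is polled on the coordinator's main task.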
    #[instrument]
    pub(crate) async fn handle_message(&mut self, msg: Message) {
        match msg {
            Message::Command(otel_ctx, cmd) => {
                let span = tracing::info_span!("message_command").or_current();
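                // Attach the command's OpenTelemetry context as this span's
                // parent, so the trace continues across the channel.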
                span.in_scope(|| otel_ctx.attach_as_parent());
                self.message_command(cmd).instrument(span).await
            }
            Message::ControllerReady { controller: _ } => {
                let Coordinator {
                    controller,
                    catalog,
                    ..
                } = self;
                let storage_metadata = catalog.state().storage_metadata();
                if let Some(m) = controller
                    .process(storage_metadata)
                    .expect("`process` never returns an error")
                {
                    self.message_controller(m).boxed_local().await
                }
            }
            Message::PurifiedStatementReady(ready) => {
                self.message_purified_statement_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::CreateConnectionValidationReady(ready) => {
                self.message_create_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::AlterConnectionValidationReady(ready) => {
                self.message_alter_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::TryDeferred {
                conn_id,
                acquired_lock,
            } => self.try_deferred(conn_id, acquired_lock).await,
            Message::GroupCommitInitiate(span, permit) => {
                tracing::Span::current().add_link(span.context().span().span_context().clone());
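                // Run the group commit under the span that initiated it.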
                self.try_group_commit(permit)
                    .instrument(span)
                    .boxed_local()
                    .await
            }
            Message::AdvanceTimelines => {
                self.advance_timelines().boxed_local().await;
            }
            Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
            Message::CancelPendingPeeks { conn_id } => {
                self.cancel_pending_peeks(&conn_id);
            }
            Message::LinearizeReads => {
                self.message_linearize_reads().boxed_local().await;
            }
            Message::StagedBatches {
                conn_id,
                table_id,
                batches,
            } => {
                self.commit_staged_batches(conn_id, table_id, batches);
            }
            Message::StorageUsageSchedule => {
                self.schedule_storage_usage_collection().boxed_local().await;
            }
            Message::StorageUsageFetch => {
                self.storage_usage_fetch().boxed_local().await;
            }
            Message::StorageUsageUpdate(sizes) => {
                self.storage_usage_update(sizes).boxed_local().await;
            }
            Message::StorageUsagePrune(expired) => {
                self.storage_usage_prune(expired).boxed_local().await;
            }
            Message::RetireExecute {
                otel_ctx,
                data,
                reason,
            } => {
                otel_ctx.attach_as_parent();
                self.retire_execution(reason, data);
            }
            Message::ExecuteSingleStatementTransaction {
                ctx,
                otel_ctx,
                stmt,
                params,
            } => {
                otel_ctx.attach_as_parent();
                self.sequence_execute_single_statement_transaction(ctx, stmt, params)
                    .boxed_local()
                    .await;
            }
            Message::PeekStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateIndexStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SubscribeStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::IntrospectionSubscribeStageReady { span, stage } => {
                self.sequence_staged((), span, stage).boxed_local().await;
            }
            Message::ExplainTimestampStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SecretStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::ClusterStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::DrainStatementLog => {
                self.drain_statement_log();
            }
            Message::PrivateLinkVpcEndpointEvents(events) => {
                if !self.controller.read_only() {
                    self.controller
                        .storage
                        .append_introspection_updates(
                            IntrospectionType::PrivatelinkConnectionStatusHistory,
                            events
                                .into_iter()
                                .map(|e| (Row::from(e), Diff::ONE))
                                .collect(),
                        );
                }
            }
            Message::CheckSchedulingPolicies => {
                self.check_scheduling_policies().boxed_local().await;
            }
            Message::SchedulingDecisions(decisions) => {
                self.handle_scheduling_decisions(decisions)
                    .boxed_local()
                    .await;
            }
            Message::DeferredStatementReady => {
                self.handle_deferred_statement().boxed_local().await;
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub async fn storage_usage_fetch(&self) {
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let client = self.storage_usage_client.clone();

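        // Collect the shards backing all active collections, so that only
        // live data is accounted.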
        let live_shards: BTreeSet<_> = self
            .controller
            .storage
            .active_collection_metadatas()
            .into_iter()
            .map(|(_id, m)| m.data_shard)
            .collect();

        let collection_metric = self.metrics.storage_usage_collection_time_seconds.clone();

        task::spawn(|| "storage_usage_fetch", async move {
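            // Fetching usage from persist can be slow, so it runs on a
            // background task rather than the coordinator's main loop.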
            let collection_metric_timer = collection_metric.start_timer();
            let shard_sizes = client.shards_usage_referenced(live_shards).await;
            collection_metric_timer.observe_duration();

            if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
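                // Not an error: the receiver is only dropped when the
                // coordinator is shutting down.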
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        });
    }

    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
        let collection_timestamp = if self.controller.read_only() {
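            // In read-only mode we cannot allocate a new write timestamp, so
            // peek the current one instead.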
            self.peek_local_write_ts().await.into()
        } else {
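            // Otherwise, allocate a fresh write timestamp for the updates.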
            self.get_local_write_ts().await.timestamp.into()
        };

        let ops = shards_usage
            .by_shard
            .into_iter()
            .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
                object_id: Some(shard_id.to_string()),
                size_bytes: shard_usage.size_bytes(),
                collection_timestamp,
            })
            .collect();

        match self.catalog_transact_inner(None, ops).await {
            Ok((table_updates, catalog_updates)) => {
                assert!(
                    catalog_updates.is_empty(),
                    "applying builtin table updates must not produce further catalog updates"
                );

                let internal_cmd_tx = self.internal_cmd_tx.clone();
                let task_span =
                    info_span!(parent: None, "coord::storage_usage_update::table_updates");
                OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
                task::spawn(|| "storage_usage_update_table_updates", async move {
                    table_updates.instrument(task_span).await;
                    if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
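                        // Not an error: the receiver is only dropped when the
                        // coordinator is shutting down.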
                        warn!("internal_cmd_rx dropped before we could send: {e:?}");
                    }
                });
            }
            Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
        let (fut, _) = self.builtin_table_update().execute(expired).await;
        task::spawn(|| "storage_usage_pruning_apply", async move {
            fut.await;
        });
    }

    pub async fn schedule_storage_usage_collection(&self) {
        const SEED_LEN: usize = 32;
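        // Collections are scheduled on a fixed interval, but each environment
        // is offset within that interval by an amount derived
        // deterministically from its organization ID, so that collections
        // from different environments don't all fire at the same instant.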
        let mut seed = [0; SEED_LEN];
        for (i, byte) in self
            .catalog()
            .state()
            .config()
            .environment_id
            .organization_id()
            .as_bytes()
            .iter()
            .take(SEED_LEN)
            .enumerate()
        {
            seed[i] = *byte;
        }
        let storage_usage_collection_interval_ms: EpochMillis =
            EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
                .expect("storage usage collection interval must fit into u64");
        let offset =
            rngs::SmallRng::from_seed(seed).random_range(0..storage_usage_collection_interval_ms);
        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();

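        // Compute the collection time within the current interval window; if
        // it has already passed, schedule the collection for the next window.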
        let previous_collection_ts =
            (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
        let next_collection_ts = if previous_collection_ts > now_ts {
            previous_collection_ts
        } else {
            previous_collection_ts + storage_usage_collection_interval_ms
        };
        let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);

        let internal_cmd_tx = self.internal_cmd_tx.clone();
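        // Sleep on a background task until the next collection time.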
        task::spawn(|| "storage_usage_collection", async move {
            tokio::time::sleep(next_collection_interval).await;
            if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
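                // If the send fails, the coordinator is shutting down and
                // there is nothing left to do.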
            }
        });
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_command(&mut self, cmd: Command) {
        self.handle_command(cmd).await;
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_controller(&mut self, message: ControllerResponse) {
        event!(Level::TRACE, message = format!("{:?}", message));
        match message {
            ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
                self.handle_peek_notification(uuid, response, otel_ctx);
            }
            ControllerResponse::SubscribeResponse(sink_id, response) => {
                if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
                    self.active_compute_sinks.get_mut(&sink_id)
                {
                    let finished = active_subscribe.process_response(response);
                    if finished {
                        self.retire_compute_sinks(btreemap! {
                            sink_id => ActiveComputeSinkRetireReason::Finished,
                        })
                        .await;
                    }

                    soft_assert_or_log!(
                        !self.introspection_subscribes.contains_key(&sink_id),
                        "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
                        and `introspection_subscribes`",
                    );
                } else if self.introspection_subscribes.contains_key(&sink_id) {
                    self.handle_introspection_subscribe_batch(sink_id, response)
                        .await;
                } else {
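                    // The subscribe was already retired, e.g., due to
                    // cancellation; ignore the response.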
                }
            }
            ControllerResponse::CopyToResponse(sink_id, response) => {
                match self.drop_compute_sink(sink_id).await {
                    Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
                        active_copy_to.retire_with_response(response);
                    }
                    _ => {
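                        // The COPY TO sink was already retired, e.g., due to
                        // cancellation; ignore the response.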
                    }
                }
            }
            ControllerResponse::WatchSetFinished(ws_ids) => {
                let now = self.now();
                for ws_id in ws_ids {
                    let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
                        continue;
                    };
                    self.connection_watch_sets
                        .get_mut(&conn_id)
                        .expect("corrupted coordinator state: unknown connection id")
                        .remove(&ws_id);
                    if self.connection_watch_sets[&conn_id].is_empty() {
                        self.connection_watch_sets.remove(&conn_id);
                    }

                    match rsp {
                        WatchSetResponse::StatementDependenciesReady(id, ev) => {
                            self.record_statement_lifecycle_event(&id, &ev, now);
                        }
                        WatchSetResponse::AlterSinkReady(ctx) => {
                            self.sequence_alter_sink_finish(ctx).await;
                        }
                        WatchSetResponse::AlterMaterializedViewReady(ctx) => {
                            self.sequence_alter_materialized_view_apply_replacement_finish(ctx)
                                .await;
                        }
                    }
                }
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_purified_statement_ready(
        &mut self,
        PurifiedStatementReady {
            ctx,
            result,
            params,
            mut plan_validity,
            original_stmt,
            otel_ctx,
        }: PurifiedStatementReady,
    ) {
        otel_ctx.attach_as_parent();

        if plan_validity.check(self.catalog()).is_err() {
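            // The catalog changed while the statement was being purified:
            // objects the plan depends on may have been altered or dropped.
            // Rather than failing, re-sequence the original statement from
            // scratch against the current catalog.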
            self.handle_execute_inner(original_stmt, params, ctx).await;
            return;
        }

        let purified_statement = match result {
            Ok(ok) => ok,
            Err(e) => return ctx.retire(Err(e)),
        };

        let plan = match purified_statement {
            PurifiedStatement::PurifiedCreateSource {
                create_progress_subsource_stmt,
                create_source_stmt,
                subsources,
                available_source_references,
            } => {
                self.plan_purified_create_source(
                    &ctx,
                    params,
                    create_progress_subsource_stmt,
                    create_source_stmt,
                    subsources,
                    available_source_references,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceAddSubsources {
                source_name,
                options,
                subsources,
            } => {
                self.plan_purified_alter_source_add_subsource(
                    ctx.session(),
                    params,
                    source_name,
                    options,
                    subsources,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceRefreshReferences {
                source_name,
                available_source_references,
            } => self.plan_purified_alter_source_refresh_references(
                ctx.session(),
                params,
                source_name,
                available_source_references,
            ),
            o @ (PurifiedStatement::PurifiedAlterSource { .. }
            | PurifiedStatement::PurifiedCreateSink(..)
            | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
                let stmt = match o {
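                    // Convert the purified statement back into a plain AST
                    // statement, which is planned via the generic path below.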
                    PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
                        Statement::AlterSource(alter_source_stmt)
                    }
                    PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
                        Statement::CreateTableFromSource(stmt)
                    }
                    PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
                    PurifiedStatement::PurifiedCreateSource { .. }
                    | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
                    | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
                        unreachable!("not part of exterior match stmt")
                    }
                };

                let catalog = self.catalog().for_session(ctx.session());
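                // Resolve the IDs of all catalog objects the statement
                // depends on before planning it.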
                let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
                self.plan_statement(ctx.session(), stmt, &params, &resolved_ids)
                    .map(|plan| (plan, resolved_ids))
            }
        };

        match plan {
            Ok((plan, resolved_ids)) => self.sequence_plan(ctx, plan, resolved_ids).await,
            Err(e) => ctx.retire(Err(e)),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_create_connection_validation_ready(
        &mut self,
        CreateConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid,
            mut plan_validity,
            otel_ctx,
            resolved_ids,
        }: CreateConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        if let Err(e) = plan_validity.check(self.catalog()) {
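            // The catalog changed while the connection was being validated.
            // Clean up the secret created for the connection before failing
            // the request.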
            let _ = self.secrets_controller.delete(connection_id).await;
            return ctx.retire(Err(e));
        }

        let plan = match result {
            Ok(ok) => ok,
            Err(e) => {
                let _ = self.secrets_controller.delete(connection_id).await;
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_create_connection_stage_finish(
                &mut ctx,
                connection_id,
                connection_gid,
                plan,
                resolved_ids,
            )
            .await;
        ctx.retire(result);
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_alter_connection_validation_ready(
        &mut self,
        AlterConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid: _,
            mut plan_validity,
            otel_ctx,
            resolved_ids: _,
        }: AlterConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        if let Err(e) = plan_validity.check(self.catalog()) {
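            // The catalog changed while the connection was being validated;
            // fail the request.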
            return ctx.retire(Err(e));
        }

        let conn = match result {
            Ok(ok) => ok,
            Err(e) => {
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
            .await;
        ctx.retire(result);
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_cluster_event(&mut self, event: ClusterEvent) {
        event!(Level::TRACE, event = format!("{:?}", event));

        if let Some(segment_client) = &self.segment_client {
            let env_id = &self.catalog().config().environment_id;
            let mut properties = json!({
                "cluster_id": event.cluster_id.to_string(),
                "replica_id": event.replica_id.to_string(),
                "process_id": event.process_id,
                "status": event.status.as_kebab_case_str(),
            });
            match event.status {
                ClusterStatus::Online => (),
                ClusterStatus::Offline(reason) => {
                    let properties = match &mut properties {
                        serde_json::Value::Object(map) => map,
                        _ => unreachable!(),
                    };
                    properties.insert(
                        "reason".into(),
                        json!(reason.display_or("unknown").to_string()),
                    );
                }
            };
            segment_client.environment_track(
                env_id,
                "Cluster Changed Status",
                properties,
                EventDetails {
                    timestamp: Some(event.time),
                    ..Default::default()
                },
            );
        }

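        // The replica may already have been dropped, in which case the event
        // is stale and can be ignored.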
        let Some(replica_statuses) = self
            .cluster_replica_statuses
            .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
        else {
            return;
        };

        if event.status != replica_statuses[&event.process_id].status {
            if !self.controller.read_only() {
                let offline_reason = match event.status {
                    ClusterStatus::Online => None,
                    ClusterStatus::Offline(None) => None,
                    ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
                };
                let row = Row::pack_slice(&[
                    Datum::String(&event.replica_id.to_string()),
                    Datum::UInt64(event.process_id),
                    Datum::String(event.status.as_kebab_case_str()),
                    Datum::from(offline_reason.as_deref()),
                    Datum::TimestampTz(event.time.try_into().expect("must fit")),
                ]);
                self.controller.storage.append_introspection_updates(
                    IntrospectionType::ReplicaStatusHistory,
                    vec![(row, Diff::ONE)],
                );
            }

            let old_replica_status =
                ClusterReplicaStatuses::cluster_replica_status(replica_statuses);

            let new_process_status = ClusterReplicaProcessStatus {
                status: event.status,
                time: event.time,
            };
            self.cluster_replica_statuses.ensure_cluster_status(
                event.cluster_id,
                event.replica_id,
                event.process_id,
                new_process_status,
            );

            let cluster = self.catalog().get_cluster(event.cluster_id);
            let replica = cluster.replica(event.replica_id).expect("Replica exists");
            let new_replica_status = self
                .cluster_replica_statuses
                .get_cluster_replica_status(event.cluster_id, event.replica_id);

            if old_replica_status != new_replica_status {
                let notifier = self.broadcast_notice_tx();
                let notice = AdapterNotice::ClusterReplicaStatusChanged {
                    cluster: cluster.name.clone(),
                    replica: replica.name.clone(),
                    status: new_replica_status,
                    time: event.time,
                };
                notifier(notice);
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_linearize_reads(&mut self) {
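        // A read is linearized once its chosen timestamp is less than or
        // equal to the timestamp oracle's read timestamp. Retire all pending
        // reads that are now linearized and requeue the rest, rechecking
        // after the shortest remaining wait.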
        let mut shortest_wait = Duration::MAX;
        let mut ready_txns = Vec::new();

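        // Cache the oracle read timestamp per timeline, so each oracle is
        // queried at most once per invocation.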
        let mut cached_oracle_ts = BTreeMap::new();

        for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
            if let TimestampContext::TimelineTimestamp {
                timeline,
                chosen_ts,
                oracle_ts,
            } = read_txn.timestamp_context()
            {
                let oracle_ts = match oracle_ts {
                    Some(oracle_ts) => oracle_ts,
                    None => {
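                        // No oracle timestamp was used to choose the read
                        // timestamp, so there is nothing to wait for.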
                        ready_txns.push(read_txn);
                        continue;
                    }
                };

                if chosen_ts <= oracle_ts {
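                    // The chosen timestamp is not ahead of the oracle
                    // timestamp observed at planning time, so the read is
                    // already linearized.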
                    ready_txns.push(read_txn);
                    continue;
                }

                let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
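                // Consult the oracle, reusing the cached timestamp for this
                // timeline if one exists.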
                let current_oracle_ts = match current_oracle_ts {
                    btree_map::Entry::Vacant(entry) => {
                        let timestamp_oracle = self.get_timestamp_oracle(timeline);
                        let read_ts = timestamp_oracle.read_ts().await;
                        entry.insert(read_ts.clone());
                        read_ts
                    }
                    btree_map::Entry::Occupied(entry) => entry.get().clone(),
                };

                if *chosen_ts <= current_oracle_ts {
                    ready_txns.push(read_txn);
                } else {
                    let wait =
                        Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
                    if wait < shortest_wait {
                        shortest_wait = wait;
                    }
                    read_txn.num_requeues += 1;
                    self.pending_linearize_read_txns.insert(conn_id, read_txn);
                }
            } else {
                ready_txns.push(read_txn);
            }
        }

        if !ready_txns.is_empty() {
            let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
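            // Somewhat arbitrarily, adopt the tracing context of the first
            // ready transaction for the shared span.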
            let span = tracing::debug_span!("message_linearize_reads");
            otel_ctx.attach_as_parent_to(&span);

            let now = Instant::now();
            for ready_txn in ready_txns {
                let span = tracing::debug_span!("retire_read_results");
                ready_txn.otel_ctx.attach_as_parent_to(&span);
                let _entered = span.enter();
                self.metrics
                    .linearize_message_seconds
                    .with_label_values(&[
                        ready_txn.txn.label(),
                        if ready_txn.num_requeues == 0 {
                            "true"
                        } else {
                            "false"
                        },
                    ])
                    .observe((now - ready_txn.created).as_secs_f64());
                if let Some((ctx, result)) = ready_txn.txn.finish() {
                    ctx.retire(result);
                }
            }
        }

        if !self.pending_linearize_read_txns.is_empty() {
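            // Check again in at most one second, in case the oracle advances
            // on its own in the meantime.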
            let wait_duration = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "deferred_read_txns", async move {
                tokio::time::sleep(wait_duration).await;
                let result = internal_cmd_tx.send(Message::LinearizeReads);
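                // Not an error: the receiver is only dropped when the
                // coordinator is shutting down.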
                if let Err(e) = result {
                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        }
    }
}