1use std::collections::{BTreeMap, BTreeSet, btree_map};
14use std::time::{Duration, Instant};
15
16use futures::FutureExt;
17use maplit::btreemap;
18use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
19use mz_controller::ControllerResponse;
20use mz_controller::clusters::{ClusterEvent, ClusterStatus};
21use mz_ore::instrument;
22use mz_ore::now::EpochMillis;
23use mz_ore::option::OptionExt;
24use mz_ore::tracing::OpenTelemetryContext;
25use mz_ore::{soft_assert_or_log, task};
26use mz_persist_client::usage::ShardsUsageReferenced;
27use mz_repr::{Datum, Diff, Row};
28use mz_sql::ast::Statement;
29use mz_sql::pure::PurifiedStatement;
30use mz_storage_client::controller::IntrospectionType;
31use mz_storage_types::controller::CollectionMetadata;
32use opentelemetry::trace::TraceContextExt;
33use rand::{Rng, SeedableRng, rngs};
34use serde_json::json;
35use tracing::{Instrument, Level, event, info_span, warn};
36use tracing_opentelemetry::OpenTelemetrySpanExt;
37
38use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
39use crate::catalog::{BuiltinTableUpdate, Op};
40use crate::command::Command;
41use crate::coord::{
42 AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
43 CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
44};
45use crate::telemetry::{EventDetails, SegmentClientExt};
46use crate::{AdapterNotice, TimestampContext};
47
48impl Coordinator {
49 #[instrument]
54 pub(crate) async fn handle_message(&mut self, msg: Message) -> () {
55 match msg {
56 Message::Command(otel_ctx, cmd) => {
57 let span = tracing::info_span!("message_command").or_current();
61 span.in_scope(|| otel_ctx.attach_as_parent());
62 self.message_command(cmd).instrument(span).await
63 }
64 Message::ControllerReady { controller: _ } => {
65 let Coordinator {
66 controller,
67 catalog,
68 ..
69 } = self;
70 let storage_metadata = catalog.state().storage_metadata();
71 if let Some(m) = controller
72 .process(storage_metadata)
73 .expect("`process` never returns an error")
74 {
75 self.message_controller(m).boxed_local().await
76 }
77 }
78 Message::PurifiedStatementReady(ready) => {
79 self.message_purified_statement_ready(ready)
80 .boxed_local()
81 .await
82 }
83 Message::CreateConnectionValidationReady(ready) => {
84 self.message_create_connection_validation_ready(ready)
85 .boxed_local()
86 .await
87 }
88 Message::AlterConnectionValidationReady(ready) => {
89 self.message_alter_connection_validation_ready(ready)
90 .boxed_local()
91 .await
92 }
93 Message::TryDeferred {
94 conn_id,
95 acquired_lock,
96 } => self.try_deferred(conn_id, acquired_lock).await,
97 Message::GroupCommitInitiate(span, permit) => {
98 tracing::Span::current().add_link(span.context().span().span_context().clone());
100 self.try_group_commit(permit)
101 .instrument(span)
102 .boxed_local()
103 .await
104 }
105 Message::AdvanceTimelines => {
106 self.advance_timelines().boxed_local().await;
107 }
108 Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
109 Message::CancelPendingPeeks { conn_id } => {
110 self.cancel_pending_peeks(&conn_id);
111 }
112 Message::LinearizeReads => {
113 self.message_linearize_reads().boxed_local().await;
114 }
115 Message::StagedBatches {
116 conn_id,
117 table_id,
118 batches,
119 } => {
120 self.commit_staged_batches(conn_id, table_id, batches);
121 }
122 Message::StorageUsageSchedule => {
123 self.schedule_storage_usage_collection().boxed_local().await;
124 }
125 Message::StorageUsageFetch => {
126 self.storage_usage_fetch().boxed_local().await;
127 }
128 Message::StorageUsageUpdate(sizes) => {
129 self.storage_usage_update(sizes).boxed_local().await;
130 }
131 Message::StorageUsagePrune(expired) => {
132 self.storage_usage_prune(expired).boxed_local().await;
133 }
134 Message::RetireExecute {
135 otel_ctx,
136 data,
137 reason,
138 } => {
139 otel_ctx.attach_as_parent();
140 self.retire_execution(reason, data);
141 }
142 Message::ExecuteSingleStatementTransaction {
143 ctx,
144 otel_ctx,
145 stmt,
146 params,
147 } => {
148 otel_ctx.attach_as_parent();
149 self.sequence_execute_single_statement_transaction(ctx, stmt, params)
150 .boxed_local()
151 .await;
152 }
153 Message::PeekStageReady { ctx, span, stage } => {
154 self.sequence_staged(ctx, span, stage).boxed_local().await;
155 }
156 Message::CreateIndexStageReady { ctx, span, stage } => {
157 self.sequence_staged(ctx, span, stage).boxed_local().await;
158 }
159 Message::CreateViewStageReady { ctx, span, stage } => {
160 self.sequence_staged(ctx, span, stage).boxed_local().await;
161 }
162 Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
163 self.sequence_staged(ctx, span, stage).boxed_local().await;
164 }
165 Message::SubscribeStageReady { ctx, span, stage } => {
166 self.sequence_staged(ctx, span, stage).boxed_local().await;
167 }
168 Message::IntrospectionSubscribeStageReady { span, stage } => {
169 self.sequence_staged((), span, stage).boxed_local().await;
170 }
171 Message::ExplainTimestampStageReady { ctx, span, stage } => {
172 self.sequence_staged(ctx, span, stage).boxed_local().await;
173 }
174 Message::SecretStageReady { ctx, span, stage } => {
175 self.sequence_staged(ctx, span, stage).boxed_local().await;
176 }
177 Message::ClusterStageReady { ctx, span, stage } => {
178 self.sequence_staged(ctx, span, stage).boxed_local().await;
179 }
180 Message::DrainStatementLog => {
181 self.drain_statement_log();
182 }
183 Message::PrivateLinkVpcEndpointEvents(events) => {
184 if !self.controller.read_only() {
185 self.controller
186 .storage
187 .append_introspection_updates(
188 mz_storage_client::controller::IntrospectionType::PrivatelinkConnectionStatusHistory,
189 events
190 .into_iter()
191 .map(|e| (mz_repr::Row::from(e), Diff::ONE))
192 .collect(),
193 );
194 }
195 }
196 Message::CheckSchedulingPolicies => {
197 self.check_scheduling_policies().boxed_local().await;
198 }
199 Message::SchedulingDecisions(decisions) => {
200 self.handle_scheduling_decisions(decisions)
201 .boxed_local()
202 .await;
203 }
204 Message::DeferredStatementReady => {
205 self.handle_deferred_statement().boxed_local().await;
206 }
207 }
208 }
209
210 #[mz_ore::instrument(level = "debug")]
211 pub async fn storage_usage_fetch(&mut self) {
212 let internal_cmd_tx = self.internal_cmd_tx.clone();
213 let client = self.storage_usage_client.clone();
214
215 let live_shards: BTreeSet<_> = self
217 .controller
218 .storage
219 .active_collection_metadatas()
220 .into_iter()
221 .flat_map(|(_id, collection_metadata)| {
222 let CollectionMetadata {
223 data_shard,
224 remap_shard,
225 persist_location: _,
233 relation_desc: _,
234 txns_shard: _,
235 } = collection_metadata;
236 [remap_shard, Some(data_shard)].into_iter()
237 })
238 .filter_map(|shard| shard)
239 .collect();
240
241 let collection_metric = self
242 .metrics
243 .storage_usage_collection_time_seconds
244 .with_label_values(&[]);
245
246 task::spawn(|| "storage_usage_fetch", async move {
249 let collection_metric_timer = collection_metric.start_timer();
250 let shard_sizes = client.shards_usage_referenced(live_shards).await;
251 collection_metric_timer.observe_duration();
252
253 if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
256 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
257 }
258 });
259 }
260
261 #[mz_ore::instrument(level = "debug")]
262 async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
263 let collection_timestamp = if self.controller.read_only() {
268 self.peek_local_write_ts().await.into()
269 } else {
270 self.get_local_write_ts().await.timestamp.into()
273 };
274
275 let ops = shards_usage
276 .by_shard
277 .into_iter()
278 .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
279 object_id: Some(shard_id.to_string()),
280 size_bytes: shard_usage.size_bytes(),
281 collection_timestamp,
282 })
283 .collect();
284
285 match self.catalog_transact_inner(None, ops).await {
286 Ok(table_updates) => {
287 let internal_cmd_tx = self.internal_cmd_tx.clone();
288 let task_span =
289 info_span!(parent: None, "coord::storage_usage_update::table_updates");
290 OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
291 task::spawn(|| "storage_usage_update_table_updates", async move {
292 table_updates.instrument(task_span).await;
293 if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
295 warn!("internal_cmd_rx dropped before we could send: {e:?}");
296 }
297 });
298 }
299 Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
300 }
301 }
302
303 #[mz_ore::instrument(level = "debug")]
304 async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
305 let (fut, _) = self.builtin_table_update().execute(expired).await;
306 task::spawn(|| "storage_usage_pruning_apply", async move {
307 fut.await;
308 });
309 }
310
311 pub async fn schedule_storage_usage_collection(&self) {
312 const SEED_LEN: usize = 32;
320 let mut seed = [0; SEED_LEN];
321 for (i, byte) in self
322 .catalog()
323 .state()
324 .config()
325 .environment_id
326 .organization_id()
327 .as_bytes()
328 .into_iter()
329 .take(SEED_LEN)
330 .enumerate()
331 {
332 seed[i] = *byte;
333 }
334 let storage_usage_collection_interval_ms: EpochMillis =
335 EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
336 .expect("storage usage collection interval must fit into u64");
337 let offset =
338 rngs::SmallRng::from_seed(seed).gen_range(0..storage_usage_collection_interval_ms);
339 let now_ts: EpochMillis = self.peek_local_write_ts().await.into();
340
341 let previous_collection_ts =
343 (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
344 let next_collection_ts = if previous_collection_ts > now_ts {
345 previous_collection_ts
346 } else {
347 previous_collection_ts + storage_usage_collection_interval_ms
348 };
349 let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);
350
351 let internal_cmd_tx = self.internal_cmd_tx.clone();
353 task::spawn(|| "storage_usage_collection", async move {
354 tokio::time::sleep(next_collection_interval).await;
355 if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
356 }
358 });
359 }
360
    /// Handles a client [`Command`] by forwarding it to `handle_command` and waiting for
    /// that call to finish. The only thing this wrapper adds is the debug-level
    /// `instrument` span.
    #[mz_ore::instrument(level = "debug")]
    async fn message_command(&mut self, cmd: Command) {
        self.handle_command(cmd).await;
    }
365
366 #[mz_ore::instrument(level = "debug")]
367 async fn message_controller(&mut self, message: ControllerResponse) {
368 event!(Level::TRACE, message = format!("{:?}", message));
369 match message {
370 ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
371 self.handle_peek_notification(uuid, response, otel_ctx);
372 }
373 ControllerResponse::SubscribeResponse(sink_id, response) => {
374 if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
375 self.active_compute_sinks.get_mut(&sink_id)
376 {
377 let finished = active_subscribe.process_response(response);
378 if finished {
379 self.retire_compute_sinks(btreemap! {
380 sink_id => ActiveComputeSinkRetireReason::Finished,
381 })
382 .await;
383 }
384
385 soft_assert_or_log!(
386 !self.introspection_subscribes.contains_key(&sink_id),
387 "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
388 and `introspection_subscribes`",
389 );
390 } else if self.introspection_subscribes.contains_key(&sink_id) {
391 self.handle_introspection_subscribe_batch(sink_id, response)
392 .await;
393 } else {
394 }
397 }
398 ControllerResponse::CopyToResponse(sink_id, response) => {
399 match self.drop_compute_sink(sink_id).await {
400 Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
401 active_copy_to.retire_with_response(response);
402 }
403 _ => {
404 }
407 }
408 }
409 ControllerResponse::WatchSetFinished(ws_ids) => {
410 let now = self.now();
411 for ws_id in ws_ids {
412 let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
413 continue;
414 };
415 self.connection_watch_sets
416 .get_mut(&conn_id)
417 .expect("corrupted coordinator state: unknown connection id")
418 .remove(&ws_id);
419 if self.connection_watch_sets[&conn_id].is_empty() {
420 self.connection_watch_sets.remove(&conn_id);
421 }
422
423 match rsp {
424 WatchSetResponse::StatementDependenciesReady(id, ev) => {
425 self.record_statement_lifecycle_event(&id, &ev, now);
426 }
427 WatchSetResponse::AlterSinkReady(ctx) => {
428 self.sequence_alter_sink_finish(ctx).await;
429 }
430 }
431 }
432 }
433 }
434 }
435
436 #[mz_ore::instrument(level = "debug")]
437 async fn message_purified_statement_ready(
438 &mut self,
439 PurifiedStatementReady {
440 ctx,
441 result,
442 params,
443 mut plan_validity,
444 original_stmt,
445 otel_ctx,
446 }: PurifiedStatementReady,
447 ) {
448 otel_ctx.attach_as_parent();
449
450 if plan_validity.check(self.catalog()).is_err() {
461 self.handle_execute_inner(original_stmt, params, ctx).await;
462 return;
463 }
464
465 let purified_statement = match result {
466 Ok(ok) => ok,
467 Err(e) => return ctx.retire(Err(e)),
468 };
469
470 let plan = match purified_statement {
471 PurifiedStatement::PurifiedCreateSource {
472 create_progress_subsource_stmt,
473 create_source_stmt,
474 subsources,
475 available_source_references,
476 } => {
477 self.plan_purified_create_source(
478 &ctx,
479 params,
480 create_progress_subsource_stmt,
481 create_source_stmt,
482 subsources,
483 available_source_references,
484 )
485 .await
486 }
487 PurifiedStatement::PurifiedAlterSourceAddSubsources {
488 source_name,
489 options,
490 subsources,
491 } => {
492 self.plan_purified_alter_source_add_subsource(
493 ctx.session(),
494 params,
495 source_name,
496 options,
497 subsources,
498 )
499 .await
500 }
501 PurifiedStatement::PurifiedAlterSourceRefreshReferences {
502 source_name,
503 available_source_references,
504 } => self.plan_purified_alter_source_refresh_references(
505 ctx.session(),
506 params,
507 source_name,
508 available_source_references,
509 ),
510 o @ (PurifiedStatement::PurifiedAlterSource { .. }
511 | PurifiedStatement::PurifiedCreateSink(..)
512 | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
513 let stmt = match o {
515 PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
516 Statement::AlterSource(alter_source_stmt)
517 }
518 PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
519 Statement::CreateTableFromSource(stmt)
520 }
521 PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
522 PurifiedStatement::PurifiedCreateSource { .. }
523 | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
524 | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
525 unreachable!("not part of exterior match stmt")
526 }
527 };
528
529 let catalog = self.catalog().for_session(ctx.session());
532 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
533 self.plan_statement(ctx.session(), stmt, ¶ms, &resolved_ids)
534 .map(|plan| (plan, resolved_ids))
535 }
536 };
537
538 match plan {
539 Ok((plan, resolved_ids)) => self.sequence_plan(ctx, plan, resolved_ids).await,
540 Err(e) => ctx.retire(Err(e)),
541 }
542 }
543
544 #[mz_ore::instrument(level = "debug")]
545 async fn message_create_connection_validation_ready(
546 &mut self,
547 CreateConnectionValidationReady {
548 mut ctx,
549 result,
550 connection_id,
551 connection_gid,
552 mut plan_validity,
553 otel_ctx,
554 resolved_ids,
555 }: CreateConnectionValidationReady,
556 ) {
557 otel_ctx.attach_as_parent();
558
559 if let Err(e) = plan_validity.check(self.catalog()) {
565 let _ = self.secrets_controller.delete(connection_id).await;
566 return ctx.retire(Err(e));
567 }
568
569 let plan = match result {
570 Ok(ok) => ok,
571 Err(e) => {
572 let _ = self.secrets_controller.delete(connection_id).await;
573 return ctx.retire(Err(e));
574 }
575 };
576
577 let result = self
578 .sequence_create_connection_stage_finish(
579 &mut ctx,
580 connection_id,
581 connection_gid,
582 plan,
583 resolved_ids,
584 )
585 .await;
586 ctx.retire(result);
587 }
588
589 #[mz_ore::instrument(level = "debug")]
590 async fn message_alter_connection_validation_ready(
591 &mut self,
592 AlterConnectionValidationReady {
593 mut ctx,
594 result,
595 connection_id,
596 connection_gid: _,
597 mut plan_validity,
598 otel_ctx,
599 resolved_ids: _,
600 }: AlterConnectionValidationReady,
601 ) {
602 otel_ctx.attach_as_parent();
603
604 if let Err(e) = plan_validity.check(self.catalog()) {
610 return ctx.retire(Err(e));
611 }
612
613 let conn = match result {
614 Ok(ok) => ok,
615 Err(e) => {
616 return ctx.retire(Err(e));
617 }
618 };
619
620 let result = self
621 .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
622 .await;
623 ctx.retire(result);
624 }
625
626 #[mz_ore::instrument(level = "debug")]
627 async fn message_cluster_event(&mut self, event: ClusterEvent) {
628 event!(Level::TRACE, event = format!("{:?}", event));
629
630 if let Some(segment_client) = &self.segment_client {
631 let env_id = &self.catalog().config().environment_id;
632 let mut properties = json!({
633 "cluster_id": event.cluster_id.to_string(),
634 "replica_id": event.replica_id.to_string(),
635 "process_id": event.process_id,
636 "status": event.status.as_kebab_case_str(),
637 });
638 match event.status {
639 ClusterStatus::Online => (),
640 ClusterStatus::Offline(reason) => {
641 let properties = match &mut properties {
642 serde_json::Value::Object(map) => map,
643 _ => unreachable!(),
644 };
645 properties.insert(
646 "reason".into(),
647 json!(reason.display_or("unknown").to_string()),
648 );
649 }
650 };
651 segment_client.environment_track(
652 env_id,
653 "Cluster Changed Status",
654 properties,
655 EventDetails {
656 timestamp: Some(event.time),
657 ..Default::default()
658 },
659 );
660 }
661
662 let Some(replica_statues) = self
665 .cluster_replica_statuses
666 .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
667 else {
668 return;
669 };
670
671 if event.status != replica_statues[&event.process_id].status {
672 if !self.controller.read_only() {
673 let offline_reason = match event.status {
674 ClusterStatus::Online => None,
675 ClusterStatus::Offline(None) => None,
676 ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
677 };
678 let row = Row::pack_slice(&[
679 Datum::String(&event.replica_id.to_string()),
680 Datum::UInt64(event.process_id),
681 Datum::String(event.status.as_kebab_case_str()),
682 Datum::from(offline_reason.as_deref()),
683 Datum::TimestampTz(event.time.try_into().expect("must fit")),
684 ]);
685 self.controller.storage.append_introspection_updates(
686 IntrospectionType::ReplicaStatusHistory,
687 vec![(row, Diff::ONE)],
688 );
689 }
690
691 let old_replica_status =
692 ClusterReplicaStatuses::cluster_replica_status(replica_statues);
693
694 let new_process_status = ClusterReplicaProcessStatus {
695 status: event.status,
696 time: event.time,
697 };
698 self.cluster_replica_statuses.ensure_cluster_status(
699 event.cluster_id,
700 event.replica_id,
701 event.process_id,
702 new_process_status,
703 );
704
705 let cluster = self.catalog().get_cluster(event.cluster_id);
706 let replica = cluster.replica(event.replica_id).expect("Replica exists");
707 let new_replica_status = self
708 .cluster_replica_statuses
709 .get_cluster_replica_status(event.cluster_id, event.replica_id);
710
711 if old_replica_status != new_replica_status {
712 let notifier = self.broadcast_notice_tx();
713 let notice = AdapterNotice::ClusterReplicaStatusChanged {
714 cluster: cluster.name.clone(),
715 replica: replica.name.clone(),
716 status: new_replica_status,
717 time: event.time,
718 };
719 notifier(notice);
720 }
721 }
722 }
723
    /// Processes all read transactions that are waiting to be linearized.
    ///
    /// Every pending transaction is taken out of `pending_linearize_read_txns` and is
    /// either retired (when its chosen timestamp is not ahead of the relevant oracle
    /// timestamp, or when it has no timeline timestamp or oracle timestamp at all) or put
    /// back into the pending map. If any transactions remain pending afterwards, a task is
    /// spawned that sends another `Message::LinearizeReads` after a delay of at most one
    /// second.
    #[mz_ore::instrument(level = "debug")]
    async fn message_linearize_reads(&mut self) {
        // Shortest time any still-pending transaction has to wait; determines the delay
        // before the next attempt.
        let mut shortest_wait = Duration::MAX;
        let mut ready_txns = Vec::new();

        // Oracle read timestamps fetched during this invocation, keyed by timeline, so the
        // oracle is consulted at most once per timeline.
        let mut cached_oracle_ts = BTreeMap::new();

        // `mem::take` leaves an empty map behind; transactions that are not ready yet are
        // re-inserted below.
        for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
            if let TimestampContext::TimelineTimestamp {
                timeline,
                chosen_ts,
                oracle_ts,
            } = read_txn.timestamp_context()
            {
                let oracle_ts = match oracle_ts {
                    Some(oracle_ts) => oracle_ts,
                    None => {
                        // Without an oracle timestamp there is nothing to wait for.
                        ready_txns.push(read_txn);
                        continue;
                    }
                };

                if chosen_ts <= oracle_ts {
                    // The chosen timestamp is not ahead of the oracle timestamp recorded in
                    // the transaction's timestamp context.
                    ready_txns.push(read_txn);
                    continue;
                }

                // Look up the timeline's current oracle read timestamp, querying the oracle
                // only if it has not been fetched for this timeline yet.
                let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
                let current_oracle_ts = match current_oracle_ts {
                    btree_map::Entry::Vacant(entry) => {
                        let timestamp_oracle = self.get_timestamp_oracle(timeline);
                        let read_ts = timestamp_oracle.read_ts().await;
                        entry.insert(read_ts.clone());
                        read_ts
                    }
                    btree_map::Entry::Occupied(entry) => entry.get().clone(),
                };

                if *chosen_ts <= current_oracle_ts {
                    ready_txns.push(read_txn);
                } else {
                    // The oracle has not caught up yet. The distance between the two
                    // timestamps is interpreted as a number of milliseconds to wait.
                    let wait =
                        Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
                    if wait < shortest_wait {
                        shortest_wait = wait;
                    }
                    read_txn.num_requeues += 1;
                    self.pending_linearize_read_txns.insert(conn_id, read_txn);
                }
            } else {
                // Any other kind of timestamp context requires no waiting.
                ready_txns.push(read_txn);
            }
        }

        if !ready_txns.is_empty() {
            let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
            // NOTE(review): this span gets the first ready transaction's context attached
            // as parent but is never entered within this function -- confirm that is
            // intended.
            let span = tracing::debug_span!("message_linearize_reads");
            otel_ctx.attach_as_parent_to(&span);

            let now = Instant::now();
            for ready_txn in ready_txns {
                // Each transaction is retired inside its own span, parented to that
                // transaction's OpenTelemetry context.
                let span = tracing::debug_span!("retire_read_results");
                ready_txn.otel_ctx.attach_as_parent_to(&span);
                let _entered = span.enter();
                // Record the time since the transaction was created. The second label is
                // "true" when the transaction was never requeued.
                self.metrics
                    .linearize_message_seconds
                    .with_label_values(&[
                        ready_txn.txn.label(),
                        if ready_txn.num_requeues == 0 {
                            "true"
                        } else {
                            "false"
                        },
                    ])
                    .observe((now - ready_txn.created).as_secs_f64());
                if let Some((ctx, result)) = ready_txn.txn.finish() {
                    ctx.retire(result);
                }
            }
        }

        if !self.pending_linearize_read_txns.is_empty() {
            // Retry after the shortest required wait, capped at one second. Despite its
            // name, `remaining_ms` is a `Duration`.
            let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "deferred_read_txns", async move {
                tokio::time::sleep(remaining_ms).await;
                let result = internal_cmd_tx.send(Message::LinearizeReads);
                if let Err(e) = result {
                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        }
    }
833}