use std::collections::{BTreeMap, BTreeSet, btree_map};
use std::time::{Duration, Instant};

use futures::FutureExt;
use maplit::btreemap;
use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
use mz_controller::ControllerResponse;
use mz_controller::clusters::{ClusterEvent, ClusterStatus};
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::option::OptionExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, task};
use mz_persist_client::usage::ShardsUsageReferenced;
use mz_repr::{Datum, Diff, Row};
use mz_sql::ast::Statement;
use mz_sql::pure::PurifiedStatement;
use mz_storage_client::controller::IntrospectionType;
use mz_storage_types::controller::CollectionMetadata;
use opentelemetry::trace::TraceContextExt;
use rand::{Rng, SeedableRng, rngs};
use serde_json::json;
use tracing::{Instrument, Level, event, info_span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{BuiltinTableUpdate, Op};
use crate::command::Command;
use crate::coord::{
    AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
    CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
};
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::{AdapterNotice, TimestampContext};

impl Coordinator {
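    /// Handles the given [`Message`], dispatching it to the appropriate
    /// handler method.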
    #[instrument]
    pub(crate) async fn handle_message(&mut self, msg: Message) {
        match msg {
            Message::Command(otel_ctx, cmd) => {
                let span = tracing::info_span!("message_command").or_current();
                span.in_scope(|| otel_ctx.attach_as_parent());
                self.message_command(cmd).instrument(span).await
            }
            Message::ControllerReady => {
                let Coordinator {
                    controller,
                    catalog,
                    ..
                } = self;
                let storage_metadata = catalog.state().storage_metadata();
                if let Some(m) = controller
                    .process(storage_metadata)
                    .expect("`process` never returns an error")
                {
                    self.message_controller(m).boxed_local().await
                }
            }
            Message::PurifiedStatementReady(ready) => {
                self.message_purified_statement_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::CreateConnectionValidationReady(ready) => {
                self.message_create_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::AlterConnectionValidationReady(ready) => {
                self.message_alter_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::TryDeferred {
                conn_id,
                acquired_lock,
            } => self.try_deferred(conn_id, acquired_lock).await,
            Message::GroupCommitInitiate(span, permit) => {
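                // Link the current span to the span that initiated this group
                // commit, so the two traces stay connected without reparenting.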
                tracing::Span::current().add_link(span.context().span().span_context().clone());
                self.try_group_commit(permit)
                    .instrument(span)
                    .boxed_local()
                    .await
            }
            Message::AdvanceTimelines => {
                self.advance_timelines().boxed_local().await;
            }
            Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
            Message::CancelPendingPeeks { conn_id } => {
                self.cancel_pending_peeks(&conn_id);
            }
            Message::LinearizeReads => {
                self.message_linearize_reads().boxed_local().await;
            }
            Message::StagedBatches {
                conn_id,
                table_id,
                batches,
            } => {
                self.commit_staged_batches(conn_id, table_id, batches);
            }
            Message::StorageUsageSchedule => {
                self.schedule_storage_usage_collection().boxed_local().await;
            }
            Message::StorageUsageFetch => {
                self.storage_usage_fetch().boxed_local().await;
            }
            Message::StorageUsageUpdate(sizes) => {
                self.storage_usage_update(sizes).boxed_local().await;
            }
            Message::StorageUsagePrune(expired) => {
                self.storage_usage_prune(expired).boxed_local().await;
            }
            Message::RetireExecute {
                otel_ctx,
                data,
                reason,
            } => {
                otel_ctx.attach_as_parent();
                self.retire_execution(reason, data);
            }
            Message::ExecuteSingleStatementTransaction {
                ctx,
                otel_ctx,
                stmt,
                params,
            } => {
                otel_ctx.attach_as_parent();
                self.sequence_execute_single_statement_transaction(ctx, stmt, params)
                    .boxed_local()
                    .await;
            }
            Message::PeekStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateIndexStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SubscribeStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::IntrospectionSubscribeStageReady { span, stage } => {
                self.sequence_staged((), span, stage).boxed_local().await;
            }
            Message::ExplainTimestampStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SecretStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::ClusterStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::DrainStatementLog => {
                self.drain_statement_log();
            }
            Message::PrivateLinkVpcEndpointEvents(events) => {
                if !self.controller.read_only() {
                    self.controller
                        .storage
                        .append_introspection_updates(
                            IntrospectionType::PrivatelinkConnectionStatusHistory,
                            events
                                .into_iter()
                                .map(|e| (Row::from(e), Diff::ONE))
                                .collect(),
                        );
                }
            }
            Message::CheckSchedulingPolicies => {
                self.check_scheduling_policies().boxed_local().await;
            }
            Message::SchedulingDecisions(decisions) => {
                self.handle_scheduling_decisions(decisions)
                    .boxed_local()
                    .await;
            }
            Message::DeferredStatementReady => {
                self.handle_deferred_statement().boxed_local().await;
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    pub async fn storage_usage_fetch(&mut self) {
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let client = self.storage_usage_client.clone();

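        // Collect the shards that back all active collections; these are the
        // shards whose usage we fetch from persist.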
        let live_shards: BTreeSet<_> = self
            .controller
            .storage
            .active_collection_metadatas()
            .into_iter()
            .flat_map(|(_id, collection_metadata)| {
                let CollectionMetadata {
                    data_shard,
                    remap_shard,
                    persist_location: _,
                    relation_desc: _,
                    txns_shard: _,
                } = collection_metadata;
                [remap_shard, Some(data_shard)].into_iter()
            })
            .flatten()
            .collect();

        let collection_metric = self
            .metrics
            .storage_usage_collection_time_seconds
            .with_label_values(&[]);

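        // Fetching shard usage from persist can be slow, so it runs in a
        // background task that reports back via `StorageUsageUpdate`.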
        task::spawn(|| "storage_usage_fetch", async move {
            let collection_metric_timer = collection_metric.start_timer();
            let shard_sizes = client.shards_usage_referenced(live_shards).await;
            collection_metric_timer.observe_duration();

            if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        });
    }

    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
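        // In read-only mode we must not allocate a new local write timestamp,
        // so we only peek the current one; otherwise we acquire a fresh write
        // timestamp for the usage events.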
        let collection_timestamp = if self.controller.read_only() {
            self.peek_local_write_ts().await.into()
        } else {
            self.get_local_write_ts().await.timestamp.into()
        };

        let ops = shards_usage
            .by_shard
            .into_iter()
            .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
                object_id: Some(shard_id.to_string()),
                size_bytes: shard_usage.size_bytes(),
                collection_timestamp,
            })
            .collect();

        match self.catalog_transact_inner(None, ops).await {
            Ok(table_updates) => {
                let internal_cmd_tx = self.internal_cmd_tx.clone();
                let task_span =
                    info_span!(parent: None, "coord::storage_usage_update::table_updates");
                OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
                task::spawn(|| "storage_usage_update_table_updates", async move {
                    table_updates.instrument(task_span).await;
                    if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
                        warn!("internal_cmd_rx dropped before we could send: {e:?}");
                    }
                });
            }
            Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
        let (fut, _) = self.builtin_table_update().execute(expired).await;
        task::spawn(|| "storage_usage_pruning_apply", async move {
            fut.await;
        });
    }

    pub async fn schedule_storage_usage_collection(&self) {
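        // Collections run on a fixed cadence, shifted by a per-environment
        // offset derived deterministically from the organization ID, so that
        // different environments collect at different times while any one
        // environment stays on a stable schedule.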
        const SEED_LEN: usize = 32;
        let mut seed = [0; SEED_LEN];
        for (i, byte) in self
            .catalog()
            .state()
            .config()
            .environment_id
            .organization_id()
            .as_bytes()
            .iter()
            .take(SEED_LEN)
            .enumerate()
        {
            seed[i] = *byte;
        }
        let storage_usage_collection_interval_ms: EpochMillis =
            EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
                .expect("storage usage collection interval must fit into u64");
        let offset =
            rngs::SmallRng::from_seed(seed).gen_range(0..storage_usage_collection_interval_ms);
        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();

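        // The start of the current interval plus the offset is nominally the
        // "previous" collection time, but it may still lie in the future if
        // the offset has not yet elapsed within this interval.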
        let previous_collection_ts =
            (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
        let next_collection_ts = if previous_collection_ts > now_ts {
            previous_collection_ts
        } else {
            previous_collection_ts + storage_usage_collection_interval_ms
        };
        let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);

        let internal_cmd_tx = self.internal_cmd_tx.clone();
        task::spawn(|| "storage_usage_collection", async move {
            tokio::time::sleep(next_collection_interval).await;
            if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
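                // The coordinator is shutting down, so there is nothing to do.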
            }
        });
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_command(&mut self, cmd: Command) {
        self.handle_command(cmd).await;
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_controller(&mut self, message: ControllerResponse) {
        event!(Level::TRACE, message = format!("{:?}", message));
        match message {
            ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
                self.handle_peek_notification(uuid, response, otel_ctx);
            }
            ControllerResponse::SubscribeResponse(sink_id, response) => {
                if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
                    self.active_compute_sinks.get_mut(&sink_id)
                {
                    let finished = active_subscribe.process_response(response);
                    if finished {
                        self.retire_compute_sinks(btreemap! {
                            sink_id => ActiveComputeSinkRetireReason::Finished,
                        })
                        .await;
                    }

                    soft_assert_or_log!(
                        !self.introspection_subscribes.contains_key(&sink_id),
                        "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
                         and `introspection_subscribes`",
                    );
                } else if self.introspection_subscribes.contains_key(&sink_id) {
                    self.handle_introspection_subscribe_batch(sink_id, response)
                        .await;
                } else {
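                    // The subscribe was retired (e.g., canceled or dropped)
                    // before this response arrived; ignore it.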
                }
            }
            ControllerResponse::CopyToResponse(sink_id, response) => {
                match self.drop_compute_sink(sink_id).await {
                    Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
                        active_copy_to.retire_with_response(response);
                    }
                    _ => {
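                        // The sink was already retired (e.g., the COPY TO was
                        // canceled); drop the response.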
                    }
                }
            }
            ControllerResponse::ComputeReplicaMetrics(replica_id, new) => {
                let m = match self
                    .transient_replica_metadata
                    .entry(replica_id)
                    .or_insert_with(|| Some(Default::default()))
                {
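                    // A `None` entry means the replica has been dropped, so
                    // these metrics are stale and can be ignored.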
                    None => return,
                    Some(md) => &mut md.metrics,
                };
                let old = std::mem::replace(m, Some(new.clone()));
                if old.as_ref() != Some(&new) {
                    let retractions = old.map(|old| {
                        self.catalog().state().pack_replica_metric_updates(
                            replica_id,
                            &old,
                            Diff::MINUS_ONE,
                        )
                    });
                    let insertions = self.catalog().state().pack_replica_metric_updates(
                        replica_id,
                        &new,
                        Diff::ONE,
                    );
                    let updates = if let Some(retractions) = retractions {
                        retractions
                            .into_iter()
                            .chain(insertions)
                            .collect()
                    } else {
                        insertions
                    };
                    let updates = self
                        .catalog()
                        .state()
                        .resolve_builtin_table_updates(updates);
                    let _notify = self.builtin_table_update().background(updates);
                }
            }
            ControllerResponse::WatchSetFinished(ws_ids) => {
                let now = self.now();
                for ws_id in ws_ids {
                    let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
                        continue;
                    };
                    self.connection_watch_sets
                        .get_mut(&conn_id)
                        .expect("corrupted coordinator state: unknown connection id")
                        .remove(&ws_id);
                    if self.connection_watch_sets[&conn_id].is_empty() {
                        self.connection_watch_sets.remove(&conn_id);
                    }

                    match rsp {
                        WatchSetResponse::StatementDependenciesReady(id, ev) => {
                            self.record_statement_lifecycle_event(&id, &ev, now);
                        }
                        WatchSetResponse::AlterSinkReady(ctx) => {
                            self.sequence_alter_sink_finish(ctx).await;
                        }
                    }
                }
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_purified_statement_ready(
        &mut self,
        PurifiedStatementReady {
            ctx,
            result,
            params,
            mut plan_validity,
            original_stmt,
            otel_ctx,
        }: PurifiedStatementReady,
    ) {
        otel_ctx.attach_as_parent();

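        // Purification runs in a separate task, so the catalog may have
        // changed since this statement was submitted. If the plan is no
        // longer valid, start over by re-executing the original statement.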
        if plan_validity.check(self.catalog()).is_err() {
            self.handle_execute_inner(original_stmt, params, ctx).await;
            return;
        }

        let purified_statement = match result {
            Ok(ok) => ok,
            Err(e) => return ctx.retire(Err(e)),
        };

        let plan = match purified_statement {
            PurifiedStatement::PurifiedCreateSource {
                create_progress_subsource_stmt,
                create_source_stmt,
                subsources,
                available_source_references,
            } => {
                self.plan_purified_create_source(
                    &ctx,
                    params,
                    create_progress_subsource_stmt,
                    create_source_stmt,
                    subsources,
                    available_source_references,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceAddSubsources {
                source_name,
                options,
                subsources,
            } => {
                self.plan_purified_alter_source_add_subsource(
                    ctx.session(),
                    params,
                    source_name,
                    options,
                    subsources,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceRefreshReferences {
                source_name,
                available_source_references,
            } => self.plan_purified_alter_source_refresh_references(
                ctx.session(),
                params,
                source_name,
                available_source_references,
            ),
            o @ (PurifiedStatement::PurifiedAlterSource { .. }
            | PurifiedStatement::PurifiedCreateSink(..)
            | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
                let stmt = match o {
                    PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
                        Statement::AlterSource(alter_source_stmt)
                    }
                    PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
                        Statement::CreateTableFromSource(stmt)
                    }
                    PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
                    PurifiedStatement::PurifiedCreateSource { .. }
                    | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
                    | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
                        unreachable!("not part of exterior match stmt")
                    }
                };

                let catalog = self.catalog().for_session(ctx.session());
                let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
                self.plan_statement(ctx.session(), stmt, &params, &resolved_ids)
                    .map(|plan| (plan, resolved_ids))
            }
        };

        match plan {
            Ok((plan, resolved_ids)) => self.sequence_plan(ctx, plan, resolved_ids).await,
            Err(e) => ctx.retire(Err(e)),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_create_connection_validation_ready(
        &mut self,
        CreateConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid,
            mut plan_validity,
            otel_ctx,
            resolved_ids,
        }: CreateConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

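        // If the plan has become invalid, delete the secret that was created
        // for this connection, since it will never be used.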
        if let Err(e) = plan_validity.check(self.catalog()) {
            let _ = self.secrets_controller.delete(connection_id).await;
            return ctx.retire(Err(e));
        }

        let plan = match result {
            Ok(ok) => ok,
            Err(e) => {
                let _ = self.secrets_controller.delete(connection_id).await;
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_create_connection_stage_finish(
                ctx.session_mut(),
                connection_id,
                connection_gid,
                plan,
                resolved_ids,
            )
            .await;
        ctx.retire(result);
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_alter_connection_validation_ready(
        &mut self,
        AlterConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid: _,
            mut plan_validity,
            otel_ctx,
            resolved_ids: _,
        }: AlterConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        if let Err(e) = plan_validity.check(self.catalog()) {
            return ctx.retire(Err(e));
        }

        let conn = match result {
            Ok(ok) => ok,
            Err(e) => {
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
            .await;
        ctx.retire(result);
    }

    #[mz_ore::instrument(level = "debug")]
    async fn message_cluster_event(&mut self, event: ClusterEvent) {
        event!(Level::TRACE, event = format!("{:?}", event));

        if let Some(segment_client) = &self.segment_client {
            let env_id = &self.catalog().config().environment_id;
            let mut properties = json!({
                "cluster_id": event.cluster_id.to_string(),
                "replica_id": event.replica_id.to_string(),
                "process_id": event.process_id,
                "status": event.status.as_kebab_case_str(),
            });
            match event.status {
                ClusterStatus::Online => (),
                ClusterStatus::Offline(reason) => {
                    let properties = match &mut properties {
                        serde_json::Value::Object(map) => map,
                        _ => unreachable!(),
                    };
                    properties.insert(
                        "reason".into(),
                        json!(reason.display_or("unknown").to_string()),
                    );
                }
            };
            segment_client.environment_track(
                env_id,
                "Cluster Changed Status",
                properties,
                EventDetails {
                    timestamp: Some(event.time),
                    ..Default::default()
                },
            );
        }

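        // Events for replicas that have already been dropped may still be in
        // flight; ignore them.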
        let Some(replica_statuses) = self
            .cluster_replica_statuses
            .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
        else {
            return;
        };

        if event.status != replica_statuses[&event.process_id].status {
            if !self.controller.read_only() {
                let offline_reason = match event.status {
                    ClusterStatus::Online => None,
                    ClusterStatus::Offline(None) => None,
                    ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
                };
                let row = Row::pack_slice(&[
                    Datum::String(&event.replica_id.to_string()),
                    Datum::UInt64(event.process_id),
                    Datum::String(event.status.as_kebab_case_str()),
                    Datum::from(offline_reason.as_deref()),
                    Datum::TimestampTz(event.time.try_into().expect("must fit")),
                ]);
                self.controller.storage.append_introspection_updates(
                    IntrospectionType::ReplicaStatusHistory,
                    vec![(row, Diff::ONE)],
                );
            }

            let old_replica_status =
                ClusterReplicaStatuses::cluster_replica_status(replica_statuses);
            let old_process_status = replica_statuses
                .get(&event.process_id)
                .expect("Process exists");
            let builtin_table_retraction =
                self.catalog().state().pack_cluster_replica_status_update(
                    event.replica_id,
                    event.process_id,
                    old_process_status,
                    Diff::MINUS_ONE,
                );
            let builtin_table_retraction = self
                .catalog()
                .state()
                .resolve_builtin_table_update(builtin_table_retraction);

            let new_process_status = ClusterReplicaProcessStatus {
                status: event.status,
                time: event.time,
            };
            let builtin_table_addition = self.catalog().state().pack_cluster_replica_status_update(
                event.replica_id,
                event.process_id,
                &new_process_status,
                Diff::ONE,
            );
            let builtin_table_addition = self
                .catalog()
                .state()
                .resolve_builtin_table_update(builtin_table_addition);
            self.cluster_replica_statuses.ensure_cluster_status(
                event.cluster_id,
                event.replica_id,
                event.process_id,
                new_process_status,
            );

            let builtin_table_updates = vec![builtin_table_retraction, builtin_table_addition];
            let builtin_table_completion = self.builtin_table_update().defer(builtin_table_updates);

            let cluster = self.catalog().get_cluster(event.cluster_id);
            let replica = cluster.replica(event.replica_id).expect("Replica exists");
            let new_replica_status = self
                .cluster_replica_statuses
                .get_cluster_replica_status(event.cluster_id, event.replica_id);

            if old_replica_status != new_replica_status {
                let notifier = self.broadcast_notice_tx();
                let notice = AdapterNotice::ClusterReplicaStatusChanged {
                    cluster: cluster.name.clone(),
                    replica: replica.name.clone(),
                    status: new_replica_status,
                    time: event.time,
                };
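                // Defer sending the notice until the builtin table write has
                // completed, so that anyone who observes the notice can also
                // see the new status in the system tables.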
                mz_ore::task::spawn(
                    || format!("cluster_event-{}-{}", event.cluster_id, event.replica_id),
                    async move {
                        builtin_table_completion.await;
                        (notifier)(notice)
                    },
                );
            }
        }
    }

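    /// Linearizes pending read transactions against their timestamp oracles:
    /// a read whose chosen timestamp is not yet covered by the oracle's read
    /// timestamp is requeued until the oracle catches up.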
    #[mz_ore::instrument(level = "debug")]
    async fn message_linearize_reads(&mut self) {
        let mut shortest_wait = Duration::MAX;
        let mut ready_txns = Vec::new();

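        // Cache the read timestamp of each timeline's oracle, so that each
        // oracle is consulted at most once per pass.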
        let mut cached_oracle_ts = BTreeMap::new();

        for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
            if let TimestampContext::TimelineTimestamp {
                timeline,
                chosen_ts,
                oracle_ts,
            } = read_txn.timestamp_context()
            {
                let oracle_ts = match oracle_ts {
                    Some(oracle_ts) => oracle_ts,
                    None => {
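                        // No oracle timestamp means the read does not need to
                        // be linearized.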
                        ready_txns.push(read_txn);
                        continue;
                    }
                };

                if chosen_ts <= oracle_ts {
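                    // The chosen timestamp is not ahead of the oracle
                    // timestamp that was read when it was chosen, so the read
                    // is already linearized.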
                    ready_txns.push(read_txn);
                    continue;
                }

                let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
                let current_oracle_ts = match current_oracle_ts {
                    btree_map::Entry::Vacant(entry) => {
                        let timestamp_oracle = self.get_timestamp_oracle(timeline);
                        let read_ts = timestamp_oracle.read_ts().await;
                        entry.insert(read_ts.clone());
                        read_ts
                    }
                    btree_map::Entry::Occupied(entry) => entry.get().clone(),
                };

                if *chosen_ts <= current_oracle_ts {
                    ready_txns.push(read_txn);
                } else {
                    let wait =
                        Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
                    if wait < shortest_wait {
                        shortest_wait = wait;
                    }
                    read_txn.num_requeues += 1;
                    self.pending_linearize_read_txns.insert(conn_id, read_txn);
                }
            } else {
                ready_txns.push(read_txn);
            }
        }

        if !ready_txns.is_empty() {
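            // Attach the batch span to the tracing context of (arbitrarily)
            // the first ready transaction.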
            let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
            let span = tracing::debug_span!("message_linearize_reads");
            otel_ctx.attach_as_parent_to(&span);

            let now = Instant::now();
            for ready_txn in ready_txns {
                let span = tracing::debug_span!("retire_read_results");
                ready_txn.otel_ctx.attach_as_parent_to(&span);
                let _entered = span.enter();
                self.metrics
                    .linearize_message_seconds
                    .with_label_values(&[
                        ready_txn.txn.label(),
                        if ready_txn.num_requeues == 0 {
                            "true"
                        } else {
                            "false"
                        },
                    ])
                    .observe((now - ready_txn.created).as_secs_f64());
                if let Some((ctx, result)) = ready_txn.txn.finish() {
                    ctx.retire(result);
                }
            }
        }

        if !self.pending_linearize_read_txns.is_empty() {
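            // Wake up after the shortest wait we observed, capped at one
            // second.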
            let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "deferred_read_txns", async move {
                tokio::time::sleep(remaining_ms).await;
                let result = internal_cmd_tx.send(Message::LinearizeReads);
                if let Err(e) = result {
                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        }
    }
}