// mz_adapter/coord/message_handler.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for processing [`Coordinator`] messages. The [`Coordinator`] receives
11//! messages from various sources (ex: controller, clients, background tasks, etc).
12
13use std::collections::{BTreeMap, BTreeSet, btree_map};
14use std::time::{Duration, Instant};
15
16use futures::FutureExt;
17use maplit::btreemap;
18use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
19use mz_controller::ControllerResponse;
20use mz_controller::clusters::{ClusterEvent, ClusterStatus};
21use mz_ore::instrument;
22use mz_ore::now::EpochMillis;
23use mz_ore::option::OptionExt;
24use mz_ore::tracing::OpenTelemetryContext;
25use mz_ore::{soft_assert_or_log, task};
26use mz_persist_client::usage::ShardsUsageReferenced;
27use mz_repr::{Datum, Diff, Row};
28use mz_sql::ast::Statement;
29use mz_sql::pure::PurifiedStatement;
30use mz_storage_client::controller::IntrospectionType;
31use opentelemetry::trace::TraceContextExt;
32use rand::{Rng, SeedableRng, rngs};
33use serde_json::json;
34use tracing::{Instrument, Level, event, info_span, warn};
35use tracing_opentelemetry::OpenTelemetrySpanExt;
36
37use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
38use crate::catalog::{BuiltinTableUpdate, Op};
39use crate::command::Command;
40use crate::coord::{
41    AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
42    CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
43};
44use crate::telemetry::{EventDetails, SegmentClientExt};
45use crate::{AdapterNotice, TimestampContext};
46
47impl Coordinator {
48    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 74KB. This would
49    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
50    /// Because of that we purposefully move Futures of inner function calls onto the heap
51    /// (i.e. Box it).
52    #[instrument]
53    pub(crate) async fn handle_message(&mut self, msg: Message) -> () {
54        match msg {
55            Message::Command(otel_ctx, cmd) => {
56                // TODO: We need a Span that is not none for the otel_ctx to attach the parent
57                // relationship to. If we swap the otel_ctx in `Command::Message` for a Span, we
58                // can downgrade this to a debug_span.
59                let span = tracing::info_span!("message_command").or_current();
60                span.in_scope(|| otel_ctx.attach_as_parent());
61                self.message_command(cmd).instrument(span).await
62            }
63            Message::ControllerReady { controller: _ } => {
64                let Coordinator {
65                    controller,
66                    catalog,
67                    ..
68                } = self;
69                let storage_metadata = catalog.state().storage_metadata();
70                if let Some(m) = controller
71                    .process(storage_metadata)
72                    .expect("`process` never returns an error")
73                {
74                    self.message_controller(m).boxed_local().await
75                }
76            }
77            Message::PurifiedStatementReady(ready) => {
78                self.message_purified_statement_ready(ready)
79                    .boxed_local()
80                    .await
81            }
82            Message::CreateConnectionValidationReady(ready) => {
83                self.message_create_connection_validation_ready(ready)
84                    .boxed_local()
85                    .await
86            }
87            Message::AlterConnectionValidationReady(ready) => {
88                self.message_alter_connection_validation_ready(ready)
89                    .boxed_local()
90                    .await
91            }
92            Message::TryDeferred {
93                conn_id,
94                acquired_lock,
95            } => self.try_deferred(conn_id, acquired_lock).await,
96            Message::GroupCommitInitiate(span, permit) => {
97                // Add an OpenTelemetry link to our current span.
98                tracing::Span::current().add_link(span.context().span().span_context().clone());
99                self.try_group_commit(permit)
100                    .instrument(span)
101                    .boxed_local()
102                    .await
103            }
104            Message::AdvanceTimelines => {
105                self.advance_timelines().boxed_local().await;
106            }
107            Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
108            Message::CancelPendingPeeks { conn_id } => {
109                self.cancel_pending_peeks(&conn_id);
110            }
111            Message::LinearizeReads => {
112                self.message_linearize_reads().boxed_local().await;
113            }
114            Message::StagedBatches {
115                conn_id,
116                table_id,
117                batches,
118            } => {
119                self.commit_staged_batches(conn_id, table_id, batches);
120            }
121            Message::StorageUsageSchedule => {
122                self.schedule_storage_usage_collection().boxed_local().await;
123            }
124            Message::StorageUsageFetch => {
125                self.storage_usage_fetch().boxed_local().await;
126            }
127            Message::StorageUsageUpdate(sizes) => {
128                self.storage_usage_update(sizes).boxed_local().await;
129            }
130            Message::StorageUsagePrune(expired) => {
131                self.storage_usage_prune(expired).boxed_local().await;
132            }
133            Message::RetireExecute {
134                otel_ctx,
135                data,
136                reason,
137            } => {
138                otel_ctx.attach_as_parent();
139                self.retire_execution(reason, data);
140            }
141            Message::ExecuteSingleStatementTransaction {
142                ctx,
143                otel_ctx,
144                stmt,
145                params,
146            } => {
147                otel_ctx.attach_as_parent();
148                self.sequence_execute_single_statement_transaction(ctx, stmt, params)
149                    .boxed_local()
150                    .await;
151            }
152            Message::PeekStageReady { ctx, span, stage } => {
153                self.sequence_staged(ctx, span, stage).boxed_local().await;
154            }
155            Message::CreateIndexStageReady { ctx, span, stage } => {
156                self.sequence_staged(ctx, span, stage).boxed_local().await;
157            }
158            Message::CreateViewStageReady { ctx, span, stage } => {
159                self.sequence_staged(ctx, span, stage).boxed_local().await;
160            }
161            Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
162                self.sequence_staged(ctx, span, stage).boxed_local().await;
163            }
164            Message::SubscribeStageReady { ctx, span, stage } => {
165                self.sequence_staged(ctx, span, stage).boxed_local().await;
166            }
167            Message::IntrospectionSubscribeStageReady { span, stage } => {
168                self.sequence_staged((), span, stage).boxed_local().await;
169            }
170            Message::ExplainTimestampStageReady { ctx, span, stage } => {
171                self.sequence_staged(ctx, span, stage).boxed_local().await;
172            }
173            Message::SecretStageReady { ctx, span, stage } => {
174                self.sequence_staged(ctx, span, stage).boxed_local().await;
175            }
176            Message::ClusterStageReady { ctx, span, stage } => {
177                self.sequence_staged(ctx, span, stage).boxed_local().await;
178            }
179            Message::DrainStatementLog => {
180                self.drain_statement_log();
181            }
182            Message::PrivateLinkVpcEndpointEvents(events) => {
183                if !self.controller.read_only() {
184                    self.controller.storage.append_introspection_updates(
185                        IntrospectionType::PrivatelinkConnectionStatusHistory,
186                        events
187                            .into_iter()
188                            .map(|e| (mz_repr::Row::from(e), Diff::ONE))
189                            .collect(),
190                    );
191                }
192            }
193            Message::CheckSchedulingPolicies => {
194                self.check_scheduling_policies().boxed_local().await;
195            }
196            Message::SchedulingDecisions(decisions) => {
197                self.handle_scheduling_decisions(decisions)
198                    .boxed_local()
199                    .await;
200            }
201            Message::DeferredStatementReady => {
202                self.handle_deferred_statement().boxed_local().await;
203            }
204        }
205    }
206
207    #[mz_ore::instrument(level = "debug")]
208    pub async fn storage_usage_fetch(&self) {
209        let internal_cmd_tx = self.internal_cmd_tx.clone();
210        let client = self.storage_usage_client.clone();
211
212        // Record the currently live shards.
213        let live_shards: BTreeSet<_> = self
214            .controller
215            .storage
216            .active_collection_metadatas()
217            .into_iter()
218            .map(|(_id, m)| m.data_shard)
219            .collect();
220
221        let collection_metric = self.metrics.storage_usage_collection_time_seconds.clone();
222
223        // Spawn an asynchronous task to compute the storage usage, which
224        // requires a slow scan of the underlying storage engine.
225        task::spawn(|| "storage_usage_fetch", async move {
226            let collection_metric_timer = collection_metric.start_timer();
227            let shard_sizes = client.shards_usage_referenced(live_shards).await;
228            collection_metric_timer.observe_duration();
229
230            // It is not an error for shard sizes to become ready after
231            // `internal_cmd_rx` is dropped.
232            if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
233                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
234            }
235        });
236    }
237
238    #[mz_ore::instrument(level = "debug")]
239    async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
240        // Similar to audit events, use the oracle ts so this is guaranteed to
241        // increase. This is intentionally the timestamp of when collection
242        // finished, not when it started, so that we don't write data with a
243        // timestamp in the past.
244        let collection_timestamp = if self.controller.read_only() {
245            self.peek_local_write_ts().await.into()
246        } else {
247            // Getting a write timestamp bumps the write timestamp in the
248            // oracle, which we're not allowed in read-only mode.
249            self.get_local_write_ts().await.timestamp.into()
250        };
251
252        let ops = shards_usage
253            .by_shard
254            .into_iter()
255            .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
256                object_id: Some(shard_id.to_string()),
257                size_bytes: shard_usage.size_bytes(),
258                collection_timestamp,
259            })
260            .collect();
261
262        match self.catalog_transact_inner(None, ops).await {
263            Ok((table_updates, catalog_updates)) => {
264                assert!(
265                    catalog_updates.is_empty(),
266                    "applying builtin table updates does not produce catalog implications"
267                );
268
269                let internal_cmd_tx = self.internal_cmd_tx.clone();
270                let task_span =
271                    info_span!(parent: None, "coord::storage_usage_update::table_updates");
272                OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
273                task::spawn(|| "storage_usage_update_table_updates", async move {
274                    table_updates.instrument(task_span).await;
275                    // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
276                    if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
277                        warn!("internal_cmd_rx dropped before we could send: {e:?}");
278                    }
279                });
280            }
281            Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
282        }
283    }
284
285    #[mz_ore::instrument(level = "debug")]
286    async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
287        let (fut, _) = self.builtin_table_update().execute(expired).await;
288        task::spawn(|| "storage_usage_pruning_apply", async move {
289            fut.await;
290        });
291    }
292
293    pub async fn schedule_storage_usage_collection(&self) {
294        // Instead of using an `tokio::timer::Interval`, we calculate the time until the next
295        // usage collection and wait for that amount of time. This is so we can keep the intervals
296        // consistent even across restarts. If collection takes too long, it is possible that
297        // we miss an interval.
298
299        // 1) Deterministically pick some offset within the collection interval to prevent
300        // thundering herds across environments.
301        const SEED_LEN: usize = 32;
302        let mut seed = [0; SEED_LEN];
303        for (i, byte) in self
304            .catalog()
305            .state()
306            .config()
307            .environment_id
308            .organization_id()
309            .as_bytes()
310            .into_iter()
311            .take(SEED_LEN)
312            .enumerate()
313        {
314            seed[i] = *byte;
315        }
316        let storage_usage_collection_interval_ms: EpochMillis =
317            EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
318                .expect("storage usage collection interval must fit into u64");
319        let offset =
320            rngs::SmallRng::from_seed(seed).random_range(0..storage_usage_collection_interval_ms);
321        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();
322
323        // 2) Determine the amount of ms between now and the next collection time.
324        let previous_collection_ts =
325            (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
326        let next_collection_ts = if previous_collection_ts > now_ts {
327            previous_collection_ts
328        } else {
329            previous_collection_ts + storage_usage_collection_interval_ms
330        };
331        let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);
332
333        // 3) Sleep for that amount of time, then initiate another storage usage collection.
334        let internal_cmd_tx = self.internal_cmd_tx.clone();
335        task::spawn(|| "storage_usage_collection", async move {
336            tokio::time::sleep(next_collection_interval).await;
337            if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
338                // If sending fails, the main thread has shutdown.
339            }
340        });
341    }
342
    /// Forwards an external client command to the coordinator's generic
    /// command handler.
    #[mz_ore::instrument(level = "debug")]
    async fn message_command(&mut self, cmd: Command) {
        self.handle_command(cmd).await;
    }
347
    /// Handles a response from the controller: peek notifications, subscribe
    /// responses, `COPY TO` sink responses, and finished watch sets.
    #[mz_ore::instrument(level = "debug")]
    async fn message_controller(&mut self, message: ControllerResponse) {
        event!(Level::TRACE, message = format!("{:?}", message));
        match message {
            ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
                self.handle_peek_notification(uuid, response, otel_ctx);
            }
            ControllerResponse::SubscribeResponse(sink_id, response) => {
                // A subscribe is either user-initiated (tracked in
                // `active_compute_sinks`) or an internal introspection
                // subscribe (tracked in `introspection_subscribes`); the two
                // sets must be disjoint.
                if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
                    self.active_compute_sinks.get_mut(&sink_id)
                {
                    let finished = active_subscribe.process_response(response);
                    if finished {
                        self.retire_compute_sinks(btreemap! {
                            sink_id => ActiveComputeSinkRetireReason::Finished,
                        })
                        .await;
                    }

                    soft_assert_or_log!(
                        !self.introspection_subscribes.contains_key(&sink_id),
                        "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
                         and `introspection_subscribes`",
                    );
                } else if self.introspection_subscribes.contains_key(&sink_id) {
                    self.handle_introspection_subscribe_batch(sink_id, response)
                        .await;
                } else {
                    // Cancellation may cause us to receive responses for subscribes no longer
                    // tracked, so we quietly ignore them.
                }
            }
            ControllerResponse::CopyToResponse(sink_id, response) => {
                match self.drop_compute_sink(sink_id).await {
                    Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
                        active_copy_to.retire_with_response(response);
                    }
                    _ => {
                        // Cancellation may cause us to receive responses for subscribes no longer
                        // tracked, so we quietly ignore them.
                    }
                }
            }
            ControllerResponse::WatchSetFinished(ws_ids) => {
                let now = self.now();
                for ws_id in ws_ids {
                    // Watch sets may have been removed already (e.g. if the
                    // connection went away); skip those.
                    let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
                        continue;
                    };
                    // Drop the watch set from the per-connection bookkeeping,
                    // removing the connection's entry entirely once it is empty.
                    self.connection_watch_sets
                        .get_mut(&conn_id)
                        .expect("corrupted coordinator state: unknown connection id")
                        .remove(&ws_id);
                    if self.connection_watch_sets[&conn_id].is_empty() {
                        self.connection_watch_sets.remove(&conn_id);
                    }

                    match rsp {
                        WatchSetResponse::StatementDependenciesReady(id, ev) => {
                            self.record_statement_lifecycle_event(&id, &ev, now);
                        }
                        WatchSetResponse::AlterSinkReady(ctx) => {
                            self.sequence_alter_sink_finish(ctx).await;
                        }
                        WatchSetResponse::AlterMaterializedViewReady(ctx) => {
                            self.sequence_alter_materialized_view_apply_replacement_finish(ctx)
                                .await;
                        }
                    }
                }
            }
        }
    }
421
    /// Finishes execution of a statement whose purification (performed off
    /// the coordinator task) has completed.
    ///
    /// Re-validates the plan's dependencies, plans the purified statement,
    /// and hands the resulting plan to `sequence_plan`; on any error the
    /// client context is retired with that error.
    #[mz_ore::instrument(level = "debug")]
    async fn message_purified_statement_ready(
        &mut self,
        PurifiedStatementReady {
            ctx,
            result,
            params,
            mut plan_validity,
            original_stmt,
            otel_ctx,
        }: PurifiedStatementReady,
    ) {
        otel_ctx.attach_as_parent();

        // Ensure that all dependencies still exist after purification, as a
        // `DROP CONNECTION` or other `DROP` may have sneaked in. If any have gone missing, we
        // repurify the original statement. This will either produce a nice
        // "unknown connector" error, or pick up a new connector that has
        // replaced the dropped connector.
        //
        // n.b. an `ALTER CONNECTION` occurring during purification is OK
        // because we always look up/populate a connection's state after
        // committing to the catalog, so are guaranteed to see the connection's
        // most recent version.
        if plan_validity.check(self.catalog()).is_err() {
            self.handle_execute_inner(original_stmt, params, ctx).await;
            return;
        }

        // Purification itself may have failed; report that to the client.
        let purified_statement = match result {
            Ok(ok) => ok,
            Err(e) => return ctx.retire(Err(e)),
        };

        let plan = match purified_statement {
            PurifiedStatement::PurifiedCreateSource {
                create_progress_subsource_stmt,
                create_source_stmt,
                subsources,
                available_source_references,
            } => {
                self.plan_purified_create_source(
                    &ctx,
                    params,
                    create_progress_subsource_stmt,
                    create_source_stmt,
                    subsources,
                    available_source_references,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceAddSubsources {
                source_name,
                options,
                subsources,
            } => {
                self.plan_purified_alter_source_add_subsource(
                    ctx.session(),
                    params,
                    source_name,
                    options,
                    subsources,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceRefreshReferences {
                source_name,
                available_source_references,
            } => self.plan_purified_alter_source_refresh_references(
                ctx.session(),
                params,
                source_name,
                available_source_references,
            ),
            o @ (PurifiedStatement::PurifiedAlterSource { .. }
            | PurifiedStatement::PurifiedCreateSink(..)
            | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
                // Unify these into a `Statement` so they can go through the
                // generic `plan_statement` path below.
                let stmt = match o {
                    PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
                        Statement::AlterSource(alter_source_stmt)
                    }
                    PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
                        Statement::CreateTableFromSource(stmt)
                    }
                    PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
                    PurifiedStatement::PurifiedCreateSource { .. }
                    | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
                    | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
                        unreachable!("not part of exterior match stmt")
                    }
                };

                // Determine all dependencies, not just those in the statement
                // itself.
                let catalog = self.catalog().for_session(ctx.session());
                let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
                self.plan_statement(ctx.session(), stmt, &params, &resolved_ids)
                    .map(|plan| (plan, resolved_ids))
            }
        };

        match plan {
            Ok((plan, resolved_ids)) => self.sequence_plan(ctx, plan, resolved_ids).await,
            Err(e) => ctx.retire(Err(e)),
        }
    }
529
530    #[mz_ore::instrument(level = "debug")]
531    async fn message_create_connection_validation_ready(
532        &mut self,
533        CreateConnectionValidationReady {
534            mut ctx,
535            result,
536            connection_id,
537            connection_gid,
538            mut plan_validity,
539            otel_ctx,
540            resolved_ids,
541        }: CreateConnectionValidationReady,
542    ) {
543        otel_ctx.attach_as_parent();
544
545        // Ensure that all dependencies still exist after validation, as a
546        // `DROP SECRET` may have sneaked in.
547        //
548        // WARNING: If we support `ALTER SECRET`, we'll need to also check
549        // for connectors that were altered while we were purifying.
550        if let Err(e) = plan_validity.check(self.catalog()) {
551            if self.secrets_controller.delete(connection_id).await.is_ok() {
552                self.caching_secrets_reader.invalidate(connection_id);
553            }
554            return ctx.retire(Err(e));
555        }
556
557        let plan = match result {
558            Ok(ok) => ok,
559            Err(e) => {
560                if self.secrets_controller.delete(connection_id).await.is_ok() {
561                    self.caching_secrets_reader.invalidate(connection_id);
562                }
563                return ctx.retire(Err(e));
564            }
565        };
566
567        let result = self
568            .sequence_create_connection_stage_finish(
569                &mut ctx,
570                connection_id,
571                connection_gid,
572                plan,
573                resolved_ids,
574            )
575            .await;
576        ctx.retire(result);
577    }
578
579    #[mz_ore::instrument(level = "debug")]
580    async fn message_alter_connection_validation_ready(
581        &mut self,
582        AlterConnectionValidationReady {
583            mut ctx,
584            result,
585            connection_id,
586            connection_gid: _,
587            mut plan_validity,
588            otel_ctx,
589            resolved_ids: _,
590        }: AlterConnectionValidationReady,
591    ) {
592        otel_ctx.attach_as_parent();
593
594        // Ensure that all dependencies still exist after validation, as a
595        // `DROP SECRET` may have sneaked in.
596        //
597        // WARNING: If we support `ALTER SECRET`, we'll need to also check
598        // for connectors that were altered while we were purifying.
599        if let Err(e) = plan_validity.check(self.catalog()) {
600            return ctx.retire(Err(e));
601        }
602
603        let conn = match result {
604            Ok(ok) => ok,
605            Err(e) => {
606                return ctx.retire(Err(e));
607            }
608        };
609
610        let result = self
611            .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
612            .await;
613        ctx.retire(result);
614    }
615
616    #[mz_ore::instrument(level = "debug")]
617    async fn message_cluster_event(&mut self, event: ClusterEvent) {
618        event!(Level::TRACE, event = format!("{:?}", event));
619
620        if let Some(segment_client) = &self.segment_client {
621            let env_id = &self.catalog().config().environment_id;
622            let mut properties = json!({
623                "cluster_id": event.cluster_id.to_string(),
624                "replica_id": event.replica_id.to_string(),
625                "process_id": event.process_id,
626                "status": event.status.as_kebab_case_str(),
627            });
628            match event.status {
629                ClusterStatus::Online => (),
630                ClusterStatus::Offline(reason) => {
631                    let properties = match &mut properties {
632                        serde_json::Value::Object(map) => map,
633                        _ => unreachable!(),
634                    };
635                    properties.insert(
636                        "reason".into(),
637                        json!(reason.display_or("unknown").to_string()),
638                    );
639                }
640            };
641            segment_client.environment_track(
642                env_id,
643                "Cluster Changed Status",
644                properties,
645                EventDetails {
646                    timestamp: Some(event.time),
647                    ..Default::default()
648                },
649            );
650        }
651
652        // It is possible that we receive a status update for a replica that has
653        // already been dropped from the catalog. Just ignore these events.
654        let Some(replica_statues) = self
655            .cluster_replica_statuses
656            .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
657        else {
658            return;
659        };
660
661        if event.status != replica_statues[&event.process_id].status {
662            if !self.controller.read_only() {
663                let offline_reason = match event.status {
664                    ClusterStatus::Online => None,
665                    ClusterStatus::Offline(None) => None,
666                    ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
667                };
668                let row = Row::pack_slice(&[
669                    Datum::String(&event.replica_id.to_string()),
670                    Datum::UInt64(event.process_id),
671                    Datum::String(event.status.as_kebab_case_str()),
672                    Datum::from(offline_reason.as_deref()),
673                    Datum::TimestampTz(event.time.try_into().expect("must fit")),
674                ]);
675                self.controller.storage.append_introspection_updates(
676                    IntrospectionType::ReplicaStatusHistory,
677                    vec![(row, Diff::ONE)],
678                );
679            }
680
681            let old_replica_status =
682                ClusterReplicaStatuses::cluster_replica_status(replica_statues);
683
684            let new_process_status = ClusterReplicaProcessStatus {
685                status: event.status,
686                time: event.time,
687            };
688            self.cluster_replica_statuses.ensure_cluster_status(
689                event.cluster_id,
690                event.replica_id,
691                event.process_id,
692                new_process_status,
693            );
694
695            let cluster = self.catalog().get_cluster(event.cluster_id);
696            let replica = cluster.replica(event.replica_id).expect("Replica exists");
697            let new_replica_status = self
698                .cluster_replica_statuses
699                .get_cluster_replica_status(event.cluster_id, event.replica_id);
700
701            if old_replica_status != new_replica_status {
702                let notifier = self.broadcast_notice_tx();
703                let notice = AdapterNotice::ClusterReplicaStatusChanged {
704                    cluster: cluster.name.clone(),
705                    replica: replica.name.clone(),
706                    status: new_replica_status,
707                    time: event.time,
708                };
709                notifier(notice);
710            }
711        }
712    }
713
714    #[mz_ore::instrument(level = "debug")]
715    /// Linearizes sending the results of a read transaction by,
716    ///   1. Holding back any results that were executed at some point in the future, until the
717    ///   containing timeline has advanced to that point in the future.
718    ///   2. Confirming that we are still the current leader before sending results to the client.
719    async fn message_linearize_reads(&mut self) {
720        let mut shortest_wait = Duration::MAX;
721        let mut ready_txns = Vec::new();
722
723        // Cache for `TimestampOracle::read_ts` calls. These are somewhat
724        // expensive so we cache the value. This is correct since all we're
725        // risking is being too conservative. We will not accidentally "release"
726        // a result too early.
727        let mut cached_oracle_ts = BTreeMap::new();
728
729        for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
730            if let TimestampContext::TimelineTimestamp {
731                timeline,
732                chosen_ts,
733                oracle_ts,
734            } = read_txn.timestamp_context()
735            {
736                let oracle_ts = match oracle_ts {
737                    Some(oracle_ts) => oracle_ts,
738                    None => {
739                        // There was no oracle timestamp, so no need to delay.
740                        ready_txns.push(read_txn);
741                        continue;
742                    }
743                };
744
745                if chosen_ts <= oracle_ts {
746                    // Chosen ts was already <= the oracle ts, so we're good
747                    // to go!
748                    ready_txns.push(read_txn);
749                    continue;
750                }
751
752                // See what the oracle timestamp is now and delay when needed.
753                let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
754                let current_oracle_ts = match current_oracle_ts {
755                    btree_map::Entry::Vacant(entry) => {
756                        let timestamp_oracle = self.get_timestamp_oracle(timeline);
757                        let read_ts = timestamp_oracle.read_ts().await;
758                        entry.insert(read_ts.clone());
759                        read_ts
760                    }
761                    btree_map::Entry::Occupied(entry) => entry.get().clone(),
762                };
763
764                if *chosen_ts <= current_oracle_ts {
765                    ready_txns.push(read_txn);
766                } else {
767                    let wait =
768                        Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
769                    if wait < shortest_wait {
770                        shortest_wait = wait;
771                    }
772                    read_txn.num_requeues += 1;
773                    self.pending_linearize_read_txns.insert(conn_id, read_txn);
774                }
775            } else {
776                ready_txns.push(read_txn);
777            }
778        }
779
780        if !ready_txns.is_empty() {
781            // Sniff out one ctx, this is where tracing breaks down because we
782            // process all outstanding txns as a batch here.
783            let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
784            let span = tracing::debug_span!("message_linearize_reads");
785            otel_ctx.attach_as_parent_to(&span);
786
787            let now = Instant::now();
788            for ready_txn in ready_txns {
789                let span = tracing::debug_span!("retire_read_results");
790                ready_txn.otel_ctx.attach_as_parent_to(&span);
791                let _entered = span.enter();
792                self.metrics
793                    .linearize_message_seconds
794                    .with_label_values(&[
795                        ready_txn.txn.label(),
796                        if ready_txn.num_requeues == 0 {
797                            "true"
798                        } else {
799                            "false"
800                        },
801                    ])
802                    .observe((now - ready_txn.created).as_secs_f64());
803                if let Some((ctx, result)) = ready_txn.txn.finish() {
804                    ctx.retire(result);
805                }
806            }
807        }
808
809        if !self.pending_linearize_read_txns.is_empty() {
810            // Cap wait time to 1s.
811            let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
812            let internal_cmd_tx = self.internal_cmd_tx.clone();
813            task::spawn(|| "deferred_read_txns", async move {
814                tokio::time::sleep(remaining_ms).await;
815                // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
816                let result = internal_cmd_tx.send(Message::LinearizeReads);
817                if let Err(e) = result {
818                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
819                }
820            });
821        }
822    }
823}