mz_adapter/coord/message_handler.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic for processing [`Coordinator`] messages. The [`Coordinator`] receives
//! messages from various sources (e.g., the controller, clients, background tasks).
12
use std::collections::{BTreeMap, BTreeSet, btree_map};
use std::time::{Duration, Instant};

use futures::FutureExt;
use maplit::btreemap;
use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
use mz_controller::ControllerResponse;
use mz_controller::clusters::{ClusterEvent, ClusterStatus};
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::option::OptionExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, task};
use mz_persist_client::usage::ShardsUsageReferenced;
use mz_repr::{Datum, Diff, Row};
use mz_sql::ast::Statement;
use mz_sql::pure::PurifiedStatement;
use mz_storage_client::controller::IntrospectionType;
use mz_storage_types::controller::CollectionMetadata;
use opentelemetry::trace::TraceContextExt;
use rand::{Rng, SeedableRng, rngs};
use serde_json::json;
use tracing::{Instrument, Level, event, info_span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{BuiltinTableUpdate, Op};
use crate::command::Command;
use crate::coord::{
    AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
    CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
};
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::{AdapterNotice, TimestampContext};

impl Coordinator {
    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 74KB. Storing
    /// that on the stack is bad for runtime performance and blows up our stack usage. Because
    /// of that we purposefully move the Futures of inner function calls onto the heap
    /// (i.e. Box them).
    #[instrument]
    pub(crate) async fn handle_message(&mut self, msg: Message) -> () {
        match msg {
            Message::Command(otel_ctx, cmd) => {
                // TODO: We need a Span that is not none for the otel_ctx to attach the parent
                // relationship to. If we swap the otel_ctx in `Command::Message` for a Span, we
                // can downgrade this to a debug_span.
                let span = tracing::info_span!("message_command").or_current();
                span.in_scope(|| otel_ctx.attach_as_parent());
                self.message_command(cmd).instrument(span).await
            }
            Message::ControllerReady { controller: _ } => {
                let Coordinator {
                    controller,
                    catalog,
                    ..
                } = self;
                let storage_metadata = catalog.state().storage_metadata();
                if let Some(m) = controller
                    .process(storage_metadata)
                    .expect("`process` never returns an error")
                {
                    self.message_controller(m).boxed_local().await
                }
            }
            Message::PurifiedStatementReady(ready) => {
                self.message_purified_statement_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::CreateConnectionValidationReady(ready) => {
                self.message_create_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::AlterConnectionValidationReady(ready) => {
                self.message_alter_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::TryDeferred {
                conn_id,
                acquired_lock,
            } => self.try_deferred(conn_id, acquired_lock).await,
            Message::GroupCommitInitiate(span, permit) => {
                // Add an OpenTelemetry link to our current span.
                tracing::Span::current().add_link(span.context().span().span_context().clone());
                self.try_group_commit(permit)
                    .instrument(span)
                    .boxed_local()
                    .await
            }
            Message::AdvanceTimelines => {
                self.advance_timelines().boxed_local().await;
            }
            Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
            Message::CancelPendingPeeks { conn_id } => {
                self.cancel_pending_peeks(&conn_id);
            }
            Message::LinearizeReads => {
                self.message_linearize_reads().boxed_local().await;
            }
            Message::StagedBatches {
                conn_id,
                table_id,
                batches,
            } => {
                self.commit_staged_batches(conn_id, table_id, batches);
            }
            Message::StorageUsageSchedule => {
                self.schedule_storage_usage_collection().boxed_local().await;
            }
            Message::StorageUsageFetch => {
                self.storage_usage_fetch().boxed_local().await;
            }
            Message::StorageUsageUpdate(sizes) => {
                self.storage_usage_update(sizes).boxed_local().await;
            }
            Message::StorageUsagePrune(expired) => {
                self.storage_usage_prune(expired).boxed_local().await;
            }
            Message::RetireExecute {
                otel_ctx,
                data,
                reason,
            } => {
                otel_ctx.attach_as_parent();
                self.retire_execution(reason, data);
            }
            Message::ExecuteSingleStatementTransaction {
                ctx,
                otel_ctx,
                stmt,
                params,
            } => {
                otel_ctx.attach_as_parent();
                self.sequence_execute_single_statement_transaction(ctx, stmt, params)
                    .boxed_local()
                    .await;
            }
            Message::PeekStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateIndexStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SubscribeStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::IntrospectionSubscribeStageReady { span, stage } => {
                self.sequence_staged((), span, stage).boxed_local().await;
            }
            Message::ExplainTimestampStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SecretStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::ClusterStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::DrainStatementLog => {
                self.drain_statement_log();
            }
            Message::PrivateLinkVpcEndpointEvents(events) => {
                if !self.controller.read_only() {
                    self.controller
                        .storage
                        .append_introspection_updates(
                            mz_storage_client::controller::IntrospectionType::PrivatelinkConnectionStatusHistory,
                            events
                                .into_iter()
                                .map(|e| (mz_repr::Row::from(e), Diff::ONE))
                                .collect(),
                        );
                }
            }
            Message::CheckSchedulingPolicies => {
                self.check_scheduling_policies().boxed_local().await;
            }
            Message::SchedulingDecisions(decisions) => {
                self.handle_scheduling_decisions(decisions)
                    .boxed_local()
                    .await;
            }
            Message::DeferredStatementReady => {
                self.handle_deferred_statement().boxed_local().await;
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
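    /// Kicks off a storage usage collection: snapshots the set of live shards and spawns a
    /// task that asks persist for their referenced sizes, sending the result back to the
    /// coordinator as a [`Message::StorageUsageUpdate`].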
    pub async fn storage_usage_fetch(&mut self) {
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let client = self.storage_usage_client.clone();

        // Record the currently live shards.
        let live_shards: BTreeSet<_> = self
            .controller
            .storage
            .active_collection_metadatas()
            .into_iter()
            .flat_map(|(_id, collection_metadata)| {
                let CollectionMetadata {
                    data_shard,
                    remap_shard,
                    // No wildcards, to improve the odds that the addition of a
                    // new shard type results in a compiler error here.
                    //
                    // ATTENTION: If you add a new type of shard that is
                    // associated with a collection, almost surely you should
                    // return it below, so that its usage is recorded in the
                    // `mz_storage_usage_by_shard` table.
                    persist_location: _,
                    relation_desc: _,
                    txns_shard: _,
                } = collection_metadata;
                [remap_shard, Some(data_shard)].into_iter()
            })
            .filter_map(|shard| shard)
            .collect();

        let collection_metric = self
            .metrics
            .storage_usage_collection_time_seconds
            .with_label_values(&[]);

        // Spawn an asynchronous task to compute the storage usage, which
        // requires a slow scan of the underlying storage engine.
        task::spawn(|| "storage_usage_fetch", async move {
            let collection_metric_timer = collection_metric.start_timer();
            let shard_sizes = client.shards_usage_referenced(live_shards).await;
            collection_metric_timer.observe_duration();

            // It is not an error for shard sizes to become ready after
            // `internal_cmd_rx` is dropped.
            if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        });
    }

    #[mz_ore::instrument(level = "debug")]
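    /// Records the collected per-shard sizes via a catalog transaction, then schedules the
    /// next storage usage collection once the resulting builtin table updates have been applied.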
    async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
        // Similar to audit events, use the oracle ts so this is guaranteed to
        // increase. This is intentionally the timestamp of when collection
        // finished, not when it started, so that we don't write data with a
        // timestamp in the past.
        let collection_timestamp = if self.controller.read_only() {
            self.peek_local_write_ts().await.into()
        } else {
            // Getting a write timestamp bumps the write timestamp in the
            // oracle, which we're not allowed to do in read-only mode.
            self.get_local_write_ts().await.timestamp.into()
        };

        let ops = shards_usage
            .by_shard
            .into_iter()
            .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
                object_id: Some(shard_id.to_string()),
                size_bytes: shard_usage.size_bytes(),
                collection_timestamp,
            })
            .collect();

        match self.catalog_transact_inner(None, ops).await {
            Ok(table_updates) => {
                let internal_cmd_tx = self.internal_cmd_tx.clone();
                let task_span =
                    info_span!(parent: None, "coord::storage_usage_update::table_updates");
                OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
                task::spawn(|| "storage_usage_update_table_updates", async move {
                    table_updates.instrument(task_span).await;
                    // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
                    if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
                        warn!("internal_cmd_rx dropped before we could send: {e:?}");
                    }
                });
            }
            Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
        }
    }

    #[mz_ore::instrument(level = "debug")]
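    /// Applies the given builtin table updates, which retract expired storage usage events,
    /// in a background task.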
    async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
        let (fut, _) = self.builtin_table_update().execute(expired).await;
        task::spawn(|| "storage_usage_pruning_apply", async move {
            fut.await;
        });
    }

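    /// Schedules the next storage usage collection by sleeping until the next collection slot
    /// and then sending [`Message::StorageUsageFetch`] back to the coordinator.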
    pub async fn schedule_storage_usage_collection(&self) {
        // Instead of using a `tokio::time::Interval`, we calculate the time until the next
        // usage collection and wait for that amount of time. This is so we can keep the intervals
        // consistent even across restarts. If collection takes too long, it is possible that
        // we miss an interval.

        // 1) Deterministically pick some offset within the collection interval to prevent
        // thundering herds across environments.
        const SEED_LEN: usize = 32;
        let mut seed = [0; SEED_LEN];
        for (i, byte) in self
            .catalog()
            .state()
            .config()
            .environment_id
            .organization_id()
            .as_bytes()
            .into_iter()
            .take(SEED_LEN)
            .enumerate()
        {
            seed[i] = *byte;
        }
        let storage_usage_collection_interval_ms: EpochMillis =
            EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
                .expect("storage usage collection interval must fit into u64");
        let offset =
            rngs::SmallRng::from_seed(seed).gen_range(0..storage_usage_collection_interval_ms);
        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();

        // 2) Determine the amount of ms between now and the next collection time.
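        // For example (illustrative numbers): with a 3_600_000 ms interval, an offset of
        // 1_234 ms, and now_ts = 10_000_000 ms, the current interval's slot is
        // (10_000_000 - 2_800_000) + 1_234 = 7_201_234, which is already in the past, so the
        // next collection fires at 7_201_234 + 3_600_000 = 10_801_234, i.e. 801_234 ms from now.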
        let previous_collection_ts =
            (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
        let next_collection_ts = if previous_collection_ts > now_ts {
            previous_collection_ts
        } else {
            previous_collection_ts + storage_usage_collection_interval_ms
        };
        let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);

        // 3) Sleep for that amount of time, then initiate another storage usage collection.
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        task::spawn(|| "storage_usage_collection", async move {
            tokio::time::sleep(next_collection_interval).await;
            if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
                // If sending fails, the main thread has shut down.
            }
        });
    }

    #[mz_ore::instrument(level = "debug")]
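    /// Handles a client [`Command`] by dispatching it to the general command handler.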
    async fn message_command(&mut self, cmd: Command) {
        self.handle_command(cmd).await;
    }

    #[mz_ore::instrument(level = "debug")]
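    /// Handles a [`ControllerResponse`]: peek notifications, subscribe and `COPY TO` sink
    /// responses, and finished watch sets.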
    async fn message_controller(&mut self, message: ControllerResponse) {
        event!(Level::TRACE, message = format!("{:?}", message));
        match message {
            ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
                self.handle_peek_notification(uuid, response, otel_ctx);
            }
            ControllerResponse::SubscribeResponse(sink_id, response) => {
                if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
                    self.active_compute_sinks.get_mut(&sink_id)
                {
                    let finished = active_subscribe.process_response(response);
                    if finished {
                        self.retire_compute_sinks(btreemap! {
                            sink_id => ActiveComputeSinkRetireReason::Finished,
                        })
                        .await;
                    }

                    soft_assert_or_log!(
                        !self.introspection_subscribes.contains_key(&sink_id),
                        "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
                         and `introspection_subscribes`",
                    );
                } else if self.introspection_subscribes.contains_key(&sink_id) {
                    self.handle_introspection_subscribe_batch(sink_id, response)
                        .await;
                } else {
                    // Cancellation may cause us to receive responses for subscribes no longer
                    // tracked, so we quietly ignore them.
                }
            }
            ControllerResponse::CopyToResponse(sink_id, response) => {
                match self.drop_compute_sink(sink_id).await {
                    Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
                        active_copy_to.retire_with_response(response);
                    }
                    _ => {
                        // Cancellation may cause us to receive responses for `COPY TO` sinks no
                        // longer tracked, so we quietly ignore them.
                    }
                }
            }
            ControllerResponse::WatchSetFinished(ws_ids) => {
                let now = self.now();
                for ws_id in ws_ids {
                    let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
                        continue;
                    };
                    self.connection_watch_sets
                        .get_mut(&conn_id)
                        .expect("corrupted coordinator state: unknown connection id")
                        .remove(&ws_id);
                    if self.connection_watch_sets[&conn_id].is_empty() {
                        self.connection_watch_sets.remove(&conn_id);
                    }

                    match rsp {
                        WatchSetResponse::StatementDependenciesReady(id, ev) => {
                            self.record_statement_lifecycle_event(&id, &ev, now);
                        }
                        WatchSetResponse::AlterSinkReady(ctx) => {
                            self.sequence_alter_sink_finish(ctx).await;
                        }
                    }
                }
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
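    /// Finishes sequencing a statement whose purification completed in a background task,
    /// re-purifying it from scratch if any of its dependencies were dropped in the meantime.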
    async fn message_purified_statement_ready(
        &mut self,
        PurifiedStatementReady {
            ctx,
            result,
            params,
            mut plan_validity,
            original_stmt,
            otel_ctx,
        }: PurifiedStatementReady,
    ) {
        otel_ctx.attach_as_parent();

        // Ensure that all dependencies still exist after purification, as a
        // `DROP CONNECTION` or other `DROP` may have sneaked in. If any have gone missing, we
        // repurify the original statement. This will either produce a nice
        // "unknown connector" error, or pick up a new connector that has
        // replaced the dropped connector.
        //
        // n.b. an `ALTER CONNECTION` occurring during purification is OK
        // because we always look up/populate a connection's state after
        // committing to the catalog, so we are guaranteed to see the
        // connection's most recent version.
        if plan_validity.check(self.catalog()).is_err() {
            self.handle_execute_inner(original_stmt, params, ctx).await;
            return;
        }

        let purified_statement = match result {
            Ok(ok) => ok,
            Err(e) => return ctx.retire(Err(e)),
        };

        let plan = match purified_statement {
            PurifiedStatement::PurifiedCreateSource {
                create_progress_subsource_stmt,
                create_source_stmt,
                subsources,
                available_source_references,
            } => {
                self.plan_purified_create_source(
                    &ctx,
                    params,
                    create_progress_subsource_stmt,
                    create_source_stmt,
                    subsources,
                    available_source_references,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceAddSubsources {
                source_name,
                options,
                subsources,
            } => {
                self.plan_purified_alter_source_add_subsource(
                    ctx.session(),
                    params,
                    source_name,
                    options,
                    subsources,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceRefreshReferences {
                source_name,
                available_source_references,
            } => self.plan_purified_alter_source_refresh_references(
                ctx.session(),
                params,
                source_name,
                available_source_references,
            ),
            o @ (PurifiedStatement::PurifiedAlterSource { .. }
            | PurifiedStatement::PurifiedCreateSink(..)
            | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
                // Unify these into a `Statement`.
                let stmt = match o {
                    PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
                        Statement::AlterSource(alter_source_stmt)
                    }
                    PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
                        Statement::CreateTableFromSource(stmt)
                    }
                    PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
                    PurifiedStatement::PurifiedCreateSource { .. }
                    | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
                    | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
                        unreachable!("not part of exterior match stmt")
                    }
                };

                // Determine all dependencies, not just those in the statement
                // itself.
                let catalog = self.catalog().for_session(ctx.session());
                let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
                self.plan_statement(ctx.session(), stmt, &params, &resolved_ids)
                    .map(|plan| (plan, resolved_ids))
            }
        };

        match plan {
            Ok((plan, resolved_ids)) => self.sequence_plan(ctx, plan, resolved_ids).await,
            Err(e) => ctx.retire(Err(e)),
        }
    }

    #[mz_ore::instrument(level = "debug")]
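    /// Finishes sequencing a `CREATE CONNECTION` whose validation completed in a background
    /// task. If validation or the validity check failed, the secret stored under the
    /// connection's ID is deleted.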
    async fn message_create_connection_validation_ready(
        &mut self,
        CreateConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid,
            mut plan_validity,
            otel_ctx,
            resolved_ids,
        }: CreateConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        // Ensure that all dependencies still exist after validation, as a
        // `DROP SECRET` may have sneaked in.
        //
        // WARNING: If we support `ALTER SECRET`, we'll need to also check
        // for connectors that were altered while we were purifying.
        if let Err(e) = plan_validity.check(self.catalog()) {
            let _ = self.secrets_controller.delete(connection_id).await;
            return ctx.retire(Err(e));
        }

        let plan = match result {
            Ok(ok) => ok,
            Err(e) => {
                let _ = self.secrets_controller.delete(connection_id).await;
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_create_connection_stage_finish(
                &mut ctx,
                connection_id,
                connection_gid,
                plan,
                resolved_ids,
            )
            .await;
        ctx.retire(result);
    }

    #[mz_ore::instrument(level = "debug")]
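    /// Finishes sequencing an `ALTER CONNECTION` whose validation completed in a background
    /// task.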
    async fn message_alter_connection_validation_ready(
        &mut self,
        AlterConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid: _,
            mut plan_validity,
            otel_ctx,
            resolved_ids: _,
        }: AlterConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        // Ensure that all dependencies still exist after validation, as a
        // `DROP SECRET` may have sneaked in.
        //
        // WARNING: If we support `ALTER SECRET`, we'll need to also check
        // for connectors that were altered while we were purifying.
        if let Err(e) = plan_validity.check(self.catalog()) {
            return ctx.retire(Err(e));
        }

        let conn = match result {
            Ok(ok) => ok,
            Err(e) => {
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
            .await;
        ctx.retire(result);
    }

    #[mz_ore::instrument(level = "debug")]
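    /// Handles a cluster replica status change reported by the controller: reports it to
    /// Segment (if configured), records it in the replica status history (unless read-only),
    /// updates the in-memory replica statuses, and notifies sessions if the replica's overall
    /// status changed.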
    async fn message_cluster_event(&mut self, event: ClusterEvent) {
        event!(Level::TRACE, event = format!("{:?}", event));

        if let Some(segment_client) = &self.segment_client {
            let env_id = &self.catalog().config().environment_id;
            let mut properties = json!({
                "cluster_id": event.cluster_id.to_string(),
                "replica_id": event.replica_id.to_string(),
                "process_id": event.process_id,
                "status": event.status.as_kebab_case_str(),
            });
            match event.status {
                ClusterStatus::Online => (),
                ClusterStatus::Offline(reason) => {
                    let properties = match &mut properties {
                        serde_json::Value::Object(map) => map,
                        _ => unreachable!(),
                    };
                    properties.insert(
                        "reason".into(),
                        json!(reason.display_or("unknown").to_string()),
                    );
                }
            };
            segment_client.environment_track(
                env_id,
                "Cluster Changed Status",
                properties,
                EventDetails {
                    timestamp: Some(event.time),
                    ..Default::default()
                },
            );
        }

        // It is possible that we receive a status update for a replica that has
        // already been dropped from the catalog. Just ignore these events.
        let Some(replica_statuses) = self
            .cluster_replica_statuses
            .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
        else {
            return;
        };

        if event.status != replica_statuses[&event.process_id].status {
            if !self.controller.read_only() {
                let offline_reason = match event.status {
                    ClusterStatus::Online => None,
                    ClusterStatus::Offline(None) => None,
                    ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
                };
                let row = Row::pack_slice(&[
                    Datum::String(&event.replica_id.to_string()),
                    Datum::UInt64(event.process_id),
                    Datum::String(event.status.as_kebab_case_str()),
                    Datum::from(offline_reason.as_deref()),
                    Datum::TimestampTz(event.time.try_into().expect("must fit")),
                ]);
                self.controller.storage.append_introspection_updates(
                    IntrospectionType::ReplicaStatusHistory,
                    vec![(row, Diff::ONE)],
                );
            }

            let old_replica_status =
                ClusterReplicaStatuses::cluster_replica_status(replica_statuses);

            let new_process_status = ClusterReplicaProcessStatus {
                status: event.status,
                time: event.time,
            };
            self.cluster_replica_statuses.ensure_cluster_status(
                event.cluster_id,
                event.replica_id,
                event.process_id,
                new_process_status,
            );

            let cluster = self.catalog().get_cluster(event.cluster_id);
            let replica = cluster.replica(event.replica_id).expect("Replica exists");
            let new_replica_status = self
                .cluster_replica_statuses
                .get_cluster_replica_status(event.cluster_id, event.replica_id);

            if old_replica_status != new_replica_status {
                let notifier = self.broadcast_notice_tx();
                let notice = AdapterNotice::ClusterReplicaStatusChanged {
                    cluster: cluster.name.clone(),
                    replica: replica.name.clone(),
                    status: new_replica_status,
                    time: event.time,
                };
                notifier(notice);
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    /// Linearizes sending the results of a read transaction by:
    ///   1. Holding back any results that were executed at some point in the future, until the
    ///      containing timeline has advanced to that point in the future.
    ///   2. Confirming that we are still the current leader before sending results to the client.
    async fn message_linearize_reads(&mut self) {
        let mut shortest_wait = Duration::MAX;
        let mut ready_txns = Vec::new();

        // Cache for `TimestampOracle::read_ts` calls. These are somewhat
        // expensive so we cache the value. This is correct since all we're
        // risking is being too conservative. We will not accidentally "release"
        // a result too early.
        let mut cached_oracle_ts = BTreeMap::new();

        for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
            if let TimestampContext::TimelineTimestamp {
                timeline,
                chosen_ts,
                oracle_ts,
            } = read_txn.timestamp_context()
            {
                let oracle_ts = match oracle_ts {
                    Some(oracle_ts) => oracle_ts,
                    None => {
                        // There was no oracle timestamp, so no need to delay.
                        ready_txns.push(read_txn);
                        continue;
                    }
                };

                if chosen_ts <= oracle_ts {
                    // Chosen ts was already <= the oracle ts, so we're good
                    // to go!
                    ready_txns.push(read_txn);
                    continue;
                }

                // See what the oracle timestamp is now and delay when needed.
                let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
                let current_oracle_ts = match current_oracle_ts {
                    btree_map::Entry::Vacant(entry) => {
                        let timestamp_oracle = self.get_timestamp_oracle(timeline);
                        let read_ts = timestamp_oracle.read_ts().await;
                        entry.insert(read_ts.clone());
                        read_ts
                    }
                    btree_map::Entry::Occupied(entry) => entry.get().clone(),
                };

                if *chosen_ts <= current_oracle_ts {
                    ready_txns.push(read_txn);
                } else {
                    let wait =
                        Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
                    if wait < shortest_wait {
                        shortest_wait = wait;
                    }
                    read_txn.num_requeues += 1;
                    self.pending_linearize_read_txns.insert(conn_id, read_txn);
                }
            } else {
                ready_txns.push(read_txn);
            }
        }

        if !ready_txns.is_empty() {
            // Sniff out one ctx; this is where tracing breaks down because we
            // process all outstanding txns as a batch here.
            let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
            let span = tracing::debug_span!("message_linearize_reads");
            otel_ctx.attach_as_parent_to(&span);

            let now = Instant::now();
            for ready_txn in ready_txns {
                let span = tracing::debug_span!("retire_read_results");
                ready_txn.otel_ctx.attach_as_parent_to(&span);
                let _entered = span.enter();
                self.metrics
                    .linearize_message_seconds
                    .with_label_values(&[
                        ready_txn.txn.label(),
                        if ready_txn.num_requeues == 0 {
                            "true"
                        } else {
                            "false"
                        },
                    ])
                    .observe((now - ready_txn.created).as_secs_f64());
                if let Some((ctx, result)) = ready_txn.txn.finish() {
                    ctx.retire(result);
                }
            }
        }

        if !self.pending_linearize_read_txns.is_empty() {
            // Cap wait time to 1s.
            let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "deferred_read_txns", async move {
                tokio::time::sleep(remaining_ms).await;
                // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::LinearizeReads);
                if let Err(e) = result {
                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        }
    }
}