mz_adapter/coord/message_handler.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic for processing [`Coordinator`] messages. The [`Coordinator`] receives
//! messages from various sources (e.g. the controller, clients, and background tasks).

use std::collections::{BTreeMap, BTreeSet, btree_map};
use std::time::{Duration, Instant};

use futures::FutureExt;
use maplit::btreemap;
use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
use mz_controller::ControllerResponse;
use mz_controller::clusters::{ClusterEvent, ClusterStatus};
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::option::OptionExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, task};
use mz_persist_client::usage::ShardsUsageReferenced;
use mz_repr::{Datum, Diff, Row};
use mz_sql::ast::Statement;
use mz_sql::pure::PurifiedStatement;
use mz_storage_client::controller::IntrospectionType;
use mz_storage_types::controller::CollectionMetadata;
use opentelemetry::trace::TraceContextExt;
use rand::{Rng, SeedableRng, rngs};
use serde_json::json;
use tracing::{Instrument, Level, event, info_span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{BuiltinTableUpdate, Op};
use crate::command::Command;
use crate::coord::{
    AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
    CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
};
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::{AdapterNotice, TimestampContext};

impl Coordinator {
    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 74KB. Storing
    /// futures of that size on the stack is bad for runtime performance and blows up our stack
    /// usage. Because of that we purposefully move the Futures of inner function calls onto the
    /// heap (i.e. Box them).
    #[instrument]
    pub(crate) async fn handle_message(&mut self, msg: Message) -> () {
        match msg {
            Message::Command(otel_ctx, cmd) => {
                // TODO: We need a Span that is not `None` for the otel_ctx to attach the parent
                // relationship to. If we swap the otel_ctx in `Command::Message` for a Span, we
                // can downgrade this to a debug_span.
                let span = tracing::info_span!("message_command").or_current();
                span.in_scope(|| otel_ctx.attach_as_parent());
                self.message_command(cmd).instrument(span).await
            }
            Message::ControllerReady => {
                let Coordinator {
                    controller,
                    catalog,
                    ..
                } = self;
                let storage_metadata = catalog.state().storage_metadata();
                if let Some(m) = controller
                    .process(storage_metadata)
                    .expect("`process` never returns an error")
                {
                    self.message_controller(m).boxed_local().await
                }
            }
            Message::PurifiedStatementReady(ready) => {
                self.message_purified_statement_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::CreateConnectionValidationReady(ready) => {
                self.message_create_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::AlterConnectionValidationReady(ready) => {
                self.message_alter_connection_validation_ready(ready)
                    .boxed_local()
                    .await
            }
            Message::TryDeferred {
                conn_id,
                acquired_lock,
            } => self.try_deferred(conn_id, acquired_lock).await,
            Message::GroupCommitInitiate(span, permit) => {
                // Add an OpenTelemetry link to our current span.
                tracing::Span::current().add_link(span.context().span().span_context().clone());
                self.try_group_commit(permit)
                    .instrument(span)
                    .boxed_local()
                    .await
            }
            Message::AdvanceTimelines => {
                self.advance_timelines().boxed_local().await;
            }
            Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
            Message::CancelPendingPeeks { conn_id } => {
                self.cancel_pending_peeks(&conn_id);
            }
            Message::LinearizeReads => {
                self.message_linearize_reads().boxed_local().await;
            }
            Message::StagedBatches {
                conn_id,
                table_id,
                batches,
            } => {
                self.commit_staged_batches(conn_id, table_id, batches);
            }
            Message::StorageUsageSchedule => {
                self.schedule_storage_usage_collection().boxed_local().await;
            }
            Message::StorageUsageFetch => {
                self.storage_usage_fetch().boxed_local().await;
            }
            Message::StorageUsageUpdate(sizes) => {
                self.storage_usage_update(sizes).boxed_local().await;
            }
            Message::StorageUsagePrune(expired) => {
                self.storage_usage_prune(expired).boxed_local().await;
            }
            Message::RetireExecute {
                otel_ctx,
                data,
                reason,
            } => {
                otel_ctx.attach_as_parent();
                self.retire_execution(reason, data);
            }
            Message::ExecuteSingleStatementTransaction {
                ctx,
                otel_ctx,
                stmt,
                params,
            } => {
                otel_ctx.attach_as_parent();
                self.sequence_execute_single_statement_transaction(ctx, stmt, params)
                    .boxed_local()
                    .await;
            }
            Message::PeekStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateIndexStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SubscribeStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::IntrospectionSubscribeStageReady { span, stage } => {
                self.sequence_staged((), span, stage).boxed_local().await;
            }
            Message::ExplainTimestampStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::SecretStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::ClusterStageReady { ctx, span, stage } => {
                self.sequence_staged(ctx, span, stage).boxed_local().await;
            }
            Message::DrainStatementLog => {
                self.drain_statement_log();
            }
            Message::PrivateLinkVpcEndpointEvents(events) => {
                if !self.controller.read_only() {
                    self.controller.storage.append_introspection_updates(
                        IntrospectionType::PrivatelinkConnectionStatusHistory,
                        events
                            .into_iter()
                            .map(|e| (Row::from(e), Diff::ONE))
                            .collect(),
                    );
                }
            }
            Message::CheckSchedulingPolicies => {
                self.check_scheduling_policies().boxed_local().await;
            }
            Message::SchedulingDecisions(decisions) => {
                self.handle_scheduling_decisions(decisions)
                    .boxed_local()
                    .await;
            }
            Message::DeferredStatementReady => {
                self.handle_deferred_statement().boxed_local().await;
            }
        }
    }

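    /// Initiates a storage usage collection: records the set of currently live shards and spawns
    /// a task that fetches their sizes from persist, reporting the result back to the
    /// coordinator via `Message::StorageUsageUpdate`.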
    #[mz_ore::instrument(level = "debug")]
    pub async fn storage_usage_fetch(&mut self) {
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let client = self.storage_usage_client.clone();

        // Record the currently live shards.
        let live_shards: BTreeSet<_> = self
            .controller
            .storage
            .active_collection_metadatas()
            .into_iter()
            .flat_map(|(_id, collection_metadata)| {
                let CollectionMetadata {
                    data_shard,
                    remap_shard,
                    // No wildcards, to improve the odds that the addition of a
                    // new shard type results in a compiler error here.
                    //
                    // ATTENTION: If you add a new type of shard that is
                    // associated with a collection, almost surely you should
                    // return it below, so that its usage is recorded in the
                    // `mz_storage_usage_by_shard` table.
                    persist_location: _,
                    relation_desc: _,
                    txns_shard: _,
                } = collection_metadata;
                [remap_shard, Some(data_shard)].into_iter()
            })
            .flatten()
            .collect();

        let collection_metric = self
            .metrics
            .storage_usage_collection_time_seconds
            .with_label_values(&[]);

        // Spawn an asynchronous task to compute the storage usage, which
        // requires a slow scan of the underlying storage engine.
        task::spawn(|| "storage_usage_fetch", async move {
            let collection_metric_timer = collection_metric.start_timer();
            let shard_sizes = client.shards_usage_referenced(live_shards).await;
            collection_metric_timer.observe_duration();

            // It is not an error for shard sizes to become ready after
            // `internal_cmd_rx` is dropped.
            if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        });
    }

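    /// Records the collected shard sizes in the catalog (and thereby the
    /// `mz_storage_usage_by_shard` builtin table), then schedules the next collection once the
    /// resulting table updates have been applied.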
    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
        // Similar to audit events, use the oracle ts so this is guaranteed to
        // increase. This is intentionally the timestamp of when collection
        // finished, not when it started, so that we don't write data with a
        // timestamp in the past.
        let collection_timestamp = if self.controller.read_only() {
            self.peek_local_write_ts().await.into()
        } else {
            // Getting a write timestamp bumps the write timestamp in the
            // oracle, which we're not allowed to do in read-only mode.
            self.get_local_write_ts().await.timestamp.into()
        };

        let ops = shards_usage
            .by_shard
            .into_iter()
            .map(|(shard_id, shard_usage)| Op::WeirdStorageUsageUpdates {
                object_id: Some(shard_id.to_string()),
                size_bytes: shard_usage.size_bytes(),
                collection_timestamp,
            })
            .collect();

        match self.catalog_transact_inner(None, ops).await {
            Ok(table_updates) => {
                let internal_cmd_tx = self.internal_cmd_tx.clone();
                let task_span =
                    info_span!(parent: None, "coord::storage_usage_update::table_updates");
                OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
                task::spawn(|| "storage_usage_update_table_updates", async move {
                    table_updates.instrument(task_span).await;
                    // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
                    if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
                        warn!("internal_cmd_rx dropped before we could send: {e:?}");
                    }
                });
            }
            Err(err) => tracing::warn!("Failed to update storage metrics: {:?}", err),
        }
    }

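    /// Applies retractions for expired storage usage rows in the background.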
    #[mz_ore::instrument(level = "debug")]
    async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
        let (fut, _) = self.builtin_table_update().execute(expired).await;
        task::spawn(|| "storage_usage_pruning_apply", async move {
            fut.await;
        });
    }

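    /// Schedules the next storage usage collection by computing the time of the next collection
    /// and sending `Message::StorageUsageFetch` after sleeping until then.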
    pub async fn schedule_storage_usage_collection(&self) {
        // Instead of using a `tokio::time::Interval`, we calculate the time until the next
        // usage collection and wait for that amount of time. This is so we can keep the intervals
        // consistent even across restarts. If collection takes too long, it is possible that
        // we miss an interval.

        // 1) Deterministically pick some offset within the collection interval to prevent
        // thundering herds across environments.
        const SEED_LEN: usize = 32;
        let mut seed = [0; SEED_LEN];
        for (i, byte) in self
            .catalog()
            .state()
            .config()
            .environment_id
            .organization_id()
            .as_bytes()
            .into_iter()
            .take(SEED_LEN)
            .enumerate()
        {
            seed[i] = *byte;
        }
        let storage_usage_collection_interval_ms: EpochMillis =
            EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
                .expect("storage usage collection interval must fit into u64");
        let offset =
            rngs::SmallRng::from_seed(seed).gen_range(0..storage_usage_collection_interval_ms);
        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();

        // 2) Determine the number of milliseconds between now and the next collection time.
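        // For example, with a 1 hour interval and a 10 minute offset: at 2:05 the current
        // window's collection time (2:10) is still ahead of us, so the next collection happens
        // at 2:10; at 2:15 it has already passed, so the next collection happens at 3:10.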
        let previous_collection_ts =
            (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
        let next_collection_ts = if previous_collection_ts > now_ts {
            previous_collection_ts
        } else {
            previous_collection_ts + storage_usage_collection_interval_ms
        };
        let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);

        // 3) Sleep for that amount of time, then initiate another storage usage collection.
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        task::spawn(|| "storage_usage_collection", async move {
            tokio::time::sleep(next_collection_interval).await;
            if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
                // If sending fails, the main thread has shut down.
            }
        });
    }

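    /// Handles a `Command` sent to the coordinator by forwarding it to `Self::handle_command`.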
    #[mz_ore::instrument(level = "debug")]
    async fn message_command(&mut self, cmd: Command) {
        self.handle_command(cmd).await;
    }

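    /// Handles a `ControllerResponse`: peek notifications, subscribe and `COPY TO` responses,
    /// replica metric updates, and finished watch sets.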
    #[mz_ore::instrument(level = "debug")]
    async fn message_controller(&mut self, message: ControllerResponse) {
        event!(Level::TRACE, message = format!("{:?}", message));
        match message {
            ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
                self.handle_peek_notification(uuid, response, otel_ctx);
            }
            ControllerResponse::SubscribeResponse(sink_id, response) => {
                if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
                    self.active_compute_sinks.get_mut(&sink_id)
                {
                    let finished = active_subscribe.process_response(response);
                    if finished {
                        self.retire_compute_sinks(btreemap! {
                            sink_id => ActiveComputeSinkRetireReason::Finished,
                        })
                        .await;
                    }

                    soft_assert_or_log!(
                        !self.introspection_subscribes.contains_key(&sink_id),
                        "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
                         and `introspection_subscribes`",
                    );
                } else if self.introspection_subscribes.contains_key(&sink_id) {
                    self.handle_introspection_subscribe_batch(sink_id, response)
                        .await;
                } else {
                    // Cancellation may cause us to receive responses for subscribes no longer
                    // tracked, so we quietly ignore them.
                }
            }
            ControllerResponse::CopyToResponse(sink_id, response) => {
                match self.drop_compute_sink(sink_id).await {
                    Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
                        active_copy_to.retire_with_response(response);
                    }
                    _ => {
                        // Cancellation may cause us to receive responses for `COPY TO` sinks no
                        // longer tracked, so we quietly ignore them.
                    }
                }
            }
            ControllerResponse::ComputeReplicaMetrics(replica_id, new) => {
                let m = match self
                    .transient_replica_metadata
                    .entry(replica_id)
                    .or_insert_with(|| Some(Default::default()))
                {
                    // `None` is the tombstone for a removed replica
                    None => return,
                    Some(md) => &mut md.metrics,
                };
                let old = std::mem::replace(m, Some(new.clone()));
                if old.as_ref() != Some(&new) {
                    let retractions = old.map(|old| {
                        self.catalog().state().pack_replica_metric_updates(
                            replica_id,
                            &old,
                            Diff::MINUS_ONE,
                        )
                    });
                    let insertions = self.catalog().state().pack_replica_metric_updates(
                        replica_id,
                        &new,
                        Diff::ONE,
                    );
                    let updates = if let Some(retractions) = retractions {
                        retractions
                            .into_iter()
                            .chain(insertions.into_iter())
                            .collect()
                    } else {
                        insertions
                    };
                    let updates = self
                        .catalog()
                        .state()
                        .resolve_builtin_table_updates(updates);
                    // We don't care about when the write finishes.
                    let _notify = self.builtin_table_update().background(updates);
                }
            }
            ControllerResponse::WatchSetFinished(ws_ids) => {
                let now = self.now();
                for ws_id in ws_ids {
                    let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
                        continue;
                    };
                    self.connection_watch_sets
                        .get_mut(&conn_id)
                        .expect("corrupted coordinator state: unknown connection id")
                        .remove(&ws_id);
                    if self.connection_watch_sets[&conn_id].is_empty() {
                        self.connection_watch_sets.remove(&conn_id);
                    }

                    match rsp {
                        WatchSetResponse::StatementDependenciesReady(id, ev) => {
                            self.record_statement_lifecycle_event(&id, &ev, now);
                        }
                        WatchSetResponse::AlterSinkReady(ctx) => {
                            self.sequence_alter_sink_finish(ctx).await;
                        }
                    }
                }
            }
        }
    }

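    /// Resumes sequencing a statement whose purification has completed in the background,
    /// re-checking plan validity before planning the purified statement and handing the result
    /// to `sequence_plan`.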
    #[mz_ore::instrument(level = "debug")]
    async fn message_purified_statement_ready(
        &mut self,
        PurifiedStatementReady {
            ctx,
            result,
            params,
            mut plan_validity,
            original_stmt,
            otel_ctx,
        }: PurifiedStatementReady,
    ) {
        otel_ctx.attach_as_parent();

        // Ensure that all dependencies still exist after purification, as a
        // `DROP CONNECTION` or other `DROP` may have sneaked in. If any have gone missing, we
        // repurify the original statement. This will either produce a nice
        // "unknown connector" error, or pick up a new connector that has
        // replaced the dropped connector.
        //
        // n.b. an `ALTER CONNECTION` occurring during purification is OK
        // because we always look up/populate a connection's state after
        // committing to the catalog, so are guaranteed to see the connection's
        // most recent version.
        if plan_validity.check(self.catalog()).is_err() {
            self.handle_execute_inner(original_stmt, params, ctx).await;
            return;
        }

        let purified_statement = match result {
            Ok(ok) => ok,
            Err(e) => return ctx.retire(Err(e)),
        };

        let plan = match purified_statement {
            PurifiedStatement::PurifiedCreateSource {
                create_progress_subsource_stmt,
                create_source_stmt,
                subsources,
                available_source_references,
            } => {
                self.plan_purified_create_source(
                    &ctx,
                    params,
                    create_progress_subsource_stmt,
                    create_source_stmt,
                    subsources,
                    available_source_references,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceAddSubsources {
                source_name,
                options,
                subsources,
            } => {
                self.plan_purified_alter_source_add_subsource(
                    ctx.session(),
                    params,
                    source_name,
                    options,
                    subsources,
                )
                .await
            }
            PurifiedStatement::PurifiedAlterSourceRefreshReferences {
                source_name,
                available_source_references,
            } => self.plan_purified_alter_source_refresh_references(
                ctx.session(),
                params,
                source_name,
                available_source_references,
            ),
            o @ (PurifiedStatement::PurifiedAlterSource { .. }
            | PurifiedStatement::PurifiedCreateSink(..)
            | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
                // Unify these into a `Statement`.
                let stmt = match o {
                    PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
                        Statement::AlterSource(alter_source_stmt)
                    }
                    PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
                        Statement::CreateTableFromSource(stmt)
                    }
                    PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
                    PurifiedStatement::PurifiedCreateSource { .. }
                    | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
                    | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
                        unreachable!("not part of exterior match stmt")
                    }
                };

                // Determine all dependencies, not just those in the statement
                // itself.
                let catalog = self.catalog().for_session(ctx.session());
                let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
                self.plan_statement(ctx.session(), stmt, &params, &resolved_ids)
                    .map(|plan| (plan, resolved_ids))
            }
        };

        match plan {
            Ok((plan, resolved_ids)) => self.sequence_plan(ctx, plan, resolved_ids).await,
            Err(e) => ctx.retire(Err(e)),
        }
    }

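    /// Finishes creating a connection once its background validation completes, deleting any
    /// secret stored for the connection if validation or the plan validity check failed.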
    #[mz_ore::instrument(level = "debug")]
    async fn message_create_connection_validation_ready(
        &mut self,
        CreateConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid,
            mut plan_validity,
            otel_ctx,
            resolved_ids,
        }: CreateConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        // Ensure that all dependencies still exist after validation, as a
        // `DROP SECRET` may have sneaked in.
        //
        // WARNING: If we support `ALTER SECRET`, we'll need to also check
        // for connectors that were altered while we were purifying.
        if let Err(e) = plan_validity.check(self.catalog()) {
            let _ = self.secrets_controller.delete(connection_id).await;
            return ctx.retire(Err(e));
        }

        let plan = match result {
            Ok(ok) => ok,
            Err(e) => {
                let _ = self.secrets_controller.delete(connection_id).await;
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_create_connection_stage_finish(
                ctx.session_mut(),
                connection_id,
                connection_gid,
                plan,
                resolved_ids,
            )
            .await;
        ctx.retire(result);
    }

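    /// Finishes altering a connection once its background validation completes, re-checking plan
    /// validity before applying the change.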
    #[mz_ore::instrument(level = "debug")]
    async fn message_alter_connection_validation_ready(
        &mut self,
        AlterConnectionValidationReady {
            mut ctx,
            result,
            connection_id,
            connection_gid: _,
            mut plan_validity,
            otel_ctx,
            resolved_ids: _,
        }: AlterConnectionValidationReady,
    ) {
        otel_ctx.attach_as_parent();

        // Ensure that all dependencies still exist after validation, as a
        // `DROP SECRET` may have sneaked in.
        //
        // WARNING: If we support `ALTER SECRET`, we'll need to also check
        // for connectors that were altered while we were purifying.
        if let Err(e) = plan_validity.check(self.catalog()) {
            return ctx.retire(Err(e));
        }

        let conn = match result {
            Ok(ok) => ok,
            Err(e) => {
                return ctx.retire(Err(e));
            }
        };

        let result = self
            .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
            .await;
        ctx.retire(result);
    }

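    /// Handles a cluster replica status change: reports it to Segment (if configured), appends
    /// it to the replica status history, updates the in-memory statuses and the corresponding
    /// builtin table, and notifies active sessions if the replica's overall status changed.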
    #[mz_ore::instrument(level = "debug")]
    async fn message_cluster_event(&mut self, event: ClusterEvent) {
        event!(Level::TRACE, event = format!("{:?}", event));

        if let Some(segment_client) = &self.segment_client {
            let env_id = &self.catalog().config().environment_id;
            let mut properties = json!({
                "cluster_id": event.cluster_id.to_string(),
                "replica_id": event.replica_id.to_string(),
                "process_id": event.process_id,
                "status": event.status.as_kebab_case_str(),
            });
            match event.status {
                ClusterStatus::Online => (),
                ClusterStatus::Offline(reason) => {
                    let properties = match &mut properties {
                        serde_json::Value::Object(map) => map,
                        _ => unreachable!(),
                    };
                    properties.insert(
                        "reason".into(),
                        json!(reason.display_or("unknown").to_string()),
                    );
                }
            };
            segment_client.environment_track(
                env_id,
                "Cluster Changed Status",
                properties,
                EventDetails {
                    timestamp: Some(event.time),
                    ..Default::default()
                },
            );
        }

        // It is possible that we receive a status update for a replica that has
        // already been dropped from the catalog. Just ignore these events.
        let Some(replica_statuses) = self
            .cluster_replica_statuses
            .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
        else {
            return;
        };

        if event.status != replica_statuses[&event.process_id].status {
            if !self.controller.read_only() {
                let offline_reason = match event.status {
                    ClusterStatus::Online => None,
                    ClusterStatus::Offline(None) => None,
                    ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
                };
                let row = Row::pack_slice(&[
                    Datum::String(&event.replica_id.to_string()),
                    Datum::UInt64(event.process_id),
                    Datum::String(event.status.as_kebab_case_str()),
                    Datum::from(offline_reason.as_deref()),
                    Datum::TimestampTz(event.time.try_into().expect("must fit")),
                ]);
                self.controller.storage.append_introspection_updates(
                    IntrospectionType::ReplicaStatusHistory,
                    vec![(row, Diff::ONE)],
                );
            }

            let old_replica_status =
                ClusterReplicaStatuses::cluster_replica_status(replica_statuses);
            let old_process_status = replica_statuses
                .get(&event.process_id)
                .expect("Process exists");
            let builtin_table_retraction =
                self.catalog().state().pack_cluster_replica_status_update(
                    event.replica_id,
                    event.process_id,
                    old_process_status,
                    Diff::MINUS_ONE,
                );
            let builtin_table_retraction = self
                .catalog()
                .state()
                .resolve_builtin_table_update(builtin_table_retraction);

            let new_process_status = ClusterReplicaProcessStatus {
                status: event.status,
                time: event.time,
            };
            let builtin_table_addition = self.catalog().state().pack_cluster_replica_status_update(
                event.replica_id,
                event.process_id,
                &new_process_status,
                Diff::ONE,
            );
            let builtin_table_addition = self
                .catalog()
                .state()
                .resolve_builtin_table_update(builtin_table_addition);
            self.cluster_replica_statuses.ensure_cluster_status(
                event.cluster_id,
                event.replica_id,
                event.process_id,
                new_process_status,
            );

            let builtin_table_updates = vec![builtin_table_retraction, builtin_table_addition];
            // Returns a Future that completes when the builtin table write is completed.
            let builtin_table_completion = self.builtin_table_update().defer(builtin_table_updates);

            let cluster = self.catalog().get_cluster(event.cluster_id);
            let replica = cluster.replica(event.replica_id).expect("Replica exists");
            let new_replica_status = self
                .cluster_replica_statuses
                .get_cluster_replica_status(event.cluster_id, event.replica_id);

            if old_replica_status != new_replica_status {
                let notifier = self.broadcast_notice_tx();
                let notice = AdapterNotice::ClusterReplicaStatusChanged {
                    cluster: cluster.name.clone(),
                    replica: replica.name.clone(),
                    status: new_replica_status,
                    time: event.time,
                };
                // In a separate task, so we don't block the Coordinator, wait for the builtin
                // table update to complete, and then notify active sessions.
                mz_ore::task::spawn(
                    || format!("cluster_event-{}-{}", event.cluster_id, event.replica_id),
                    async move {
                        // Wait for the builtin table updates to complete.
                        builtin_table_completion.await;
                        // Notify all sessions that were active at the time the cluster status changed.
                        (notifier)(notice)
                    },
                );
            }
        }
    }

    /// Linearizes sending the results of a read transaction by:
    ///   1. Holding back any results that were executed at some point in the future, until the
    ///      containing timeline has advanced to that point in the future.
    ///   2. Confirming that we are still the current leader before sending results to the client.
    #[mz_ore::instrument(level = "debug")]
    async fn message_linearize_reads(&mut self) {
        let mut shortest_wait = Duration::MAX;
        let mut ready_txns = Vec::new();

        // Cache for `TimestampOracle::read_ts` calls. These are somewhat
        // expensive so we cache the value. This is correct since all we're
        // risking is being too conservative. We will not accidentally "release"
        // a result too early.
        let mut cached_oracle_ts = BTreeMap::new();

        for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
            if let TimestampContext::TimelineTimestamp {
                timeline,
                chosen_ts,
                oracle_ts,
            } = read_txn.timestamp_context()
            {
                let oracle_ts = match oracle_ts {
                    Some(oracle_ts) => oracle_ts,
                    None => {
                        // There was no oracle timestamp, so no need to delay.
                        ready_txns.push(read_txn);
                        continue;
                    }
                };

                if chosen_ts <= oracle_ts {
                    // Chosen ts was already <= the oracle ts, so we're good
                    // to go!
                    ready_txns.push(read_txn);
                    continue;
                }

                // See what the oracle timestamp is now and delay when needed.
                let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
                let current_oracle_ts = match current_oracle_ts {
                    btree_map::Entry::Vacant(entry) => {
                        let timestamp_oracle = self.get_timestamp_oracle(timeline);
                        let read_ts = timestamp_oracle.read_ts().await;
                        entry.insert(read_ts.clone());
                        read_ts
                    }
                    btree_map::Entry::Occupied(entry) => entry.get().clone(),
                };

                if *chosen_ts <= current_oracle_ts {
                    ready_txns.push(read_txn);
                } else {
                    let wait =
                        Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
                    if wait < shortest_wait {
                        shortest_wait = wait;
                    }
                    read_txn.num_requeues += 1;
                    self.pending_linearize_read_txns.insert(conn_id, read_txn);
                }
            } else {
                ready_txns.push(read_txn);
            }
        }

        if !ready_txns.is_empty() {
            // Sniff out one ctx; this is where tracing breaks down because we
            // process all outstanding txns as a batch here.
            let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
            let span = tracing::debug_span!("message_linearize_reads");
            otel_ctx.attach_as_parent_to(&span);

            let now = Instant::now();
            for ready_txn in ready_txns {
                let span = tracing::debug_span!("retire_read_results");
                ready_txn.otel_ctx.attach_as_parent_to(&span);
                let _entered = span.enter();
                self.metrics
                    .linearize_message_seconds
                    .with_label_values(&[
                        ready_txn.txn.label(),
                        if ready_txn.num_requeues == 0 {
                            "true"
                        } else {
                            "false"
                        },
                    ])
                    .observe((now - ready_txn.created).as_secs_f64());
                if let Some((ctx, result)) = ready_txn.txn.finish() {
                    ctx.retire(result);
                }
            }
        }

        if !self.pending_linearize_read_txns.is_empty() {
            // Cap wait time to 1s.
            let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(|| "deferred_read_txns", async move {
                tokio::time::sleep(remaining_ms).await;
                // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::LinearizeReads);
                if let Err(e) = result {
                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        }
    }
}