Skip to main content

mz_adapter/coord/
message_handler.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for processing [`Coordinator`] messages. The [`Coordinator`] receives
11//! messages from various sources (ex: controller, clients, background tasks, etc).
12
13use std::collections::{BTreeMap, BTreeSet, btree_map};
14use std::time::{Duration, Instant};
15
16use futures::FutureExt;
17use maplit::btreemap;
18use mz_audit_log::VersionedStorageUsage;
19use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
20use mz_controller::ControllerResponse;
21use mz_controller::clusters::{ClusterEvent, ClusterStatus};
22use mz_ore::cast::CastFrom;
23use mz_ore::instrument;
24use mz_ore::now::EpochMillis;
25use mz_ore::option::OptionExt;
26use mz_ore::tracing::OpenTelemetryContext;
27use mz_ore::{soft_assert_or_log, task};
28use mz_persist_client::usage::ShardsUsageReferenced;
29use mz_repr::{Datum, Diff, Row};
30use mz_sql::ast::Statement;
31use mz_sql::names::ResolvedIds;
32use mz_sql::pure::PurifiedStatement;
33use mz_storage_client::controller::IntrospectionType;
34use opentelemetry::trace::TraceContextExt;
35use rand::{Rng, SeedableRng, rngs};
36use serde_json::json;
37use tracing::{Instrument, Level, event, info_span, warn};
38use tracing_opentelemetry::OpenTelemetrySpanExt;
39
40use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
41use crate::catalog::BuiltinTableUpdate;
42use crate::command::Command;
43use crate::coord::{
44    AlterConnectionValidationReady, ClusterReplicaStatuses, Coordinator,
45    CreateConnectionValidationReady, Message, PurifiedStatementReady, WatchSetResponse,
46};
47use crate::telemetry::{EventDetails, SegmentClientExt};
48use crate::{AdapterNotice, TimestampContext};
49
50impl Coordinator {
51    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 74KB. This would
52    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
53    /// Because of that we purposefully move Futures of inner function calls onto the heap
54    /// (i.e. Box it).
55    #[instrument]
56    pub(crate) async fn handle_message(&mut self, msg: Message) -> () {
57        match msg {
58            Message::Command(otel_ctx, cmd) => {
59                // TODO: We need a Span that is not none for the otel_ctx to attach the parent
60                // relationship to. If we swap the otel_ctx in `Command::Message` for a Span, we
61                // can downgrade this to a debug_span.
62                let span = tracing::info_span!("message_command").or_current();
63                span.in_scope(|| otel_ctx.attach_as_parent());
64                self.message_command(cmd).instrument(span).await
65            }
66            Message::ControllerReady { controller: _ } => {
67                let Coordinator {
68                    controller,
69                    catalog,
70                    ..
71                } = self;
72                let storage_metadata = catalog.state().storage_metadata();
73                if let Some(m) = controller
74                    .process(storage_metadata)
75                    .expect("`process` never returns an error")
76                {
77                    self.message_controller(m).boxed_local().await
78                }
79            }
80            Message::PurifiedStatementReady(ready) => {
81                self.message_purified_statement_ready(ready)
82                    .boxed_local()
83                    .await
84            }
85            Message::CreateConnectionValidationReady(ready) => {
86                self.message_create_connection_validation_ready(ready)
87                    .boxed_local()
88                    .await
89            }
90            Message::AlterConnectionValidationReady(ready) => {
91                self.message_alter_connection_validation_ready(ready)
92                    .boxed_local()
93                    .await
94            }
95            Message::TryDeferred {
96                conn_id,
97                acquired_lock,
98            } => self.try_deferred(conn_id, acquired_lock).await,
99            Message::GroupCommitInitiate(span, permit) => {
100                // Add an OpenTelemetry link to our current span.
101                tracing::Span::current().add_link(span.context().span().span_context().clone());
102                self.try_group_commit(permit)
103                    .instrument(span)
104                    .boxed_local()
105                    .await
106            }
107            Message::AdvanceTimelines => {
108                self.advance_timelines().boxed_local().await;
109            }
110            Message::ClusterEvent(event) => self.message_cluster_event(event).boxed_local().await,
111            Message::CancelPendingPeeks { conn_id } => {
112                self.cancel_pending_peeks(&conn_id);
113            }
114            Message::LinearizeReads => {
115                self.message_linearize_reads().boxed_local().await;
116            }
117            Message::StagedBatches {
118                conn_id,
119                table_id,
120                batches,
121            } => {
122                self.commit_staged_batches(conn_id, table_id, batches);
123            }
124            Message::StorageUsageSchedule => {
125                self.schedule_storage_usage_collection().boxed_local().await;
126            }
127            Message::StorageUsageFetch => {
128                self.storage_usage_fetch().boxed_local().await;
129            }
130            Message::StorageUsageUpdate(sizes) => {
131                self.storage_usage_update(sizes).boxed_local().await;
132            }
133            Message::StorageUsagePrune(expired) => {
134                self.storage_usage_prune(expired).boxed_local().await;
135            }
136            Message::ArrangementSizesSchedule => {
137                self.schedule_arrangement_sizes_collection()
138                    .boxed_local()
139                    .await;
140            }
141            Message::ArrangementSizesSnapshot => {
142                self.arrangement_sizes_snapshot().boxed_local().await;
143            }
144            Message::ArrangementSizesPrune(expired) => {
145                self.arrangement_sizes_prune(expired).boxed_local().await;
146            }
147            Message::RetireExecute {
148                otel_ctx,
149                data,
150                reason,
151            } => {
152                otel_ctx.attach_as_parent();
153                self.retire_execution(reason, data);
154            }
155            Message::ExecuteSingleStatementTransaction {
156                ctx,
157                otel_ctx,
158                stmt,
159                params,
160            } => {
161                otel_ctx.attach_as_parent();
162                self.sequence_execute_single_statement_transaction(ctx, stmt, params)
163                    .boxed_local()
164                    .await;
165            }
166            Message::PeekStageReady { ctx, span, stage } => {
167                self.sequence_staged(ctx, span, stage).boxed_local().await;
168            }
169            Message::CreateIndexStageReady { ctx, span, stage } => {
170                self.sequence_staged(ctx, span, stage).boxed_local().await;
171            }
172            Message::CreateViewStageReady { ctx, span, stage } => {
173                self.sequence_staged(ctx, span, stage).boxed_local().await;
174            }
175            Message::CreateMaterializedViewStageReady { ctx, span, stage } => {
176                self.sequence_staged(ctx, span, stage).boxed_local().await;
177            }
178            Message::SubscribeStageReady { ctx, span, stage } => {
179                self.sequence_staged(ctx, span, stage).boxed_local().await;
180            }
181            Message::IntrospectionSubscribeStageReady { span, stage } => {
182                self.sequence_staged((), span, stage).boxed_local().await;
183            }
184            Message::ExplainTimestampStageReady { ctx, span, stage } => {
185                self.sequence_staged(ctx, span, stage).boxed_local().await;
186            }
187            Message::SecretStageReady { ctx, span, stage } => {
188                self.sequence_staged(ctx, span, stage).boxed_local().await;
189            }
190            Message::ClusterStageReady { ctx, span, stage } => {
191                self.sequence_staged(ctx, span, stage).boxed_local().await;
192            }
193            Message::DrainStatementLog => {
194                self.drain_statement_log();
195            }
196            Message::PrivateLinkVpcEndpointEvents(events) => {
197                if !self.controller.read_only() {
198                    self.controller.storage.append_introspection_updates(
199                        IntrospectionType::PrivatelinkConnectionStatusHistory,
200                        events
201                            .into_iter()
202                            .map(|e| (mz_repr::Row::from(e), Diff::ONE))
203                            .collect(),
204                    );
205                }
206            }
207            Message::CheckSchedulingPolicies => {
208                self.check_scheduling_policies().boxed_local().await;
209            }
210            Message::SchedulingDecisions(decisions) => {
211                self.handle_scheduling_decisions(decisions)
212                    .boxed_local()
213                    .await;
214            }
215            Message::DeferredStatementReady => {
216                self.handle_deferred_statement().boxed_local().await;
217            }
218        }
219    }
220
221    #[mz_ore::instrument(level = "debug")]
222    pub async fn storage_usage_fetch(&self) {
223        // In read-only mode (e.g. a standby coordinator during a zero-downtime
224        // deployment) we cannot durably write the per-batch allocator bump or
225        // append to `mz_storage_usage_by_shard`, and we also don't want to do
226        // the slow shard scan on a process that isn't going to record the
227        // results. Skip the whole cycle and reschedule so we resume
228        // automatically once the coordinator transitions out of read-only.
229        if self.controller.read_only() {
230            tracing::info!("skipping storage usage collection in read-only mode");
231            if let Err(e) = self.internal_cmd_tx.send(Message::StorageUsageSchedule) {
232                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
233            }
234            return;
235        }
236
237        let internal_cmd_tx = self.internal_cmd_tx.clone();
238        let client = self.storage_usage_client.clone();
239
240        // Record the currently live shards.
241        let live_shards: BTreeSet<_> = self
242            .controller
243            .storage
244            .active_collection_metadatas()
245            .into_iter()
246            .map(|(_id, m)| m.data_shard)
247            .collect();
248
249        let collection_metric = self.metrics.storage_usage_collection_time_seconds.clone();
250
251        // Spawn an asynchronous task to compute the storage usage, which
252        // requires a slow scan of the underlying storage engine.
253        task::spawn(|| "storage_usage_fetch", async move {
254            let collection_metric_timer = collection_metric.start_timer();
255            let shard_sizes = client.shards_usage_referenced(live_shards).await;
256            collection_metric_timer.observe_duration();
257
258            // It is not an error for shard sizes to become ready after
259            // `internal_cmd_rx` is dropped.
260            if let Err(e) = internal_cmd_tx.send(Message::StorageUsageUpdate(shard_sizes)) {
261                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
262            }
263        });
264    }
265
266    #[mz_ore::instrument(level = "debug")]
267    async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced) {
268        // Similar to audit events, use the oracle ts so this is guaranteed to
269        // increase. This is intentionally the timestamp of when collection
270        // finished, not when it started, so that we don't write data with a
271        // timestamp in the past.
272        //
273        // `storage_usage_fetch` skips this path in read-only mode, so we can
274        // unconditionally bump the oracle write ts here.
275        let write_ts = self.get_local_write_ts().await.timestamp;
276        let collection_timestamp: EpochMillis = write_ts.into();
277
278        // All rows in this collection cycle share `batch_id` so consumers can
279        // identify rows that were collected together. We use one durable
280        // allocator bump per cycle (rather than per shard) so the id is
281        // monotonic across coordinator restarts while still keeping the
282        // coord-blocking cost proportional to one round-trip, not N.
283        let batch_id = match self.catalog().allocate_storage_usage_id(write_ts).await {
284            Ok(id) => id,
285            Err(err) => {
286                tracing::warn!("failed to allocate storage usage batch id: {:?}", err);
287                return;
288            }
289        };
290
291        let updates: Vec<_> = shards_usage
292            .by_shard
293            .into_iter()
294            .map(|(shard_id, shard_usage)| {
295                let event = VersionedStorageUsage::new(
296                    batch_id,
297                    Some(shard_id.to_string()),
298                    shard_usage.size_bytes(),
299                    collection_timestamp,
300                );
301                self.catalog().pack_storage_usage_update(event, Diff::ONE)
302            })
303            .collect();
304
305        let (table_updates, _) = self.builtin_table_update().execute(updates).await;
306
307        let internal_cmd_tx = self.internal_cmd_tx.clone();
308        let task_span = info_span!(parent: None, "coord::storage_usage_update::table_updates");
309        OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
310        task::spawn(|| "storage_usage_update_table_updates", async move {
311            table_updates.instrument(task_span).await;
312            // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
313            if let Err(e) = internal_cmd_tx.send(Message::StorageUsageSchedule) {
314                warn!("internal_cmd_rx dropped before we could send: {e:?}");
315            }
316        });
317    }
318
319    #[mz_ore::instrument(level = "debug")]
320    async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
321        let (fut, _) = self.builtin_table_update().execute(expired).await;
322        task::spawn(|| "storage_usage_pruning_apply", async move {
323            fut.await;
324        });
325    }
326
327    pub async fn schedule_storage_usage_collection(&self) {
328        // Instead of using an `tokio::timer::Interval`, we calculate the time until the next
329        // usage collection and wait for that amount of time. This is so we can keep the intervals
330        // consistent even across restarts. If collection takes too long, it is possible that
331        // we miss an interval.
332
333        // 1) Deterministically pick some offset within the collection interval to prevent
334        // thundering herds across environments.
335        const SEED_LEN: usize = 32;
336        let mut seed = [0; SEED_LEN];
337        for (i, byte) in self
338            .catalog()
339            .state()
340            .config()
341            .environment_id
342            .organization_id()
343            .as_bytes()
344            .into_iter()
345            .take(SEED_LEN)
346            .enumerate()
347        {
348            seed[i] = *byte;
349        }
350        let storage_usage_collection_interval_ms: EpochMillis =
351            EpochMillis::try_from(self.storage_usage_collection_interval.as_millis())
352                .expect("storage usage collection interval must fit into u64");
353        let offset =
354            rngs::SmallRng::from_seed(seed).random_range(0..storage_usage_collection_interval_ms);
355        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();
356
357        // 2) Determine the amount of ms between now and the next collection time.
358        let previous_collection_ts =
359            (now_ts - (now_ts % storage_usage_collection_interval_ms)) + offset;
360        let next_collection_ts = if previous_collection_ts > now_ts {
361            previous_collection_ts
362        } else {
363            previous_collection_ts + storage_usage_collection_interval_ms
364        };
365        let next_collection_interval = Duration::from_millis(next_collection_ts - now_ts);
366
367        // 3) Sleep for that amount of time, then initiate another storage usage collection.
368        let internal_cmd_tx = self.internal_cmd_tx.clone();
369        task::spawn(|| "storage_usage_collection", async move {
370            tokio::time::sleep(next_collection_interval).await;
371            if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
372                // If sending fails, the main thread has shutdown.
373            }
374        });
375    }
376
377    /// Schedules the next per-object arrangement sizes snapshot.
378    ///
379    /// Aligns each fire to an `organization_id`-seeded offset within the
380    /// interval so collections stay consistent across restarts and don't
381    /// synchronize across environments. Sleeps are capped at `MAX_SLEEP`,
382    /// so dyncfg changes (interval edits or the `0s` disable sentinel) take
383    /// effect within one cap rather than after the full interval.
384    pub async fn schedule_arrangement_sizes_collection(&self) {
385        const MAX_SLEEP: Duration = Duration::from_secs(60);
386
387        let interval_duration =
388            mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_HISTORY_COLLECTION_INTERVAL
389                .get(self.catalog().system_config().dyncfgs());
390
391        // `0s` disables collection. Keep polling so re-enabling takes effect
392        // within `MAX_SLEEP` rather than requiring an envd restart.
393        if interval_duration.is_zero() {
394            let internal_cmd_tx = self.internal_cmd_tx.clone();
395            task::spawn(|| "arrangement_sizes_collection_disabled", async move {
396                tokio::time::sleep(MAX_SLEEP).await;
397                let _ = internal_cmd_tx.send(Message::ArrangementSizesSchedule);
398            });
399            return;
400        }
401
402        const SEED_LEN: usize = 32;
403        let mut seed = [0; SEED_LEN];
404        for (i, byte) in self
405            .catalog()
406            .state()
407            .config()
408            .environment_id
409            .organization_id()
410            .as_bytes()
411            .into_iter()
412            .take(SEED_LEN)
413            .enumerate()
414        {
415            seed[i] = *byte;
416        }
417        let interval_ms: EpochMillis = EpochMillis::try_from(interval_duration.as_millis())
418            .expect("arrangement_size_history_collection_interval must fit into u64");
419        // `rand::random_range` panics on an empty range.
420        let interval_ms = interval_ms.max(1);
421        let offset = rngs::SmallRng::from_seed(seed).random_range(0..interval_ms);
422        let now_ts: EpochMillis = self.peek_local_write_ts().await.into();
423
424        let previous_collection_ts = (now_ts - (now_ts % interval_ms)) + offset;
425        let next_collection_ts = if previous_collection_ts > now_ts {
426            previous_collection_ts
427        } else {
428            previous_collection_ts + interval_ms
429        };
430        let sleep_for = Duration::from_millis(next_collection_ts - now_ts);
431
432        // Within one cap of the next fire we sleep the remainder and snapshot;
433        // further out we sleep the cap and re-enter so a dyncfg change is
434        // picked up before committing to a long sleep.
435        let (capped_sleep, fire_snapshot) = if sleep_for <= MAX_SLEEP {
436            (sleep_for, true)
437        } else {
438            (MAX_SLEEP, false)
439        };
440
441        let internal_cmd_tx = self.internal_cmd_tx.clone();
442        task::spawn(|| "arrangement_sizes_collection", async move {
443            tokio::time::sleep(capped_sleep).await;
444            let msg = if fire_snapshot {
445                Message::ArrangementSizesSnapshot
446            } else {
447                Message::ArrangementSizesSchedule
448            };
449            // Send is best-effort: if the coordinator is shutting down, drop.
450            let _ = internal_cmd_tx.send(msg);
451        });
452    }
453
454    /// Snapshots the current contents of `mz_object_arrangement_sizes` and
455    /// appends them to `mz_object_arrangement_size_history`, tagged with a
456    /// shared `collection_timestamp`. Reschedules on completion.
457    ///
458    /// Each `(replica_id, object_id)` pair is recorded with a
459    /// `hydration_complete` flag derived from `mz_compute_hydration_times`:
460    /// `true` once the pair's initial hydration on that replica is finished,
461    /// `false` while still building. Consumers that want only stable sizes
462    /// should filter `WHERE hydration_complete`.
463    #[mz_ore::instrument(level = "debug")]
464    async fn arrangement_sizes_snapshot(&mut self) {
465        // The catalog server is not writable in read-only mode.
466        if self.controller.read_only() {
467            self.schedule_arrangement_sizes_collection().await;
468            return;
469        }
470
471        let collection_timer = self
472            .metrics
473            .arrangement_sizes_collection_time_seconds
474            .start_timer();
475
476        let live_item_id = self.catalog().resolve_builtin_storage_collection(
477            &mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZES_UNIFIED,
478        );
479        let live_global_id = self.catalog.get_entry(&live_item_id).latest_global_id();
480        let hydration_item_id = self
481            .catalog()
482            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_COMPUTE_HYDRATION_TIMES);
483        let hydration_global_id = self
484            .catalog
485            .get_entry(&hydration_item_id)
486            .latest_global_id();
487        let history_item_id = self
488            .catalog()
489            .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);
490
491        let read_ts = self.get_local_read_ts().await;
492        let snapshot = match self
493            .controller
494            .storage_collections
495            .snapshot(live_global_id, read_ts)
496            .await
497        {
498            Ok(s) => s,
499            Err(e) => {
500                tracing::warn!("arrangement sizes snapshot failed: {e:?}");
501                drop(collection_timer);
502                self.schedule_arrangement_sizes_collection().await;
503                return;
504            }
505        };
506        let mut hydration_snapshot = match self
507            .controller
508            .storage_collections
509            .snapshot(hydration_global_id, read_ts)
510            .await
511        {
512            Ok(s) => s,
513            Err(e) => {
514                tracing::warn!("arrangement sizes hydration snapshot failed: {e:?}");
515                drop(collection_timer);
516                self.schedule_arrangement_sizes_collection().await;
517                return;
518            }
519        };
520        differential_dataflow::consolidation::consolidate(&mut hydration_snapshot);
521
522        // Build the set of pairs whose initial hydration has finished
523        // (`time_ns IS NOT NULL`). The set drives the `hydration_complete`
524        // flag for each row we emit below.
525        let mut datum_vec = mz_repr::DatumVec::new();
526        let mut hydrated: BTreeSet<(String, String)> = BTreeSet::new();
527        const HYDRATION_COL_REPLICA_ID: usize = 0;
528        const HYDRATION_COL_OBJECT_ID: usize = 1;
529        const HYDRATION_COL_TIME_NS: usize = 2;
530        const HYDRATION_COL_COUNT: usize = 3;
531        for (row, diff) in &hydration_snapshot {
532            if *diff != 1 {
533                continue;
534            }
535            let datums = datum_vec.borrow_with(row);
536            if datums.len() < HYDRATION_COL_COUNT {
537                continue;
538            }
539            if datums[HYDRATION_COL_TIME_NS].is_null() {
540                continue;
541            }
542            hydrated.insert((
543                datums[HYDRATION_COL_REPLICA_ID].unwrap_str().to_string(),
544                datums[HYDRATION_COL_OBJECT_ID].unwrap_str().to_string(),
545            ));
546        }
547
548        // `collection_ts` is stamped after the snapshot so it's always >= the
549        // state the rows describe, and monotone across restarts. The snapshot
550        // read and this stamp aren't atomic, but the resulting skew is bounded
551        // by snapshot latency and negligible at this cadence.
552        let collection_ts: EpochMillis = self.get_local_write_ts().await.timestamp.into();
553        let collection_datum = Datum::TimestampTz(
554            mz_ore::now::to_datetime(collection_ts)
555                .try_into()
556                .expect("collection_timestamp must fit into TimestampTz"),
557        );
558
559        let mut consolidated = snapshot;
560        differential_dataflow::consolidation::consolidate(&mut consolidated);
561
562        // Column positions in `mz_object_arrangement_sizes`.
563        const LIVE_COL_REPLICA_ID: usize = 0;
564        const LIVE_COL_OBJECT_ID: usize = 1;
565        const LIVE_COL_SIZE: usize = 2;
566        const LIVE_COL_COUNT: usize = 3;
567
568        let mut skipped_malformed: u64 = 0;
569        let mut skipped_null_size: u64 = 0;
570        let mut updates: Vec<BuiltinTableUpdate> = Vec::with_capacity(consolidated.len());
571        for (row, diff) in consolidated.iter() {
572            if *diff != 1 {
573                continue;
574            }
575            let datums = datum_vec.borrow_with(row);
576            // Surface schema drift via a warn log below rather than silently
577            // skipping entire snapshots.
578            if datums.len() != LIVE_COL_COUNT {
579                skipped_malformed += 1;
580                continue;
581            }
582            let replica_id = datums[LIVE_COL_REPLICA_ID].unwrap_str();
583            let object_id = datums[LIVE_COL_OBJECT_ID].unwrap_str();
584            let size_datum = datums[LIVE_COL_SIZE];
585            // The history table's `size` is non-null; fabricating zero would
586            // be misleading, so drop.
587            if size_datum.is_null() {
588                skipped_null_size += 1;
589                continue;
590            }
591            let size = size_datum.unwrap_int64();
592            // Pairs whose hydration hasn't completed yet are still recorded,
593            // tagged with `hydration_complete = false`. Consumers that care
594            // only about stable sizes can filter on `hydration_complete`.
595            let hydration_complete =
596                hydrated.contains(&(replica_id.to_string(), object_id.to_string()));
597            let new_row = Row::pack_slice(&[
598                Datum::String(replica_id),
599                Datum::String(object_id),
600                Datum::Int64(size),
601                collection_datum,
602                Datum::from(hydration_complete),
603            ]);
604            updates.push(BuiltinTableUpdate::row(history_item_id, new_row, Diff::ONE));
605        }
606        if skipped_malformed > 0 {
607            warn!(
608                "mz_object_arrangement_sizes schema drift: skipped {skipped_malformed} rows \
609                 with unexpected arity"
610            );
611        }
612        if skipped_null_size > 0 {
613            tracing::debug!("skipped {skipped_null_size} live rows with null size");
614        }
615
616        let row_count = updates.len();
617        // Captures snapshot + row construction. The async table-apply below
618        // is captured separately by `mz_append_table_duration_seconds`.
619        collection_timer.observe_duration();
620
621        if !updates.is_empty() {
622            self.metrics
623                .arrangement_sizes_rows_written
624                .inc_by(u64::cast_from(row_count));
625            // TODO(arrangement-sizes): when the writeable-catalog-server plumbing
626            // in https://github.com/MaterializeInc/materialize/pull/35436 lands,
627            // append directly on `mz_catalog_server` instead of going through
628            // the environmentd builtin-table-update path.
629            let (fut, _) = self.builtin_table_update().execute(updates).await;
630            let internal_cmd_tx = self.internal_cmd_tx.clone();
631            let task_span =
632                info_span!(parent: None, "coord::arrangement_sizes_snapshot::table_updates");
633            OpenTelemetryContext::obtain().attach_as_parent_to(&task_span);
634            task::spawn(|| "arrangement_sizes_snapshot_apply", async move {
635                fut.instrument(task_span).await;
636                if let Err(e) = internal_cmd_tx.send(Message::ArrangementSizesSchedule) {
637                    warn!("internal_cmd_rx dropped before we could send: {e:?}");
638                }
639            });
640        } else {
641            self.schedule_arrangement_sizes_collection().await;
642        }
643
644        tracing::debug!(
645            "appended {row_count} rows to mz_object_arrangement_size_history at ts {collection_ts}"
646        );
647    }
648
649    #[mz_ore::instrument(level = "debug")]
650    async fn arrangement_sizes_prune(&mut self, expired: Vec<BuiltinTableUpdate>) {
651        let (fut, _) = self.builtin_table_update().execute(expired).await;
652        task::spawn(|| "arrangement_sizes_pruning_apply", async move {
653            fut.await;
654        });
655    }
656
657    #[mz_ore::instrument(level = "debug")]
658    async fn message_command(&mut self, cmd: Command) {
659        self.handle_command(cmd).await;
660    }
661
662    #[mz_ore::instrument(level = "debug")]
663    async fn message_controller(&mut self, message: ControllerResponse) {
664        event!(Level::TRACE, message = format!("{:?}", message));
665        match message {
666            ControllerResponse::PeekNotification(uuid, response, otel_ctx) => {
667                self.handle_peek_notification(uuid, response, otel_ctx);
668            }
669            ControllerResponse::SubscribeResponse(sink_id, response) => {
670                if let Some(ActiveComputeSink::Subscribe(active_subscribe)) =
671                    self.active_compute_sinks.get_mut(&sink_id)
672                {
673                    let finished = active_subscribe.process_response(response);
674                    if finished {
675                        self.retire_compute_sinks(btreemap! {
676                            sink_id => ActiveComputeSinkRetireReason::Finished,
677                        })
678                        .await;
679                    }
680
681                    soft_assert_or_log!(
682                        !self.introspection_subscribes.contains_key(&sink_id),
683                        "`sink_id` {sink_id} unexpectedly found in both `active_subscribes` \
684                         and `introspection_subscribes`",
685                    );
686                } else if self.introspection_subscribes.contains_key(&sink_id) {
687                    self.handle_introspection_subscribe_batch(sink_id, response)
688                        .await;
689                } else {
690                    // Cancellation may cause us to receive responses for subscribes no longer
691                    // tracked, so we quietly ignore them.
692                }
693            }
694            ControllerResponse::CopyToResponse(sink_id, response) => {
695                match self.drop_compute_sink(sink_id).await {
696                    Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
697                        active_copy_to.retire_with_response(response);
698                    }
699                    _ => {
700                        // Cancellation may cause us to receive responses for subscribes no longer
701                        // tracked, so we quietly ignore them.
702                    }
703                }
704            }
705            ControllerResponse::WatchSetFinished(ws_ids) => {
706                let now = self.now();
707                for ws_id in ws_ids {
708                    let Some((conn_id, rsp)) = self.installed_watch_sets.remove(&ws_id) else {
709                        continue;
710                    };
711                    self.connection_watch_sets
712                        .get_mut(&conn_id)
713                        .expect("corrupted coordinator state: unknown connection id")
714                        .remove(&ws_id);
715                    if self.connection_watch_sets[&conn_id].is_empty() {
716                        self.connection_watch_sets.remove(&conn_id);
717                    }
718
719                    match rsp {
720                        WatchSetResponse::StatementDependenciesReady(id, ev) => {
721                            self.record_statement_lifecycle_event(&id, &ev, now);
722                        }
723                        WatchSetResponse::AlterSinkReady(ctx) => {
724                            self.sequence_alter_sink_finish(ctx).await;
725                        }
726                        WatchSetResponse::AlterMaterializedViewReady(ctx) => {
727                            self.sequence_alter_materialized_view_apply_replacement_finish(ctx)
728                                .await;
729                        }
730                    }
731                }
732            }
733        }
734    }
735
736    #[mz_ore::instrument(level = "debug")]
737    async fn message_purified_statement_ready(
738        &mut self,
739        PurifiedStatementReady {
740            ctx,
741            result,
742            params,
743            mut plan_validity,
744            original_stmt,
745            otel_ctx,
746        }: PurifiedStatementReady,
747    ) {
748        otel_ctx.attach_as_parent();
749
750        // Ensure that all dependencies still exist after purification, as a
751        // `DROP CONNECTION` or other `DROP` may have sneaked in. If any have gone missing, we
752        // repurify the original statement. This will either produce a nice
753        // "unknown connector" error, or pick up a new connector that has
754        // replaced the dropped connector.
755        //
756        // n.b. an `ALTER CONNECTION` occurring during purification is OK
757        // because we always look up/populate a connection's state after
758        // committing to the catalog, so are guaranteed to see the connection's
759        // most recent version.
760        if plan_validity.check(self.catalog()).is_err() {
761            self.handle_execute_inner(original_stmt, params, ctx).await;
762            return;
763        }
764
765        let purified_statement = match result {
766            Ok(ok) => ok,
767            Err(e) => return ctx.retire(Err(e)),
768        };
769
770        let plan = match purified_statement {
771            PurifiedStatement::PurifiedCreateSource {
772                create_progress_subsource_stmt,
773                create_source_stmt,
774                subsources,
775                available_source_references,
776            } => self
777                .plan_purified_create_source(
778                    &ctx,
779                    params,
780                    create_progress_subsource_stmt,
781                    create_source_stmt,
782                    subsources,
783                    available_source_references,
784                )
785                .await
786                .map(|(plan, resolved_ids)| (plan, resolved_ids, ResolvedIds::empty())),
787            PurifiedStatement::PurifiedAlterSourceAddSubsources {
788                source_name,
789                options,
790                subsources,
791            } => self
792                .plan_purified_alter_source_add_subsource(
793                    ctx.session(),
794                    params,
795                    source_name,
796                    options,
797                    subsources,
798                )
799                .await
800                .map(|(plan, resolved_ids)| (plan, resolved_ids, ResolvedIds::empty())),
801            PurifiedStatement::PurifiedAlterSourceRefreshReferences {
802                source_name,
803                available_source_references,
804            } => self
805                .plan_purified_alter_source_refresh_references(
806                    ctx.session(),
807                    params,
808                    source_name,
809                    available_source_references,
810                )
811                .map(|(plan, resolved_ids)| (plan, resolved_ids, ResolvedIds::empty())),
812            o @ (PurifiedStatement::PurifiedAlterSource { .. }
813            | PurifiedStatement::PurifiedCreateSink(..)
814            | PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
815                // Unify these into a `Statement`.
816                let stmt = match o {
817                    PurifiedStatement::PurifiedAlterSource { alter_source_stmt } => {
818                        Statement::AlterSource(alter_source_stmt)
819                    }
820                    PurifiedStatement::PurifiedCreateTableFromSource { stmt } => {
821                        Statement::CreateTableFromSource(stmt)
822                    }
823                    PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
824                    PurifiedStatement::PurifiedCreateSource { .. }
825                    | PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
826                    | PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
827                        unreachable!("not part of exterior match stmt")
828                    }
829                };
830
831                // Determine all dependencies, not just those in the statement
832                // itself.
833                let catalog = self.catalog().for_session(ctx.session());
834                let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &stmt);
835                self.plan_statement(ctx.session(), stmt, &params, &resolved_ids)
836                    .map(|(plan, sql_impl_ids)| (plan, resolved_ids, sql_impl_ids))
837            }
838        };
839
840        match plan {
841            Ok((plan, resolved_ids, sql_impl_ids)) => {
842                self.sequence_plan(ctx, plan, resolved_ids, sql_impl_ids)
843                    .await
844            }
845            Err(e) => ctx.retire(Err(e)),
846        }
847    }
848
849    #[mz_ore::instrument(level = "debug")]
850    async fn message_create_connection_validation_ready(
851        &mut self,
852        CreateConnectionValidationReady {
853            mut ctx,
854            result,
855            connection_id,
856            connection_gid,
857            mut plan_validity,
858            otel_ctx,
859            resolved_ids,
860        }: CreateConnectionValidationReady,
861    ) {
862        otel_ctx.attach_as_parent();
863
864        // Ensure that all dependencies still exist after validation, as a
865        // `DROP SECRET` may have sneaked in.
866        //
867        // WARNING: If we support `ALTER SECRET`, we'll need to also check
868        // for connectors that were altered while we were purifying.
869        if let Err(e) = plan_validity.check(self.catalog()) {
870            if self.secrets_controller.delete(connection_id).await.is_ok() {
871                self.caching_secrets_reader.invalidate(connection_id);
872            }
873            return ctx.retire(Err(e));
874        }
875
876        let plan = match result {
877            Ok(ok) => ok,
878            Err(e) => {
879                if self.secrets_controller.delete(connection_id).await.is_ok() {
880                    self.caching_secrets_reader.invalidate(connection_id);
881                }
882                return ctx.retire(Err(e));
883            }
884        };
885
886        let result = self
887            .sequence_create_connection_stage_finish(
888                &mut ctx,
889                connection_id,
890                connection_gid,
891                plan,
892                resolved_ids,
893            )
894            .await;
895        ctx.retire(result);
896    }
897
898    #[mz_ore::instrument(level = "debug")]
899    async fn message_alter_connection_validation_ready(
900        &mut self,
901        AlterConnectionValidationReady {
902            mut ctx,
903            result,
904            connection_id,
905            connection_gid: _,
906            mut plan_validity,
907            otel_ctx,
908            resolved_ids: _,
909        }: AlterConnectionValidationReady,
910    ) {
911        otel_ctx.attach_as_parent();
912
913        // Ensure that all dependencies still exist after validation, as a
914        // `DROP SECRET` may have sneaked in.
915        //
916        // WARNING: If we support `ALTER SECRET`, we'll need to also check
917        // for connectors that were altered while we were purifying.
918        if let Err(e) = plan_validity.check(self.catalog()) {
919            return ctx.retire(Err(e));
920        }
921
922        let conn = match result {
923            Ok(ok) => ok,
924            Err(e) => {
925                return ctx.retire(Err(e));
926            }
927        };
928
929        let result = self
930            .sequence_alter_connection_stage_finish(ctx.session_mut(), connection_id, conn)
931            .await;
932        ctx.retire(result);
933    }
934
935    #[mz_ore::instrument(level = "debug")]
936    async fn message_cluster_event(&mut self, event: ClusterEvent) {
937        event!(Level::TRACE, event = format!("{:?}", event));
938
939        if let Some(segment_client) = &self.segment_client {
940            let env_id = &self.catalog().config().environment_id;
941            let mut properties = json!({
942                "cluster_id": event.cluster_id.to_string(),
943                "replica_id": event.replica_id.to_string(),
944                "process_id": event.process_id,
945                "status": event.status.as_kebab_case_str(),
946            });
947            match event.status {
948                ClusterStatus::Online => (),
949                ClusterStatus::Offline(reason) => {
950                    let properties = match &mut properties {
951                        serde_json::Value::Object(map) => map,
952                        _ => unreachable!(),
953                    };
954                    properties.insert(
955                        "reason".into(),
956                        json!(reason.display_or("unknown").to_string()),
957                    );
958                }
959            };
960            segment_client.environment_track(
961                env_id,
962                "Cluster Changed Status",
963                properties,
964                EventDetails {
965                    timestamp: Some(event.time),
966                    ..Default::default()
967                },
968            );
969        }
970
971        // It is possible that we receive a status update for a replica that has
972        // already been dropped from the catalog. Just ignore these events.
973        let Some(replica_statues) = self
974            .cluster_replica_statuses
975            .try_get_cluster_replica_statuses(event.cluster_id, event.replica_id)
976        else {
977            return;
978        };
979
980        if event.status != replica_statues[&event.process_id].status {
981            if !self.controller.read_only() {
982                let offline_reason = match event.status {
983                    ClusterStatus::Online => None,
984                    ClusterStatus::Offline(None) => None,
985                    ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
986                };
987                let row = Row::pack_slice(&[
988                    Datum::String(&event.replica_id.to_string()),
989                    Datum::UInt64(event.process_id),
990                    Datum::String(event.status.as_kebab_case_str()),
991                    Datum::from(offline_reason.as_deref()),
992                    Datum::TimestampTz(event.time.try_into().expect("must fit")),
993                ]);
994                self.controller.storage.append_introspection_updates(
995                    IntrospectionType::ReplicaStatusHistory,
996                    vec![(row, Diff::ONE)],
997                );
998            }
999
1000            let old_replica_status =
1001                ClusterReplicaStatuses::cluster_replica_status(replica_statues);
1002
1003            let new_process_status = ClusterReplicaProcessStatus {
1004                status: event.status,
1005                time: event.time,
1006            };
1007            self.cluster_replica_statuses.ensure_cluster_status(
1008                event.cluster_id,
1009                event.replica_id,
1010                event.process_id,
1011                new_process_status,
1012            );
1013
1014            let cluster = self.catalog().get_cluster(event.cluster_id);
1015            let replica = cluster.replica(event.replica_id).expect("Replica exists");
1016            let new_replica_status = self
1017                .cluster_replica_statuses
1018                .get_cluster_replica_status(event.cluster_id, event.replica_id);
1019
1020            if old_replica_status != new_replica_status {
1021                let notifier = self.broadcast_notice_tx();
1022                let notice = AdapterNotice::ClusterReplicaStatusChanged {
1023                    cluster: cluster.name.clone(),
1024                    replica: replica.name.clone(),
1025                    status: new_replica_status,
1026                    time: event.time,
1027                };
1028                notifier(notice);
1029            }
1030        }
1031    }
1032
1033    #[mz_ore::instrument(level = "debug")]
1034    /// Linearizes sending the results of a read transaction by,
1035    ///   1. Holding back any results that were executed at some point in the future, until the
1036    ///   containing timeline has advanced to that point in the future.
1037    ///   2. Confirming that we are still the current leader before sending results to the client.
1038    async fn message_linearize_reads(&mut self) {
1039        let mut shortest_wait = Duration::MAX;
1040        let mut ready_txns = Vec::new();
1041
1042        // Cache for `TimestampOracle::read_ts` calls. These are somewhat
1043        // expensive so we cache the value. This is correct since all we're
1044        // risking is being too conservative. We will not accidentally "release"
1045        // a result too early.
1046        let mut cached_oracle_ts = BTreeMap::new();
1047
1048        for (conn_id, mut read_txn) in std::mem::take(&mut self.pending_linearize_read_txns) {
1049            if let TimestampContext::TimelineTimestamp {
1050                timeline,
1051                chosen_ts,
1052                oracle_ts,
1053            } = read_txn.timestamp_context()
1054            {
1055                let oracle_ts = match oracle_ts {
1056                    Some(oracle_ts) => oracle_ts,
1057                    None => {
1058                        // There was no oracle timestamp, so no need to delay.
1059                        ready_txns.push(read_txn);
1060                        continue;
1061                    }
1062                };
1063
1064                if chosen_ts <= oracle_ts {
1065                    // Chosen ts was already <= the oracle ts, so we're good
1066                    // to go!
1067                    ready_txns.push(read_txn);
1068                    continue;
1069                }
1070
1071                // See what the oracle timestamp is now and delay when needed.
1072                let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
1073                let current_oracle_ts = match current_oracle_ts {
1074                    btree_map::Entry::Vacant(entry) => {
1075                        let timestamp_oracle = self.get_timestamp_oracle(timeline);
1076                        let read_ts = timestamp_oracle.read_ts().await;
1077                        entry.insert(read_ts.clone());
1078                        read_ts
1079                    }
1080                    btree_map::Entry::Occupied(entry) => entry.get().clone(),
1081                };
1082
1083                if *chosen_ts <= current_oracle_ts {
1084                    ready_txns.push(read_txn);
1085                } else {
1086                    let wait =
1087                        Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
1088                    if wait < shortest_wait {
1089                        shortest_wait = wait;
1090                    }
1091                    read_txn.num_requeues += 1;
1092                    self.pending_linearize_read_txns.insert(conn_id, read_txn);
1093                }
1094            } else {
1095                ready_txns.push(read_txn);
1096            }
1097        }
1098
1099        if !ready_txns.is_empty() {
1100            // Sniff out one ctx, this is where tracing breaks down because we
1101            // process all outstanding txns as a batch here.
1102            let otel_ctx = ready_txns.first().expect("known to exist").otel_ctx.clone();
1103            let span = tracing::debug_span!("message_linearize_reads");
1104            otel_ctx.attach_as_parent_to(&span);
1105
1106            let now = Instant::now();
1107            for ready_txn in ready_txns {
1108                let span = tracing::debug_span!("retire_read_results");
1109                ready_txn.otel_ctx.attach_as_parent_to(&span);
1110                let _entered = span.enter();
1111                self.metrics
1112                    .linearize_message_seconds
1113                    .with_label_values(&[
1114                        ready_txn.txn.label(),
1115                        if ready_txn.num_requeues == 0 {
1116                            "true"
1117                        } else {
1118                            "false"
1119                        },
1120                    ])
1121                    .observe((now - ready_txn.created).as_secs_f64());
1122                if let Some((ctx, result)) = ready_txn.txn.finish() {
1123                    ctx.retire(result);
1124                }
1125            }
1126        }
1127
1128        if !self.pending_linearize_read_txns.is_empty() {
1129            // Cap wait time to 1s.
1130            let remaining_ms = std::cmp::min(shortest_wait, Duration::from_millis(1_000));
1131            let internal_cmd_tx = self.internal_cmd_tx.clone();
1132            task::spawn(|| "deferred_read_txns", async move {
1133                tokio::time::sleep(remaining_ms).await;
1134                // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
1135                let result = internal_cmd_tx.send(Message::LinearizeReads);
1136                if let Err(e) = result {
1137                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1138                }
1139            });
1140        }
1141    }
1142}