Skip to main content

mz_adapter/
coord.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Translation of SQL commands into timestamped `Controller` commands.
11//!
12//! The various SQL commands instruct the system to take actions that are not
13//! yet explicitly timestamped. On the other hand, the underlying data continually
14//! change as time moves forward. On the third hand, we greatly benefit from the
15//! information that some times are no longer of interest, so that we may
16//! compact the representation of the continually changing collections.
17//!
18//! The [`Coordinator`] curates these interactions by observing the progress
19//! collections make through time, choosing timestamps for its own commands,
20//! and eventually communicating that certain times have irretrievably "passed".
21//!
22//! ## Frontiers another way
23//!
24//! If the above description of frontiers left you with questions, this
25//! repackaged explanation might help.
26//!
27//! - `since` is the least recent time (i.e. oldest time) that you can read
28//!   from sources and be guaranteed that the returned data is accurate as of
29//!   that time.
30//!
31//!   Reads at times less than `since` may return values that were not actually
32//!   seen at the specified time, but arrived later (i.e. the results are
33//!   compacted).
34//!
35//!   For correctness' sake, the coordinator never chooses to read at a time
36//!   less than an arrangement's `since`.
37//!
38//! - `upper` is the first time after the most recent time that you can read
39//!   from sources and receive an immediate response. Alternately, it is the
40//!   least time at which the data may still change (that is the reason we may
41//!   not be able to respond immediately).
42//!
43//!   Reads at times >= `upper` may not immediately return because the answer
44//!   isn't known yet. However, once the `upper` is > the specified read time,
45//!   the read can return.
46//!
47//!   For the sake of returned values' freshness, the coordinator prefers
48//!   performing reads at an arrangement's `upper`. However, because we more
49//!   strongly prefer correctness, the coordinator will choose timestamps
50//!   greater than an object's `upper` if it is also being accessed alongside
51//!   objects whose `since` times are >= its `upper`.
52//!
53//! This illustration attempts to show, with time moving left to right, the
54//! relationship between `since` and `upper`.
55//!
56//! - `#`: possibly inaccurate results
57//! - `-`: immediate, correct response
58//! - `?`: not yet known
59//! - `s`: since
60//! - `u`: upper
61//! - `|`: eligible for coordinator to select
62//!
63//! ```nofmt
64//! ####s----u?????
65//!     |||||||||||
66//! ```
67//!
68
69use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::net::IpAddr;
72use std::num::NonZeroI64;
73use std::ops::Neg;
74use std::str::FromStr;
75use std::sync::LazyLock;
76use std::sync::{Arc, Mutex};
77use std::thread;
78use std::time::{Duration, Instant};
79use std::{fmt, mem};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::Itertools;
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::{
95    USER_ID_POOL_BATCH_SIZE, WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL,
96};
97use mz_auth::password::Password;
98use mz_build_info::BuildInfo;
99use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
100use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
101use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
102use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
103use mz_catalog::memory::objects::{
104    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
105    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
106};
107use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
108use mz_compute_client::as_of_selection;
109use mz_compute_client::controller::error::{
110    CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
111};
112use mz_compute_types::ComputeInstanceId;
113use mz_compute_types::dataflows::DataflowDescription;
114use mz_compute_types::plan::Plan;
115use mz_controller::clusters::{
116    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
117};
118use mz_controller::{ControllerConfig, Readiness};
119use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
120use mz_dyncfg::ConfigUpdates;
121use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
122use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
123use mz_orchestrator::OfflineReason;
124use mz_ore::cast::{CastFrom, CastInto, CastLossy};
125use mz_ore::channel::trigger::Trigger;
126use mz_ore::future::TimeoutError;
127use mz_ore::metrics::MetricsRegistry;
128use mz_ore::now::{EpochMillis, NowFn};
129use mz_ore::task::{JoinHandle, spawn};
130use mz_ore::thread::JoinHandleExt;
131use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
132use mz_ore::url::SensitiveUrl;
133use mz_ore::{
134    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
135};
136use mz_persist_client::PersistClient;
137use mz_persist_client::batch::ProtoBatch;
138use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
139use mz_repr::adt::numeric::Numeric;
140use mz_repr::explain::{ExplainConfig, ExplainFormat};
141use mz_repr::global_id::TransientIdGen;
142use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
143use mz_repr::role_id::RoleId;
144use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
145use mz_secrets::cache::CachingSecretsReader;
146use mz_secrets::{SecretsController, SecretsReader};
147use mz_sql::ast::{Raw, Statement};
148use mz_sql::catalog::{CatalogCluster, EnvironmentId};
149use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
150use mz_sql::optimizer_metrics::OptimizerMetrics;
151use mz_sql::plan::{
152    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
153    NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
154};
155use mz_sql::session::user::User;
156use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
157use mz_sql_parser::ast::ExplainStage;
158use mz_sql_parser::ast::display::AstDisplay;
159use mz_storage_client::client::TableData;
160use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
161use mz_storage_types::connections::Connection as StorageConnection;
162use mz_storage_types::connections::ConnectionContext;
163use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
164use mz_storage_types::read_holds::ReadHold;
165use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
166use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
167use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
168use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
169use mz_transform::dataflow::DataflowMetainfo;
170use opentelemetry::trace::TraceContextExt;
171use serde::Serialize;
172use thiserror::Error;
173use timely::progress::{Antichain, Timestamp as _};
174use tokio::runtime::Handle as TokioHandle;
175use tokio::select;
176use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
177use tokio::time::{Interval, MissedTickBehavior};
178use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
179use tracing_opentelemetry::OpenTelemetrySpanExt;
180use uuid::Uuid;
181
182use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
183use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
184use crate::client::{Client, Handle};
185use crate::command::{Command, ExecuteResponse};
186use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
187use crate::coord::appends::{
188    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
189};
190use crate::coord::caught_up::CaughtUpCheckContext;
191use crate::coord::cluster_scheduling::SchedulingDecision;
192use crate::coord::id_bundle::CollectionIdBundle;
193use crate::coord::introspection::IntrospectionSubscribe;
194use crate::coord::peek::PendingPeek;
195use crate::coord::statement_logging::StatementLogging;
196use crate::coord::timeline::{TimelineContext, TimelineState};
197use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
198use crate::coord::validity::PlanValidity;
199use crate::error::AdapterError;
200use crate::explain::insights::PlanInsightsContext;
201use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
202use crate::metrics::Metrics;
203use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
204use crate::optimize::{self, Optimize, OptimizerConfig};
205use crate::session::{EndTransactionAction, Session};
206use crate::statement_logging::{
207    StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
208};
209use crate::util::{ClientTransmitter, ResultExt, sort_topological};
210use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
211use crate::{AdapterNotice, ReadHolds, flags};
212
213pub(crate) mod appends;
214pub(crate) mod catalog_serving;
215pub(crate) mod cluster_scheduling;
216pub(crate) mod consistency;
217pub(crate) mod id_bundle;
218pub(crate) mod in_memory_oracle;
219pub(crate) mod peek;
220pub(crate) mod read_policy;
221pub(crate) mod read_then_write;
222pub(crate) mod sequencer;
223pub(crate) mod statement_logging;
224pub(crate) mod timeline;
225pub(crate) mod timestamp_selection;
226
227pub mod catalog_implications;
228mod caught_up;
229mod command_handler;
230mod ddl;
231pub(crate) mod group_sync;
232mod indexes;
233mod info_metrics;
234mod introspection;
235mod message_handler;
236mod privatelink_status;
237mod sql;
238mod validity;
239
240/// A pool of pre-allocated user IDs to avoid per-DDL persist writes.
241///
242/// IDs in the range `[next, upper)` are available for allocation.
243/// When exhausted, the pool must be refilled via the catalog.
244///
245/// # Correctness
246///
247/// The pool is owned by [`Coordinator`], which processes all requests
248/// on a single-threaded event loop. Because every access requires
249/// `&mut self` on the coordinator, there is no concurrent access to the
250/// pool — no additional synchronization is needed.
251///
252/// Global ID uniqueness is guaranteed because each refill calls
253/// [`Catalog::allocate_user_ids`], which performs a durable persist
254/// write that atomically reserves the entire batch before any IDs from
255/// it are handed out. If the process crashes after a refill but before
256/// all pre-allocated IDs are consumed, the unused IDs form harmless
257/// gaps in the sequence — user IDs are not required to be contiguous.
258///
259/// This guarantee holds even if multiple `environmentd` processes run
260/// concurrently. Each process has its own independent pool,
261/// but every refill goes through the shared persist-backed catalog,
262/// which serializes allocations across all callers. Two processes
263/// will therefore never receive overlapping ID ranges,
264/// for the same reason they could not before this pool existed.
#[derive(Debug)]
pub(crate) struct IdPool {
    /// The next ID that will be handed out. Kept `<= upper` by every method.
    next: u64,
    /// Exclusive upper bound of the available range `[next, upper)`.
    upper: u64,
}

impl IdPool {
    /// Creates an empty pool.
    ///
    /// Every allocation of one or more IDs from an empty pool returns `None`
    /// until [`IdPool::refill`] is called.
    pub fn empty() -> Self {
        IdPool { next: 0, upper: 0 }
    }

    /// Allocates a single ID from the pool, returning `None` if exhausted.
    pub fn allocate(&mut self) -> Option<u64> {
        if self.next < self.upper {
            let id = self.next;
            self.next += 1;
            Some(id)
        } else {
            None
        }
    }

    /// Allocates `n` consecutive IDs from the pool, returning `None` if
    /// insufficient IDs remain.
    ///
    /// The pool is left untouched when `None` is returned. Requesting zero
    /// IDs always succeeds with an empty vector.
    pub fn allocate_many(&mut self, n: u64) -> Option<Vec<u64>> {
        if self.remaining() >= n {
            // `remaining() >= n` implies `next + n <= upper`, so the addition
            // below cannot overflow.
            let ids = (self.next..self.next + n).collect();
            self.next += n;
            Some(ids)
        } else {
            None
        }
    }

    /// Returns the number of IDs remaining in the pool.
    pub fn remaining(&self) -> u64 {
        // Cannot underflow: `next <= upper` holds after `empty`, `refill`,
        // and every allocation.
        self.upper - self.next
    }

    /// Refills the pool with the given range `[next, upper)`.
    ///
    /// Any IDs still left in the pool are discarded.
    ///
    /// # Panics
    ///
    /// Panics if `next > upper`.
    pub fn refill(&mut self, next: u64, upper: u64) {
        assert!(next <= upper, "invalid pool range: {next}..{upper}");
        self.next = next;
        self.upper = upper;
    }
}
312
313#[derive(Debug)]
314pub enum Message {
315    Command(OpenTelemetryContext, Command),
316    ControllerReady {
317        controller: ControllerReadiness,
318    },
319    PurifiedStatementReady(PurifiedStatementReady),
320    CreateConnectionValidationReady(CreateConnectionValidationReady),
321    AlterConnectionValidationReady(AlterConnectionValidationReady),
322    TryDeferred {
323        /// The connection that created this op.
324        conn_id: ConnectionId,
325        /// The write lock that notified us our deferred op might be able to run.
326        ///
327        /// Note: While we never want to hold a partial set of locks, it can be important to hold
328        /// onto the _one_ that notified us our op might be ready. If there are multiple operations
329        /// waiting on a single collection, and we don't hold this lock through retyring the op,
330        /// then everything waiting on this collection will get retried causing traffic in the
331        /// Coordinator's message queue.
332        ///
333        /// See [`DeferredOp::can_be_optimistically_retried`] for more detail.
334        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
335    },
336    /// Initiates a group commit.
337    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
338    DeferredStatementReady,
339    AdvanceTimelines,
340    ClusterEvent(ClusterEvent),
341    CancelPendingPeeks {
342        conn_id: ConnectionId,
343    },
344    LinearizeReads,
345    StagedBatches {
346        conn_id: ConnectionId,
347        table_id: CatalogItemId,
348        batches: Vec<Result<ProtoBatch, String>>,
349    },
350    StorageUsageSchedule,
351    StorageUsageFetch,
352    StorageUsageUpdate(ShardsUsageReferenced),
353    StorageUsagePrune(Vec<BuiltinTableUpdate>),
354    ArrangementSizesSchedule,
355    ArrangementSizesSnapshot,
356    ArrangementSizesPrune(Vec<BuiltinTableUpdate>),
357    /// Performs any cleanup and logging actions necessary for
358    /// finalizing a statement execution.
359    RetireExecute {
360        data: ExecuteContextExtra,
361        otel_ctx: OpenTelemetryContext,
362        reason: StatementEndedExecutionReason,
363    },
364    ExecuteSingleStatementTransaction {
365        ctx: ExecuteContext,
366        otel_ctx: OpenTelemetryContext,
367        stmt: Arc<Statement<Raw>>,
368        params: mz_sql::plan::Params,
369    },
370    PeekStageReady {
371        ctx: ExecuteContext,
372        span: Span,
373        stage: PeekStage,
374    },
375    CreateIndexStageReady {
376        ctx: ExecuteContext,
377        span: Span,
378        stage: CreateIndexStage,
379    },
380    CreateViewStageReady {
381        ctx: ExecuteContext,
382        span: Span,
383        stage: CreateViewStage,
384    },
385    CreateMaterializedViewStageReady {
386        ctx: ExecuteContext,
387        span: Span,
388        stage: CreateMaterializedViewStage,
389    },
390    SubscribeStageReady {
391        ctx: ExecuteContext,
392        span: Span,
393        stage: SubscribeStage,
394    },
395    IntrospectionSubscribeStageReady {
396        span: Span,
397        stage: IntrospectionSubscribeStage,
398    },
399    SecretStageReady {
400        ctx: ExecuteContext,
401        span: Span,
402        stage: SecretStage,
403    },
404    ClusterStageReady {
405        ctx: ExecuteContext,
406        span: Span,
407        stage: ClusterStage,
408    },
409    ExplainTimestampStageReady {
410        ctx: ExecuteContext,
411        span: Span,
412        stage: ExplainTimestampStage,
413    },
414    DrainStatementLog,
415    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
416    CheckSchedulingPolicies,
417
418    /// Scheduling policy decisions about turning clusters On/Off.
419    /// `Vec<(policy name, Vec of decisions by the policy)>`
420    /// A cluster will be On if and only if there is at least one On decision for it.
421    /// Scheduling decisions for clusters that have `SCHEDULE = MANUAL` are ignored.
422    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
423}
424
425impl Message {
426    /// Returns a string to identify the kind of [`Message`], useful for logging.
427    pub const fn kind(&self) -> &'static str {
428        match self {
429            Message::Command(_, msg) => match msg {
430                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
431                Command::Startup { .. } => "command-startup",
432                Command::Execute { .. } => "command-execute",
433                Command::Commit { .. } => "command-commit",
434                Command::CancelRequest { .. } => "command-cancel_request",
435                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
436                Command::GetWebhook { .. } => "command-get_webhook",
437                Command::GetSystemVars { .. } => "command-get_system_vars",
438                Command::SetSystemVars { .. } => "command-set_system_vars",
439                Command::Terminate { .. } => "command-terminate",
440                Command::RetireExecute { .. } => "command-retire_execute",
441                Command::CheckConsistency { .. } => "command-check_consistency",
442                Command::Dump { .. } => "command-dump",
443                Command::AuthenticatePassword { .. } => "command-auth_check",
444                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
445                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
446                Command::CheckRoleCanLogin { .. } => "command-check_role_can_login",
447                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
448                Command::GetOracle { .. } => "get-oracle",
449                Command::DetermineRealTimeRecentTimestamp { .. } => {
450                    "determine-real-time-recent-timestamp"
451                }
452                Command::GetTransactionReadHoldsBundle { .. } => {
453                    "get-transaction-read-holds-bundle"
454                }
455                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
456                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
457                Command::ExecuteSubscribe { .. } => "execute-subscribe",
458                Command::CopyToPreflight { .. } => "copy-to-preflight",
459                Command::ExecuteCopyTo { .. } => "execute-copy-to",
460                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
461                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
462                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
463                Command::ExplainTimestamp { .. } => "explain-timestamp",
464                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
465                Command::StartCopyFromStdin { .. } => "start-copy-from-stdin",
466                Command::InjectAuditEvents { .. } => "inject-audit-events",
467            },
468            Message::ControllerReady {
469                controller: ControllerReadiness::Compute,
470            } => "controller_ready(compute)",
471            Message::ControllerReady {
472                controller: ControllerReadiness::Storage,
473            } => "controller_ready(storage)",
474            Message::ControllerReady {
475                controller: ControllerReadiness::Metrics,
476            } => "controller_ready(metrics)",
477            Message::ControllerReady {
478                controller: ControllerReadiness::Internal,
479            } => "controller_ready(internal)",
480            Message::PurifiedStatementReady(_) => "purified_statement_ready",
481            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
482            Message::TryDeferred { .. } => "try_deferred",
483            Message::GroupCommitInitiate(..) => "group_commit_initiate",
484            Message::AdvanceTimelines => "advance_timelines",
485            Message::ClusterEvent(_) => "cluster_event",
486            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
487            Message::LinearizeReads => "linearize_reads",
488            Message::StagedBatches { .. } => "staged_batches",
489            Message::StorageUsageSchedule => "storage_usage_schedule",
490            Message::StorageUsageFetch => "storage_usage_fetch",
491            Message::StorageUsageUpdate(_) => "storage_usage_update",
492            Message::StorageUsagePrune(_) => "storage_usage_prune",
493            Message::ArrangementSizesSchedule => "arrangement_sizes_schedule",
494            Message::ArrangementSizesSnapshot => "arrangement_sizes_snapshot",
495            Message::ArrangementSizesPrune(_) => "arrangement_sizes_prune",
496            Message::RetireExecute { .. } => "retire_execute",
497            Message::ExecuteSingleStatementTransaction { .. } => {
498                "execute_single_statement_transaction"
499            }
500            Message::PeekStageReady { .. } => "peek_stage_ready",
501            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
502            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
503            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
504            Message::CreateMaterializedViewStageReady { .. } => {
505                "create_materialized_view_stage_ready"
506            }
507            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
508            Message::IntrospectionSubscribeStageReady { .. } => {
509                "introspection_subscribe_stage_ready"
510            }
511            Message::SecretStageReady { .. } => "secret_stage_ready",
512            Message::ClusterStageReady { .. } => "cluster_stage_ready",
513            Message::DrainStatementLog => "drain_statement_log",
514            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
515            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
516            Message::CheckSchedulingPolicies => "check_scheduling_policies",
517            Message::SchedulingDecisions { .. } => "scheduling_decision",
518            Message::DeferredStatementReady => "deferred_statement_ready",
519        }
520    }
521}
522
/// The reason for why a controller needs processing on the main loop.
///
/// Carried by `Message::ControllerReady` in its `controller` field.
#[derive(Debug)]
pub enum ControllerReadiness {
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A batch of metric data is ready.
    Metrics,
    /// An internally-generated message is ready to be returned.
    Internal,
}
535
536#[derive(Derivative)]
537#[derivative(Debug)]
538pub struct BackgroundWorkResult<T> {
539    #[derivative(Debug = "ignore")]
540    pub ctx: ExecuteContext,
541    pub result: Result<T, AdapterError>,
542    pub params: Params,
543    pub plan_validity: PlanValidity,
544    pub original_stmt: Arc<Statement<Raw>>,
545    pub otel_ctx: OpenTelemetryContext,
546}
547
548pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
549
550#[derive(Derivative)]
551#[derivative(Debug)]
552pub struct ValidationReady<T> {
553    #[derivative(Debug = "ignore")]
554    pub ctx: ExecuteContext,
555    pub result: Result<T, AdapterError>,
556    pub resolved_ids: ResolvedIds,
557    pub connection_id: CatalogItemId,
558    pub connection_gid: GlobalId,
559    pub plan_validity: PlanValidity,
560    pub otel_ctx: OpenTelemetryContext,
561}
562
563pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
564pub type AlterConnectionValidationReady = ValidationReady<Connection>;
565
566#[derive(Debug)]
567pub enum PeekStage {
568    /// Common stages across SELECT, EXPLAIN and COPY TO queries.
569    LinearizeTimestamp(PeekStageLinearizeTimestamp),
570    RealTimeRecency(PeekStageRealTimeRecency),
571    TimestampReadHold(PeekStageTimestampReadHold),
572    Optimize(PeekStageOptimize),
573    /// Final stage for a peek.
574    Finish(PeekStageFinish),
575    /// Final stage for an explain.
576    ExplainPlan(PeekStageExplainPlan),
577    ExplainPushdown(PeekStageExplainPushdown),
578    /// Preflight checks for a copy to operation.
579    CopyToPreflight(PeekStageCopyTo),
580    /// Final stage for a copy to which involves shipping the dataflow.
581    CopyToDataflow(PeekStageCopyTo),
582}
583
584#[derive(Debug)]
585pub struct CopyToContext {
586    /// The `RelationDesc` of the data to be copied.
587    pub desc: RelationDesc,
588    /// The destination uri of the external service where the data will be copied.
589    pub uri: Uri,
590    /// Connection information required to connect to the external service to copy the data.
591    pub connection: StorageConnection<ReferencedConnection>,
592    /// The ID of the CONNECTION object to be used for copying the data.
593    pub connection_id: CatalogItemId,
594    /// Format params to format the data.
595    pub format: S3SinkFormat,
596    /// Approximate max file size of each uploaded file.
597    pub max_file_size: u64,
598    /// Number of batches the output of the COPY TO will be partitioned into
599    /// to distribute the load across workers deterministically.
600    /// This is only an option since it's not set when CopyToContext is instantiated
601    /// but immediately after in the PeekStageValidate stage.
602    pub output_batch_count: Option<u64>,
603}
604
605#[derive(Debug)]
606pub struct PeekStageLinearizeTimestamp {
607    validity: PlanValidity,
608    plan: mz_sql::plan::SelectPlan,
609    max_query_result_size: Option<u64>,
610    source_ids: BTreeSet<GlobalId>,
611    target_replica: Option<ReplicaId>,
612    timeline_context: TimelineContext,
613    optimizer: optimize::PeekOptimizer,
614    /// An optional context set iff the state machine is initiated from
615    /// sequencing an EXPLAIN for this statement.
616    explain_ctx: ExplainContext,
617}
618
619#[derive(Debug)]
620pub struct PeekStageRealTimeRecency {
621    validity: PlanValidity,
622    plan: mz_sql::plan::SelectPlan,
623    max_query_result_size: Option<u64>,
624    source_ids: BTreeSet<GlobalId>,
625    target_replica: Option<ReplicaId>,
626    timeline_context: TimelineContext,
627    oracle_read_ts: Option<Timestamp>,
628    optimizer: optimize::PeekOptimizer,
629    /// An optional context set iff the state machine is initiated from
630    /// sequencing an EXPLAIN for this statement.
631    explain_ctx: ExplainContext,
632}
633
634#[derive(Debug)]
635pub struct PeekStageTimestampReadHold {
636    validity: PlanValidity,
637    plan: mz_sql::plan::SelectPlan,
638    max_query_result_size: Option<u64>,
639    source_ids: BTreeSet<GlobalId>,
640    target_replica: Option<ReplicaId>,
641    timeline_context: TimelineContext,
642    oracle_read_ts: Option<Timestamp>,
643    real_time_recency_ts: Option<mz_repr::Timestamp>,
644    optimizer: optimize::PeekOptimizer,
645    /// An optional context set iff the state machine is initiated from
646    /// sequencing an EXPLAIN for this statement.
647    explain_ctx: ExplainContext,
648}
649
650#[derive(Debug)]
651pub struct PeekStageOptimize {
652    validity: PlanValidity,
653    plan: mz_sql::plan::SelectPlan,
654    max_query_result_size: Option<u64>,
655    source_ids: BTreeSet<GlobalId>,
656    id_bundle: CollectionIdBundle,
657    target_replica: Option<ReplicaId>,
658    determination: TimestampDetermination,
659    optimizer: optimize::PeekOptimizer,
660    /// An optional context set iff the state machine is initiated from
661    /// sequencing an EXPLAIN for this statement.
662    explain_ctx: ExplainContext,
663}
664
665#[derive(Debug)]
666pub struct PeekStageFinish {
667    validity: PlanValidity,
668    plan: mz_sql::plan::SelectPlan,
669    max_query_result_size: Option<u64>,
670    id_bundle: CollectionIdBundle,
671    target_replica: Option<ReplicaId>,
672    source_ids: BTreeSet<GlobalId>,
673    determination: TimestampDetermination,
674    cluster_id: ComputeInstanceId,
675    finishing: RowSetFinishing,
676    /// When present, an optimizer trace to be used for emitting a plan insights
677    /// notice.
678    plan_insights_optimizer_trace: Option<OptimizerTrace>,
679    insights_ctx: Option<Box<PlanInsightsContext>>,
680    global_lir_plan: optimize::peek::GlobalLirPlan,
681    optimization_finished_at: EpochMillis,
682}
683
684#[derive(Debug)]
685pub struct PeekStageCopyTo {
686    validity: PlanValidity,
687    optimizer: optimize::copy_to::Optimizer,
688    global_lir_plan: optimize::copy_to::GlobalLirPlan,
689    optimization_finished_at: EpochMillis,
690    source_ids: BTreeSet<GlobalId>,
691}
692
693#[derive(Debug)]
694pub struct PeekStageExplainPlan {
695    validity: PlanValidity,
696    optimizer: optimize::peek::Optimizer,
697    df_meta: DataflowMetainfo,
698    explain_ctx: ExplainPlanContext,
699    insights_ctx: Option<Box<PlanInsightsContext>>,
700}
701
702#[derive(Debug)]
703pub struct PeekStageExplainPushdown {
704    validity: PlanValidity,
705    determination: TimestampDetermination,
706    imports: BTreeMap<GlobalId, MapFilterProject>,
707}
708
709#[derive(Debug)]
710pub enum CreateIndexStage {
711    Optimize(CreateIndexOptimize),
712    Finish(CreateIndexFinish),
713    Explain(CreateIndexExplain),
714}
715
716#[derive(Debug)]
717pub struct CreateIndexOptimize {
718    validity: PlanValidity,
719    plan: plan::CreateIndexPlan,
720    resolved_ids: ResolvedIds,
721    /// An optional context set iff the state machine is initiated from
722    /// sequencing an EXPLAIN for this statement.
723    explain_ctx: ExplainContext,
724}
725
726#[derive(Debug)]
727pub struct CreateIndexFinish {
728    validity: PlanValidity,
729    item_id: CatalogItemId,
730    global_id: GlobalId,
731    plan: plan::CreateIndexPlan,
732    resolved_ids: ResolvedIds,
733    global_mir_plan: optimize::index::GlobalMirPlan,
734    global_lir_plan: optimize::index::GlobalLirPlan,
735    optimizer_features: OptimizerFeatures,
736}
737
/// State carried by [`CreateIndexStage::Explain`].
#[derive(Debug)]
pub struct CreateIndexExplain {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// Global ID of the index being explained.
    exported_index_id: GlobalId,
    /// The plan being explained.
    plan: plan::CreateIndexPlan,
    /// Dataflow metainfo; presumably collected during optimization — TODO confirm.
    df_meta: DataflowMetainfo,
    /// The parameters of the `EXPLAIN` request.
    explain_ctx: ExplainPlanContext,
}
746
/// The stages of sequencing a [`plan::CreateViewPlan`]; each variant wraps
/// the state that stage needs.
#[derive(Debug)]
pub enum CreateViewStage {
    /// State in [`CreateViewOptimize`].
    Optimize(CreateViewOptimize),
    /// State in [`CreateViewFinish`].
    Finish(CreateViewFinish),
    /// State in [`CreateViewExplain`].
    Explain(CreateViewExplain),
}
753
/// State carried by [`CreateViewStage::Optimize`].
#[derive(Debug)]
pub struct CreateViewOptimize {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The plan being sequenced.
    plan: plan::CreateViewPlan,
    /// Presumably the IDs resolved during name resolution — TODO confirm.
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
763
/// State carried by [`CreateViewStage::Finish`].
#[derive(Debug)]
pub struct CreateViewFinish {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// ID of this item in the Catalog.
    item_id: CatalogItemId,
    /// ID by which Compute will reference this View.
    global_id: GlobalId,
    /// The plan being sequenced.
    plan: plan::CreateViewPlan,
    /// IDs of objects resolved during name resolution.
    resolved_ids: ResolvedIds,
    /// The optimized MIR expression; presumably the view's definition after
    /// optimization — TODO confirm.
    optimized_expr: OptimizedMirRelationExpr,
}
776
/// State carried by [`CreateViewStage::Explain`].
#[derive(Debug)]
pub struct CreateViewExplain {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// Global ID; presumably that of the view being explained — TODO confirm.
    id: GlobalId,
    /// The plan being explained.
    plan: plan::CreateViewPlan,
    /// The parameters of the `EXPLAIN` request.
    explain_ctx: ExplainPlanContext,
}
784
/// The stages of sequencing a [`plan::ExplainTimestampPlan`]; each variant
/// wraps the state that stage needs.
#[derive(Debug)]
pub enum ExplainTimestampStage {
    /// State in [`ExplainTimestampOptimize`].
    Optimize(ExplainTimestampOptimize),
    /// State in [`ExplainTimestampRealTimeRecency`].
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    /// State in [`ExplainTimestampFinish`].
    Finish(ExplainTimestampFinish),
}
791
/// State carried by [`ExplainTimestampStage::Optimize`].
#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The plan being sequenced.
    plan: plan::ExplainTimestampPlan,
    /// Presumably the cluster the explained query targets — TODO confirm.
    cluster_id: ClusterId,
}
798
/// State carried by [`ExplainTimestampStage::RealTimeRecency`].
#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The requested output format.
    format: ExplainFormat,
    /// The optimized MIR expression of the explained query.
    optimized_plan: OptimizedMirRelationExpr,
    /// Presumably the cluster the explained query targets — TODO confirm.
    cluster_id: ClusterId,
    /// The query's `QueryWhen` setting.
    when: QueryWhen,
}
807
/// State carried by [`ExplainTimestampStage::Finish`].
#[derive(Debug)]
pub struct ExplainTimestampFinish {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The requested output format.
    format: ExplainFormat,
    /// The optimized MIR expression of the explained query.
    optimized_plan: OptimizedMirRelationExpr,
    /// Presumably the cluster the explained query targets — TODO confirm.
    cluster_id: ClusterId,
    /// NOTE(review): presumably the IDs of the collections the query reads
    /// from — confirm.
    source_ids: BTreeSet<GlobalId>,
    /// The query's `QueryWhen` setting.
    when: QueryWhen,
    /// An optional real-time-recency timestamp; presumably produced by the
    /// `RealTimeRecency` stage — TODO confirm.
    real_time_recency_ts: Option<Timestamp>,
}
818
/// The stages of sequencing a [`plan::AlterClusterPlan`]; each variant wraps
/// the state that stage needs.
#[derive(Debug)]
pub enum ClusterStage {
    /// State in [`AlterCluster`].
    Alter(AlterCluster),
    /// State in [`AlterClusterWaitForHydrated`].
    WaitForHydrated(AlterClusterWaitForHydrated),
    /// State in [`AlterClusterFinalize`].
    Finalize(AlterClusterFinalize),
}
825
/// State carried by [`ClusterStage::Alter`].
#[derive(Debug)]
pub struct AlterCluster {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The plan being sequenced.
    plan: plan::AlterClusterPlan,
}
831
/// State carried by [`ClusterStage::WaitForHydrated`].
#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The plan being sequenced.
    plan: plan::AlterClusterPlan,
    /// The managed-cluster configuration; presumably the one being altered
    /// to — TODO confirm.
    new_config: ClusterVariantManaged,
    /// Optional workload class for the cluster.
    workload_class: Option<String>,
    /// NOTE(review): presumably the deadline after which waiting gives up —
    /// confirm against the stage implementation.
    timeout_time: Instant,
    /// NOTE(review): presumably the action taken once `timeout_time` passes —
    /// confirm.
    on_timeout: OnTimeoutAction,
}
841
/// State carried by [`ClusterStage::Finalize`].
#[derive(Debug)]
pub struct AlterClusterFinalize {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The plan being sequenced.
    plan: plan::AlterClusterPlan,
    /// The managed-cluster configuration; presumably the one being altered
    /// to — TODO confirm.
    new_config: ClusterVariantManaged,
    /// Optional workload class for the cluster.
    workload_class: Option<String>,
}
849
/// Describes whether, and in which `EXPLAIN` flavor, a statement is being
/// sequenced.
#[derive(Debug)]
pub enum ExplainContext {
    /// The ordinary, non-explain variant of the statement.
    None,
    /// The `EXPLAIN <level> PLAN FOR <explainee>` version of the statement.
    Plan(ExplainPlanContext),
    /// Generate a notice containing the `EXPLAIN PLAN INSIGHTS` output
    /// alongside the query's normal output.
    PlanInsightsNotice(OptimizerTrace),
    /// `EXPLAIN FILTER PUSHDOWN`
    Pushdown,
}
862
863impl ExplainContext {
864    /// If available for this context, wrap the [`OptimizerTrace`] into a
865    /// [`tracing::Dispatch`] and set it as default, returning the resulting
866    /// guard in a `Some(guard)` option.
867    pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
868        let optimizer_trace = match self {
869            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
870            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
871            _ => None,
872        };
873        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
874    }
875
876    pub(crate) fn needs_cluster(&self) -> bool {
877        match self {
878            ExplainContext::None => true,
879            ExplainContext::Plan(..) => false,
880            ExplainContext::PlanInsightsNotice(..) => true,
881            ExplainContext::Pushdown => false,
882        }
883    }
884
885    pub(crate) fn needs_plan_insights(&self) -> bool {
886        matches!(
887            self,
888            ExplainContext::Plan(ExplainPlanContext {
889                stage: ExplainStage::PlanInsights,
890                ..
891            }) | ExplainContext::PlanInsightsNotice(_)
892        )
893    }
894}
895
/// The parameters of an `EXPLAIN ... PLAN` request, carried by
/// [`ExplainContext::Plan`].
#[derive(Debug)]
pub struct ExplainPlanContext {
    /// EXPLAIN BROKEN is internal syntax for showing EXPLAIN output despite an internal error in
    /// the optimizer: we don't immediately bail out from peek sequencing when an internal optimizer
    /// error happens, but go on with trying to show the requested EXPLAIN stage. This can still
    /// succeed if the requested EXPLAIN stage is before the point where the error happened.
    pub broken: bool,
    /// The `EXPLAIN` configuration.
    pub config: ExplainConfig,
    /// The requested output format.
    pub format: ExplainFormat,
    /// The requested explain stage.
    pub stage: ExplainStage,
    /// NOTE(review): presumably the ID of an existing object whose plan is
    /// being re-derived, if any — confirm.
    pub replan: Option<GlobalId>,
    /// An optional relation description; presumably that of the explained
    /// statement's result — TODO confirm.
    pub desc: Option<RelationDesc>,
    /// The optimizer trace; this is what [`ExplainContext::dispatch_guard`]
    /// turns into a guard for the `Plan` variant.
    pub optimizer_trace: OptimizerTrace,
}
910
/// The stages of sequencing a [`plan::CreateMaterializedViewPlan`]; each
/// variant wraps the state that stage needs.
#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    /// State in [`CreateMaterializedViewOptimize`].
    Optimize(CreateMaterializedViewOptimize),
    /// State in [`CreateMaterializedViewFinish`].
    Finish(CreateMaterializedViewFinish),
    /// State in [`CreateMaterializedViewExplain`].
    Explain(CreateMaterializedViewExplain),
}
917
/// State carried by [`CreateMaterializedViewStage::Optimize`].
#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The plan being sequenced.
    plan: plan::CreateMaterializedViewPlan,
    /// Presumably the IDs resolved during name resolution — TODO confirm.
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
927
/// State carried by [`CreateMaterializedViewStage::Finish`].
#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    /// The ID of this Materialized View in the Catalog.
    item_id: CatalogItemId,
    /// The ID of the durable pTVC backing this Materialized View.
    global_id: GlobalId,
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The plan being sequenced.
    plan: plan::CreateMaterializedViewPlan,
    /// Presumably the IDs resolved during name resolution — TODO confirm.
    resolved_ids: ResolvedIds,
    /// The local MIR-level plan of the materialized view optimizer pipeline.
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    /// The global MIR-level plan of the materialized view optimizer pipeline.
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    /// The LIR-level plan of the materialized view optimizer pipeline.
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
    /// Optimizer feature flags; presumably those used to produce the plans
    /// above — TODO confirm.
    optimizer_features: OptimizerFeatures,
}
942
/// State carried by [`CreateMaterializedViewStage::Explain`].
#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    /// Global ID; presumably that of the materialized view being explained —
    /// TODO confirm.
    global_id: GlobalId,
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The plan being explained.
    plan: plan::CreateMaterializedViewPlan,
    /// Dataflow metainfo; presumably collected during optimization — TODO confirm.
    df_meta: DataflowMetainfo,
    /// The parameters of the `EXPLAIN` request.
    explain_ctx: ExplainPlanContext,
}
951
/// The stages of sequencing a [`plan::SubscribePlan`]; each variant wraps the
/// state that stage needs.
#[derive(Debug)]
pub enum SubscribeStage {
    /// State in [`SubscribeOptimizeMir`].
    OptimizeMir(SubscribeOptimizeMir),
    /// State in [`SubscribeTimestampOptimizeLir`].
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    /// State in [`SubscribeFinish`].
    Finish(SubscribeFinish),
    /// State in [`SubscribeExplain`].
    Explain(SubscribeExplain),
}
959
/// State carried by [`SubscribeStage::OptimizeMir`].
#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The plan being sequenced.
    plan: plan::SubscribePlan,
    /// The timeline context of the subscribe.
    timeline: TimelineContext,
    /// NOTE(review): presumably the IDs of the collections the subscribe
    /// depends on — confirm.
    dependency_ids: BTreeSet<GlobalId>,
    /// The compute instance (cluster) involved; presumably where the
    /// subscribe runs — TODO confirm.
    cluster_id: ComputeInstanceId,
    /// An optional replica; presumably a specific target replica — TODO confirm.
    replica_id: Option<ReplicaId>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
972
/// State carried by [`SubscribeStage::TimestampOptimizeLir`].
#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The plan being sequenced.
    plan: plan::SubscribePlan,
    /// The timeline context of the subscribe.
    timeline: TimelineContext,
    /// The subscribe optimizer instance.
    optimizer: optimize::subscribe::Optimizer,
    /// The MIR-level plan, still in its `Unresolved` state.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    /// NOTE(review): presumably the IDs of the collections the subscribe
    /// depends on — confirm.
    dependency_ids: BTreeSet<GlobalId>,
    /// An optional replica; presumably a specific target replica — TODO confirm.
    replica_id: Option<ReplicaId>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
986
/// State carried by [`SubscribeStage::Finish`].
#[derive(Debug)]
pub struct SubscribeFinish {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The compute instance (cluster) involved; presumably where the
    /// subscribe runs — TODO confirm.
    cluster_id: ComputeInstanceId,
    /// An optional replica; presumably a specific target replica — TODO confirm.
    replica_id: Option<ReplicaId>,
    /// The plan being sequenced.
    plan: plan::SubscribePlan,
    /// The LIR-level plan of the subscribe optimizer pipeline.
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    /// NOTE(review): presumably the IDs of the collections the subscribe
    /// depends on — confirm.
    dependency_ids: BTreeSet<GlobalId>,
}
996
/// State carried by [`SubscribeStage::Explain`].
#[derive(Debug)]
pub struct SubscribeExplain {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The subscribe optimizer instance.
    optimizer: optimize::subscribe::Optimizer,
    /// Dataflow metainfo; presumably collected during optimization — TODO confirm.
    df_meta: DataflowMetainfo,
    /// The compute instance (cluster) involved.
    cluster_id: ComputeInstanceId,
    /// The parameters of the `EXPLAIN` request.
    explain_ctx: ExplainPlanContext,
}
1005
/// The stages of sequencing an introspection subscribe; each variant wraps
/// the state that stage needs.
#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    /// State in [`IntrospectionSubscribeOptimizeMir`].
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    /// State in [`IntrospectionSubscribeTimestampOptimizeLir`].
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    /// State in [`IntrospectionSubscribeFinish`].
    Finish(IntrospectionSubscribeFinish),
}
1012
/// State carried by [`IntrospectionSubscribeStage::OptimizeMir`].
#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The subscribe plan being sequenced.
    plan: plan::SubscribePlan,
    /// Global ID; presumably that of the subscribe being installed — TODO confirm.
    subscribe_id: GlobalId,
    /// The compute instance (cluster) involved.
    cluster_id: ComputeInstanceId,
    /// The replica involved. Unlike in [`SubscribeOptimizeMir`], this is not
    /// optional.
    replica_id: ReplicaId,
}
1021
/// State carried by [`IntrospectionSubscribeStage::TimestampOptimizeLir`].
#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The subscribe optimizer instance.
    optimizer: optimize::subscribe::Optimizer,
    /// The MIR-level plan, still in its `Unresolved` state.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    /// The compute instance (cluster) involved.
    cluster_id: ComputeInstanceId,
    /// The replica involved.
    replica_id: ReplicaId,
}
1030
/// State carried by [`IntrospectionSubscribeStage::Finish`].
#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The LIR-level plan of the subscribe optimizer pipeline.
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    /// Read holds; presumably on the subscribe's inputs, kept until the
    /// dataflow is installed — TODO confirm.
    read_holds: ReadHolds,
    /// The compute instance (cluster) involved.
    cluster_id: ComputeInstanceId,
    /// The replica involved.
    replica_id: ReplicaId,
}
1039
/// The stages of sequencing secret-related statements; each variant wraps the
/// state that stage needs.
#[derive(Debug)]
pub enum SecretStage {
    /// State in [`CreateSecretEnsure`].
    CreateEnsure(CreateSecretEnsure),
    /// State in [`CreateSecretFinish`].
    CreateFinish(CreateSecretFinish),
    /// State in [`RotateKeysSecretEnsure`].
    RotateKeysEnsure(RotateKeysSecretEnsure),
    /// State in [`RotateKeysSecretFinish`].
    RotateKeysFinish(RotateKeysSecretFinish),
    /// State in [`AlterSecret`].
    Alter(AlterSecret),
}
1048
/// State carried by [`SecretStage::CreateEnsure`].
#[derive(Debug)]
pub struct CreateSecretEnsure {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The plan being sequenced.
    plan: plan::CreateSecretPlan,
}
1054
/// State carried by [`SecretStage::CreateFinish`].
#[derive(Debug)]
pub struct CreateSecretFinish {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// Catalog item ID; presumably that of the new secret — TODO confirm.
    item_id: CatalogItemId,
    /// Global ID; presumably that of the new secret — TODO confirm.
    global_id: GlobalId,
    /// The plan being sequenced.
    plan: plan::CreateSecretPlan,
}
1062
/// State carried by [`SecretStage::RotateKeysEnsure`].
#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// Catalog item ID; presumably of the item whose keys are rotated —
    /// TODO confirm.
    id: CatalogItemId,
}
1068
/// State carried by [`SecretStage::RotateKeysFinish`].
#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// Catalog operations; presumably applied to complete the rotation —
    /// TODO confirm.
    ops: Vec<crate::catalog::Op>,
}
1074
/// State carried by [`SecretStage::Alter`].
#[derive(Debug)]
pub struct AlterSecret {
    /// Validity information for the plan being sequenced.
    validity: PlanValidity,
    /// The plan being sequenced.
    plan: plan::AlterSecretPlan,
}
1080
/// An enum describing which cluster to run a statement on.
///
/// For example, if a query depends only on system tables, we might
/// automatically run it on the catalog server cluster to benefit from the
/// indexes that exist there.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The catalog server cluster.
    CatalogServer,
    /// The current user's active cluster.
    Active,
    /// The cluster selected at the start of a transaction.
    Transaction(ClusterId),
}
1094
/// Result types for each stage of a sequence.
///
/// `T` is the type of the next stage. [`Staged::stage`] returns this with
/// `T = Box<Self>`.
pub(crate) enum StageResult<T> {
    /// A task was spawned that will return the next stage.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task was spawned that will return a response for the client.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage is immediately ready and will execute.
    Immediate(T),
    /// The final stage was executed and is ready to respond to the client.
    Response(ExecuteResponse),
}
1106
/// Common functionality for [Coordinator::sequence_staged].
pub(crate) trait Staged: Send {
    /// The context type handed to each stage; see [`StagedContext`].
    type Ctx: StagedContext;

    /// Returns mutable access to this stage's [`PlanValidity`].
    fn validity(&mut self) -> &mut PlanValidity;

    /// Returns the next stage or final result.
    ///
    /// Consumes the current stage. An `Err` carries an [`AdapterError`].
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Prepares a message for the Coordinator.
    ///
    /// Consumes both the stage and its context, and bundles them with `span`
    /// into a [`Message`].
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether it is safe to SQL cancel this stage.
    fn cancel_enabled(&self) -> bool;
}
1126
/// The context type used by [`Staged`] implementations.
///
/// Implemented in this file for [`ExecuteContext`] and for `()`, the latter
/// having no session and a no-op `retire`.
pub trait StagedContext {
    /// Consumes the context, finishing it with `result`.
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    /// Returns the session attached to this context, if there is one.
    fn session(&self) -> Option<&Session>;
}
1131
1132impl StagedContext for ExecuteContext {
1133    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1134        self.retire(result);
1135    }
1136
1137    fn session(&self) -> Option<&Session> {
1138        Some(self.session())
1139    }
1140}
1141
1142impl StagedContext for () {
1143    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}
1144
1145    fn session(&self) -> Option<&Session> {
1146        None
1147    }
1148}
1149
/// Configures a coordinator.
///
/// All fields are public; there is no constructor or validation in this
/// definition.
pub struct Config {
    /// Configuration for the controller.
    pub controller_config: ControllerConfig,
    /// NOTE(review): presumably the epoch of this `environmentd` process as
    /// seen by the controller — confirm.
    pub controller_envd_epoch: NonZeroI64,
    /// The durable catalog state backing the catalog.
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    /// An iterator over audit log entries.
    pub audit_logs_iterator: AuditLogIterator,
    /// Optional URL of the timestamp oracle; wrapped as sensitive.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    /// Build information; a `'static` reference.
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    /// The function used to obtain the current time.
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    // Bootstrap configurations for the builtin clusters, one per cluster.
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Default values for system parameters, keyed by parameter name.
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    /// Callback taking two `u64` values related to connection limits.
    /// NOTE(review): the meaning and order of the two arguments is not
    /// visible here — confirm at the call site.
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    /// Whether or not to start controllers in read-only mode. This is only
    /// meant for use during development of read-only clusters and 0dt upgrades
    /// and should go away once we have proper orchestration during upgrades.
    pub read_only_controllers: bool,

    /// A trigger that signals that the current deployment has caught up with a
    /// previous deployment. Only used during 0dt deployment, while in read-only
    /// mode.
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    /// The license key, already validated (per its type).
    pub license_key: ValidatedLicenseKey,
    /// NOTE(review): presumably a password enabling external login as the
    /// `mz_system` user — confirm.
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}
1201
/// Metadata about an active connection.
///
/// Serializable; the `deferred_lock` and `notice_tx` fields are skipped
/// during serialization.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Pgwire specifies that every connection have a 32-bit secret associated
    /// with it, that is known to both the client and the server. Cancellation
    /// requests are required to authenticate with the secret of the connection
    /// that they are targeting.
    secret_key: u32,
    /// The time when the session's connection was initiated.
    connected_at: EpochMillis,
    /// The user of the connection.
    user: User,
    /// The application name of the connection.
    application_name: String,
    /// The UUID associated with the connection.
    uuid: Uuid,
    /// The ID of the connection.
    conn_id: ConnectionId,
    /// The client's IP address, if known.
    client_ip: Option<IpAddr>,

    /// Sinks that will need to be dropped when the current transaction, if
    /// any, is cleared.
    drop_sinks: BTreeSet<GlobalId>,

    /// Lock for the Coordinator's deferred statements that is dropped on transaction clear.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Cluster reconfigurations that will need to be
    /// cleaned up when the current transaction is cleared.
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Channel on which to send notices to a session.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role that initiated the database context. Fixed for the duration of the connection.
    /// WARNING: This role reference is not updated when the role is dropped.
    /// Consumers should not assume that this role exists.
    authenticated_role: RoleId,
}
1239
/// Read-only accessors for [`ConnMeta`].
impl ConnMeta {
    /// Returns the ID of the connection.
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    /// Returns the user of the connection.
    pub fn user(&self) -> &User {
        &self.user
    }

    /// Returns the application name of the connection.
    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    /// Returns the ID of the role stored in `authenticated_role`.
    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    /// Returns the UUID associated with the connection.
    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    /// Returns the client's IP address, if known.
    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    /// Returns the time when the session's connection was initiated.
    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}
1269
/// A pending transaction waiting to be committed.
#[derive(Debug)]
pub struct PendingTxn {
    /// Context used to send a response back to the client.
    ctx: ExecuteContext,
    /// Client response for transaction.
    ///
    /// An `Err` here is the error that will be reported instead of a
    /// [`PendingTxnResponse`].
    response: Result<PendingTxnResponse, AdapterError>,
    /// The action to take at the end of the transaction.
    action: EndTransactionAction,
}
1280
/// The response we'll send for a [`PendingTxn`].
///
/// Converts into the matching [`ExecuteResponse`] variant via `From`.
#[derive(Debug)]
pub enum PendingTxnResponse {
    /// The transaction will be committed.
    Committed {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction will be rolled back.
    Rolledback {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
}
1295
1296impl PendingTxnResponse {
1297    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1298        match self {
1299            PendingTxnResponse::Committed { params }
1300            | PendingTxnResponse::Rolledback { params } => params.extend(p),
1301        }
1302    }
1303}
1304
1305impl From<PendingTxnResponse> for ExecuteResponse {
1306    fn from(value: PendingTxnResponse) -> Self {
1307        match value {
1308            PendingTxnResponse::Committed { params } => {
1309                ExecuteResponse::TransactionCommitted { params }
1310            }
1311            PendingTxnResponse::Rolledback { params } => {
1312                ExecuteResponse::TransactionRolledBack { params }
1313            }
1314        }
1315    }
1316}
1317
/// A pending read transaction waiting to be linearized, along with metadata about its state.
#[derive(Debug)]
pub struct PendingReadTxn {
    /// The transaction type.
    txn: PendingRead,
    /// The timestamp context of the transaction.
    timestamp_context: TimestampContext,
    /// The instant at which this pending txn was created (when the transaction ends).
    /// Only used for metrics.
    created: Instant,
    /// Number of times we requeued the processing of this pending read txn.
    /// Requeueing is necessary if the time we executed the query is after the current oracle time;
    /// see [`Coordinator::message_linearize_reads`] for more details.
    num_requeues: u64,
    /// Telemetry context.
    otel_ctx: OpenTelemetryContext,
}
1334
1335impl PendingReadTxn {
1336    /// Return the timestamp context of the pending read transaction.
1337    pub fn timestamp_context(&self) -> &TimestampContext {
1338        &self.timestamp_context
1339    }
1340
1341    pub(crate) fn take_context(self) -> ExecuteContext {
1342        self.txn.take_context()
1343    }
1344}
1345
/// A pending read transaction waiting to be linearized.
#[derive(Debug)]
enum PendingRead {
    /// A read whose completion ends the transaction.
    Read {
        /// The inner transaction.
        txn: PendingTxn,
    },
    /// A read performed as the first half of a read-then-write operation.
    ReadThenWrite {
        /// Context used to send a response back to the client.
        ctx: ExecuteContext,
        /// Channel used to alert the transaction that the read has been linearized and send back
        /// `ctx`.
        ///
        /// `Some(ctx)` is sent by `finish`; `None` is sent by `take_context`
        /// when the context is taken away instead.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}
1361
1362impl PendingRead {
1363    /// Alert the client that the read has been linearized.
1364    ///
1365    /// If it is necessary to finalize an execute, return the state necessary to do so
1366    /// (execution context and result)
1367    #[instrument(level = "debug")]
1368    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1369        match self {
1370            PendingRead::Read {
1371                txn:
1372                    PendingTxn {
1373                        mut ctx,
1374                        response,
1375                        action,
1376                    },
1377                ..
1378            } => {
1379                let changed = ctx.session_mut().vars_mut().end_transaction(action);
1380                // Append any parameters that changed to the response.
1381                let response = response.map(|mut r| {
1382                    r.extend_params(changed);
1383                    ExecuteResponse::from(r)
1384                });
1385
1386                Some((ctx, response))
1387            }
1388            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1389                // Ignore errors if the caller has hung up.
1390                let _ = tx.send(Some(ctx));
1391                None
1392            }
1393        }
1394    }
1395
1396    fn label(&self) -> &'static str {
1397        match self {
1398            PendingRead::Read { .. } => "read",
1399            PendingRead::ReadThenWrite { .. } => "read_then_write",
1400        }
1401    }
1402
1403    pub(crate) fn take_context(self) -> ExecuteContext {
1404        match self {
1405            PendingRead::Read { txn, .. } => txn.ctx,
1406            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1407                // Inform the transaction that we've taken their context.
1408                // Ignore errors if the caller has hung up.
1409                let _ = tx.send(None);
1410                ctx
1411            }
1412        }
1413    }
1414}
1415
/// State that the coordinator must process as part of retiring
/// command execution.  `ExecuteContextExtra::Default` is guaranteed
/// to produce a value that will cause the coordinator to do nothing, and
/// is intended for use by code that invokes the execution processing flow
/// (i.e., `sequence_plan`) without actually being a statement execution.
///
/// This is a pure data struct containing only the statement logging ID.
/// For auto-retire-on-drop behavior, use `ExecuteContextGuard` which wraps
/// this struct and owns the channel for sending retirement messages.
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    /// The statement logging ID, or `None` (the `Default` value) when there
    /// is nothing to retire.
    statement_uuid: Option<StatementLoggingId>,
}
1430
1431impl ExecuteContextExtra {
1432    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1433        Self { statement_uuid }
1434    }
1435    pub fn is_trivial(&self) -> bool {
1436        self.statement_uuid.is_none()
1437    }
1438    pub fn contents(&self) -> Option<StatementLoggingId> {
1439        self.statement_uuid
1440    }
1441    /// Consume this extra and return the statement UUID for retirement.
1442    /// This should only be called from code that knows what to do to finish
1443    /// up logging based on the inner value.
1444    #[must_use]
1445    pub(crate) fn retire(self) -> Option<StatementLoggingId> {
1446        self.statement_uuid
1447    }
1448}
1449
/// A guard that wraps `ExecuteContextExtra` and owns a channel for sending
/// retirement messages to the coordinator.
///
/// If this guard is dropped with a `Some` `statement_uuid` in its inner
/// `ExecuteContextExtra`, the `Drop` implementation will automatically send a
/// `Message::RetireExecute` to log the statement ending.
/// This handles cases like connection drops where the context cannot be
/// explicitly retired.
/// See <https://github.com/MaterializeInc/database-issues/issues/7304>
#[derive(Debug)]
#[must_use]
pub struct ExecuteContextGuard {
    /// The wrapped data; its `statement_uuid` decides whether dropping the
    /// guard sends a retirement message.
    extra: ExecuteContextExtra,
    /// Channel for sending messages to the coordinator. Used for auto-retiring on drop.
    /// For `Default` instances, this is a dummy sender (receiver already dropped), so
    /// sends will fail silently - which is the desired behavior since Default instances
    /// should only be used for non-logged statements.
    coordinator_tx: mpsc::UnboundedSender<Message>,
}
1469
1470impl Default for ExecuteContextGuard {
1471    fn default() -> Self {
1472        // Create a dummy sender by immediately dropping the receiver.
1473        // Any send on this channel will fail silently, which is the desired
1474        // behavior for Default instances (non-logged statements).
1475        let (tx, _rx) = mpsc::unbounded_channel();
1476        Self {
1477            extra: ExecuteContextExtra::default(),
1478            coordinator_tx: tx,
1479        }
1480    }
1481}
1482
1483impl ExecuteContextGuard {
1484    pub(crate) fn new(
1485        statement_uuid: Option<StatementLoggingId>,
1486        coordinator_tx: mpsc::UnboundedSender<Message>,
1487    ) -> Self {
1488        Self {
1489            extra: ExecuteContextExtra::new(statement_uuid),
1490            coordinator_tx,
1491        }
1492    }
1493    pub fn is_trivial(&self) -> bool {
1494        self.extra.is_trivial()
1495    }
1496    pub fn contents(&self) -> Option<StatementLoggingId> {
1497        self.extra.contents()
1498    }
1499    /// Take responsibility for the contents.  This should only be
1500    /// called from code that knows what to do to finish up logging
1501    /// based on the inner value.
1502    ///
1503    /// Returns the inner `ExecuteContextExtra`, consuming the guard without
1504    /// triggering the auto-retire behavior.
1505    pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
1506        // Taking statement_uuid prevents the Drop impl from sending a retire message
1507        std::mem::take(&mut self.extra)
1508    }
1509}
1510
1511impl Drop for ExecuteContextGuard {
1512    fn drop(&mut self) {
1513        if let Some(statement_uuid) = self.extra.statement_uuid.take() {
1514            // Auto-retire since the guard was dropped without explicit retirement (likely due
1515            // to connection drop).
1516            let msg = Message::RetireExecute {
1517                data: ExecuteContextExtra {
1518                    statement_uuid: Some(statement_uuid),
1519                },
1520                otel_ctx: OpenTelemetryContext::obtain(),
1521                reason: StatementEndedExecutionReason::Aborted,
1522            };
1523            // Send may fail for Default instances (dummy sender), which is fine since
1524            // Default instances should only be used for non-logged statements.
1525            let _ = self.coordinator_tx.send(msg);
1526        }
1527    }
1528}
1529
/// Bundle of state related to statement execution.
///
/// This struct collects a bundle of state that needs to be threaded
/// through various functions as part of statement execution.
/// It is used to finalize execution, by calling `retire`. Finalizing execution
/// involves sending the session back to the pgwire layer so that it
/// may be used to process further commands. It also involves
/// performing some work on the main coordinator thread
/// (e.g., recording the time at which the statement finished
/// executing). The state necessary to perform this work is bundled in
/// the `ExecuteContextGuard` object.
#[derive(Debug)]
pub struct ExecuteContext {
    /// The actual state, kept behind a `Box`. `ExecuteContext` dereferences
    /// to it via its `Deref`/`DerefMut` impls.
    inner: Box<ExecuteContextInner>,
}
1545
1546impl std::ops::Deref for ExecuteContext {
1547    type Target = ExecuteContextInner;
1548    fn deref(&self) -> &Self::Target {
1549        &*self.inner
1550    }
1551}
1552
1553impl std::ops::DerefMut for ExecuteContext {
1554    fn deref_mut(&mut self) -> &mut Self::Target {
1555        &mut *self.inner
1556    }
1557}
1558
/// The state held (boxed) by an [`ExecuteContext`].
#[derive(Debug)]
pub struct ExecuteContextInner {
    /// Transmitter for the [`ExecuteResponse`]; `ExecuteContext::retire`
    /// sends the result and the session through it.
    tx: ClientTransmitter<ExecuteResponse>,
    /// Sender for coordinator [`Message`]s; `ExecuteContext::retire` uses it
    /// to send `Message::RetireExecute`.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// The session executing the statement.
    session: Session,
    /// Guard holding the statement-logging state to retire.
    extra: ExecuteContextGuard,
}
1566
1567impl ExecuteContext {
1568    pub fn session(&self) -> &Session {
1569        &self.session
1570    }
1571
1572    pub fn session_mut(&mut self) -> &mut Session {
1573        &mut self.session
1574    }
1575
1576    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1577        &self.tx
1578    }
1579
1580    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1581        &mut self.tx
1582    }
1583
1584    pub fn from_parts(
1585        tx: ClientTransmitter<ExecuteResponse>,
1586        internal_cmd_tx: mpsc::UnboundedSender<Message>,
1587        session: Session,
1588        extra: ExecuteContextGuard,
1589    ) -> Self {
1590        Self {
1591            inner: ExecuteContextInner {
1592                tx,
1593                session,
1594                extra,
1595                internal_cmd_tx,
1596            }
1597            .into(),
1598        }
1599    }
1600
1601    /// By calling this function, the caller takes responsibility for
1602    /// dealing with the instance of `ExecuteContextGuard`. This is
1603    /// intended to support protocols (like `COPY FROM`) that involve
1604    /// multiple passes of sending the session back and forth between
1605    /// the coordinator and the pgwire layer. As part of any such
1606    /// protocol, we must ensure that the `ExecuteContextGuard`
1607    /// (possibly wrapped in a new `ExecuteContext`) is passed back to the coordinator for
1608    /// eventual retirement.
1609    pub fn into_parts(
1610        self,
1611    ) -> (
1612        ClientTransmitter<ExecuteResponse>,
1613        mpsc::UnboundedSender<Message>,
1614        Session,
1615        ExecuteContextGuard,
1616    ) {
1617        let ExecuteContextInner {
1618            tx,
1619            internal_cmd_tx,
1620            session,
1621            extra,
1622        } = *self.inner;
1623        (tx, internal_cmd_tx, session, extra)
1624    }
1625
1626    /// Retire the execution, by sending a message to the coordinator.
1627    #[instrument(level = "debug")]
1628    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1629        let ExecuteContextInner {
1630            tx,
1631            internal_cmd_tx,
1632            session,
1633            extra,
1634        } = *self.inner;
1635        let reason = if extra.is_trivial() {
1636            None
1637        } else {
1638            Some((&result).into())
1639        };
1640        tx.send(result, session);
1641        if let Some(reason) = reason {
1642            // Retire the guard to get the inner ExecuteContextExtra without triggering auto-retire
1643            let extra = extra.defuse();
1644            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1645                otel_ctx: OpenTelemetryContext::obtain(),
1646                data: extra,
1647                reason,
1648            }) {
1649                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1650            }
1651        }
1652    }
1653
1654    pub fn extra(&self) -> &ExecuteContextGuard {
1655        &self.extra
1656    }
1657
1658    pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
1659        &mut self.extra
1660    }
1661}
1662
/// Per-process statuses of cluster replicas.
///
/// The wrapped map is nested three levels deep: cluster ID, then replica ID,
/// then process ID, ending in the [`ClusterReplicaProcessStatus`] recorded for
/// that process.
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);
1667
1668impl ClusterReplicaStatuses {
1669    pub(crate) fn new() -> ClusterReplicaStatuses {
1670        ClusterReplicaStatuses(BTreeMap::new())
1671    }
1672
1673    /// Initializes the statuses of the specified cluster.
1674    ///
1675    /// Panics if the cluster statuses are already initialized.
1676    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1677        let prev = self.0.insert(cluster_id, BTreeMap::new());
1678        assert_eq!(
1679            prev, None,
1680            "cluster {cluster_id} statuses already initialized"
1681        );
1682    }
1683
1684    /// Initializes the statuses of the specified cluster replica.
1685    ///
1686    /// Panics if the cluster replica statuses are already initialized.
1687    pub(crate) fn initialize_cluster_replica_statuses(
1688        &mut self,
1689        cluster_id: ClusterId,
1690        replica_id: ReplicaId,
1691        num_processes: usize,
1692        time: DateTime<Utc>,
1693    ) {
1694        tracing::info!(
1695            ?cluster_id,
1696            ?replica_id,
1697            ?time,
1698            "initializing cluster replica status"
1699        );
1700        let replica_statuses = self.0.entry(cluster_id).or_default();
1701        let process_statuses = (0..num_processes)
1702            .map(|process_id| {
1703                let status = ClusterReplicaProcessStatus {
1704                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1705                    time: time.clone(),
1706                };
1707                (u64::cast_from(process_id), status)
1708            })
1709            .collect();
1710        let prev = replica_statuses.insert(replica_id, process_statuses);
1711        assert_none!(
1712            prev,
1713            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1714        );
1715    }
1716
1717    /// Removes the statuses of the specified cluster.
1718    ///
1719    /// Panics if the cluster does not exist.
1720    pub(crate) fn remove_cluster_statuses(
1721        &mut self,
1722        cluster_id: &ClusterId,
1723    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1724        let prev = self.0.remove(cluster_id);
1725        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1726    }
1727
1728    /// Removes the statuses of the specified cluster replica.
1729    ///
1730    /// Panics if the cluster or replica does not exist.
1731    pub(crate) fn remove_cluster_replica_statuses(
1732        &mut self,
1733        cluster_id: &ClusterId,
1734        replica_id: &ReplicaId,
1735    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1736        let replica_statuses = self
1737            .0
1738            .get_mut(cluster_id)
1739            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1740        let prev = replica_statuses.remove(replica_id);
1741        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1742    }
1743
1744    /// Inserts or updates the status of the specified cluster replica process.
1745    ///
1746    /// Panics if the cluster or replica does not exist.
1747    pub(crate) fn ensure_cluster_status(
1748        &mut self,
1749        cluster_id: ClusterId,
1750        replica_id: ReplicaId,
1751        process_id: ProcessId,
1752        status: ClusterReplicaProcessStatus,
1753    ) {
1754        let replica_statuses = self
1755            .0
1756            .get_mut(&cluster_id)
1757            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1758            .get_mut(&replica_id)
1759            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1760        replica_statuses.insert(process_id, status);
1761    }
1762
1763    /// Computes the status of the cluster replica as a whole.
1764    ///
1765    /// Panics if `cluster_id` or `replica_id` don't exist.
1766    pub fn get_cluster_replica_status(
1767        &self,
1768        cluster_id: ClusterId,
1769        replica_id: ReplicaId,
1770    ) -> ClusterStatus {
1771        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1772        Self::cluster_replica_status(process_status)
1773    }
1774
1775    /// Computes the status of the cluster replica as a whole.
1776    pub fn cluster_replica_status(
1777        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1778    ) -> ClusterStatus {
1779        process_status
1780            .values()
1781            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1782                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1783                (x, y) => {
1784                    let reason_x = match x {
1785                        ClusterStatus::Offline(reason) => reason,
1786                        ClusterStatus::Online => None,
1787                    };
1788                    let reason_y = match y {
1789                        ClusterStatus::Offline(reason) => reason,
1790                        ClusterStatus::Online => None,
1791                    };
1792                    // Arbitrarily pick the first known not-ready reason.
1793                    ClusterStatus::Offline(reason_x.or(reason_y))
1794                }
1795            })
1796    }
1797
1798    /// Gets the statuses of the given cluster replica.
1799    ///
1800    /// Panics if the cluster or replica does not exist
1801    pub(crate) fn get_cluster_replica_statuses(
1802        &self,
1803        cluster_id: ClusterId,
1804        replica_id: ReplicaId,
1805    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1806        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1807            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1808    }
1809
1810    /// Gets the statuses of the given cluster replica.
1811    pub(crate) fn try_get_cluster_replica_statuses(
1812        &self,
1813        cluster_id: ClusterId,
1814        replica_id: ReplicaId,
1815    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1816        self.try_get_cluster_statuses(cluster_id)
1817            .and_then(|statuses| statuses.get(&replica_id))
1818    }
1819
1820    /// Gets the statuses of the given cluster.
1821    pub(crate) fn try_get_cluster_statuses(
1822        &self,
1823        cluster_id: ClusterId,
1824    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1825        self.0.get(&cluster_id)
1826    }
1827}
1828
/// Glues the external world to the Timely workers.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    /// The controller for the storage and compute layers.
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    /// The catalog in an Arc suitable for readonly references. The Arc allows
    /// us to hand out cheap copies of the catalog to functions that can use it
    /// off of the main coordinator thread. If the coordinator needs to mutate
    /// the catalog, call [`Self::catalog_mut`], which will clone this struct member,
    /// allowing it to be mutated here while the other off-thread references can
    /// read their catalog as long as needed. In the future we would like this
    /// to be a pTVC, but for now this is sufficient.
    catalog: Arc<Catalog>,

    /// A client for persist. Initially, this is only used for reading stashed
    /// peek responses out of batches.
    persist_client: PersistClient,

    /// Channel to manage internal commands from the coordinator to itself.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// Notification that triggers a group commit.
    group_commit_tx: appends::GroupCommitNotifier,

    /// Channel for strict serializable reads ready to commit.
    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    /// Mechanism for totally ordering write and read timestamps, so that all reads
    /// reflect exactly the set of writes that precede them, and no writes that follow.
    global_timelines: BTreeMap<Timeline, TimelineState>,

    /// A generator for transient [`GlobalId`]s, shareable with other threads.
    transient_id_gen: Arc<TransientIdGen>,
    /// A map from connection ID to metadata about that connection for all
    /// active connections.
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    /// For each transaction, the read holds taken to support any performed reads.
    ///
    /// Upon completing a transaction, these read holds should be dropped.
    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds>,

    /// Access to the peek fields should be restricted to methods in the [`peek`] API.
    /// A map from pending peek ids to the queue into which responses are sent, and
    /// the connection id of the client that initiated the peek.
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// A map from client connection ids to a set of all pending peeks for that client.
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    /// A map from client connection ids to pending linearize read transaction.
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    /// A map from the compute sink ID to its state description.
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    /// A map from active webhooks to their invalidation handle.
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
    /// to stage Batches in Persist that we will then link into the shard.
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    /// Connection-scoped cancellation watches.
    ///
    /// Each entry is a watch channel whose value is `false` until cancellation
    /// is requested for that connection, at which point it is set to `true`.
    ///
    /// Consumers install/remove these watches while they have cancellable work
    /// in flight.
    connection_cancel_watches: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    /// Active introspection subscribes.
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    /// Locks that grant access to a specific object, populated lazily as objects are written to.
    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    /// Plans that are currently deferred and waiting on a write lock.
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    /// Pending writes waiting for a group commit.
    pending_writes: Vec<PendingWriteTxn>,

    /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
    /// table's timestamps, but there are cases where timestamps are not bumped but
    /// we expect the closed timestamps to advance (`AS OF X`, SUBSCRIBing views over
    /// RT sources and tables). To address these, spawn a task that forces table
    /// timestamps to close on a regular interval. This roughly tracks the behavior
    /// of realtime sources that close off timestamps on an interval.
    ///
    /// For non-realtime timelines, nothing pushes the timestamps forward, so we must do
    /// it manually.
    advance_timelines_interval: Interval,

    /// Serialized DDL. DDL must be serialized because:
    /// - Many of them do off-thread work and need to verify the catalog is in a valid state, but
    ///   [`PlanValidity`] does not currently support tracking all changes. Doing that correctly
    ///   seems to be more difficult than it's worth, so we would instead re-plan and re-sequence
    ///   the statements.
    /// - Re-planning a statement is hard because Coordinator and Session state is mutated at
    ///   various points, and we would need to correctly reset those changes before re-planning and
    ///   re-sequencing.
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    /// Handle to secret manager that can create and delete secrets from
    /// an arbitrary secret storage engine.
    secrets_controller: Arc<dyn SecretsController>,
    /// A secrets reader that maintains an in-memory cache, where values have a set TTL.
    caching_secrets_reader: CachingSecretsReader,

    /// Handle to a manager that can create and delete kubernetes resources
    /// (ie: VpcEndpoint objects)
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Persist client for fetching storage metadata such as size metrics.
    storage_usage_client: StorageUsageClient,
    /// The interval at which to collect storage usage information.
    storage_usage_collection_interval: Duration,

    /// Segment analytics client.
    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    /// Coordinator metrics.
    metrics: Metrics,
    /// Optimizer metrics.
    optimizer_metrics: OptimizerMetrics,

    /// Tracing handle.
    tracing_handle: TracingHandle,

    /// Data used by the statement logging feature.
    statement_logging: StatementLogging,

    /// Limit for how many concurrent webhook requests we allow.
    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    /// Optional config for the timestamp oracle. This is _required_ when
    /// a timestamp oracle backend is configured.
    timestamp_oracle_config: Option<TimestampOracleConfig>,

    /// Periodically asks cluster scheduling policies to make their decisions.
    check_cluster_scheduling_policies_interval: Interval,

    /// This keeps the last On/Off decision for each cluster and each scheduling policy.
    /// (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are
    /// periodically cleaned up from this Map.)
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    /// When doing 0dt upgrades/in read-only mode, periodically ask all known
    /// clusters/collections whether they are caught up.
    caught_up_check_interval: Interval,

    /// Context needed to check whether all clusters/collections have caught up.
    /// Only used during 0dt deployment, while in read-only mode.
    caught_up_check: Option<CaughtUpCheckContext>,

    /// The metrics registry, handed to the catalog info-metrics background task
    /// so it can register and own its `*_info` series.
    catalog_info_metrics_registry: MetricsRegistry,

    /// Tracks the state associated with the currently installed watchsets.
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    /// Tracks the currently installed watchsets for each connection.
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    /// Tracks the statuses of all cluster replicas.
    cluster_replica_statuses: ClusterReplicaStatuses,

    /// Whether or not to start controllers in read-only mode. This is only
    /// meant for use during development of read-only clusters and 0dt upgrades
    /// and should go away once we have proper orchestration during upgrades.
    read_only_controllers: bool,

    /// Updates to builtin tables that are being buffered while we are in
    /// read-only mode. We apply these all at once when coming out of read-only
    /// mode.
    ///
    /// This is a `Some` while in read-only mode and will be replaced by a
    /// `None` when we transition out of read-only mode and write out any
    /// buffered updates.
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    /// The validated license key. During bootstrap its expiration behavior
    /// decides whether the credit consumption limit is enforced, and its
    /// maximum credit consumption rate, when present, takes precedence over
    /// the system variable of the same name.
    license_key: ValidatedLicenseKey,

    /// Pre-allocated pool of user IDs to amortize persist writes across DDL operations.
    user_id_pool: IdPool,
}
2015
2016impl Coordinator {
2017    /// Resolves the replica-local scoped overrides from the catalog working copy
2018    /// into the compute controller's per-replica dyncfg layer, then re-pushes
2019    /// the environment-wide compute configuration so replicas observe the new
2020    /// values. Driven by the catalog implication for replica-scoped
2021    /// configuration changes, and called once on bootstrap.
2022    pub(crate) fn push_replica_dyncfg_overrides(&mut self) {
2023        // Clone the (sparse) replica overrides so we don't hold a catalog borrow
2024        // across the mutable controller calls below.
2025        let replica_overrides = self
2026            .catalog()
2027            .state()
2028            .scoped_system_parameters()
2029            .replica
2030            .clone();
2031
2032        let dyncfgs = self.catalog().system_config().dyncfgs();
2033        let mut instance_overrides: BTreeMap<
2034            ComputeInstanceId,
2035            BTreeMap<ReplicaId, ConfigUpdates>,
2036        > = BTreeMap::new();
2037        for cluster in self.catalog().clusters() {
2038            for replica in cluster.replicas() {
2039                let Some(values) = replica_overrides.get(&replica.replica_id) else {
2040                    continue;
2041                };
2042                let mut updates = ConfigUpdates::default();
2043                for (name, value) in values {
2044                    let Some(entry) = dyncfgs.entry(name) else {
2045                        // A replica-local parameter that is not a dyncfg has no
2046                        // per-replica realization, so skip it.
2047                        continue;
2048                    };
2049                    match entry.parse_val(value) {
2050                        Ok(val) => updates.add_dynamic(name, val),
2051                        Err(e) => {
2052                            tracing::warn!(%name, %value, "cannot parse scoped override: {e}")
2053                        }
2054                    }
2055                }
2056                if !updates.updates.is_empty() {
2057                    instance_overrides
2058                        .entry(cluster.id)
2059                        .or_default()
2060                        .insert(replica.replica_id, updates);
2061                }
2062            }
2063        }
2064
2065        // Only the compute controller's per-replica dyncfg layer is pushed, but on
2066        // `clusterd` that also reaches storage. Compute and storage share one
2067        // process, and the compute worker's `handle_update_configuration` applies
2068        // the pushed dyncfg updates both to compute's own worker `ConfigSet` and to
2069        // the shared persist client `ConfigSet` (`persist_clients.cfg()`) that the
2070        // co-located storage server reads from the same `Arc`. So persist-backed and
2071        // process-global replica-local configs such as the persist pager, LZ4,
2072        // persist client tuning, and `lgalloc` take effect on storage too. The only
2073        // gap would be a future `Replica`-scoped config realized solely in the
2074        // storage worker's own `ConfigSet`, of which none exists today.
2075        self.controller
2076            .compute
2077            .update_replica_dyncfg_overrides(instance_overrides);
2078        // Re-push the env-wide compute config so existing replicas pick up their
2079        // (possibly changed) overrides. This also reverts a removed override: the
2080        // per-replica layer no longer carries the key, so the replica falls back
2081        // to the env-wide value, which `compute_config` always includes because it
2082        // renders the full dyncfg set.
2083        let compute_config = crate::flags::compute_config(self.catalog().system_config());
2084        self.controller.compute.update_configuration(compute_config);
2085    }
2086
2087    /// Initializes coordinator state based on the contained catalog. Must be
2088    /// called after creating the coordinator and before calling the
2089    /// `Coordinator::serve` method.
2090    #[instrument(name = "coord::bootstrap")]
2091    pub(crate) async fn bootstrap(
2092        &mut self,
2093        boot_ts: Timestamp,
2094        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
2095        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2096        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2097        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
2098        audit_logs_iterator: AuditLogIterator,
2099    ) -> Result<(), AdapterError> {
2100        let bootstrap_start = Instant::now();
2101        info!("startup: coordinator init: bootstrap beginning");
2102        info!("startup: coordinator init: bootstrap: preamble beginning");
2103
2104        // Initialize cluster replica statuses.
2105        // Gross iterator is to avoid partial borrow issues.
2106        let cluster_statuses: Vec<(_, Vec<_>)> = self
2107            .catalog()
2108            .clusters()
2109            .map(|cluster| {
2110                (
2111                    cluster.id(),
2112                    cluster
2113                        .replicas()
2114                        .map(|replica| {
2115                            (replica.replica_id, replica.config.location.num_processes())
2116                        })
2117                        .collect(),
2118                )
2119            })
2120            .collect();
2121        let now = self.now_datetime();
2122        for (cluster_id, replica_statuses) in cluster_statuses {
2123            self.cluster_replica_statuses
2124                .initialize_cluster_statuses(cluster_id);
2125            for (replica_id, num_processes) in replica_statuses {
2126                self.cluster_replica_statuses
2127                    .initialize_cluster_replica_statuses(
2128                        cluster_id,
2129                        replica_id,
2130                        num_processes,
2131                        now,
2132                    );
2133            }
2134        }
2135
2136        let system_config = self.catalog().system_config();
2137
2138        // Inform metrics about the initial system configuration.
2139        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
2140
2141        // Inform the controllers about their initial configuration.
2142        let compute_config = flags::compute_config(system_config);
2143        let storage_config = flags::storage_config(system_config);
2144        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
2145        let dyncfg_updates = system_config.dyncfg_updates();
2146        self.controller.compute.update_configuration(compute_config);
2147        self.controller.storage.update_parameters(storage_config);
2148        self.controller
2149            .update_orchestrator_scheduling_config(scheduling_config);
2150        self.controller.update_configuration(dyncfg_updates);
2151
2152        // Skip the credit consumption check at bootstrap under DisableClusterCreation behavior:
2153        // this codepath validates existing replicas at startup, not cluster creation, so it
2154        // must not block startup. New cluster creation is still gated by the DDL-time check.
2155        // The Disable case is already handled by a bail! in main.rs before we reach here.
2156        let enforce_credit_limit_at_bootstrap = !matches!(
2157            self.license_key.expiration_behavior,
2158            ExpirationBehavior::DisableClusterCreation,
2159        );
2160        if enforce_credit_limit_at_bootstrap {
2161            self.validate_resource_limit_numeric(
2162                Numeric::zero(),
2163                self.current_credit_consumption_rate(),
2164                |system_vars| {
2165                    self.license_key
2166                        .max_credit_consumption_rate()
2167                        .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
2168                },
2169                "cluster replica",
2170                MAX_CREDIT_CONSUMPTION_RATE.name(),
2171            )?;
2172        }
2173
2174        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
2175            Default::default();
2176
2177        let enable_worker_core_affinity =
2178            self.catalog().system_config().enable_worker_core_affinity();
2179        let enable_storage_introspection_logs = self
2180            .catalog()
2181            .system_config()
2182            .enable_storage_introspection_logs();
2183        for instance in self.catalog.clusters() {
2184            self.controller.create_cluster(
2185                instance.id,
2186                ClusterConfig {
2187                    arranged_logs: instance.log_indexes.clone(),
2188                    workload_class: instance.config.workload_class.clone(),
2189                },
2190            )?;
2191            for replica in instance.replicas() {
2192                let role = instance.role();
2193                self.controller.create_replica(
2194                    instance.id,
2195                    replica.replica_id,
2196                    instance.name.clone(),
2197                    replica.name.clone(),
2198                    role,
2199                    replica.config.clone(),
2200                    enable_worker_core_affinity,
2201                    enable_storage_introspection_logs,
2202                )?;
2203            }
2204        }
2205
2206        // Now that the compute instances and their replicas exist, push the
2207        // replica-local scoped overrides into the compute controller so existing
2208        // replicas observe them at startup. The scoped (per-cluster and
2209        // per-replica) working copy was restored from the durable cache into
2210        // `CatalogState` while opening the catalog, so the last-known values are
2211        // in effect before the first parameter sync and through a sync outage.
2212        // This must run after the creation loop above: the push iterates the
2213        // controller's instances, so before they exist it is a no-op. It also
2214        // runs before dataflows are rendered later in bootstrap, so render-frozen
2215        // replica flags take effect. The cluster-coherent layer is read at plan
2216        // time.
2217        self.push_replica_dyncfg_overrides();
2218
2219        info!(
2220            "startup: coordinator init: bootstrap: preamble complete in {:?}",
2221            bootstrap_start.elapsed()
2222        );
2223
2224        let init_storage_collections_start = Instant::now();
2225        info!("startup: coordinator init: bootstrap: storage collections init beginning");
2226        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2227            .await;
2228        info!(
2229            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2230            init_storage_collections_start.elapsed()
2231        );
2232
2233        // The storage controller knows about the introspection collections now, so we can start
2234        // sinking introspection updates in the compute controller. It makes sense to do that as
2235        // soon as possible, to avoid updates piling up in the compute controller's internal
2236        // buffers.
2237        self.controller.start_compute_introspection_sink();
2238
2239        let sorting_start = Instant::now();
2240        info!("startup: coordinator init: bootstrap: sorting catalog entries");
2241        let entries = self.bootstrap_sort_catalog_entries();
2242        info!(
2243            "startup: coordinator init: bootstrap: sorting catalog entries complete in {:?}",
2244            sorting_start.elapsed()
2245        );
2246
2247        let optimize_dataflows_start = Instant::now();
2248        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2249        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2250        info!(
2251            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2252            optimize_dataflows_start.elapsed()
2253        );
2254
2255        // We don't need to wait for the cache to update.
2256        let _fut = self.catalog().update_expression_cache(
2257            uncached_local_exprs.into_iter().collect(),
2258            uncached_global_exps.into_iter().collect(),
2259            Default::default(),
2260        );
2261
2262        // Select dataflow as-ofs. This step relies on the storage collections created by
2263        // `bootstrap_storage_collections` and the dataflow plans created by
2264        // `bootstrap_dataflow_plans`.
2265        let bootstrap_as_ofs_start = Instant::now();
2266        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2267        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2268        info!(
2269            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2270            bootstrap_as_ofs_start.elapsed()
2271        );
2272
2273        let postamble_start = Instant::now();
2274        info!("startup: coordinator init: bootstrap: postamble beginning");
2275
2276        let logs: BTreeSet<_> = BUILTINS::logs()
2277            .map(|log| self.catalog().resolve_builtin_log(log))
2278            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2279            .collect();
2280
2281        let mut privatelink_connections = BTreeMap::new();
2282
2283        for entry in &entries {
2284            debug!(
2285                "coordinator init: installing {} {}",
2286                entry.item().typ(),
2287                entry.id()
2288            );
2289            let mut policy = entry.item().initial_logical_compaction_window();
2290            match entry.item() {
2291                // Currently catalog item rebuild assumes that sinks and
2292                // indexes are always built individually and does not store information
2293                // about how it was built. If we start building multiple sinks and/or indexes
2294                // using a single dataflow, we have to make sure the rebuild process re-runs
2295                // the same multiple-build dataflow.
2296                CatalogItem::Source(source) => {
2297                    // Propagate source compaction windows to subsources if needed.
2298                    if source.custom_logical_compaction_window.is_none() {
2299                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2300                            source.data_source
2301                        {
2302                            policy = Some(
2303                                self.catalog()
2304                                    .get_entry(&ingestion_id)
2305                                    .source()
2306                                    .expect("must be source")
2307                                    .custom_logical_compaction_window
2308                                    .unwrap_or_default(),
2309                            );
2310                        }
2311                    }
2312                    policies_to_set
2313                        .entry(policy.expect("sources have a compaction window"))
2314                        .or_insert_with(Default::default)
2315                        .storage_ids
2316                        .insert(source.global_id());
2317                }
2318                CatalogItem::Table(table) => {
2319                    policies_to_set
2320                        .entry(policy.expect("tables have a compaction window"))
2321                        .or_insert_with(Default::default)
2322                        .storage_ids
2323                        .extend(table.global_ids());
2324                }
2325                CatalogItem::Index(idx) => {
2326                    let policy_entry = policies_to_set
2327                        .entry(policy.expect("indexes have a compaction window"))
2328                        .or_insert_with(Default::default);
2329
2330                    if logs.contains(&idx.on) {
2331                        policy_entry
2332                            .compute_ids
2333                            .entry(idx.cluster_id)
2334                            .or_insert_with(BTreeSet::new)
2335                            .insert(idx.global_id());
2336                    } else {
2337                        let df_desc = self
2338                            .catalog()
2339                            .try_get_physical_plan(&idx.global_id())
2340                            .expect("added in `bootstrap_dataflow_plans`")
2341                            .clone();
2342
2343                        let df_meta = self
2344                            .catalog()
2345                            .try_get_dataflow_metainfo(&idx.global_id())
2346                            .expect("added in `bootstrap_dataflow_plans`");
2347
2348                        if self.catalog().state().system_config().enable_mz_notices() {
2349                            // Collect optimization hint updates.
2350                            self.catalog().state().pack_optimizer_notices(
2351                                &mut builtin_table_updates,
2352                                df_meta.optimizer_notices.iter(),
2353                                Diff::ONE,
2354                            );
2355                        }
2356
2357                        // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
2358                        // but we cannot call that as it will also downgrade the read hold on the index.
2359                        policy_entry
2360                            .compute_ids
2361                            .entry(idx.cluster_id)
2362                            .or_insert_with(Default::default)
2363                            .extend(df_desc.export_ids());
2364
2365                        self.controller
2366                            .compute
2367                            .create_dataflow(idx.cluster_id, df_desc, None)
2368                            .unwrap_or_terminate("cannot fail to create dataflows");
2369                    }
2370                }
2371                CatalogItem::View(_) => (),
2372                CatalogItem::MaterializedView(mview) => {
2373                    policies_to_set
2374                        .entry(policy.expect("materialized views have a compaction window"))
2375                        .or_insert_with(Default::default)
2376                        .storage_ids
2377                        .insert(mview.global_id_writes());
2378
2379                    let mut df_desc = self
2380                        .catalog()
2381                        .try_get_physical_plan(&mview.global_id_writes())
2382                        .expect("added in `bootstrap_dataflow_plans`")
2383                        .clone();
2384
2385                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
2386                        df_desc.set_initial_as_of(initial_as_of);
2387                    }
2388
2389                    // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
2390                    let until = mview
2391                        .refresh_schedule
2392                        .as_ref()
2393                        .and_then(|s| s.last_refresh())
2394                        .and_then(|r| r.try_step_forward());
2395                    if let Some(until) = until {
2396                        df_desc.until.meet_assign(&Antichain::from_elem(until));
2397                    }
2398
2399                    let df_meta = self
2400                        .catalog()
2401                        .try_get_dataflow_metainfo(&mview.global_id_writes())
2402                        .expect("added in `bootstrap_dataflow_plans`");
2403
2404                    if self.catalog().state().system_config().enable_mz_notices() {
2405                        // Collect optimization hint updates.
2406                        self.catalog().state().pack_optimizer_notices(
2407                            &mut builtin_table_updates,
2408                            df_meta.optimizer_notices.iter(),
2409                            Diff::ONE,
2410                        );
2411                    }
2412
2413                    self.ship_dataflow(df_desc, mview.cluster_id, mview.target_replica)
2414                        .await;
2415
2416                    // If this is a replacement MV, it must remain read-only until the replacement
2417                    // gets applied.
2418                    if mview.replacement_target.is_none() {
2419                        self.allow_writes(mview.cluster_id, mview.global_id_writes());
2420                    }
2421                }
2422                CatalogItem::Sink(sink) => {
2423                    policies_to_set
2424                        .entry(CompactionWindow::Default)
2425                        .or_insert_with(Default::default)
2426                        .storage_ids
2427                        .insert(sink.global_id());
2428                }
2429                CatalogItem::Connection(catalog_connection) => {
2430                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2431                        privatelink_connections.insert(
2432                            entry.id(),
2433                            VpcEndpointConfig {
2434                                aws_service_name: conn.service_name.clone(),
2435                                availability_zone_ids: conn.availability_zones.clone(),
2436                            },
2437                        );
2438                    }
2439                }
2440                // Nothing to do for these cases
2441                CatalogItem::Log(_)
2442                | CatalogItem::Type(_)
2443                | CatalogItem::Func(_)
2444                | CatalogItem::Secret(_) => {}
2445            }
2446        }
2447
2448        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2449            // Clean up any extraneous VpcEndpoints that shouldn't exist.
2450            let existing_vpc_endpoints = cloud_resource_controller
2451                .list_vpc_endpoints()
2452                .await
2453                .context("list vpc endpoints")?;
2454            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2455            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2456            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2457            for id in vpc_endpoints_to_remove {
2458                cloud_resource_controller
2459                    .delete_vpc_endpoint(*id)
2460                    .await
2461                    .context("deleting extraneous vpc endpoint")?;
2462            }
2463
2464            // Ensure desired VpcEndpoints are up to date.
2465            for (id, spec) in privatelink_connections {
2466                cloud_resource_controller
2467                    .ensure_vpc_endpoint(id, spec)
2468                    .await
2469                    .context("ensuring vpc endpoint")?;
2470            }
2471        }
2472
2473        // Having installed all entries, creating all constraints, we can now drop read holds and
2474        // relax read policies.
2475        drop(dataflow_read_holds);
2476        // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
2477        for (cw, policies) in policies_to_set {
2478            self.initialize_read_policies(&policies, cw).await;
2479        }
2480
2481        // Expose mapping from T-shirt sizes to actual sizes
2482        builtin_table_updates.extend(
2483            self.catalog().state().resolve_builtin_table_updates(
2484                self.catalog().state().pack_all_replica_size_updates(),
2485            ),
2486        );
2487
2488        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2489        // When 0dt is enabled, we create new shards for any migrated builtin storage collections.
2490        // In read-only mode, the migrated builtin tables (which are a subset of migrated builtin
2491        // storage collections) need to be back-filled so that any dependent dataflow can be
2492        // hydrated. Additionally, these shards are not registered with the txn-shard, and cannot
2493        // be registered while in read-only, so they are written to directly.
2494        let migrated_updates_fut = if self.controller.read_only() {
2495            let min_timestamp = Timestamp::minimum();
2496            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2497                .extract_if(.., |update| {
2498                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2499                    migrated_storage_collections_0dt.contains(&update.id)
2500                        && self
2501                            .controller
2502                            .storage_collections
2503                            .collection_frontiers(gid)
2504                            .expect("all tables are registered")
2505                            .write_frontier
2506                            .elements()
2507                            == &[min_timestamp]
2508                })
2509                .collect();
2510            if migrated_builtin_table_updates.is_empty() {
2511                futures::future::ready(()).boxed()
2512            } else {
2513                // Group all updates per-table.
2514                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2515                for update in migrated_builtin_table_updates {
2516                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2517                    grouped_appends.entry(gid).or_default().push(update.data);
2518                }
2519                info!(
2520                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2521                    grouped_appends.keys().collect::<Vec<_>>()
2522                );
2523
2524                // Consolidate Row data, staged batches must already be consolidated.
2525                let mut all_appends = Vec::with_capacity(grouped_appends.len());
2526                for (item_id, table_data) in grouped_appends.into_iter() {
2527                    let mut all_rows = Vec::new();
2528                    let mut all_data = Vec::new();
2529                    for data in table_data {
2530                        match data {
2531                            TableData::Rows(rows) => all_rows.extend(rows),
2532                            TableData::Batches(_) => all_data.push(data),
2533                        }
2534                    }
2535                    differential_dataflow::consolidation::consolidate(&mut all_rows);
2536                    all_data.push(TableData::Rows(all_rows));
2537
2538                    // TODO(parkmycar): Use SmallVec throughout.
2539                    all_appends.push((item_id, all_data));
2540                }
2541
2542                let fut = self
2543                    .controller
2544                    .storage
2545                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2546                    .expect("cannot fail to append");
2547                async {
2548                    fut.await
2549                        .expect("One-shot shouldn't be dropped during bootstrap")
2550                        .unwrap_or_terminate("cannot fail to append")
2551                }
2552                .boxed()
2553            }
2554        } else {
2555            futures::future::ready(()).boxed()
2556        };
2557
2558        info!(
2559            "startup: coordinator init: bootstrap: postamble complete in {:?}",
2560            postamble_start.elapsed()
2561        );
2562
2563        let builtin_update_start = Instant::now();
2564        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2565
2566        if self.controller.read_only() {
2567            info!(
2568                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2569            );
2570
2571            // TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
2572            let audit_join_start = Instant::now();
2573            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2574            let audit_log_updates: Vec<_> = audit_logs_iterator
2575                .map(|(audit_log, ts)| StateUpdate {
2576                    kind: StateUpdateKind::AuditLog(audit_log),
2577                    ts,
2578                    diff: StateDiff::Addition,
2579                })
2580                .collect();
2581            let audit_log_builtin_table_updates = self
2582                .catalog()
2583                .state()
2584                .generate_builtin_table_updates(audit_log_updates);
2585            builtin_table_updates.extend(audit_log_builtin_table_updates);
2586            info!(
2587                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2588                audit_join_start.elapsed()
2589            );
2590            self.buffered_builtin_table_updates
2591                .as_mut()
2592                .expect("in read-only mode")
2593                .append(&mut builtin_table_updates);
2594        } else {
2595            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2596                .await;
2597        };
2598        info!(
2599            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2600            builtin_update_start.elapsed()
2601        );
2602
2603        let cleanup_secrets_start = Instant::now();
2604        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2605        // Cleanup orphaned secrets. Errors during list() or delete() do not
2606        // need to prevent bootstrap from succeeding; we will retry next
2607        // startup.
2608        {
2609            // Destructure Self so we can selectively move fields into the async
2610            // task.
2611            let Self {
2612                secrets_controller,
2613                catalog,
2614                ..
2615            } = self;
2616
2617            let next_user_item_id = catalog.get_next_user_item_id().await?;
2618            let next_system_item_id = catalog.get_next_system_item_id().await?;
2619            let read_only = self.controller.read_only();
2620            // Fetch all IDs from the catalog to future-proof against other
2621            // things using secrets. Today, SECRET and CONNECTION objects use
2622            // secrets_controller.ensure, but more things could in the future
2623            // that would be easy to miss adding here.
2624            let catalog_ids: BTreeSet<CatalogItemId> =
2625                catalog.entries().map(|entry| entry.id()).collect();
2626            let secrets_controller = Arc::clone(secrets_controller);
2627
2628            spawn(|| "cleanup-orphaned-secrets", async move {
2629                if read_only {
2630                    info!(
2631                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2632                    );
2633                    return;
2634                }
2635                info!("coordinator init: cleaning up orphaned secrets");
2636
2637                match secrets_controller.list().await {
2638                    Ok(controller_secrets) => {
2639                        let controller_secrets: BTreeSet<CatalogItemId> =
2640                            controller_secrets.into_iter().collect();
2641                        let orphaned = controller_secrets.difference(&catalog_ids);
2642                        for id in orphaned {
2643                            let id_too_large = match id {
2644                                CatalogItemId::System(id) => *id >= next_system_item_id,
2645                                CatalogItemId::User(id) => *id >= next_user_item_id,
2646                                CatalogItemId::IntrospectionSourceIndex(_)
2647                                | CatalogItemId::Transient(_) => false,
2648                            };
2649                            if id_too_large {
2650                                info!(
2651                                    %next_user_item_id, %next_system_item_id,
2652                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2653                                );
2654                            } else {
2655                                info!("coordinator init: deleting orphaned secret {id}");
2656                                fail_point!("orphan_secrets");
2657                                if let Err(e) = secrets_controller.delete(*id).await {
2658                                    warn!(
2659                                        "Dropping orphaned secret has encountered an error: {}",
2660                                        e
2661                                    );
2662                                }
2663                            }
2664                        }
2665                    }
2666                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2667                }
2668            });
2669        }
2670        info!(
2671            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2672            cleanup_secrets_start.elapsed()
2673        );
2674
2675        // Run all of our final steps concurrently.
2676        let final_steps_start = Instant::now();
2677        info!(
2678            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2679        );
2680        migrated_updates_fut
2681            .instrument(info_span!("coord::bootstrap::final"))
2682            .await;
2683
2684        debug!(
2685            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2686        );
2687        // Announce the completion of initialization.
2688        self.controller.initialization_complete();
2689
2690        // Initialize unified introspection.
2691        self.bootstrap_introspection_subscribes().await;
2692
2693        info!(
2694            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2695            final_steps_start.elapsed()
2696        );
2697
2698        info!(
2699            "startup: coordinator init: bootstrap complete in {:?}",
2700            bootstrap_start.elapsed()
2701        );
2702        Ok(())
2703    }
2704
2705    /// Prepares tables for writing by resetting them to a known state and
2706    /// appending the given builtin table updates. The timestamp oracle
2707    /// will be advanced to the write timestamp of the append when this
2708    /// method returns.
2709    #[allow(clippy::async_yields_async)]
2710    #[instrument]
2711    async fn bootstrap_tables(
2712        &mut self,
2713        entries: &[CatalogEntry],
2714        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2715        audit_logs_iterator: AuditLogIterator,
2716    ) {
2717        /// Smaller helper struct of metadata for bootstrapping tables.
2718        struct TableMetadata<'a> {
2719            id: CatalogItemId,
2720            name: &'a QualifiedItemName,
2721            table: &'a Table,
2722        }
2723
2724        // Filter our entries down to just tables.
2725        let table_metas: Vec<_> = entries
2726            .into_iter()
2727            .filter_map(|entry| {
2728                entry.table().map(|table| TableMetadata {
2729                    id: entry.id(),
2730                    name: entry.name(),
2731                    table,
2732                })
2733            })
2734            .collect();
2735
2736        // Append empty batches to advance the timestamp of all tables.
2737        debug!("coordinator init: advancing all tables to current timestamp");
2738        let WriteTimestamp {
2739            timestamp: write_ts,
2740            advance_to,
2741        } = self.get_local_write_ts().await;
2742        let appends = table_metas
2743            .iter()
2744            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2745            .collect();
2746        // Append the tables in the background. We apply the write timestamp before getting a read
2747        // timestamp and reading a snapshot of each table, so the snapshots will block on their own
2748        // until the appends are complete.
2749        let table_fence_rx = self
2750            .controller
2751            .storage
2752            .append_table(write_ts.clone(), advance_to, appends)
2753            .expect("invalid updates");
2754
2755        self.apply_local_write(write_ts).await;
2756
2757        // Add builtin table updates the clear the contents of all system tables
2758        debug!("coordinator init: resetting system tables");
2759        let read_ts = self.get_local_read_ts().await;
2760
2761        // Filter out the 'mz_storage_usage_by_shard' table since we need to retain that info for
2762        // billing purposes.
2763        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2764            .catalog()
2765            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2766            .into();
2767        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2768            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2769                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2770        };
2771
2772        let mut retraction_tasks = Vec::new();
2773        let mut system_tables: Vec<_> = table_metas
2774            .iter()
2775            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2776            .collect();
2777
2778        // Special case audit events because it's append only.
2779        let (audit_events_idx, _) = system_tables
2780            .iter()
2781            .find_position(|table| {
2782                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2783            })
2784            .expect("mz_audit_events must exist");
2785        let audit_events = system_tables.remove(audit_events_idx);
2786        let audit_log_task = self.bootstrap_audit_log_table(
2787            audit_events.id,
2788            audit_events.name,
2789            audit_events.table,
2790            audit_logs_iterator,
2791            read_ts,
2792        );
2793
2794        for system_table in system_tables {
2795            let table_id = system_table.id;
2796            let full_name = self.catalog().resolve_full_name(system_table.name, None);
2797            debug!("coordinator init: resetting system table {full_name} ({table_id})");
2798
2799            // Fetch the current contents of the table for retraction.
2800            let snapshot_fut = self
2801                .controller
2802                .storage_collections
2803                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2804            let batch_fut = self
2805                .controller
2806                .storage_collections
2807                .create_update_builder(system_table.table.global_id_writes());
2808
2809            let task = spawn(|| format!("snapshot-{table_id}"), async move {
2810                // Create a TimestamplessUpdateBuilder.
2811                let mut batch = batch_fut
2812                    .await
2813                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2814                tracing::info!(?table_id, "starting snapshot");
2815                // Get a cursor which will emit a consolidated snapshot.
2816                let mut snapshot_cursor = snapshot_fut
2817                    .await
2818                    .unwrap_or_terminate("cannot fail to snapshot");
2819
2820                // Retract the current contents, spilling into our builder.
2821                while let Some(values) = snapshot_cursor.next().await {
2822                    for (key, _t, d) in values {
2823                        let d_invert = d.neg();
2824                        batch.add(&key, &(), &d_invert).await;
2825                    }
2826                }
2827                tracing::info!(?table_id, "finished snapshot");
2828
2829                let batch = batch.finish().await;
2830                BuiltinTableUpdate::batch(table_id, batch)
2831            });
2832            retraction_tasks.push(task);
2833        }
2834
2835        let retractions_res = futures::future::join_all(retraction_tasks).await;
2836        for retractions in retractions_res {
2837            builtin_table_updates.push(retractions);
2838        }
2839
2840        let audit_join_start = Instant::now();
2841        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2842        let audit_log_updates = audit_log_task.await;
2843        let audit_log_builtin_table_updates = self
2844            .catalog()
2845            .state()
2846            .generate_builtin_table_updates(audit_log_updates);
2847        builtin_table_updates.extend(audit_log_builtin_table_updates);
2848        info!(
2849            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2850            audit_join_start.elapsed()
2851        );
2852
2853        // Now that the snapshots are complete, the appends must also be complete.
2854        table_fence_rx
2855            .await
2856            .expect("One-shot shouldn't be dropped during bootstrap")
2857            .unwrap_or_terminate("cannot fail to append");
2858
2859        info!("coordinator init: sending builtin table updates");
2860        let (_builtin_updates_fut, write_ts) = self
2861            .builtin_table_update()
2862            .execute(builtin_table_updates)
2863            .await;
2864        info!(?write_ts, "our write ts");
2865        if let Some(write_ts) = write_ts {
2866            self.apply_local_write(write_ts).await;
2867        }
2868    }
2869
2870    /// Prepare updates to the audit log table. The audit log table append only and very large, so
2871    /// we only need to find the events present in `audit_logs_iterator` but not in the audit log
2872    /// table.
2873    #[instrument]
2874    fn bootstrap_audit_log_table<'a>(
2875        &self,
2876        table_id: CatalogItemId,
2877        name: &'a QualifiedItemName,
2878        table: &'a Table,
2879        audit_logs_iterator: AuditLogIterator,
2880        read_ts: Timestamp,
2881    ) -> JoinHandle<Vec<StateUpdate>> {
2882        let full_name = self.catalog().resolve_full_name(name, None);
2883        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2884        let current_contents_fut = self
2885            .controller
2886            .storage_collections
2887            .snapshot(table.global_id_writes(), read_ts);
2888        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2889            let current_contents = current_contents_fut
2890                .await
2891                .unwrap_or_terminate("cannot fail to fetch snapshot");
2892            let contents_len = current_contents.len();
2893            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2894
2895            // Fetch the largest audit log event ID that has been written to the table.
2896            let max_table_id = current_contents
2897                .into_iter()
2898                .filter(|(_, diff)| *diff == 1)
2899                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2900                .sorted()
2901                .rev()
2902                .next();
2903
2904            // Filter audit log catalog updates to those that are not present in the table.
2905            audit_logs_iterator
2906                .take_while(|(audit_log, _)| match max_table_id {
2907                    Some(id) => audit_log.event.sortable_id() > id,
2908                    None => true,
2909                })
2910                .map(|(audit_log, ts)| StateUpdate {
2911                    kind: StateUpdateKind::AuditLog(audit_log),
2912                    ts,
2913                    diff: StateDiff::Addition,
2914                })
2915                .collect::<Vec<_>>()
2916        })
2917    }
2918
2919    /// Initializes all storage collections required by catalog objects in the storage controller.
2920    ///
2921    /// This method takes care of collection creation, as well as migration of existing
2922    /// collections.
2923    ///
2924    /// Creating all storage collections in a single `create_collections` call, rather than on
2925    /// demand, is more efficient as it reduces the number of writes to durable storage. It also
2926    /// allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
2927    /// storage collections, without needing to worry about dependency order.
2928    ///
2929    /// `migrated_storage_collections` is a set of builtin storage collections that have been
2930    /// migrated and should be handled specially.
2931    #[instrument]
2932    async fn bootstrap_storage_collections(
2933        &mut self,
2934        migrated_storage_collections: &BTreeSet<CatalogItemId>,
2935    ) {
2936        let catalog = self.catalog();
2937
2938        let source_desc = |object_id: GlobalId,
2939                           data_source: &DataSourceDesc,
2940                           desc: &RelationDesc,
2941                           timeline: &Timeline| {
2942            let data_source = match data_source.clone() {
2943                // Re-announce the source description.
2944                DataSourceDesc::Ingestion { desc, cluster_id } => {
2945                    let desc = desc.into_inline_connection(catalog.state());
2946                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2947                    DataSource::Ingestion(ingestion)
2948                }
2949                DataSourceDesc::OldSyntaxIngestion {
2950                    desc,
2951                    progress_subsource,
2952                    data_config,
2953                    details,
2954                    cluster_id,
2955                } => {
2956                    let desc = desc.into_inline_connection(catalog.state());
2957                    let data_config = data_config.into_inline_connection(catalog.state());
2958                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2959                    // this will always be a Source or a Table.
2960                    let progress_subsource =
2961                        catalog.get_entry(&progress_subsource).latest_global_id();
2962                    let mut ingestion =
2963                        IngestionDescription::new(desc, cluster_id, progress_subsource);
2964                    let legacy_export = SourceExport {
2965                        storage_metadata: (),
2966                        data_config,
2967                        details,
2968                    };
2969                    ingestion.source_exports.insert(object_id, legacy_export);
2970
2971                    DataSource::Ingestion(ingestion)
2972                }
2973                DataSourceDesc::IngestionExport {
2974                    ingestion_id,
2975                    external_reference: _,
2976                    details,
2977                    data_config,
2978                } => {
2979                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2980                    // this will always be a Source or a Table.
2981                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2982
2983                    DataSource::IngestionExport {
2984                        ingestion_id,
2985                        details,
2986                        data_config: data_config.into_inline_connection(catalog.state()),
2987                    }
2988                }
2989                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
2990                DataSourceDesc::Progress => DataSource::Progress,
2991                DataSourceDesc::Introspection(introspection) => {
2992                    DataSource::Introspection(introspection)
2993                }
2994                DataSourceDesc::Catalog => DataSource::Other,
2995            };
2996            CollectionDescription {
2997                desc: desc.clone(),
2998                data_source,
2999                since: None,
3000                timeline: Some(timeline.clone()),
3001                primary: None,
3002            }
3003        };
3004
3005        let mut compute_collections = vec![];
3006        let mut collections = vec![];
3007        for entry in catalog.entries() {
3008            match entry.item() {
3009                CatalogItem::Source(source) => {
3010                    collections.push((
3011                        source.global_id(),
3012                        source_desc(
3013                            source.global_id(),
3014                            &source.data_source,
3015                            &source.desc,
3016                            &source.timeline,
3017                        ),
3018                    ));
3019                }
3020                CatalogItem::Table(table) => {
3021                    match &table.data_source {
3022                        TableDataSource::TableWrites { defaults: _ } => {
3023                            let versions: BTreeMap<_, _> = table
3024                                .collection_descs()
3025                                .map(|(gid, version, desc)| (version, (gid, desc)))
3026                                .collect();
3027                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
3028                                let next_version = version.bump();
3029                                let primary_collection =
3030                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
3031                                let mut collection_desc =
3032                                    CollectionDescription::for_table(desc.clone());
3033                                collection_desc.primary = primary_collection;
3034
3035                                (*gid, collection_desc)
3036                            });
3037                            collections.extend(collection_descs);
3038                        }
3039                        TableDataSource::DataSource {
3040                            desc: data_source_desc,
3041                            timeline,
3042                        } => {
3043                            // TODO(alter_table): Support versioning tables that read from sources.
3044                            soft_assert_eq_or_log!(table.collections.len(), 1);
3045                            let collection_descs =
3046                                table.collection_descs().map(|(gid, _version, desc)| {
3047                                    (
3048                                        gid,
3049                                        source_desc(
3050                                            entry.latest_global_id(),
3051                                            data_source_desc,
3052                                            &desc,
3053                                            timeline,
3054                                        ),
3055                                    )
3056                                });
3057                            collections.extend(collection_descs);
3058                        }
3059                    };
3060                }
3061                CatalogItem::MaterializedView(mv) => {
3062                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
3063                        let collection_desc =
3064                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
3065                        (gid, collection_desc)
3066                    });
3067
3068                    collections.extend(collection_descs);
3069                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
3070                }
3071                CatalogItem::Sink(sink) => {
3072                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
3073                    let from_desc = storage_sink_from_entry
3074                        .relation_desc()
3075                        .expect("sinks can only be built on items with descs")
3076                        .into_owned();
3077                    let collection_desc = CollectionDescription {
3078                        // TODO(sinks): make generic once we have more than one sink type.
3079                        desc: KAFKA_PROGRESS_DESC.clone(),
3080                        data_source: DataSource::Sink {
3081                            desc: ExportDescription {
3082                                sink: StorageSinkDesc {
3083                                    from: sink.from,
3084                                    from_desc,
3085                                    connection: sink
3086                                        .connection
3087                                        .clone()
3088                                        .into_inline_connection(self.catalog().state()),
3089                                    envelope: sink.envelope,
3090                                    as_of: Antichain::from_elem(Timestamp::minimum()),
3091                                    with_snapshot: sink.with_snapshot,
3092                                    version: sink.version,
3093                                    from_storage_metadata: (),
3094                                    to_storage_metadata: (),
3095                                    commit_interval: sink.commit_interval,
3096                                },
3097                                instance_id: sink.cluster_id,
3098                            },
3099                        },
3100                        since: None,
3101                        timeline: None,
3102                        primary: None,
3103                    };
3104                    collections.push((sink.global_id, collection_desc));
3105                }
3106                CatalogItem::Log(_)
3107                | CatalogItem::View(_)
3108                | CatalogItem::Index(_)
3109                | CatalogItem::Type(_)
3110                | CatalogItem::Func(_)
3111                | CatalogItem::Secret(_)
3112                | CatalogItem::Connection(_) => (),
3113            }
3114        }
3115
3116        let register_ts = if self.controller.read_only() {
3117            self.get_local_read_ts().await
3118        } else {
3119            // Getting a write timestamp bumps the write timestamp in the
3120            // oracle, which we're not allowed in read-only mode.
3121            self.get_local_write_ts().await.timestamp
3122        };
3123
3124        let storage_metadata = self.catalog.state().storage_metadata();
3125        let migrated_storage_collections = migrated_storage_collections
3126            .into_iter()
3127            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
3128            .collect();
3129
3130        // Before possibly creating collections, make sure their schemas are correct.
3131        //
3132        // Across different versions of Materialize the nullability of columns can change based on
3133        // updates to our optimizer.
3134        self.controller
3135            .storage
3136            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
3137            .await
3138            .unwrap_or_terminate("cannot fail to evolve collections");
3139
3140        // New builtin storage collections are by default created with [0] since/upper frontiers.
3141        // For collections that have dependencies on other collections (MVs, CTs), this can violate
3142        // the frontier invariants assumed by as-of selection. For example, as-of selection expects
3143        // to be able to pick up computing a materialized view from its most recent upper, but if
3144        // that upper is [0] it's likely that the required times are not available anymore in the
3145        // MV inputs.
3146        //
3147        // To avoid violating frontier invariants, we need to bump their sinces to times greater
3148        // than all of their upstream storage inputs. To know the since of a storage input, it has
3149        // to be registered with the storage controller first. Thus we register collections in
3150        // layers: Each iteration registers the collections whose dependencies are all already
3151        // registered.
3152        let mut pending: BTreeMap<_, _> = collections.into_iter().collect();
3153
3154        // Precompute storage-collection dependencies for each collection.
3155        let transitive_dep_gids: BTreeMap<_, _> = pending
3156            .keys()
3157            .map(|gid| {
3158                let entry = self.catalog.get_entry_by_global_id(gid);
3159                let item_id = entry.id();
3160                let deps = self.catalog.state().transitive_uses(item_id);
3161                let dep_gids: BTreeSet<_> = deps
3162                    // Ignore self-dependencies. For example, `transitive_uses` includes the input ID,
3163                    // and CTs can depend on themselves.
3164                    .filter(|dep_id| *dep_id != item_id)
3165                    .map(|dep_id| self.catalog.get_entry(&dep_id).latest_global_id())
3166                    // Ignore dependencies on objects that are not storage collections.
3167                    .filter(|dep_gid| pending.contains_key(dep_gid))
3168                    .collect();
3169                (*gid, dep_gids)
3170            })
3171            .collect();
3172
3173        while !pending.is_empty() {
3174            // Drain collections whose dependencies have all been registered already
3175            // (i.e., are not in `pending`).
3176            let ready_gids: BTreeSet<_> = pending
3177                .keys()
3178                .filter(|gid| {
3179                    let mut deps = transitive_dep_gids[gid].iter();
3180                    !deps.any(|dep_gid| pending.contains_key(dep_gid))
3181                })
3182                .copied()
3183                .collect();
3184            let mut ready: Vec<_> = pending
3185                .extract_if(.., |gid, _| ready_gids.contains(gid))
3186                .collect();
3187
3188            // Bump sinces of builtin collections.
3189            for (gid, collection) in &mut ready {
3190                // Don't silently overwrite an explicitly specified `since`.
3191                if !gid.is_system() || collection.since.is_some() {
3192                    continue;
3193                }
3194
3195                let mut derived_since = Antichain::from_elem(Timestamp::MIN);
3196                for dep_gid in &transitive_dep_gids[gid] {
3197                    let (since, _) = self
3198                        .controller
3199                        .storage
3200                        .collection_frontiers(*dep_gid)
3201                        .expect("previously registered");
3202                    derived_since.join_assign(&since);
3203                }
3204                collection.since = Some(derived_since);
3205            }
3206
3207            if ready.is_empty() {
3208                soft_panic_or_log!(
3209                    "cycle in storage collections: {:?}",
3210                    pending.keys().collect::<Vec<_>>(),
3211                );
3212                // We get here only due to a bug. Rather than crash-looping, we try our best to
3213                // reach a sane state by attempting to register all the remaining collections at
3214                // once.
3215                ready = mem::take(&mut pending).into_iter().collect();
3216            }
3217
3218            self.controller
3219                .storage
3220                .create_collections_for_bootstrap(
3221                    storage_metadata,
3222                    Some(register_ts),
3223                    ready,
3224                    &migrated_storage_collections,
3225                )
3226                .await
3227                .unwrap_or_terminate("cannot fail to create collections");
3228        }
3229
3230        if !self.controller.read_only() {
3231            self.apply_local_write(register_ts).await;
3232        }
3233    }
3234
3235    /// Returns the current list of catalog entries, sorted into an appropriate order for
3236    /// bootstrapping.
3237    ///
3238    /// The returned entries are in dependency order. Indexes are sorted immediately after the
3239    /// objects they index, to ensure that all dependants of these indexed objects can make use of
3240    /// the respective indexes.
3241    fn bootstrap_sort_catalog_entries(&self) -> Vec<CatalogEntry> {
3242        let mut indexes_on = BTreeMap::<_, Vec<_>>::new();
3243        let mut non_indexes = Vec::new();
3244        for entry in self.catalog().entries().cloned() {
3245            if let Some(index) = entry.index() {
3246                let on = self.catalog().get_entry_by_global_id(&index.on);
3247                indexes_on.entry(on.id()).or_default().push(entry);
3248            } else {
3249                non_indexes.push(entry);
3250            }
3251        }
3252
3253        let key_fn = |entry: &CatalogEntry| entry.id;
3254        let dependencies_fn = |entry: &CatalogEntry| entry.uses();
3255        sort_topological(&mut non_indexes, key_fn, dependencies_fn);
3256
3257        let mut result = Vec::new();
3258        for entry in non_indexes {
3259            let id = entry.id();
3260            result.push(entry);
3261            if let Some(mut indexes) = indexes_on.remove(&id) {
3262                result.append(&mut indexes);
3263            }
3264        }
3265
3266        soft_assert_or_log!(
3267            indexes_on.is_empty(),
3268            "indexes with missing dependencies: {indexes_on:?}",
3269        );
3270
3271        result
3272    }
3273
3274    /// Invokes the optimizer on all indexes and materialized views in the catalog and inserts the
3275    /// resulting dataflow plans into the catalog state.
3276    ///
3277    /// `ordered_catalog_entries` must be sorted in dependency order, with dependencies ordered
3278    /// before their dependants.
3279    ///
3280    /// This method does not perform timestamp selection for the dataflows, nor does it create them
3281    /// in the compute controller. Both of these steps happen later during bootstrapping.
3282    ///
3283    /// Returns a map of expressions that were not cached.
3284    #[instrument]
3285    fn bootstrap_dataflow_plans(
3286        &mut self,
3287        ordered_catalog_entries: &[CatalogEntry],
3288        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
3289    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
3290        // The optimizer expects to be able to query its `ComputeInstanceSnapshot` for
3291        // collections the current dataflow can depend on. But since we don't yet install anything
3292        // on compute instances, the snapshot information is incomplete. We fix that by manually
3293        // updating `ComputeInstanceSnapshot` objects to ensure they contain collections previously
3294        // optimized.
3295        let mut instance_snapshots = BTreeMap::new();
3296        let mut uncached_expressions = BTreeMap::new();
3297
3298        let optimizer_config = |catalog: &Catalog, cluster_id| {
3299            let system_config = catalog.system_config();
3300            let overrides = catalog.get_cluster(cluster_id).config.features();
3301            OptimizerConfig::from(system_config).override_from(&overrides)
3302        };
3303
3304        for entry in ordered_catalog_entries {
3305            match entry.item() {
3306                CatalogItem::Index(idx) => {
3307                    // Collect optimizer parameters.
3308                    let compute_instance =
3309                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
3310                            self.instance_snapshot(idx.cluster_id)
3311                                .expect("compute instance exists")
3312                        });
3313                    let global_id = idx.global_id();
3314
3315                    // The index may already be installed on the compute instance. For example,
3316                    // this is the case for introspection indexes.
3317                    if compute_instance.contains_collection(&global_id) {
3318                        continue;
3319                    }
3320
3321                    let optimizer_config = optimizer_config(&self.catalog, idx.cluster_id);
3322
3323                    let (optimized_plan, physical_plan, metainfo) =
3324                        match cached_global_exprs.remove(&global_id) {
3325                            Some(global_expressions)
3326                                if global_expressions.optimizer_features
3327                                    == optimizer_config.features =>
3328                            {
3329                                debug!("global expression cache hit for {global_id:?}");
3330                                (
3331                                    global_expressions.global_mir,
3332                                    global_expressions.physical_plan,
3333                                    global_expressions.dataflow_metainfos,
3334                                )
3335                            }
3336                            Some(_) | None => {
3337                                let (optimized_plan, global_lir_plan) = {
3338                                    // Build an optimizer for this INDEX.
3339                                    let mut optimizer = optimize::index::Optimizer::new(
3340                                        self.owned_catalog(),
3341                                        compute_instance.clone(),
3342                                        global_id,
3343                                        optimizer_config.clone(),
3344                                        self.optimizer_metrics(),
3345                                    );
3346
3347                                    // MIR ⇒ MIR optimization (global)
3348                                    let index_plan = optimize::index::Index::new(
3349                                        entry.name().clone(),
3350                                        idx.on,
3351                                        idx.keys.to_vec(),
3352                                    );
3353                                    let global_mir_plan = optimizer.optimize(index_plan)?;
3354                                    let optimized_plan = global_mir_plan.df_desc().clone();
3355
3356                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3357                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3358
3359                                    (optimized_plan, global_lir_plan)
3360                                };
3361
3362                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3363                                let metainfo = {
3364                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3365                                    let notice_ids =
3366                                        std::iter::repeat_with(|| self.allocate_transient_id())
3367                                            .map(|(_item_id, gid)| gid)
3368                                            .take(metainfo.optimizer_notices.len())
3369                                            .collect::<Vec<_>>();
3370                                    // Return a metainfo with rendered notices.
3371                                    self.catalog().render_notices(
3372                                        metainfo,
3373                                        notice_ids,
3374                                        Some(idx.global_id()),
3375                                    )
3376                                };
3377                                uncached_expressions.insert(
3378                                    global_id,
3379                                    GlobalExpressions {
3380                                        global_mir: optimized_plan.clone(),
3381                                        physical_plan: physical_plan.clone(),
3382                                        dataflow_metainfos: metainfo.clone(),
3383                                        optimizer_features: optimizer_config.features.clone(),
3384                                    },
3385                                );
3386                                (optimized_plan, physical_plan, metainfo)
3387                            }
3388                        };
3389
3390                    let catalog = self.catalog_mut();
3391                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3392                    catalog.set_physical_plan(idx.global_id(), physical_plan);
3393                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3394
3395                    compute_instance.insert_collection(idx.global_id());
3396                }
3397                CatalogItem::MaterializedView(mv) => {
3398                    // Collect optimizer parameters.
3399                    let compute_instance =
3400                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3401                            self.instance_snapshot(mv.cluster_id)
3402                                .expect("compute instance exists")
3403                        });
3404                    let global_id = mv.global_id_writes();
3405
3406                    let optimizer_config = optimizer_config(&self.catalog, mv.cluster_id);
3407
3408                    let (optimized_plan, physical_plan, metainfo) = match cached_global_exprs
3409                        .remove(&global_id)
3410                    {
3411                        Some(global_expressions)
3412                            if global_expressions.optimizer_features
3413                                == optimizer_config.features =>
3414                        {
3415                            debug!("global expression cache hit for {global_id:?}");
3416                            (
3417                                global_expressions.global_mir,
3418                                global_expressions.physical_plan,
3419                                global_expressions.dataflow_metainfos,
3420                            )
3421                        }
3422                        Some(_) | None => {
3423                            let (_, internal_view_id) = self.allocate_transient_id();
3424                            let debug_name = self
3425                                .catalog()
3426                                .resolve_full_name(entry.name(), None)
3427                                .to_string();
3428
3429                            let (optimized_plan, global_lir_plan) = {
3430                                // Build an optimizer for this MATERIALIZED VIEW.
3431                                let mut optimizer = optimize::materialized_view::Optimizer::new(
3432                                    self.owned_catalog().as_optimizer_catalog(),
3433                                    compute_instance.clone(),
3434                                    global_id,
3435                                    internal_view_id,
3436                                    mv.desc.latest().iter_names().cloned().collect(),
3437                                    mv.non_null_assertions.clone(),
3438                                    mv.refresh_schedule.clone(),
3439                                    debug_name,
3440                                    optimizer_config.clone(),
3441                                    self.optimizer_metrics(),
3442                                );
3443
3444                                // MIR ⇒ MIR optimization (global)
3445                                // We make sure to use the HIR SQL type (since MIR SQL types may not be coherent).
3446                                let typ = infer_sql_type_for_catalog(
3447                                    &mv.raw_expr,
3448                                    &mv.locally_optimized_expr.as_ref().clone(),
3449                                );
3450                                let global_mir_plan = optimizer
3451                                    .optimize((mv.locally_optimized_expr.as_ref().clone(), typ))?;
3452                                let optimized_plan = global_mir_plan.df_desc().clone();
3453
3454                                // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3455                                let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3456
3457                                (optimized_plan, global_lir_plan)
3458                            };
3459
3460                            let (physical_plan, metainfo) = global_lir_plan.unapply();
3461                            let metainfo = {
3462                                // Pre-allocate a vector of transient GlobalIds for each notice.
3463                                let notice_ids =
3464                                    std::iter::repeat_with(|| self.allocate_transient_id())
3465                                        .map(|(_item_id, global_id)| global_id)
3466                                        .take(metainfo.optimizer_notices.len())
3467                                        .collect::<Vec<_>>();
3468                                // Return a metainfo with rendered notices.
3469                                self.catalog().render_notices(
3470                                    metainfo,
3471                                    notice_ids,
3472                                    Some(mv.global_id_writes()),
3473                                )
3474                            };
3475                            uncached_expressions.insert(
3476                                global_id,
3477                                GlobalExpressions {
3478                                    global_mir: optimized_plan.clone(),
3479                                    physical_plan: physical_plan.clone(),
3480                                    dataflow_metainfos: metainfo.clone(),
3481                                    optimizer_features: optimizer_config.features.clone(),
3482                                },
3483                            );
3484                            (optimized_plan, physical_plan, metainfo)
3485                        }
3486                    };
3487
3488                    let catalog = self.catalog_mut();
3489                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
3490                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
3491                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);
3492
3493                    compute_instance.insert_collection(mv.global_id_writes());
3494                }
3495                CatalogItem::Table(_)
3496                | CatalogItem::Source(_)
3497                | CatalogItem::Log(_)
3498                | CatalogItem::View(_)
3499                | CatalogItem::Sink(_)
3500                | CatalogItem::Type(_)
3501                | CatalogItem::Func(_)
3502                | CatalogItem::Secret(_)
3503                | CatalogItem::Connection(_) => (),
3504            }
3505        }
3506
3507        Ok(uncached_expressions)
3508    }
3509
3510    /// Selects for each compute dataflow an as-of suitable for bootstrapping it.
3511    ///
3512    /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
3513    /// in place and that must not be dropped before all compute dataflows have been created with
3514    /// the compute controller.
3515    ///
3516    /// This method expects all storage collections and dataflow plans to be available, so it must
3517    /// run after [`Coordinator::bootstrap_storage_collections`] and
3518    /// [`Coordinator::bootstrap_dataflow_plans`].
3519    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold> {
3520        let mut catalog_ids = Vec::new();
3521        let mut dataflows = Vec::new();
3522        let mut read_policies = BTreeMap::new();
3523        for entry in self.catalog.entries() {
3524            let gid = match entry.item() {
3525                CatalogItem::Index(idx) => idx.global_id(),
3526                CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3527                CatalogItem::Table(_)
3528                | CatalogItem::Source(_)
3529                | CatalogItem::Log(_)
3530                | CatalogItem::View(_)
3531                | CatalogItem::Sink(_)
3532                | CatalogItem::Type(_)
3533                | CatalogItem::Func(_)
3534                | CatalogItem::Secret(_)
3535                | CatalogItem::Connection(_) => continue,
3536            };
3537            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3538                catalog_ids.push(gid);
3539                dataflows.push(plan.clone());
3540
3541                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3542                    read_policies.insert(gid, compaction_window.into());
3543                }
3544            }
3545        }
3546
3547        let read_ts = self.get_local_read_ts().await;
3548        let read_holds = as_of_selection::run(
3549            &mut dataflows,
3550            &read_policies,
3551            &*self.controller.storage_collections,
3552            read_ts,
3553            self.controller.read_only(),
3554        );
3555
3556        let catalog = self.catalog_mut();
3557        for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3558            catalog.set_physical_plan(id, plan);
3559        }
3560
3561        read_holds
3562    }
3563
    /// Serves the coordinator by running its main message loop.
    ///
    /// Each loop iteration waits on one of several sources, turns whatever arrived into
    /// `Message`s, and then hands those messages one at a time to `handle_message`:
    ///
    /// - `internal_cmd_rx`: internal messages, received in batches and polled first.
    /// - `strict_serializable_reads_rx`: pending read transactions keyed by connection; they
    ///   are stashed in `pending_linearize_read_txns` and a `LinearizeReads` message is queued.
    /// - `cmd_rx`: user commands together with their OpenTelemetry context, received in
    ///   batches. The loop exits when `recv_many` on this channel returns 0.
    /// - `group_commit_rx`: readiness signal that triggers a `GroupCommitInitiate` message.
    ///
    /// The loop additionally reacts to cluster events, controller readiness, several periodic
    /// intervals, and the idle timer sent by the watchdog task spawned at the top of this
    /// method.
    ///
    /// You must call `bootstrap` before calling this method.
    ///
    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would
    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
    fn serve(
        mut self,
        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
        group_commit_rx: appends::GroupCommitWaiter,
    ) -> LocalBoxFuture<'static, ()> {
        async move {
            // Watcher that listens for and reports cluster service status changes.
            let mut cluster_events = self.controller.events_stream();
            // Shared with the watchdog task so it can report what the coordinator was last
            // working on if it appears stuck.
            let last_message = Arc::new(Mutex::new(LastMessage {
                kind: "none",
                stmt: None,
            }));

            // Capacity-1 channel: the watchdog can only reserve a slot once the main loop has
            // consumed the previous timer.
            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
            let idle_metric = self.metrics.queue_busy_seconds.clone();
            let last_message_watchdog = Arc::clone(&last_message);

            spawn(|| "coord watchdog", async move {
                // Every 5 seconds, attempt to measure how long it takes for the
                // coord select loop to be empty, because this message is the last
                // processed. If it is idle, this will result in some microseconds
                // of measurement.
                let mut interval = tokio::time::interval(Duration::from_secs(5));
                // If we end up having to wait more than 5 seconds for the coord to respond, then the
                // behavior of Delay results in the interval "restarting" from whenever we yield
                // instead of trying to catch up.
                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

                // Track if we become stuck to de-dupe error reporting.
                let mut coord_stuck = false;

                loop {
                    interval.tick().await;

                    // Wait for space in the channel, if we timeout then the coordinator is stuck!
                    let duration = tokio::time::Duration::from_secs(30);
                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
                    let Ok(maybe_permit) = timeout else {
                        // Only log if we're newly stuck, to prevent logging repeatedly.
                        if !coord_stuck {
                            let last_message = last_message_watchdog.lock().expect("poisoned");
                            tracing::warn!(
                                last_message_kind = %last_message.kind,
                                last_message_sql = %last_message.stmt_to_string(),
                                "coordinator stuck for {duration:?}",
                            );
                        }
                        coord_stuck = true;

                        continue;
                    };

                    // We got a permit, we're not stuck!
                    if coord_stuck {
                        tracing::info!("Coordinator became unstuck");
                    }
                    coord_stuck = false;

                    // If we failed to acquire a permit it's because we're shutting down.
                    let Ok(permit) = maybe_permit else {
                        break;
                    };

                    // Start the timer now; the main loop stops it when it receives it from
                    // `idle_rx`.
                    permit.send(idle_metric.start_timer());
                }
            });

            // Kick off periodic and background work before entering the main loop.
            self.schedule_storage_usage_collection().await;
            self.schedule_arrangement_sizes_collection().await;
            self.spawn_privatelink_vpc_endpoints_watch_task();
            self.spawn_statement_logging_task();
            self.spawn_catalog_info_metrics_task();
            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);

            // Report if the handling of a single message takes longer than this threshold.
            // NOTE: The threshold is read once here, before the loop starts.
            let warn_threshold = self
                .catalog()
                .system_config()
                .coord_slow_message_warn_threshold();

            // How many messages we'd like to batch up before processing them. Must be > 0.
            const MESSAGE_BATCH: usize = 64;
            // Both buffers are drained on use and therefore reused across loop iterations.
            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);

            let message_batch = self.metrics.message_batch.clone();

            loop {
                // Before adding a branch to this select loop, please ensure that the branch is
                // cancellation safe and add a comment explaining why. You can refer here for more
                // info: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
                select! {
                    // We prioritize internal commands over other commands. However, we work through
                    // batches of commands in some branches of this select, which means that even if
                    // a command generates internal commands, we will work through the current batch
                    // before receiving a new batch of commands.
                    biased;

                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
                    // Receive a batch of commands.
                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
                    // `next()` on any stream is cancel-safe:
                    // https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
                    // Receive a single command.
                    Some(event) = cluster_events.next() => {
                        messages.push(Message::ClusterEvent(event))
                    },
                    // See [`mz_controller::Controller::Controller::ready`] for notes
                    // on why this is cancel-safe.
                    // Receive a single command.
                    () = self.controller.ready() => {
                        // NOTE: We don't get a `Readiness` back from `ready()`
                        // because the controller wants to keep it and it's not
                        // trivially `Clone` or `Copy`. Hence this accessor.
                        let controller = match self.controller.get_readiness() {
                            Readiness::Storage => ControllerReadiness::Storage,
                            Readiness::Compute => ControllerReadiness::Compute,
                            Readiness::Metrics(_) => ControllerReadiness::Metrics,
                            Readiness::Internal(_) => ControllerReadiness::Internal,
                            Readiness::NotReady => unreachable!("just signaled as ready"),
                        };
                        messages.push(Message::ControllerReady { controller });
                    }
                    // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
                    // Receive a single command.
                    permit = group_commit_rx.ready() => {
                        // If we happen to have batched exactly one user write, use
                        // that span so the `emit_trace_id_notice` hooks up.
                        // Otherwise, the best we can do is invent a new root span
                        // and make it follow from all the Spans in the pending
                        // writes.
                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
                            PendingWriteTxn::User{span, ..} => Some(span),
                            PendingWriteTxn::System{..} => None,
                        });
                        let span = match user_write_spans.exactly_one() {
                            Ok(span) => span.clone(),
                            Err(user_write_spans) => {
                                let span = info_span!(parent: None, "group_commit_notify");
                                for s in user_write_spans {
                                    span.follows_from(s);
                                }
                                span
                            }
                        };
                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
                    },
                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
                    // Receive a batch of commands.
                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
                        // A count of 0 ends the main loop; this is the only exit from it.
                        if count == 0 {
                            break;
                        } else {
                            messages.extend(cmd_messages.drain(..).map(
                                |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
                            ));
                        }
                    },
                    // `recv()` on `UnboundedReceiver` is cancellation safe:
                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
                    // Receive one pending read, then drain any others that are already queued
                    // without waiting.
                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
                        let mut pending_read_txns = vec![pending_read_txn];
                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
                            pending_read_txns.push(pending_read_txn);
                        }
                        for (conn_id, pending_read_txn) in pending_read_txns {
                            let prev = self
                                .pending_linearize_read_txns
                                .insert(conn_id, pending_read_txn);
                            soft_assert_or_log!(
                                prev.is_none(),
                                "connections can not have multiple concurrent reads, prev: {prev:?}"
                            )
                        }
                        // A single message covers all reads stashed above.
                        messages.push(Message::LinearizeReads);
                    }
                    // `tick()` on `Interval` is cancel-safe:
                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
                    // Receive a single command.
                    _ = self.advance_timelines_interval.tick() => {
                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
                        span.follows_from(Span::current());

                        // Group commit sends an `AdvanceTimelines` message when
                        // done, which is what downgrades read holds. In
                        // read-only mode we send this message directly because
                        // we're not doing group commits.
                        if self.controller.read_only() {
                            messages.push(Message::AdvanceTimelines);
                        } else {
                            messages.push(Message::GroupCommitInitiate(span, None));
                        }
                    },
                    // `tick()` on `Interval` is cancel-safe:
                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
                    // Receive a single command.
                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
                        messages.push(Message::CheckSchedulingPolicies);
                    },

                    // `tick()` on `Interval` is cancel-safe:
                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
                    // Receive a single command.
                    _ = self.caught_up_check_interval.tick() => {
                        // We do this directly on the main loop instead of
                        // firing off a message. We are still in read-only mode,
                        // so optimizing for latency, not blocking the main loop
                        // is not that important.
                        self.maybe_check_caught_up().await;

                        // This branch queued no message, so go straight back to `select!`.
                        continue;
                    },

                    // Process the idle metric at the lowest priority to sample queue non-idle time.
                    // `recv()` on `Receiver` is cancellation safe:
                    // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety
                    // Receive a single command.
                    timer = idle_rx.recv() => {
                        // Stop the timer that the watchdog task started when it sent it.
                        timer.expect("does not drop").observe_duration();
                        self.metrics
                            .message_handling
                            .with_label_values(&["watchdog"])
                            .observe(0.0);
                        // This branch queued no message, so go straight back to `select!`.
                        continue;
                    }
                };

                // Observe the number of messages we're processing at once.
                message_batch.observe(f64::cast_lossy(messages.len()));

                for msg in messages.drain(..) {
                    // All message processing functions trace. Start a parent span
                    // for them to make it easy to find slow messages.
                    let msg_kind = msg.kind();
                    let span = span!(
                        target: "mz_adapter::coord::handle_message_loop",
                        Level::INFO,
                        "coord::handle_message",
                        kind = msg_kind
                    );
                    // Captured before the span is moved into `instrument` below, so the trace
                    // ID is still available if the message turns out to be slow.
                    let otel_context = span.context().span().span_context().clone();

                    // Record the last kind of message in case we get stuck. For
                    // execute commands, we additionally stash the user's SQL
                    // statement so we can log it in case we get stuck.
                    *last_message.lock().expect("poisoned") = LastMessage {
                        kind: msg_kind,
                        stmt: match &msg {
                            Message::Command(
                                _,
                                Command::Execute {
                                    portal_name,
                                    session,
                                    ..
                                },
                            ) => session
                                .get_portal_unverified(portal_name)
                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
                            _ => None,
                        },
                    };

                    let start = Instant::now();
                    self.handle_message(msg).instrument(span).await;
                    let duration = start.elapsed();

                    self.metrics
                        .message_handling
                        .with_label_values(&[msg_kind])
                        .observe(duration.as_secs_f64());

                    // If something is _really_ slow, print a trace id for debugging, if OTEL is enabled.
                    if duration > warn_threshold {
                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
                        tracing::error!(
                            ?msg_kind,
                            ?trace_id,
                            ?duration,
                            "very slow coordinator message"
                        );
                    }
                }
            }
            // Try and cleanup as a best effort. There may be some async tasks out there holding a
            // reference that prevents us from cleaning up.
            if let Some(catalog) = Arc::into_inner(self.catalog) {
                catalog.expire().await;
            }
        }
        .boxed_local()
    }
3868
3869    /// Obtain a read-only Catalog reference.
3870    fn catalog(&self) -> &Catalog {
3871        &self.catalog
3872    }
3873
3874    /// Obtain a read-only Catalog snapshot, suitable for giving out to
3875    /// non-Coordinator thread tasks.
3876    fn owned_catalog(&self) -> Arc<Catalog> {
3877        Arc::clone(&self.catalog)
3878    }
3879
3880    /// Obtain a handle to the optimizer metrics, suitable for giving
3881    /// out to non-Coordinator thread tasks.
3882    fn optimizer_metrics(&self) -> OptimizerMetrics {
3883        self.optimizer_metrics.clone()
3884    }
3885
3886    /// Obtain a writeable Catalog reference.
3887    fn catalog_mut(&mut self) -> &mut Catalog {
3888        // make_mut will cause any other Arc references (from owned_catalog) to
3889        // continue to be valid by cloning the catalog, putting it in a new Arc,
3890        // which lives at self._catalog. If there are no other Arc references,
3891        // then no clone is made, and it returns a reference to the existing
3892        // object. This makes this method and owned_catalog both very cheap: at
3893        // most one clone per catalog mutation, but only if there's a read-only
3894        // reference to it.
3895        Arc::make_mut(&mut self.catalog)
3896    }
3897
3898    /// Refills the user ID pool by allocating IDs from the catalog.
3899    ///
3900    /// Requests `max(min_count, batch_size)` IDs so the pool is never
3901    /// under-filled relative to the configured batch size.
3902    async fn refill_user_id_pool(&mut self, min_count: u64) -> Result<(), AdapterError> {
3903        let batch_size = USER_ID_POOL_BATCH_SIZE.get(self.catalog().system_config().dyncfgs());
3904        let to_allocate = min_count.max(u64::from(batch_size));
3905        let id_ts = self.get_catalog_write_ts().await;
3906        let ids = self.catalog().allocate_user_ids(to_allocate, id_ts).await?;
3907        if let (Some((first_id, _)), Some((last_id, _))) = (ids.first(), ids.last()) {
3908            let start = match first_id {
3909                CatalogItemId::User(id) => *id,
3910                other => {
3911                    return Err(AdapterError::Internal(format!(
3912                        "expected User CatalogItemId, got {other:?}"
3913                    )));
3914                }
3915            };
3916            let end = match last_id {
3917                CatalogItemId::User(id) => *id + 1, // exclusive upper bound
3918                other => {
3919                    return Err(AdapterError::Internal(format!(
3920                        "expected User CatalogItemId, got {other:?}"
3921                    )));
3922                }
3923            };
3924            self.user_id_pool.refill(start, end);
3925        } else {
3926            return Err(AdapterError::Internal(
3927                "catalog returned no user IDs".into(),
3928            ));
3929        }
3930        Ok(())
3931    }
3932
3933    /// Allocates a single user ID, refilling the pool from the catalog if needed.
3934    async fn allocate_user_id(&mut self) -> Result<(CatalogItemId, GlobalId), AdapterError> {
3935        if let Some(id) = self.user_id_pool.allocate() {
3936            return Ok((CatalogItemId::User(id), GlobalId::User(id)));
3937        }
3938        self.refill_user_id_pool(1).await?;
3939        let id = self.user_id_pool.allocate().expect("ID pool just refilled");
3940        Ok((CatalogItemId::User(id), GlobalId::User(id)))
3941    }
3942
3943    /// Allocates `count` user IDs, refilling the pool from the catalog if needed.
3944    async fn allocate_user_ids(
3945        &mut self,
3946        count: u64,
3947    ) -> Result<Vec<(CatalogItemId, GlobalId)>, AdapterError> {
3948        if self.user_id_pool.remaining() < count {
3949            self.refill_user_id_pool(count).await?;
3950        }
3951        let raw_ids = self
3952            .user_id_pool
3953            .allocate_many(count)
3954            .expect("pool has enough IDs after refill");
3955        Ok(raw_ids
3956            .into_iter()
3957            .map(|id| (CatalogItemId::User(id), GlobalId::User(id)))
3958            .collect())
3959    }
3960
3961    /// Obtain a reference to the coordinator's connection context.
3962    fn connection_context(&self) -> &ConnectionContext {
3963        self.controller.connection_context()
3964    }
3965
3966    /// Obtain a reference to the coordinator's secret reader, in an `Arc`.
3967    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3968        &self.connection_context().secrets_reader
3969    }
3970
3971    /// Publishes a notice message to all sessions.
3972    ///
3973    /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
3974    /// so we keep it around.
3975    #[allow(dead_code)]
3976    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3977        for meta in self.active_conns.values() {
3978            let _ = meta.notice_tx.send(notice.clone());
3979        }
3980    }
3981
3982    /// Returns a closure that will publish a notice to all sessions that were active at the time
3983    /// this method was called.
3984    pub(crate) fn broadcast_notice_tx(
3985        &self,
3986    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3987        let senders: Vec<_> = self
3988            .active_conns
3989            .values()
3990            .map(|meta| meta.notice_tx.clone())
3991            .collect();
3992        Box::new(move |notice| {
3993            for tx in senders {
3994                let _ = tx.send(notice.clone());
3995            }
3996        })
3997    }
3998
3999    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
4000        &self.active_conns
4001    }
4002
4003    #[instrument(level = "debug")]
4004    pub(crate) fn retire_execution(
4005        &mut self,
4006        reason: StatementEndedExecutionReason,
4007        ctx_extra: ExecuteContextExtra,
4008    ) {
4009        if let Some(uuid) = ctx_extra.retire() {
4010            self.end_statement_execution(uuid, reason);
4011        }
4012    }
4013
4014    /// Creates a new dataflow builder from the catalog and indexes in `self`.
4015    #[instrument(level = "debug")]
4016    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
4017        let compute = self
4018            .instance_snapshot(instance)
4019            .expect("compute instance does not exist");
4020        DataflowBuilder::new(self.catalog().state(), compute)
4021    }
4022
4023    /// Return a reference-less snapshot to the indicated compute instance.
4024    pub fn instance_snapshot(
4025        &self,
4026        id: ComputeInstanceId,
4027    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
4028        ComputeInstanceSnapshot::new(&self.controller, id)
4029    }
4030
4031    /// Call into the compute controller to install a finalized dataflow, and
4032    /// initialize the read policies for its exported readable objects.
4033    ///
4034    /// # Panics
4035    ///
4036    /// Panics if dataflow creation fails.
4037    pub(crate) async fn ship_dataflow(
4038        &mut self,
4039        dataflow: DataflowDescription<Plan>,
4040        instance: ComputeInstanceId,
4041        target_replica: Option<ReplicaId>,
4042    ) {
4043        self.try_ship_dataflow(dataflow, instance, target_replica)
4044            .await
4045            .unwrap_or_terminate("dataflow creation cannot fail");
4046    }
4047
4048    /// Call into the compute controller to install a finalized dataflow, and
4049    /// initialize the read policies for its exported readable objects.
4050    pub(crate) async fn try_ship_dataflow(
4051        &mut self,
4052        dataflow: DataflowDescription<Plan>,
4053        instance: ComputeInstanceId,
4054        target_replica: Option<ReplicaId>,
4055    ) -> Result<(), DataflowCreationError> {
4056        // We must only install read policies for indexes, not for sinks.
4057        // Sinks are write-only compute collections that don't have read policies.
4058        let export_ids = dataflow.exported_index_ids().collect();
4059
4060        self.controller
4061            .compute
4062            .create_dataflow(instance, dataflow, target_replica)?;
4063
4064        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
4065            .await;
4066
4067        Ok(())
4068    }
4069
4070    /// Call into the compute controller to allow writes to the specified IDs
4071    /// from the specified instance. Calling this function multiple times and
4072    /// calling it on a read-only instance has no effect.
4073    pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
4074        self.controller
4075            .compute
4076            .allow_writes(instance, id)
4077            .unwrap_or_terminate("allow_writes cannot fail");
4078    }
4079
4080    /// Like `ship_dataflow`, but also await on builtin table updates.
4081    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
4082        &mut self,
4083        dataflow: DataflowDescription<Plan>,
4084        instance: ComputeInstanceId,
4085        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
4086        target_replica: Option<ReplicaId>,
4087    ) {
4088        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
4089            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, target_replica);
4090            let ((), ()) =
4091                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
4092        } else {
4093            self.ship_dataflow(dataflow, instance, target_replica).await;
4094        }
4095    }
4096
4097    /// Install a _watch set_ in the controller that is automatically associated with the given
4098    /// connection id. The watchset will be automatically cleared if the connection terminates
4099    /// before the watchset completes.
4100    pub fn install_compute_watch_set(
4101        &mut self,
4102        conn_id: ConnectionId,
4103        objects: BTreeSet<GlobalId>,
4104        t: Timestamp,
4105        state: WatchSetResponse,
4106    ) -> Result<(), CollectionLookupError> {
4107        let ws_id = self.controller.install_compute_watch_set(objects, t)?;
4108        self.connection_watch_sets
4109            .entry(conn_id.clone())
4110            .or_default()
4111            .insert(ws_id);
4112        self.installed_watch_sets.insert(ws_id, (conn_id, state));
4113        Ok(())
4114    }
4115
4116    /// Install a _watch set_ in the controller that is automatically associated with the given
4117    /// connection id. The watchset will be automatically cleared if the connection terminates
4118    /// before the watchset completes.
4119    pub fn install_storage_watch_set(
4120        &mut self,
4121        conn_id: ConnectionId,
4122        objects: BTreeSet<GlobalId>,
4123        t: Timestamp,
4124        state: WatchSetResponse,
4125    ) -> Result<(), CollectionMissing> {
4126        let ws_id = self.controller.install_storage_watch_set(objects, t)?;
4127        self.connection_watch_sets
4128            .entry(conn_id.clone())
4129            .or_default()
4130            .insert(ws_id);
4131        self.installed_watch_sets.insert(ws_id, (conn_id, state));
4132        Ok(())
4133    }
4134
4135    /// Cancels pending watchsets associated with the provided connection id.
4136    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
4137        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
4138            for ws_id in ws_ids {
4139                self.installed_watch_sets.remove(&ws_id);
4140            }
4141        }
4142    }
4143
4144    /// Returns the state of the [`Coordinator`] formatted as JSON.
4145    ///
4146    /// The returned value is not guaranteed to be stable and may change at any point in time.
4147    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
4148        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
4149        // returned object as a tradeoff between usability and stability. `serde_json` will fail
4150        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
4151        // prevents a future unrelated change from silently breaking this method.
4152
4153        let global_timelines: BTreeMap<_, _> = self
4154            .global_timelines
4155            .iter()
4156            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
4157            .collect();
4158        let active_conns: BTreeMap<_, _> = self
4159            .active_conns
4160            .iter()
4161            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
4162            .collect();
4163        let txn_read_holds: BTreeMap<_, _> = self
4164            .txn_read_holds
4165            .iter()
4166            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
4167            .collect();
4168        let pending_peeks: BTreeMap<_, _> = self
4169            .pending_peeks
4170            .iter()
4171            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
4172            .collect();
4173        let client_pending_peeks: BTreeMap<_, _> = self
4174            .client_pending_peeks
4175            .iter()
4176            .map(|(id, peek)| {
4177                let peek: BTreeMap<_, _> = peek
4178                    .iter()
4179                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
4180                    .collect();
4181                (id.to_string(), peek)
4182            })
4183            .collect();
4184        let pending_linearize_read_txns: BTreeMap<_, _> = self
4185            .pending_linearize_read_txns
4186            .iter()
4187            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
4188            .collect();
4189
4190        Ok(serde_json::json!({
4191            "global_timelines": global_timelines,
4192            "active_conns": active_conns,
4193            "txn_read_holds": txn_read_holds,
4194            "pending_peeks": pending_peeks,
4195            "client_pending_peeks": client_pending_peeks,
4196            "pending_linearize_read_txns": pending_linearize_read_txns,
4197            "controller": self.controller.dump().await?,
4198        }))
4199    }
4200
4201    /// Prune all storage usage events from the [`MZ_STORAGE_USAGE_BY_SHARD`] table that are older
4202    /// than `retention_period`.
4203    ///
4204    /// This method will read the entire contents of [`MZ_STORAGE_USAGE_BY_SHARD`] into memory
4205    /// which can be expensive.
4206    ///
4207    /// DO NOT call this method outside of startup. The safety of reading at the current oracle read
4208    /// timestamp and then writing at whatever the current write timestamp is (instead of
4209    /// `read_ts + 1`) relies on the fact that there are no outstanding writes during startup.
4210    ///
4211    /// Group commit, which this method uses to write the retractions, has builtin fencing, and we
4212    /// never commit retractions to [`MZ_STORAGE_USAGE_BY_SHARD`] outside of this method, which is
4213    /// only called once during startup. So we don't have to worry about double/invalid retractions.
4214    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
4215        let item_id = self
4216            .catalog()
4217            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
4218        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
4219        let read_ts = self.get_local_read_ts().await;
4220        let current_contents_fut = self
4221            .controller
4222            .storage_collections
4223            .snapshot(global_id, read_ts);
4224        let internal_cmd_tx = self.internal_cmd_tx.clone();
4225        spawn(|| "storage_usage_prune", async move {
4226            let mut current_contents = current_contents_fut
4227                .await
4228                .unwrap_or_terminate("cannot fail to fetch snapshot");
4229            differential_dataflow::consolidation::consolidate(&mut current_contents);
4230
4231            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4232            let mut expired = Vec::new();
4233            for (row, diff) in current_contents {
4234                assert_eq!(
4235                    diff, 1,
4236                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4237                );
4238                // This logic relies on the definition of `mz_storage_usage_by_shard` not changing.
4239                let collection_timestamp = row
4240                    .unpack()
4241                    .get(3)
4242                    .expect("definition of mz_storage_by_shard changed")
4243                    .unwrap_timestamptz();
4244                let collection_timestamp = collection_timestamp.timestamp_millis();
4245                let collection_timestamp: u128 = collection_timestamp
4246                    .try_into()
4247                    .expect("all collections happen after Jan 1 1970");
4248                if collection_timestamp < cutoff_ts {
4249                    debug!("pruning storage event {row:?}");
4250                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
4251                    expired.push(builtin_update);
4252                }
4253            }
4254
4255            // main thread has shut down.
4256            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
4257        });
4258    }
4259
4260    /// Retracts `mz_object_arrangement_size_history` rows older than the
4261    /// `arrangement_size_history_retention_period` dyncfg.
4262    ///
4263    /// Must only run at startup: it reads at the oracle read timestamp and
4264    /// writes retractions at the current write timestamp, which is only safe
4265    /// when no other writes are in flight. See [the equivalent storage-usage
4266    /// pruner](Self::prune_storage_usage_events_on_startup) for the same
4267    /// reasoning.
4268    async fn prune_arrangement_sizes_history_on_startup(&self) {
4269        // The catalog server is not writable in read-only mode.
4270        if self.controller.read_only() {
4271            return;
4272        }
4273
4274        let retention_period = mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_HISTORY_RETENTION_PERIOD
4275            .get(self.catalog().system_config().dyncfgs());
4276        let item_id = self
4277            .catalog()
4278            .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);
4279        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
4280        let read_ts = self.get_local_read_ts().await;
4281        let current_contents_fut = self
4282            .controller
4283            .storage_collections
4284            .snapshot(global_id, read_ts);
4285        let internal_cmd_tx = self.internal_cmd_tx.clone();
4286        spawn(|| "arrangement_sizes_history_prune", async move {
4287            let mut current_contents = current_contents_fut
4288                .await
4289                .unwrap_or_terminate("cannot fail to fetch snapshot");
4290            differential_dataflow::consolidation::consolidate(&mut current_contents);
4291
4292            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4293            let expired =
4294                arrangement_sizes_expired_retractions(current_contents, cutoff_ts, item_id);
4295
4296            // TODO(arrangement-sizes): when the writeable-catalog-server
4297            // plumbing in https://github.com/MaterializeInc/materialize/pull/35436
4298            // lands, retract directly on `mz_catalog_server`.
4299            let _ = internal_cmd_tx.send(Message::ArrangementSizesPrune(expired));
4300        });
4301    }
4302
4303    fn current_credit_consumption_rate(&self) -> Numeric {
4304        self.catalog()
4305            .user_cluster_replicas()
4306            .filter_map(|replica| match &replica.config.location {
4307                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
4308                ReplicaLocation::Unmanaged(_) => None,
4309            })
4310            .map(|size| {
4311                self.catalog()
4312                    .cluster_replica_sizes()
4313                    .0
4314                    .get(size)
4315                    .expect("location size is validated against the cluster replica sizes")
4316                    .credits_per_hour
4317            })
4318            .sum()
4319    }
4320}
4321
4322/// Returns retraction updates for rows in a consolidated
4323/// `mz_object_arrangement_size_history` snapshot whose `collection_timestamp`
4324/// (column 3) is strictly before `cutoff_ts`.
4325///
4326/// Panics if any input row has `diff != 1`: the caller must consolidate first,
4327/// and a consolidated history table should never contain retractions because
4328/// the only source of retractions is this function itself.
4329fn arrangement_sizes_expired_retractions(
4330    rows: impl IntoIterator<Item = (mz_repr::Row, i64)>,
4331    cutoff_ts: u128,
4332    item_id: CatalogItemId,
4333) -> Vec<BuiltinTableUpdate> {
4334    let mut expired = Vec::new();
4335    for (row, diff) in rows {
4336        assert_eq!(
4337            diff, 1,
4338            "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4339        );
4340        let collection_timestamp = row
4341            .unpack()
4342            .get(3)
4343            .expect("definition of mz_object_arrangement_size_history changed")
4344            .unwrap_timestamptz()
4345            .timestamp_millis();
4346        let collection_timestamp: u128 = collection_timestamp
4347            .try_into()
4348            .expect("all collections happen after Jan 1 1970");
4349        if collection_timestamp < cutoff_ts {
4350            expired.push(BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE));
4351        }
4352    }
4353    expired
4354}
4355
#[cfg(test)]
impl Coordinator {
    /// Compile-time guard that `ship_dataflow` evaluates to `()` rather than a `Result`.
    ///
    /// This function is never meant to run; it only has to type-check.
    #[allow(dead_code)]
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        // `ship_dataflow` must not return a `Result`: it is called after `catalog_transact`,
        // after which no errors are allowed. This test exists to prevent us from incorrectly
        // teaching it how to return errors (which has happened twice and is the motivation for
        // this test).

        // Any compute instance ID will do to satisfy the call below. Note that this only works
        // because this function will never run.
        let instance_id = ComputeInstanceId::user(1).expect("1 is a valid ID");

        // The `()` annotation is the actual check: it stops compiling if the return type changes.
        let shipped = self.ship_dataflow(dataflow, instance_id, None);
        let _: () = shipped.await;
    }
}
4372
4373/// Contains information about the last message the [`Coordinator`] processed.
4374struct LastMessage {
4375    kind: &'static str,
4376    stmt: Option<Arc<Statement<Raw>>>,
4377}
4378
4379impl LastMessage {
4380    /// Returns a redacted version of the statement that is safe for logs.
4381    fn stmt_to_string(&self) -> Cow<'static, str> {
4382        self.stmt
4383            .as_ref()
4384            .map(|stmt| stmt.to_ast_string_redacted().into())
4385            .unwrap_or(Cow::Borrowed("<none>"))
4386    }
4387}
4388
4389impl fmt::Debug for LastMessage {
4390    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4391        f.debug_struct("LastMessage")
4392            .field("kind", &self.kind)
4393            .field("stmt", &self.stmt_to_string())
4394            .finish()
4395    }
4396}
4397
4398impl Drop for LastMessage {
4399    fn drop(&mut self) {
4400        // Only print the last message if we're currently panicking, otherwise we'd spam our logs.
4401        if std::thread::panicking() {
4402            // If we're panicking theres no guarantee `tracing` still works, so print to stderr.
4403            eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4404        }
4405    }
4406}
4407
4408/// Serves the coordinator based on the provided configuration.
4409///
4410/// For a high-level description of the coordinator, see the [crate
4411/// documentation](crate).
4412///
4413/// Returns a handle to the coordinator and a client to communicate with the
4414/// coordinator.
4415///
4416/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 42KB. This would
4417/// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
4418/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
4419pub fn serve(
4420    Config {
4421        controller_config,
4422        controller_envd_epoch,
4423        mut storage,
4424        audit_logs_iterator,
4425        timestamp_oracle_url,
4426        unsafe_mode,
4427        all_features,
4428        build_info,
4429        environment_id,
4430        metrics_registry,
4431        now,
4432        secrets_controller,
4433        cloud_resource_controller,
4434        cluster_replica_sizes,
4435        builtin_system_cluster_config,
4436        builtin_catalog_server_cluster_config,
4437        builtin_probe_cluster_config,
4438        builtin_support_cluster_config,
4439        builtin_analytics_cluster_config,
4440        system_parameter_defaults,
4441        availability_zones,
4442        storage_usage_client,
4443        storage_usage_collection_interval,
4444        storage_usage_retention_period,
4445        segment_client,
4446        egress_addresses,
4447        aws_account_id,
4448        aws_privatelink_availability_zones,
4449        connection_context,
4450        connection_limit_callback,
4451        remote_system_parameters,
4452        webhook_concurrency_limit,
4453        http_host_name,
4454        tracing_handle,
4455        read_only_controllers,
4456        caught_up_trigger: clusters_caught_up_trigger,
4457        helm_chart_version,
4458        license_key,
4459        external_login_password_mz_system,
4460        force_builtin_schema_migration,
4461    }: Config,
4462) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4463    async move {
4464        let coord_start = Instant::now();
4465        info!("startup: coordinator init: beginning");
4466        info!("startup: coordinator init: preamble beginning");
4467
4468        // Initializing the builtins can be an expensive process and consume a lot of memory. We
4469        // forcibly initialize it early while the stack is relatively empty to avoid stack
4470        // overflows later.
4471        let _builtins = LazyLock::force(&BUILTINS_STATIC);
4472
4473        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4474        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4475        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4476            mpsc::unbounded_channel();
4477
4478        // Validate and process availability zones.
4479        if !availability_zones.iter().all_unique() {
4480            coord_bail!("availability zones must be unique");
4481        }
4482
4483        let aws_principal_context = match (
4484            aws_account_id,
4485            connection_context.aws_external_id_prefix.clone(),
4486        ) {
4487            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4488                aws_account_id,
4489                aws_external_id_prefix,
4490            }),
4491            _ => None,
4492        };
4493
4494        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4495            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4496
4497        info!(
4498            "startup: coordinator init: preamble complete in {:?}",
4499            coord_start.elapsed()
4500        );
4501        let oracle_init_start = Instant::now();
4502        info!("startup: coordinator init: timestamp oracle init beginning");
4503
4504        let timestamp_oracle_config = timestamp_oracle_url
4505            .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
4506            .transpose()?;
4507        let mut initial_timestamps =
4508            get_initial_oracle_timestamps(&timestamp_oracle_config).await?;
4509
4510        // Insert an entry for the `EpochMilliseconds` timeline if one doesn't exist,
4511        // which will ensure that the timeline is initialized since it's required
4512        // by the system.
4513        initial_timestamps
4514            .entry(Timeline::EpochMilliseconds)
4515            .or_insert_with(mz_repr::Timestamp::minimum);
4516        let mut timestamp_oracles = BTreeMap::new();
4517        for (timeline, initial_timestamp) in initial_timestamps {
4518            Coordinator::ensure_timeline_state_with_initial_time(
4519                &timeline,
4520                initial_timestamp,
4521                now.clone(),
4522                timestamp_oracle_config.clone(),
4523                &mut timestamp_oracles,
4524                read_only_controllers,
4525            )
4526            .await;
4527        }
4528
4529        // Opening the durable catalog uses one or more timestamps without communicating with
4530        // the timestamp oracle. Here we make sure to apply the catalog upper with the timestamp
4531        // oracle to linearize future operations with opening the catalog.
4532        let catalog_upper = storage.current_upper().await;
4533        // Choose a time at which to boot. This is used, for example, to prune
4534        // old storage usage data or migrate audit log entries.
4535        //
4536        // This time is usually the current system time, but with protection
4537        // against backwards time jumps, even across restarts.
4538        let epoch_millis_oracle = &timestamp_oracles
4539            .get(&Timeline::EpochMilliseconds)
4540            .expect("inserted above")
4541            .oracle;
4542
4543        let mut boot_ts = if read_only_controllers {
4544            let read_ts = epoch_millis_oracle.read_ts().await;
4545            std::cmp::max(read_ts, catalog_upper)
4546        } else {
4547            // Getting/applying a write timestamp bumps the write timestamp in the
4548            // oracle, which we're not allowed in read-only mode.
4549            epoch_millis_oracle.apply_write(catalog_upper).await;
4550            epoch_millis_oracle.write_ts().await.timestamp
4551        };
4552
4553        info!(
4554            "startup: coordinator init: timestamp oracle init complete in {:?}",
4555            oracle_init_start.elapsed()
4556        );
4557
4558        let catalog_open_start = Instant::now();
4559        info!("startup: coordinator init: catalog open beginning");
4560        let persist_client = controller_config
4561            .persist_clients
4562            .open(controller_config.persist_location.clone())
4563            .await
4564            .context("opening persist client")?;
4565        let builtin_item_migration_config =
4566            BuiltinItemMigrationConfig {
4567                persist_client: persist_client.clone(),
4568                read_only: read_only_controllers,
4569                force_migration: force_builtin_schema_migration,
4570            }
4571        ;
4572        let OpenCatalogResult {
4573            mut catalog,
4574            migrated_storage_collections_0dt,
4575            new_builtin_collections,
4576            builtin_table_updates,
4577            cached_global_exprs,
4578            uncached_local_exprs,
4579        } = Catalog::open(mz_catalog::config::Config {
4580            storage,
4581            metrics_registry: &metrics_registry,
4582            state: mz_catalog::config::StateConfig {
4583                unsafe_mode,
4584                all_features,
4585                build_info,
4586                environment_id: environment_id.clone(),
4587                read_only: read_only_controllers,
4588                now: now.clone(),
4589                boot_ts: boot_ts.clone(),
4590                skip_migrations: false,
4591                cluster_replica_sizes,
4592                builtin_system_cluster_config,
4593                builtin_catalog_server_cluster_config,
4594                builtin_probe_cluster_config,
4595                builtin_support_cluster_config,
4596                builtin_analytics_cluster_config,
4597                system_parameter_defaults,
4598                remote_system_parameters,
4599                availability_zones,
4600                egress_addresses,
4601                aws_principal_context,
4602                aws_privatelink_availability_zones,
4603                connection_context,
4604                http_host_name,
4605                builtin_item_migration_config,
4606                persist_client: persist_client.clone(),
4607                enable_expression_cache_override: None,
4608                helm_chart_version,
4609                external_login_password_mz_system,
4610                license_key: license_key.clone(),
4611            },
4612        })
4613        .await?;
4614
4615        // Opening the catalog uses one or more timestamps, so push the boot timestamp up to the
4616        // current catalog upper.
4617        let catalog_upper = catalog.current_upper().await;
4618        boot_ts = std::cmp::max(boot_ts, catalog_upper);
4619
4620        if !read_only_controllers {
4621            epoch_millis_oracle.apply_write(boot_ts).await;
4622        }
4623
4624        info!(
4625            "startup: coordinator init: catalog open complete in {:?}",
4626            catalog_open_start.elapsed()
4627        );
4628
4629        let coord_thread_start = Instant::now();
4630        info!("startup: coordinator init: coordinator thread start beginning");
4631
4632        let session_id = catalog.config().session_id;
4633        let start_instant = catalog.config().start_instant;
4634
4635        // In order for the coordinator to support Rc and Refcell types, it cannot be
4636        // sent across threads. Spawn it in a thread and have this parent thread wait
4637        // for bootstrap completion before proceeding.
4638        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4639        let handle = TokioHandle::current();
4640
4641        let metrics = Metrics::register_into(&metrics_registry);
4642        let metrics_clone = metrics.clone();
4643        let optimizer_metrics = OptimizerMetrics::register_into(
4644            &metrics_registry,
4645            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4646        );
4647        let segment_client_clone = segment_client.clone();
4648        let coord_now = now.clone();
4649        let advance_timelines_interval =
4650            tokio::time::interval(catalog.system_config().default_timestamp_interval());
4651        let mut check_scheduling_policies_interval = tokio::time::interval(
4652            catalog
4653                .system_config()
4654                .cluster_check_scheduling_policies_interval(),
4655        );
4656        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4657
4658        let clusters_caught_up_check_interval = if read_only_controllers {
4659            let dyncfgs = catalog.system_config().dyncfgs();
4660            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4661
4662            let mut interval = tokio::time::interval(interval);
4663            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4664            interval
4665        } else {
4666            // When not in read-only mode, we don't do hydration checks. But we
4667            // still have to provide _some_ interval. This is large enough that
4668            // it doesn't matter.
4669            //
4670            // TODO(aljoscha): We cannot use Duration::MAX right now because of
4671            // https://github.com/tokio-rs/tokio/issues/6634. Use that once it's
4672            // fixed for good.
4673            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4674            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4675            interval
4676        };
4677
4678        let clusters_caught_up_check =
4679            clusters_caught_up_trigger.map(|trigger| {
4680                let mut exclude_collections: BTreeSet<GlobalId> =
4681                    new_builtin_collections.into_iter().collect();
4682
4683                // Migrated MVs can't make progress in read-only mode. Exclude them and all their
4684                // transitive dependents.
4685                //
4686                // TODO: Consider sending `allow_writes` for the dataflows of migrated MVs, which
4687                //       would allow them to make progress even in read-only mode. This doesn't
4688                //       work for MVs based on `mz_catalog_raw`, if the leader's version is less
4689                //       than v26.17, since before that version the catalog shard's frontier wasn't
4690                //       kept up-to-date with the current time. So this workaround has to remain in
                //       place until upgrades from a version less than v26.17 are no longer
                //       supported.
4692                let mut todo: Vec<_> = migrated_storage_collections_0dt
4693                    .iter()
4694                    .filter(|id| {
4695                        catalog.state().get_entry(id).is_materialized_view()
4696                    })
4697                    .copied()
4698                    .collect();
4699                while let Some(item_id) = todo.pop() {
4700                    let entry = catalog.state().get_entry(&item_id);
4701                    exclude_collections.extend(entry.global_ids());
4702                    todo.extend_from_slice(entry.used_by());
4703                }
4704
4705                CaughtUpCheckContext {
4706                    trigger,
4707                    exclude_collections,
4708                }
4709            });
4710
4711        if let Some(TimestampOracleConfig::Postgres(pg_config)) =
4712            timestamp_oracle_config.as_ref()
4713        {
4714            // Apply settings from system vars as early as possible because some
4715            // of them are locked in right when an oracle is first opened!
4716            let pg_timestamp_oracle_params =
4717                flags::timestamp_oracle_config(catalog.system_config());
4718            pg_timestamp_oracle_params.apply(pg_config);
4719        }
4720
4721        // Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
4722        // system variables change, we update our connection limits.
4723        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4724            Arc::new(move |system_vars: &SystemVars| {
4725                let limit: u64 = system_vars.max_connections().cast_into();
4726                let superuser_reserved: u64 =
4727                    system_vars.superuser_reserved_connections().cast_into();
4728
4729                // If superuser_reserved > max_connections, prefer max_connections.
4730                //
4731                // In this scenario all normal users would be locked out because all connections
4732                // would be reserved for superusers so complain if this is the case.
4733                let superuser_reserved = if superuser_reserved >= limit {
4734                    tracing::warn!(
4735                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
4736                    );
4737                    limit
4738                } else {
4739                    superuser_reserved
4740                };
4741
4742                (connection_limit_callback)(limit, superuser_reserved);
4743            });
4744        catalog.system_config_mut().register_callback(
4745            &mz_sql::session::vars::MAX_CONNECTIONS,
4746            Arc::clone(&connection_limit_callback),
4747        );
4748        catalog.system_config_mut().register_callback(
4749            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4750            connection_limit_callback,
4751        );
4752
4753        let (group_commit_tx, group_commit_rx) = appends::notifier();
4754
4755        let parent_span = tracing::Span::current();
4756        let thread = thread::Builder::new()
4757            // The Coordinator thread tends to keep a lot of data on its stack. To
4758            // prevent a stack overflow we allocate a stack three times as big as the default
4759            // stack.
4760            .stack_size(3 * stack::STACK_SIZE)
4761            .name("coordinator".to_string())
4762            .spawn(move || {
4763                let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4764
4765                let controller = handle
4766                    .block_on({
4767                        catalog.initialize_controller(
4768                            controller_config,
4769                            controller_envd_epoch,
4770                            read_only_controllers,
4771                        )
4772                    })
4773                    .unwrap_or_terminate("failed to initialize storage_controller");
4774                // Initializing the controller uses one or more timestamps, so push the boot timestamp up to the
4775                // current catalog upper.
4776                let catalog_upper = handle.block_on(catalog.current_upper());
4777                boot_ts = std::cmp::max(boot_ts, catalog_upper);
4778                if !read_only_controllers {
4779                    let epoch_millis_oracle = &timestamp_oracles
4780                        .get(&Timeline::EpochMilliseconds)
4781                        .expect("inserted above")
4782                        .oracle;
4783                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4784                }
4785
4786                let catalog = Arc::new(catalog);
4787
4788                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4789                let mut coord = Coordinator {
4790                    controller,
4791                    catalog,
4792                    internal_cmd_tx,
4793                    group_commit_tx,
4794                    strict_serializable_reads_tx,
4795                    global_timelines: timestamp_oracles,
4796                    transient_id_gen: Arc::new(TransientIdGen::new()),
4797                    active_conns: BTreeMap::new(),
4798                    txn_read_holds: Default::default(),
4799                    pending_peeks: BTreeMap::new(),
4800                    client_pending_peeks: BTreeMap::new(),
4801                    pending_linearize_read_txns: BTreeMap::new(),
4802                    serialized_ddl: LockedVecDeque::new(),
4803                    active_compute_sinks: BTreeMap::new(),
4804                    active_webhooks: BTreeMap::new(),
4805                    active_copies: BTreeMap::new(),
4806                    connection_cancel_watches: BTreeMap::new(),
4807                    introspection_subscribes: BTreeMap::new(),
4808                    write_locks: BTreeMap::new(),
4809                    deferred_write_ops: BTreeMap::new(),
4810                    pending_writes: Vec::new(),
4811                    advance_timelines_interval,
4812                    secrets_controller,
4813                    caching_secrets_reader,
4814                    cloud_resource_controller,
4815                    storage_usage_client,
4816                    storage_usage_collection_interval,
4817                    segment_client,
4818                    metrics,
4819                    catalog_info_metrics_registry: metrics_registry.clone(),
4820                    optimizer_metrics,
4821                    tracing_handle,
4822                    statement_logging: StatementLogging::new(coord_now.clone()),
4823                    webhook_concurrency_limit,
4824                    timestamp_oracle_config,
4825                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4826                    cluster_scheduling_decisions: BTreeMap::new(),
4827                    caught_up_check_interval: clusters_caught_up_check_interval,
4828                    caught_up_check: clusters_caught_up_check,
4829                    installed_watch_sets: BTreeMap::new(),
4830                    connection_watch_sets: BTreeMap::new(),
4831                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
4832                    read_only_controllers,
4833                    buffered_builtin_table_updates: Some(Vec::new()),
4834                    license_key,
4835                    user_id_pool: IdPool::empty(),
4836                    persist_client,
4837                };
4838                let bootstrap = handle.block_on(async {
4839                    coord
4840                        .bootstrap(
4841                            boot_ts,
4842                            migrated_storage_collections_0dt,
4843                            builtin_table_updates,
4844                            cached_global_exprs,
4845                            uncached_local_exprs,
4846                            audit_logs_iterator,
4847                        )
4848                        .await?;
4849                    coord
4850                        .controller
4851                        .remove_orphaned_replicas(
4852                            coord.catalog().get_next_user_replica_id().await?,
4853                            coord.catalog().get_next_system_replica_id().await?,
4854                        )
4855                        .await
4856                        .map_err(AdapterError::Orchestrator)?;
4857
4858                    if let Some(retention_period) = storage_usage_retention_period {
4859                        coord
4860                            .prune_storage_usage_events_on_startup(retention_period)
4861                            .await;
4862                    }
4863
4864                    coord.prune_arrangement_sizes_history_on_startup().await;
4865
4866                    Ok(())
4867                });
4868                let ok = bootstrap.is_ok();
4869                drop(span);
4870                bootstrap_tx
4871                    .send(bootstrap)
4872                    .expect("bootstrap_rx is not dropped until it receives this message");
4873                if ok {
4874                    handle.block_on(coord.serve(
4875                        internal_cmd_rx,
4876                        strict_serializable_reads_rx,
4877                        cmd_rx,
4878                        group_commit_rx,
4879                    ));
4880                }
4881            })
4882            .expect("failed to create coordinator thread");
4883        match bootstrap_rx
4884            .await
4885            .expect("bootstrap_tx always sends a message or panics/halts")
4886        {
4887            Ok(()) => {
4888                info!(
4889                    "startup: coordinator init: coordinator thread start complete in {:?}",
4890                    coord_thread_start.elapsed()
4891                );
4892                info!(
4893                    "startup: coordinator init: complete in {:?}",
4894                    coord_start.elapsed()
4895                );
4896                let handle = Handle {
4897                    session_id,
4898                    start_instant,
4899                    _thread: thread.join_on_drop(),
4900                };
4901                let client = Client::new(
4902                    build_info,
4903                    cmd_tx,
4904                    metrics_clone,
4905                    now,
4906                    environment_id,
4907                    segment_client_clone,
4908                );
4909                Ok((handle, client))
4910            }
4911            Err(e) => Err(e),
4912        }
4913    }
4914    .boxed()
4915}
4916
4917// Determines and returns the highest timestamp for each timeline, for all known
4918// timestamp oracle implementations.
4919//
4920// Initially, we did this so that we can switch between implementations of
4921// timestamp oracle, but now we also do this to determine a monotonic boot
4922// timestamp, a timestamp that does not regress across reboots.
4923//
4924// This mostly works, but there can be linearizability violations, because there
4925// is no central moment where we do distributed coordination for all oracle
4926// types. Working around this seems prohibitively hard, maybe even impossible so
4927// we have to live with this window of potential violations during the upgrade
4928// window (which is the only point where we should switch oracle
4929// implementations).
4930async fn get_initial_oracle_timestamps(
4931    timestamp_oracle_config: &Option<TimestampOracleConfig>,
4932) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4933    let mut initial_timestamps = BTreeMap::new();
4934
4935    if let Some(config) = timestamp_oracle_config {
4936        let oracle_timestamps = config.get_all_timelines().await?;
4937
4938        let debug_msg = || {
4939            oracle_timestamps
4940                .iter()
4941                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4942                .join(", ")
4943        };
4944        info!(
4945            "current timestamps from the timestamp oracle: {}",
4946            debug_msg()
4947        );
4948
4949        for (timeline, ts) in oracle_timestamps {
4950            let entry = initial_timestamps
4951                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4952
4953            entry
4954                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4955                .or_insert(ts);
4956        }
4957    } else {
4958        info!("no timestamp oracle configured!");
4959    };
4960
4961    let debug_msg = || {
4962        initial_timestamps
4963            .iter()
4964            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4965            .join(", ")
4966    };
4967    info!("initial oracle timestamps: {}", debug_msg());
4968
4969    Ok(initial_timestamps)
4970}
4971
4972#[instrument]
4973pub async fn load_remote_system_parameters(
4974    storage: &mut Box<dyn OpenableDurableCatalogState>,
4975    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4976    system_parameter_sync_timeout: Duration,
4977) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4978    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4979        tracing::info!("parameter sync on boot: start sync");
4980
4981        // We intentionally block initial startup, potentially forever,
4982        // on initializing LaunchDarkly. This may seem scary, but the
4983        // alternative is even scarier. Over time, we expect that the
4984        // compiled-in default values for the system parameters will
4985        // drift substantially from the defaults configured in
4986        // LaunchDarkly, to the point that starting an environment
4987        // without loading the latest values from LaunchDarkly will
4988        // result in running an untested configuration.
4989        //
4990        // Note this only applies during initial startup. Restarting
4991        // after we've synced once only blocks for a maximum of
4992        // `FRONTEND_SYNC_TIMEOUT` on LaunchDarkly, as it seems
4993        // reasonable to assume that the last-synced configuration was
4994        // valid enough.
4995        //
4996        // This philosophy appears to provide a good balance between not
4997        // running untested configurations in production while also not
4998        // making LaunchDarkly a "tier 1" dependency for existing
4999        // environments.
5000        //
5001        // If this proves to be an issue, we could seek to address the
5002        // configuration drift in a different way--for example, by
5003        // writing a script that runs in CI nightly and checks for
5004        // deviation between the compiled Rust code and LaunchDarkly.
5005        //
5006        // If it is absolutely necessary to bring up a new environment
5007        // while LaunchDarkly is down, the following manual mitigation
5008        // can be performed:
5009        //
5010        //    1. Edit the environmentd startup parameters to omit the
5011        //       LaunchDarkly configuration.
5012        //    2. Boot environmentd.
5013        //    3. Use the catalog-debug tool to run `edit config "{\"key\":\"system_config_synced\"}" "{\"value\": 1}"`.
5014        //    4. Adjust any other parameters as necessary to avoid
5015        //       running a nonstandard configuration in production.
5016        //    5. Edit the environmentd startup parameters to restore the
5017        //       LaunchDarkly configuration, for when LaunchDarkly comes
5018        //       back online.
5019        //    6. Reboot environmentd.
5020        let mut params = SynchronizedParameters::new(SystemVars::default());
5021        let frontend_sync = async {
5022            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
5023            frontend.pull(&mut params);
5024            let ops = params
5025                .modified()
5026                .into_iter()
5027                .map(|param| {
5028                    let name = param.name;
5029                    let value = param.value;
5030                    tracing::info!(name, value, initial = true, "sync parameter");
5031                    (name, value)
5032                })
5033                .collect();
5034            tracing::info!("parameter sync on boot: end sync");
5035            Ok(Some(ops))
5036        };
5037        if !storage.has_system_config_synced_once().await? {
5038            frontend_sync.await
5039        } else {
5040            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
5041                Ok(ops) => Ok(ops),
5042                Err(TimeoutError::Inner(e)) => Err(e),
5043                Err(TimeoutError::DeadlineElapsed) => {
5044                    tracing::info!("parameter sync on boot: sync has timed out");
5045                    Ok(None)
5046                }
5047            }
5048        }
5049    } else {
5050        Ok(None)
5051    }
5052}
5053
5054#[derive(Debug)]
5055pub enum WatchSetResponse {
5056    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
5057    AlterSinkReady(AlterSinkReadyContext),
5058    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
5059}
5060
5061#[derive(Debug)]
5062pub struct AlterSinkReadyContext {
5063    ctx: Option<ExecuteContext>,
5064    otel_ctx: OpenTelemetryContext,
5065    plan: AlterSinkPlan,
5066    plan_validity: PlanValidity,
5067    read_hold: ReadHolds,
5068}
5069
5070impl AlterSinkReadyContext {
5071    fn ctx(&mut self) -> &mut ExecuteContext {
5072        self.ctx.as_mut().expect("only cleared on drop")
5073    }
5074
5075    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
5076        self.ctx
5077            .take()
5078            .expect("only cleared on drop")
5079            .retire(result);
5080    }
5081}
5082
5083impl Drop for AlterSinkReadyContext {
5084    fn drop(&mut self) {
5085        if let Some(ctx) = self.ctx.take() {
5086            ctx.retire(Err(AdapterError::Canceled));
5087        }
5088    }
5089}
5090
5091#[derive(Debug)]
5092pub struct AlterMaterializedViewReadyContext {
5093    ctx: Option<ExecuteContext>,
5094    otel_ctx: OpenTelemetryContext,
5095    plan: plan::AlterMaterializedViewApplyReplacementPlan,
5096    plan_validity: PlanValidity,
5097}
5098
5099impl AlterMaterializedViewReadyContext {
5100    fn ctx(&mut self) -> &mut ExecuteContext {
5101        self.ctx.as_mut().expect("only cleared on drop")
5102    }
5103
5104    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
5105        self.ctx
5106            .take()
5107            .expect("only cleared on drop")
5108            .retire(result);
5109    }
5110}
5111
5112impl Drop for AlterMaterializedViewReadyContext {
5113    fn drop(&mut self) {
5114        if let Some(ctx) = self.ctx.take() {
5115            ctx.retire(Err(AdapterError::Canceled));
5116        }
5117    }
5118}
5119
5120/// A struct for tracking the ownership of a lock and a VecDeque to store to-be-done work after the
5121/// lock is freed.
5122#[derive(Debug)]
5123struct LockedVecDeque<T> {
5124    items: VecDeque<T>,
5125    lock: Arc<tokio::sync::Mutex<()>>,
5126}
5127
5128impl<T> LockedVecDeque<T> {
5129    pub fn new() -> Self {
5130        Self {
5131            items: VecDeque::new(),
5132            lock: Arc::new(tokio::sync::Mutex::new(())),
5133        }
5134    }
5135
5136    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
5137        Arc::clone(&self.lock).try_lock_owned()
5138    }
5139
5140    pub fn is_empty(&self) -> bool {
5141        self.items.is_empty()
5142    }
5143
5144    pub fn push_back(&mut self, value: T) {
5145        self.items.push_back(value)
5146    }
5147
5148    pub fn pop_front(&mut self) -> Option<T> {
5149        self.items.pop_front()
5150    }
5151
5152    pub fn remove(&mut self, index: usize) -> Option<T> {
5153        self.items.remove(index)
5154    }
5155
5156    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
5157        self.items.iter()
5158    }
5159}
5160
5161#[derive(Debug)]
5162struct DeferredPlanStatement {
5163    ctx: ExecuteContext,
5164    ps: PlanStatement,
5165}
5166
5167#[derive(Debug)]
5168enum PlanStatement {
5169    Statement {
5170        stmt: Arc<Statement<Raw>>,
5171        params: Params,
5172    },
5173    Plan {
5174        plan: mz_sql::plan::Plan,
5175        resolved_ids: ResolvedIds,
5176        sql_impl_resolved_ids: ResolvedIds,
5177    },
5178}
5179
5180#[derive(Debug, Error)]
5181pub enum NetworkPolicyError {
5182    #[error("Access denied for address {0}")]
5183    AddressDenied(IpAddr),
5184    #[error("Access denied missing IP address")]
5185    MissingIp,
5186}
5187
5188pub(crate) fn validate_ip_with_policy_rules(
5189    ip: &IpAddr,
5190    rules: &Vec<NetworkPolicyRule>,
5191) -> Result<(), NetworkPolicyError> {
5192    // At the moment we're not handling action or direction
5193    // as those are only able to be "allow" and "ingress" respectively
5194    if rules.iter().any(|r| r.address.0.contains(ip)) {
5195        Ok(())
5196    } else {
5197        Err(NetworkPolicyError::AddressDenied(ip.clone()))
5198    }
5199}
5200
5201pub(crate) fn infer_sql_type_for_catalog(
5202    hir_expr: &HirRelationExpr,
5203    mir_expr: &MirRelationExpr,
5204) -> SqlRelationType {
5205    let mut typ = hir_expr.top_level_typ();
5206    typ.backport_nullability_and_keys(&mir_expr.typ());
5207    typ
5208}
5209
#[cfg(test)]
mod id_pool_tests {
    use super::IdPool;

    /// An empty pool has nothing remaining and refuses every allocation.
    #[mz_ore::test]
    fn test_empty_pool() {
        let mut ids = IdPool::empty();
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
        assert_eq!(ids.allocate_many(1), None);
    }

    /// Single allocations walk the refilled range in order and stop at its
    /// (exclusive) upper bound.
    #[mz_ore::test]
    fn test_allocate_single() {
        let mut ids = IdPool::empty();
        ids.refill(10, 13);
        assert_eq!(ids.remaining(), 3);
        for expected in 10..13 {
            assert_eq!(ids.allocate(), Some(expected));
        }
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
    }

    /// Batch allocation is all-or-nothing: an oversized request fails and
    /// leaves the remaining IDs available.
    #[mz_ore::test]
    fn test_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(100, 105);

        let first_batch = ids.allocate_many(3);
        assert_eq!(first_batch, Some(vec![100, 101, 102]));
        assert_eq!(ids.remaining(), 2);

        // Only two IDs are left, so asking for three must fail...
        let too_many = ids.allocate_many(3);
        assert_eq!(too_many, None);

        // ...while asking for exactly two succeeds.
        let second_batch = ids.allocate_many(2);
        assert_eq!(second_batch, Some(vec![103, 104]));
        assert_eq!(ids.remaining(), 0);
    }

    /// Requesting zero IDs yields an empty batch and consumes nothing.
    #[mz_ore::test]
    fn test_allocate_many_zero() {
        let mut ids = IdPool::empty();
        ids.refill(1, 5);
        let batch = ids.allocate_many(0);
        assert_eq!(batch, Some(vec![]));
        assert_eq!(ids.remaining(), 4);
    }

    /// Refilling discards whatever was left of the previous range.
    #[mz_ore::test]
    fn test_refill_resets_pool() {
        let mut ids = IdPool::empty();
        ids.refill(0, 2);
        assert_eq!(ids.allocate(), Some(0));

        // The old range still has an ID left, but the refill replaces it.
        ids.refill(50, 52);
        for expected in 50..52 {
            assert_eq!(ids.allocate(), Some(expected));
        }
        assert_eq!(ids.allocate(), None);
    }

    /// Single and batch allocations draw from the same sequence.
    #[mz_ore::test]
    fn test_mixed_allocate_and_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(0, 10);

        let first = ids.allocate();
        assert_eq!(first, Some(0));

        let batch = ids.allocate_many(3);
        assert_eq!(batch, Some(vec![1, 2, 3]));

        let next = ids.allocate();
        assert_eq!(next, Some(4));

        assert_eq!(ids.remaining(), 5);
    }

    /// A range whose start lies past its end is rejected with a panic.
    #[mz_ore::test]
    #[should_panic(expected = "invalid pool range")]
    fn test_refill_invalid_range_panics() {
        let mut ids = IdPool::empty();
        ids.refill(10, 5);
    }
}
5284
#[cfg(test)]
mod arrangement_sizes_pruner_tests {
    use mz_repr::catalog_item_id::CatalogItemId;
    use mz_repr::{Datum, Row};

    use super::arrangement_sizes_expired_retractions;

    /// Builds a row shaped like `mz_object_arrangement_size_history`.
    ///
    /// The pruner only looks at column 3 (`collection_timestamp`); the other
    /// three columns hold realistic values so that a change to the row shape
    /// makes these tests fail.
    fn history_row(ts_ms: i64) -> Row {
        let millis = ts_ms.try_into().expect("non-negative");
        let dt = mz_ore::now::to_datetime(millis);
        let columns = [
            Datum::String("r1"),
            Datum::String("u1"),
            Datum::Int64(123),
            Datum::TimestampTz(dt.try_into().expect("fits in TimestampTz")),
        ];
        Row::pack_slice(&columns)
    }

    /// Returns an arbitrary item ID; the tests never dispatch on it.
    fn item_id() -> CatalogItemId {
        CatalogItemId::User(42)
    }

    /// No input rows means nothing to retract.
    #[mz_ore::test]
    fn empty_input_produces_no_retractions() {
        let retractions = arrangement_sizes_expired_retractions(Vec::new(), 1_000, item_id());
        assert!(retractions.is_empty());
    }

    /// Rows on both sides of the cutoff, plus one sitting exactly on it, pin
    /// down the strict less-than boundary: only 100 and 500 are retracted,
    /// while the row at 1_000 (the cutoff itself) and the row at 5_000 stay.
    #[mz_ore::test]
    fn retracts_only_rows_strictly_before_cutoff() {
        let history = [100, 500, 1_000, 5_000]
            .into_iter()
            .map(|ts_ms| (history_row(ts_ms), 1))
            .collect::<Vec<_>>();
        let retractions = arrangement_sizes_expired_retractions(history, 1_000, item_id());
        assert_eq!(retractions.len(), 2);
    }

    /// A negative diff in the input is treated as a broken invariant.
    #[mz_ore::test]
    #[should_panic(expected = "consolidated contents should not contain retractions")]
    fn retraction_in_input_panics() {
        let history = vec![(history_row(100), -1)];
        let _ = arrangement_sizes_expired_retractions(history, 1_000, item_id());
    }
}