mz_adapter/
coord.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Translation of SQL commands into timestamped `Controller` commands.
11//!
12//! The various SQL commands instruct the system to take actions that are not
13//! yet explicitly timestamped. On the other hand, the underlying data continually
14//! change as time moves forward. On the third hand, we greatly benefit from the
15//! information that some times are no longer of interest, so that we may
16//! compact the representation of the continually changing collections.
17//!
18//! The [`Coordinator`] curates these interactions by observing the progress
19//! collections make through time, choosing timestamps for its own commands,
20//! and eventually communicating that certain times have irretrievably "passed".
21//!
22//! ## Frontiers, another way
23//!
24//! If the above description of frontiers left you with questions, this
25//! repackaged explanation might help.
26//!
27//! - `since` is the least recent time (i.e. oldest time) that you can read
28//!   from sources and be guaranteed that the returned data is accurate as of
29//!   that time.
30//!
31//!   Reads at times less than `since` may return values that were not actually
32//!   seen at the specified time, but arrived later (i.e. the results are
33//!   compacted).
34//!
35//!   For correctness' sake, the coordinator never chooses to read at a time
36//!   less than an arrangement's `since`.
37//!
38//! - `upper` is the first time after the most recent time that you can read
39//!   from sources and receive an immediate response. Alternately, it is the
40//!   least time at which the data may still change (that is the reason we may
41//!   not be able to respond immediately).
42//!
43//!   Reads at times >= `upper` may not immediately return because the answer
44//!   isn't known yet. However, once the `upper` is > the specified read time,
45//!   the read can return.
46//!
47//!   For the sake of returned values' freshness, the coordinator prefers
48//!   performing reads at an arrangement's `upper`. However, because we more
49//!   strongly prefer correctness, the coordinator will choose timestamps
50//!   greater than an object's `upper` if it is also being accessed alongside
51//!   objects whose `since` times are >= its `upper`.
52//!
53//! This illustration attempts to show, with time moving left to right, the
54//! relationship between `since` and `upper`.
55//!
56//! - `#`: possibly inaccurate results
57//! - `-`: immediate, correct response
58//! - `?`: not yet known
59//! - `s`: since
60//! - `u`: upper
61//! - `|`: eligible for coordinator to select
62//!
63//! ```nofmt
64//! ####s----u?????
65//!     |||||||||||
66//! ```
67//!
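//! For intuition only, here is a sketch of choosing a read timestamp from a
//! `since`/`upper` pair. This is not the real selection logic (which lives in
//! the `timestamp_selection` module); the helper name, the `u64` timestamps,
//! and the "freshest immediately-answerable time" policy are illustrative
//! assumptions:
//!
//! ```ignore
//! use timely::progress::Antichain;
//!
//! /// One plausible policy: pick the freshest time that can be answered
//! /// immediately (`upper - 1`), but never go below `since`.
//! fn choose_read_ts(since: &Antichain<u64>, upper: &Antichain<u64>) -> Option<u64> {
//!     // An empty `since` means no time is readable anymore.
//!     let lower = *since.as_option()?;
//!     match upper.as_option().map(|u| u.saturating_sub(1)) {
//!         Some(fresh) if fresh >= lower => Some(fresh),
//!         // `upper` is empty (no more updates coming) or not yet past
//!         // `since`: fall back to reading at `since` itself.
//!         _ => Some(lower),
//!     }
//! }
//! ```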
68
69use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::fmt;
72use std::net::IpAddr;
73use std::num::NonZeroI64;
74use std::ops::Neg;
75use std::str::FromStr;
76use std::sync::LazyLock;
77use std::sync::{Arc, Mutex};
78use std::thread;
79use std::time::{Duration, Instant};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::{Either, Itertools};
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
95use mz_auth::password::Password;
96use mz_build_info::BuildInfo;
97use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
98use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
99use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
100use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
101use mz_catalog::memory::objects::{
102    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
103    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
104};
105use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
106use mz_compute_client::as_of_selection;
107use mz_compute_client::controller::error::InstanceMissing;
108use mz_compute_types::ComputeInstanceId;
109use mz_compute_types::dataflows::DataflowDescription;
110use mz_compute_types::plan::Plan;
111use mz_controller::clusters::{
112    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
113};
114use mz_controller::{ControllerConfig, Readiness};
115use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
116use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
117use mz_license_keys::ValidatedLicenseKey;
118use mz_orchestrator::OfflineReason;
119use mz_ore::cast::{CastFrom, CastInto, CastLossy};
120use mz_ore::channel::trigger::Trigger;
121use mz_ore::future::TimeoutError;
122use mz_ore::metrics::MetricsRegistry;
123use mz_ore::now::{EpochMillis, NowFn};
124use mz_ore::task::{JoinHandle, spawn};
125use mz_ore::thread::JoinHandleExt;
126use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
127use mz_ore::url::SensitiveUrl;
128use mz_ore::{
129    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
130};
131use mz_persist_client::PersistClient;
132use mz_persist_client::batch::ProtoBatch;
133use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
134use mz_repr::adt::numeric::Numeric;
135use mz_repr::explain::{ExplainConfig, ExplainFormat};
136use mz_repr::global_id::TransientIdGen;
137use mz_repr::optimize::OptimizerFeatures;
138use mz_repr::role_id::RoleId;
139use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
140use mz_secrets::cache::CachingSecretsReader;
141use mz_secrets::{SecretsController, SecretsReader};
142use mz_sql::ast::{Raw, Statement};
143use mz_sql::catalog::{CatalogCluster, EnvironmentId};
144use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
145use mz_sql::optimizer_metrics::OptimizerMetrics;
146use mz_sql::plan::{
147    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, NetworkPolicyRule,
148    OnTimeoutAction, Params, QueryWhen,
149};
150use mz_sql::session::user::User;
151use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
152use mz_sql_parser::ast::ExplainStage;
153use mz_sql_parser::ast::display::AstDisplay;
154use mz_storage_client::client::TableData;
155use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
156use mz_storage_types::connections::Connection as StorageConnection;
157use mz_storage_types::connections::ConnectionContext;
158use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
159use mz_storage_types::read_holds::ReadHold;
160use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
161use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
162use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
163use mz_timestamp_oracle::WriteTimestamp;
164use mz_timestamp_oracle::postgres_oracle::{
165    PostgresTimestampOracle, PostgresTimestampOracleConfig,
166};
167use mz_transform::dataflow::DataflowMetainfo;
168use opentelemetry::trace::TraceContextExt;
169use serde::Serialize;
170use thiserror::Error;
171use timely::progress::{Antichain, Timestamp as _};
172use tokio::runtime::Handle as TokioHandle;
173use tokio::select;
174use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
175use tokio::time::{Interval, MissedTickBehavior};
176use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
177use tracing_opentelemetry::OpenTelemetrySpanExt;
178use uuid::Uuid;
179
180use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
181use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
182use crate::client::{Client, Handle};
183use crate::command::{Command, ExecuteResponse};
184use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
185use crate::coord::appends::{
186    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
187};
188use crate::coord::caught_up::CaughtUpCheckContext;
189use crate::coord::cluster_scheduling::SchedulingDecision;
190use crate::coord::id_bundle::CollectionIdBundle;
191use crate::coord::introspection::IntrospectionSubscribe;
192use crate::coord::peek::PendingPeek;
193use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
194use crate::coord::timeline::{TimelineContext, TimelineState};
195use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
196use crate::coord::validity::PlanValidity;
197use crate::error::AdapterError;
198use crate::explain::insights::PlanInsightsContext;
199use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
200use crate::metrics::Metrics;
201use crate::optimize::dataflows::{
202    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
203};
204use crate::optimize::{self, Optimize, OptimizerConfig};
205use crate::session::{EndTransactionAction, Session};
206use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
207use crate::util::{ClientTransmitter, ResultExt};
208use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
209use crate::{AdapterNotice, ReadHolds, flags};
210
211pub(crate) mod id_bundle;
212pub(crate) mod in_memory_oracle;
213pub(crate) mod peek;
214pub(crate) mod statement_logging;
215pub(crate) mod timeline;
216pub(crate) mod timestamp_selection;
217
218pub mod appends;
219mod catalog_serving;
220mod caught_up;
221pub mod cluster_scheduling;
222mod command_handler;
223pub mod consistency;
224mod ddl;
225mod indexes;
226mod introspection;
227mod message_handler;
228mod privatelink_status;
229pub mod read_policy;
230mod sequencer;
231mod sql;
232mod validity;
233
234#[derive(Debug)]
235pub enum Message {
236    Command(OpenTelemetryContext, Command),
237    ControllerReady {
238        controller: ControllerReadiness,
239    },
240    PurifiedStatementReady(PurifiedStatementReady),
241    CreateConnectionValidationReady(CreateConnectionValidationReady),
242    AlterConnectionValidationReady(AlterConnectionValidationReady),
243    TryDeferred {
244        /// The connection that created this op.
245        conn_id: ConnectionId,
246        /// The write lock that notified us our deferred op might be able to run.
247        ///
248        /// Note: While we never want to hold a partial set of locks, it can be important to hold
249        /// onto the _one_ that notified us our op might be ready. If there are multiple operations
250        /// waiting on a single collection, and we don't hold this lock through retrying the op,
251        /// then everything waiting on this collection will get retried, causing traffic in the
252        /// Coordinator's message queue.
253        ///
254        /// See [`DeferredOp::can_be_optimistically_retried`] for more detail.
255        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
256    },
257    /// Initiates a group commit.
258    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
259    DeferredStatementReady,
260    AdvanceTimelines,
261    ClusterEvent(ClusterEvent),
262    CancelPendingPeeks {
263        conn_id: ConnectionId,
264    },
265    LinearizeReads,
266    StagedBatches {
267        conn_id: ConnectionId,
268        table_id: CatalogItemId,
269        batches: Vec<Result<ProtoBatch, String>>,
270    },
271    StorageUsageSchedule,
272    StorageUsageFetch,
273    StorageUsageUpdate(ShardsUsageReferenced),
274    StorageUsagePrune(Vec<BuiltinTableUpdate>),
275    /// Performs any cleanup and logging actions necessary for
276    /// finalizing a statement execution.
277    RetireExecute {
278        data: ExecuteContextExtra,
279        otel_ctx: OpenTelemetryContext,
280        reason: StatementEndedExecutionReason,
281    },
282    ExecuteSingleStatementTransaction {
283        ctx: ExecuteContext,
284        otel_ctx: OpenTelemetryContext,
285        stmt: Arc<Statement<Raw>>,
286        params: mz_sql::plan::Params,
287    },
288    PeekStageReady {
289        ctx: ExecuteContext,
290        span: Span,
291        stage: PeekStage,
292    },
293    CreateIndexStageReady {
294        ctx: ExecuteContext,
295        span: Span,
296        stage: CreateIndexStage,
297    },
298    CreateViewStageReady {
299        ctx: ExecuteContext,
300        span: Span,
301        stage: CreateViewStage,
302    },
303    CreateMaterializedViewStageReady {
304        ctx: ExecuteContext,
305        span: Span,
306        stage: CreateMaterializedViewStage,
307    },
308    SubscribeStageReady {
309        ctx: ExecuteContext,
310        span: Span,
311        stage: SubscribeStage,
312    },
313    IntrospectionSubscribeStageReady {
314        span: Span,
315        stage: IntrospectionSubscribeStage,
316    },
317    SecretStageReady {
318        ctx: ExecuteContext,
319        span: Span,
320        stage: SecretStage,
321    },
322    ClusterStageReady {
323        ctx: ExecuteContext,
324        span: Span,
325        stage: ClusterStage,
326    },
327    ExplainTimestampStageReady {
328        ctx: ExecuteContext,
329        span: Span,
330        stage: ExplainTimestampStage,
331    },
332    DrainStatementLog,
333    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
334    CheckSchedulingPolicies,
335
336    /// Scheduling policy decisions about turning clusters On/Off.
337    /// `Vec<(policy name, Vec of decisions by the policy)>`
338    /// A cluster will be On if and only if there is at least one On decision for it.
339    /// Scheduling decisions for clusters that have `SCHEDULE = MANUAL` are ignored.
340    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
341}
342
343impl Message {
344    /// Returns a string to identify the kind of [`Message`], useful for logging.
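    ///
    /// A small sketch of the intended use (the variant here is arbitrary, not
    /// taken from a real call site):
    ///
    /// ```ignore
    /// let msg = Message::DrainStatementLog;
    /// assert_eq!(msg.kind(), "drain_statement_log");
    /// ```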
345    pub const fn kind(&self) -> &'static str {
346        match self {
347            Message::Command(_, msg) => match msg {
348                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
349                Command::Startup { .. } => "command-startup",
350                Command::Execute { .. } => "command-execute",
351                Command::Commit { .. } => "command-commit",
352                Command::CancelRequest { .. } => "command-cancel_request",
353                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
354                Command::GetWebhook { .. } => "command-get_webhook",
355                Command::GetSystemVars { .. } => "command-get_system_vars",
356                Command::SetSystemVars { .. } => "command-set_system_vars",
357                Command::Terminate { .. } => "command-terminate",
358                Command::RetireExecute { .. } => "command-retire_execute",
359                Command::CheckConsistency { .. } => "command-check_consistency",
360                Command::Dump { .. } => "command-dump",
361                Command::AuthenticatePassword { .. } => "command-auth_check",
362                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
363                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
364            },
365            Message::ControllerReady {
366                controller: ControllerReadiness::Compute,
367            } => "controller_ready(compute)",
368            Message::ControllerReady {
369                controller: ControllerReadiness::Storage,
370            } => "controller_ready(storage)",
371            Message::ControllerReady {
372                controller: ControllerReadiness::Metrics,
373            } => "controller_ready(metrics)",
374            Message::ControllerReady {
375                controller: ControllerReadiness::Internal,
376            } => "controller_ready(internal)",
377            Message::PurifiedStatementReady(_) => "purified_statement_ready",
378            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
379            Message::TryDeferred { .. } => "try_deferred",
380            Message::GroupCommitInitiate(..) => "group_commit_initiate",
381            Message::AdvanceTimelines => "advance_timelines",
382            Message::ClusterEvent(_) => "cluster_event",
383            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
384            Message::LinearizeReads => "linearize_reads",
385            Message::StagedBatches { .. } => "staged_batches",
386            Message::StorageUsageSchedule => "storage_usage_schedule",
387            Message::StorageUsageFetch => "storage_usage_fetch",
388            Message::StorageUsageUpdate(_) => "storage_usage_update",
389            Message::StorageUsagePrune(_) => "storage_usage_prune",
390            Message::RetireExecute { .. } => "retire_execute",
391            Message::ExecuteSingleStatementTransaction { .. } => {
392                "execute_single_statement_transaction"
393            }
394            Message::PeekStageReady { .. } => "peek_stage_ready",
395            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
396            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
397            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
398            Message::CreateMaterializedViewStageReady { .. } => {
399                "create_materialized_view_stage_ready"
400            }
401            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
402            Message::IntrospectionSubscribeStageReady { .. } => {
403                "introspection_subscribe_stage_ready"
404            }
405            Message::SecretStageReady { .. } => "secret_stage_ready",
406            Message::ClusterStageReady { .. } => "cluster_stage_ready",
407            Message::DrainStatementLog => "drain_statement_log",
408            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
409            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
410            Message::CheckSchedulingPolicies => "check_scheduling_policies",
411            Message::SchedulingDecisions { .. } => "scheduling_decision",
412            Message::DeferredStatementReady => "deferred_statement_ready",
413        }
414    }
415}
416
417/// The reason for why a controller needs processing on the main loop.
418#[derive(Debug)]
419pub enum ControllerReadiness {
420    /// The storage controller is ready.
421    Storage,
422    /// The compute controller is ready.
423    Compute,
424    /// A batch of metric data is ready.
425    Metrics,
426    /// An internally-generated message is ready to be returned.
427    Internal,
428}
429
430#[derive(Derivative)]
431#[derivative(Debug)]
432pub struct BackgroundWorkResult<T> {
433    #[derivative(Debug = "ignore")]
434    pub ctx: ExecuteContext,
435    pub result: Result<T, AdapterError>,
436    pub params: Params,
437    pub plan_validity: PlanValidity,
438    pub original_stmt: Arc<Statement<Raw>>,
439    pub otel_ctx: OpenTelemetryContext,
440}
441
442pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
443
444#[derive(Derivative)]
445#[derivative(Debug)]
446pub struct ValidationReady<T> {
447    #[derivative(Debug = "ignore")]
448    pub ctx: ExecuteContext,
449    pub result: Result<T, AdapterError>,
450    pub resolved_ids: ResolvedIds,
451    pub connection_id: CatalogItemId,
452    pub connection_gid: GlobalId,
453    pub plan_validity: PlanValidity,
454    pub otel_ctx: OpenTelemetryContext,
455}
456
457pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
458pub type AlterConnectionValidationReady = ValidationReady<Connection>;
459
460#[derive(Debug)]
461pub enum PeekStage {
462    /// Common stages across SELECT, EXPLAIN and COPY TO queries.
463    LinearizeTimestamp(PeekStageLinearizeTimestamp),
464    RealTimeRecency(PeekStageRealTimeRecency),
465    TimestampReadHold(PeekStageTimestampReadHold),
466    Optimize(PeekStageOptimize),
467    /// Final stage for a peek.
468    Finish(PeekStageFinish),
469    /// Final stage for an explain.
470    ExplainPlan(PeekStageExplainPlan),
471    ExplainPushdown(PeekStageExplainPushdown),
472    /// Preflight checks for a copy to operation.
473    CopyToPreflight(PeekStageCopyTo),
474    /// Final stage for a copy to which involves shipping the dataflow.
475    CopyToDataflow(PeekStageCopyTo),
476}
477
478#[derive(Debug)]
479pub struct CopyToContext {
480    /// The `RelationDesc` of the data to be copied.
481    pub desc: RelationDesc,
482    /// The destination uri of the external service where the data will be copied.
483    pub uri: Uri,
484    /// Connection information required to connect to the external service to copy the data.
485    pub connection: StorageConnection<ReferencedConnection>,
486    /// The ID of the CONNECTION object to be used for copying the data.
487    pub connection_id: CatalogItemId,
488    /// Format params to format the data.
489    pub format: S3SinkFormat,
490    /// Approximate max file size of each uploaded file.
491    pub max_file_size: u64,
492    /// Number of batches the output of the COPY TO will be partitioned into
493    /// to distribute the load across workers deterministically.
494    /// This is an `Option` because it is not set when `CopyToContext` is instantiated,
495    /// but only immediately afterward, in the `PeekStageValidate` stage.
496    pub output_batch_count: Option<u64>,
497}
498
499#[derive(Debug)]
500pub struct PeekStageLinearizeTimestamp {
501    validity: PlanValidity,
502    plan: mz_sql::plan::SelectPlan,
503    max_query_result_size: Option<u64>,
504    source_ids: BTreeSet<GlobalId>,
505    target_replica: Option<ReplicaId>,
506    timeline_context: TimelineContext,
507    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
508    /// An optional context set iff the state machine is initiated from
509    /// sequencing an EXPLAIN for this statement.
510    explain_ctx: ExplainContext,
511}
512
513#[derive(Debug)]
514pub struct PeekStageRealTimeRecency {
515    validity: PlanValidity,
516    plan: mz_sql::plan::SelectPlan,
517    max_query_result_size: Option<u64>,
518    source_ids: BTreeSet<GlobalId>,
519    target_replica: Option<ReplicaId>,
520    timeline_context: TimelineContext,
521    oracle_read_ts: Option<Timestamp>,
522    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
523    /// An optional context set iff the state machine is initiated from
524    /// sequencing an EXPLAIN for this statement.
525    explain_ctx: ExplainContext,
526}
527
528#[derive(Debug)]
529pub struct PeekStageTimestampReadHold {
530    validity: PlanValidity,
531    plan: mz_sql::plan::SelectPlan,
532    max_query_result_size: Option<u64>,
533    source_ids: BTreeSet<GlobalId>,
534    target_replica: Option<ReplicaId>,
535    timeline_context: TimelineContext,
536    oracle_read_ts: Option<Timestamp>,
537    real_time_recency_ts: Option<mz_repr::Timestamp>,
538    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
539    /// An optional context set iff the state machine is initiated from
540    /// sequencing an EXPLAIN for this statement.
541    explain_ctx: ExplainContext,
542}
543
544#[derive(Debug)]
545pub struct PeekStageOptimize {
546    validity: PlanValidity,
547    plan: mz_sql::plan::SelectPlan,
548    max_query_result_size: Option<u64>,
549    source_ids: BTreeSet<GlobalId>,
550    id_bundle: CollectionIdBundle,
551    target_replica: Option<ReplicaId>,
552    determination: TimestampDetermination<mz_repr::Timestamp>,
553    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
554    /// An optional context set iff the state machine is initiated from
555    /// sequencing an EXPLAIN for this statement.
556    explain_ctx: ExplainContext,
557}
558
559#[derive(Debug)]
560pub struct PeekStageFinish {
561    validity: PlanValidity,
562    plan: mz_sql::plan::SelectPlan,
563    max_query_result_size: Option<u64>,
564    id_bundle: CollectionIdBundle,
565    target_replica: Option<ReplicaId>,
566    source_ids: BTreeSet<GlobalId>,
567    determination: TimestampDetermination<mz_repr::Timestamp>,
568    cluster_id: ComputeInstanceId,
569    finishing: RowSetFinishing,
570    /// When present, an optimizer trace to be used for emitting a plan insights
571    /// notice.
572    plan_insights_optimizer_trace: Option<OptimizerTrace>,
573    insights_ctx: Option<Box<PlanInsightsContext>>,
574    global_lir_plan: optimize::peek::GlobalLirPlan,
575    optimization_finished_at: EpochMillis,
576}
577
578#[derive(Debug)]
579pub struct PeekStageCopyTo {
580    validity: PlanValidity,
581    optimizer: optimize::copy_to::Optimizer,
582    global_lir_plan: optimize::copy_to::GlobalLirPlan,
583    optimization_finished_at: EpochMillis,
584    source_ids: BTreeSet<GlobalId>,
585}
586
587#[derive(Debug)]
588pub struct PeekStageExplainPlan {
589    validity: PlanValidity,
590    optimizer: optimize::peek::Optimizer,
591    df_meta: DataflowMetainfo,
592    explain_ctx: ExplainPlanContext,
593    insights_ctx: Option<Box<PlanInsightsContext>>,
594}
595
596#[derive(Debug)]
597pub struct PeekStageExplainPushdown {
598    validity: PlanValidity,
599    determination: TimestampDetermination<mz_repr::Timestamp>,
600    imports: BTreeMap<GlobalId, MapFilterProject>,
601}
602
603#[derive(Debug)]
604pub enum CreateIndexStage {
605    Optimize(CreateIndexOptimize),
606    Finish(CreateIndexFinish),
607    Explain(CreateIndexExplain),
608}
609
610#[derive(Debug)]
611pub struct CreateIndexOptimize {
612    validity: PlanValidity,
613    plan: plan::CreateIndexPlan,
614    resolved_ids: ResolvedIds,
615    /// An optional context set iff the state machine is initiated from
616    /// sequencing an EXPLAIN for this statement.
617    explain_ctx: ExplainContext,
618}
619
620#[derive(Debug)]
621pub struct CreateIndexFinish {
622    validity: PlanValidity,
623    item_id: CatalogItemId,
624    global_id: GlobalId,
625    plan: plan::CreateIndexPlan,
626    resolved_ids: ResolvedIds,
627    global_mir_plan: optimize::index::GlobalMirPlan,
628    global_lir_plan: optimize::index::GlobalLirPlan,
629}
630
631#[derive(Debug)]
632pub struct CreateIndexExplain {
633    validity: PlanValidity,
634    exported_index_id: GlobalId,
635    plan: plan::CreateIndexPlan,
636    df_meta: DataflowMetainfo,
637    explain_ctx: ExplainPlanContext,
638}
639
640#[derive(Debug)]
641pub enum CreateViewStage {
642    Optimize(CreateViewOptimize),
643    Finish(CreateViewFinish),
644    Explain(CreateViewExplain),
645}
646
647#[derive(Debug)]
648pub struct CreateViewOptimize {
649    validity: PlanValidity,
650    plan: plan::CreateViewPlan,
651    resolved_ids: ResolvedIds,
652    /// An optional context set iff the state machine is initiated from
653    /// sequencing an EXPLAIN for this statement.
654    explain_ctx: ExplainContext,
655}
656
657#[derive(Debug)]
658pub struct CreateViewFinish {
659    validity: PlanValidity,
660    /// ID of this item in the Catalog.
661    item_id: CatalogItemId,
662    /// ID by which Compute will reference this View.
663    global_id: GlobalId,
664    plan: plan::CreateViewPlan,
665    /// IDs of objects resolved during name resolution.
666    resolved_ids: ResolvedIds,
667    optimized_expr: OptimizedMirRelationExpr,
668}
669
670#[derive(Debug)]
671pub struct CreateViewExplain {
672    validity: PlanValidity,
673    id: GlobalId,
674    plan: plan::CreateViewPlan,
675    explain_ctx: ExplainPlanContext,
676}
677
678#[derive(Debug)]
679pub enum ExplainTimestampStage {
680    Optimize(ExplainTimestampOptimize),
681    RealTimeRecency(ExplainTimestampRealTimeRecency),
682    Finish(ExplainTimestampFinish),
683}
684
685#[derive(Debug)]
686pub struct ExplainTimestampOptimize {
687    validity: PlanValidity,
688    plan: plan::ExplainTimestampPlan,
689    cluster_id: ClusterId,
690}
691
692#[derive(Debug)]
693pub struct ExplainTimestampRealTimeRecency {
694    validity: PlanValidity,
695    format: ExplainFormat,
696    optimized_plan: OptimizedMirRelationExpr,
697    cluster_id: ClusterId,
698    when: QueryWhen,
699}
700
701#[derive(Debug)]
702pub struct ExplainTimestampFinish {
703    validity: PlanValidity,
704    format: ExplainFormat,
705    optimized_plan: OptimizedMirRelationExpr,
706    cluster_id: ClusterId,
707    source_ids: BTreeSet<GlobalId>,
708    when: QueryWhen,
709    real_time_recency_ts: Option<Timestamp>,
710}
711
712#[derive(Debug)]
713pub enum ClusterStage {
714    Alter(AlterCluster),
715    WaitForHydrated(AlterClusterWaitForHydrated),
716    Finalize(AlterClusterFinalize),
717}
718
719#[derive(Debug)]
720pub struct AlterCluster {
721    validity: PlanValidity,
722    plan: plan::AlterClusterPlan,
723}
724
725#[derive(Debug)]
726pub struct AlterClusterWaitForHydrated {
727    validity: PlanValidity,
728    plan: plan::AlterClusterPlan,
729    new_config: ClusterVariantManaged,
730    timeout_time: Instant,
731    on_timeout: OnTimeoutAction,
732}
733
734#[derive(Debug)]
735pub struct AlterClusterFinalize {
736    validity: PlanValidity,
737    plan: plan::AlterClusterPlan,
738    new_config: ClusterVariantManaged,
739}
740
741#[derive(Debug)]
742pub enum ExplainContext {
743    /// The ordinary, non-explain variant of the statement.
744    None,
745    /// The `EXPLAIN <level> PLAN FOR <explainee>` version of the statement.
746    Plan(ExplainPlanContext),
747    /// Generate a notice containing the `EXPLAIN PLAN INSIGHTS` output
748    /// alongside the query's normal output.
749    PlanInsightsNotice(OptimizerTrace),
750    /// `EXPLAIN FILTER PUSHDOWN`
751    Pushdown,
752}
753
754impl ExplainContext {
755    /// If available for this context, wrap the [`OptimizerTrace`] into a
756    /// [`tracing::Dispatch`] and set it as default, returning the resulting
757    /// guard in a `Some(guard)` option.
758    fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
759        let optimizer_trace = match self {
760            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
761            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
762            _ => None,
763        };
764        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
765    }
766
767    fn needs_cluster(&self) -> bool {
768        match self {
769            ExplainContext::None => true,
770            ExplainContext::Plan(..) => false,
771            ExplainContext::PlanInsightsNotice(..) => true,
772            ExplainContext::Pushdown => false,
773        }
774    }
775
776    fn needs_plan_insights(&self) -> bool {
777        matches!(
778            self,
779            ExplainContext::Plan(ExplainPlanContext {
780                stage: ExplainStage::PlanInsights,
781                ..
782            }) | ExplainContext::PlanInsightsNotice(_)
783        )
784    }
785}
786
787#[derive(Debug)]
788pub struct ExplainPlanContext {
789    pub broken: bool,
790    pub config: ExplainConfig,
791    pub format: ExplainFormat,
792    pub stage: ExplainStage,
793    pub replan: Option<GlobalId>,
794    pub desc: Option<RelationDesc>,
795    pub optimizer_trace: OptimizerTrace,
796}
797
798#[derive(Debug)]
799pub enum CreateMaterializedViewStage {
800    Optimize(CreateMaterializedViewOptimize),
801    Finish(CreateMaterializedViewFinish),
802    Explain(CreateMaterializedViewExplain),
803}
804
805#[derive(Debug)]
806pub struct CreateMaterializedViewOptimize {
807    validity: PlanValidity,
808    plan: plan::CreateMaterializedViewPlan,
809    resolved_ids: ResolvedIds,
810    /// An optional context set iff the state machine is initiated from
811    /// sequencing an EXPLAIN for this statement.
812    explain_ctx: ExplainContext,
813}
814
815#[derive(Debug)]
816pub struct CreateMaterializedViewFinish {
817    /// The ID of this Materialized View in the Catalog.
818    item_id: CatalogItemId,
819    /// The ID of the durable pTVC backing this Materialized View.
820    global_id: GlobalId,
821    validity: PlanValidity,
822    plan: plan::CreateMaterializedViewPlan,
823    resolved_ids: ResolvedIds,
824    local_mir_plan: optimize::materialized_view::LocalMirPlan,
825    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
826    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
827}
828
829#[derive(Debug)]
830pub struct CreateMaterializedViewExplain {
831    global_id: GlobalId,
832    validity: PlanValidity,
833    plan: plan::CreateMaterializedViewPlan,
834    df_meta: DataflowMetainfo,
835    explain_ctx: ExplainPlanContext,
836}
837
838#[derive(Debug)]
839pub enum SubscribeStage {
840    OptimizeMir(SubscribeOptimizeMir),
841    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
842    Finish(SubscribeFinish),
843}
844
845#[derive(Debug)]
846pub struct SubscribeOptimizeMir {
847    validity: PlanValidity,
848    plan: plan::SubscribePlan,
849    timeline: TimelineContext,
850    dependency_ids: BTreeSet<GlobalId>,
851    cluster_id: ComputeInstanceId,
852    replica_id: Option<ReplicaId>,
853}
854
855#[derive(Debug)]
856pub struct SubscribeTimestampOptimizeLir {
857    validity: PlanValidity,
858    plan: plan::SubscribePlan,
859    timeline: TimelineContext,
860    optimizer: optimize::subscribe::Optimizer,
861    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
862    dependency_ids: BTreeSet<GlobalId>,
863    replica_id: Option<ReplicaId>,
864}
865
866#[derive(Debug)]
867pub struct SubscribeFinish {
868    validity: PlanValidity,
869    cluster_id: ComputeInstanceId,
870    replica_id: Option<ReplicaId>,
871    plan: plan::SubscribePlan,
872    global_lir_plan: optimize::subscribe::GlobalLirPlan,
873    dependency_ids: BTreeSet<GlobalId>,
874}
875
876#[derive(Debug)]
877pub enum IntrospectionSubscribeStage {
878    OptimizeMir(IntrospectionSubscribeOptimizeMir),
879    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
880    Finish(IntrospectionSubscribeFinish),
881}
882
883#[derive(Debug)]
884pub struct IntrospectionSubscribeOptimizeMir {
885    validity: PlanValidity,
886    plan: plan::SubscribePlan,
887    subscribe_id: GlobalId,
888    cluster_id: ComputeInstanceId,
889    replica_id: ReplicaId,
890}
891
892#[derive(Debug)]
893pub struct IntrospectionSubscribeTimestampOptimizeLir {
894    validity: PlanValidity,
895    optimizer: optimize::subscribe::Optimizer,
896    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
897    cluster_id: ComputeInstanceId,
898    replica_id: ReplicaId,
899}
900
901#[derive(Debug)]
902pub struct IntrospectionSubscribeFinish {
903    validity: PlanValidity,
904    global_lir_plan: optimize::subscribe::GlobalLirPlan,
905    read_holds: ReadHolds<Timestamp>,
906    cluster_id: ComputeInstanceId,
907    replica_id: ReplicaId,
908}
909
910#[derive(Debug)]
911pub enum SecretStage {
912    CreateEnsure(CreateSecretEnsure),
913    CreateFinish(CreateSecretFinish),
914    RotateKeysEnsure(RotateKeysSecretEnsure),
915    RotateKeysFinish(RotateKeysSecretFinish),
916    Alter(AlterSecret),
917}
918
919#[derive(Debug)]
920pub struct CreateSecretEnsure {
921    validity: PlanValidity,
922    plan: plan::CreateSecretPlan,
923}
924
925#[derive(Debug)]
926pub struct CreateSecretFinish {
927    validity: PlanValidity,
928    item_id: CatalogItemId,
929    global_id: GlobalId,
930    plan: plan::CreateSecretPlan,
931}
932
933#[derive(Debug)]
934pub struct RotateKeysSecretEnsure {
935    validity: PlanValidity,
936    id: CatalogItemId,
937}
938
939#[derive(Debug)]
940pub struct RotateKeysSecretFinish {
941    validity: PlanValidity,
942    ops: Vec<crate::catalog::Op>,
943}
944
945#[derive(Debug)]
946pub struct AlterSecret {
947    validity: PlanValidity,
948    plan: plan::AlterSecretPlan,
949}
950
951/// An enum describing which cluster to run a statement on.
952///
953/// For example, if a query depends only on system tables, we might automatically run it on
954/// the catalog server cluster to benefit from indexes that exist there.
955#[derive(Debug, Copy, Clone, PartialEq, Eq)]
956pub enum TargetCluster {
957    /// The catalog server cluster.
958    CatalogServer,
959    /// The current user's active cluster.
960    Active,
961    /// The cluster selected at the start of a transaction.
962    Transaction(ClusterId),
963}
964
965/// Result types for each stage of a sequence.
966pub(crate) enum StageResult<T> {
967    /// A task was spawned that will return the next stage.
968    Handle(JoinHandle<Result<T, AdapterError>>),
969    /// A task was spawned that will return a response for the client.
970    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
971    /// The next stage is immediately ready and will execute.
972    Immediate(T),
973    /// The final stage was executed and is ready to respond to the client.
974    Response(ExecuteResponse),
975}
976
977/// Common functionality for [Coordinator::sequence_staged].
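///
/// A rough sketch of how a `Staged` state machine can be driven. This is not
/// the actual `sequence_staged` implementation; the `drive` function and its
/// shape are illustrative assumptions:
///
/// ```ignore
/// async fn drive<S: Staged>(coord: &mut Coordinator, mut ctx: S::Ctx, mut stage: S) {
///     loop {
///         match stage.stage(coord, &mut ctx).await {
///             // The next stage is ready: keep looping.
///             Ok(StageResult::Immediate(next)) => stage = *next,
///             // Final result: hand the response (or error) back to the client.
///             Ok(StageResult::Response(resp)) => return ctx.retire(Ok(resp)),
///             Err(err) => return ctx.retire(Err(err)),
///             // A task was spawned; in the real code the context travels with
///             // it and re-enters the main loop via `Staged::message`.
///             Ok(StageResult::Handle(_)) | Ok(StageResult::HandleRetire(_)) => return,
///         }
///     }
/// }
/// ```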
978pub(crate) trait Staged: Send {
979    type Ctx: StagedContext;
980
981    fn validity(&mut self) -> &mut PlanValidity;
982
983    /// Returns the next stage or final result.
984    async fn stage(
985        self,
986        coord: &mut Coordinator,
987        ctx: &mut Self::Ctx,
988    ) -> Result<StageResult<Box<Self>>, AdapterError>;
989
990    /// Prepares a message for the Coordinator.
991    fn message(self, ctx: Self::Ctx, span: Span) -> Message;
992
993    /// Whether it is safe to SQL cancel this stage.
994    fn cancel_enabled(&self) -> bool;
995}
996
997pub trait StagedContext {
998    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
999    fn session(&self) -> Option<&Session>;
1000}
1001
1002impl StagedContext for ExecuteContext {
1003    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1004        self.retire(result);
1005    }
1006
1007    fn session(&self) -> Option<&Session> {
1008        Some(self.session())
1009    }
1010}
1011
1012impl StagedContext for () {
1013    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}
1014
1015    fn session(&self) -> Option<&Session> {
1016        None
1017    }
1018}
1019
1020/// Configures a coordinator.
1021pub struct Config {
1022    pub controller_config: ControllerConfig,
1023    pub controller_envd_epoch: NonZeroI64,
1024    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
1025    pub audit_logs_iterator: AuditLogIterator,
1026    pub timestamp_oracle_url: Option<SensitiveUrl>,
1027    pub unsafe_mode: bool,
1028    pub all_features: bool,
1029    pub build_info: &'static BuildInfo,
1030    pub environment_id: EnvironmentId,
1031    pub metrics_registry: MetricsRegistry,
1032    pub now: NowFn,
1033    pub secrets_controller: Arc<dyn SecretsController>,
1034    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1035    pub availability_zones: Vec<String>,
1036    pub cluster_replica_sizes: ClusterReplicaSizeMap,
1037    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
1038    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
1039    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
1040    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
1041    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
1042    pub system_parameter_defaults: BTreeMap<String, String>,
1043    pub storage_usage_client: StorageUsageClient,
1044    pub storage_usage_collection_interval: Duration,
1045    pub storage_usage_retention_period: Option<Duration>,
1046    pub segment_client: Option<mz_segment::Client>,
1047    pub egress_addresses: Vec<IpNet>,
1048    pub remote_system_parameters: Option<BTreeMap<String, String>>,
1049    pub aws_account_id: Option<String>,
1050    pub aws_privatelink_availability_zones: Option<Vec<String>>,
1051    pub connection_context: ConnectionContext,
1052    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
1053    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
1054    pub http_host_name: Option<String>,
1055    pub tracing_handle: TracingHandle,
1056    /// Whether or not to start controllers in read-only mode. This is only
1057    /// meant for use during development of read-only clusters and 0dt upgrades
1058    /// and should go away once we have proper orchestration during upgrades.
1059    pub read_only_controllers: bool,
1060
1061    /// A trigger that signals that the current deployment has caught up with a
1062    /// previous deployment. Only used during 0dt deployment, while in read-only
1063    /// mode.
1064    pub caught_up_trigger: Option<Trigger>,
1065
1066    pub helm_chart_version: Option<String>,
1067    pub license_key: ValidatedLicenseKey,
1068    pub external_login_password_mz_system: Option<Password>,
1069}
1070
1071/// Metadata about an active connection.
1072#[derive(Debug, Serialize)]
1073pub struct ConnMeta {
1074    /// Pgwire specifies that every connection have a 32-bit secret associated
1075    /// with it, that is known to both the client and the server. Cancellation
1076    /// requests are required to authenticate with the secret of the connection
1077    /// that they are targeting.
1078    secret_key: u32,
1079    /// The time when the session's connection was initiated.
1080    connected_at: EpochMillis,
1081    user: User,
1082    application_name: String,
1083    uuid: Uuid,
1084    conn_id: ConnectionId,
1085    client_ip: Option<IpAddr>,
1086
1087    /// Sinks that will need to be dropped when the current transaction, if
1088    /// any, is cleared.
1089    drop_sinks: BTreeSet<GlobalId>,
1090
1091    /// Lock for the Coordinator's deferred statements that is dropped on transaction clear.
1092    #[serde(skip)]
1093    deferred_lock: Option<OwnedMutexGuard<()>>,
1094
1095    /// Cluster reconfigurations that will need to be
1096    /// cleaned up when the current transaction is cleared
1097    pending_cluster_alters: BTreeSet<ClusterId>,
1098
1099    /// Channel on which to send notices to a session.
1100    #[serde(skip)]
1101    notice_tx: mpsc::UnboundedSender<AdapterNotice>,
1102
1103    /// The role that initiated the database context. Fixed for the duration of the connection.
1104    /// WARNING: This role reference is not updated when the role is dropped.
1105    /// Consumers should not assume that this role exists.
1106    authenticated_role: RoleId,
1107}
1108
1109impl ConnMeta {
1110    pub fn conn_id(&self) -> &ConnectionId {
1111        &self.conn_id
1112    }
1113
1114    pub fn user(&self) -> &User {
1115        &self.user
1116    }
1117
1118    pub fn application_name(&self) -> &str {
1119        &self.application_name
1120    }
1121
1122    pub fn authenticated_role_id(&self) -> &RoleId {
1123        &self.authenticated_role
1124    }
1125
1126    pub fn uuid(&self) -> Uuid {
1127        self.uuid
1128    }
1129
1130    pub fn client_ip(&self) -> Option<IpAddr> {
1131        self.client_ip
1132    }
1133
1134    pub fn connected_at(&self) -> EpochMillis {
1135        self.connected_at
1136    }
1137}
1138
1139#[derive(Debug)]
1140/// A pending transaction waiting to be committed.
1141pub struct PendingTxn {
1142    /// Context used to send a response back to the client.
1143    ctx: ExecuteContext,
1144    /// Client response for transaction.
1145    response: Result<PendingTxnResponse, AdapterError>,
1146    /// The action to take at the end of the transaction.
1147    action: EndTransactionAction,
1148}
1149
1150#[derive(Debug)]
1151/// The response we'll send for a [`PendingTxn`].
1152pub enum PendingTxnResponse {
1153    /// The transaction will be committed.
1154    Committed {
1155        /// Parameters that will change, and their values, once this transaction is complete.
1156        params: BTreeMap<&'static str, String>,
1157    },
1158    /// The transaction will be rolled back.
1159    Rolledback {
1160        /// Parameters that will change, and their values, once this transaction is complete.
1161        params: BTreeMap<&'static str, String>,
1162    },
1163}
1164
1165impl PendingTxnResponse {
1166    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1167        match self {
1168            PendingTxnResponse::Committed { params }
1169            | PendingTxnResponse::Rolledback { params } => params.extend(p),
1170        }
1171    }
1172}
1173
1174impl From<PendingTxnResponse> for ExecuteResponse {
1175    fn from(value: PendingTxnResponse) -> Self {
1176        match value {
1177            PendingTxnResponse::Committed { params } => {
1178                ExecuteResponse::TransactionCommitted { params }
1179            }
1180            PendingTxnResponse::Rolledback { params } => {
1181                ExecuteResponse::TransactionRolledBack { params }
1182            }
1183        }
1184    }
1185}
1186
1187#[derive(Debug)]
1188/// A pending read transaction waiting to be linearized, along with metadata about its state.
1189pub struct PendingReadTxn {
1190    /// The transaction type
1191    txn: PendingRead,
1192    /// The timestamp context of the transaction.
1193    timestamp_context: TimestampContext<mz_repr::Timestamp>,
1194    /// When we created this pending txn; used to measure how long it takes until the transaction ends. Only used for metrics.
1195    created: Instant,
1196    /// Number of times we requeued the processing of this pending read txn.
1197    /// Requeueing is necessary if the time we executed the query is after the current oracle time;
1198    /// see [`Coordinator::message_linearize_reads`] for more details.
1199    num_requeues: u64,
1200    /// Telemetry context.
1201    otel_ctx: OpenTelemetryContext,
1202}
1203
1204impl PendingReadTxn {
1205    /// Return the timestamp context of the pending read transaction.
1206    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
1207        &self.timestamp_context
1208    }
1209
1210    pub(crate) fn take_context(self) -> ExecuteContext {
1211        self.txn.take_context()
1212    }
1213}
1214
1215#[derive(Debug)]
1216/// A pending read transaction waiting to be linearized.
1217enum PendingRead {
1218    Read {
1219        /// The inner transaction.
1220        txn: PendingTxn,
1221    },
1222    ReadThenWrite {
1223        /// Context used to send a response back to the client.
1224        ctx: ExecuteContext,
1225        /// Channel used to alert the transaction that the read has been linearized and send back
1226        /// `ctx`.
1227        tx: oneshot::Sender<Option<ExecuteContext>>,
1228    },
1229}
1230
1231impl PendingRead {
1232    /// Alert the client that the read has been linearized.
1233    ///
1234    /// If it is necessary to finalize an execute, return the state necessary to do so
1235    /// (execution context and result)
1236    #[instrument(level = "debug")]
1237    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1238        match self {
1239            PendingRead::Read {
1240                txn:
1241                    PendingTxn {
1242                        mut ctx,
1243                        response,
1244                        action,
1245                    },
1246                ..
1247            } => {
1248                let changed = ctx.session_mut().vars_mut().end_transaction(action);
1249                // Append any parameters that changed to the response.
1250                let response = response.map(|mut r| {
1251                    r.extend_params(changed);
1252                    ExecuteResponse::from(r)
1253                });
1254
1255                Some((ctx, response))
1256            }
1257            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1258                // Ignore errors if the caller has hung up.
1259                let _ = tx.send(Some(ctx));
1260                None
1261            }
1262        }
1263    }
1264
1265    fn label(&self) -> &'static str {
1266        match self {
1267            PendingRead::Read { .. } => "read",
1268            PendingRead::ReadThenWrite { .. } => "read_then_write",
1269        }
1270    }
1271
1272    pub(crate) fn take_context(self) -> ExecuteContext {
1273        match self {
1274            PendingRead::Read { txn, .. } => txn.ctx,
1275            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1276                // Inform the transaction that we've taken their context.
1277                // Ignore errors if the caller has hung up.
1278                let _ = tx.send(None);
1279                ctx
1280            }
1281        }
1282    }
1283}
1284
1285/// State that the coordinator must process as part of retiring
1286/// command execution. `ExecuteContextExtra::default()` is guaranteed
1287/// to produce a value that will cause the coordinator to do nothing, and
1288/// is intended for use by code that invokes the execution processing flow
1289/// (i.e., `sequence_plan`) without actually being a statement execution.
1290///
1291/// This struct must not be dropped if it contains non-trivial
1292/// state. The only valid way to get rid of it is to pass it to the
1293/// coordinator for retirement. To enforce this, we assert in the
1294/// `Drop` implementation.
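///
/// A sketch of the intended lifecycle (variable names are illustrative, and
/// `retire` is only callable from within the coordinator):
///
/// ```ignore
/// // Correct: retirement consumes the extra, so `Drop` has nothing to flag.
/// let extra = ExecuteContextExtra::new(Some(statement_logging_id));
/// let _id = extra.retire();
///
/// // Incorrect: dropping a non-trivial extra triggers `soft_panic_or_log`,
/// // and the statement is never marked finished in the statement log.
/// let extra = ExecuteContextExtra::new(Some(statement_logging_id));
/// drop(extra);
/// ```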
1295#[derive(Debug, Default)]
1296#[must_use]
1297pub struct ExecuteContextExtra {
1298    statement_uuid: Option<StatementLoggingId>,
1299}
1300
1301impl ExecuteContextExtra {
1302    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1303        Self { statement_uuid }
1304    }
1305    pub fn is_trivial(&self) -> bool {
1306        let Self { statement_uuid } = self;
1307        statement_uuid.is_none()
1308    }
1309    pub fn contents(&self) -> Option<StatementLoggingId> {
1310        let Self { statement_uuid } = self;
1311        *statement_uuid
1312    }
1313    /// Take responsibility for the contents.  This should only be
1314    /// called from code that knows what to do to finish up logging
1315    /// based on the inner value.
1316    #[must_use]
1317    fn retire(mut self) -> Option<StatementLoggingId> {
1318        let Self { statement_uuid } = &mut self;
1319        statement_uuid.take()
1320    }
1321}
1322
1323impl Drop for ExecuteContextExtra {
1324    fn drop(&mut self) {
1325        let Self { statement_uuid } = &*self;
1326        if let Some(statement_uuid) = statement_uuid {
1327            // Note: the impact when this error hits
1328            // is that the statement will never be marked
1329            // as finished in the statement log.
1330            soft_panic_or_log!(
1331                "execute context for statement {statement_uuid:?} dropped without being properly retired."
1332            );
1333        }
1334    }
1335}
1336
1337/// Bundle of state related to statement execution.
1338///
1339/// This struct collects a bundle of state that needs to be threaded
1340/// through various functions as part of statement execution.
1341/// Currently, it is only used to finalize execution, by calling one
1342/// of the methods `retire` or `retire_aysnc`. Finalizing execution
1343/// involves sending the session back to the pgwire layer so that it
1344/// may be used to process further commands. In the future, it will
1345/// also involve performing some work on the main coordinator thread
1346/// (e.g., recording the time at which the statement finished
1347/// executing) the state necessary to perform this work is bundled in
1348/// the `ExecuteContextExtra` object (today, it is simply empty).
1349#[derive(Debug)]
1350pub struct ExecuteContext {
1351    inner: Box<ExecuteContextInner>,
1352}
1353
1354impl std::ops::Deref for ExecuteContext {
1355    type Target = ExecuteContextInner;
1356    fn deref(&self) -> &Self::Target {
1357        &*self.inner
1358    }
1359}
1360
1361impl std::ops::DerefMut for ExecuteContext {
1362    fn deref_mut(&mut self) -> &mut Self::Target {
1363        &mut *self.inner
1364    }
1365}
1366
1367#[derive(Debug)]
1368pub struct ExecuteContextInner {
1369    tx: ClientTransmitter<ExecuteResponse>,
1370    internal_cmd_tx: mpsc::UnboundedSender<Message>,
1371    session: Session,
1372    extra: ExecuteContextExtra,
1373}
1374
1375impl ExecuteContext {
1376    pub fn session(&self) -> &Session {
1377        &self.session
1378    }
1379
1380    pub fn session_mut(&mut self) -> &mut Session {
1381        &mut self.session
1382    }
1383
1384    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1385        &self.tx
1386    }
1387
1388    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1389        &mut self.tx
1390    }
1391
1392    pub fn from_parts(
1393        tx: ClientTransmitter<ExecuteResponse>,
1394        internal_cmd_tx: mpsc::UnboundedSender<Message>,
1395        session: Session,
1396        extra: ExecuteContextExtra,
1397    ) -> Self {
1398        Self {
1399            inner: ExecuteContextInner {
1400                tx,
1401                session,
1402                extra,
1403                internal_cmd_tx,
1404            }
1405            .into(),
1406        }
1407    }
1408
1409    /// By calling this function, the caller takes responsibility for
1410    /// dealing with the instance of `ExecuteContextExtra`. This is
1411    /// intended to support protocols (like `COPY FROM`) that involve
1412    /// multiple passes of sending the session back and forth between
1413    /// the coordinator and the pgwire layer. As part of any such
1414    /// protocol, we must ensure that the `ExecuteContextExtra`
1415    /// (possibly wrapped in a new `ExecuteContext`) is passed back to the coordinator for
1416    /// eventual retirement.
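    ///
    /// A sketch of such a round trip (the surrounding plumbing is elided and
    /// the names are illustrative):
    ///
    /// ```ignore
    /// let (tx, internal_cmd_tx, session, extra) = ctx.into_parts();
    /// // ... hand `session` back to the pgwire layer, do more work ...
    /// let ctx = ExecuteContext::from_parts(tx, internal_cmd_tx, session, extra);
    /// ctx.retire(result);
    /// ```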
1417    pub fn into_parts(
1418        self,
1419    ) -> (
1420        ClientTransmitter<ExecuteResponse>,
1421        mpsc::UnboundedSender<Message>,
1422        Session,
1423        ExecuteContextExtra,
1424    ) {
1425        let ExecuteContextInner {
1426            tx,
1427            internal_cmd_tx,
1428            session,
1429            extra,
1430        } = *self.inner;
1431        (tx, internal_cmd_tx, session, extra)
1432    }
1433
1434    /// Retire the execution, by sending a message to the coordinator.
1435    #[instrument(level = "debug")]
1436    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1437        let ExecuteContextInner {
1438            tx,
1439            internal_cmd_tx,
1440            session,
1441            extra,
1442        } = *self.inner;
1443        let reason = if extra.is_trivial() {
1444            None
1445        } else {
1446            Some((&result).into())
1447        };
1448        tx.send(result, session);
1449        if let Some(reason) = reason {
1450            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1451                otel_ctx: OpenTelemetryContext::obtain(),
1452                data: extra,
1453                reason,
1454            }) {
1455                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1456            }
1457        }
1458    }
1459
1460    pub fn extra(&self) -> &ExecuteContextExtra {
1461        &self.extra
1462    }
1463
1464    pub fn extra_mut(&mut self) -> &mut ExecuteContextExtra {
1465        &mut self.extra
1466    }
1467}
1468
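/// Per-process statuses of every cluster replica, keyed by cluster, then
/// replica, then process.
///
/// A usage sketch (the `cluster_id`, `replica_id`, `now`, and `status` values
/// are illustrative):
///
/// ```ignore
/// let mut statuses = ClusterReplicaStatuses::new();
/// statuses.initialize_cluster_statuses(cluster_id);
/// // Both processes start out `Offline(Initializing)` as of `now`.
/// statuses.initialize_cluster_replica_statuses(cluster_id, replica_id, 2, now);
/// // Later, record an updated status for process 0.
/// statuses.ensure_cluster_status(cluster_id, replica_id, 0, status);
/// ```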
1469#[derive(Debug)]
1470struct ClusterReplicaStatuses(
1471    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
1472);
1473
1474impl ClusterReplicaStatuses {
1475    pub(crate) fn new() -> ClusterReplicaStatuses {
1476        ClusterReplicaStatuses(BTreeMap::new())
1477    }
1478
1479    /// Initializes the statuses of the specified cluster.
1480    ///
1481    /// Panics if the cluster statuses are already initialized.
1482    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1483        let prev = self.0.insert(cluster_id, BTreeMap::new());
1484        assert_eq!(
1485            prev, None,
1486            "cluster {cluster_id} statuses already initialized"
1487        );
1488    }
1489
1490    /// Initializes the statuses of the specified cluster replica.
1491    ///
1492    /// Panics if the cluster replica statuses are already initialized.
1493    pub(crate) fn initialize_cluster_replica_statuses(
1494        &mut self,
1495        cluster_id: ClusterId,
1496        replica_id: ReplicaId,
1497        num_processes: usize,
1498        time: DateTime<Utc>,
1499    ) {
1500        tracing::info!(
1501            ?cluster_id,
1502            ?replica_id,
1503            ?time,
1504            "initializing cluster replica status"
1505        );
1506        let replica_statuses = self.0.entry(cluster_id).or_default();
1507        let process_statuses = (0..num_processes)
1508            .map(|process_id| {
1509                let status = ClusterReplicaProcessStatus {
1510                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1511                    time: time.clone(),
1512                };
1513                (u64::cast_from(process_id), status)
1514            })
1515            .collect();
1516        let prev = replica_statuses.insert(replica_id, process_statuses);
1517        assert_none!(
1518            prev,
1519            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1520        );
1521    }
1522
1523    /// Removes the statuses of the specified cluster.
1524    ///
1525    /// Panics if the cluster does not exist.
1526    pub(crate) fn remove_cluster_statuses(
1527        &mut self,
1528        cluster_id: &ClusterId,
1529    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1530        let prev = self.0.remove(cluster_id);
1531        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1532    }
1533
1534    /// Removes the statuses of the specified cluster replica.
1535    ///
1536    /// Panics if the cluster or replica does not exist.
1537    pub(crate) fn remove_cluster_replica_statuses(
1538        &mut self,
1539        cluster_id: &ClusterId,
1540        replica_id: &ReplicaId,
1541    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1542        let replica_statuses = self
1543            .0
1544            .get_mut(cluster_id)
1545            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1546        let prev = replica_statuses.remove(replica_id);
1547        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1548    }
1549
1550    /// Inserts or updates the status of the specified cluster replica process.
1551    ///
1552    /// Panics if the cluster or replica does not exist.
1553    pub(crate) fn ensure_cluster_status(
1554        &mut self,
1555        cluster_id: ClusterId,
1556        replica_id: ReplicaId,
1557        process_id: ProcessId,
1558        status: ClusterReplicaProcessStatus,
1559    ) {
1560        let replica_statuses = self
1561            .0
1562            .get_mut(&cluster_id)
1563            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1564            .get_mut(&replica_id)
1565            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1566        replica_statuses.insert(process_id, status);
1567    }
1568
1569    /// Computes the status of the cluster replica as a whole.
1570    ///
1571    /// Panics if `cluster_id` or `replica_id` don't exist.
1572    pub fn get_cluster_replica_status(
1573        &self,
1574        cluster_id: ClusterId,
1575        replica_id: ReplicaId,
1576    ) -> ClusterStatus {
1577        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1578        Self::cluster_replica_status(process_status)
1579    }
1580
1581    /// Computes the status of the cluster replica as a whole.
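    ///
    /// The replica is `Online` only if every process is `Online`; otherwise it
    /// is `Offline`, carrying the first known offline reason. An illustrative
    /// sketch (not a compiled doctest; assumes `now` is some `DateTime<Utc>`):
    ///
    /// ```ignore
    /// let statuses = BTreeMap::from([
    ///     (0, ClusterReplicaProcessStatus { status: ClusterStatus::Online, time: now }),
    ///     (1, ClusterReplicaProcessStatus {
    ///         status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
    ///         time: now,
    ///     }),
    /// ]);
    /// assert_eq!(
    ///     ClusterReplicaStatuses::cluster_replica_status(&statuses),
    ///     ClusterStatus::Offline(Some(OfflineReason::Initializing)),
    /// );
    /// ```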
1582    pub fn cluster_replica_status(
1583        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1584    ) -> ClusterStatus {
1585        process_status
1586            .values()
1587            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1588                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1589                (x, y) => {
1590                    let reason_x = match x {
1591                        ClusterStatus::Offline(reason) => reason,
1592                        ClusterStatus::Online => None,
1593                    };
1594                    let reason_y = match y {
1595                        ClusterStatus::Offline(reason) => reason,
1596                        ClusterStatus::Online => None,
1597                    };
1598                    // Arbitrarily pick the first known not-ready reason.
1599                    ClusterStatus::Offline(reason_x.or(reason_y))
1600                }
1601            })
1602    }
1603
1604    /// Gets the statuses of the given cluster replica.
1605    ///
1606    /// Panics if the cluster or replica does not exist.
1607    pub(crate) fn get_cluster_replica_statuses(
1608        &self,
1609        cluster_id: ClusterId,
1610        replica_id: ReplicaId,
1611    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1612        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1613            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1614    }
1615
1616    /// Gets the statuses of the given cluster replica.
1617    pub(crate) fn try_get_cluster_replica_statuses(
1618        &self,
1619        cluster_id: ClusterId,
1620        replica_id: ReplicaId,
1621    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1622        self.try_get_cluster_statuses(cluster_id)
1623            .and_then(|statuses| statuses.get(&replica_id))
1624    }
1625
1626    /// Gets the statuses of the given cluster.
1627    pub(crate) fn try_get_cluster_statuses(
1628        &self,
1629        cluster_id: ClusterId,
1630    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1631        self.0.get(&cluster_id)
1632    }
1633}
1634
1635/// Glues the external world to the Timely workers.
1636#[derive(Derivative)]
1637#[derivative(Debug)]
1638pub struct Coordinator {
1639    /// The controller for the storage and compute layers.
1640    #[derivative(Debug = "ignore")]
1641    controller: mz_controller::Controller,
1642    /// The catalog in an Arc suitable for readonly references. The Arc allows
1643    /// us to hand out cheap copies of the catalog to functions that can use it
1644    /// off of the main coordinator thread. If the coordinator needs to mutate
1645    /// the catalog, call [`Self::catalog_mut`], which will clone this struct member,
1646    /// allowing it to be mutated here while the other off-thread references can
1647    /// read their catalog as long as needed. In the future we would like this
1648    /// to be a pTVC, but for now this is sufficient.
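    ///
    /// A minimal sketch of the intended usage (illustrative only, not a
    /// compiled doctest; `coord` names a `Coordinator`):
    ///
    /// ```ignore
    /// // Cheap, read-only handle that can be moved off the coordinator thread.
    /// let snapshot: Arc<Catalog> = Arc::clone(&coord.catalog);
    ///
    /// // Mutations go through `catalog_mut`, which clones the inner catalog so
    /// // existing off-thread readers keep reading their old snapshot.
    /// let catalog = coord.catalog_mut();
    /// ```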
1649    catalog: Arc<Catalog>,
1650
1651    /// A client for persist. Initially, this is only used for reading stashed
1652    /// peek responses out of batches.
1653    persist_client: PersistClient,
1654
1655    /// Channel to manage internal commands from the coordinator to itself.
1656    internal_cmd_tx: mpsc::UnboundedSender<Message>,
1657    /// Notification that triggers a group commit.
1658    group_commit_tx: appends::GroupCommitNotifier,
1659
1660    /// Channel for strict serializable reads ready to commit.
1661    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,
1662
1663    /// Mechanism for totally ordering write and read timestamps, so that all reads
1664    /// reflect exactly the set of writes that precede them, and no writes that follow.
1665    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
1666
1667    /// A generator for transient [`GlobalId`]s, shareable with other threads.
1668    transient_id_gen: Arc<TransientIdGen>,
1669    /// A map from connection ID to metadata about that connection for all
1670    /// active connections.
1671    active_conns: BTreeMap<ConnectionId, ConnMeta>,
1672
1673    /// For each transaction, the read holds taken to support any performed reads.
1674    ///
1675    /// Upon completing a transaction, these read holds should be dropped.
1676    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,
1677
1678    /// Access to the peek fields should be restricted to methods in the [`peek`] API.
1679    /// A map from pending peek ids to the queue into which responses are sent, and
1680    /// the connection id of the client that initiated the peek.
1681    pending_peeks: BTreeMap<Uuid, PendingPeek>,
1682    /// A map from client connection ids to a set of all pending peeks for that client.
1683    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
1684
1685    /// A map from client connection ids to pending linearize read transaction.
1686    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
1687
1688    /// A map from the compute sink ID to its state description.
1689    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
1690    /// A map from active webhooks to their invalidation handle.
1691    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
1692    /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
1693    /// to stage Batches in Persist that we will then link into the shard.
1694    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,
1695
1696    /// A map from connection ids to a watch channel that is set to `true` if the connection
1697    /// received a cancel request.
1698    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
1699    /// Active introspection subscribes.
1700    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,
1701
1702    /// Locks that grant access to a specific object, populated lazily as objects are written to.
1703    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
1704    /// Plans that are currently deferred and waiting on a write lock.
1705    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,
1706
1707    /// Pending writes waiting for a group commit.
1708    pending_writes: Vec<PendingWriteTxn>,
1709
1710    /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
1711    /// table's timestamps, but there are cases where timestamps are not bumped, yet
1712    /// we still expect the closed timestamps to advance (`AS OF X`, `SUBSCRIBE`s of views
1713    /// over RT sources and tables). To address these, spawn a task that forces table
1714    /// timestamps to close on a regular interval. This roughly tracks the behavior
1715    /// of realtime sources that close off timestamps on an interval.
1716    ///
1717    /// For non-realtime timelines, nothing pushes the timestamps forward, so we must do
1718    /// it manually.
1719    advance_timelines_interval: Interval,
1720
1721    /// Serialized DDL. DDL must be serialized because:
1722    /// - Many of them do off-thread work and need to verify the catalog is in a valid state, but
1723    ///   [`PlanValidity`] does not currently support tracking all changes. Doing that correctly
1724    ///   seems to be more difficult than it's worth, so we would instead re-plan and re-sequence
1725    ///   the statements.
1726    /// - Re-planning a statement is hard because Coordinator and Session state is mutated at
1727    ///   various points, and we would need to correctly reset those changes before re-planning and
1728    ///   re-sequencing.
1729    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
1730
1731    /// Handle to secret manager that can create and delete secrets from
1732    /// an arbitrary secret storage engine.
1733    secrets_controller: Arc<dyn SecretsController>,
1734    /// A secrets reader that maintains an in-memory cache, where values have a set TTL.
1735    caching_secrets_reader: CachingSecretsReader,
1736
1737    /// Handle to a manager that can create and delete Kubernetes resources
1738    /// (i.e., VpcEndpoint objects).
1739    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1740
1741    /// Persist client for fetching storage metadata such as size metrics.
1742    storage_usage_client: StorageUsageClient,
1743    /// The interval at which to collect storage usage information.
1744    storage_usage_collection_interval: Duration,
1745
1746    /// Segment analytics client.
1747    #[derivative(Debug = "ignore")]
1748    segment_client: Option<mz_segment::Client>,
1749
1750    /// Coordinator metrics.
1751    metrics: Metrics,
1752    /// Optimizer metrics.
1753    optimizer_metrics: OptimizerMetrics,
1754
1755    /// Tracing handle.
1756    tracing_handle: TracingHandle,
1757
1758    /// Data used by the statement logging feature.
1759    statement_logging: StatementLogging,
1760
1761    /// Limit for how many concurrent webhook requests we allow.
1762    webhook_concurrency_limit: WebhookConcurrencyLimiter,
1763
1764    /// Optional config for the Postgres-backed timestamp oracle. This is
1765    /// _required_ when `postgres` is configured using the `timestamp_oracle`
1766    /// system variable.
1767    pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,
1768
1769    /// Periodically asks cluster scheduling policies to make their decisions.
1770    check_cluster_scheduling_policies_interval: Interval,
1771
1772    /// This keeps the last On/Off decision for each cluster and each scheduling policy.
1773    /// (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are
1774    /// periodically cleaned up from this Map.)
1775    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,
1776
1777    /// When doing 0dt upgrades/in read-only mode, periodically ask all known
1778    /// clusters/collections whether they are caught up.
1779    caught_up_check_interval: Interval,
1780
1781    /// Context needed to check whether all clusters/collections have caught up.
1782    /// Only used during 0dt deployment, while in read-only mode.
1783    caught_up_check: Option<CaughtUpCheckContext>,
1784
1785    /// Tracks the state associated with the currently installed watchsets.
1786    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,
1787
1788    /// Tracks the currently installed watchsets for each connection.
1789    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,
1790
1791    /// Tracks the statuses of all cluster replicas.
1792    cluster_replica_statuses: ClusterReplicaStatuses,
1793
1794    /// Whether or not to start controllers in read-only mode. This is only
1795    /// meant for use during development of read-only clusters and 0dt upgrades
1796    /// and should go away once we have proper orchestration during upgrades.
1797    read_only_controllers: bool,
1798
1799    /// Updates to builtin tables that are being buffered while we are in
1800    /// read-only mode. We apply these all at once when coming out of read-only
1801    /// mode.
1802    ///
1803    /// This is a `Some` while in read-only mode and will be replaced by a
1804    /// `None` when we transition out of read-only mode and write out any
1805    /// buffered updates.
1806    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,
1807
1808    license_key: ValidatedLicenseKey,
1809}
1810
1811impl Coordinator {
1812    /// Initializes coordinator state based on the contained catalog. Must be
1813    /// called after creating the coordinator and before calling the
1814    /// `Coordinator::serve` method.
1815    #[instrument(name = "coord::bootstrap")]
1816    pub(crate) async fn bootstrap(
1817        &mut self,
1818        boot_ts: Timestamp,
1819        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1820        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1821        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1822        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1823        audit_logs_iterator: AuditLogIterator,
1824    ) -> Result<(), AdapterError> {
1825        let bootstrap_start = Instant::now();
1826        info!("startup: coordinator init: bootstrap beginning");
1827        info!("startup: coordinator init: bootstrap: preamble beginning");
1828
1829        // Initialize cluster replica statuses.
1830        // Collecting into an intermediate Vec is gross, but avoids partial borrow issues.
1831        let cluster_statuses: Vec<(_, Vec<_>)> = self
1832            .catalog()
1833            .clusters()
1834            .map(|cluster| {
1835                (
1836                    cluster.id(),
1837                    cluster
1838                        .replicas()
1839                        .map(|replica| {
1840                            (replica.replica_id, replica.config.location.num_processes())
1841                        })
1842                        .collect(),
1843                )
1844            })
1845            .collect();
1846        let now = self.now_datetime();
1847        for (cluster_id, replica_statuses) in cluster_statuses {
1848            self.cluster_replica_statuses
1849                .initialize_cluster_statuses(cluster_id);
1850            for (replica_id, num_processes) in replica_statuses {
1851                self.cluster_replica_statuses
1852                    .initialize_cluster_replica_statuses(
1853                        cluster_id,
1854                        replica_id,
1855                        num_processes,
1856                        now,
1857                    );
1858            }
1859        }
1860
1861        let system_config = self.catalog().system_config();
1862
1863        // Inform metrics about the initial system configuration.
1864        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1865
1866        // Inform the controllers about their initial configuration.
1867        let compute_config = flags::compute_config(system_config);
1868        let storage_config = flags::storage_config(system_config);
1869        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1870        let dyncfg_updates = system_config.dyncfg_updates();
1871        self.controller.compute.update_configuration(compute_config);
1872        self.controller.storage.update_parameters(storage_config);
1873        self.controller
1874            .update_orchestrator_scheduling_config(scheduling_config);
1875        self.controller.update_configuration(dyncfg_updates);
1876
1877        self.validate_resource_limit_numeric(
1878            Numeric::zero(),
1879            self.current_credit_consumption_rate(),
1880            |system_vars| {
1881                self.license_key
1882                    .max_credit_consumption_rate()
1883                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1884            },
1885            "cluster replica",
1886            MAX_CREDIT_CONSUMPTION_RATE.name(),
1887        )?;
1888
1889        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1890            Default::default();
1891
1892        let enable_worker_core_affinity =
1893            self.catalog().system_config().enable_worker_core_affinity();
1894        for instance in self.catalog.clusters() {
1895            self.controller.create_cluster(
1896                instance.id,
1897                ClusterConfig {
1898                    arranged_logs: instance.log_indexes.clone(),
1899                    workload_class: instance.config.workload_class.clone(),
1900                },
1901            )?;
1902            for replica in instance.replicas() {
1903                let role = instance.role();
1904                self.controller.create_replica(
1905                    instance.id,
1906                    replica.replica_id,
1907                    instance.name.clone(),
1908                    replica.name.clone(),
1909                    role,
1910                    replica.config.clone(),
1911                    enable_worker_core_affinity,
1912                )?;
1913            }
1914        }
1915
1916        info!(
1917            "startup: coordinator init: bootstrap: preamble complete in {:?}",
1918            bootstrap_start.elapsed()
1919        );
1920
1921        let init_storage_collections_start = Instant::now();
1922        info!("startup: coordinator init: bootstrap: storage collections init beginning");
1923        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
1924            .await;
1925        info!(
1926            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
1927            init_storage_collections_start.elapsed()
1928        );
1929
1930        // The storage controller knows about the introspection collections now, so we can start
1931        // sinking introspection updates in the compute controller. It makes sense to do that as
1932        // soon as possible, to avoid updates piling up in the compute controller's internal
1933        // buffers.
1934        self.controller.start_compute_introspection_sink();
1935
1936        let optimize_dataflows_start = Instant::now();
1937        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
1938        let entries: Vec<_> = self.catalog().entries().cloned().collect();
1939        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
1940        info!(
1941            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
1942            optimize_dataflows_start.elapsed()
1943        );
1944
1945        // We don't need to wait for the cache to update.
1946        let _fut = self.catalog().update_expression_cache(
1947            uncached_local_exprs.into_iter().collect(),
1948            uncached_global_exps.into_iter().collect(),
1949        );
1950
1951        // Select dataflow as-ofs. This step relies on the storage collections created by
1952        // `bootstrap_storage_collections` and the dataflow plans created by
1953        // `bootstrap_dataflow_plans`.
1954        let bootstrap_as_ofs_start = Instant::now();
1955        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
1956        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
1957        info!(
1958            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
1959            bootstrap_as_ofs_start.elapsed()
1960        );
1961
1962        let postamble_start = Instant::now();
1963        info!("startup: coordinator init: bootstrap: postamble beginning");
1964
1965        let logs: BTreeSet<_> = BUILTINS::logs()
1966            .map(|log| self.catalog().resolve_builtin_log(log))
1967            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
1968            .collect();
1969
1970        let mut privatelink_connections = BTreeMap::new();
1971
1972        for entry in &entries {
1973            debug!(
1974                "coordinator init: installing {} {}",
1975                entry.item().typ(),
1976                entry.id()
1977            );
1978            let mut policy = entry.item().initial_logical_compaction_window();
1979            match entry.item() {
1980                // Currently catalog item rebuild assumes that sinks and
1981                // indexes are always built individually and does not store information
1982                // about how it was built. If we start building multiple sinks and/or indexes
1983                // using a single dataflow, we have to make sure the rebuild process re-runs
1984                // the same multiple-build dataflow.
1985                CatalogItem::Source(source) => {
1986                    // Propagate source compaction windows to subsources if needed.
1987                    if source.custom_logical_compaction_window.is_none() {
1988                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
1989                            source.data_source
1990                        {
1991                            policy = Some(
1992                                self.catalog()
1993                                    .get_entry(&ingestion_id)
1994                                    .source()
1995                                    .expect("must be source")
1996                                    .custom_logical_compaction_window
1997                                    .unwrap_or_default(),
1998                            );
1999                        }
2000                    }
2001                    policies_to_set
2002                        .entry(policy.expect("sources have a compaction window"))
2003                        .or_insert_with(Default::default)
2004                        .storage_ids
2005                        .insert(source.global_id());
2006                }
2007                CatalogItem::Table(table) => {
2008                    policies_to_set
2009                        .entry(policy.expect("tables have a compaction window"))
2010                        .or_insert_with(Default::default)
2011                        .storage_ids
2012                        .extend(table.global_ids());
2013                }
2014                CatalogItem::Index(idx) => {
2015                    let policy_entry = policies_to_set
2016                        .entry(policy.expect("indexes have a compaction window"))
2017                        .or_insert_with(Default::default);
2018
2019                    if logs.contains(&idx.on) {
2020                        policy_entry
2021                            .compute_ids
2022                            .entry(idx.cluster_id)
2023                            .or_insert_with(BTreeSet::new)
2024                            .insert(idx.global_id());
2025                    } else {
2026                        let df_desc = self
2027                            .catalog()
2028                            .try_get_physical_plan(&idx.global_id())
2029                            .expect("added in `bootstrap_dataflow_plans`")
2030                            .clone();
2031
2032                        let df_meta = self
2033                            .catalog()
2034                            .try_get_dataflow_metainfo(&idx.global_id())
2035                            .expect("added in `bootstrap_dataflow_plans`");
2036
2037                        if self.catalog().state().system_config().enable_mz_notices() {
2038                            // Collect optimization hint updates.
2039                            self.catalog().state().pack_optimizer_notices(
2040                                &mut builtin_table_updates,
2041                                df_meta.optimizer_notices.iter(),
2042                                Diff::ONE,
2043                            );
2044                        }
2045
2046                        // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
2047                        // but we cannot call that as it will also downgrade the read hold on the index.
2048                        policy_entry
2049                            .compute_ids
2050                            .entry(idx.cluster_id)
2051                            .or_insert_with(Default::default)
2052                            .extend(df_desc.export_ids());
2053
2054                        self.controller
2055                            .compute
2056                            .create_dataflow(idx.cluster_id, df_desc, None)
2057                            .unwrap_or_terminate("cannot fail to create dataflows");
2058                    }
2059                }
2060                CatalogItem::View(_) => (),
2061                CatalogItem::MaterializedView(mview) => {
2062                    policies_to_set
2063                        .entry(policy.expect("materialized views have a compaction window"))
2064                        .or_insert_with(Default::default)
2065                        .storage_ids
2066                        .insert(mview.global_id());
2067
2068                    let mut df_desc = self
2069                        .catalog()
2070                        .try_get_physical_plan(&mview.global_id())
2071                        .expect("added in `bootstrap_dataflow_plans`")
2072                        .clone();
2073
2074                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
2075                        df_desc.set_initial_as_of(initial_as_of);
2076                    }
2077
2078                    // If the refresh schedule has a last refresh, set the `until` to just past that last refresh.
2079                    let until = mview
2080                        .refresh_schedule
2081                        .as_ref()
2082                        .and_then(|s| s.last_refresh())
2083                        .and_then(|r| r.try_step_forward());
2084                    if let Some(until) = until {
2085                        df_desc.until.meet_assign(&Antichain::from_elem(until));
2086                    }
2087
2088                    let df_meta = self
2089                        .catalog()
2090                        .try_get_dataflow_metainfo(&mview.global_id())
2091                        .expect("added in `bootstrap_dataflow_plans`");
2092
2093                    if self.catalog().state().system_config().enable_mz_notices() {
2094                        // Collect optimization hint updates.
2095                        self.catalog().state().pack_optimizer_notices(
2096                            &mut builtin_table_updates,
2097                            df_meta.optimizer_notices.iter(),
2098                            Diff::ONE,
2099                        );
2100                    }
2101
2102                    self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2103                }
2104                CatalogItem::Sink(sink) => {
2105                    policies_to_set
2106                        .entry(CompactionWindow::Default)
2107                        .or_insert_with(Default::default)
2108                        .storage_ids
2109                        .insert(sink.global_id());
2110                }
2111                CatalogItem::Connection(catalog_connection) => {
2112                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2113                        privatelink_connections.insert(
2114                            entry.id(),
2115                            VpcEndpointConfig {
2116                                aws_service_name: conn.service_name.clone(),
2117                                availability_zone_ids: conn.availability_zones.clone(),
2118                            },
2119                        );
2120                    }
2121                }
2122                CatalogItem::ContinualTask(ct) => {
2123                    policies_to_set
2124                        .entry(policy.expect("continual tasks have a compaction window"))
2125                        .or_insert_with(Default::default)
2126                        .storage_ids
2127                        .insert(ct.global_id());
2128
2129                    let mut df_desc = self
2130                        .catalog()
2131                        .try_get_physical_plan(&ct.global_id())
2132                        .expect("added in `bootstrap_dataflow_plans`")
2133                        .clone();
2134
2135                    if let Some(initial_as_of) = ct.initial_as_of.clone() {
2136                        df_desc.set_initial_as_of(initial_as_of);
2137                    }
2138
2139                    let df_meta = self
2140                        .catalog()
2141                        .try_get_dataflow_metainfo(&ct.global_id())
2142                        .expect("added in `bootstrap_dataflow_plans`");
2143
2144                    if self.catalog().state().system_config().enable_mz_notices() {
2145                        // Collect optimization hint updates.
2146                        self.catalog().state().pack_optimizer_notices(
2147                            &mut builtin_table_updates,
2148                            df_meta.optimizer_notices.iter(),
2149                            Diff::ONE,
2150                        );
2151                    }
2152
2153                    self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2154                }
2155                // Nothing to do for these cases
2156                CatalogItem::Log(_)
2157                | CatalogItem::Type(_)
2158                | CatalogItem::Func(_)
2159                | CatalogItem::Secret(_) => {}
2160            }
2161        }
2162
2163        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2164            // Clean up any extraneous VpcEndpoints that shouldn't exist.
2165            let existing_vpc_endpoints = cloud_resource_controller
2166                .list_vpc_endpoints()
2167                .await
2168                .context("list vpc endpoints")?;
2169            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2170            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2171            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2172            for id in vpc_endpoints_to_remove {
2173                cloud_resource_controller
2174                    .delete_vpc_endpoint(*id)
2175                    .await
2176                    .context("deleting extraneous vpc endpoint")?;
2177            }
2178
2179            // Ensure desired VpcEndpoints are up to date.
2180            for (id, spec) in privatelink_connections {
2181                cloud_resource_controller
2182                    .ensure_vpc_endpoint(id, spec)
2183                    .await
2184                    .context("ensuring vpc endpoint")?;
2185            }
2186        }
2187
2188        // Having installed all entries, creating all constraints, we can now drop read holds and
2189        // relax read policies.
2190        drop(dataflow_read_holds);
2191        // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
2192        for (cw, policies) in policies_to_set {
2193            self.initialize_read_policies(&policies, cw).await;
2194        }
2195
2196        // Expose mapping from T-shirt sizes to actual sizes
2197        builtin_table_updates.extend(
2198            self.catalog().state().resolve_builtin_table_updates(
2199                self.catalog().state().pack_all_replica_size_updates(),
2200            ),
2201        );
2202
2203        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2204        // When 0dt is enabled, we create new shards for any migrated builtin storage collections.
2205        // In read-only mode, the migrated builtin tables (which are a subset of migrated builtin
2206        // storage collections) need to be back-filled so that any dependent dataflow can be
2207        // hydrated. Additionally, these shards are not registered with the txn-shard, and cannot
2208        // be registered while in read-only mode, so they are written to directly.
2209        let migrated_updates_fut = if self.controller.read_only() {
2210            let min_timestamp = Timestamp::minimum();
2211            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2212                .extract_if(.., |update| {
2213                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2214                    migrated_storage_collections_0dt.contains(&update.id)
2215                        && self
2216                            .controller
2217                            .storage_collections
2218                            .collection_frontiers(gid)
2219                            .expect("all tables are registered")
2220                            .write_frontier
2221                            .elements()
2222                            == &[min_timestamp]
2223                })
2224                .collect();
2225            if migrated_builtin_table_updates.is_empty() {
2226                futures::future::ready(()).boxed()
2227            } else {
2228                // Group all updates per-table.
2229                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2230                for update in migrated_builtin_table_updates {
2231                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2232                    grouped_appends.entry(gid).or_default().push(update.data);
2233                }
2234                info!(
2235                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2236                    grouped_appends.keys().collect::<Vec<_>>()
2237                );
2238
2239                // Consolidate Row data; staged batches must already be consolidated.
2240                let mut all_appends = Vec::with_capacity(grouped_appends.len());
2241                for (item_id, table_data) in grouped_appends.into_iter() {
2242                    let mut all_rows = Vec::new();
2243                    let mut all_data = Vec::new();
2244                    for data in table_data {
2245                        match data {
2246                            TableData::Rows(rows) => all_rows.extend(rows),
2247                            TableData::Batches(_) => all_data.push(data),
2248                        }
2249                    }
2250                    differential_dataflow::consolidation::consolidate(&mut all_rows);
2251                    all_data.push(TableData::Rows(all_rows));
2252
2253                    // TODO(parkmycar): Use SmallVec throughout.
2254                    all_appends.push((item_id, all_data));
2255                }
2256
2257                let fut = self
2258                    .controller
2259                    .storage
2260                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2261                    .expect("cannot fail to append");
2262                async {
2263                    fut.await
2264                        .expect("One-shot shouldn't be dropped during bootstrap")
2265                        .unwrap_or_terminate("cannot fail to append")
2266                }
2267                .boxed()
2268            }
2269        } else {
2270            futures::future::ready(()).boxed()
2271        };
2272
2273        info!(
2274            "startup: coordinator init: bootstrap: postamble complete in {:?}",
2275            postamble_start.elapsed()
2276        );
2277
2278        let builtin_update_start = Instant::now();
2279        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2280
2281        if self.controller.read_only() {
2282            info!(
2283                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2284            );
2285
2286            // TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
2287            let audit_join_start = Instant::now();
2288            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2289            let audit_log_updates: Vec<_> = audit_logs_iterator
2290                .map(|(audit_log, ts)| StateUpdate {
2291                    kind: StateUpdateKind::AuditLog(audit_log),
2292                    ts,
2293                    diff: StateDiff::Addition,
2294                })
2295                .collect();
2296            let audit_log_builtin_table_updates = self
2297                .catalog()
2298                .state()
2299                .generate_builtin_table_updates(audit_log_updates);
2300            builtin_table_updates.extend(audit_log_builtin_table_updates);
2301            info!(
2302                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2303                audit_join_start.elapsed()
2304            );
2305            self.buffered_builtin_table_updates
2306                .as_mut()
2307                .expect("in read-only mode")
2308                .append(&mut builtin_table_updates);
2309        } else {
2310            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2311                .await;
2312        };
2313        info!(
2314            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2315            builtin_update_start.elapsed()
2316        );
2317
2318        let cleanup_secrets_start = Instant::now();
2319        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2320        // Cleanup orphaned secrets. Errors during list() or delete() do not
2321        // need to prevent bootstrap from succeeding; we will retry next
2322        // startup.
2323        {
2324            // Destructure Self so we can selectively move fields into the async
2325            // task.
2326            let Self {
2327                secrets_controller,
2328                catalog,
2329                ..
2330            } = self;
2331
2332            let next_user_item_id = catalog.get_next_user_item_id().await?;
2333            let next_system_item_id = catalog.get_next_system_item_id().await?;
2334            let read_only = self.controller.read_only();
2335            // Fetch all IDs from the catalog to future-proof against other
2336            // things using secrets. Today, SECRET and CONNECTION objects use
2337            // secrets_controller.ensure, but more things could in the future
2338            // that would be easy to miss adding here.
2339            let catalog_ids: BTreeSet<CatalogItemId> =
2340                catalog.entries().map(|entry| entry.id()).collect();
2341            let secrets_controller = Arc::clone(secrets_controller);
2342
2343            spawn(|| "cleanup-orphaned-secrets", async move {
2344                if read_only {
2345                    info!(
2346                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2347                    );
2348                    return;
2349                }
2350                info!("coordinator init: cleaning up orphaned secrets");
2351
2352                match secrets_controller.list().await {
2353                    Ok(controller_secrets) => {
2354                        let controller_secrets: BTreeSet<CatalogItemId> =
2355                            controller_secrets.into_iter().collect();
2356                        let orphaned = controller_secrets.difference(&catalog_ids);
2357                        for id in orphaned {
2358                            let id_too_large = match id {
2359                                CatalogItemId::System(id) => *id >= next_system_item_id,
2360                                CatalogItemId::User(id) => *id >= next_user_item_id,
2361                                CatalogItemId::IntrospectionSourceIndex(_)
2362                                | CatalogItemId::Transient(_) => false,
2363                            };
2364                            if id_too_large {
2365                                info!(
2366                                    %next_user_item_id, %next_system_item_id,
2367                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2368                                );
2369                            } else {
2370                                info!("coordinator init: deleting orphaned secret {id}");
2371                                fail_point!("orphan_secrets");
2372                                if let Err(e) = secrets_controller.delete(*id).await {
2373                                    warn!(
2374                                        "Dropping orphaned secret has encountered an error: {}",
2375                                        e
2376                                    );
2377                                }
2378                            }
2379                        }
2380                    }
2381                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2382                }
2383            });
2384        }
2385        info!(
2386            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2387            cleanup_secrets_start.elapsed()
2388        );
2389
2390        // Run all of our final steps concurrently.
2391        let final_steps_start = Instant::now();
2392        info!(
2393            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2394        );
2395        migrated_updates_fut
2396            .instrument(info_span!("coord::bootstrap::final"))
2397            .await;
2398
2399        debug!(
2400            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2401        );
2402        // Announce the completion of initialization.
2403        self.controller.initialization_complete();
2404
2405        // Initialize unified introspection.
2406        self.bootstrap_introspection_subscribes().await;
2407
2408        info!(
2409            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2410            final_steps_start.elapsed()
2411        );
2412
2413        info!(
2414            "startup: coordinator init: bootstrap complete in {:?}",
2415            bootstrap_start.elapsed()
2416        );
2417        Ok(())
2418    }
2419
2420    /// Prepares tables for writing by resetting them to a known state and
2421    /// appending the given builtin table updates. The timestamp oracle
2422    /// will be advanced to the write timestamp of the append when this
2423    /// method returns.
2424    #[allow(clippy::async_yields_async)]
2425    #[instrument]
2426    async fn bootstrap_tables(
2427        &mut self,
2428        entries: &[CatalogEntry],
2429        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2430        audit_logs_iterator: AuditLogIterator,
2431    ) {
2432        /// Smaller helper struct of metadata for bootstrapping tables.
2433        struct TableMetadata<'a> {
2434            id: CatalogItemId,
2435            name: &'a QualifiedItemName,
2436            table: &'a Table,
2437        }
2438
2439        // Filter our entries down to just tables.
2440        let table_metas: Vec<_> = entries
2441            .into_iter()
2442            .filter_map(|entry| {
2443                entry.table().map(|table| TableMetadata {
2444                    id: entry.id(),
2445                    name: entry.name(),
2446                    table,
2447                })
2448            })
2449            .collect();
2450
2451        // Append empty batches to advance the timestamp of all tables.
2452        debug!("coordinator init: advancing all tables to current timestamp");
2453        let WriteTimestamp {
2454            timestamp: write_ts,
2455            advance_to,
2456        } = self.get_local_write_ts().await;
2457        let appends = table_metas
2458            .iter()
2459            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2460            .collect();
2461        // Append the tables in the background. We apply the write timestamp before getting a read
2462        // timestamp and reading a snapshot of each table, so the snapshots will block on their own
2463        // until the appends are complete.
2464        let table_fence_rx = self
2465            .controller
2466            .storage
2467            .append_table(write_ts.clone(), advance_to, appends)
2468            .expect("invalid updates");
2469
2470        self.apply_local_write(write_ts).await;
2471
2472        // Add builtin table updates that clear the contents of all system tables.
2473        debug!("coordinator init: resetting system tables");
2474        let read_ts = self.get_local_read_ts().await;
2475
2476        // Filter out the 'mz_storage_usage_by_shard' table since we need to retain that info for
2477        // billing purposes.
2478        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2479            .catalog()
2480            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2481            .into();
2482        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2483            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2484                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2485        };
2486
2487        let mut retraction_tasks = Vec::new();
2488        let mut system_tables: Vec<_> = table_metas
2489            .iter()
2490            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2491            .collect();
2492
2493        // Special-case audit events because the table is append-only.
2494        let (audit_events_idx, _) = system_tables
2495            .iter()
2496            .find_position(|table| {
2497                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2498            })
2499            .expect("mz_audit_events must exist");
2500        let audit_events = system_tables.remove(audit_events_idx);
2501        let audit_log_task = self.bootstrap_audit_log_table(
2502            audit_events.id,
2503            audit_events.name,
2504            audit_events.table,
2505            audit_logs_iterator,
2506            read_ts,
2507        );
2508
2509        for system_table in system_tables {
2510            let table_id = system_table.id;
2511            let full_name = self.catalog().resolve_full_name(system_table.name, None);
2512            debug!("coordinator init: resetting system table {full_name} ({table_id})");
2513
2514            // Fetch the current contents of the table for retraction.
2515            let snapshot_fut = self
2516                .controller
2517                .storage_collections
2518                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2519            let batch_fut = self
2520                .controller
2521                .storage_collections
2522                .create_update_builder(system_table.table.global_id_writes());
2523
2524            let task = spawn(|| format!("snapshot-{table_id}"), async move {
2525                // Create a TimestamplessUpdateBuilder.
2526                let mut batch = batch_fut
2527                    .await
2528                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2529                tracing::info!(?table_id, "starting snapshot");
2530                // Get a cursor which will emit a consolidated snapshot.
2531                let mut snapshot_cursor = snapshot_fut
2532                    .await
2533                    .unwrap_or_terminate("cannot fail to snapshot");
2534
2535                // Retract the current contents, spilling into our builder.
2536                while let Some(values) = snapshot_cursor.next().await {
2537                    for ((key, _val), _t, d) in values {
2538                        let key = key.expect("builtin table had errors");
2539                        let d_invert = d.neg();
2540                        batch.add(&key, &(), &d_invert).await;
2541                    }
2542                }
2543                tracing::info!(?table_id, "finished snapshot");
2544
2545                let batch = batch.finish().await;
2546                BuiltinTableUpdate::batch(table_id, batch)
2547            });
2548            retraction_tasks.push(task);
2549        }
2550
2551        let retractions_res = futures::future::join_all(retraction_tasks).await;
2552        for retractions in retractions_res {
2553            let retractions = retractions.expect("cannot fail to fetch snapshot");
2554            builtin_table_updates.push(retractions);
2555        }
2556
2557        let audit_join_start = Instant::now();
2558        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2559        let audit_log_updates = audit_log_task
2560            .await
2561            .expect("cannot fail to fetch audit log updates");
2562        let audit_log_builtin_table_updates = self
2563            .catalog()
2564            .state()
2565            .generate_builtin_table_updates(audit_log_updates);
2566        builtin_table_updates.extend(audit_log_builtin_table_updates);
2567        info!(
2568            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2569            audit_join_start.elapsed()
2570        );
2571
2572        // Now that the snapshots are complete, the appends must also be complete.
2573        table_fence_rx
2574            .await
2575            .expect("One-shot shouldn't be dropped during bootstrap")
2576            .unwrap_or_terminate("cannot fail to append");
2577
2578        info!("coordinator init: sending builtin table updates");
2579        let (_builtin_updates_fut, write_ts) = self
2580            .builtin_table_update()
2581            .execute(builtin_table_updates)
2582            .await;
2583        info!(?write_ts, "our write ts");
2584        if let Some(write_ts) = write_ts {
2585            self.apply_local_write(write_ts).await;
2586        }
2587    }
2588
2589    /// Prepares updates to the audit log table. The audit log table is append-only and very
2590    /// large, so we only need to find the events present in `audit_logs_iterator` but not yet
2591    /// in the audit log table.
2592    #[instrument]
2593    fn bootstrap_audit_log_table<'a>(
2594        &mut self,
2595        table_id: CatalogItemId,
2596        name: &'a QualifiedItemName,
2597        table: &'a Table,
2598        audit_logs_iterator: AuditLogIterator,
2599        read_ts: Timestamp,
2600    ) -> JoinHandle<Vec<StateUpdate>> {
2601        let full_name = self.catalog().resolve_full_name(name, None);
2602        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2603        let current_contents_fut = self
2604            .controller
2605            .storage_collections
2606            .snapshot(table.global_id_writes(), read_ts);
2607        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2608            let current_contents = current_contents_fut
2609                .await
2610                .unwrap_or_terminate("cannot fail to fetch snapshot");
2611            let contents_len = current_contents.len();
2612            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2613
2614            // Fetch the largest audit log event ID that has been written to the table.
2615            let max_table_id = current_contents
2616                .into_iter()
2617                .filter(|(_, diff)| *diff == 1)
2618                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2619                .sorted()
2620                .rev()
2621                .next();
2622
2623            // Filter audit log catalog updates to those that are not present in the table.
2624            audit_logs_iterator
2625                .take_while(|(audit_log, _)| match max_table_id {
2626                    Some(id) => audit_log.event.sortable_id() > id,
2627                    None => true,
2628                })
2629                .map(|(audit_log, ts)| StateUpdate {
2630                    kind: StateUpdateKind::AuditLog(audit_log),
2631                    ts,
2632                    diff: StateDiff::Addition,
2633                })
2634                .collect::<Vec<_>>()
2635        })
2636    }
2637
2638    /// Initializes all storage collections required by catalog objects in the storage controller.
2639    ///
2640    /// This method takes care of collection creation, as well as migration of existing
2641    /// collections.
2642    ///
2643    /// Creating all storage collections in a single `create_collections` call, rather than on
2644    /// demand, is more efficient as it reduces the number of writes to durable storage. It also
2645    /// allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
2646    /// storage collections, without needing to worry about dependency order.
2647    ///
2648    /// `migrated_storage_collections` is a set of builtin storage collections that have been
2649    /// migrated and should be handled specially.
2650    #[instrument]
2651    async fn bootstrap_storage_collections(
2652        &mut self,
2653        migrated_storage_collections: &BTreeSet<CatalogItemId>,
2654    ) {
2655        let catalog = self.catalog();
2656        let source_status_collection_id = catalog
2657            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2658        let source_status_collection_id = catalog
2659            .get_entry(&source_status_collection_id)
2660            .latest_global_id();
2661
2662        let source_desc = |object_id: GlobalId,
2663                           data_source: &DataSourceDesc,
2664                           desc: &RelationDesc,
2665                           timeline: &Timeline| {
2666            let (data_source, status_collection_id) = match data_source.clone() {
2667                // Re-announce the source description.
2668                DataSourceDesc::Ingestion { desc, cluster_id } => {
2669                    let desc = desc.into_inline_connection(catalog.state());
2670                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2671
2672                    (
2673                        DataSource::Ingestion(ingestion),
2674                        Some(source_status_collection_id),
2675                    )
2676                }
2677                DataSourceDesc::OldSyntaxIngestion {
2678                    desc,
2679                    progress_subsource,
2680                    data_config,
2681                    details,
2682                    cluster_id,
2683                } => {
2684                    let desc = desc.into_inline_connection(catalog.state());
2685                    let data_config = data_config.into_inline_connection(catalog.state());
2686                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2687                    // this will always be a Source or a Table.
2688                    let progress_subsource =
2689                        catalog.get_entry(&progress_subsource).latest_global_id();
2690                    let mut ingestion =
2691                        IngestionDescription::new(desc, cluster_id, progress_subsource);
2692                    let legacy_export = SourceExport {
2693                        storage_metadata: (),
2694                        data_config,
2695                        details,
2696                    };
2697                    ingestion.source_exports.insert(object_id, legacy_export);
2698
2699                    (
2700                        DataSource::Ingestion(ingestion),
2701                        Some(source_status_collection_id),
2702                    )
2703                }
2704                DataSourceDesc::IngestionExport {
2705                    ingestion_id,
2706                    external_reference: _,
2707                    details,
2708                    data_config,
2709                } => {
2710                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2711                    // this will always be a Source or a Table.
2712                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2713                    (
2714                        DataSource::IngestionExport {
2715                            ingestion_id,
2716                            details,
2717                            data_config: data_config.into_inline_connection(catalog.state()),
2718                        },
2719                        Some(source_status_collection_id),
2720                    )
2721                }
2722                DataSourceDesc::Webhook { .. } => {
2723                    (DataSource::Webhook, Some(source_status_collection_id))
2724                }
2725                DataSourceDesc::Progress => (DataSource::Progress, None),
2726                DataSourceDesc::Introspection(introspection) => {
2727                    (DataSource::Introspection(introspection), None)
2728                }
2729            };
2730            CollectionDescription {
2731                desc: desc.clone(),
2732                data_source,
2733                since: None,
2734                status_collection_id,
2735                timeline: Some(timeline.clone()),
2736            }
2737        };
2738
2739        let mut compute_collections = vec![];
2740        let mut collections = vec![];
2741        let mut new_builtin_continual_tasks = vec![];
2742        for entry in catalog.entries() {
2743            match entry.item() {
2744                CatalogItem::Source(source) => {
2745                    collections.push((
2746                        source.global_id(),
2747                        source_desc(
2748                            source.global_id(),
2749                            &source.data_source,
2750                            &source.desc,
2751                            &source.timeline,
2752                        ),
2753                    ));
2754                }
2755                CatalogItem::Table(table) => {
2756                    match &table.data_source {
2757                        TableDataSource::TableWrites { defaults: _ } => {
2758                            let versions: BTreeMap<_, _> = table
2759                                .collection_descs()
2760                                .map(|(gid, version, desc)| (version, (gid, desc)))
2761                                .collect();
2762                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2763                                let next_version = version.bump();
2764                                let primary_collection =
2765                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2766                                let collection_desc = CollectionDescription::for_table(
2767                                    desc.clone(),
2768                                    primary_collection,
2769                                );
2770
2771                                (*gid, collection_desc)
2772                            });
2773                            collections.extend(collection_descs);
2774                        }
2775                        TableDataSource::DataSource {
2776                            desc: data_source_desc,
2777                            timeline,
2778                        } => {
2779                            // TODO(alter_table): Support versioning tables that read from sources.
2780                            soft_assert_eq_or_log!(table.collections.len(), 1);
2781                            let collection_descs =
2782                                table.collection_descs().map(|(gid, _version, desc)| {
2783                                    (
2784                                        gid,
2785                                        source_desc(
2786                                            entry.latest_global_id(),
2787                                            data_source_desc,
2788                                            &desc,
2789                                            timeline,
2790                                        ),
2791                                    )
2792                                });
2793                            collections.extend(collection_descs);
2794                        }
2795                    };
2796                }
2797                CatalogItem::MaterializedView(mv) => {
2798                    let collection_desc = CollectionDescription {
2799                        desc: mv.desc.clone(),
2800                        data_source: DataSource::Other,
2801                        since: mv.initial_as_of.clone(),
2802                        status_collection_id: None,
2803                        timeline: None,
2804                    };
2805                    compute_collections.push((mv.global_id(), mv.desc.clone()));
2806                    collections.push((mv.global_id(), collection_desc));
2807                }
2808                CatalogItem::ContinualTask(ct) => {
2809                    let collection_desc = CollectionDescription {
2810                        desc: ct.desc.clone(),
2811                        data_source: DataSource::Other,
2812                        since: ct.initial_as_of.clone(),
2813                        status_collection_id: None,
2814                        timeline: None,
2815                    };
2816                    if ct.global_id().is_system() && collection_desc.since.is_none() {
2817                        // We need a non-0 since to make as_of selection work. Fill it in below with
2818                        // the `bootstrap_builtin_continual_tasks` call, which can only be run after
2819                        // `create_collections_for_bootstrap`.
2820                        new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2821                    } else {
2822                        compute_collections.push((ct.global_id(), ct.desc.clone()));
2823                        collections.push((ct.global_id(), collection_desc));
2824                    }
2825                }
2826                CatalogItem::Sink(sink) => {
2827                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2828                    let from_desc = storage_sink_from_entry
2829                        .desc(&self.catalog().resolve_full_name(
2830                            storage_sink_from_entry.name(),
2831                            storage_sink_from_entry.conn_id(),
2832                        ))
2833                        .expect("sinks can only be built on items with descs")
2834                        .into_owned();
2835                    let collection_desc = CollectionDescription {
2836                        // TODO(sinks): make generic once we have more than one sink type.
2837                        desc: KAFKA_PROGRESS_DESC.clone(),
2838                        data_source: DataSource::Sink {
2839                            desc: ExportDescription {
2840                                sink: StorageSinkDesc {
2841                                    from: sink.from,
2842                                    from_desc,
2843                                    connection: sink
2844                                        .connection
2845                                        .clone()
2846                                        .into_inline_connection(self.catalog().state()),
2847                                    envelope: sink.envelope,
2848                                    as_of: Antichain::from_elem(Timestamp::minimum()),
2849                                    with_snapshot: sink.with_snapshot,
2850                                    version: sink.version,
2851                                    from_storage_metadata: (),
2852                                    to_storage_metadata: (),
2853                                },
2854                                instance_id: sink.cluster_id,
2855                            },
2856                        },
2857                        since: None,
2858                        status_collection_id: None,
2859                        timeline: None,
2860                    };
2861                    collections.push((sink.global_id, collection_desc));
2862                }
2863                _ => (),
2864            }
2865        }
2866
2867        let register_ts = if self.controller.read_only() {
2868            self.get_local_read_ts().await
2869        } else {
2870            // Getting a write timestamp bumps the write timestamp in the
2871            // oracle, which we're not allowed to do in read-only mode.
2872            self.get_local_write_ts().await.timestamp
2873        };
2874
2875        let storage_metadata = self.catalog.state().storage_metadata();
2876        let migrated_storage_collections = migrated_storage_collections
2877            .into_iter()
2878            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2879            .collect();
2880
2881        // Before possibly creating collections, make sure their schemas are correct.
2882        //
2883        // Across different versions of Materialize the nullability of columns can change based on
2884        // updates to our optimizer.
2885        self.controller
2886            .storage
2887            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2888            .await
2889            .unwrap_or_terminate("cannot fail to evolve collections");
2890
2891        self.controller
2892            .storage
2893            .create_collections_for_bootstrap(
2894                storage_metadata,
2895                Some(register_ts),
2896                collections,
2897                &migrated_storage_collections,
2898            )
2899            .await
2900            .unwrap_or_terminate("cannot fail to create collections");
2901
2902        self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2903            .await;
2904
2905        if !self.controller.read_only() {
2906            self.apply_local_write(register_ts).await;
2907        }
2908    }
2909
2910    /// Make as_of selection happy for builtin CTs. Ideally we'd write the
2911    /// initial as_of down in the durable catalog, but that's hard because of
2912    /// boot ordering. Instead, we set the since of the storage collection to
2913    /// something that's a reasonable lower bound for the as_of. Then, if the
2914    /// upper is 0, the as_of selection code will allow us to jump it forward to
2915    /// this since.
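    ///
    /// A minimal illustration of the rule this relies on, with plain `u64` timestamps standing in
    /// for frontiers (the real as_of selection operates on `Antichain<Timestamp>`):
    ///
    /// ```
    /// // If nothing has been written yet (upper == 0), the as_of may be advanced to the
    /// // installed since; otherwise it must respect the existing frontiers.
    /// fn may_jump_to_since(since: u64, upper: u64) -> Option<u64> {
    ///     (upper == 0).then_some(since)
    /// }
    /// assert_eq!(may_jump_to_since(42, 0), Some(42));
    /// assert_eq!(may_jump_to_since(42, 7), None);
    /// ```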
2916    async fn bootstrap_builtin_continual_tasks(
2917        &mut self,
2918        // TODO(alter_table): Switch to CatalogItemId.
2919        mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
2920    ) {
2921        for (id, collection) in &mut collections {
2922            let entry = self.catalog.get_entry_by_global_id(id);
2923            let ct = match &entry.item {
2924                CatalogItem::ContinualTask(ct) => ct.clone(),
2925                _ => unreachable!("only called with continual task builtins"),
2926            };
2927            let debug_name = self
2928                .catalog()
2929                .resolve_full_name(entry.name(), None)
2930                .to_string();
2931            let (_optimized_plan, physical_plan, _metainfo) = self
2932                .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
2933                .expect("builtin CT should optimize successfully");
2934
2935            // Determine an as of for the new continual task.
2936            let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
2937            // Can't acquire a read hold on ourselves because we don't exist yet.
2938            id_bundle.storage_ids.remove(id);
2939            let read_holds = self.acquire_read_holds(&id_bundle);
2940            let as_of = read_holds.least_valid_read();
2941
2942            collection.since = Some(as_of.clone());
2943        }
2944        self.controller
2945            .storage
2946            .create_collections(self.catalog.state().storage_metadata(), None, collections)
2947            .await
2948            .unwrap_or_terminate("cannot fail to create collections");
2949    }
2950
2951    /// Invokes the optimizer on all indexes and materialized views in the catalog and inserts the
2952    /// resulting dataflow plans into the catalog state.
2953    ///
2954    /// `ordered_catalog_entries` must be sorted in dependency order, with dependencies ordered
2955    /// before their dependents.
2956    ///
2957    /// This method does not perform timestamp selection for the dataflows, nor does it create them
2958    /// in the compute controller. Both of these steps happen later during bootstrapping.
2959    ///
2960    /// Returns a map of expressions that were not cached.
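    ///
    /// Each object goes through the same cache check: a cached plan is reused only if it was
    /// produced with the current optimizer features, and anything recomputed is reported back to
    /// the caller. An illustrative sketch of that shape, using a hypothetical `Cached` type in
    /// place of `GlobalExpressions`:
    ///
    /// ```
    /// use std::collections::BTreeMap;
    ///
    /// #[derive(PartialEq)]
    /// struct Features(u32);
    /// struct Cached { features: Features, plan: &'static str }
    ///
    /// let current_features = Features(1);
    /// let mut cache = BTreeMap::from([(7u64, Cached { features: Features(0), plan: "stale" })]);
    /// let mut uncached = BTreeMap::new();
    ///
    /// let plan = match cache.remove(&7) {
    ///     // Reuse only if the cached plan was optimized with the same features.
    ///     Some(c) if c.features == current_features => c.plan,
    ///     _ => {
    ///         let plan = "freshly optimized";
    ///         uncached.insert(7u64, plan);
    ///         plan
    ///     }
    /// };
    /// assert_eq!(plan, "freshly optimized");
    /// assert!(uncached.contains_key(&7));
    /// ```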
2961    #[instrument]
2962    fn bootstrap_dataflow_plans(
2963        &mut self,
2964        ordered_catalog_entries: &[CatalogEntry],
2965        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2966    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
2967        // The optimizer expects to be able to query its `ComputeInstanceSnapshot` for
2968        // collections the current dataflow can depend on. But since we don't yet install anything
2969        // on compute instances, the snapshot information is incomplete. We fix that by manually
2970        // updating `ComputeInstanceSnapshot` objects to ensure they contain collections previously
2971        // optimized.
2972        let mut instance_snapshots = BTreeMap::new();
2973        let mut uncached_expressions = BTreeMap::new();
2974
2975        let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
2976
2977        for entry in ordered_catalog_entries {
2978            match entry.item() {
2979                CatalogItem::Index(idx) => {
2980                    // Collect optimizer parameters.
2981                    let compute_instance =
2982                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
2983                            self.instance_snapshot(idx.cluster_id)
2984                                .expect("compute instance exists")
2985                        });
2986                    let global_id = idx.global_id();
2987
2988                    // The index may already be installed on the compute instance. For example,
2989                    // this is the case for introspection indexes.
2990                    if compute_instance.contains_collection(&global_id) {
2991                        continue;
2992                    }
2993
2994                    let (optimized_plan, physical_plan, metainfo) =
2995                        match cached_global_exprs.remove(&global_id) {
2996                            Some(global_expressions)
2997                                if global_expressions.optimizer_features
2998                                    == optimizer_config.features =>
2999                            {
3000                                debug!("global expression cache hit for {global_id:?}");
3001                                (
3002                                    global_expressions.global_mir,
3003                                    global_expressions.physical_plan,
3004                                    global_expressions.dataflow_metainfos,
3005                                )
3006                            }
3007                            Some(_) | None => {
3008                                let (optimized_plan, global_lir_plan) = {
3009                                    // Build an optimizer for this INDEX.
3010                                    let mut optimizer = optimize::index::Optimizer::new(
3011                                        self.owned_catalog(),
3012                                        compute_instance.clone(),
3013                                        global_id,
3014                                        optimizer_config.clone(),
3015                                        self.optimizer_metrics(),
3016                                    );
3017
3018                                    // MIR ⇒ MIR optimization (global)
3019                                    let index_plan = optimize::index::Index::new(
3020                                        entry.name().clone(),
3021                                        idx.on,
3022                                        idx.keys.to_vec(),
3023                                    );
3024                                    let global_mir_plan = optimizer.optimize(index_plan)?;
3025                                    let optimized_plan = global_mir_plan.df_desc().clone();
3026
3027                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3028                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3029
3030                                    (optimized_plan, global_lir_plan)
3031                                };
3032
3033                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3034                                let metainfo = {
3035                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3036                                    let notice_ids =
3037                                        std::iter::repeat_with(|| self.allocate_transient_id())
3038                                            .map(|(_item_id, gid)| gid)
3039                                            .take(metainfo.optimizer_notices.len())
3040                                            .collect::<Vec<_>>();
3041                                    // Return a metainfo with rendered notices.
3042                                    self.catalog().render_notices(
3043                                        metainfo,
3044                                        notice_ids,
3045                                        Some(idx.global_id()),
3046                                    )
3047                                };
3048                                uncached_expressions.insert(
3049                                    global_id,
3050                                    GlobalExpressions {
3051                                        global_mir: optimized_plan.clone(),
3052                                        physical_plan: physical_plan.clone(),
3053                                        dataflow_metainfos: metainfo.clone(),
3054                                        optimizer_features: OptimizerFeatures::from(
3055                                            self.catalog().system_config(),
3056                                        ),
3057                                    },
3058                                );
3059                                (optimized_plan, physical_plan, metainfo)
3060                            }
3061                        };
3062
3063                    let catalog = self.catalog_mut();
3064                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3065                    catalog.set_physical_plan(idx.global_id(), physical_plan);
3066                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3067
3068                    compute_instance.insert_collection(idx.global_id());
3069                }
3070                CatalogItem::MaterializedView(mv) => {
3071                    // Collect optimizer parameters.
3072                    let compute_instance =
3073                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3074                            self.instance_snapshot(mv.cluster_id)
3075                                .expect("compute instance exists")
3076                        });
3077                    let global_id = mv.global_id();
3078
3079                    let (optimized_plan, physical_plan, metainfo) =
3080                        match cached_global_exprs.remove(&global_id) {
3081                            Some(global_expressions)
3082                                if global_expressions.optimizer_features
3083                                    == optimizer_config.features =>
3084                            {
3085                                debug!("global expression cache hit for {global_id:?}");
3086                                (
3087                                    global_expressions.global_mir,
3088                                    global_expressions.physical_plan,
3089                                    global_expressions.dataflow_metainfos,
3090                                )
3091                            }
3092                            Some(_) | None => {
3093                                let (_, internal_view_id) = self.allocate_transient_id();
3094                                let debug_name = self
3095                                    .catalog()
3096                                    .resolve_full_name(entry.name(), None)
3097                                    .to_string();
3098                                let force_non_monotonic = Default::default();
3099
3100                                let (optimized_plan, global_lir_plan) = {
3101                                    // Build an optimizer for this MATERIALIZED VIEW.
3102                                    let mut optimizer = optimize::materialized_view::Optimizer::new(
3103                                        self.owned_catalog().as_optimizer_catalog(),
3104                                        compute_instance.clone(),
3105                                        global_id,
3106                                        internal_view_id,
3107                                        mv.desc.iter_names().cloned().collect(),
3108                                        mv.non_null_assertions.clone(),
3109                                        mv.refresh_schedule.clone(),
3110                                        debug_name,
3111                                        optimizer_config.clone(),
3112                                        self.optimizer_metrics(),
3113                                        force_non_monotonic,
3114                                    );
3115
3116                                    // MIR ⇒ MIR optimization (global)
3117                                    let global_mir_plan =
3118                                        optimizer.optimize(mv.optimized_expr.as_ref().clone())?;
3119                                    let optimized_plan = global_mir_plan.df_desc().clone();
3120
3121                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3122                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3123
3124                                    (optimized_plan, global_lir_plan)
3125                                };
3126
3127                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3128                                let metainfo = {
3129                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3130                                    let notice_ids =
3131                                        std::iter::repeat_with(|| self.allocate_transient_id())
3132                                            .map(|(_item_id, global_id)| global_id)
3133                                            .take(metainfo.optimizer_notices.len())
3134                                            .collect::<Vec<_>>();
3135                                    // Return a metainfo with rendered notices.
3136                                    self.catalog().render_notices(
3137                                        metainfo,
3138                                        notice_ids,
3139                                        Some(mv.global_id()),
3140                                    )
3141                                };
3142                                uncached_expressions.insert(
3143                                    global_id,
3144                                    GlobalExpressions {
3145                                        global_mir: optimized_plan.clone(),
3146                                        physical_plan: physical_plan.clone(),
3147                                        dataflow_metainfos: metainfo.clone(),
3148                                        optimizer_features: OptimizerFeatures::from(
3149                                            self.catalog().system_config(),
3150                                        ),
3151                                    },
3152                                );
3153                                (optimized_plan, physical_plan, metainfo)
3154                            }
3155                        };
3156
3157                    let catalog = self.catalog_mut();
3158                    catalog.set_optimized_plan(mv.global_id(), optimized_plan);
3159                    catalog.set_physical_plan(mv.global_id(), physical_plan);
3160                    catalog.set_dataflow_metainfo(mv.global_id(), metainfo);
3161
3162                    compute_instance.insert_collection(mv.global_id());
3163                }
3164                CatalogItem::ContinualTask(ct) => {
3165                    let compute_instance =
3166                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3167                            self.instance_snapshot(ct.cluster_id)
3168                                .expect("compute instance exists")
3169                        });
3170                    let global_id = ct.global_id();
3171
3172                    let (optimized_plan, physical_plan, metainfo) =
3173                        match cached_global_exprs.remove(&global_id) {
3174                            Some(global_expressions)
3175                                if global_expressions.optimizer_features
3176                                    == optimizer_config.features =>
3177                            {
3178                                debug!("global expression cache hit for {global_id:?}");
3179                                (
3180                                    global_expressions.global_mir,
3181                                    global_expressions.physical_plan,
3182                                    global_expressions.dataflow_metainfos,
3183                                )
3184                            }
3185                            Some(_) | None => {
3186                                let debug_name = self
3187                                    .catalog()
3188                                    .resolve_full_name(entry.name(), None)
3189                                    .to_string();
3190                                let (optimized_plan, physical_plan, metainfo) = self
3191                                    .optimize_create_continual_task(
3192                                        ct,
3193                                        global_id,
3194                                        self.owned_catalog(),
3195                                        debug_name,
3196                                    )?;
3197                                uncached_expressions.insert(
3198                                    global_id,
3199                                    GlobalExpressions {
3200                                        global_mir: optimized_plan.clone(),
3201                                        physical_plan: physical_plan.clone(),
3202                                        dataflow_metainfos: metainfo.clone(),
3203                                        optimizer_features: OptimizerFeatures::from(
3204                                            self.catalog().system_config(),
3205                                        ),
3206                                    },
3207                                );
3208                                (optimized_plan, physical_plan, metainfo)
3209                            }
3210                        };
3211
3212                    let catalog = self.catalog_mut();
3213                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3214                    catalog.set_physical_plan(ct.global_id(), physical_plan);
3215                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3216
3217                    compute_instance.insert_collection(ct.global_id());
3218                }
3219                _ => (),
3220            }
3221        }
3222
3223        Ok(uncached_expressions)
3224    }
3225
3226    /// Selects for each compute dataflow an as-of suitable for bootstrapping it.
3227    ///
3228    /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
3229    /// in place and that must not be dropped before all compute dataflows have been created with
3230    /// the compute controller.
3231    ///
3232    /// This method expects all storage collections and dataflow plans to be available, so it must
3233    /// run after [`Coordinator::bootstrap_storage_collections`] and
3234    /// [`Coordinator::bootstrap_dataflow_plans`].
3235    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3236        let mut catalog_ids = Vec::new();
3237        let mut dataflows = Vec::new();
3238        let mut read_policies = BTreeMap::new();
3239        for entry in self.catalog.entries() {
3240            let gid = match entry.item() {
3241                CatalogItem::Index(idx) => idx.global_id(),
3242                CatalogItem::MaterializedView(mv) => mv.global_id(),
3243                CatalogItem::ContinualTask(ct) => ct.global_id(),
3244                CatalogItem::Table(_)
3245                | CatalogItem::Source(_)
3246                | CatalogItem::Log(_)
3247                | CatalogItem::View(_)
3248                | CatalogItem::Sink(_)
3249                | CatalogItem::Type(_)
3250                | CatalogItem::Func(_)
3251                | CatalogItem::Secret(_)
3252                | CatalogItem::Connection(_) => continue,
3253            };
3254            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3255                catalog_ids.push(gid);
3256                dataflows.push(plan.clone());
3257
3258                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3259                    read_policies.insert(gid, compaction_window.into());
3260                }
3261            }
3262        }
3263
3264        let read_ts = self.get_local_read_ts().await;
3265        let read_holds = as_of_selection::run(
3266            &mut dataflows,
3267            &read_policies,
3268            &*self.controller.storage_collections,
3269            read_ts,
3270            self.controller.read_only(),
3271        );
3272
3273        let catalog = self.catalog_mut();
3274        for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3275            catalog.set_physical_plan(id, plan);
3276        }
3277
3278        read_holds
3279    }
3280
3281    /// Serves the coordinator, receiving commands from users over `cmd_rx`
3282    /// and feedback from dataflow workers over `feedback_rx`.
3283    ///
3284    /// You must call `bootstrap` before calling this method.
3285    ///
3286    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would
3287    /// get stored on the stack, which is bad for runtime performance and blows up our stack usage.
3288    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
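    ///
    /// A minimal sketch of the boxing pattern (not this function's actual signature): boxing turns
    /// a large `impl Future` into a `LocalBoxFuture`, so only a pointer-sized handle lives on the
    /// caller's stack while the future's state machine lives on the heap.
    ///
    /// ```
    /// use futures::future::{FutureExt, LocalBoxFuture};
    ///
    /// fn make_boxed() -> LocalBoxFuture<'static, u32> {
    ///     // The async block's state machine is moved to the heap by `boxed_local`.
    ///     async { 42 }.boxed_local()
    /// }
    /// ```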
3289    fn serve(
3290        mut self,
3291        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3292        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3293        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3294        group_commit_rx: appends::GroupCommitWaiter,
3295    ) -> LocalBoxFuture<'static, ()> {
3296        async move {
3297            // Watcher that listens for and reports cluster service status changes.
3298            let mut cluster_events = self.controller.events_stream();
3299            let last_message = Arc::new(Mutex::new(LastMessage {
3300                kind: "none",
3301                stmt: None,
3302            }));
3303
3304            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3305            let idle_metric = self.metrics.queue_busy_seconds.clone();
3306            let last_message_watchdog = Arc::clone(&last_message);
3307
3308            spawn(|| "coord watchdog", async move {
3309                // Every 5 seconds, attempt to measure how long it takes for the
3310                // coord select loop to become idle, since this message is processed
3311                // at the lowest priority. If the loop is already idle, the
3312                // measurement will be only a few microseconds.
3313                let mut interval = tokio::time::interval(Duration::from_secs(5));
3314                // If we end up having to wait more than 5 seconds for the coord to respond, then the
3315                // behavior of Delay results in the interval "restarting" from whenever we yield
3316                // instead of trying to catch up.
3317                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3318
3319                // Track if we become stuck to de-dupe error reporting.
3320                let mut coord_stuck = false;
3321
3322                loop {
3323                    interval.tick().await;
3324
3325                    // Wait for space in the channel; if we time out, the coordinator is stuck!
3326                    let duration = tokio::time::Duration::from_secs(30);
3327                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3328                    let Ok(maybe_permit) = timeout else {
3329                        // Only log if we're newly stuck, to prevent logging repeatedly.
3330                        if !coord_stuck {
3331                            let last_message = last_message_watchdog.lock().expect("poisoned");
3332                            tracing::warn!(
3333                                last_message_kind = %last_message.kind,
3334                                last_message_sql = %last_message.stmt_to_string(),
3335                                "coordinator stuck for {duration:?}",
3336                            );
3337                        }
3338                        coord_stuck = true;
3339
3340                        continue;
3341                    };
3342
3343                    // We got a permit, we're not stuck!
3344                    if coord_stuck {
3345                        tracing::info!("Coordinator became unstuck");
3346                    }
3347                    coord_stuck = false;
3348
3349                    // If we failed to acquire a permit it's because we're shutting down.
3350                    let Ok(permit) = maybe_permit else {
3351                        break;
3352                    };
3353
3354                    permit.send(idle_metric.start_timer());
3355                }
3356            });
3357
3358            self.schedule_storage_usage_collection().await;
3359            self.spawn_privatelink_vpc_endpoints_watch_task();
3360            self.spawn_statement_logging_task();
3361            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3362
3363            // Report if the handling of a single message takes longer than this threshold.
3364            let warn_threshold = self
3365                .catalog()
3366                .system_config()
3367                .coord_slow_message_warn_threshold();
3368
3369            // How many messages we'd like to batch up before processing them. Must be > 0.
3370            const MESSAGE_BATCH: usize = 64;
3371            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3372            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3373
3374            let message_batch = self.metrics.message_batch.clone();
3375
3376            loop {
3377                // Before adding a branch to this select loop, please ensure that the branch is
3378                // cancellation safe and add a comment explaining why. You can refer here for more
3379                // info: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
3380                select! {
3381                    // We prioritize internal commands over other commands. However, we work through
3382                    // batches of commands in some branches of this select, which means that even if
3383                    // a command generates internal commands, we will work through the current batch
3384                    // before receiving a new batch of commands.
3385                    biased;
3386
3387                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3388                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3389                    // Receive a batch of commands.
3390                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3391                    // `next()` on any stream is cancel-safe:
3392                    // https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
3393                    // Receive a single command.
3394                    Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
3395                    // See [`mz_controller::Controller::ready`] for notes
3396                    // on why this is cancel-safe.
3397                    // Receive a single command.
3398                    () = self.controller.ready() => {
3399                        // NOTE: We don't get a `Readiness` back from `ready()`
3400                        // because the controller wants to keep it and it's not
3401                        // trivially `Clone` or `Copy`. Hence this accessor.
3402                        let controller = match self.controller.get_readiness() {
3403                            Readiness::Storage => ControllerReadiness::Storage,
3404                            Readiness::Compute => ControllerReadiness::Compute,
3405                            Readiness::Metrics(_) => ControllerReadiness::Metrics,
3406                            Readiness::Internal(_) => ControllerReadiness::Internal,
3407                            Readiness::NotReady => unreachable!("just signaled as ready"),
3408                        };
3409                        messages.push(Message::ControllerReady { controller });
3410                    }
3411                    // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
3412                    // Receive a single command.
3413                    permit = group_commit_rx.ready() => {
3414                        // If we happen to have batched exactly one user write, use
3415                        // that span so that `emit_trace_id_notice` hooks up to it.
3416                        // Otherwise, the best we can do is invent a new root span
3417                        // and make it follow from all the Spans in the pending
3418                        // writes.
3419                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3420                            PendingWriteTxn::User{span, ..} => Some(span),
3421                            PendingWriteTxn::System{..} => None,
3422                        });
3423                        let span = match user_write_spans.exactly_one() {
3424                            Ok(span) => span.clone(),
3425                            Err(user_write_spans) => {
3426                                let span = info_span!(parent: None, "group_commit_notify");
3427                                for s in user_write_spans {
3428                                    span.follows_from(s);
3429                                }
3430                                span
3431                            }
3432                        };
3433                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3434                    },
3435                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3436                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3437                    // Receive a batch of commands.
3438                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3439                        if count == 0 {
3440                            break;
3441                        } else {
3442                            messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
3443                        }
3444                    },
3445                    // `recv()` on `UnboundedReceiver` is cancellation safe:
3446                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
3447                    // Receive a single command.
3448                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3449                        let mut pending_read_txns = vec![pending_read_txn];
3450                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3451                            pending_read_txns.push(pending_read_txn);
3452                        }
3453                        for (conn_id, pending_read_txn) in pending_read_txns {
3454                            let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
3455                            soft_assert_or_log!(
3456                                prev.is_none(),
3457                                "connections can not have multiple concurrent reads, prev: {prev:?}"
3458                            )
3459                        }
3460                        messages.push(Message::LinearizeReads);
3461                    }
3462                    // `tick()` on `Interval` is cancel-safe:
3463                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3464                    // Receive a single command.
3465                    _ = self.advance_timelines_interval.tick() => {
3466                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
3467                        span.follows_from(Span::current());
3468
3469                        // Group commit sends an `AdvanceTimelines` message when
3470                        // done, which is what downgrades read holds. In
3471                        // read-only mode we send this message directly because
3472                        // we're not doing group commits.
3473                        if self.controller.read_only() {
3474                            messages.push(Message::AdvanceTimelines);
3475                        } else {
3476                            messages.push(Message::GroupCommitInitiate(span, None));
3477                        }
3478                    },
3479                    // `tick()` on `Interval` is cancel-safe:
3480                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3481                    // Receive a single command.
3482                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
3483                        messages.push(Message::CheckSchedulingPolicies);
3484                    },
3485
3486                    // `tick()` on `Interval` is cancel-safe:
3487                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3488                    // Receive a single command.
3489                    _ = self.caught_up_check_interval.tick() => {
3490                        // We do this directly on the main loop instead of
3491                        // firing off a message. We are still in read-only mode,
3492                        // so optimizing for latency (i.e. not blocking the main
3493                        // loop) is not that important.
3494                        self.maybe_check_caught_up().await;
3495
3496                        continue;
3497                    },
3498
3499                    // Process the idle metric at the lowest priority to sample queue non-idle time.
3500                    // `recv()` on `Receiver` is cancellation safe:
3501                    // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety
3502                    // Receive a single command.
3503                    timer = idle_rx.recv() => {
3504                        timer.expect("does not drop").observe_duration();
3505                        self.metrics
3506                            .message_handling
3507                            .with_label_values(&["watchdog"])
3508                            .observe(0.0);
3509                        continue;
3510                    }
3511                };
3512
3513                // Observe the number of messages we're processing at once.
3514                message_batch.observe(f64::cast_lossy(messages.len()));
3515
3516                for msg in messages.drain(..) {
3517                    // All message processing functions trace. Start a parent span
3518                    // for them to make it easy to find slow messages.
3519                    let msg_kind = msg.kind();
3520                    let span = span!(
3521                        target: "mz_adapter::coord::handle_message_loop",
3522                        Level::INFO,
3523                        "coord::handle_message",
3524                        kind = msg_kind
3525                    );
3526                    let otel_context = span.context().span().span_context().clone();
3527
3528                    // Record the last kind of message in case we get stuck. For
3529                    // execute commands, we additionally stash the user's SQL
3530                    // statement, so we can log it as well.
3531                    *last_message.lock().expect("poisoned") = LastMessage {
3532                        kind: msg_kind,
3533                        stmt: match &msg {
3534                            Message::Command(
3535                                _,
3536                                Command::Execute {
3537                                    portal_name,
3538                                    session,
3539                                    ..
3540                                },
3541                            ) => session
3542                                .get_portal_unverified(portal_name)
3543                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3544                            _ => None,
3545                        },
3546                    };
3547
3548                    let start = Instant::now();
3549                    self.handle_message(msg).instrument(span).await;
3550                    let duration = start.elapsed();
3551
3552                    self.metrics
3553                        .message_handling
3554                        .with_label_values(&[msg_kind])
3555                        .observe(duration.as_secs_f64());
3556
3557                    // If something is _really_ slow, print a trace id for debugging, if OTEL is enabled.
3558                    if duration > warn_threshold {
3559                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3560                        tracing::error!(
3561                            ?msg_kind,
3562                            ?trace_id,
3563                            ?duration,
3564                            "very slow coordinator message"
3565                        );
3566                    }
3567                }
3568            }
3569            // Try to clean up as a best effort. There may be some async tasks out there holding a
3570            // reference that prevents us from cleaning up.
3571            if let Some(catalog) = Arc::into_inner(self.catalog) {
3572                catalog.expire().await;
3573            }
3574        }
3575        .boxed_local()
3576    }
3577
3578    /// Obtain a read-only Catalog reference.
3579    fn catalog(&self) -> &Catalog {
3580        &self.catalog
3581    }
3582
3583    /// Obtain a read-only Catalog snapshot, suitable for giving out to
3584    /// non-Coordinator thread tasks.
3585    fn owned_catalog(&self) -> Arc<Catalog> {
3586        Arc::clone(&self.catalog)
3587    }
3588
3589    /// Obtain a handle to the optimizer metrics, suitable for giving
3590    /// out to non-Coordinator thread tasks.
3591    fn optimizer_metrics(&self) -> OptimizerMetrics {
3592        self.optimizer_metrics.clone()
3593    }
3594
3595    /// Obtain a writeable Catalog reference.
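    ///
    /// A minimal sketch of the copy-on-write behavior this relies on, using a `Vec` in place of
    /// the `Catalog`: `Arc::make_mut` clones the inner value only when other references exist, so
    /// earlier snapshots keep seeing the old value.
    ///
    /// ```
    /// use std::sync::Arc;
    ///
    /// let mut catalog = Arc::new(vec![1, 2, 3]);
    /// let snapshot = Arc::clone(&catalog);      // like `owned_catalog()`
    /// Arc::make_mut(&mut catalog).push(4);      // clones, because `snapshot` still exists
    /// assert_eq!(*snapshot, vec![1, 2, 3]);     // the snapshot is unaffected
    /// assert_eq!(*catalog, vec![1, 2, 3, 4]);
    /// Arc::make_mut(&mut catalog).push(5);      // no other references, so no clone
    /// assert_eq!(*catalog, vec![1, 2, 3, 4, 5]);
    /// ```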
3596    fn catalog_mut(&mut self) -> &mut Catalog {
3597        // make_mut will cause any other Arc references (from owned_catalog) to
3598        // continue to be valid by cloning the catalog, putting it in a new Arc,
3599        // which lives at self.catalog. If there are no other Arc references,
3600        // then no clone is made, and it returns a reference to the existing
3601        // object. This makes this method and owned_catalog both very cheap: at
3602        // most one clone per catalog mutation, but only if there's a read-only
3603        // reference to it.
3604        Arc::make_mut(&mut self.catalog)
3605    }
3606
3607    /// Obtain a reference to the coordinator's connection context.
3608    fn connection_context(&self) -> &ConnectionContext {
3609        self.controller.connection_context()
3610    }
3611
3612    /// Obtain a reference to the coordinator's secret reader, in an `Arc`.
3613    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3614        &self.connection_context().secrets_reader
3615    }
3616
3617    /// Publishes a notice message to all sessions.
3618    ///
3619    /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
3620    /// so we keep it around.
3621    #[allow(dead_code)]
3622    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3623        for meta in self.active_conns.values() {
3624            let _ = meta.notice_tx.send(notice.clone());
3625        }
3626    }
3627
3628    /// Returns a closure that will publish a notice to all sessions that were active at the time
3629    /// this method was called.
3630    pub(crate) fn broadcast_notice_tx(
3631        &self,
3632    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3633        let senders: Vec<_> = self
3634            .active_conns
3635            .values()
3636            .map(|meta| meta.notice_tx.clone())
3637            .collect();
3638        Box::new(move |notice| {
3639            for tx in senders {
3640                let _ = tx.send(notice.clone());
3641            }
3642        })
3643    }
3644
3645    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3646        &self.active_conns
3647    }
3648
3649    #[instrument(level = "debug")]
3650    pub(crate) fn retire_execution(
3651        &mut self,
3652        reason: StatementEndedExecutionReason,
3653        ctx_extra: ExecuteContextExtra,
3654    ) {
3655        if let Some(uuid) = ctx_extra.retire() {
3656            self.end_statement_execution(uuid, reason);
3657        }
3658    }
3659
3660    /// Creates a new dataflow builder from the catalog and indexes in `self`.
3661    #[instrument(level = "debug")]
3662    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3663        let compute = self
3664            .instance_snapshot(instance)
3665            .expect("compute instance does not exist");
3666        DataflowBuilder::new(self.catalog().state(), compute)
3667    }
3668
3669    /// Return a reference-less snapshot to the indicated compute instance.
3670    pub fn instance_snapshot(
3671        &self,
3672        id: ComputeInstanceId,
3673    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3674        ComputeInstanceSnapshot::new(&self.controller, id)
3675    }
3676
3677    /// Call into the compute controller to install a finalized dataflow, and
3678    /// initialize the read policies for its exported readable objects.
3679    pub(crate) async fn ship_dataflow(
3680        &mut self,
3681        dataflow: DataflowDescription<Plan>,
3682        instance: ComputeInstanceId,
3683        subscribe_target_replica: Option<ReplicaId>,
3684    ) {
3685        // We must only install read policies for indexes, not for sinks.
3686        // Sinks are write-only compute collections that don't have read policies.
3687        let export_ids = dataflow.exported_index_ids().collect();
3688
3689        self.controller
3690            .compute
3691            .create_dataflow(instance, dataflow, subscribe_target_replica)
3692            .unwrap_or_terminate("dataflow creation cannot fail");
3693
3694        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3695            .await;
3696    }
3697
3698    /// Like `ship_dataflow`, but also await on builtin table updates.
3699    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3700        &mut self,
3701        dataflow: DataflowDescription<Plan>,
3702        instance: ComputeInstanceId,
3703        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3704    ) {
3705        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3706            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3707            let ((), ()) =
3708                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3709        } else {
3710            self.ship_dataflow(dataflow, instance, None).await;
3711        }
3712    }
3713
3714    /// Install a _watch set_ in the controller that is automatically associated with the given
3715    /// connection id. The watchset will be automatically cleared if the connection terminates
3716    /// before the watchset completes.
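    ///
    /// A hedged usage sketch (not compiled; `conn_id`, `ids`, `ts`, `logging_id`, and `event`
    /// are hypothetical values of the corresponding parameter and variant types):
    ///
    /// ```ignore
    /// // Ask to be notified once every object in `ids` has advanced past `ts`.
    /// coord.install_compute_watch_set(
    ///     conn_id,
    ///     ids,
    ///     ts,
    ///     WatchSetResponse::StatementDependenciesReady(logging_id, event),
    /// );
    /// ```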
3717    pub fn install_compute_watch_set(
3718        &mut self,
3719        conn_id: ConnectionId,
3720        objects: BTreeSet<GlobalId>,
3721        t: Timestamp,
3722        state: WatchSetResponse,
3723    ) {
3724        let ws_id = self.controller.install_compute_watch_set(objects, t);
3725        self.connection_watch_sets
3726            .entry(conn_id.clone())
3727            .or_default()
3728            .insert(ws_id);
3729        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3730    }
3731
3732    /// Install a _watch set_ in the controller that is automatically associated with the given
3733    /// connection id. The watchset will be automatically cleared if the connection terminates
3734    /// before the watchset completes.
3735    pub fn install_storage_watch_set(
3736        &mut self,
3737        conn_id: ConnectionId,
3738        objects: BTreeSet<GlobalId>,
3739        t: Timestamp,
3740        state: WatchSetResponse,
3741    ) {
3742        let ws_id = self.controller.install_storage_watch_set(objects, t);
3743        self.connection_watch_sets
3744            .entry(conn_id.clone())
3745            .or_default()
3746            .insert(ws_id);
3747        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3748    }
3749
3750    /// Cancels pending watchsets associated with the provided connection id.
3751    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3752        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3753            for ws_id in ws_ids {
3754                self.installed_watch_sets.remove(&ws_id);
3755            }
3756        }
3757    }
3758
3759    /// Returns the state of the [`Coordinator`] formatted as JSON.
3760    ///
3761    /// The returned value is not guaranteed to be stable and may change at any point in time.
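    ///
    /// A hedged usage sketch (not compiled; `coord` is a hypothetical `Coordinator`):
    ///
    /// ```ignore
    /// let state = coord.dump().await?;
    /// // Top-level keys include "global_timelines", "active_conns", "txn_read_holds",
    /// // "pending_peeks", "client_pending_peeks", "pending_linearize_read_txns", and
    /// // "controller".
    /// println!("{}", serde_json::to_string_pretty(&state)?);
    /// ```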
3762    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3763        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3764        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3765        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3766        // prevents a future unrelated change from silently breaking this method.
3767
3768        let global_timelines: BTreeMap<_, _> = self
3769            .global_timelines
3770            .iter()
3771            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3772            .collect();
3773        let active_conns: BTreeMap<_, _> = self
3774            .active_conns
3775            .iter()
3776            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3777            .collect();
3778        let txn_read_holds: BTreeMap<_, _> = self
3779            .txn_read_holds
3780            .iter()
3781            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3782            .collect();
3783        let pending_peeks: BTreeMap<_, _> = self
3784            .pending_peeks
3785            .iter()
3786            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3787            .collect();
3788        let client_pending_peeks: BTreeMap<_, _> = self
3789            .client_pending_peeks
3790            .iter()
3791            .map(|(id, peek)| {
3792                let peek: BTreeMap<_, _> = peek
3793                    .iter()
3794                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3795                    .collect();
3796                (id.to_string(), peek)
3797            })
3798            .collect();
3799        let pending_linearize_read_txns: BTreeMap<_, _> = self
3800            .pending_linearize_read_txns
3801            .iter()
3802            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3803            .collect();
3804
3805        let map = serde_json::Map::from_iter([
3806            (
3807                "global_timelines".to_string(),
3808                serde_json::to_value(global_timelines)?,
3809            ),
3810            (
3811                "active_conns".to_string(),
3812                serde_json::to_value(active_conns)?,
3813            ),
3814            (
3815                "txn_read_holds".to_string(),
3816                serde_json::to_value(txn_read_holds)?,
3817            ),
3818            (
3819                "pending_peeks".to_string(),
3820                serde_json::to_value(pending_peeks)?,
3821            ),
3822            (
3823                "client_pending_peeks".to_string(),
3824                serde_json::to_value(client_pending_peeks)?,
3825            ),
3826            (
3827                "pending_linearize_read_txns".to_string(),
3828                serde_json::to_value(pending_linearize_read_txns)?,
3829            ),
3830            ("controller".to_string(), self.controller.dump().await?),
3831        ]);
3832        Ok(serde_json::Value::Object(map))
3833    }
3834
3835    /// Prune all storage usage events from the [`MZ_STORAGE_USAGE_BY_SHARD`] table that are older
3836    /// than `retention_period`.
3837    ///
3838    /// This method will read the entire contents of [`MZ_STORAGE_USAGE_BY_SHARD`] into memory
3839    /// which can be expensive.
3840    ///
3841    /// DO NOT call this method outside of startup. The safety of reading at the current oracle read
3842    /// timestamp and then writing at whatever the current write timestamp is (instead of
3843    /// `read_ts + 1`) relies on the fact that there are no outstanding writes during startup.
3844    ///
3845    /// Group commit, which this method uses to write the retractions, has builtin fencing, and we
3846    /// never commit retractions to [`MZ_STORAGE_USAGE_BY_SHARD`] outside of this method, which is
3847    /// only called once during startup. So we don't have to worry about double/invalid retractions.
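    ///
    /// As a hedged illustration (not a test): with `read_ts = 1_700_000_000_000` (milliseconds
    /// since the Unix epoch) and a `retention_period` of 30 days (2_592_000_000 ms), every event
    /// whose collection timestamp is less than `1_697_408_000_000` is retracted via group commit.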
3848    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
3849        let item_id = self
3850            .catalog()
3851            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3852        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3853        let read_ts = self.get_local_read_ts().await;
3854        let current_contents_fut = self
3855            .controller
3856            .storage_collections
3857            .snapshot(global_id, read_ts);
3858        let internal_cmd_tx = self.internal_cmd_tx.clone();
3859        spawn(|| "storage_usage_prune", async move {
3860            let mut current_contents = current_contents_fut
3861                .await
3862                .unwrap_or_terminate("cannot fail to fetch snapshot");
3863            differential_dataflow::consolidation::consolidate(&mut current_contents);
3864
3865            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3866            let mut expired = Vec::new();
3867            for (row, diff) in current_contents {
3868                assert_eq!(
3869                    diff, 1,
3870                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3871                );
3872                // This logic relies on the definition of `mz_storage_usage_by_shard` not changing.
3873                let collection_timestamp = row
3874                    .unpack()
3875                    .get(3)
3876                    .expect("definition of mz_storage_usage_by_shard changed")
3877                    .unwrap_timestamptz();
3878                let collection_timestamp = collection_timestamp.timestamp_millis();
3879                let collection_timestamp: u128 = collection_timestamp
3880                    .try_into()
3881                    .expect("all collections happen after Jan 1 1970");
3882                if collection_timestamp < cutoff_ts {
3883                    debug!("pruning storage event {row:?}");
3884                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3885                    expired.push(builtin_update);
3886                }
3887            }
3888
3889            // Ignore any send error: it means the main thread has shut down.
3890            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3891        });
3892    }
3893
3894    fn current_credit_consumption_rate(&self) -> Numeric {
3895        self.catalog()
3896            .user_cluster_replicas()
3897            .filter_map(|replica| match &replica.config.location {
3898                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
3899                ReplicaLocation::Unmanaged(_) => None,
3900            })
3901            .map(|size| {
3902                self.catalog()
3903                    .cluster_replica_sizes()
3904                    .0
3905                    .get(size)
3906                    .expect("location size is validated against the cluster replica sizes")
3907                    .credits_per_hour
3908            })
3909            .sum()
3910    }
3911}
3912
3913#[cfg(test)]
3914impl Coordinator {
3915    #[allow(dead_code)]
3916    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
3917        // `ship_dataflow` is not allowed to have a `Result` return because this function is
3918        // called after `catalog_transact`, after which no errors are allowed. This test exists to
3919        // prevent us from incorrectly teaching those functions how to return errors (which has
3920        // happened twice and is the motivation for this test).
3921
3922        // An arbitrary compute instance ID to satisfy the function calls below. Note that
3923        // this only works because this function will never run.
3924        let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
3925
3926        let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
3927    }
3928}
3929
3930/// Contains information about the last message the [`Coordinator`] processed.
3931struct LastMessage {
3932    kind: &'static str,
3933    stmt: Option<Arc<Statement<Raw>>>,
3934}
3935
3936impl LastMessage {
3937    /// Returns a redacted version of the statement that is safe for logs.
3938    fn stmt_to_string(&self) -> Cow<'static, str> {
3939        self.stmt
3940            .as_ref()
3941            .map(|stmt| stmt.to_ast_string_redacted().into())
3942            .unwrap_or(Cow::Borrowed("<none>"))
3943    }
3944}
3945
3946impl fmt::Debug for LastMessage {
3947    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3948        f.debug_struct("LastMessage")
3949            .field("kind", &self.kind)
3950            .field("stmt", &self.stmt_to_string())
3951            .finish()
3952    }
3953}
3954
3955impl Drop for LastMessage {
3956    fn drop(&mut self) {
3957        // Only print the last message if we're currently panicking, otherwise we'd spam our logs.
3958        if std::thread::panicking() {
3959            // If we're panicking there's no guarantee `tracing` still works, so print to stderr.
3960            eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
3961        }
3962    }
3963}
3964
3965/// Serves the coordinator based on the provided configuration.
3966///
3967/// For a high-level description of the coordinator, see the [crate
3968/// documentation](crate).
3969///
3970/// Returns a handle to the coordinator and a client to communicate with the
3971/// coordinator.
3972///
3973/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 42KB. This would
3974/// get stored on the stack, which is bad for runtime performance and could blow up our stack usage.
3975/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
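///
/// A hedged usage sketch (not compiled; `config` is a hypothetical, fully-populated [`Config`]):
///
/// ```ignore
/// let (handle, client) = serve(config).await?;
/// // `client` is used to issue commands to the coordinator; dropping `handle` joins the
/// // coordinator thread.
/// ```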
3976pub fn serve(
3977    Config {
3978        controller_config,
3979        controller_envd_epoch,
3980        mut storage,
3981        audit_logs_iterator,
3982        timestamp_oracle_url,
3983        unsafe_mode,
3984        all_features,
3985        build_info,
3986        environment_id,
3987        metrics_registry,
3988        now,
3989        secrets_controller,
3990        cloud_resource_controller,
3991        cluster_replica_sizes,
3992        builtin_system_cluster_config,
3993        builtin_catalog_server_cluster_config,
3994        builtin_probe_cluster_config,
3995        builtin_support_cluster_config,
3996        builtin_analytics_cluster_config,
3997        system_parameter_defaults,
3998        availability_zones,
3999        storage_usage_client,
4000        storage_usage_collection_interval,
4001        storage_usage_retention_period,
4002        segment_client,
4003        egress_addresses,
4004        aws_account_id,
4005        aws_privatelink_availability_zones,
4006        connection_context,
4007        connection_limit_callback,
4008        remote_system_parameters,
4009        webhook_concurrency_limit,
4010        http_host_name,
4011        tracing_handle,
4012        read_only_controllers,
4013        caught_up_trigger: clusters_caught_up_trigger,
4014        helm_chart_version,
4015        license_key,
4016        external_login_password_mz_system,
4017    }: Config,
4018) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4019    async move {
4020        let coord_start = Instant::now();
4021        info!("startup: coordinator init: beginning");
4022        info!("startup: coordinator init: preamble beginning");
4023
4024        // Initializing the builtins can be an expensive process and consume a lot of memory. We
4025        // forcibly initialize them early, while the stack is relatively empty, to avoid stack
4026        // overflows later.
4027        let _builtins = LazyLock::force(&BUILTINS_STATIC);
4028
4029        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4030        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4031        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4032            mpsc::unbounded_channel();
4033
4034        // Validate and process availability zones.
4035        if !availability_zones.iter().all_unique() {
4036            coord_bail!("availability zones must be unique");
4037        }
4038
4039        let aws_principal_context = match (
4040            aws_account_id,
4041            connection_context.aws_external_id_prefix.clone(),
4042        ) {
4043            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4044                aws_account_id,
4045                aws_external_id_prefix,
4046            }),
4047            _ => None,
4048        };
4049
4050        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4051            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4052
4053        info!(
4054            "startup: coordinator init: preamble complete in {:?}",
4055            coord_start.elapsed()
4056        );
4057        let oracle_init_start = Instant::now();
4058        info!("startup: coordinator init: timestamp oracle init beginning");
4059
4060        let pg_timestamp_oracle_config = timestamp_oracle_url
4061            .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
4062        let mut initial_timestamps =
4063            get_initial_oracle_timestamps(&pg_timestamp_oracle_config).await?;
4064
4065        // Insert an entry for the `EpochMilliseconds` timeline if one doesn't exist,
4066        // which will ensure that the timeline is initialized since it's required
4067        // by the system.
4068        initial_timestamps
4069            .entry(Timeline::EpochMilliseconds)
4070            .or_insert_with(mz_repr::Timestamp::minimum);
4071        let mut timestamp_oracles = BTreeMap::new();
4072        for (timeline, initial_timestamp) in initial_timestamps {
4073            Coordinator::ensure_timeline_state_with_initial_time(
4074                &timeline,
4075                initial_timestamp,
4076                now.clone(),
4077                pg_timestamp_oracle_config.clone(),
4078                &mut timestamp_oracles,
4079                read_only_controllers,
4080            )
4081            .await;
4082        }
4083
4084        // Opening the durable catalog uses one or more timestamps without communicating with
4085        // the timestamp oracle. Here we make sure to apply the catalog upper with the timestamp
4086        // oracle to linearize future operations with opening the catalog.
4087        let catalog_upper = storage.current_upper().await;
4088        // Choose a time at which to boot. This is used, for example, to prune
4089        // old storage usage data or migrate audit log entries.
4090        //
4091        // This time is usually the current system time, but with protection
4092        // against backwards time jumps, even across restarts.
4093        let epoch_millis_oracle = &timestamp_oracles
4094            .get(&Timeline::EpochMilliseconds)
4095            .expect("inserted above")
4096            .oracle;
4097
4098        let mut boot_ts = if read_only_controllers {
4099            let read_ts = epoch_millis_oracle.read_ts().await;
4100            std::cmp::max(read_ts, catalog_upper)
4101        } else {
4102            // Getting/applying a write timestamp bumps the write timestamp in the
4103            // oracle, which we're not allowed to do in read-only mode.
4104            epoch_millis_oracle.apply_write(catalog_upper).await;
4105            epoch_millis_oracle.write_ts().await.timestamp
4106        };
4107
4108        info!(
4109            "startup: coordinator init: timestamp oracle init complete in {:?}",
4110            oracle_init_start.elapsed()
4111        );
4112
4113        let catalog_open_start = Instant::now();
4114        info!("startup: coordinator init: catalog open beginning");
4115        let persist_client = controller_config
4116            .persist_clients
4117            .open(controller_config.persist_location.clone())
4118            .await
4119            .context("opening persist client")?;
4120        let builtin_item_migration_config = BuiltinItemMigrationConfig {
4121            persist_client: persist_client.clone(),
4122            read_only: read_only_controllers,
4123        };
4126        let OpenCatalogResult {
4127            mut catalog,
4128            migrated_storage_collections_0dt,
4129            new_builtin_collections,
4130            builtin_table_updates,
4131            cached_global_exprs,
4132            uncached_local_exprs,
4133        } = Catalog::open(mz_catalog::config::Config {
4134            storage,
4135            metrics_registry: &metrics_registry,
4136            state: mz_catalog::config::StateConfig {
4137                unsafe_mode,
4138                all_features,
4139                build_info,
4140                environment_id: environment_id.clone(),
4141                read_only: read_only_controllers,
4142                now: now.clone(),
4143                boot_ts: boot_ts.clone(),
4144                skip_migrations: false,
4145                cluster_replica_sizes,
4146                builtin_system_cluster_config,
4147                builtin_catalog_server_cluster_config,
4148                builtin_probe_cluster_config,
4149                builtin_support_cluster_config,
4150                builtin_analytics_cluster_config,
4151                system_parameter_defaults,
4152                remote_system_parameters,
4153                availability_zones,
4154                egress_addresses,
4155                aws_principal_context,
4156                aws_privatelink_availability_zones,
4157                connection_context,
4158                http_host_name,
4159                builtin_item_migration_config,
4160                persist_client: persist_client.clone(),
4161                enable_expression_cache_override: None,
4162                helm_chart_version,
4163                external_login_password_mz_system,
4164                license_key: license_key.clone(),
4165            },
4166        })
4167        .await?;
4168
4169        // Opening the catalog uses one or more timestamps, so push the boot timestamp up to the
4170        // current catalog upper.
4171        let catalog_upper = catalog.current_upper().await;
4172        boot_ts = std::cmp::max(boot_ts, catalog_upper);
4173
4174        if !read_only_controllers {
4175            epoch_millis_oracle.apply_write(boot_ts).await;
4176        }
4177
4178        info!(
4179            "startup: coordinator init: catalog open complete in {:?}",
4180            catalog_open_start.elapsed()
4181        );
4182
4183        let coord_thread_start = Instant::now();
4184        info!("startup: coordinator init: coordinator thread start beginning");
4185
4186        let session_id = catalog.config().session_id;
4187        let start_instant = catalog.config().start_instant;
4188
4189        // In order for the coordinator to support Rc and RefCell types, it cannot be
4190        // sent across threads. Spawn it in a thread and have this parent thread wait
4191        // for bootstrap completion before proceeding.
4192        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4193        let handle = TokioHandle::current();
4194
4195        let metrics = Metrics::register_into(&metrics_registry);
4196        let metrics_clone = metrics.clone();
4197        let optimizer_metrics = OptimizerMetrics::register_into(
4198            &metrics_registry,
4199            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4200        );
4201        let segment_client_clone = segment_client.clone();
4202        let coord_now = now.clone();
4203        let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4204        let mut check_scheduling_policies_interval = tokio::time::interval(
4205            catalog
4206                .system_config()
4207                .cluster_check_scheduling_policies_interval(),
4208        );
4209        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4210
4211        let clusters_caught_up_check_interval = if read_only_controllers {
4212            let dyncfgs = catalog.system_config().dyncfgs();
4213            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4214
4215            let mut interval = tokio::time::interval(interval);
4216            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4217            interval
4218        } else {
4219            // When not in read-only mode, we don't do hydration checks. But we
4220            // still have to provide _some_ interval. This is large enough that
4221            // it doesn't matter.
4222            //
4223            // TODO(aljoscha): We cannot use Duration::MAX right now because of
4224            // https://github.com/tokio-rs/tokio/issues/6634. Use that once it's
4225            // fixed for good.
4226            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4227            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4228            interval
4229        };
4230
4231        let clusters_caught_up_check =
4232            clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
4233                trigger,
4234                exclude_collections: new_builtin_collections.into_iter().collect(),
4235            });
4236
4237        if let Some(config) = pg_timestamp_oracle_config.as_ref() {
4238            // Apply settings from system vars as early as possible because some
4239            // of them are locked in right when an oracle is first opened!
4240            let pg_timestamp_oracle_params =
4241                flags::pg_timstamp_oracle_config(catalog.system_config());
4242            pg_timestamp_oracle_params.apply(config);
4243        }
4244
4245        // Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
4246        // system variables change, we update our connection limits.
4247        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4248            Arc::new(move |system_vars: &SystemVars| {
4249                let limit: u64 = system_vars.max_connections().cast_into();
4250                let superuser_reserved: u64 =
4251                    system_vars.superuser_reserved_connections().cast_into();
4252
4253                // If superuser_reserved >= max_connections, prefer max_connections.
4254                //
4255                // In this scenario all normal users would be locked out because all connections
4256                // would be reserved for superusers, so complain if this is the case.
4257                let superuser_reserved = if superuser_reserved >= limit {
4258                    tracing::warn!(
4259                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
4260                    );
4261                    limit
4262                } else {
4263                    superuser_reserved
4264                };
4265
4266                (connection_limit_callback)(limit, superuser_reserved);
4267            });
4268        catalog.system_config_mut().register_callback(
4269            &mz_sql::session::vars::MAX_CONNECTIONS,
4270            Arc::clone(&connection_limit_callback),
4271        );
4272        catalog.system_config_mut().register_callback(
4273            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4274            connection_limit_callback,
4275        );
4276
4277        let (group_commit_tx, group_commit_rx) = appends::notifier();
4278
4279        let parent_span = tracing::Span::current();
4280        let thread = thread::Builder::new()
4281            // The Coordinator thread tends to keep a lot of data on its stack. To
4282            // prevent a stack overflow, we allocate a stack three times as big as the default
4283            // stack.
4284            .stack_size(3 * stack::STACK_SIZE)
4285            .name("coordinator".to_string())
4286            .spawn(move || {
4287                let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4288
4289                let controller = handle
4290                    .block_on({
4291                        catalog.initialize_controller(
4292                            controller_config,
4293                            controller_envd_epoch,
4294                            read_only_controllers,
4295                        )
4296                    })
4297                    .unwrap_or_terminate("failed to initialize storage_controller");
4298                // Initializing the controller uses one or more timestamps, so push the boot timestamp up to the
4299                // current catalog upper.
4300                let catalog_upper = handle.block_on(catalog.current_upper());
4301                boot_ts = std::cmp::max(boot_ts, catalog_upper);
4302                if !read_only_controllers {
4303                    let epoch_millis_oracle = &timestamp_oracles
4304                        .get(&Timeline::EpochMilliseconds)
4305                        .expect("inserted above")
4306                        .oracle;
4307                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4308                }
4309
4310                let catalog = Arc::new(catalog);
4311
4312                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4313                let mut coord = Coordinator {
4314                    controller,
4315                    catalog,
4316                    internal_cmd_tx,
4317                    group_commit_tx,
4318                    strict_serializable_reads_tx,
4319                    global_timelines: timestamp_oracles,
4320                    transient_id_gen: Arc::new(TransientIdGen::new()),
4321                    active_conns: BTreeMap::new(),
4322                    txn_read_holds: Default::default(),
4323                    pending_peeks: BTreeMap::new(),
4324                    client_pending_peeks: BTreeMap::new(),
4325                    pending_linearize_read_txns: BTreeMap::new(),
4326                    serialized_ddl: LockedVecDeque::new(),
4327                    active_compute_sinks: BTreeMap::new(),
4328                    active_webhooks: BTreeMap::new(),
4329                    active_copies: BTreeMap::new(),
4330                    staged_cancellation: BTreeMap::new(),
4331                    introspection_subscribes: BTreeMap::new(),
4332                    write_locks: BTreeMap::new(),
4333                    deferred_write_ops: BTreeMap::new(),
4334                    pending_writes: Vec::new(),
4335                    advance_timelines_interval,
4336                    secrets_controller,
4337                    caching_secrets_reader,
4338                    cloud_resource_controller,
4339                    storage_usage_client,
4340                    storage_usage_collection_interval,
4341                    segment_client,
4342                    metrics,
4343                    optimizer_metrics,
4344                    tracing_handle,
4345                    statement_logging: StatementLogging::new(coord_now.clone()),
4346                    webhook_concurrency_limit,
4347                    pg_timestamp_oracle_config,
4348                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4349                    cluster_scheduling_decisions: BTreeMap::new(),
4350                    caught_up_check_interval: clusters_caught_up_check_interval,
4351                    caught_up_check: clusters_caught_up_check,
4352                    installed_watch_sets: BTreeMap::new(),
4353                    connection_watch_sets: BTreeMap::new(),
4354                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
4355                    read_only_controllers,
4356                    buffered_builtin_table_updates: Some(Vec::new()),
4357                    license_key,
4358                    persist_client,
4359                };
4360                let bootstrap = handle.block_on(async {
4361                    coord
4362                        .bootstrap(
4363                            boot_ts,
4364                            migrated_storage_collections_0dt,
4365                            builtin_table_updates,
4366                            cached_global_exprs,
4367                            uncached_local_exprs,
4368                            audit_logs_iterator,
4369                        )
4370                        .await?;
4371                    coord
4372                        .controller
4373                        .remove_orphaned_replicas(
4374                            coord.catalog().get_next_user_replica_id().await?,
4375                            coord.catalog().get_next_system_replica_id().await?,
4376                        )
4377                        .await
4378                        .map_err(AdapterError::Orchestrator)?;
4379
4380                    if let Some(retention_period) = storage_usage_retention_period {
4381                        coord
4382                            .prune_storage_usage_events_on_startup(retention_period)
4383                            .await;
4384                    }
4385
4386                    Ok(())
4387                });
4388                let ok = bootstrap.is_ok();
4389                drop(span);
4390                bootstrap_tx
4391                    .send(bootstrap)
4392                    .expect("bootstrap_rx is not dropped until it receives this message");
4393                if ok {
4394                    handle.block_on(coord.serve(
4395                        internal_cmd_rx,
4396                        strict_serializable_reads_rx,
4397                        cmd_rx,
4398                        group_commit_rx,
4399                    ));
4400                }
4401            })
4402            .expect("failed to create coordinator thread");
4403        match bootstrap_rx
4404            .await
4405            .expect("bootstrap_tx always sends a message or panics/halts")
4406        {
4407            Ok(()) => {
4408                info!(
4409                    "startup: coordinator init: coordinator thread start complete in {:?}",
4410                    coord_thread_start.elapsed()
4411                );
4412                info!(
4413                    "startup: coordinator init: complete in {:?}",
4414                    coord_start.elapsed()
4415                );
4416                let handle = Handle {
4417                    session_id,
4418                    start_instant,
4419                    _thread: thread.join_on_drop(),
4420                };
4421                let client = Client::new(
4422                    build_info,
4423                    cmd_tx.clone(),
4424                    metrics_clone,
4425                    now,
4426                    environment_id,
4427                    segment_client_clone,
4428                );
4429                Ok((handle, client))
4430            }
4431            Err(e) => Err(e),
4432        }
4433    }
4434    .boxed()
4435}
4436
4437// Determines and returns the highest timestamp for each timeline, for all known
4438// timestamp oracle implementations.
4439//
4440// Initially, we did this so that we can switch between implementations of
4441// timestamp oracle, but now we also do this to determine a monotonic boot
4442// timestamp, a timestamp that does not regress across reboots.
4443//
4444// This mostly works, but there can be linearizability violations, because there
4445// is no central moment where we do distributed coordination for all oracle
4446// types. Working around this seems prohibitively hard, maybe even impossible, so
4447// we have to live with this window of potential violations during the upgrade
4448// window (which is the only point where we should switch oracle
4449// implementations).
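//
// As a hedged illustration: if the postgres-backed oracle reports timestamp 120 for the
// `EpochMilliseconds` timeline while the rest of boot would otherwise start that timeline at
// 100, the caller boots at 120, so the timeline does not regress.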
4450async fn get_initial_oracle_timestamps(
4451    pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
4452) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4453    let mut initial_timestamps = BTreeMap::new();
4454
4455    if let Some(pg_timestamp_oracle_config) = pg_timestamp_oracle_config {
4456        let postgres_oracle_timestamps =
4457            PostgresTimestampOracle::<NowFn>::get_all_timelines(pg_timestamp_oracle_config.clone())
4458                .await?;
4459
4460        let debug_msg = || {
4461            postgres_oracle_timestamps
4462                .iter()
4463                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4464                .join(", ")
4465        };
4466        info!(
4467            "current timestamps from the postgres-backed timestamp oracle: {}",
4468            debug_msg()
4469        );
4470
4471        for (timeline, ts) in postgres_oracle_timestamps {
4472            let entry = initial_timestamps
4473                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4474
4475            entry
4476                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4477                .or_insert(ts);
4478        }
4479    } else {
4480        info!("no postgres url for postgres-backed timestamp oracle configured!");
4481    };
4482
4483    let debug_msg = || {
4484        initial_timestamps
4485            .iter()
4486            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4487            .join(", ")
4488    };
4489    info!("initial oracle timestamps: {}", debug_msg());
4490
4491    Ok(initial_timestamps)
4492}
4493
4494#[instrument]
4495pub async fn load_remote_system_parameters(
4496    storage: &mut Box<dyn OpenableDurableCatalogState>,
4497    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4498    system_parameter_sync_timeout: Duration,
4499) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4500    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4501        tracing::info!("parameter sync on boot: start sync");
4502
4503        // We intentionally block initial startup, potentially forever,
4504        // on initializing LaunchDarkly. This may seem scary, but the
4505        // alternative is even scarier. Over time, we expect that the
4506        // compiled-in default values for the system parameters will
4507        // drift substantially from the defaults configured in
4508        // LaunchDarkly, to the point that starting an environment
4509        // without loading the latest values from LaunchDarkly will
4510        // result in running an untested configuration.
4511        //
4512        // Note this only applies during initial startup. Restarting
4513        // after we've synced once only blocks for a maximum of
4514        // `FRONTEND_SYNC_TIMEOUT` on LaunchDarkly, as it seems
4515        // reasonable to assume that the last-synced configuration was
4516        // valid enough.
4517        //
4518        // This philosophy appears to provide a good balance between not
4519        // running untested configurations in production while also not
4520        // making LaunchDarkly a "tier 1" dependency for existing
4521        // environments.
4522        //
4523        // If this proves to be an issue, we could seek to address the
4524        // configuration drift in a different way--for example, by
4525        // writing a script that runs in CI nightly and checks for
4526        // deviation between the compiled Rust code and LaunchDarkly.
4527        //
4528        // If it is absolutely necessary to bring up a new environment
4529        // while LaunchDarkly is down, the following manual mitigation
4530        // can be performed:
4531        //
4532        //    1. Edit the environmentd startup parameters to omit the
4533        //       LaunchDarkly configuration.
4534        //    2. Boot environmentd.
4535        //    3. Use the catalog-debug tool to run `edit config "{\"key\":\"system_config_synced\"}" "{\"value\": 1}"`.
4536        //    4. Adjust any other parameters as necessary to avoid
4537        //       running a nonstandard configuration in production.
4538        //    5. Edit the environmentd startup parameters to restore the
4539        //       LaunchDarkly configuration, for when LaunchDarkly comes
4540        //       back online.
4541        //    6. Reboot environmentd.
4542        let mut params = SynchronizedParameters::new(SystemVars::default());
4543        let frontend_sync = async {
4544            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4545            frontend.pull(&mut params);
4546            let ops = params
4547                .modified()
4548                .into_iter()
4549                .map(|param| {
4550                    let name = param.name;
4551                    let value = param.value;
4552                    tracing::info!(name, value, initial = true, "sync parameter");
4553                    (name, value)
4554                })
4555                .collect();
4556            tracing::info!("parameter sync on boot: end sync");
4557            Ok(Some(ops))
4558        };
4559        if !storage.has_system_config_synced_once().await? {
4560            frontend_sync.await
4561        } else {
4562            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4563                Ok(ops) => Ok(ops),
4564                Err(TimeoutError::Inner(e)) => Err(e),
4565                Err(TimeoutError::DeadlineElapsed) => {
4566                    tracing::info!("parameter sync on boot: sync has timed out");
4567                    Ok(None)
4568                }
4569            }
4570        }
4571    } else {
4572        Ok(None)
4573    }
4574}
4575
4576#[derive(Debug)]
4577pub enum WatchSetResponse {
4578    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
4579    AlterSinkReady(AlterSinkReadyContext),
4580}
4581
4582#[derive(Debug)]
4583pub struct AlterSinkReadyContext {
4584    ctx: Option<ExecuteContext>,
4585    otel_ctx: OpenTelemetryContext,
4586    plan: AlterSinkPlan,
4587    plan_validity: PlanValidity,
4588    read_hold: ReadHolds<Timestamp>,
4589}
4590
4591impl AlterSinkReadyContext {
4592    fn ctx(&mut self) -> &mut ExecuteContext {
4593        self.ctx.as_mut().expect("only cleared on drop")
4594    }
4595
4596    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4597        self.ctx
4598            .take()
4599            .expect("only cleared on drop")
4600            .retire(result);
4601    }
4602}
4603
4604impl Drop for AlterSinkReadyContext {
4605    fn drop(&mut self) {
4606        if let Some(ctx) = self.ctx.take() {
4607            ctx.retire(Err(AdapterError::Canceled));
4608        }
4609    }
4610}
4611
4612/// A struct that pairs a lock with a `VecDeque` of to-be-done work to process once the
4613/// lock is freed.
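///
/// A hedged usage sketch (not compiled; `work` is a hypothetical queued item):
///
/// ```ignore
/// let mut queue = LockedVecDeque::new();
/// queue.push_back(work);
/// // Drain the queue only if nothing else currently holds the lock; the guard is held for as
/// // long as the drained work is being processed.
/// if let Ok(_guard) = queue.try_lock_owned() {
///     while let Some(item) = queue.pop_front() {
///         // ... process `item` ...
///     }
/// }
/// ```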
4614#[derive(Debug)]
4615struct LockedVecDeque<T> {
4616    items: VecDeque<T>,
4617    lock: Arc<tokio::sync::Mutex<()>>,
4618}
4619
4620impl<T> LockedVecDeque<T> {
4621    pub fn new() -> Self {
4622        Self {
4623            items: VecDeque::new(),
4624            lock: Arc::new(tokio::sync::Mutex::new(())),
4625        }
4626    }
4627
4628    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
4629        Arc::clone(&self.lock).try_lock_owned()
4630    }
4631
4632    pub fn is_empty(&self) -> bool {
4633        self.items.is_empty()
4634    }
4635
4636    pub fn push_back(&mut self, value: T) {
4637        self.items.push_back(value)
4638    }
4639
4640    pub fn pop_front(&mut self) -> Option<T> {
4641        self.items.pop_front()
4642    }
4643
4644    pub fn remove(&mut self, index: usize) -> Option<T> {
4645        self.items.remove(index)
4646    }
4647
4648    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
4649        self.items.iter()
4650    }
4651}
4652
4653#[derive(Debug)]
4654struct DeferredPlanStatement {
4655    ctx: ExecuteContext,
4656    ps: PlanStatement,
4657}
4658
4659#[derive(Debug)]
4660enum PlanStatement {
4661    Statement {
4662        stmt: Arc<Statement<Raw>>,
4663        params: Params,
4664    },
4665    Plan {
4666        plan: mz_sql::plan::Plan,
4667        resolved_ids: ResolvedIds,
4668    },
4669}
4670
4671#[derive(Debug, Error)]
4672pub enum NetworkPolicyError {
4673    #[error("Access denied for address {0}")]
4674    AddressDenied(IpAddr),
4675    #[error("Access denied missing IP address")]
4676    MissingIp,
4677}
4678
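/// Returns `Ok(())` if `ip` is contained in the address block of at least one rule in `rules`,
/// and [`NetworkPolicyError::AddressDenied`] otherwise.
///
/// A hedged usage sketch (not compiled; `rules` is a hypothetical `Vec<NetworkPolicyRule>`):
///
/// ```ignore
/// let ip: IpAddr = "203.0.113.7".parse().expect("valid IP");
/// if let Err(e) = validate_ip_with_policy_rules(&ip, &rules) {
///     // `e` is `NetworkPolicyError::AddressDenied(ip)` when no rule's block contains `ip`.
///     return Err(e.into());
/// }
/// ```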
4679pub(crate) fn validate_ip_with_policy_rules(
4680    ip: &IpAddr,
4681    rules: &Vec<NetworkPolicyRule>,
4682) -> Result<(), NetworkPolicyError> {
4683    // At the moment we're not handling action or direction,
4684    // as those can only be "allow" and "ingress", respectively.
4685    if rules.iter().any(|r| r.address.0.contains(ip)) {
4686        Ok(())
4687    } else {
4688        Err(NetworkPolicyError::AddressDenied(*ip))
4689    }
4690}