mz_adapter/
coord.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Translation of SQL commands into timestamped `Controller` commands.
//!
//! The various SQL commands instruct the system to take actions that are not
//! yet explicitly timestamped. On the other hand, the underlying data continually
//! change as time moves forward. On the third hand, we greatly benefit from the
//! information that some times are no longer of interest, so that we may
//! compact the representation of the continually changing collections.
//!
//! The [`Coordinator`] curates these interactions by observing the progress
//! collections make through time, choosing timestamps for its own commands,
//! and eventually communicating that certain times have irretrievably "passed".
//!
//! ## Frontiers another way
//!
//! If the above description of frontiers left you with questions, this
//! repackaged explanation might help.
//!
//! - `since` is the least recent time (i.e. oldest time) that you can read
//!   from sources and be guaranteed that the returned data is accurate as of
//!   that time.
//!
//!   Reads at times less than `since` may return values that were not actually
//!   seen at the specified time, but arrived later (i.e. the results are
//!   compacted).
//!
//!   For correctness' sake, the coordinator never chooses to read at a time
//!   less than an arrangement's `since`.
//!
//! - `upper` is the first time after the most recent time that you can read
//!   from sources and receive an immediate response. Alternately, it is the
//!   least time at which the data may still change (that is the reason we may
//!   not be able to respond immediately).
//!
//!   Reads at times >= `upper` may not immediately return because the answer
//!   isn't known yet. However, once the `upper` is > the specified read time,
//!   the read can return.
//!
//!   For the sake of returned values' freshness, the coordinator prefers
//!   performing reads at an arrangement's `upper`. However, because we more
//!   strongly prefer correctness, the coordinator will choose timestamps
//!   greater than an object's `upper` if it is also being accessed alongside
//!   objects whose `since` times are >= its `upper`.
//!
//! This illustration attempts to show, with time moving left to right, the
//! relationship between `since` and `upper`.
//!
//! - `#`: possibly inaccurate results
//! - `-`: immediate, correct response
//! - `?`: not yet known
//! - `s`: since
//! - `u`: upper
//! - `|`: eligible for coordinator to select
//!
//! ```nofmt
//! ####s----u?????
//!     |||||||||||
//! ```
//!
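//! To make these rules concrete, here is a small sketch (not taken from the
//! coordinator itself) of the check implied by the diagram, using `timely`'s
//! `Antichain` to represent the two frontiers:
//!
//! ```ignore
//! use timely::progress::Antichain;
//!
//! /// A read at `t` is eligible if `t` is beyond `since` (not yet compacted).
//! /// It can additionally be answered immediately if `t` has not yet reached
//! /// `upper` (i.e. the data at `t` is complete).
//! fn read_eligibility(since: &Antichain<u64>, upper: &Antichain<u64>, t: u64) -> (bool, bool) {
//!     let eligible = since.less_equal(&t);
//!     let immediate = eligible && !upper.less_equal(&t);
//!     (eligible, immediate)
//! }
//! ```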
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroI64;
use std::ops::Neg;
use std::str::FromStr;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::Context;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
use http::Uri;
use ipnet::IpNet;
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
use mz_build_info::BuildInfo;
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_controller::ControllerConfig;
use mz_controller::clusters::{ClusterConfig, ClusterEvent, ClusterStatus, ProcessId};
use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::{OfflineReason, ServiceProcessMetrics};
use mz_ore::cast::{CastFrom, CastInto, CastLossy};
use mz_ore::channel::trigger::Trigger;
use mz_ore::future::TimeoutError;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::{JoinHandle, spawn};
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_ore::url::SensitiveUrl;
use mz_ore::vec::VecExt;
use mz_ore::{
    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
use mz_secrets::cache::CachingSecretsReader;
use mz_secrets::{SecretsController, SecretsReader};
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::{CatalogCluster, EnvironmentId};
use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::{
    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, NetworkPolicyRule,
    OnTimeoutAction, Params, QueryWhen,
};
use mz_sql::session::user::User;
use mz_sql::session::vars::SystemVars;
use mz_sql_parser::ast::ExplainStage;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
use mz_storage_types::sources::Timeline;
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_timestamp_oracle::WriteTimestamp;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use serde::Serialize;
use thiserror::Error;
use timely::progress::{Antichain, Timestamp as _};
use tokio::runtime::Handle as TokioHandle;
use tokio::select;
use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
use crate::client::{Client, Handle};
use crate::command::{Command, ExecuteResponse};
use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
use crate::coord::appends::{
    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
};
use crate::coord::caught_up::CaughtUpCheckContext;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::introspection::IntrospectionSubscribe;
use crate::coord::peek::PendingPeek;
use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::validity::PlanValidity;
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
use crate::metrics::Metrics;
use crate::optimize::dataflows::{
    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{AdapterNotice, ReadHolds, flags};

pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;

pub mod appends;
mod catalog_serving;
mod caught_up;
pub mod cluster_scheduling;
mod command_handler;
pub mod consistency;
mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
pub mod read_policy;
mod sequencer;
mod sql;
mod validity;

#[derive(Debug)]
pub enum Message {
    Command(OpenTelemetryContext, Command),
    ControllerReady,
    PurifiedStatementReady(PurifiedStatementReady),
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    TryDeferred {
        /// The connection that created this op.
        conn_id: ConnectionId,
        /// The write lock that notified us our deferred op might be able to run.
        ///
        /// Note: While we never want to hold a partial set of locks, it can be important to hold
        /// onto the _one_ that notified us our op might be ready. If there are multiple operations
        /// waiting on a single collection, and we don't hold this lock through retrying the op,
        /// then everything waiting on this collection will get retried, causing traffic in the
        /// Coordinator's message queue.
        ///
        /// See [`DeferredOp::can_be_optimistically_retried`] for more detail.
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    /// Initiates a group commit.
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    AdvanceTimelines,
    ClusterEvent(ClusterEvent),
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    LinearizeReads,
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    /// Performs any cleanup and logging actions necessary for
    /// finalizing a statement execution.
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    DrainStatementLog,
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    /// Scheduling policy decisions about turning clusters On/Off.
    /// `Vec<(policy name, Vec of decisions by the policy)>`
    /// A cluster will be On if and only if there is at least one On decision for it.
    /// Scheduling decisions for clusters that have `SCHEDULE = MANUAL` are ignored.
    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}

impl Message {
    /// Returns a string to identify the kind of [`Message`], useful for logging.
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
            },
            Message::ControllerReady => "controller_ready",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}
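// A minimal, hypothetical illustration (not part of the original file) of how
// `Message::kind` is meant to be used: as a cheap `&'static str` label for
// logging and metrics, without formatting the potentially large message itself.
#[allow(dead_code)]
fn log_message_kind(message: &Message) {
    // `kind` is `const` and allocation-free, so it is safe to emit on every message.
    debug!(kind = message.kind(), "coordinator message received");
}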

#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub params: Params,
    pub plan_validity: PlanValidity,
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}

pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;

#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    pub connection_id: CatalogItemId,
    pub connection_gid: GlobalId,
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}

pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
pub type AlterConnectionValidationReady = ValidationReady<Connection>;

#[derive(Debug)]
pub enum PeekStage {
    /// Common stages across SELECT, EXPLAIN and COPY TO queries.
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    /// Final stage for a peek.
    Finish(PeekStageFinish),
    /// Final stage for an explain.
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    /// Preflight checks for a copy to operation.
    CopyToPreflight(PeekStageCopyTo),
    /// Final stage for a copy to which involves shipping the dataflow.
    CopyToDataflow(PeekStageCopyTo),
}

#[derive(Debug)]
pub struct CopyToContext {
    /// The `RelationDesc` of the data to be copied.
    pub desc: RelationDesc,
    /// The destination uri of the external service where the data will be copied.
    pub uri: Uri,
    /// Connection information required to connect to the external service to copy the data.
    pub connection: StorageConnection<ReferencedConnection>,
    /// The ID of the CONNECTION object to be used for copying the data.
    pub connection_id: CatalogItemId,
    /// Format params to format the data.
    pub format: S3SinkFormat,
    /// Approximate max file size of each uploaded file.
    pub max_file_size: u64,
    /// Number of batches the output of the COPY TO will be partitioned into
    /// to distribute the load across workers deterministically.
    /// This is an `Option` only because it is not set when `CopyToContext` is
    /// instantiated, but immediately afterward, in the `PeekStageValidate` stage.
    pub output_batch_count: Option<u64>,
}

#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    /// When present, an optimizer trace to be used for emitting a plan insights
    /// notice.
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
}

#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    imports: BTreeMap<GlobalId, MapFilterProject>,
}

#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}

#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}

#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    /// ID of this item in the Catalog.
    item_id: CatalogItemId,
    /// ID by which Compute will reference this View.
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    /// IDs of objects resolved during name resolution.
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}

#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    real_time_recency_ts: Option<Timestamp>,
}

#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}

#[derive(Debug)]
pub enum ExplainContext {
    /// The ordinary, non-explain variant of the statement.
    None,
    /// The `EXPLAIN <level> PLAN FOR <explainee>` version of the statement.
    Plan(ExplainPlanContext),
    /// Generate a notice containing the `EXPLAIN PLAN INSIGHTS` output
    /// alongside the query's normal output.
    PlanInsightsNotice(OptimizerTrace),
    /// `EXPLAIN FILTER PUSHDOWN`
    Pushdown,
}

impl ExplainContext {
    /// If available for this context, wrap the [`OptimizerTrace`] into a
    /// [`tracing::Dispatch`] and set it as default, returning the resulting
    /// guard in a `Some(guard)` option.
    fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
        let optimizer_trace = match self {
            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
            _ => None,
        };
        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
    }

    fn needs_cluster(&self) -> bool {
        match self {
            ExplainContext::None => true,
            ExplainContext::Plan(..) => false,
            ExplainContext::PlanInsightsNotice(..) => true,
            ExplainContext::Pushdown => false,
        }
    }

    fn needs_plan_insights(&self) -> bool {
        matches!(
            self,
            ExplainContext::Plan(ExplainPlanContext {
                stage: ExplainStage::PlanInsights,
                ..
            }) | ExplainContext::PlanInsightsNotice(_)
        )
    }
}

#[derive(Debug)]
pub struct ExplainPlanContext {
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    pub stage: ExplainStage,
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    pub optimizer_trace: OptimizerTrace,
}

#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    /// The ID of this Materialized View in the Catalog.
    item_id: CatalogItemId,
    /// The ID of the durable pTVC backing this Materialized View.
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
}

#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}

#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}

/// An enum describing which cluster to run a statement on.
///
/// One example usage would be that if a query depends only on system tables, we might
/// automatically run it on the catalog server cluster to benefit from indexes that exist there.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The catalog server cluster.
    CatalogServer,
    /// The current user's active cluster.
    Active,
    /// The cluster selected at the start of a transaction.
    Transaction(ClusterId),
}

/// Result types for each stage of a sequence.
pub(crate) enum StageResult<T> {
    /// A task was spawned that will return the next stage.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task was spawned that will return a response for the client.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage is immediately ready and will execute.
    Immediate(T),
    /// The final stage was executed and is ready to respond to the client.
    Response(ExecuteResponse),
}
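// A small, hypothetical helper (not part of the original file) that spells out
// what each `StageResult` variant means to the caller; the real dispatch lives
// in `Coordinator::sequence_staged`.
#[allow(dead_code)]
fn stage_result_kind<T>(result: &StageResult<T>) -> &'static str {
    match result {
        StageResult::Handle(_) => "next stage will be returned by a spawned task",
        StageResult::HandleRetire(_) => "client response will be returned by a spawned task",
        StageResult::Immediate(_) => "next stage is ready immediately",
        StageResult::Response(_) => "final response is ready for the client",
    }
}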

/// Common functionality for [Coordinator::sequence_staged].
pub(crate) trait Staged: Send {
    type Ctx: StagedContext;

    fn validity(&mut self) -> &mut PlanValidity;

    /// Returns the next stage or final result.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Prepares a message for the Coordinator.
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether it is safe to SQL cancel this stage.
    fn cancel_enabled(&self) -> bool;
}

pub trait StagedContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    fn session(&self) -> Option<&Session>;
}

impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}

impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}

/// Configures a coordinator.
pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    /// Whether or not to start controllers in read-only mode. This is only
    /// meant for use during development of read-only clusters and 0dt upgrades
    /// and should go away once we have proper orchestration during upgrades.
    pub read_only_controllers: bool,
    /// Whether to enable zero-downtime deployments.
    pub enable_0dt_deployment: bool,

    /// A trigger that signals that the current deployment has caught up with a
    /// previous deployment. Only used during 0dt deployment, while in read-only
    /// mode.
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
}

/// Soft-state metadata about a compute replica
#[derive(Clone, Default, Debug, Eq, PartialEq)]
pub struct ReplicaMetadata {
    /// The last known CPU and memory metrics
    pub metrics: Option<Vec<ServiceProcessMetrics>>,
}

/// Metadata about an active connection.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Pgwire specifies that every connection have a 32-bit secret associated
    /// with it, that is known to both the client and the server. Cancellation
    /// requests are required to authenticate with the secret of the connection
    /// that they are targeting.
    secret_key: u32,
    /// The time when the session's connection was initiated.
    connected_at: EpochMillis,
    user: User,
    application_name: String,
    uuid: Uuid,
    conn_id: ConnectionId,
    client_ip: Option<IpAddr>,

    /// Sinks that will need to be dropped when the current transaction, if
    /// any, is cleared.
    drop_sinks: BTreeSet<GlobalId>,

    /// Lock for the Coordinator's deferred statements that is dropped on transaction clear.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Cluster reconfigurations that will need to be
    /// cleaned up when the current transaction is cleared
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Channel on which to send notices to a session.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role that initiated the database context. Fixed for the duration of the connection.
    /// WARNING: This role reference is not updated when the role is dropped.
    /// Consumers should not assume that this role exists.
    authenticated_role: RoleId,
}

impl ConnMeta {
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    pub fn user(&self) -> &User {
        &self.user
    }

    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}

#[derive(Debug)]
/// A pending transaction waiting to be committed.
pub struct PendingTxn {
    /// Context used to send a response back to the client.
    ctx: ExecuteContext,
    /// Client response for transaction.
    response: Result<PendingTxnResponse, AdapterError>,
    /// The action to take at the end of the transaction.
    action: EndTransactionAction,
}

#[derive(Debug)]
/// The response we'll send for a [`PendingTxn`].
pub enum PendingTxnResponse {
    /// The transaction will be committed.
    Committed {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction will be rolled back.
    Rolledback {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        match self {
            PendingTxnResponse::Committed { params }
            | PendingTxnResponse::Rolledback { params } => params.extend(p),
        }
    }
}

impl From<PendingTxnResponse> for ExecuteResponse {
    fn from(value: PendingTxnResponse) -> Self {
        match value {
            PendingTxnResponse::Committed { params } => {
                ExecuteResponse::TransactionCommitted { params }
            }
            PendingTxnResponse::Rolledback { params } => {
                ExecuteResponse::TransactionRolledBack { params }
            }
        }
    }
}
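// A minimal sketch (not part of the original file) of how a `PendingTxnResponse`
// is finalized: session parameters that changed when the transaction ended are
// appended before converting into the `ExecuteResponse` sent to the client. The
// literal parameter name and value below are illustrative only.
#[allow(dead_code)]
fn finalize_pending_txn_response(mut response: PendingTxnResponse) -> ExecuteResponse {
    // Record a (hypothetical) parameter change observed at transaction end.
    response.extend_params([("transaction_isolation", "serializable".to_string())]);
    // Convert into the variant-appropriate client response.
    ExecuteResponse::from(response)
}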

#[derive(Debug)]
/// A pending read transaction waiting to be linearized, along with metadata about its state.
pub struct PendingReadTxn {
    /// The transaction type
    txn: PendingRead,
    /// The timestamp context of the transaction.
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    /// The time when we created this pending txn, which is when the transaction ends. Only used for metrics.
    created: Instant,
    /// Number of times we requeued the processing of this pending read txn.
    /// Requeueing is necessary if the time we executed the query is after the current oracle time;
    /// see [`Coordinator::message_linearize_reads`] for more details.
    num_requeues: u64,
    /// Telemetry context.
    otel_ctx: OpenTelemetryContext,
}

impl PendingReadTxn {
    /// Return the timestamp context of the pending read transaction.
    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
        &self.timestamp_context
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}

#[derive(Debug)]
/// A pending read transaction waiting to be linearized.
enum PendingRead {
    Read {
        /// The inner transaction.
        txn: PendingTxn,
    },
    ReadThenWrite {
        /// Context used to send a response back to the client.
        ctx: ExecuteContext,
        /// Channel used to alert the transaction that the read has been linearized and send back
        /// `ctx`.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}

impl PendingRead {
    /// Alert the client that the read has been linearized.
    ///
    /// If it is necessary to finalize an execute, return the state necessary to do so
    /// (execution context and result)
    #[instrument(level = "debug")]
    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
        match self {
            PendingRead::Read {
                txn:
                    PendingTxn {
                        mut ctx,
                        response,
                        action,
                    },
                ..
            } => {
                let changed = ctx.session_mut().vars_mut().end_transaction(action);
                // Append any parameters that changed to the response.
                let response = response.map(|mut r| {
                    r.extend_params(changed);
                    ExecuteResponse::from(r)
                });

                Some((ctx, response))
            }
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // Ignore errors if the caller has hung up.
                let _ = tx.send(Some(ctx));
                None
            }
        }
    }

    fn label(&self) -> &'static str {
        match self {
            PendingRead::Read { .. } => "read",
            PendingRead::ReadThenWrite { .. } => "read_then_write",
        }
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        match self {
            PendingRead::Read { txn, .. } => txn.ctx,
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // Inform the transaction that we've taken their context.
                // Ignore errors if the caller has hung up.
                let _ = tx.send(None);
                ctx
            }
        }
    }
}

/// State that the coordinator must process as part of retiring
/// command execution.  `ExecuteContextExtra::Default` is guaranteed
/// to produce a value that will cause the coordinator to do nothing, and
/// is intended for use by code that invokes the execution processing flow
/// (i.e., `sequence_plan`) without actually being a statement execution.
///
/// This struct must not be dropped if it contains non-trivial
/// state. The only valid way to get rid of it is to pass it to the
/// coordinator for retirement. To enforce this, we assert in the
/// `Drop` implementation.
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    statement_uuid: Option<StatementLoggingId>,
}

impl ExecuteContextExtra {
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }
    pub fn is_trivial(&self) -> bool {
        let Self { statement_uuid } = self;
        statement_uuid.is_none()
    }
    pub fn contents(&self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = self;
        *statement_uuid
    }
    /// Take responsibility for the contents.  This should only be
    /// called from code that knows what to do to finish up logging
    /// based on the inner value.
    #[must_use]
    fn retire(mut self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = &mut self;
        statement_uuid.take()
    }
}
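// A minimal sketch (not part of the original file) of the retirement discipline:
// whoever ends up owning an `ExecuteContextExtra` must hand it back for
// retirement rather than dropping it, otherwise the `Drop` implementation below
// soft-panics. The logging performed here is purely illustrative.
#[allow(dead_code)]
fn finish_statement_logging(extra: ExecuteContextExtra) {
    // `retire` consumes the value and takes the inner id, so the subsequent
    // drop of `extra` is trivial and does not trip the assertion in `Drop`.
    if let Some(logging_id) = extra.retire() {
        debug!(?logging_id, "statement retired");
    }
}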

impl Drop for ExecuteContextExtra {
    fn drop(&mut self) {
        let Self { statement_uuid } = &*self;
        if let Some(statement_uuid) = statement_uuid {
            // Note: the impact when this error hits
            // is that the statement will never be marked
            // as finished in the statement log.
            soft_panic_or_log!(
                "execute context for statement {statement_uuid:?} dropped without being properly retired."
            );
        }
    }
}

/// Bundle of state related to statement execution.
///
/// This struct collects a bundle of state that needs to be threaded
/// through various functions as part of statement execution.
/// Currently, it is only used to finalize execution, by calling one
/// of the methods `retire` or `retire_async`. Finalizing execution
/// involves sending the session back to the pgwire layer so that it
/// may be used to process further commands. In the future, it will
/// also involve performing some work on the main coordinator thread
/// (e.g., recording the time at which the statement finished
/// executing); the state necessary to perform this work is bundled in
/// the `ExecuteContextExtra` object (today, it is simply empty).
#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}

impl std::ops::Deref for ExecuteContext {
    type Target = ExecuteContextInner;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}

impl std::ops::DerefMut for ExecuteContext {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}

#[derive(Debug)]
pub struct ExecuteContextInner {
    tx: ClientTransmitter<ExecuteResponse>,
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    session: Session,
    extra: ExecuteContextExtra,
}

impl ExecuteContext {
    pub fn session(&self) -> &Session {
        &self.session
    }

    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.session
    }

    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
        &self.tx
    }

    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
        &mut self.tx
    }

    pub fn from_parts(
        tx: ClientTransmitter<ExecuteResponse>,
        internal_cmd_tx: mpsc::UnboundedSender<Message>,
        session: Session,
        extra: ExecuteContextExtra,
    ) -> Self {
        Self {
            inner: ExecuteContextInner {
                tx,
                session,
                extra,
                internal_cmd_tx,
            }
            .into(),
        }
    }

    /// By calling this function, the caller takes responsibility for
    /// dealing with the instance of `ExecuteContextExtra`. This is
    /// intended to support protocols (like `COPY FROM`) that involve
    /// multiple passes of sending the session back and forth between
    /// the coordinator and the pgwire layer. As part of any such
    /// protocol, we must ensure that the `ExecuteContextExtra`
    /// (possibly wrapped in a new `ExecuteContext`) is passed back to the coordinator for
    /// eventual retirement.
    pub fn into_parts(
        self,
    ) -> (
        ClientTransmitter<ExecuteResponse>,
        mpsc::UnboundedSender<Message>,
        Session,
        ExecuteContextExtra,
    ) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        (tx, internal_cmd_tx, session, extra)
    }

    /// Retire the execution, by sending a message to the coordinator.
    #[instrument(level = "debug")]
    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        let reason = if extra.is_trivial() {
            None
        } else {
            Some((&result).into())
        };
        tx.send(result, session);
        if let Some(reason) = reason {
            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
                otel_ctx: OpenTelemetryContext::obtain(),
                data: extra,
                reason,
            }) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        }
    }

    pub fn extra(&self) -> &ExecuteContextExtra {
        &self.extra
    }

    pub fn extra_mut(&mut self) -> &mut ExecuteContextExtra {
        &mut self.extra
    }
}
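// A minimal sketch (not part of the original file) of the hand-off protocol
// described on `into_parts`: a multi-pass protocol such as COPY FROM may
// disassemble the context, lend the pieces out, and later reassemble an
// equivalent context so that the `ExecuteContextExtra` is still retired exactly
// once via `ExecuteContext::retire`.
#[allow(dead_code)]
fn reassemble_context(ctx: ExecuteContext) -> ExecuteContext {
    // Take ownership of the constituent parts...
    let (tx, internal_cmd_tx, session, extra) = ctx.into_parts();
    // ...and rebuild a context that must eventually be retired.
    ExecuteContext::from_parts(tx, internal_cmd_tx, session, extra)
}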
1444
1445#[derive(Debug)]
1446struct ClusterReplicaStatuses(
1447    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
1448);
1449
1450impl ClusterReplicaStatuses {
1451    pub(crate) fn new() -> ClusterReplicaStatuses {
1452        ClusterReplicaStatuses(BTreeMap::new())
1453    }
1454
1455    /// Initializes the statuses of the specified cluster.
1456    ///
1457    /// Panics if the cluster statuses are already initialized.
1458    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1459        let prev = self.0.insert(cluster_id, BTreeMap::new());
1460        assert_eq!(
1461            prev, None,
1462            "cluster {cluster_id} statuses already initialized"
1463        );
1464    }
1465
1466    /// Initializes the statuses of the specified cluster replica.
1467    ///
1468    /// Panics if the cluster replica statuses are already initialized.
1469    pub(crate) fn initialize_cluster_replica_statuses(
1470        &mut self,
1471        cluster_id: ClusterId,
1472        replica_id: ReplicaId,
1473        num_processes: usize,
1474        time: DateTime<Utc>,
1475    ) {
1476        tracing::info!(
1477            ?cluster_id,
1478            ?replica_id,
1479            ?time,
1480            "initializing cluster replica status"
1481        );
1482        let replica_statuses = self.0.entry(cluster_id).or_default();
1483        let process_statuses = (0..num_processes)
1484            .map(|process_id| {
1485                let status = ClusterReplicaProcessStatus {
1486                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1487                    time: time.clone(),
1488                };
1489                (u64::cast_from(process_id), status)
1490            })
1491            .collect();
1492        let prev = replica_statuses.insert(replica_id, process_statuses);
1493        assert_none!(
1494            prev,
1495            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1496        );
1497    }
1498
1499    /// Removes the statuses of the specified cluster.
1500    ///
1501    /// Panics if the cluster does not exist.
1502    pub(crate) fn remove_cluster_statuses(
1503        &mut self,
1504        cluster_id: &ClusterId,
1505    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1506        let prev = self.0.remove(cluster_id);
1507        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1508    }
1509
1510    /// Removes the statuses of the specified cluster replica.
1511    ///
1512    /// Panics if the cluster or replica does not exist.
1513    pub(crate) fn remove_cluster_replica_statuses(
1514        &mut self,
1515        cluster_id: &ClusterId,
1516        replica_id: &ReplicaId,
1517    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1518        let replica_statuses = self
1519            .0
1520            .get_mut(cluster_id)
1521            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1522        let prev = replica_statuses.remove(replica_id);
1523        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1524    }
1525
1526    /// Inserts or updates the status of the specified cluster replica process.
1527    ///
1528    /// Panics if the cluster or replica does not exist.
1529    pub(crate) fn ensure_cluster_status(
1530        &mut self,
1531        cluster_id: ClusterId,
1532        replica_id: ReplicaId,
1533        process_id: ProcessId,
1534        status: ClusterReplicaProcessStatus,
1535    ) {
1536        let replica_statuses = self
1537            .0
1538            .get_mut(&cluster_id)
1539            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1540            .get_mut(&replica_id)
1541            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1542        replica_statuses.insert(process_id, status);
1543    }
1544
1545    /// Computes the status of the cluster replica as a whole.
1546    ///
1547    /// Panics if `cluster_id` or `replica_id` don't exist.
1548    pub fn get_cluster_replica_status(
1549        &self,
1550        cluster_id: ClusterId,
1551        replica_id: ReplicaId,
1552    ) -> ClusterStatus {
1553        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1554        Self::cluster_replica_status(process_status)
1555    }
1556
1557    /// Computes the status of the cluster replica as a whole.
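    ///
    /// The rule is: the replica is `Online` only if every process is `Online`;
    /// otherwise it is `Offline` with the first known offline reason. A
    /// self-contained sketch of that aggregation (simplified stand-in types,
    /// not this module's):
    ///
    /// ```ignore
    /// #[derive(Clone, Copy, PartialEq, Debug)]
    /// enum Status {
    ///     Online,
    ///     Offline(Option<&'static str>),
    /// }
    ///
    /// fn aggregate(processes: &[Status]) -> Status {
    ///     processes.iter().fold(Status::Online, |acc, p| match (acc, *p) {
    ///         (Status::Online, Status::Online) => Status::Online,
    ///         (x, y) => {
    ///             let reason = |s| match s {
    ///                 Status::Offline(r) => r,
    ///                 Status::Online => None,
    ///             };
    ///             Status::Offline(reason(x).or(reason(y)))
    ///         }
    ///     })
    /// }
    ///
    /// assert_eq!(
    ///     aggregate(&[Status::Online, Status::Offline(Some("oom-killed"))]),
    ///     Status::Offline(Some("oom-killed")),
    /// );
    /// ```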
1558    pub fn cluster_replica_status(
1559        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1560    ) -> ClusterStatus {
1561        process_status
1562            .values()
1563            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1564                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1565                (x, y) => {
1566                    let reason_x = match x {
1567                        ClusterStatus::Offline(reason) => reason,
1568                        ClusterStatus::Online => None,
1569                    };
1570                    let reason_y = match y {
1571                        ClusterStatus::Offline(reason) => reason,
1572                        ClusterStatus::Online => None,
1573                    };
1574                    // Arbitrarily pick the first known not-ready reason.
1575                    ClusterStatus::Offline(reason_x.or(reason_y))
1576                }
1577            })
1578    }
1579
1580    /// Gets the statuses of the given cluster replica.
1581    ///
    /// Panics if the cluster or replica does not exist.
1583    pub(crate) fn get_cluster_replica_statuses(
1584        &self,
1585        cluster_id: ClusterId,
1586        replica_id: ReplicaId,
1587    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1588        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1589            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1590    }
1591
1592    /// Gets the statuses of the given cluster replica.
1593    pub(crate) fn try_get_cluster_replica_statuses(
1594        &self,
1595        cluster_id: ClusterId,
1596        replica_id: ReplicaId,
1597    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1598        self.try_get_cluster_statuses(cluster_id)
1599            .and_then(|statuses| statuses.get(&replica_id))
1600    }
1601
1602    /// Gets the statuses of the given cluster.
1603    ///
    /// Panics if the cluster does not exist.
1605    pub(crate) fn get_cluster_statuses(
1606        &self,
1607        cluster_id: ClusterId,
1608    ) -> &BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1609        self.try_get_cluster_statuses(cluster_id)
1610            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1611    }
1612
1613    /// Gets the statuses of the given cluster.
1614    pub(crate) fn try_get_cluster_statuses(
1615        &self,
1616        cluster_id: ClusterId,
1617    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1618        self.0.get(&cluster_id)
1619    }
1620}
1621
1622/// Glues the external world to the Timely workers.
1623#[derive(Derivative)]
1624#[derivative(Debug)]
1625pub struct Coordinator {
1626    /// The controller for the storage and compute layers.
1627    #[derivative(Debug = "ignore")]
1628    controller: mz_controller::Controller,
1629    /// The catalog in an Arc suitable for readonly references. The Arc allows
1630    /// us to hand out cheap copies of the catalog to functions that can use it
1631    /// off of the main coordinator thread. If the coordinator needs to mutate
1632    /// the catalog, call [`Self::catalog_mut`], which will clone this struct member,
1633    /// allowing it to be mutated here while the other off-thread references can
1634    /// read their catalog as long as needed. In the future we would like this
1635    /// to be a pTVC, but for now this is sufficient.
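    ///
    /// A sketch of the copy-on-write pattern described above (illustrative
    /// only; whether the real `catalog_mut` uses `Arc::make_mut` or a manual
    /// clone is an implementation detail, and `apply_updates` is a hypothetical
    /// mutation):
    ///
    /// ```ignore
    /// // Readers hold cheap clones of the `Arc` and keep reading the old catalog.
    /// let read_copy = Arc::clone(&self.catalog);
    /// // A writer clones the inner value (if shared) and mutates the fresh copy;
    /// // existing readers are unaffected.
    /// let catalog: &mut Catalog = Arc::make_mut(&mut self.catalog);
    /// catalog.apply_updates(updates);
    /// ```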
1636    catalog: Arc<Catalog>,
1637
1638    /// Channel to manage internal commands from the coordinator to itself.
1639    internal_cmd_tx: mpsc::UnboundedSender<Message>,
1640    /// Notification that triggers a group commit.
1641    group_commit_tx: appends::GroupCommitNotifier,
1642
1643    /// Channel for strict serializable reads ready to commit.
1644    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,
1645
1646    /// Mechanism for totally ordering write and read timestamps, so that all reads
1647    /// reflect exactly the set of writes that precede them, and no writes that follow.
1648    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
1649
1650    /// A generator for transient [`GlobalId`]s, shareable with other threads.
1651    transient_id_gen: Arc<TransientIdGen>,
1652    /// A map from connection ID to metadata about that connection for all
1653    /// active connections.
1654    active_conns: BTreeMap<ConnectionId, ConnMeta>,
1655
1656    /// For each transaction, the read holds taken to support any performed reads.
1657    ///
1658    /// Upon completing a transaction, these read holds should be dropped.
1659    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,
1660
1661    /// Access to the peek fields should be restricted to methods in the [`peek`] API.
1662    /// A map from pending peek ids to the queue into which responses are sent, and
1663    /// the connection id of the client that initiated the peek.
1664    pending_peeks: BTreeMap<Uuid, PendingPeek>,
1665    /// A map from client connection ids to a set of all pending peeks for that client.
1666    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
1667
    /// A map from client connection ids to pending linearize read transactions.
1669    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
1670
    /// A map from the compute sink ID to its state description.
1672    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
1673    /// A map from active webhooks to their invalidation handle.
1674    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
1675    /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
1676    /// to stage Batches in Persist that we will then link into the shard.
1677    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,
1678
1679    /// A map from connection ids to a watch channel that is set to `true` if the connection
1680    /// received a cancel request.
1681    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
1682    /// Active introspection subscribes.
1683    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,
1684
1685    /// Locks that grant access to a specific object, populated lazily as objects are written to.
1686    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
1687    /// Plans that are currently deferred and waiting on a write lock.
1688    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,
1689
1690    /// Pending writes waiting for a group commit.
1691    pending_writes: Vec<PendingWriteTxn>,
1692
1693    /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
    /// table's timestamps, but there are cases where timestamps are not bumped yet
1695    /// we expect the closed timestamps to advance (`AS OF X`, SUBSCRIBing views over
1696    /// RT sources and tables). To address these, spawn a task that forces table
1697    /// timestamps to close on a regular interval. This roughly tracks the behavior
1698    /// of realtime sources that close off timestamps on an interval.
1699    ///
1700    /// For non-realtime timelines, nothing pushes the timestamps forward, so we must do
1701    /// it manually.
1702    advance_timelines_interval: Interval,
1703
1704    /// Serialized DDL. DDL must be serialized because:
1705    /// - Many of them do off-thread work and need to verify the catalog is in a valid state, but
1706    ///   [`PlanValidity`] does not currently support tracking all changes. Doing that correctly
1707    ///   seems to be more difficult than it's worth, so we would instead re-plan and re-sequence
1708    ///   the statements.
1709    /// - Re-planning a statement is hard because Coordinator and Session state is mutated at
1710    ///   various points, and we would need to correctly reset those changes before re-planning and
1711    ///   re-sequencing.
1712    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
1713
1714    /// Handle to secret manager that can create and delete secrets from
1715    /// an arbitrary secret storage engine.
1716    secrets_controller: Arc<dyn SecretsController>,
    /// A secrets reader that maintains an in-memory cache, where values have a set TTL.
1718    caching_secrets_reader: CachingSecretsReader,
1719
    /// Handle to a manager that can create and delete Kubernetes resources
    /// (i.e., `VpcEndpoint` objects).
1722    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1723
1724    /// Metadata about replicas that doesn't need to be persisted.
1725    /// Intended for inclusion in system tables.
1726    ///
1727    /// `None` is used as a tombstone value for replicas that have been
1728    /// dropped and for which no further updates should be recorded.
1729    transient_replica_metadata: BTreeMap<ReplicaId, Option<ReplicaMetadata>>,
1730
1731    /// Persist client for fetching storage metadata such as size metrics.
1732    storage_usage_client: StorageUsageClient,
1733    /// The interval at which to collect storage usage information.
1734    storage_usage_collection_interval: Duration,
1735
1736    /// Segment analytics client.
1737    #[derivative(Debug = "ignore")]
1738    segment_client: Option<mz_segment::Client>,
1739
1740    /// Coordinator metrics.
1741    metrics: Metrics,
1742    /// Optimizer metrics.
1743    optimizer_metrics: OptimizerMetrics,
1744
1745    /// Tracing handle.
1746    tracing_handle: TracingHandle,
1747
1748    /// Data used by the statement logging feature.
1749    statement_logging: StatementLogging,
1750
1751    /// Limit for how many concurrent webhook requests we allow.
1752    webhook_concurrency_limit: WebhookConcurrencyLimiter,
1753
    /// Optional config for the Postgres-backed timestamp oracle. This is
    /// _required_ when the `timestamp_oracle` system variable is set to
    /// `postgres`.
1757    pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,
1758
1759    /// Periodically asks cluster scheduling policies to make their decisions.
1760    check_cluster_scheduling_policies_interval: Interval,
1761
1762    /// This keeps the last On/Off decision for each cluster and each scheduling policy.
1763    /// (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are
1764    /// periodically cleaned up from this Map.)
1765    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,
1766
1767    /// When doing 0dt upgrades/in read-only mode, periodically ask all known
1768    /// clusters/collections whether they are caught up.
1769    caught_up_check_interval: Interval,
1770
1771    /// Context needed to check whether all clusters/collections have caught up.
1772    /// Only used during 0dt deployment, while in read-only mode.
1773    caught_up_check: Option<CaughtUpCheckContext>,
1774
1775    /// Tracks the state associated with the currently installed watchsets.
1776    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,
1777
1778    /// Tracks the currently installed watchsets for each connection.
1779    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,
1780
1781    /// Tracks the statuses of all cluster replicas.
1782    cluster_replica_statuses: ClusterReplicaStatuses,
1783
1784    /// Whether or not to start controllers in read-only mode. This is only
1785    /// meant for use during development of read-only clusters and 0dt upgrades
1786    /// and should go away once we have proper orchestration during upgrades.
1787    read_only_controllers: bool,
1788
1789    /// Updates to builtin tables that are being buffered while we are in
1790    /// read-only mode. We apply these all at once when coming out of read-only
1791    /// mode.
1792    ///
1793    /// This is a `Some` while in read-only mode and will be replaced by a
1794    /// `None` when we transition out of read-only mode and write out any
1795    /// buffered updates.
1796    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,
1797
1798    license_key: ValidatedLicenseKey,
1799}
1800
1801impl Coordinator {
1802    /// Initializes coordinator state based on the contained catalog. Must be
1803    /// called after creating the coordinator and before calling the
1804    /// `Coordinator::serve` method.
1805    #[instrument(name = "coord::bootstrap")]
1806    pub(crate) async fn bootstrap(
1807        &mut self,
1808        boot_ts: Timestamp,
1809        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1810        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1811        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1812        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1813        audit_logs_iterator: AuditLogIterator,
1814    ) -> Result<(), AdapterError> {
1815        let bootstrap_start = Instant::now();
1816        info!("startup: coordinator init: bootstrap beginning");
1817        info!("startup: coordinator init: bootstrap: preamble beginning");
1818
        // Initialize cluster replica statuses.
        // Collect into a `Vec` first to avoid partial-borrow issues on `self`.
1821        let cluster_statuses: Vec<(_, Vec<_>)> = self
1822            .catalog()
1823            .clusters()
1824            .map(|cluster| {
1825                (
1826                    cluster.id(),
1827                    cluster
1828                        .replicas()
1829                        .map(|replica| {
1830                            (replica.replica_id, replica.config.location.num_processes())
1831                        })
1832                        .collect(),
1833                )
1834            })
1835            .collect();
1836        let now = self.now_datetime();
1837        for (cluster_id, replica_statuses) in cluster_statuses {
1838            self.cluster_replica_statuses
1839                .initialize_cluster_statuses(cluster_id);
1840            for (replica_id, num_processes) in replica_statuses {
1841                self.cluster_replica_statuses
1842                    .initialize_cluster_replica_statuses(
1843                        cluster_id,
1844                        replica_id,
1845                        num_processes,
1846                        now,
1847                    );
1848            }
1849        }
1850        for replica_statuses in self.cluster_replica_statuses.0.values() {
1851            for (replica_id, processes_statuses) in replica_statuses {
1852                for (process_id, status) in processes_statuses {
1853                    let builtin_table_update =
1854                        self.catalog().state().pack_cluster_replica_status_update(
1855                            *replica_id,
1856                            *process_id,
1857                            status,
1858                            Diff::ONE,
1859                        );
1860                    let builtin_table_update = self
1861                        .catalog()
1862                        .state()
1863                        .resolve_builtin_table_update(builtin_table_update);
1864                    builtin_table_updates.push(builtin_table_update);
1865                }
1866            }
1867        }
1868
1869        let system_config = self.catalog().system_config();
1870
1871        // Inform metrics about the initial system configuration.
1872        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1873
1874        // Inform the controllers about their initial configuration.
1875        let compute_config = flags::compute_config(system_config);
1876        let storage_config = flags::storage_config(system_config);
1877        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1878        let exert_prop = system_config.arrangement_exert_proportionality();
1879        self.controller.compute.update_configuration(compute_config);
1880        self.controller.storage.update_parameters(storage_config);
1881        self.controller
1882            .update_orchestrator_scheduling_config(scheduling_config);
1883        self.controller
1884            .set_arrangement_exert_proportionality(exert_prop);
1885
1886        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1887            Default::default();
1888
1889        let enable_worker_core_affinity =
1890            self.catalog().system_config().enable_worker_core_affinity();
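        // Re-create in the controller every cluster and replica recorded in the
        // catalog, so that the dataflows installed below have somewhere to run.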
1891        for instance in self.catalog.clusters() {
1892            self.controller.create_cluster(
1893                instance.id,
1894                ClusterConfig {
1895                    arranged_logs: instance.log_indexes.clone(),
1896                    workload_class: instance.config.workload_class.clone(),
1897                },
1898            )?;
1899            for replica in instance.replicas() {
1900                let role = instance.role();
1901                self.controller.create_replica(
1902                    instance.id,
1903                    replica.replica_id,
1904                    role,
1905                    replica.config.clone(),
1906                    enable_worker_core_affinity,
1907                )?;
1908            }
1909        }
1910
1911        info!(
1912            "startup: coordinator init: bootstrap: preamble complete in {:?}",
1913            bootstrap_start.elapsed()
1914        );
1915
1916        let init_storage_collections_start = Instant::now();
1917        info!("startup: coordinator init: bootstrap: storage collections init beginning");
1918        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
1919            .await;
1920        info!(
1921            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
1922            init_storage_collections_start.elapsed()
1923        );
1924
1925        // The storage controller knows about the introspection collections now, so we can start
1926        // sinking introspection updates in the compute controller. It makes sense to do that as
1927        // soon as possible, to avoid updates piling up in the compute controller's internal
1928        // buffers.
1929        self.controller.start_compute_introspection_sink();
1930
1931        let optimize_dataflows_start = Instant::now();
1932        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
1933        let entries: Vec<_> = self.catalog().entries().cloned().collect();
1934        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
1935        info!(
1936            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
1937            optimize_dataflows_start.elapsed()
1938        );
1939
1940        // We don't need to wait for the cache to update.
1941        let _fut = self.catalog().update_expression_cache(
1942            uncached_local_exprs.into_iter().collect(),
1943            uncached_global_exps.into_iter().collect(),
1944        );
1945
1946        // Select dataflow as-ofs. This step relies on the storage collections created by
1947        // `bootstrap_storage_collections` and the dataflow plans created by
1948        // `bootstrap_dataflow_plans`.
1949        let bootstrap_as_ofs_start = Instant::now();
1950        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
1951        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
1952        info!(
1953            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
1954            bootstrap_as_ofs_start.elapsed()
1955        );
1956
1957        let postamble_start = Instant::now();
1958        info!("startup: coordinator init: bootstrap: postamble beginning");
1959
1960        let logs: BTreeSet<_> = BUILTINS::logs()
1961            .map(|log| self.catalog().resolve_builtin_log(log))
1962            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
1963            .collect();
1964
1965        let mut privatelink_connections = BTreeMap::new();
1966
1967        for entry in &entries {
1968            debug!(
1969                "coordinator init: installing {} {}",
1970                entry.item().typ(),
1971                entry.id()
1972            );
1973            let mut policy = entry.item().initial_logical_compaction_window();
1974            match entry.item() {
1975                // Currently catalog item rebuild assumes that sinks and
1976                // indexes are always built individually and does not store information
1977                // about how it was built. If we start building multiple sinks and/or indexes
1978                // using a single dataflow, we have to make sure the rebuild process re-runs
1979                // the same multiple-build dataflow.
1980                CatalogItem::Source(source) => {
1981                    // Propagate source compaction windows to subsources if needed.
1982                    if source.custom_logical_compaction_window.is_none() {
1983                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
1984                            source.data_source
1985                        {
1986                            policy = Some(
1987                                self.catalog()
1988                                    .get_entry(&ingestion_id)
1989                                    .source()
1990                                    .expect("must be source")
1991                                    .custom_logical_compaction_window
1992                                    .unwrap_or_default(),
1993                            );
1994                        }
1995                    }
1996                    policies_to_set
1997                        .entry(policy.expect("sources have a compaction window"))
1998                        .or_insert_with(Default::default)
1999                        .storage_ids
2000                        .insert(source.global_id());
2001                }
2002                CatalogItem::Table(table) => {
2003                    policies_to_set
2004                        .entry(policy.expect("tables have a compaction window"))
2005                        .or_insert_with(Default::default)
2006                        .storage_ids
2007                        .extend(table.global_ids());
2008                }
2009                CatalogItem::Index(idx) => {
2010                    let policy_entry = policies_to_set
2011                        .entry(policy.expect("indexes have a compaction window"))
2012                        .or_insert_with(Default::default);
2013
2014                    if logs.contains(&idx.on) {
2015                        policy_entry
2016                            .compute_ids
2017                            .entry(idx.cluster_id)
2018                            .or_insert_with(BTreeSet::new)
2019                            .insert(idx.global_id());
2020                    } else {
2021                        let df_desc = self
2022                            .catalog()
2023                            .try_get_physical_plan(&idx.global_id())
2024                            .expect("added in `bootstrap_dataflow_plans`")
2025                            .clone();
2026
2027                        let df_meta = self
2028                            .catalog()
2029                            .try_get_dataflow_metainfo(&idx.global_id())
2030                            .expect("added in `bootstrap_dataflow_plans`");
2031
2032                        if self.catalog().state().system_config().enable_mz_notices() {
2033                            // Collect optimization hint updates.
2034                            self.catalog().state().pack_optimizer_notices(
2035                                &mut builtin_table_updates,
2036                                df_meta.optimizer_notices.iter(),
2037                                Diff::ONE,
2038                            );
2039                        }
2040
2041                        // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
2042                        // but we cannot call that as it will also downgrade the read hold on the index.
2043                        policy_entry
2044                            .compute_ids
2045                            .entry(idx.cluster_id)
2046                            .or_insert_with(Default::default)
2047                            .extend(df_desc.export_ids());
2048
2049                        self.controller
2050                            .compute
2051                            .create_dataflow(idx.cluster_id, df_desc, None)
2052                            .unwrap_or_terminate("cannot fail to create dataflows");
2053                    }
2054                }
2055                CatalogItem::View(_) => (),
2056                CatalogItem::MaterializedView(mview) => {
2057                    policies_to_set
2058                        .entry(policy.expect("materialized views have a compaction window"))
2059                        .or_insert_with(Default::default)
2060                        .storage_ids
2061                        .insert(mview.global_id());
2062
2063                    let mut df_desc = self
2064                        .catalog()
2065                        .try_get_physical_plan(&mview.global_id())
2066                        .expect("added in `bootstrap_dataflow_plans`")
2067                        .clone();
2068
2069                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
2070                        df_desc.set_initial_as_of(initial_as_of);
2071                    }
2072
                    // If the refresh schedule has a last refresh, cap the dataflow's
                    // `until` just after that last refresh.
2074                    let until = mview
2075                        .refresh_schedule
2076                        .as_ref()
2077                        .and_then(|s| s.last_refresh())
2078                        .and_then(|r| r.try_step_forward());
2079                    if let Some(until) = until {
2080                        df_desc.until.meet_assign(&Antichain::from_elem(until));
2081                    }
2082
2083                    let df_meta = self
2084                        .catalog()
2085                        .try_get_dataflow_metainfo(&mview.global_id())
2086                        .expect("added in `bootstrap_dataflow_plans`");
2087
2088                    if self.catalog().state().system_config().enable_mz_notices() {
2089                        // Collect optimization hint updates.
2090                        self.catalog().state().pack_optimizer_notices(
2091                            &mut builtin_table_updates,
2092                            df_meta.optimizer_notices.iter(),
2093                            Diff::ONE,
2094                        );
2095                    }
2096
2097                    self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2098                }
2099                CatalogItem::Sink(sink) => {
2100                    policies_to_set
2101                        .entry(CompactionWindow::Default)
2102                        .or_insert_with(Default::default)
2103                        .storage_ids
2104                        .insert(sink.global_id());
2105                }
2106                CatalogItem::Connection(catalog_connection) => {
2107                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2108                        privatelink_connections.insert(
2109                            entry.id(),
2110                            VpcEndpointConfig {
2111                                aws_service_name: conn.service_name.clone(),
2112                                availability_zone_ids: conn.availability_zones.clone(),
2113                            },
2114                        );
2115                    }
2116                }
2117                CatalogItem::ContinualTask(ct) => {
2118                    policies_to_set
2119                        .entry(policy.expect("continual tasks have a compaction window"))
2120                        .or_insert_with(Default::default)
2121                        .storage_ids
2122                        .insert(ct.global_id());
2123
2124                    let mut df_desc = self
2125                        .catalog()
2126                        .try_get_physical_plan(&ct.global_id())
2127                        .expect("added in `bootstrap_dataflow_plans`")
2128                        .clone();
2129
2130                    if let Some(initial_as_of) = ct.initial_as_of.clone() {
2131                        df_desc.set_initial_as_of(initial_as_of);
2132                    }
2133
2134                    let df_meta = self
2135                        .catalog()
2136                        .try_get_dataflow_metainfo(&ct.global_id())
2137                        .expect("added in `bootstrap_dataflow_plans`");
2138
2139                    if self.catalog().state().system_config().enable_mz_notices() {
2140                        // Collect optimization hint updates.
2141                        self.catalog().state().pack_optimizer_notices(
2142                            &mut builtin_table_updates,
2143                            df_meta.optimizer_notices.iter(),
2144                            Diff::ONE,
2145                        );
2146                    }
2147
2148                    self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2149                }
2150                // Nothing to do for these cases
2151                CatalogItem::Log(_)
2152                | CatalogItem::Type(_)
2153                | CatalogItem::Func(_)
2154                | CatalogItem::Secret(_) => {}
2155            }
2156        }
2157
2158        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2159            // Clean up any extraneous VpcEndpoints that shouldn't exist.
2160            let existing_vpc_endpoints = cloud_resource_controller.list_vpc_endpoints().await?;
2161            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2162            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2163            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2164            for id in vpc_endpoints_to_remove {
2165                cloud_resource_controller.delete_vpc_endpoint(*id).await?;
2166            }
2167
2168            // Ensure desired VpcEndpoints are up to date.
2169            for (id, spec) in privatelink_connections {
2170                cloud_resource_controller
2171                    .ensure_vpc_endpoint(id, spec)
2172                    .await?;
2173            }
2174        }
2175
        // Having installed all entries and created all constraints, we can now drop read
        // holds and relax read policies.
2178        drop(dataflow_read_holds);
2179        // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
2180        for (cw, policies) in policies_to_set {
2181            self.initialize_read_policies(&policies, cw).await;
2182        }
2183
2184        // Expose mapping from T-shirt sizes to actual sizes
2185        builtin_table_updates.extend(
2186            self.catalog().state().resolve_builtin_table_updates(
2187                self.catalog().state().pack_all_replica_size_updates(),
2188            ),
2189        );
2190
2191        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2192        // When 0dt is enabled, we create new shards for any migrated builtin storage collections.
2193        // In read-only mode, the migrated builtin tables (which are a subset of migrated builtin
2194        // storage collections) need to be back-filled so that any dependent dataflow can be
2195        // hydrated. Additionally, these shards are not registered with the txn-shard, and cannot
        // be registered while in read-only mode, so they are written to directly.
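        // A write frontier still at the minimum timestamp means the new shard has
        // never been written to, i.e. it still needs to be back-filled here.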
2197        let migrated_updates_fut = if self.controller.read_only() {
2198            let min_timestamp = Timestamp::minimum();
2199            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2200                .drain_filter_swapping(|update| {
2201                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2202                    migrated_storage_collections_0dt.contains(&update.id)
2203                        && self
2204                            .controller
2205                            .storage_collections
2206                            .collection_frontiers(gid)
2207                            .expect("all tables are registered")
2208                            .write_frontier
2209                            .elements()
2210                            == &[min_timestamp]
2211                })
2212                .collect();
2213            if migrated_builtin_table_updates.is_empty() {
2214                futures::future::ready(()).boxed()
2215            } else {
2216                // Group all updates per-table.
2217                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2218                for update in migrated_builtin_table_updates {
2219                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2220                    grouped_appends.entry(gid).or_default().push(update.data);
2221                }
2222                info!(
2223                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2224                    grouped_appends.keys().collect::<Vec<_>>()
2225                );
2226
                // Consolidate Row data; staged batches must already be consolidated.
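                // (`consolidate` sorts by row and sums up diffs, dropping rows whose
                // net diff is zero, so duplicate and cancelling updates collapse.)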
2228                let mut all_appends = Vec::with_capacity(grouped_appends.len());
2229                for (item_id, table_data) in grouped_appends.into_iter() {
2230                    let mut all_rows = Vec::new();
2231                    let mut all_data = Vec::new();
2232                    for data in table_data {
2233                        match data {
2234                            TableData::Rows(rows) => all_rows.extend(rows),
2235                            TableData::Batches(_) => all_data.push(data),
2236                        }
2237                    }
2238                    differential_dataflow::consolidation::consolidate(&mut all_rows);
2239                    all_data.push(TableData::Rows(all_rows));
2240
2241                    // TODO(parkmycar): Use SmallVec throughout.
2242                    all_appends.push((item_id, all_data));
2243                }
2244
2245                let fut = self
2246                    .controller
2247                    .storage
2248                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2249                    .expect("cannot fail to append");
2250                async {
2251                    fut.await
2252                        .expect("One-shot shouldn't be dropped during bootstrap")
2253                        .unwrap_or_terminate("cannot fail to append")
2254                }
2255                .boxed()
2256            }
2257        } else {
2258            futures::future::ready(()).boxed()
2259        };
2260
2261        info!(
2262            "startup: coordinator init: bootstrap: postamble complete in {:?}",
2263            postamble_start.elapsed()
2264        );
2265
2266        let builtin_update_start = Instant::now();
2267        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2268
2269        if self.controller.read_only() {
2270            info!(
2271                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2272            );
2273
2274            // TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
2275            let audit_join_start = Instant::now();
2276            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2277            let audit_log_updates: Vec<_> = audit_logs_iterator
2278                .map(|(audit_log, ts)| StateUpdate {
2279                    kind: StateUpdateKind::AuditLog(audit_log),
2280                    ts,
2281                    diff: StateDiff::Addition,
2282                })
2283                .collect();
2284            let audit_log_builtin_table_updates = self
2285                .catalog()
2286                .state()
2287                .generate_builtin_table_updates(audit_log_updates);
2288            builtin_table_updates.extend(audit_log_builtin_table_updates);
2289            info!(
2290                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2291                audit_join_start.elapsed()
2292            );
2293            self.buffered_builtin_table_updates
2294                .as_mut()
2295                .expect("in read-only mode")
2296                .append(&mut builtin_table_updates);
2297        } else {
2298            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2299                .await;
2300        };
2301        info!(
2302            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2303            builtin_update_start.elapsed()
2304        );
2305
2306        let cleanup_secrets_start = Instant::now();
2307        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
        // Clean up orphaned secrets. Errors during list() or delete() do not
        // need to prevent bootstrap from succeeding; we will retry next
        // startup.
2311        {
2312            // Destructure Self so we can selectively move fields into the async
2313            // task.
2314            let Self {
2315                secrets_controller,
2316                catalog,
2317                ..
2318            } = self;
2319
2320            let next_user_item_id = catalog.get_next_user_item_id().await?;
2321            let next_system_item_id = catalog.get_next_system_item_id().await?;
2322            let read_only = self.controller.read_only();
            // Fetch all IDs from the catalog to future-proof against other
            // things using secrets. Today, SECRET and CONNECTION objects use
            // secrets_controller.ensure, but more object types could use secrets in
            // the future, and those would be easy to miss adding here.
2327            let catalog_ids: BTreeSet<CatalogItemId> =
2328                catalog.entries().map(|entry| entry.id()).collect();
2329            let secrets_controller = Arc::clone(secrets_controller);
2330
2331            spawn(|| "cleanup-orphaned-secrets", async move {
2332                if read_only {
2333                    info!(
2334                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2335                    );
2336                    return;
2337                }
2338                info!("coordinator init: cleaning up orphaned secrets");
2339
2340                match secrets_controller.list().await {
2341                    Ok(controller_secrets) => {
2342                        let controller_secrets: BTreeSet<CatalogItemId> =
2343                            controller_secrets.into_iter().collect();
2344                        let orphaned = controller_secrets.difference(&catalog_ids);
2345                        for id in orphaned {
2346                            let id_too_large = match id {
2347                                CatalogItemId::System(id) => *id >= next_system_item_id,
2348                                CatalogItemId::User(id) => *id >= next_user_item_id,
2349                                CatalogItemId::IntrospectionSourceIndex(_)
2350                                | CatalogItemId::Transient(_) => false,
2351                            };
2352                            if id_too_large {
2353                                info!(
2354                                    %next_user_item_id, %next_system_item_id,
2355                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2356                                );
2357                            } else {
2358                                info!("coordinator init: deleting orphaned secret {id}");
2359                                fail_point!("orphan_secrets");
2360                                if let Err(e) = secrets_controller.delete(*id).await {
2361                                    warn!(
2362                                        "Dropping orphaned secret has encountered an error: {}",
2363                                        e
2364                                    );
2365                                }
2366                            }
2367                        }
2368                    }
2369                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2370                }
2371            });
2372        }
2373        info!(
2374            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2375            cleanup_secrets_start.elapsed()
2376        );
2377
2378        // Run all of our final steps concurrently.
2379        let final_steps_start = Instant::now();
2380        info!(
2381            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2382        );
2383        migrated_updates_fut
2384            .instrument(info_span!("coord::bootstrap::final"))
2385            .await;
2386
2387        debug!(
2388            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2389        );
2390        // Announce the completion of initialization.
2391        self.controller.initialization_complete();
2392
2393        // Initialize unified introspection.
2394        self.bootstrap_introspection_subscribes().await;
2395
2396        info!(
2397            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2398            final_steps_start.elapsed()
2399        );
2400
2401        info!(
2402            "startup: coordinator init: bootstrap complete in {:?}",
2403            bootstrap_start.elapsed()
2404        );
2405        Ok(())
2406    }
2407
2408    /// Prepares tables for writing by resetting them to a known state and
2409    /// appending the given builtin table updates. The timestamp oracle
2410    /// will be advanced to the write timestamp of the append when this
2411    /// method returns.
2412    #[allow(clippy::async_yields_async)]
2413    #[instrument]
2414    async fn bootstrap_tables(
2415        &mut self,
2416        entries: &[CatalogEntry],
2417        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2418        audit_logs_iterator: AuditLogIterator,
2419    ) {
2420        /// Smaller helper struct of metadata for bootstrapping tables.
2421        struct TableMetadata<'a> {
2422            id: CatalogItemId,
2423            name: &'a QualifiedItemName,
2424            table: &'a Table,
2425        }
2426
2427        // Filter our entries down to just tables.
2428        let table_metas: Vec<_> = entries
2429            .into_iter()
2430            .filter_map(|entry| {
2431                entry.table().map(|table| TableMetadata {
2432                    id: entry.id(),
2433                    name: entry.name(),
2434                    table,
2435                })
2436            })
2437            .collect();
2438
2439        // Append empty batches to advance the timestamp of all tables.
2440        debug!("coordinator init: advancing all tables to current timestamp");
2441        let WriteTimestamp {
2442            timestamp: write_ts,
2443            advance_to,
2444        } = self.get_local_write_ts().await;
2445        let appends = table_metas
2446            .iter()
2447            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2448            .collect();
2449        // Append the tables in the background. We apply the write timestamp before getting a read
2450        // timestamp and reading a snapshot of each table, so the snapshots will block on their own
2451        // until the appends are complete.
2452        let table_fence_rx = self
2453            .controller
2454            .storage
2455            .append_table(write_ts.clone(), advance_to, appends)
2456            .expect("invalid updates");
2457
2458        self.apply_local_write(write_ts).await;
2459
        // Add builtin table updates that clear the contents of all system tables.
2461        debug!("coordinator init: resetting system tables");
2462        let read_ts = self.get_local_read_ts().await;
2463
2464        // Filter out the 'mz_storage_usage_by_shard' table since we need to retain that info for
2465        // billing purposes.
2466        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2467            .catalog()
2468            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2469            .into();
2470        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2471            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2472                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2473        };
2474
2475        let mut retraction_tasks = Vec::new();
2476        let mut system_tables: Vec<_> = table_metas
2477            .iter()
2478            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2479            .collect();
2480
        // Special-case audit events because that table is append-only.
2482        let (audit_events_idx, _) = system_tables
2483            .iter()
2484            .find_position(|table| {
2485                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2486            })
2487            .expect("mz_audit_events must exist");
2488        let audit_events = system_tables.remove(audit_events_idx);
2489        let audit_log_task = self.bootstrap_audit_log_table(
2490            audit_events.id,
2491            audit_events.name,
2492            audit_events.table,
2493            audit_logs_iterator,
2494            read_ts,
2495        );
2496
2497        for system_table in system_tables {
2498            let table_id = system_table.id;
2499            let full_name = self.catalog().resolve_full_name(system_table.name, None);
2500            debug!("coordinator init: resetting system table {full_name} ({table_id})");
2501
2502            // Fetch the current contents of the table for retraction.
2503            let snapshot_fut = self
2504                .controller
2505                .storage_collections
2506                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2507            let batch_fut = self
2508                .controller
2509                .storage_collections
2510                .create_update_builder(system_table.table.global_id_writes());
2511
2512            let task = spawn(|| format!("snapshot-{table_id}"), async move {
2513                // Create a TimestamplessUpdateBuilder.
2514                let mut batch = batch_fut
2515                    .await
2516                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2517                tracing::info!(?table_id, "starting snapshot");
2518                // Get a cursor which will emit a consolidated snapshot.
2519                let mut snapshot_cursor = snapshot_fut
2520                    .await
2521                    .unwrap_or_terminate("cannot fail to snapshot");
2522
2523                // Retract the current contents, spilling into our builder.
2524                while let Some(values) = snapshot_cursor.next().await {
2525                    for ((key, _val), _t, d) in values {
2526                        let key = key.expect("builtin table had errors");
2527                        let d_invert = d.neg();
2528                        batch.add(&key, &(), &d_invert).await;
2529                    }
2530                }
2531                tracing::info!(?table_id, "finished snapshot");
2532
2533                let batch = batch.finish().await;
2534                BuiltinTableUpdate::batch(table_id, batch)
2535            });
2536            retraction_tasks.push(task);
2537        }
2538
2539        let retractions_res = futures::future::join_all(retraction_tasks).await;
2540        for retractions in retractions_res {
2541            let retractions = retractions.expect("cannot fail to fetch snapshot");
2542            builtin_table_updates.push(retractions);
2543        }
2544
2545        let audit_join_start = Instant::now();
2546        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2547        let audit_log_updates = audit_log_task
2548            .await
2549            .expect("cannot fail to fetch audit log updates");
2550        let audit_log_builtin_table_updates = self
2551            .catalog()
2552            .state()
2553            .generate_builtin_table_updates(audit_log_updates);
2554        builtin_table_updates.extend(audit_log_builtin_table_updates);
2555        info!(
2556            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2557            audit_join_start.elapsed()
2558        );
2559
2560        // Now that the snapshots are complete, the appends must also be complete.
2561        table_fence_rx
2562            .await
2563            .expect("One-shot shouldn't be dropped during bootstrap")
2564            .unwrap_or_terminate("cannot fail to append");
2565
2566        info!("coordinator init: sending builtin table updates");
2567        let (_builtin_updates_fut, write_ts) = self
2568            .builtin_table_update()
2569            .execute(builtin_table_updates)
2570            .await;
2571        info!(?write_ts, "our write ts");
2572        if let Some(write_ts) = write_ts {
2573            self.apply_local_write(write_ts).await;
2574        }
2575    }
2576
    /// Prepare updates to the audit log table. The audit log table is append-only and very large, so
2578    /// we only need to find the events present in `audit_logs_iterator` but not in the audit log
2579    /// table.
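    ///
    /// The reconciliation relies on event IDs increasing over time: we look up the
    /// largest event ID already in the table and keep only newer catalog events. A
    /// self-contained sketch of that filter (plain integers stand in for events,
    /// and this sketch assumes the catalog events arrive newest-first, as the
    /// `take_while` below does):
    ///
    /// ```ignore
    /// let in_table_max: Option<u64> = Some(7);
    /// let catalog_events = vec![9u64, 8, 7, 6];
    /// let missing: Vec<_> = catalog_events
    ///     .into_iter()
    ///     .take_while(|id| match in_table_max {
    ///         Some(max) => *id > max,
    ///         None => true,
    ///     })
    ///     .collect();
    /// assert_eq!(missing, vec![9, 8]);
    /// ```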
2580    #[instrument]
2581    fn bootstrap_audit_log_table<'a>(
2582        &mut self,
2583        table_id: CatalogItemId,
2584        name: &'a QualifiedItemName,
2585        table: &'a Table,
2586        audit_logs_iterator: AuditLogIterator,
2587        read_ts: Timestamp,
2588    ) -> JoinHandle<Vec<StateUpdate>> {
2589        let full_name = self.catalog().resolve_full_name(name, None);
2590        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2591        let current_contents_fut = self
2592            .controller
2593            .storage_collections
2594            .snapshot(table.global_id_writes(), read_ts);
2595        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2596            let current_contents = current_contents_fut
2597                .await
2598                .unwrap_or_terminate("cannot fail to fetch snapshot");
2599            let contents_len = current_contents.len();
2600            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2601
2602            // Fetch the largest audit log event ID that has been written to the table.
2603            let max_table_id = current_contents
2604                .into_iter()
2605                .filter(|(_, diff)| *diff == 1)
2606                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2607                .sorted()
2608                .rev()
2609                .next();
2610
2611            // Filter audit log catalog updates to those that are not present in the table.
2612            audit_logs_iterator
2613                .take_while(|(audit_log, _)| match max_table_id {
2614                    Some(id) => audit_log.event.sortable_id() > id,
2615                    None => true,
2616                })
2617                .map(|(audit_log, ts)| StateUpdate {
2618                    kind: StateUpdateKind::AuditLog(audit_log),
2619                    ts,
2620                    diff: StateDiff::Addition,
2621                })
2622                .collect::<Vec<_>>()
2623        })
2624    }
2625
2626    /// Initializes all storage collections required by catalog objects in the storage controller.
2627    ///
2628    /// This method takes care of collection creation, as well as migration of existing
2629    /// collections.
2630    ///
2631    /// Creating all storage collections in a single `create_collections` call, rather than on
2632    /// demand, is more efficient as it reduces the number of writes to durable storage. It also
2633    /// allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
2634    /// storage collections, without needing to worry about dependency order.
2635    ///
2636    /// `migrated_storage_collections` is a set of builtin storage collections that have been
2637    /// migrated and should be handled specially.
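    ///
    /// As a rough guide, the `source_desc` helper below maps catalog data sources to
    /// storage data sources like so (a summary of its match arms, not new behavior):
    ///
    /// ```text
    /// Ingestion        -> DataSource::Ingestion        (+ source status collection)
    /// IngestionExport  -> DataSource::IngestionExport  (+ source status collection)
    /// Webhook          -> DataSource::Webhook          (+ source status collection)
    /// Progress         -> DataSource::Progress
    /// Introspection    -> DataSource::Introspection
    /// ```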
2638    #[instrument]
2639    async fn bootstrap_storage_collections(
2640        &mut self,
2641        migrated_storage_collections: &BTreeSet<CatalogItemId>,
2642    ) {
2643        let catalog = self.catalog();
2644        let source_status_collection_id = catalog
2645            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2646        let source_status_collection_id = catalog
2647            .get_entry(&source_status_collection_id)
2648            .latest_global_id();
2649
2650        let source_desc =
2651            |data_source: &DataSourceDesc, desc: &RelationDesc, timeline: &Timeline| {
2652                let (data_source, status_collection_id) = match data_source.clone() {
2653                    // Re-announce the source description.
2654                    DataSourceDesc::Ingestion {
2655                        ingestion_desc:
2656                            mz_sql::plan::Ingestion {
2657                                desc,
2658                                progress_subsource,
2659                            },
2660                        cluster_id,
2661                    } => {
2662                        let desc = desc.into_inline_connection(catalog.state());
2663                        // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2664                        // this will always be a Source or a Table.
2665                        let progress_subsource =
2666                            catalog.get_entry(&progress_subsource).latest_global_id();
2667                        let ingestion = mz_storage_types::sources::IngestionDescription::new(
2668                            desc,
2669                            cluster_id,
2670                            progress_subsource,
2671                        );
2672
2673                        (
2674                            DataSource::Ingestion(ingestion.clone()),
2675                            Some(source_status_collection_id),
2676                        )
2677                    }
2678                    DataSourceDesc::IngestionExport {
2679                        ingestion_id,
2680                        external_reference: _,
2681                        details,
2682                        data_config,
2683                    } => {
2684                        // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2685                        // this will always be a Source or a Table.
2686                        let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2687                        (
2688                            DataSource::IngestionExport {
2689                                ingestion_id,
2690                                details,
2691                                data_config: data_config.into_inline_connection(catalog.state()),
2692                            },
2693                            Some(source_status_collection_id),
2694                        )
2695                    }
2696                    DataSourceDesc::Webhook { .. } => {
2697                        (DataSource::Webhook, Some(source_status_collection_id))
2698                    }
2699                    DataSourceDesc::Progress => (DataSource::Progress, None),
2700                    DataSourceDesc::Introspection(introspection) => {
2701                        (DataSource::Introspection(introspection), None)
2702                    }
2703                };
2704                CollectionDescription {
2705                    desc: desc.clone(),
2706                    data_source,
2707                    since: None,
2708                    status_collection_id,
2709                    timeline: Some(timeline.clone()),
2710                }
2711            };
2712
2713        let mut compute_collections = vec![];
2714        let mut collections = vec![];
2715        let mut new_builtin_continual_tasks = vec![];
2716        for entry in catalog.entries() {
2717            match entry.item() {
2718                CatalogItem::Source(source) => {
2719                    collections.push((
2720                        source.global_id(),
2721                        source_desc(&source.data_source, &source.desc, &source.timeline),
2722                    ));
2723                }
2724                CatalogItem::Table(table) => {
2725                    match &table.data_source {
2726                        TableDataSource::TableWrites { defaults: _ } => {
2727                            let versions: BTreeMap<_, _> = table
2728                                .collection_descs()
2729                                .map(|(gid, version, desc)| (version, (gid, desc)))
2730                                .collect();
2731                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2732                                let next_version = version.bump();
2733                                let primary_collection =
2734                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2735                                let collection_desc = CollectionDescription::for_table(
2736                                    desc.clone(),
2737                                    primary_collection,
2738                                );
2739
2740                                (*gid, collection_desc)
2741                            });
2742                            collections.extend(collection_descs);
2743                        }
2744                        TableDataSource::DataSource {
2745                            desc: data_source_desc,
2746                            timeline,
2747                        } => {
2748                            // TODO(alter_table): Support versioning tables that read from sources.
2749                            soft_assert_eq_or_log!(table.collections.len(), 1);
2750                            let collection_descs =
2751                                table.collection_descs().map(|(gid, _version, desc)| {
2752                                    (gid, source_desc(data_source_desc, &desc, timeline))
2753                                });
2754                            collections.extend(collection_descs);
2755                        }
2756                    };
2757                }
2758                CatalogItem::MaterializedView(mv) => {
2759                    let collection_desc = CollectionDescription {
2760                        desc: mv.desc.clone(),
2761                        data_source: DataSource::Other,
2762                        since: mv.initial_as_of.clone(),
2763                        status_collection_id: None,
2764                        timeline: None,
2765                    };
2766                    compute_collections.push((mv.global_id(), mv.desc.clone()));
2767                    collections.push((mv.global_id(), collection_desc));
2768                }
2769                CatalogItem::ContinualTask(ct) => {
2770                    let collection_desc = CollectionDescription {
2771                        desc: ct.desc.clone(),
2772                        data_source: DataSource::Other,
2773                        since: ct.initial_as_of.clone(),
2774                        status_collection_id: None,
2775                        timeline: None,
2776                    };
2777                    if ct.global_id().is_system() && collection_desc.since.is_none() {
2778                        // We need a non-0 since to make as_of selection work. Fill it in below with
2779                        // the `bootstrap_builtin_continual_tasks` call, which can only be run after
2780                        // `create_collections_for_bootstrap`.
2781                        new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2782                    } else {
2783                        compute_collections.push((ct.global_id(), ct.desc.clone()));
2784                        collections.push((ct.global_id(), collection_desc));
2785                    }
2786                }
2787                CatalogItem::Sink(sink) => {
2788                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2789                    let from_desc = storage_sink_from_entry
2790                        .desc(&self.catalog().resolve_full_name(
2791                            storage_sink_from_entry.name(),
2792                            storage_sink_from_entry.conn_id(),
2793                        ))
2794                        .expect("sinks can only be built on items with descs")
2795                        .into_owned();
2796                    let collection_desc = CollectionDescription {
2797                        // TODO(sinks): make generic once we have more than one sink type.
2798                        desc: KAFKA_PROGRESS_DESC.clone(),
2799                        data_source: DataSource::Sink {
2800                            desc: ExportDescription {
2801                                sink: StorageSinkDesc {
2802                                    from: sink.from,
2803                                    from_desc,
2804                                    connection: sink
2805                                        .connection
2806                                        .clone()
2807                                        .into_inline_connection(self.catalog().state()),
2808                                    envelope: sink.envelope,
2809                                    as_of: Antichain::from_elem(Timestamp::minimum()),
2810                                    with_snapshot: sink.with_snapshot,
2811                                    version: sink.version,
2812                                    from_storage_metadata: (),
2813                                    to_storage_metadata: (),
2814                                },
2815                                instance_id: sink.cluster_id,
2816                            },
2817                        },
2818                        since: None,
2819                        status_collection_id: None,
2820                        timeline: None,
2821                    };
2822                    collections.push((sink.global_id, collection_desc));
2823                }
2824                _ => (),
2825            }
2826        }
2827
2828        let register_ts = if self.controller.read_only() {
2829            self.get_local_read_ts().await
2830        } else {
2831            // Getting a write timestamp bumps the write timestamp in the
2832            // oracle, which we're not allowed to do in read-only mode.
2833            self.get_local_write_ts().await.timestamp
2834        };
2835
2836        let storage_metadata = self.catalog.state().storage_metadata();
2837        let migrated_storage_collections = migrated_storage_collections
2838            .into_iter()
2839            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2840            .collect();
2841
2842        // Before possibly creating collections, make sure their schemas are correct.
2843        //
2844        // Across different versions of Materialize the nullability of columns can change based on
2845        // updates to our optimizer.
2846        self.controller
2847            .storage
2848            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2849            .await
2850            .unwrap_or_terminate("cannot fail to evolve collections");
2851
2852        self.controller
2853            .storage
2854            .create_collections_for_bootstrap(
2855                storage_metadata,
2856                Some(register_ts),
2857                collections,
2858                &migrated_storage_collections,
2859            )
2860            .await
2861            .unwrap_or_terminate("cannot fail to create collections");
2862
2863        self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2864            .await;
2865
2866        if !self.controller.read_only() {
2867            self.apply_local_write(register_ts).await;
2868        }
2869    }
2870
2871    /// Make as_of selection happy for builtin CTs. Ideally we'd write the
2872    /// initial as_of down in the durable catalog, but that's hard because of
2873    /// boot ordering. Instead, we set the since of the storage collection to
2874    /// something that's a reasonable lower bound for the as_of. Then, if the
2875    /// upper is 0, the as_of selection code will allow us to jump it forward to
2876    /// this since.
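    ///
    /// A rough sketch of the intended interaction with as_of selection:
    ///
    /// ```text
    /// since := least_valid_read(dataflow inputs)   // via the read holds acquired below
    /// if the CT's upper is still [0] (it has never written anything),
    ///     as_of selection is allowed to jump the as_of forward to `since`.
    /// ```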
2877    async fn bootstrap_builtin_continual_tasks(
2878        &mut self,
2879        // TODO(alter_table): Switch to CatalogItemId.
2880        mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
2881    ) {
2882        for (id, collection) in &mut collections {
2883            let entry = self.catalog.get_entry_by_global_id(id);
2884            let ct = match &entry.item {
2885                CatalogItem::ContinualTask(ct) => ct.clone(),
2886                _ => unreachable!("only called with continual task builtins"),
2887            };
2888            let debug_name = self
2889                .catalog()
2890                .resolve_full_name(entry.name(), None)
2891                .to_string();
2892            let (_optimized_plan, physical_plan, _metainfo) = self
2893                .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
2894                .expect("builtin CT should optimize successfully");
2895
2896            // Determine an as of for the new continual task.
2897            let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
2898            // Can't acquire a read hold on ourselves because we don't exist yet.
2899            id_bundle.storage_ids.remove(id);
2900            let read_holds = self.acquire_read_holds(&id_bundle);
2901            let as_of = read_holds.least_valid_read();
2902
2903            collection.since = Some(as_of.clone());
2904        }
2905        self.controller
2906            .storage
2907            .create_collections(self.catalog.state().storage_metadata(), None, collections)
2908            .await
2909            .unwrap_or_terminate("cannot fail to create collections");
2910    }
2911
2912    /// Invokes the optimizer on all indexes, materialized views, and continual tasks in the
2913    /// catalog and inserts the resulting dataflow plans into the catalog state.
2914    ///
2915    /// `ordered_catalog_entries` must be sorted in dependency order, with dependencies ordered
2916    /// before their dependants.
2917    ///
2918    /// This method does not perform timestamp selection for the dataflows, nor does it create them
2919    /// in the compute controller. Both of these steps happen later during bootstrapping.
2920    ///
2921    /// Returns a map of expressions that were not cached.
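    ///
    /// In short, a sketch of the decision made for each object below:
    ///
    /// ```text
    /// cached plan exists && its optimizer features match the current config
    ///     => reuse the cached (global MIR, physical plan, metainfo)
    /// otherwise
    ///     => re-optimize and record the result in the returned map
    /// ```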
2922    #[instrument]
2923    fn bootstrap_dataflow_plans(
2924        &mut self,
2925        ordered_catalog_entries: &[CatalogEntry],
2926        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2927    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
2928        // The optimizer expects to be able to query its `ComputeInstanceSnapshot` for
2929        // collections the current dataflow can depend on. But since we don't yet install anything
2930        // on compute instances, the snapshot information is incomplete. We fix that by manually
2931        // updating `ComputeInstanceSnapshot` objects to ensure they contain the collections we
2932        // have already optimized.
2933        let mut instance_snapshots = BTreeMap::new();
2934        let mut uncached_expressions = BTreeMap::new();
2935
2936        let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
2937
2938        for entry in ordered_catalog_entries {
2939            match entry.item() {
2940                CatalogItem::Index(idx) => {
2941                    // Collect optimizer parameters.
2942                    let compute_instance =
2943                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
2944                            self.instance_snapshot(idx.cluster_id)
2945                                .expect("compute instance exists")
2946                        });
2947                    let global_id = idx.global_id();
2948
2949                    // The index may already be installed on the compute instance. For example,
2950                    // this is the case for introspection indexes.
2951                    if compute_instance.contains_collection(&global_id) {
2952                        continue;
2953                    }
2954
2955                    let (optimized_plan, physical_plan, metainfo) =
2956                        match cached_global_exprs.remove(&global_id) {
2957                            Some(global_expressions)
2958                                if global_expressions.optimizer_features
2959                                    == optimizer_config.features =>
2960                            {
2961                                debug!("global expression cache hit for {global_id:?}");
2962                                (
2963                                    global_expressions.global_mir,
2964                                    global_expressions.physical_plan,
2965                                    global_expressions.dataflow_metainfos,
2966                                )
2967                            }
2968                            Some(_) | None => {
2969                                let (optimized_plan, global_lir_plan) = {
2970                                    // Build an optimizer for this INDEX.
2971                                    let mut optimizer = optimize::index::Optimizer::new(
2972                                        self.owned_catalog(),
2973                                        compute_instance.clone(),
2974                                        global_id,
2975                                        optimizer_config.clone(),
2976                                        self.optimizer_metrics(),
2977                                    );
2978
2979                                    // MIR ⇒ MIR optimization (global)
2980                                    let index_plan = optimize::index::Index::new(
2981                                        entry.name().clone(),
2982                                        idx.on,
2983                                        idx.keys.to_vec(),
2984                                    );
2985                                    let global_mir_plan = optimizer.optimize(index_plan)?;
2986                                    let optimized_plan = global_mir_plan.df_desc().clone();
2987
2988                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
2989                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
2990
2991                                    (optimized_plan, global_lir_plan)
2992                                };
2993
2994                                let (physical_plan, metainfo) = global_lir_plan.unapply();
2995                                let metainfo = {
2996                                    // Pre-allocate a vector of transient GlobalIds for each notice.
2997                                    let notice_ids =
2998                                        std::iter::repeat_with(|| self.allocate_transient_id())
2999                                            .map(|(_item_id, gid)| gid)
3000                                            .take(metainfo.optimizer_notices.len())
3001                                            .collect::<Vec<_>>();
3002                                    // Return a metainfo with rendered notices.
3003                                    self.catalog().render_notices(
3004                                        metainfo,
3005                                        notice_ids,
3006                                        Some(idx.global_id()),
3007                                    )
3008                                };
3009                                uncached_expressions.insert(
3010                                    global_id,
3011                                    GlobalExpressions {
3012                                        global_mir: optimized_plan.clone(),
3013                                        physical_plan: physical_plan.clone(),
3014                                        dataflow_metainfos: metainfo.clone(),
3015                                        optimizer_features: OptimizerFeatures::from(
3016                                            self.catalog().system_config(),
3017                                        ),
3018                                    },
3019                                );
3020                                (optimized_plan, physical_plan, metainfo)
3021                            }
3022                        };
3023
3024                    let catalog = self.catalog_mut();
3025                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3026                    catalog.set_physical_plan(idx.global_id(), physical_plan);
3027                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3028
3029                    compute_instance.insert_collection(idx.global_id());
3030                }
3031                CatalogItem::MaterializedView(mv) => {
3032                    // Collect optimizer parameters.
3033                    let compute_instance =
3034                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3035                            self.instance_snapshot(mv.cluster_id)
3036                                .expect("compute instance exists")
3037                        });
3038                    let global_id = mv.global_id();
3039
3040                    let (optimized_plan, physical_plan, metainfo) =
3041                        match cached_global_exprs.remove(&global_id) {
3042                            Some(global_expressions)
3043                                if global_expressions.optimizer_features
3044                                    == optimizer_config.features =>
3045                            {
3046                                debug!("global expression cache hit for {global_id:?}");
3047                                (
3048                                    global_expressions.global_mir,
3049                                    global_expressions.physical_plan,
3050                                    global_expressions.dataflow_metainfos,
3051                                )
3052                            }
3053                            Some(_) | None => {
3054                                let (_, internal_view_id) = self.allocate_transient_id();
3055                                let debug_name = self
3056                                    .catalog()
3057                                    .resolve_full_name(entry.name(), None)
3058                                    .to_string();
3059                                let force_non_monotonic = Default::default();
3060
3061                                let (optimized_plan, global_lir_plan) = {
3062                                    // Build an optimizer for this MATERIALIZED VIEW.
3063                                    let mut optimizer = optimize::materialized_view::Optimizer::new(
3064                                        self.owned_catalog().as_optimizer_catalog(),
3065                                        compute_instance.clone(),
3066                                        global_id,
3067                                        internal_view_id,
3068                                        mv.desc.iter_names().cloned().collect(),
3069                                        mv.non_null_assertions.clone(),
3070                                        mv.refresh_schedule.clone(),
3071                                        debug_name,
3072                                        optimizer_config.clone(),
3073                                        self.optimizer_metrics(),
3074                                        force_non_monotonic,
3075                                    );
3076
3077                                    // MIR ⇒ MIR optimization (global)
3078                                    let global_mir_plan =
3079                                        optimizer.optimize(mv.optimized_expr.as_ref().clone())?;
3080                                    let optimized_plan = global_mir_plan.df_desc().clone();
3081
3082                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3083                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3084
3085                                    (optimized_plan, global_lir_plan)
3086                                };
3087
3088                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3089                                let metainfo = {
3090                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3091                                    let notice_ids =
3092                                        std::iter::repeat_with(|| self.allocate_transient_id())
3093                                            .map(|(_item_id, global_id)| global_id)
3094                                            .take(metainfo.optimizer_notices.len())
3095                                            .collect::<Vec<_>>();
3096                                    // Return a metainfo with rendered notices.
3097                                    self.catalog().render_notices(
3098                                        metainfo,
3099                                        notice_ids,
3100                                        Some(mv.global_id()),
3101                                    )
3102                                };
3103                                uncached_expressions.insert(
3104                                    global_id,
3105                                    GlobalExpressions {
3106                                        global_mir: optimized_plan.clone(),
3107                                        physical_plan: physical_plan.clone(),
3108                                        dataflow_metainfos: metainfo.clone(),
3109                                        optimizer_features: OptimizerFeatures::from(
3110                                            self.catalog().system_config(),
3111                                        ),
3112                                    },
3113                                );
3114                                (optimized_plan, physical_plan, metainfo)
3115                            }
3116                        };
3117
3118                    let catalog = self.catalog_mut();
3119                    catalog.set_optimized_plan(mv.global_id(), optimized_plan);
3120                    catalog.set_physical_plan(mv.global_id(), physical_plan);
3121                    catalog.set_dataflow_metainfo(mv.global_id(), metainfo);
3122
3123                    compute_instance.insert_collection(mv.global_id());
3124                }
3125                CatalogItem::ContinualTask(ct) => {
3126                    let compute_instance =
3127                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3128                            self.instance_snapshot(ct.cluster_id)
3129                                .expect("compute instance exists")
3130                        });
3131                    let global_id = ct.global_id();
3132
3133                    let (optimized_plan, physical_plan, metainfo) =
3134                        match cached_global_exprs.remove(&global_id) {
3135                            Some(global_expressions)
3136                                if global_expressions.optimizer_features
3137                                    == optimizer_config.features =>
3138                            {
3139                                debug!("global expression cache hit for {global_id:?}");
3140                                (
3141                                    global_expressions.global_mir,
3142                                    global_expressions.physical_plan,
3143                                    global_expressions.dataflow_metainfos,
3144                                )
3145                            }
3146                            Some(_) | None => {
3147                                let debug_name = self
3148                                    .catalog()
3149                                    .resolve_full_name(entry.name(), None)
3150                                    .to_string();
3151                                let (optimized_plan, physical_plan, metainfo) = self
3152                                    .optimize_create_continual_task(
3153                                        ct,
3154                                        global_id,
3155                                        self.owned_catalog(),
3156                                        debug_name,
3157                                    )?;
3158                                uncached_expressions.insert(
3159                                    global_id,
3160                                    GlobalExpressions {
3161                                        global_mir: optimized_plan.clone(),
3162                                        physical_plan: physical_plan.clone(),
3163                                        dataflow_metainfos: metainfo.clone(),
3164                                        optimizer_features: OptimizerFeatures::from(
3165                                            self.catalog().system_config(),
3166                                        ),
3167                                    },
3168                                );
3169                                (optimized_plan, physical_plan, metainfo)
3170                            }
3171                        };
3172
3173                    let catalog = self.catalog_mut();
3174                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3175                    catalog.set_physical_plan(ct.global_id(), physical_plan);
3176                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3177
3178                    compute_instance.insert_collection(ct.global_id());
3179                }
3180                _ => (),
3181            }
3182        }
3183
3184        Ok(uncached_expressions)
3185    }
3186
3187    /// Selects for each compute dataflow an as-of suitable for bootstrapping it.
3188    ///
3189    /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
3190    /// in place and that must not be dropped before all compute dataflows have been created with
3191    /// the compute controller.
3192    ///
3193    /// This method expects all storage collections and dataflow plans to be available, so it must
3194    /// run after [`Coordinator::bootstrap_storage_collections`] and
3195    /// [`Coordinator::bootstrap_dataflow_plans`].
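    ///
    /// A sketch of the steps (see the body for the details):
    ///
    /// ```text
    /// 1. Collect the physical plans of all indexes, materialized views, and
    ///    continual tasks, plus their initial compaction windows.
    /// 2. Run `as_of_selection::run` against the storage collections at the
    ///    current local read timestamp; this updates the plans in place and
    ///    returns read holds.
    /// 3. Write the updated plans back into the catalog and return the holds.
    /// ```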
3196    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3197        let mut catalog_ids = Vec::new();
3198        let mut dataflows = Vec::new();
3199        let mut read_policies = BTreeMap::new();
3200        for entry in self.catalog.entries() {
3201            let gid = match entry.item() {
3202                CatalogItem::Index(idx) => idx.global_id(),
3203                CatalogItem::MaterializedView(mv) => mv.global_id(),
3204                CatalogItem::ContinualTask(ct) => ct.global_id(),
3205                CatalogItem::Table(_)
3206                | CatalogItem::Source(_)
3207                | CatalogItem::Log(_)
3208                | CatalogItem::View(_)
3209                | CatalogItem::Sink(_)
3210                | CatalogItem::Type(_)
3211                | CatalogItem::Func(_)
3212                | CatalogItem::Secret(_)
3213                | CatalogItem::Connection(_) => continue,
3214            };
3215            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3216                catalog_ids.push(gid);
3217                dataflows.push(plan.clone());
3218
3219                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3220                    read_policies.insert(gid, compaction_window.into());
3221                }
3222            }
3223        }
3224
3225        let read_ts = self.get_local_read_ts().await;
3226        let read_holds = as_of_selection::run(
3227            &mut dataflows,
3228            &read_policies,
3229            &*self.controller.storage_collections,
3230            read_ts,
3231        );
3232
3233        let catalog = self.catalog_mut();
3234        for (id, plan) in catalog_ids.into_iter().zip(dataflows) {
3235            catalog.set_physical_plan(id, plan);
3236        }
3237
3238        read_holds
3239    }
3240
3241    /// Serves the coordinator, receiving commands from users over `cmd_rx`
3242    /// and feedback from dataflow workers over `feedback_rx`.
3243    ///
3244    /// You must call `bootstrap` before calling this method.
3245    ///
3246    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would
3247    /// get stored on the stack, which is bad for runtime performance and could blow up our stack
3248    /// usage. Because of that we purposefully move this Future onto the heap (i.e. Box it).
3249    fn serve(
3250        mut self,
3251        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3252        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3253        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3254        group_commit_rx: appends::GroupCommitWaiter,
3255    ) -> LocalBoxFuture<'static, ()> {
3256        async move {
3257            // Watcher that listens for and reports cluster service status changes.
3258            let mut cluster_events = self.controller.events_stream();
3259            let last_message = Arc::new(Mutex::new(LastMessage {
3260                kind: "none",
3261                stmt: None,
3262            }));
3263
3264            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3265            let idle_metric = self.metrics.queue_busy_seconds.with_label_values(&[]);
3266            let last_message_watchdog = Arc::clone(&last_message);
3267
3268            spawn(|| "coord watchdog", async move {
3269                // Every 5 seconds, attempt to measure how long it takes for the
3270                // coord select loop to drain, because this message is processed
3271                // at the lowest priority. If the loop is idle, the measurement
3272                // will only be a few microseconds.
3273                let mut interval = tokio::time::interval(Duration::from_secs(5));
3274                // If we end up having to wait more than 5 seconds for the coord to respond, then
3275                // `MissedTickBehavior::Delay` results in the interval "restarting" from whenever we
3276                // yield, instead of trying to catch up.
3277                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3278
3279                // Track if we become stuck to de-dupe error reporting.
3280                let mut coord_stuck = false;
3281
3282                loop {
3283                    interval.tick().await;
3284
3285                    // Wait for space in the channel; if we time out, the coordinator is stuck!
3286                    let duration = tokio::time::Duration::from_secs(30);
3287                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3288                    let Ok(maybe_permit) = timeout else {
3289                        // Only log if we're newly stuck, to prevent logging repeatedly.
3290                        if !coord_stuck {
3291                            let last_message = last_message_watchdog.lock().expect("poisoned");
3292                            tracing::warn!(
3293                                last_message_kind = %last_message.kind,
3294                                last_message_sql = %last_message.stmt_to_string(),
3295                                "coordinator stuck for {duration:?}",
3296                            );
3297                        }
3298                        coord_stuck = true;
3299
3300                        continue;
3301                    };
3302
3303                    // We got a permit, we're not stuck!
3304                    if coord_stuck {
3305                        tracing::info!("Coordinator became unstuck");
3306                    }
3307                    coord_stuck = false;
3308
3309                    // If we failed to acquire a permit it's because we're shutting down.
3310                    let Ok(permit) = maybe_permit else {
3311                        break;
3312                    };
3313
3314                    permit.send(idle_metric.start_timer());
3315                }
3316            });
3317
3318            self.schedule_storage_usage_collection().await;
3319            self.spawn_privatelink_vpc_endpoints_watch_task();
3320            self.spawn_statement_logging_task();
3321            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3322
3323            // Report if the handling of a single message takes longer than this threshold.
3324            let warn_threshold = self
3325                .catalog()
3326                .system_config()
3327                .coord_slow_message_warn_threshold();
3328
3329            // How many messages we'd like to batch up before processing them. Must be > 0.
3330            const MESSAGE_BATCH: usize = 64;
3331            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3332            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3333
3334            let message_batch = self.metrics
3335                .message_batch
3336                .with_label_values(&[]);
3337
3338            loop {
3339                // Before adding a branch to this select loop, please ensure that the branch is
3340                // cancellation safe and add a comment explaining why. You can refer here for more
3341                // info: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
3342                select! {
3343                    // We prioritize internal commands over other commands. However, we work through
3344                    // batches of commands in some branches of this select, which means that even if
3345                    // a command generates internal commands, we will work through the current batch
3346                    // before receiving a new batch of commands.
3347                    biased;
3348
3349                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3350                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3351                    // Receive a batch of commands.
3352                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3353                    // `next()` on any stream is cancel-safe:
3354                    // https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
3355                    // Receive a single command.
3356                    Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
3357                    // See [`mz_controller::Controller::Controller::ready`] for notes
3358                    // on why this is cancel-safe.
3359                    // Receive a single command.
3360                    () = self.controller.ready() => {
3361                        messages.push(Message::ControllerReady);
3362                    }
3363                    // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
3364                    // Receive a single command.
3365                    permit = group_commit_rx.ready() => {
3366                        // If we happen to have batched exactly one user write, use
3367                        // that span so the `emit_trace_id_notice` hooks up.
3368                        // Otherwise, the best we can do is invent a new root span
3369                        // and make it follow from all the Spans in the pending
3370                        // writes.
3371                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3372                            PendingWriteTxn::User{span, ..} => Some(span),
3373                            PendingWriteTxn::System{..} => None,
3374                        });
3375                        let span = match user_write_spans.exactly_one() {
3376                            Ok(span) => span.clone(),
3377                            Err(user_write_spans) => {
3378                                let span = info_span!(parent: None, "group_commit_notify");
3379                                for s in user_write_spans {
3380                                    span.follows_from(s);
3381                                }
3382                                span
3383                            }
3384                        };
3385                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3386                    },
3387                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3388                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3389                    // Receive a batch of commands.
3390                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3391                        if count == 0 {
3392                            break;
3393                        } else {
3394                            messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
3395                        }
3396                    },
3397                    // `recv()` on `UnboundedReceiver` is cancellation safe:
3398                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
3399                    // Receive a single command.
3400                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3401                        let mut pending_read_txns = vec![pending_read_txn];
3402                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3403                            pending_read_txns.push(pending_read_txn);
3404                        }
3405                        for (conn_id, pending_read_txn) in pending_read_txns {
3406                            let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
3407                            soft_assert_or_log!(
3408                                prev.is_none(),
3409                                "connections can not have multiple concurrent reads, prev: {prev:?}"
3410                            )
3411                        }
3412                        messages.push(Message::LinearizeReads);
3413                    }
3414                    // `tick()` on `Interval` is cancel-safe:
3415                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3416                    // Receive a single command.
3417                    _ = self.advance_timelines_interval.tick() => {
3418                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
3419                        span.follows_from(Span::current());
3420
3421                        // Group commit sends an `AdvanceTimelines` message when
3422                        // done, which is what downgrades read holds. In
3423                        // read-only mode we send this message directly because
3424                        // we're not doing group commits.
3425                        if self.controller.read_only() {
3426                            messages.push(Message::AdvanceTimelines);
3427                        } else {
3428                            messages.push(Message::GroupCommitInitiate(span, None));
3429                        }
3430                    },
3431                    // `tick()` on `Interval` is cancel-safe:
3432                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3433                    // Receive a single command.
3434                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
3435                        messages.push(Message::CheckSchedulingPolicies);
3436                    },
3437
3438                    // `tick()` on `Interval` is cancel-safe:
3439                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3440                    // Receive a single command.
3441                    _ = self.caught_up_check_interval.tick() => {
3442                        // We do this directly on the main loop instead of
3443                        // We do this directly on the main loop instead of
3444                        // firing off a message. We are still in read-only mode,
3445                        // so optimizing for latency by not blocking the main
3446                        // loop is not that important.
3447
3448                        continue;
3449                    },
3450
3451                    // Process the idle metric at the lowest priority to sample queue non-idle time.
3452                    // `recv()` on `Receiver` is cancellation safe:
3453                    // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety
3454                    // Receive a single command.
3455                    timer = idle_rx.recv() => {
3456                        timer.expect("does not drop").observe_duration();
3457                        self.metrics
3458                            .message_handling
3459                            .with_label_values(&["watchdog"])
3460                            .observe(0.0);
3461                        continue;
3462                    }
3463                };
3464
3465                // Observe the number of messages we're processing at once.
3466                message_batch.observe(f64::cast_lossy(messages.len()));
3467
3468                for msg in messages.drain(..) {
3469                    // All message processing functions trace. Start a parent span
3470                    // for them to make it easy to find slow messages.
3471                    let msg_kind = msg.kind();
3472                    let span = span!(
3473                        target: "mz_adapter::coord::handle_message_loop",
3474                        Level::INFO,
3475                        "coord::handle_message",
3476                        kind = msg_kind
3477                    );
3478                    let otel_context = span.context().span().span_context().clone();
3479
3480                    // Record the last kind of message in case we get stuck. For
3481                    // execute commands, we additionally stash the user's SQL
3482                    // statement, so we can log it as well.
3483                    *last_message.lock().expect("poisoned") = LastMessage {
3484                        kind: msg_kind,
3485                        stmt: match &msg {
3486                            Message::Command(
3487                                _,
3488                                Command::Execute {
3489                                    portal_name,
3490                                    session,
3491                                    ..
3492                                },
3493                            ) => session
3494                                .get_portal_unverified(portal_name)
3495                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3496                            _ => None,
3497                        },
3498                    };
3499
3500                    let start = Instant::now();
3501                    self.handle_message(msg).instrument(span).await;
3502                    let duration = start.elapsed();
3503
3504                    self.metrics
3505                        .message_handling
3506                        .with_label_values(&[msg_kind])
3507                        .observe(duration.as_secs_f64());
3508
3509                    // If something is _really_ slow, print a trace id for debugging, if OTEL is enabled.
3510                    if duration > warn_threshold {
3511                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3512                        tracing::error!(
3513                            ?msg_kind,
3514                            ?trace_id,
3515                            ?duration,
3516                            "very slow coordinator message"
3517                        );
3518                    }
3519                }
3520            }
3521            // Try to clean up as a best effort. There may be some async tasks out there holding a
3522            // reference that prevents us from cleaning up.
3523            if let Some(catalog) = Arc::into_inner(self.catalog) {
3524                catalog.expire().await;
3525            }
3526        }
3527        .boxed_local()
3528    }
3529
3530    /// Obtain a read-only Catalog reference.
3531    fn catalog(&self) -> &Catalog {
3532        &self.catalog
3533    }
3534
3535    /// Obtain a read-only Catalog snapshot, suitable for giving out to
3536    /// non-Coordinator thread tasks.
3537    fn owned_catalog(&self) -> Arc<Catalog> {
3538        Arc::clone(&self.catalog)
3539    }
3540
3541    /// Obtain a handle to the optimizer metrics, suitable for giving
3542    /// out to non-Coordinator thread tasks.
3543    fn optimizer_metrics(&self) -> OptimizerMetrics {
3544        self.optimizer_metrics.clone()
3545    }
3546
3547    /// Obtain a writeable Catalog reference.
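    ///
    /// Conceptually this is copy-on-write; a sketch, not the exact mechanics:
    ///
    /// ```text
    /// let snapshot = coord.owned_catalog();  // Arc refcount is now > 1
    /// let cat = coord.catalog_mut();         // clones the catalog into a fresh Arc
    /// // `snapshot` still observes the pre-mutation catalog
    /// ```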
3548    fn catalog_mut(&mut self) -> &mut Catalog {
3549        // make_mut will cause any other Arc references (from owned_catalog) to
3550        // continue to be valid by cloning the catalog, putting it in a new Arc,
3551        // which lives at self.catalog. If there are no other Arc references,
3552        // then no clone is made, and it returns a reference to the existing
3553        // object. This makes this method and owned_catalog both very cheap: at
3554        // most one clone per catalog mutation, but only if there's a read-only
3555        // reference to it.
3556        Arc::make_mut(&mut self.catalog)
3557    }
3558
3559    /// Obtain a reference to the coordinator's connection context.
3560    fn connection_context(&self) -> &ConnectionContext {
3561        self.controller.connection_context()
3562    }
3563
3564    /// Obtain a reference to the coordinator's secret reader, in an `Arc`.
3565    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3566        &self.connection_context().secrets_reader
3567    }
3568
3569    /// Publishes a notice message to all sessions.
3570    ///
3571    /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
3572    /// so we keep it around.
3573    #[allow(dead_code)]
3574    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3575        for meta in self.active_conns.values() {
3576            let _ = meta.notice_tx.send(notice.clone());
3577        }
3578    }
3579
3580    /// Returns a closure that will publish a notice to all sessions that were active at the time
3581    /// this method was called.
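    ///
    /// A sketch of intended usage (the task name and `notice` value are illustrative):
    ///
    /// ```ignore
    /// let notify = self.broadcast_notice_tx();
    /// spawn(|| "broadcast-notice", async move {
    ///     // ... do some work off the coordinator loop ...
    ///     notify(notice);
    /// });
    /// ```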
3582    pub(crate) fn broadcast_notice_tx(
3583        &self,
3584    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3585        let senders: Vec<_> = self
3586            .active_conns
3587            .values()
3588            .map(|meta| meta.notice_tx.clone())
3589            .collect();
3590        Box::new(move |notice| {
3591            for tx in senders {
3592                let _ = tx.send(notice.clone());
3593            }
3594        })
3595    }
3596
3597    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3598        &self.active_conns
3599    }
3600
3601    #[instrument(level = "debug")]
3602    pub(crate) fn retire_execution(
3603        &mut self,
3604        reason: StatementEndedExecutionReason,
3605        ctx_extra: ExecuteContextExtra,
3606    ) {
3607        if let Some(uuid) = ctx_extra.retire() {
3608            self.end_statement_execution(uuid, reason);
3609        }
3610    }
3611
3612    /// Creates a new dataflow builder from the catalog and indexes in `self`.
3613    #[instrument(level = "debug")]
3614    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder {
3615        let compute = self
3616            .instance_snapshot(instance)
3617            .expect("compute instance does not exist");
3618        DataflowBuilder::new(self.catalog().state(), compute)
3619    }
3620
3621    /// Return a reference-less snapshot to the indicated compute instance.
3622    pub fn instance_snapshot(
3623        &self,
3624        id: ComputeInstanceId,
3625    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3626        ComputeInstanceSnapshot::new(&self.controller, id)
3627    }
3628
3629    /// Call into the compute controller to install a finalized dataflow, and
3630    /// initialize the read policies for its exported readable objects.
3631    pub(crate) async fn ship_dataflow(
3632        &mut self,
3633        dataflow: DataflowDescription<Plan>,
3634        instance: ComputeInstanceId,
3635        subscribe_target_replica: Option<ReplicaId>,
3636    ) {
3637        // We must only install read policies for indexes, not for sinks.
3638        // Sinks are write-only compute collections that don't have read policies.
3639        let export_ids = dataflow.exported_index_ids().collect();
3640
3641        self.controller
3642            .compute
3643            .create_dataflow(instance, dataflow, subscribe_target_replica)
3644            .unwrap_or_terminate("dataflow creation cannot fail");
3645
3646        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3647            .await;
3648    }
3649
3650    /// Like `ship_dataflow`, but also await on builtin table updates.
3651    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3652        &mut self,
3653        dataflow: DataflowDescription<Plan>,
3654        instance: ComputeInstanceId,
3655        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3656    ) {
3657        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3658            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3659            let ((), ()) =
3660                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3661        } else {
3662            self.ship_dataflow(dataflow, instance, None).await;
3663        }
3664    }
3665
3666    /// Install a _watch set_ in the controller that is automatically associated with the given
3667    /// connection id. The watchset will be automatically cleared if the connection terminates
3668    /// before the watchset completes.
3669    pub fn install_compute_watch_set(
3670        &mut self,
3671        conn_id: ConnectionId,
3672        objects: BTreeSet<GlobalId>,
3673        t: Timestamp,
3674        state: WatchSetResponse,
3675    ) {
3676        let ws_id = self.controller.install_compute_watch_set(objects, t);
3677        self.connection_watch_sets
3678            .entry(conn_id.clone())
3679            .or_default()
3680            .insert(ws_id);
3681        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3682    }
3683
3684    /// Install a _watch set_ in the controller that is automatically associated with the given
3685    /// connection id. The watchset will be automatically cleared if the connection terminates
3686    /// before the watchset completes.
3687    pub fn install_storage_watch_set(
3688        &mut self,
3689        conn_id: ConnectionId,
3690        objects: BTreeSet<GlobalId>,
3691        t: Timestamp,
3692        state: WatchSetResponse,
3693    ) {
3694        let ws_id = self.controller.install_storage_watch_set(objects, t);
3695        self.connection_watch_sets
3696            .entry(conn_id.clone())
3697            .or_default()
3698            .insert(ws_id);
3699        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3700    }
3701
3702    /// Cancels pending watchsets associated with the provided connection id.
3703    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3704        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3705            for ws_id in ws_ids {
3706                self.installed_watch_sets.remove(&ws_id);
3707            }
3708        }
3709    }
3710
3711    /// Returns the state of the [`Coordinator`] formatted as JSON.
3712    ///
3713    /// The returned value is not guaranteed to be stable and may change at any point in time.
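    ///
    /// A sketch of how this might be used while debugging (assuming a `coord` in scope):
    ///
    /// ```ignore
    /// let state = coord.dump().await?;
    /// println!("{}", serde_json::to_string_pretty(&state)?);
    /// ```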
3714    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3715        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3716        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3717        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3718        // prevents a future unrelated change from silently breaking this method.
3719
3720        let global_timelines: BTreeMap<_, _> = self
3721            .global_timelines
3722            .iter()
3723            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3724            .collect();
3725        let active_conns: BTreeMap<_, _> = self
3726            .active_conns
3727            .iter()
3728            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3729            .collect();
3730        let txn_read_holds: BTreeMap<_, _> = self
3731            .txn_read_holds
3732            .iter()
3733            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3734            .collect();
3735        let pending_peeks: BTreeMap<_, _> = self
3736            .pending_peeks
3737            .iter()
3738            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3739            .collect();
3740        let client_pending_peeks: BTreeMap<_, _> = self
3741            .client_pending_peeks
3742            .iter()
3743            .map(|(id, peek)| {
3744                let peek: BTreeMap<_, _> = peek
3745                    .iter()
3746                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3747                    .collect();
3748                (id.to_string(), peek)
3749            })
3750            .collect();
3751        let pending_linearize_read_txns: BTreeMap<_, _> = self
3752            .pending_linearize_read_txns
3753            .iter()
3754            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3755            .collect();
3756
3757        let map = serde_json::Map::from_iter([
3758            (
3759                "global_timelines".to_string(),
3760                serde_json::to_value(global_timelines)?,
3761            ),
3762            (
3763                "active_conns".to_string(),
3764                serde_json::to_value(active_conns)?,
3765            ),
3766            (
3767                "txn_read_holds".to_string(),
3768                serde_json::to_value(txn_read_holds)?,
3769            ),
3770            (
3771                "pending_peeks".to_string(),
3772                serde_json::to_value(pending_peeks)?,
3773            ),
3774            (
3775                "client_pending_peeks".to_string(),
3776                serde_json::to_value(client_pending_peeks)?,
3777            ),
3778            (
3779                "pending_linearize_read_txns".to_string(),
3780                serde_json::to_value(pending_linearize_read_txns)?,
3781            ),
3782            ("controller".to_string(), self.controller.dump().await?),
3783        ]);
3784        Ok(serde_json::Value::Object(map))
3785    }
3786
3787    /// Prune all storage usage events from the [`MZ_STORAGE_USAGE_BY_SHARD`] table that are older
3788    /// than `retention_period`.
3789    ///
3790    /// This method will read the entire contents of [`MZ_STORAGE_USAGE_BY_SHARD`] into memory
3791    /// which can be expensive.
3792    ///
3793    /// DO NOT call this method outside of startup. The safety of reading at the current oracle read
3794    /// timestamp and then writing at whatever the current write timestamp is (instead of
3795    /// `read_ts + 1`) relies on the fact that there are no outstanding writes during startup.
3796    ///
3797    /// Group commit, which this method uses to write the retractions, has builtin fencing, and we
3798    /// never commit retractions to [`MZ_STORAGE_USAGE_BY_SHARD`] outside of this method, which is
3799    /// only called once during startup. So we don't have to worry about double/invalid retractions.
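    ///
    /// A sketch of the retention cutoff computed below, in milliseconds since the Unix epoch
    /// (`collection_ts` stands in for the timestamp stored in each row):
    ///
    /// ```ignore
    /// let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
    /// let should_retract = collection_ts < cutoff_ts;
    /// ```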
3800    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
3801        let item_id = self
3802            .catalog()
3803            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3804        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3805        let read_ts = self.get_local_read_ts().await;
3806        let current_contents_fut = self
3807            .controller
3808            .storage_collections
3809            .snapshot(global_id, read_ts);
3810        let internal_cmd_tx = self.internal_cmd_tx.clone();
3811        spawn(|| "storage_usage_prune", async move {
3812            let mut current_contents = current_contents_fut
3813                .await
3814                .unwrap_or_terminate("cannot fail to fetch snapshot");
3815            differential_dataflow::consolidation::consolidate(&mut current_contents);
3816
3817            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3818            let mut expired = Vec::new();
3819            for (row, diff) in current_contents {
3820                assert_eq!(
3821                    diff, 1,
3822                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3823                );
3824                // This logic relies on the definition of `mz_storage_usage_by_shard` not changing.
3825                let collection_timestamp = row
3826                    .unpack()
3827                    .get(3)
3828                    .expect("definition of mz_storage_by_shard changed")
3829                    .unwrap_timestamptz();
3830                let collection_timestamp = collection_timestamp.timestamp_millis();
3831                let collection_timestamp: u128 = collection_timestamp
3832                    .try_into()
3833                    .expect("all collections happen after Jan 1 1970");
3834                if collection_timestamp < cutoff_ts {
3835                    debug!("pruning storage event {row:?}");
3836                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3837                    expired.push(builtin_update);
3838                }
3839            }
3840
3841            // Ignore errors: a send failure means the main coordinator thread has shut down.
3842            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3843        });
3844    }
3845}
3846
3847#[cfg(test)]
3848impl Coordinator {
3849    #[allow(dead_code)]
3850    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
3851        // `ship_dataflow` is not allowed to return a `Result` because it is
3852        // called after `catalog_transact`, after which no errors are allowed. This test exists to
3853        // prevent us from incorrectly teaching those functions how to return errors (which has
3854        // happened twice and is the motivation for this test).
3855
3856        // An arbitrary compute instance ID to satisfy the function calls below. Note that
3857        // this only works because this function will never run.
3858        let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
3859
3860        let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
3861    }
3862}
3863
3864/// Contains information about the last message the [`Coordinator`] processed.
3865struct LastMessage {
3866    kind: &'static str,
3867    stmt: Option<Arc<Statement<Raw>>>,
3868}
3869
3870impl LastMessage {
3871    /// Returns a redacted version of the statement that is safe for logs.
3872    fn stmt_to_string(&self) -> Cow<'static, str> {
3873        self.stmt
3874            .as_ref()
3875            .map(|stmt| stmt.to_ast_string_redacted().into())
3876            .unwrap_or("<none>".into())
3877    }
3878}
3879
3880impl fmt::Debug for LastMessage {
3881    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3882        f.debug_struct("LastMessage")
3883            .field("kind", &self.kind)
3884            .field("stmt", &self.stmt_to_string())
3885            .finish()
3886    }
3887}
3888
3889impl Drop for LastMessage {
3890    fn drop(&mut self) {
3891        // Only print the last message if we're currently panicking, otherwise we'd spam our logs.
3892        if std::thread::panicking() {
3893            // If we're panicking there's no guarantee `tracing` still works, so print to stderr.
3894            eprintln!("Coordinator panicking, dumping last message\n{self:?}");
3895        }
3896    }
3897}
3898
3899/// Serves the coordinator based on the provided configuration.
3900///
3901/// For a high-level description of the coordinator, see the [crate
3902/// documentation](crate).
3903///
3904/// Returns a handle to the coordinator and a client to communicate with the
3905/// coordinator.
3906///
3907/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 42KB. This would
3908/// get stored on the stack, which is bad for runtime performance and could blow up our stack usage.
3909/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
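///
/// A minimal sketch of that pattern, not the actual initialization logic:
///
/// ```ignore
/// fn serve_sketch() -> BoxFuture<'static, Result<(), ()>> {
///     async move {
///         // ...large amount of stack-hungry initialization elided...
///         Ok(())
///     }
///     .boxed() // move the future onto the heap before returning it
/// }
/// ```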
3910pub fn serve(
3911    Config {
3912        controller_config,
3913        controller_envd_epoch,
3914        mut storage,
3915        audit_logs_iterator,
3916        timestamp_oracle_url,
3917        unsafe_mode,
3918        all_features,
3919        build_info,
3920        environment_id,
3921        metrics_registry,
3922        now,
3923        secrets_controller,
3924        cloud_resource_controller,
3925        cluster_replica_sizes,
3926        builtin_system_cluster_config,
3927        builtin_catalog_server_cluster_config,
3928        builtin_probe_cluster_config,
3929        builtin_support_cluster_config,
3930        builtin_analytics_cluster_config,
3931        system_parameter_defaults,
3932        availability_zones,
3933        storage_usage_client,
3934        storage_usage_collection_interval,
3935        storage_usage_retention_period,
3936        segment_client,
3937        egress_addresses,
3938        aws_account_id,
3939        aws_privatelink_availability_zones,
3940        connection_context,
3941        connection_limit_callback,
3942        remote_system_parameters,
3943        webhook_concurrency_limit,
3944        http_host_name,
3945        tracing_handle,
3946        read_only_controllers,
3947        enable_0dt_deployment,
3948        caught_up_trigger: clusters_caught_up_trigger,
3949        helm_chart_version,
3950        license_key,
3951    }: Config,
3952) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
3953    async move {
3954        let coord_start = Instant::now();
3955        info!("startup: coordinator init: beginning");
3956        info!("startup: coordinator init: preamble beginning");
3957
3958        // Initializing the builtins can be an expensive process and consume a lot of memory. We
3959        // forcibly initialize it early while the stack is relatively empty to avoid stack
3960        // overflows later.
3961        let _builtins = LazyLock::force(&BUILTINS_STATIC);
3962
3963        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
3964        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
3965        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
3966            mpsc::unbounded_channel();
3967
3968        // Validate and process availability zones.
3969        if !availability_zones.iter().all_unique() {
3970            coord_bail!("availability zones must be unique");
3971        }
3972
3973        let aws_principal_context = match (
3974            aws_account_id,
3975            connection_context.aws_external_id_prefix.clone(),
3976        ) {
3977            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
3978                aws_account_id,
3979                aws_external_id_prefix,
3980            }),
3981            _ => None,
3982        };
3983
3984        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
3985            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
3986
3987        info!(
3988            "startup: coordinator init: preamble complete in {:?}",
3989            coord_start.elapsed()
3990        );
3991        let oracle_init_start = Instant::now();
3992        info!("startup: coordinator init: timestamp oracle init beginning");
3993
3994        let pg_timestamp_oracle_config = timestamp_oracle_url
3995            .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
3996        let mut initial_timestamps =
3997            get_initial_oracle_timestamps(&pg_timestamp_oracle_config).await?;
3998
3999        // Insert an entry for the `EpochMilliseconds` timeline if one doesn't exist,
4000        // which will ensure that the timeline is initialized since it's required
4001        // by the system.
4002        initial_timestamps
4003            .entry(Timeline::EpochMilliseconds)
4004            .or_insert_with(mz_repr::Timestamp::minimum);
4005        let mut timestamp_oracles = BTreeMap::new();
4006        for (timeline, initial_timestamp) in initial_timestamps {
4007            Coordinator::ensure_timeline_state_with_initial_time(
4008                &timeline,
4009                initial_timestamp,
4010                now.clone(),
4011                pg_timestamp_oracle_config.clone(),
4012                &mut timestamp_oracles,
4013                read_only_controllers,
4014            )
4015            .await;
4016        }
4017
4018        // Opening the durable catalog uses one or more timestamps without communicating with
4019        // the timestamp oracle. Here we make sure to apply the catalog upper with the timestamp
4020        // oracle to linearize future operations with opening the catalog.
4021        let catalog_upper = storage.current_upper().await;
4022        // Choose a time at which to boot. This is used, for example, to prune
4023        // old storage usage data or migrate audit log entries.
4024        //
4025        // This time is usually the current system time, but with protection
4026        // against backwards time jumps, even across restarts.
4027        let epoch_millis_oracle = &timestamp_oracles
4028            .get(&Timeline::EpochMilliseconds)
4029            .expect("inserted above")
4030            .oracle;
4031
4032        let mut boot_ts = if read_only_controllers {
4033            let read_ts = epoch_millis_oracle.read_ts().await;
4034            std::cmp::max(read_ts, catalog_upper)
4035        } else {
4036            // Getting/applying a write timestamp bumps the write timestamp in the
4037            // oracle, which we're not allowed to do in read-only mode.
4038            epoch_millis_oracle.apply_write(catalog_upper).await;
4039            epoch_millis_oracle.write_ts().await.timestamp
4040        };
4041
4042        info!(
4043            "startup: coordinator init: timestamp oracle init complete in {:?}",
4044            oracle_init_start.elapsed()
4045        );
4046
4047        let catalog_open_start = Instant::now();
4048        info!("startup: coordinator init: catalog open beginning");
4049        let persist_client = controller_config
4050            .persist_clients
4051            .open(controller_config.persist_location.clone())
4052            .await
4053            .context("opening persist client")?;
4054        let builtin_item_migration_config =
4055            BuiltinItemMigrationConfig {
4056                persist_client: persist_client.clone(),
4057                read_only: read_only_controllers,
4058            };
4059
4060        let OpenCatalogResult {
4061            mut catalog,
4062            migrated_storage_collections_0dt,
4063            new_builtin_collections,
4064            builtin_table_updates,
4065            cached_global_exprs,
4066            uncached_local_exprs,
4067        } = Catalog::open(mz_catalog::config::Config {
4068            storage,
4069            metrics_registry: &metrics_registry,
4070            state: mz_catalog::config::StateConfig {
4071                unsafe_mode,
4072                all_features,
4073                build_info,
4074                environment_id: environment_id.clone(),
4075                read_only: read_only_controllers,
4076                now: now.clone(),
4077                boot_ts: boot_ts.clone(),
4078                skip_migrations: false,
4079                cluster_replica_sizes,
4080                builtin_system_cluster_config,
4081                builtin_catalog_server_cluster_config,
4082                builtin_probe_cluster_config,
4083                builtin_support_cluster_config,
4084                builtin_analytics_cluster_config,
4085                system_parameter_defaults,
4086                remote_system_parameters,
4087                availability_zones,
4088                egress_addresses,
4089                aws_principal_context,
4090                aws_privatelink_availability_zones,
4091                connection_context,
4092                http_host_name,
4093                builtin_item_migration_config,
4094                persist_client: persist_client.clone(),
4095                enable_expression_cache_override: None,
4096                enable_0dt_deployment,
4097                helm_chart_version,
4098            },
4099        })
4100        .await?;
4101
4102        // Opening the catalog uses one or more timestamps, so push the boot timestamp up to the
4103        // current catalog upper.
4104        let catalog_upper = catalog.current_upper().await;
4105        boot_ts = std::cmp::max(boot_ts, catalog_upper);
4106
4107        if !read_only_controllers {
4108            epoch_millis_oracle.apply_write(boot_ts).await;
4109        }
4110
4111        info!(
4112            "startup: coordinator init: catalog open complete in {:?}",
4113            catalog_open_start.elapsed()
4114        );
4115
4116        let coord_thread_start = Instant::now();
4117        info!("startup: coordinator init: coordinator thread start beginning");
4118
4119        let session_id = catalog.config().session_id;
4120        let start_instant = catalog.config().start_instant;
4121
4122        // In order for the coordinator to support Rc and RefCell types, it cannot be
4123        // sent across threads. Spawn it in a thread and have this parent thread wait
4124        // for bootstrap completion before proceeding.
4125        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4126        let handle = TokioHandle::current();
4127
4128        let metrics = Metrics::register_into(&metrics_registry);
4129        let metrics_clone = metrics.clone();
4130        let optimizer_metrics = OptimizerMetrics::register_into(
4131            &metrics_registry,
4132            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4133        );
4134        let segment_client_clone = segment_client.clone();
4135        let coord_now = now.clone();
4136        let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4137        let mut check_scheduling_policies_interval = tokio::time::interval(
4138            catalog
4139                .system_config()
4140                .cluster_check_scheduling_policies_interval(),
4141        );
4142        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4143
4144        let clusters_caught_up_check_interval = if read_only_controllers {
4145            let dyncfgs = catalog.system_config().dyncfgs();
4146            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4147
4148            let mut interval = tokio::time::interval(interval);
4149            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4150            interval
4151        } else {
4152            // When not in read-only mode, we don't do hydration checks. But we
4153            // still have to provide _some_ interval. This is large enough that
4154            // it doesn't matter.
4155            //
4156            // TODO(aljoscha): We cannot use Duration::MAX right now because of
4157            // https://github.com/tokio-rs/tokio/issues/6634. Use that once it's
4158            // fixed for good.
4159            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4160            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4161            interval
4162        };
4163
4164        let clusters_caught_up_check =
4165            clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
4166                trigger,
4167                exclude_collections: new_builtin_collections.into_iter().collect(),
4168            });
4169
4170        if let Some(config) = pg_timestamp_oracle_config.as_ref() {
4171            // Apply settings from system vars as early as possible because some
4172            // of them are locked in right when an oracle is first opened!
4173            let pg_timestamp_oracle_params =
4174                flags::pg_timstamp_oracle_config(catalog.system_config());
4175            pg_timestamp_oracle_params.apply(config);
4176        }
4177
4178        // Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
4179        // system variables change, we update our connection limits.
4180        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4181            Arc::new(move |system_vars: &SystemVars| {
4182                let limit: u64 = system_vars.max_connections().cast_into();
4183                let superuser_reserved: u64 =
4184                    system_vars.superuser_reserved_connections().cast_into();
4185
4186                // If superuser_reserved >= max_connections, prefer max_connections.
4187                //
4188                // In that scenario all normal users would be locked out because every connection
4189                // would be reserved for superusers, so emit a warning.
4190                let superuser_reserved = if superuser_reserved >= limit {
4191                    tracing::warn!(
4192                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
4193                    );
4194                    limit
4195                } else {
4196                    superuser_reserved
4197                };
4198
4199                (connection_limit_callback)(limit, superuser_reserved);
4200            });
4201        catalog.system_config_mut().register_callback(
4202            &mz_sql::session::vars::MAX_CONNECTIONS,
4203            Arc::clone(&connection_limit_callback),
4204        );
4205        catalog.system_config_mut().register_callback(
4206            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4207            connection_limit_callback,
4208        );
4209
4210        let (group_commit_tx, group_commit_rx) = appends::notifier();
4211
4212        let parent_span = tracing::Span::current();
4213        let thread = thread::Builder::new()
4214            // The Coordinator thread tends to keep a lot of data on its stack. To
4215            // prevent a stack overflow we allocate a stack three times as big as the default
4216            // stack.
4217            .stack_size(3 * stack::STACK_SIZE)
4218            .name("coordinator".to_string())
4219            .spawn(move || {
4220                let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4221
4222                let controller = handle
4223                    .block_on({
4224                        catalog.initialize_controller(
4225                            controller_config,
4226                            controller_envd_epoch,
4227                            read_only_controllers,
4228                        )
4229                    })
4230                    .unwrap_or_terminate("failed to initialize storage_controller");
4231                // Initializing the controller uses one or more timestamps, so push the boot timestamp up to the
4232                // current catalog upper.
4233                let catalog_upper = handle.block_on(catalog.current_upper());
4234                boot_ts = std::cmp::max(boot_ts, catalog_upper);
4235                if !read_only_controllers {
4236                    let epoch_millis_oracle = &timestamp_oracles
4237                        .get(&Timeline::EpochMilliseconds)
4238                        .expect("inserted above")
4239                        .oracle;
4240                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4241                }
4242
4243                let catalog = Arc::new(catalog);
4244
4245                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4246                let mut coord = Coordinator {
4247                    controller,
4248                    catalog,
4249                    internal_cmd_tx,
4250                    group_commit_tx,
4251                    strict_serializable_reads_tx,
4252                    global_timelines: timestamp_oracles,
4253                    transient_id_gen: Arc::new(TransientIdGen::new()),
4254                    active_conns: BTreeMap::new(),
4255                    txn_read_holds: Default::default(),
4256                    pending_peeks: BTreeMap::new(),
4257                    client_pending_peeks: BTreeMap::new(),
4258                    pending_linearize_read_txns: BTreeMap::new(),
4259                    serialized_ddl: LockedVecDeque::new(),
4260                    active_compute_sinks: BTreeMap::new(),
4261                    active_webhooks: BTreeMap::new(),
4262                    active_copies: BTreeMap::new(),
4263                    staged_cancellation: BTreeMap::new(),
4264                    introspection_subscribes: BTreeMap::new(),
4265                    write_locks: BTreeMap::new(),
4266                    deferred_write_ops: BTreeMap::new(),
4267                    pending_writes: Vec::new(),
4268                    advance_timelines_interval,
4269                    secrets_controller,
4270                    caching_secrets_reader,
4271                    cloud_resource_controller,
4272                    transient_replica_metadata: BTreeMap::new(),
4273                    storage_usage_client,
4274                    storage_usage_collection_interval,
4275                    segment_client,
4276                    metrics,
4277                    optimizer_metrics,
4278                    tracing_handle,
4279                    statement_logging: StatementLogging::new(coord_now.clone()),
4280                    webhook_concurrency_limit,
4281                    pg_timestamp_oracle_config,
4282                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4283                    cluster_scheduling_decisions: BTreeMap::new(),
4284                    caught_up_check_interval: clusters_caught_up_check_interval,
4285                    caught_up_check: clusters_caught_up_check,
4286                    installed_watch_sets: BTreeMap::new(),
4287                    connection_watch_sets: BTreeMap::new(),
4288                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
4289                    read_only_controllers,
4290                    buffered_builtin_table_updates: Some(Vec::new()),
4291                    license_key,
4292                };
4293                let bootstrap = handle.block_on(async {
4294                    coord
4295                        .bootstrap(
4296                            boot_ts,
4297                            migrated_storage_collections_0dt,
4298                            builtin_table_updates,
4299                            cached_global_exprs,
4300                            uncached_local_exprs,
4301                            audit_logs_iterator,
4302                        )
4303                        .await?;
4304                    coord
4305                        .controller
4306                        .remove_orphaned_replicas(
4307                            coord.catalog().get_next_user_replica_id().await?,
4308                            coord.catalog().get_next_system_replica_id().await?,
4309                        )
4310                        .await
4311                        .map_err(AdapterError::Orchestrator)?;
4312
4313                    if let Some(retention_period) = storage_usage_retention_period {
4314                        coord
4315                            .prune_storage_usage_events_on_startup(retention_period)
4316                            .await;
4317                    }
4318
4319                    Ok(())
4320                });
4321                let ok = bootstrap.is_ok();
4322                drop(span);
4323                bootstrap_tx
4324                    .send(bootstrap)
4325                    .expect("bootstrap_rx is not dropped until it receives this message");
4326                if ok {
4327                    handle.block_on(coord.serve(
4328                        internal_cmd_rx,
4329                        strict_serializable_reads_rx,
4330                        cmd_rx,
4331                        group_commit_rx,
4332                    ));
4333                }
4334            })
4335            .expect("failed to create coordinator thread");
4336        match bootstrap_rx
4337            .await
4338            .expect("bootstrap_tx always sends a message or panics/halts")
4339        {
4340            Ok(()) => {
4341                info!(
4342                    "startup: coordinator init: coordinator thread start complete in {:?}",
4343                    coord_thread_start.elapsed()
4344                );
4345                info!(
4346                    "startup: coordinator init: complete in {:?}",
4347                    coord_start.elapsed()
4348                );
4349                let handle = Handle {
4350                    session_id,
4351                    start_instant,
4352                    _thread: thread.join_on_drop(),
4353                };
4354                let client = Client::new(
4355                    build_info,
4356                    cmd_tx.clone(),
4357                    metrics_clone,
4358                    now,
4359                    environment_id,
4360                    segment_client_clone,
4361                );
4362                Ok((handle, client))
4363            }
4364            Err(e) => Err(e),
4365        }
4366    }
4367    .boxed()
4368}
4369
4370// Determines and returns the highest timestamp for each timeline, for all known
4371// timestamp oracle implementations.
4372//
4373// Initially, we did this so that we can switch between implementations of
4374// timestamp oracle, but now we also do this to determine a monotonic boot
4375// timestamp, a timestamp that does not regress across reboots.
4376//
4377// This mostly works, but there can be linearizability violations, because there
4378// is no central moment where we do distributed coordination for all oracle
4379// types. Working around this seems prohibitively hard, maybe even impossible, so
4380// we have to live with this window of potential violations during the upgrade
4381// window (which is the only point where we should switch oracle
4382// implementations).
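//
// A minimal sketch of the per-timeline merge performed below, with hypothetical timestamps:
//
//     let mut initial: BTreeMap<&str, u64> = BTreeMap::new();
//     for (timeline, ts) in [("EpochMilliseconds", 7), ("EpochMilliseconds", 10)] {
//         initial
//             .entry(timeline)
//             .and_modify(|current| *current = std::cmp::max(*current, ts))
//             .or_insert(ts);
//     }
//     assert_eq!(initial["EpochMilliseconds"], 10); // the highest timestamp wins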
4383async fn get_initial_oracle_timestamps(
4384    pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
4385) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4386    let mut initial_timestamps = BTreeMap::new();
4387
4388    if let Some(pg_timestamp_oracle_config) = pg_timestamp_oracle_config {
4389        let postgres_oracle_timestamps =
4390            PostgresTimestampOracle::<NowFn>::get_all_timelines(pg_timestamp_oracle_config.clone())
4391                .await?;
4392
4393        let debug_msg = || {
4394            postgres_oracle_timestamps
4395                .iter()
4396                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4397                .join(", ")
4398        };
4399        info!(
4400            "current timestamps from the postgres-backed timestamp oracle: {}",
4401            debug_msg()
4402        );
4403
4404        for (timeline, ts) in postgres_oracle_timestamps {
4405            let entry = initial_timestamps
4406                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4407
4408            entry
4409                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4410                .or_insert(ts);
4411        }
4412    } else {
4413        info!("no postgres url for postgres-backed timestamp oracle configured!");
4414    };
4415
4416    let debug_msg = || {
4417        initial_timestamps
4418            .iter()
4419            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4420            .join(", ")
4421    };
4422    info!("initial oracle timestamps: {}", debug_msg());
4423
4424    Ok(initial_timestamps)
4425}
4426
4427#[instrument]
4428pub async fn load_remote_system_parameters(
4429    storage: &mut Box<dyn OpenableDurableCatalogState>,
4430    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4431    system_parameter_sync_timeout: Duration,
4432) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4433    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4434        tracing::info!("parameter sync on boot: start sync");
4435
4436        // We intentionally block initial startup, potentially forever,
4437        // on initializing LaunchDarkly. This may seem scary, but the
4438        // alternative is even scarier. Over time, we expect that the
4439        // compiled-in default values for the system parameters will
4440        // drift substantially from the defaults configured in
4441        // LaunchDarkly, to the point that starting an environment
4442        // without loading the latest values from LaunchDarkly will
4443        // result in running an untested configuration.
4444        //
4445        // Note this only applies during initial startup. Restarting
4446        // after we've synced once only blocks for a maximum of
4447        // `FRONTEND_SYNC_TIMEOUT` on LaunchDarkly, as it seems
4448        // reasonable to assume that the last-synced configuration was
4449        // valid enough.
4450        //
4451        // This philosophy appears to provide a good balance between not
4452        // running untested configurations in production while also not
4453        // making LaunchDarkly a "tier 1" dependency for existing
4454        // environments.
4455        //
4456        // If this proves to be an issue, we could seek to address the
4457        // configuration drift in a different way--for example, by
4458        // writing a script that runs in CI nightly and checks for
4459        // deviation between the compiled Rust code and LaunchDarkly.
4460        //
4461        // If it is absolutely necessary to bring up a new environment
4462        // while LaunchDarkly is down, the following manual mitigation
4463        // can be performed:
4464        //
4465        //    1. Edit the environmentd startup parameters to omit the
4466        //       LaunchDarkly configuration.
4467        //    2. Boot environmentd.
4468        //    3. Use the catalog-debug tool to run `edit config "{\"key\":\"system_config_synced\"}" "{\"value\": 1}"`.
4469        //    4. Adjust any other parameters as necessary to avoid
4470        //       running a nonstandard configuration in production.
4471        //    5. Edit the environmentd startup parameters to restore the
4472        //       LaunchDarkly configuration, for when LaunchDarkly comes
4473        //       back online.
4474        //    6. Reboot environmentd.
4475        let mut params = SynchronizedParameters::new(SystemVars::default());
4476        let frontend_sync = async {
4477            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4478            frontend.pull(&mut params);
4479            let ops = params
4480                .modified()
4481                .into_iter()
4482                .map(|param| {
4483                    let name = param.name;
4484                    let value = param.value;
4485                    tracing::info!(name, value, initial = true, "sync parameter");
4486                    (name, value)
4487                })
4488                .collect();
4489            tracing::info!("parameter sync on boot: end sync");
4490            Ok(Some(ops))
4491        };
4492        if !storage.has_system_config_synced_once().await? {
4493            frontend_sync.await
4494        } else {
4495            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4496                Ok(ops) => Ok(ops),
4497                Err(TimeoutError::Inner(e)) => Err(e),
4498                Err(TimeoutError::DeadlineElapsed) => {
4499                    tracing::info!("parameter sync on boot: sync has timed out");
4500                    Ok(None)
4501                }
4502            }
4503        }
4504    } else {
4505        Ok(None)
4506    }
4507}
4508
4509#[derive(Debug)]
4510pub(crate) enum WatchSetResponse {
4511    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
4512    AlterSinkReady(AlterSinkReadyContext),
4513}
4514
4515#[derive(Debug)]
4516pub(crate) struct AlterSinkReadyContext {
4517    ctx: Option<ExecuteContext>,
4518    otel_ctx: OpenTelemetryContext,
4519    plan: AlterSinkPlan,
4520    plan_validity: PlanValidity,
4521    resolved_ids: ResolvedIds,
4522    read_hold: ReadHolds<Timestamp>,
4523}
4524
4525impl AlterSinkReadyContext {
4526    fn ctx(&mut self) -> &mut ExecuteContext {
4527        self.ctx.as_mut().expect("only cleared on drop")
4528    }
4529
4530    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4531        self.ctx
4532            .take()
4533            .expect("only cleared on drop")
4534            .retire(result);
4535    }
4536}
4537
4538impl Drop for AlterSinkReadyContext {
4539    fn drop(&mut self) {
4540        if let Some(ctx) = self.ctx.take() {
4541            ctx.retire(Err(AdapterError::Canceled));
4542        }
4543    }
4544}
4545
4546/// Tracks ownership of a lock alongside a `VecDeque` that stores to-be-done work to process
4547/// once the lock is freed.
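///
/// A hypothetical usage sketch (`deferred` stands in for a pending `DeferredPlanStatement`):
///
/// ```ignore
/// let mut queue: LockedVecDeque<DeferredPlanStatement> = LockedVecDeque::new();
/// match queue.try_lock_owned() {
///     // Lock acquired: do the work now, holding `_guard` until it completes.
///     Ok(_guard) => { /* ... */ }
///     // Lock busy: queue the work to be picked up when the guard is released.
///     Err(_) => queue.push_back(deferred),
/// }
/// ```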
4548#[derive(Debug)]
4549struct LockedVecDeque<T> {
4550    items: VecDeque<T>,
4551    lock: Arc<tokio::sync::Mutex<()>>,
4552}
4553
4554impl<T> LockedVecDeque<T> {
4555    pub fn new() -> Self {
4556        Self {
4557            items: VecDeque::new(),
4558            lock: Arc::new(tokio::sync::Mutex::new(())),
4559        }
4560    }
4561
4562    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
4563        Arc::clone(&self.lock).try_lock_owned()
4564    }
4565
4566    pub fn is_empty(&self) -> bool {
4567        self.items.is_empty()
4568    }
4569
4570    pub fn push_back(&mut self, value: T) {
4571        self.items.push_back(value)
4572    }
4573
4574    pub fn pop_front(&mut self) -> Option<T> {
4575        self.items.pop_front()
4576    }
4577
4578    pub fn remove(&mut self, index: usize) -> Option<T> {
4579        self.items.remove(index)
4580    }
4581
4582    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
4583        self.items.iter()
4584    }
4585}
4586
4587#[derive(Debug)]
4588struct DeferredPlanStatement {
4589    ctx: ExecuteContext,
4590    ps: PlanStatement,
4591}
4592
4593#[derive(Debug)]
4594enum PlanStatement {
4595    Statement {
4596        stmt: Arc<Statement<Raw>>,
4597        params: Params,
4598    },
4599    Plan {
4600        plan: mz_sql::plan::Plan,
4601        resolved_ids: ResolvedIds,
4602    },
4603}
4604
4605#[derive(Debug, Error)]
4606pub enum NetworkPolicyError {
4607    #[error("Access denied for address {0}")]
4608    AddressDenied(IpAddr),
4609    #[error("Access denied missing IP address")]
4610    MissingIp,
4611}
4612
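/// Returns `Ok(())` if `ip` is contained in at least one rule's address block, and a
/// [`NetworkPolicyError::AddressDenied`] error otherwise.
///
/// A hypothetical call site (assuming `rules` came from the active network policy):
///
/// ```ignore
/// let ip: IpAddr = "203.0.113.7".parse().expect("valid IP");
/// match validate_ip_with_policy_rules(&ip, &rules) {
///     Ok(()) => { /* connection allowed */ }
///     Err(err) => tracing::warn!("rejecting connection: {err}"),
/// }
/// ```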
4613pub(crate) fn validate_ip_with_policy_rules(
4614    ip: &IpAddr,
4615    rules: &Vec<NetworkPolicyRule>,
4616) -> Result<(), NetworkPolicyError> {
4617    // At the moment we're not handling `action` or `direction`,
4618    // as those can only be "allow" and "ingress", respectively.
4619    if rules.iter().any(|r| r.address.0.contains(ip)) {
4620        Ok(())
4621    } else {
4622        Err(NetworkPolicyError::AddressDenied(ip.clone()))
4623    }
4624}