mz_adapter/coord.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Translation of SQL commands into timestamped `Controller` commands.
11//!
12//! The various SQL commands instruct the system to take actions that are not
13//! yet explicitly timestamped. On the other hand, the underlying data continually
14//! change as time moves forward. On the third hand, we greatly benefit from the
15//! information that some times are no longer of interest, so that we may
16//! compact the representation of the continually changing collections.
17//!
18//! The [`Coordinator`] curates these interactions by observing the progress
19//! collections make through time, choosing timestamps for its own commands,
20//! and eventually communicating that certain times have irretrievably "passed".
21//!
22//! ## Frontiers another way
23//!
24//! If the above description of frontiers left you with questions, this
25//! repackaged explanation might help.
26//!
27//! - `since` is the least recent time (i.e. oldest time) that you can read
28//!   from sources and be guaranteed that the returned data is accurate as of
29//!   that time.
30//!
31//!   Reads at times less than `since` may return values that were not actually
32//!   seen at the specified time, but arrived later (i.e. the results are
33//!   compacted).
34//!
35//!   For correctness' sake, the coordinator never chooses to read at a time
36//!   less than an arrangement's `since`.
37//!
38//! - `upper` is the first time after the most recent time that you can read
39//!   from sources and receive an immediate response. Alternately, it is the
40//!   least time at which the data may still change (that is the reason we may
41//!   not be able to respond immediately).
42//!
43//!   Reads at times >= `upper` may not immediately return because the answer
44//!   isn't known yet. However, once the `upper` is > the specified read time,
45//!   the read can return.
46//!
47//!   For the sake of returned values' freshness, the coordinator prefers
48//!   performing reads at an arrangement's `upper`. However, because we more
49//!   strongly prefer correctness, the coordinator will choose timestamps
50//!   greater than an object's `upper` if it is also being accessed alongside
51//!   objects whose `since` times are >= its `upper`.
52//!
53//! This illustration attempts to show, with time moving left to right, the
54//! relationship between `since` and `upper`.
55//!
56//! - `#`: possibly inaccurate results
57//! - `-`: immediate, correct response
58//! - `?`: not yet known
59//! - `s`: since
60//! - `u`: upper
61//! - `|`: eligible for coordinator to select
62//!
63//! ```nofmt
64//! ####s----u?????
65//!     |||||||||||
66//! ```
67//!
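//!
//! As a rough sketch (this is not the coordinator's actual timestamp-selection
//! code), a read at a candidate time `t` is both accurate and immediately
//! answerable when `t` lies in the half-open window `[since, upper)`:
//!
//! ```ignore
//! use timely::progress::Antichain;
//!
//! /// Hypothetical helper: can a read at `t` return correct results right away?
//! fn immediately_readable(
//!     since: &Antichain<mz_repr::Timestamp>,
//!     upper: &Antichain<mz_repr::Timestamp>,
//!     t: mz_repr::Timestamp,
//! ) -> bool {
//!     // Compaction has not advanced beyond `t`, so results at `t` are accurate...
//!     since.less_equal(&t)
//!         // ...and all data up through `t` has already arrived, so the read
//!         // need not block.
//!         && !upper.less_equal(&t)
//! }
//! ```
//!
//! Times at or beyond `upper` are still eligible for selection; reads at such
//! times simply block until `upper` advances past them.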
68
69use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::fmt;
72use std::net::IpAddr;
73use std::num::NonZeroI64;
74use std::ops::Neg;
75use std::str::FromStr;
76use std::sync::LazyLock;
77use std::sync::{Arc, Mutex};
78use std::thread;
79use std::time::{Duration, Instant};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::{Either, Itertools};
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
95use mz_auth::password::Password;
96use mz_build_info::BuildInfo;
97use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
98use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
99use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
100use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
101use mz_catalog::memory::objects::{
102    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
103    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
104};
105use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
106use mz_compute_client::as_of_selection;
107use mz_compute_client::controller::error::InstanceMissing;
108use mz_compute_types::ComputeInstanceId;
109use mz_compute_types::dataflows::DataflowDescription;
110use mz_compute_types::plan::Plan;
111use mz_controller::clusters::{
112    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
113};
114use mz_controller::{ControllerConfig, Readiness};
115use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
116use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
117use mz_license_keys::ValidatedLicenseKey;
118use mz_orchestrator::OfflineReason;
119use mz_ore::cast::{CastFrom, CastInto, CastLossy};
120use mz_ore::channel::trigger::Trigger;
121use mz_ore::future::TimeoutError;
122use mz_ore::metrics::MetricsRegistry;
123use mz_ore::now::{EpochMillis, NowFn};
124use mz_ore::task::{JoinHandle, spawn};
125use mz_ore::thread::JoinHandleExt;
126use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
127use mz_ore::url::SensitiveUrl;
128use mz_ore::{
129    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
130};
131use mz_persist_client::PersistClient;
132use mz_persist_client::batch::ProtoBatch;
133use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
134use mz_repr::adt::numeric::Numeric;
135use mz_repr::explain::{ExplainConfig, ExplainFormat};
136use mz_repr::global_id::TransientIdGen;
137use mz_repr::optimize::OptimizerFeatures;
138use mz_repr::role_id::RoleId;
139use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
140use mz_secrets::cache::CachingSecretsReader;
141use mz_secrets::{SecretsController, SecretsReader};
142use mz_sql::ast::{Raw, Statement};
143use mz_sql::catalog::{CatalogCluster, EnvironmentId};
144use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
145use mz_sql::optimizer_metrics::OptimizerMetrics;
146use mz_sql::plan::{
147    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, NetworkPolicyRule,
148    OnTimeoutAction, Params, QueryWhen,
149};
150use mz_sql::session::user::User;
151use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
152use mz_sql_parser::ast::ExplainStage;
153use mz_sql_parser::ast::display::AstDisplay;
154use mz_storage_client::client::TableData;
155use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
156use mz_storage_types::connections::Connection as StorageConnection;
157use mz_storage_types::connections::ConnectionContext;
158use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
159use mz_storage_types::read_holds::ReadHold;
160use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
161use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
162use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
163use mz_timestamp_oracle::WriteTimestamp;
164use mz_timestamp_oracle::postgres_oracle::{
165    PostgresTimestampOracle, PostgresTimestampOracleConfig,
166};
167use mz_transform::dataflow::DataflowMetainfo;
168use opentelemetry::trace::TraceContextExt;
169use serde::Serialize;
170use thiserror::Error;
171use timely::progress::{Antichain, Timestamp as _};
172use tokio::runtime::Handle as TokioHandle;
173use tokio::select;
174use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
175use tokio::time::{Interval, MissedTickBehavior};
176use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
177use tracing_opentelemetry::OpenTelemetrySpanExt;
178use uuid::Uuid;
179
180use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
181use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
182use crate::client::{Client, Handle};
183use crate::command::{Command, ExecuteResponse};
184use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
185use crate::coord::appends::{
186    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
187};
188use crate::coord::caught_up::CaughtUpCheckContext;
189use crate::coord::cluster_scheduling::SchedulingDecision;
190use crate::coord::id_bundle::CollectionIdBundle;
191use crate::coord::introspection::IntrospectionSubscribe;
192use crate::coord::peek::PendingPeek;
193use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
194use crate::coord::timeline::{TimelineContext, TimelineState};
195use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
196use crate::coord::validity::PlanValidity;
197use crate::error::AdapterError;
198use crate::explain::insights::PlanInsightsContext;
199use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
200use crate::metrics::Metrics;
201use crate::optimize::dataflows::{
202    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
203};
204use crate::optimize::{self, Optimize, OptimizerConfig};
205use crate::session::{EndTransactionAction, Session};
206use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
207use crate::util::{ClientTransmitter, ResultExt};
208use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
209use crate::{AdapterNotice, ReadHolds, flags};
210
211pub(crate) mod id_bundle;
212pub(crate) mod in_memory_oracle;
213pub(crate) mod peek;
214pub(crate) mod statement_logging;
215pub(crate) mod timeline;
216pub(crate) mod timestamp_selection;
217
218pub mod appends;
219mod catalog_serving;
220mod caught_up;
221pub mod cluster_scheduling;
222mod command_handler;
223pub mod consistency;
224mod ddl;
225mod indexes;
226mod introspection;
227mod message_handler;
228mod privatelink_status;
229pub mod read_policy;
230mod sequencer;
231mod sql;
232mod validity;
233
234#[derive(Debug)]
235pub enum Message {
236    Command(OpenTelemetryContext, Command),
237    ControllerReady {
238        controller: ControllerReadiness,
239    },
240    PurifiedStatementReady(PurifiedStatementReady),
241    CreateConnectionValidationReady(CreateConnectionValidationReady),
242    AlterConnectionValidationReady(AlterConnectionValidationReady),
243    TryDeferred {
244        /// The connection that created this op.
245        conn_id: ConnectionId,
246        /// The write lock that notified us our deferred op might be able to run.
247        ///
248        /// Note: While we never want to hold a partial set of locks, it can be important to hold
249        /// onto the _one_ that notified us our op might be ready. If there are multiple operations
250        /// waiting on a single collection, and we don't hold this lock through retrying the op,
251        /// then everything waiting on this collection will get retried, causing traffic in the
252        /// Coordinator's message queue.
253        ///
254        /// See [`DeferredOp::can_be_optimistically_retried`] for more detail.
255        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
256    },
257    /// Initiates a group commit.
258    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
259    DeferredStatementReady,
260    AdvanceTimelines,
261    ClusterEvent(ClusterEvent),
262    CancelPendingPeeks {
263        conn_id: ConnectionId,
264    },
265    LinearizeReads,
266    StagedBatches {
267        conn_id: ConnectionId,
268        table_id: CatalogItemId,
269        batches: Vec<Result<ProtoBatch, String>>,
270    },
271    StorageUsageSchedule,
272    StorageUsageFetch,
273    StorageUsageUpdate(ShardsUsageReferenced),
274    StorageUsagePrune(Vec<BuiltinTableUpdate>),
275    /// Performs any cleanup and logging actions necessary for
276    /// finalizing a statement execution.
277    RetireExecute {
278        data: ExecuteContextExtra,
279        otel_ctx: OpenTelemetryContext,
280        reason: StatementEndedExecutionReason,
281    },
282    ExecuteSingleStatementTransaction {
283        ctx: ExecuteContext,
284        otel_ctx: OpenTelemetryContext,
285        stmt: Arc<Statement<Raw>>,
286        params: mz_sql::plan::Params,
287    },
288    PeekStageReady {
289        ctx: ExecuteContext,
290        span: Span,
291        stage: PeekStage,
292    },
293    CreateIndexStageReady {
294        ctx: ExecuteContext,
295        span: Span,
296        stage: CreateIndexStage,
297    },
298    CreateViewStageReady {
299        ctx: ExecuteContext,
300        span: Span,
301        stage: CreateViewStage,
302    },
303    CreateMaterializedViewStageReady {
304        ctx: ExecuteContext,
305        span: Span,
306        stage: CreateMaterializedViewStage,
307    },
308    SubscribeStageReady {
309        ctx: ExecuteContext,
310        span: Span,
311        stage: SubscribeStage,
312    },
313    IntrospectionSubscribeStageReady {
314        span: Span,
315        stage: IntrospectionSubscribeStage,
316    },
317    SecretStageReady {
318        ctx: ExecuteContext,
319        span: Span,
320        stage: SecretStage,
321    },
322    ClusterStageReady {
323        ctx: ExecuteContext,
324        span: Span,
325        stage: ClusterStage,
326    },
327    ExplainTimestampStageReady {
328        ctx: ExecuteContext,
329        span: Span,
330        stage: ExplainTimestampStage,
331    },
332    DrainStatementLog,
333    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
334    CheckSchedulingPolicies,
335
336    /// Scheduling policy decisions about turning clusters On/Off.
337    /// `Vec<(policy name, Vec of decisions by the policy)>`
338    /// A cluster will be On if and only if there is at least one On decision for it.
339    /// Scheduling decisions for clusters that have `SCHEDULE = MANUAL` are ignored.
340    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
341}
342
343impl Message {
344    /// Returns a string to identify the kind of [`Message`], useful for logging.
345    pub const fn kind(&self) -> &'static str {
346        match self {
347            Message::Command(_, msg) => match msg {
348                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
349                Command::Startup { .. } => "command-startup",
350                Command::Execute { .. } => "command-execute",
351                Command::Commit { .. } => "command-commit",
352                Command::CancelRequest { .. } => "command-cancel_request",
353                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
354                Command::GetWebhook { .. } => "command-get_webhook",
355                Command::GetSystemVars { .. } => "command-get_system_vars",
356                Command::SetSystemVars { .. } => "command-set_system_vars",
357                Command::Terminate { .. } => "command-terminate",
358                Command::RetireExecute { .. } => "command-retire_execute",
359                Command::CheckConsistency { .. } => "command-check_consistency",
360                Command::Dump { .. } => "command-dump",
361                Command::AuthenticatePassword { .. } => "command-auth_check",
362                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
363                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
364            },
365            Message::ControllerReady {
366                controller: ControllerReadiness::Compute,
367            } => "controller_ready(compute)",
368            Message::ControllerReady {
369                controller: ControllerReadiness::Storage,
370            } => "controller_ready(storage)",
371            Message::ControllerReady {
372                controller: ControllerReadiness::Metrics,
373            } => "controller_ready(metrics)",
374            Message::ControllerReady {
375                controller: ControllerReadiness::Internal,
376            } => "controller_ready(internal)",
377            Message::PurifiedStatementReady(_) => "purified_statement_ready",
378            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
379            Message::TryDeferred { .. } => "try_deferred",
380            Message::GroupCommitInitiate(..) => "group_commit_initiate",
381            Message::AdvanceTimelines => "advance_timelines",
382            Message::ClusterEvent(_) => "cluster_event",
383            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
384            Message::LinearizeReads => "linearize_reads",
385            Message::StagedBatches { .. } => "staged_batches",
386            Message::StorageUsageSchedule => "storage_usage_schedule",
387            Message::StorageUsageFetch => "storage_usage_fetch",
388            Message::StorageUsageUpdate(_) => "storage_usage_update",
389            Message::StorageUsagePrune(_) => "storage_usage_prune",
390            Message::RetireExecute { .. } => "retire_execute",
391            Message::ExecuteSingleStatementTransaction { .. } => {
392                "execute_single_statement_transaction"
393            }
394            Message::PeekStageReady { .. } => "peek_stage_ready",
395            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
396            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
397            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
398            Message::CreateMaterializedViewStageReady { .. } => {
399                "create_materialized_view_stage_ready"
400            }
401            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
402            Message::IntrospectionSubscribeStageReady { .. } => {
403                "introspection_subscribe_stage_ready"
404            }
405            Message::SecretStageReady { .. } => "secret_stage_ready",
406            Message::ClusterStageReady { .. } => "cluster_stage_ready",
407            Message::DrainStatementLog => "drain_statement_log",
408            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
409            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
410            Message::CheckSchedulingPolicies => "check_scheduling_policies",
411            Message::SchedulingDecisions { .. } => "scheduling_decision",
412            Message::DeferredStatementReady => "deferred_statement_ready",
413        }
414    }
415}
416
417/// The reason for why a controller needs processing on the main loop.
418#[derive(Debug)]
419pub enum ControllerReadiness {
420    /// The storage controller is ready.
421    Storage,
422    /// The compute controller is ready.
423    Compute,
424    /// A batch of metric data is ready.
425    Metrics,
426    /// An internally-generated message is ready to be returned.
427    Internal,
428}
429
430#[derive(Derivative)]
431#[derivative(Debug)]
432pub struct BackgroundWorkResult<T> {
433    #[derivative(Debug = "ignore")]
434    pub ctx: ExecuteContext,
435    pub result: Result<T, AdapterError>,
436    pub params: Params,
437    pub plan_validity: PlanValidity,
438    pub original_stmt: Arc<Statement<Raw>>,
439    pub otel_ctx: OpenTelemetryContext,
440}
441
442pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
443
444#[derive(Derivative)]
445#[derivative(Debug)]
446pub struct ValidationReady<T> {
447    #[derivative(Debug = "ignore")]
448    pub ctx: ExecuteContext,
449    pub result: Result<T, AdapterError>,
450    pub resolved_ids: ResolvedIds,
451    pub connection_id: CatalogItemId,
452    pub connection_gid: GlobalId,
453    pub plan_validity: PlanValidity,
454    pub otel_ctx: OpenTelemetryContext,
455}
456
457pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
458pub type AlterConnectionValidationReady = ValidationReady<Connection>;
459
460#[derive(Debug)]
461pub enum PeekStage {
462    /// Common stages across SELECT, EXPLAIN and COPY TO queries.
463    LinearizeTimestamp(PeekStageLinearizeTimestamp),
464    RealTimeRecency(PeekStageRealTimeRecency),
465    TimestampReadHold(PeekStageTimestampReadHold),
466    Optimize(PeekStageOptimize),
467    /// Final stage for a peek.
468    Finish(PeekStageFinish),
469    /// Final stage for an explain.
470    ExplainPlan(PeekStageExplainPlan),
471    ExplainPushdown(PeekStageExplainPushdown),
472    /// Preflight checks for a copy to operation.
473    CopyToPreflight(PeekStageCopyTo),
474    /// Final stage for a copy to which involves shipping the dataflow.
475    CopyToDataflow(PeekStageCopyTo),
476}
477
478#[derive(Debug)]
479pub struct CopyToContext {
480    /// The `RelationDesc` of the data to be copied.
481    pub desc: RelationDesc,
482    /// The destination uri of the external service where the data will be copied.
483    pub uri: Uri,
484    /// Connection information required to connect to the external service to copy the data.
485    pub connection: StorageConnection<ReferencedConnection>,
486    /// The ID of the CONNECTION object to be used for copying the data.
487    pub connection_id: CatalogItemId,
488    /// Format params to format the data.
489    pub format: S3SinkFormat,
490    /// Approximate max file size of each uploaded file.
491    pub max_file_size: u64,
492    /// Number of batches the output of the COPY TO will be partitioned into
493    /// to distribute the load across workers deterministically.
494    /// This is only an option since it's not set when CopyToContext is instantiated
495    /// but immediately after in the PeekStageValidate stage.
496    pub output_batch_count: Option<u64>,
497}
498
499#[derive(Debug)]
500pub struct PeekStageLinearizeTimestamp {
501    validity: PlanValidity,
502    plan: mz_sql::plan::SelectPlan,
503    max_query_result_size: Option<u64>,
504    source_ids: BTreeSet<GlobalId>,
505    target_replica: Option<ReplicaId>,
506    timeline_context: TimelineContext,
507    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
508    /// An optional context set iff the state machine is initiated from
509    /// sequencing an EXPLAIN for this statement.
510    explain_ctx: ExplainContext,
511}
512
513#[derive(Debug)]
514pub struct PeekStageRealTimeRecency {
515    validity: PlanValidity,
516    plan: mz_sql::plan::SelectPlan,
517    max_query_result_size: Option<u64>,
518    source_ids: BTreeSet<GlobalId>,
519    target_replica: Option<ReplicaId>,
520    timeline_context: TimelineContext,
521    oracle_read_ts: Option<Timestamp>,
522    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
523    /// An optional context set iff the state machine is initiated from
524    /// sequencing an EXPLAIN for this statement.
525    explain_ctx: ExplainContext,
526}
527
528#[derive(Debug)]
529pub struct PeekStageTimestampReadHold {
530    validity: PlanValidity,
531    plan: mz_sql::plan::SelectPlan,
532    max_query_result_size: Option<u64>,
533    source_ids: BTreeSet<GlobalId>,
534    target_replica: Option<ReplicaId>,
535    timeline_context: TimelineContext,
536    oracle_read_ts: Option<Timestamp>,
537    real_time_recency_ts: Option<mz_repr::Timestamp>,
538    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
539    /// An optional context set iff the state machine is initiated from
540    /// sequencing an EXPLAIN for this statement.
541    explain_ctx: ExplainContext,
542}
543
544#[derive(Debug)]
545pub struct PeekStageOptimize {
546    validity: PlanValidity,
547    plan: mz_sql::plan::SelectPlan,
548    max_query_result_size: Option<u64>,
549    source_ids: BTreeSet<GlobalId>,
550    id_bundle: CollectionIdBundle,
551    target_replica: Option<ReplicaId>,
552    determination: TimestampDetermination<mz_repr::Timestamp>,
553    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
554    /// An optional context set iff the state machine is initiated from
555    /// sequencing an EXPLAIN for this statement.
556    explain_ctx: ExplainContext,
557}
558
559#[derive(Debug)]
560pub struct PeekStageFinish {
561    validity: PlanValidity,
562    plan: mz_sql::plan::SelectPlan,
563    max_query_result_size: Option<u64>,
564    id_bundle: CollectionIdBundle,
565    target_replica: Option<ReplicaId>,
566    source_ids: BTreeSet<GlobalId>,
567    determination: TimestampDetermination<mz_repr::Timestamp>,
568    cluster_id: ComputeInstanceId,
569    finishing: RowSetFinishing,
570    /// When present, an optimizer trace to be used for emitting a plan insights
571    /// notice.
572    plan_insights_optimizer_trace: Option<OptimizerTrace>,
573    insights_ctx: Option<Box<PlanInsightsContext>>,
574    global_lir_plan: optimize::peek::GlobalLirPlan,
575    optimization_finished_at: EpochMillis,
576}
577
578#[derive(Debug)]
579pub struct PeekStageCopyTo {
580    validity: PlanValidity,
581    optimizer: optimize::copy_to::Optimizer,
582    global_lir_plan: optimize::copy_to::GlobalLirPlan,
583    optimization_finished_at: EpochMillis,
584    source_ids: BTreeSet<GlobalId>,
585}
586
587#[derive(Debug)]
588pub struct PeekStageExplainPlan {
589    validity: PlanValidity,
590    optimizer: optimize::peek::Optimizer,
591    df_meta: DataflowMetainfo,
592    explain_ctx: ExplainPlanContext,
593    insights_ctx: Option<Box<PlanInsightsContext>>,
594}
595
596#[derive(Debug)]
597pub struct PeekStageExplainPushdown {
598    validity: PlanValidity,
599    determination: TimestampDetermination<mz_repr::Timestamp>,
600    imports: BTreeMap<GlobalId, MapFilterProject>,
601}
602
603#[derive(Debug)]
604pub enum CreateIndexStage {
605    Optimize(CreateIndexOptimize),
606    Finish(CreateIndexFinish),
607    Explain(CreateIndexExplain),
608}
609
610#[derive(Debug)]
611pub struct CreateIndexOptimize {
612    validity: PlanValidity,
613    plan: plan::CreateIndexPlan,
614    resolved_ids: ResolvedIds,
615    /// An optional context set iff the state machine is initiated from
616    /// sequencing an EXPLAIN for this statement.
617    explain_ctx: ExplainContext,
618}
619
620#[derive(Debug)]
621pub struct CreateIndexFinish {
622    validity: PlanValidity,
623    item_id: CatalogItemId,
624    global_id: GlobalId,
625    plan: plan::CreateIndexPlan,
626    resolved_ids: ResolvedIds,
627    global_mir_plan: optimize::index::GlobalMirPlan,
628    global_lir_plan: optimize::index::GlobalLirPlan,
629}
630
631#[derive(Debug)]
632pub struct CreateIndexExplain {
633    validity: PlanValidity,
634    exported_index_id: GlobalId,
635    plan: plan::CreateIndexPlan,
636    df_meta: DataflowMetainfo,
637    explain_ctx: ExplainPlanContext,
638}
639
640#[derive(Debug)]
641pub enum CreateViewStage {
642    Optimize(CreateViewOptimize),
643    Finish(CreateViewFinish),
644    Explain(CreateViewExplain),
645}
646
647#[derive(Debug)]
648pub struct CreateViewOptimize {
649    validity: PlanValidity,
650    plan: plan::CreateViewPlan,
651    resolved_ids: ResolvedIds,
652    /// An optional context set iff the state machine is initiated from
653    /// sequencing an EXPLAIN for this statement.
654    explain_ctx: ExplainContext,
655}
656
657#[derive(Debug)]
658pub struct CreateViewFinish {
659    validity: PlanValidity,
660    /// ID of this item in the Catalog.
661    item_id: CatalogItemId,
662    /// ID by which Compute will reference this View.
663    global_id: GlobalId,
664    plan: plan::CreateViewPlan,
665    /// IDs of objects resolved during name resolution.
666    resolved_ids: ResolvedIds,
667    optimized_expr: OptimizedMirRelationExpr,
668}
669
670#[derive(Debug)]
671pub struct CreateViewExplain {
672    validity: PlanValidity,
673    id: GlobalId,
674    plan: plan::CreateViewPlan,
675    explain_ctx: ExplainPlanContext,
676}
677
678#[derive(Debug)]
679pub enum ExplainTimestampStage {
680    Optimize(ExplainTimestampOptimize),
681    RealTimeRecency(ExplainTimestampRealTimeRecency),
682    Finish(ExplainTimestampFinish),
683}
684
685#[derive(Debug)]
686pub struct ExplainTimestampOptimize {
687    validity: PlanValidity,
688    plan: plan::ExplainTimestampPlan,
689    cluster_id: ClusterId,
690}
691
692#[derive(Debug)]
693pub struct ExplainTimestampRealTimeRecency {
694    validity: PlanValidity,
695    format: ExplainFormat,
696    optimized_plan: OptimizedMirRelationExpr,
697    cluster_id: ClusterId,
698    when: QueryWhen,
699}
700
701#[derive(Debug)]
702pub struct ExplainTimestampFinish {
703    validity: PlanValidity,
704    format: ExplainFormat,
705    optimized_plan: OptimizedMirRelationExpr,
706    cluster_id: ClusterId,
707    source_ids: BTreeSet<GlobalId>,
708    when: QueryWhen,
709    real_time_recency_ts: Option<Timestamp>,
710}
711
712#[derive(Debug)]
713pub enum ClusterStage {
714    Alter(AlterCluster),
715    WaitForHydrated(AlterClusterWaitForHydrated),
716    Finalize(AlterClusterFinalize),
717}
718
719#[derive(Debug)]
720pub struct AlterCluster {
721    validity: PlanValidity,
722    plan: plan::AlterClusterPlan,
723}
724
725#[derive(Debug)]
726pub struct AlterClusterWaitForHydrated {
727    validity: PlanValidity,
728    plan: plan::AlterClusterPlan,
729    new_config: ClusterVariantManaged,
730    timeout_time: Instant,
731    on_timeout: OnTimeoutAction,
732}
733
734#[derive(Debug)]
735pub struct AlterClusterFinalize {
736    validity: PlanValidity,
737    plan: plan::AlterClusterPlan,
738    new_config: ClusterVariantManaged,
739}
740
741#[derive(Debug)]
742pub enum ExplainContext {
743    /// The ordinary, non-explain variant of the statement.
744    None,
745    /// The `EXPLAIN <level> PLAN FOR <explainee>` version of the statement.
746    Plan(ExplainPlanContext),
747    /// Generate a notice containing the `EXPLAIN PLAN INSIGHTS` output
748    /// alongside the query's normal output.
749    PlanInsightsNotice(OptimizerTrace),
750    /// `EXPLAIN FILTER PUSHDOWN`
751    Pushdown,
752}
753
754impl ExplainContext {
755    /// If available for this context, wrap the [`OptimizerTrace`] into a
756    /// [`tracing::Dispatch`] and set it as default, returning the resulting
757    /// guard in a `Some(guard)` option.
758    fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
759        let optimizer_trace = match self {
760            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
761            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
762            _ => None,
763        };
764        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
765    }
766
767    fn needs_cluster(&self) -> bool {
768        match self {
769            ExplainContext::None => true,
770            ExplainContext::Plan(..) => false,
771            ExplainContext::PlanInsightsNotice(..) => true,
772            ExplainContext::Pushdown => false,
773        }
774    }
775
776    fn needs_plan_insights(&self) -> bool {
777        matches!(
778            self,
779            ExplainContext::Plan(ExplainPlanContext {
780                stage: ExplainStage::PlanInsights,
781                ..
782            }) | ExplainContext::PlanInsightsNotice(_)
783        )
784    }
785}
786
787#[derive(Debug)]
788pub struct ExplainPlanContext {
789    pub broken: bool,
790    pub config: ExplainConfig,
791    pub format: ExplainFormat,
792    pub stage: ExplainStage,
793    pub replan: Option<GlobalId>,
794    pub desc: Option<RelationDesc>,
795    pub optimizer_trace: OptimizerTrace,
796}
797
798#[derive(Debug)]
799pub enum CreateMaterializedViewStage {
800    Optimize(CreateMaterializedViewOptimize),
801    Finish(CreateMaterializedViewFinish),
802    Explain(CreateMaterializedViewExplain),
803}
804
805#[derive(Debug)]
806pub struct CreateMaterializedViewOptimize {
807    validity: PlanValidity,
808    plan: plan::CreateMaterializedViewPlan,
809    resolved_ids: ResolvedIds,
810    /// An optional context set iff the state machine is initiated from
811    /// sequencing an EXPLAIN for this statement.
812    explain_ctx: ExplainContext,
813}
814
815#[derive(Debug)]
816pub struct CreateMaterializedViewFinish {
817    /// The ID of this Materialized View in the Catalog.
818    item_id: CatalogItemId,
819    /// The ID of the durable pTVC backing this Materialized View.
820    global_id: GlobalId,
821    validity: PlanValidity,
822    plan: plan::CreateMaterializedViewPlan,
823    resolved_ids: ResolvedIds,
824    local_mir_plan: optimize::materialized_view::LocalMirPlan,
825    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
826    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
827}
828
829#[derive(Debug)]
830pub struct CreateMaterializedViewExplain {
831    global_id: GlobalId,
832    validity: PlanValidity,
833    plan: plan::CreateMaterializedViewPlan,
834    df_meta: DataflowMetainfo,
835    explain_ctx: ExplainPlanContext,
836}
837
838#[derive(Debug)]
839pub enum SubscribeStage {
840    OptimizeMir(SubscribeOptimizeMir),
841    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
842    Finish(SubscribeFinish),
843}
844
845#[derive(Debug)]
846pub struct SubscribeOptimizeMir {
847    validity: PlanValidity,
848    plan: plan::SubscribePlan,
849    timeline: TimelineContext,
850    dependency_ids: BTreeSet<GlobalId>,
851    cluster_id: ComputeInstanceId,
852    replica_id: Option<ReplicaId>,
853}
854
855#[derive(Debug)]
856pub struct SubscribeTimestampOptimizeLir {
857    validity: PlanValidity,
858    plan: plan::SubscribePlan,
859    timeline: TimelineContext,
860    optimizer: optimize::subscribe::Optimizer,
861    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
862    dependency_ids: BTreeSet<GlobalId>,
863    replica_id: Option<ReplicaId>,
864}
865
866#[derive(Debug)]
867pub struct SubscribeFinish {
868    validity: PlanValidity,
869    cluster_id: ComputeInstanceId,
870    replica_id: Option<ReplicaId>,
871    plan: plan::SubscribePlan,
872    global_lir_plan: optimize::subscribe::GlobalLirPlan,
873    dependency_ids: BTreeSet<GlobalId>,
874}
875
876#[derive(Debug)]
877pub enum IntrospectionSubscribeStage {
878    OptimizeMir(IntrospectionSubscribeOptimizeMir),
879    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
880    Finish(IntrospectionSubscribeFinish),
881}
882
883#[derive(Debug)]
884pub struct IntrospectionSubscribeOptimizeMir {
885    validity: PlanValidity,
886    plan: plan::SubscribePlan,
887    subscribe_id: GlobalId,
888    cluster_id: ComputeInstanceId,
889    replica_id: ReplicaId,
890}
891
892#[derive(Debug)]
893pub struct IntrospectionSubscribeTimestampOptimizeLir {
894    validity: PlanValidity,
895    optimizer: optimize::subscribe::Optimizer,
896    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
897    cluster_id: ComputeInstanceId,
898    replica_id: ReplicaId,
899}
900
901#[derive(Debug)]
902pub struct IntrospectionSubscribeFinish {
903    validity: PlanValidity,
904    global_lir_plan: optimize::subscribe::GlobalLirPlan,
905    read_holds: ReadHolds<Timestamp>,
906    cluster_id: ComputeInstanceId,
907    replica_id: ReplicaId,
908}
909
910#[derive(Debug)]
911pub enum SecretStage {
912    CreateEnsure(CreateSecretEnsure),
913    CreateFinish(CreateSecretFinish),
914    RotateKeysEnsure(RotateKeysSecretEnsure),
915    RotateKeysFinish(RotateKeysSecretFinish),
916    Alter(AlterSecret),
917}
918
919#[derive(Debug)]
920pub struct CreateSecretEnsure {
921    validity: PlanValidity,
922    plan: plan::CreateSecretPlan,
923}
924
925#[derive(Debug)]
926pub struct CreateSecretFinish {
927    validity: PlanValidity,
928    item_id: CatalogItemId,
929    global_id: GlobalId,
930    plan: plan::CreateSecretPlan,
931}
932
933#[derive(Debug)]
934pub struct RotateKeysSecretEnsure {
935    validity: PlanValidity,
936    id: CatalogItemId,
937}
938
939#[derive(Debug)]
940pub struct RotateKeysSecretFinish {
941    validity: PlanValidity,
942    ops: Vec<crate::catalog::Op>,
943}
944
945#[derive(Debug)]
946pub struct AlterSecret {
947    validity: PlanValidity,
948    plan: plan::AlterSecretPlan,
949}
950
951/// An enum describing which cluster to run a statement on.
952///
953/// One example usage would be that if a query depends only on system tables, we might
954/// automatically run it on the catalog server cluster to benefit from indexes that exist there.
955#[derive(Debug, Copy, Clone, PartialEq, Eq)]
956pub enum TargetCluster {
957    /// The catalog server cluster.
958    CatalogServer,
959    /// The current user's active cluster.
960    Active,
961    /// The cluster selected at the start of a transaction.
962    Transaction(ClusterId),
963}
964
965/// Result types for each stage of a sequence.
966pub(crate) enum StageResult<T> {
967    /// A task was spawned that will return the next stage.
968    Handle(JoinHandle<Result<T, AdapterError>>),
969    /// A task was spawned that will return a response for the client.
970    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
971    /// The next stage is immediately ready and will execute.
972    Immediate(T),
973    /// The final stage was executed and is ready to respond to the client.
974    Response(ExecuteResponse),
975}
976
977/// Common functionality for [Coordinator::sequence_staged].
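///
/// As a loose illustration only (the stage and helper names below are made up,
/// not taken from this crate), an implementation matches on its current stage
/// and hands back either the next stage or the final client response:
///
/// ```ignore
/// impl Staged for MyStage {
///     type Ctx = ExecuteContext;
///
///     fn validity(&mut self) -> &mut PlanValidity {
///         // Every stage carries a `PlanValidity` to re-check before running.
///         ...
///     }
///
///     async fn stage(
///         self,
///         coord: &mut Coordinator,
///         ctx: &mut Self::Ctx,
///     ) -> Result<StageResult<Box<Self>>, AdapterError> {
///         match self {
///             // Expensive work is spawned off the coordinator thread; the
///             // returned handle resolves to the next stage.
///             MyStage::Optimize(stage) => coord.my_optimize_stage(ctx, stage),
///             // The final stage yields the response sent to the client.
///             MyStage::Finish(stage) => {
///                 let resp = coord.my_finish_stage(ctx, stage).await?;
///                 Ok(StageResult::Response(resp))
///             }
///         }
///     }
///
///     fn message(self, ctx: Self::Ctx, span: Span) -> Message {
///         // Wraps the stage in the coordinator `Message` that re-enqueues it.
///         ...
///     }
///
///     fn cancel_enabled(&self) -> bool {
///         true
///     }
/// }
/// ```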
978pub(crate) trait Staged: Send {
979    type Ctx: StagedContext;
980
981    fn validity(&mut self) -> &mut PlanValidity;
982
983    /// Returns the next stage or final result.
984    async fn stage(
985        self,
986        coord: &mut Coordinator,
987        ctx: &mut Self::Ctx,
988    ) -> Result<StageResult<Box<Self>>, AdapterError>;
989
990    /// Prepares a message for the Coordinator.
991    fn message(self, ctx: Self::Ctx, span: Span) -> Message;
992
993    /// Whether it is safe to SQL cancel this stage.
994    fn cancel_enabled(&self) -> bool;
995}
996
997pub trait StagedContext {
998    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
999    fn session(&self) -> Option<&Session>;
1000}
1001
1002impl StagedContext for ExecuteContext {
1003    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1004        self.retire(result);
1005    }
1006
1007    fn session(&self) -> Option<&Session> {
1008        Some(self.session())
1009    }
1010}
1011
1012impl StagedContext for () {
1013    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}
1014
1015    fn session(&self) -> Option<&Session> {
1016        None
1017    }
1018}
1019
1020/// Configures a coordinator.
1021pub struct Config {
1022    pub controller_config: ControllerConfig,
1023    pub controller_envd_epoch: NonZeroI64,
1024    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
1025    pub audit_logs_iterator: AuditLogIterator,
1026    pub timestamp_oracle_url: Option<SensitiveUrl>,
1027    pub unsafe_mode: bool,
1028    pub all_features: bool,
1029    pub build_info: &'static BuildInfo,
1030    pub environment_id: EnvironmentId,
1031    pub metrics_registry: MetricsRegistry,
1032    pub now: NowFn,
1033    pub secrets_controller: Arc<dyn SecretsController>,
1034    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1035    pub availability_zones: Vec<String>,
1036    pub cluster_replica_sizes: ClusterReplicaSizeMap,
1037    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
1038    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
1039    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
1040    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
1041    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
1042    pub system_parameter_defaults: BTreeMap<String, String>,
1043    pub storage_usage_client: StorageUsageClient,
1044    pub storage_usage_collection_interval: Duration,
1045    pub storage_usage_retention_period: Option<Duration>,
1046    pub segment_client: Option<mz_segment::Client>,
1047    pub egress_addresses: Vec<IpNet>,
1048    pub remote_system_parameters: Option<BTreeMap<String, String>>,
1049    pub aws_account_id: Option<String>,
1050    pub aws_privatelink_availability_zones: Option<Vec<String>>,
1051    pub connection_context: ConnectionContext,
1052    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
1053    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
1054    pub http_host_name: Option<String>,
1055    pub tracing_handle: TracingHandle,
1056    /// Whether or not to start controllers in read-only mode. This is only
1057    /// meant for use during development of read-only clusters and 0dt upgrades
1058    /// and should go away once we have proper orchestration during upgrades.
1059    pub read_only_controllers: bool,
1060
1061    /// A trigger that signals that the current deployment has caught up with a
1062    /// previous deployment. Only used during 0dt deployment, while in read-only
1063    /// mode.
1064    pub caught_up_trigger: Option<Trigger>,
1065
1066    pub helm_chart_version: Option<String>,
1067    pub license_key: ValidatedLicenseKey,
1068    pub external_login_password_mz_system: Option<Password>,
1069    pub force_builtin_schema_migration: Option<String>,
1070}
1071
1072/// Metadata about an active connection.
1073#[derive(Debug, Serialize)]
1074pub struct ConnMeta {
1075    /// Pgwire specifies that every connection have a 32-bit secret associated
1076    /// with it, that is known to both the client and the server. Cancellation
1077    /// requests are required to authenticate with the secret of the connection
1078    /// that they are targeting.
1079    secret_key: u32,
1080    /// The time when the session's connection was initiated.
1081    connected_at: EpochMillis,
1082    user: User,
1083    application_name: String,
1084    uuid: Uuid,
1085    conn_id: ConnectionId,
1086    client_ip: Option<IpAddr>,
1087
1088    /// Sinks that will need to be dropped when the current transaction, if
1089    /// any, is cleared.
1090    drop_sinks: BTreeSet<GlobalId>,
1091
1092    /// Lock for the Coordinator's deferred statements that is dropped on transaction clear.
1093    #[serde(skip)]
1094    deferred_lock: Option<OwnedMutexGuard<()>>,
1095
1096    /// Cluster reconfigurations that will need to be
1097    /// cleaned up when the current transaction is cleared
1098    pending_cluster_alters: BTreeSet<ClusterId>,
1099
1100    /// Channel on which to send notices to a session.
1101    #[serde(skip)]
1102    notice_tx: mpsc::UnboundedSender<AdapterNotice>,
1103
1104    /// The role that initiated the database context. Fixed for the duration of the connection.
1105    /// WARNING: This role reference is not updated when the role is dropped.
1106    /// Consumers should not assume that this role exists.
1107    authenticated_role: RoleId,
1108}
1109
1110impl ConnMeta {
1111    pub fn conn_id(&self) -> &ConnectionId {
1112        &self.conn_id
1113    }
1114
1115    pub fn user(&self) -> &User {
1116        &self.user
1117    }
1118
1119    pub fn application_name(&self) -> &str {
1120        &self.application_name
1121    }
1122
1123    pub fn authenticated_role_id(&self) -> &RoleId {
1124        &self.authenticated_role
1125    }
1126
1127    pub fn uuid(&self) -> Uuid {
1128        self.uuid
1129    }
1130
1131    pub fn client_ip(&self) -> Option<IpAddr> {
1132        self.client_ip
1133    }
1134
1135    pub fn connected_at(&self) -> EpochMillis {
1136        self.connected_at
1137    }
1138}
1139
1140#[derive(Debug)]
1141/// A pending transaction waiting to be committed.
1142pub struct PendingTxn {
1143    /// Context used to send a response back to the client.
1144    ctx: ExecuteContext,
1145    /// Client response for transaction.
1146    response: Result<PendingTxnResponse, AdapterError>,
1147    /// The action to take at the end of the transaction.
1148    action: EndTransactionAction,
1149}
1150
1151#[derive(Debug)]
1152/// The response we'll send for a [`PendingTxn`].
1153pub enum PendingTxnResponse {
1154    /// The transaction will be committed.
1155    Committed {
1156        /// Parameters that will change, and their values, once this transaction is complete.
1157        params: BTreeMap<&'static str, String>,
1158    },
1159    /// The transaction will be rolled back.
1160    Rolledback {
1161        /// Parameters that will change, and their values, once this transaction is complete.
1162        params: BTreeMap<&'static str, String>,
1163    },
1164}
1165
1166impl PendingTxnResponse {
1167    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1168        match self {
1169            PendingTxnResponse::Committed { params }
1170            | PendingTxnResponse::Rolledback { params } => params.extend(p),
1171        }
1172    }
1173}
1174
1175impl From<PendingTxnResponse> for ExecuteResponse {
1176    fn from(value: PendingTxnResponse) -> Self {
1177        match value {
1178            PendingTxnResponse::Committed { params } => {
1179                ExecuteResponse::TransactionCommitted { params }
1180            }
1181            PendingTxnResponse::Rolledback { params } => {
1182                ExecuteResponse::TransactionRolledBack { params }
1183            }
1184        }
1185    }
1186}
1187
1188#[derive(Debug)]
1189/// A pending read transaction waiting to be linearized, along with metadata about its state.
1190pub struct PendingReadTxn {
1191    /// The transaction type
1192    txn: PendingRead,
1193    /// The timestamp context of the transaction.
1194    timestamp_context: TimestampContext<mz_repr::Timestamp>,
1195    /// When we created this pending txn; its elapsed time is recorded when the transaction ends. Only used for metrics.
1196    created: Instant,
1197    /// Number of times we requeued the processing of this pending read txn.
1198    /// Requeueing is necessary if the time we executed the query is after the current oracle time;
1199    /// see [`Coordinator::message_linearize_reads`] for more details.
1200    num_requeues: u64,
1201    /// Telemetry context.
1202    otel_ctx: OpenTelemetryContext,
1203}
1204
1205impl PendingReadTxn {
1206    /// Return the timestamp context of the pending read transaction.
1207    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
1208        &self.timestamp_context
1209    }
1210
1211    pub(crate) fn take_context(self) -> ExecuteContext {
1212        self.txn.take_context()
1213    }
1214}
1215
1216#[derive(Debug)]
1217/// A pending read transaction waiting to be linearized.
1218enum PendingRead {
1219    Read {
1220        /// The inner transaction.
1221        txn: PendingTxn,
1222    },
1223    ReadThenWrite {
1224        /// Context used to send a response back to the client.
1225        ctx: ExecuteContext,
1226        /// Channel used to alert the transaction that the read has been linearized and send back
1227        /// `ctx`.
1228        tx: oneshot::Sender<Option<ExecuteContext>>,
1229    },
1230}
1231
1232impl PendingRead {
1233    /// Alert the client that the read has been linearized.
1234    ///
1235    /// If it is necessary to finalize an execute, return the state necessary to do so
1236    /// (execution context and result).
1237    #[instrument(level = "debug")]
1238    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1239        match self {
1240            PendingRead::Read {
1241                txn:
1242                    PendingTxn {
1243                        mut ctx,
1244                        response,
1245                        action,
1246                    },
1247                ..
1248            } => {
1249                let changed = ctx.session_mut().vars_mut().end_transaction(action);
1250                // Append any parameters that changed to the response.
1251                let response = response.map(|mut r| {
1252                    r.extend_params(changed);
1253                    ExecuteResponse::from(r)
1254                });
1255
1256                Some((ctx, response))
1257            }
1258            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1259                // Ignore errors if the caller has hung up.
1260                let _ = tx.send(Some(ctx));
1261                None
1262            }
1263        }
1264    }
1265
1266    fn label(&self) -> &'static str {
1267        match self {
1268            PendingRead::Read { .. } => "read",
1269            PendingRead::ReadThenWrite { .. } => "read_then_write",
1270        }
1271    }
1272
1273    pub(crate) fn take_context(self) -> ExecuteContext {
1274        match self {
1275            PendingRead::Read { txn, .. } => txn.ctx,
1276            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1277                // Inform the transaction that we've taken their context.
1278                // Ignore errors if the caller has hung up.
1279                let _ = tx.send(None);
1280                ctx
1281            }
1282        }
1283    }
1284}
1285
1286/// State that the coordinator must process as part of retiring
1287/// command execution. `ExecuteContextExtra::default()` is guaranteed
1288/// to produce a value that will cause the coordinator to do nothing, and
1289/// is intended for use by code that invokes the execution processing flow
1290/// (i.e., `sequence_plan`) without actually being a statement execution.
1291///
1292/// This struct must not be dropped if it contains non-trivial
1293/// state. The only valid way to get rid of it is to pass it to the
1294/// coordinator for retirement. To enforce this, we assert in the
1295/// `Drop` implementation.
1296#[derive(Debug, Default)]
1297#[must_use]
1298pub struct ExecuteContextExtra {
1299    statement_uuid: Option<StatementLoggingId>,
1300}
1301
1302impl ExecuteContextExtra {
1303    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1304        Self { statement_uuid }
1305    }
1306    pub fn is_trivial(&self) -> bool {
1307        let Self { statement_uuid } = self;
1308        statement_uuid.is_none()
1309    }
1310    pub fn contents(&self) -> Option<StatementLoggingId> {
1311        let Self { statement_uuid } = self;
1312        *statement_uuid
1313    }
1314    /// Take responsibility for the contents.  This should only be
1315    /// called from code that knows what to do to finish up logging
1316    /// based on the inner value.
1317    #[must_use]
1318    fn retire(mut self) -> Option<StatementLoggingId> {
1319        let Self { statement_uuid } = &mut self;
1320        statement_uuid.take()
1321    }
1322}
1323
1324impl Drop for ExecuteContextExtra {
1325    fn drop(&mut self) {
1326        let Self { statement_uuid } = &*self;
1327        if let Some(statement_uuid) = statement_uuid {
1328            // Note: the impact when this error hits
1329            // is that the statement will never be marked
1330            // as finished in the statement log.
1331            soft_panic_or_log!(
1332                "execute context for statement {statement_uuid:?} dropped without being properly retired."
1333            );
1334        }
1335    }
1336}
1337
1338/// Bundle of state related to statement execution.
1339///
1340/// This struct collects a bundle of state that needs to be threaded
1341/// through various functions as part of statement execution.
1342/// Currently, it is only used to finalize execution, by calling one
1343/// of the methods `retire` or `retire_async`. Finalizing execution
1344/// involves sending the session back to the pgwire layer so that it
1345/// may be used to process further commands. In the future, it will
1346/// also involve performing some work on the main coordinator thread
1347/// (e.g., recording the time at which the statement finished
1348/// executing); the state necessary to perform this work is bundled in
1349/// the `ExecuteContextExtra` object (today, it is simply empty).
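///
/// A minimal usage sketch (the bindings below are assumed to exist at the call
/// site; this is not a real call site from this crate):
///
/// ```ignore
/// // `tx`, `internal_cmd_tx`, and `session` come from the command handler.
/// let ctx = ExecuteContext::from_parts(
///     tx,
///     internal_cmd_tx,
///     session,
///     ExecuteContextExtra::default(),
/// );
/// // ... sequence the statement, producing `result: Result<ExecuteResponse, AdapterError>` ...
/// // Retiring sends the result (and the session) back to the client and, if
/// // statement logging is active, notifies the coordinator via
/// // `Message::RetireExecute`.
/// ctx.retire(result);
/// ```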
1350#[derive(Debug)]
1351pub struct ExecuteContext {
1352    inner: Box<ExecuteContextInner>,
1353}
1354
1355impl std::ops::Deref for ExecuteContext {
1356    type Target = ExecuteContextInner;
1357    fn deref(&self) -> &Self::Target {
1358        &*self.inner
1359    }
1360}
1361
1362impl std::ops::DerefMut for ExecuteContext {
1363    fn deref_mut(&mut self) -> &mut Self::Target {
1364        &mut *self.inner
1365    }
1366}
1367
1368#[derive(Debug)]
1369pub struct ExecuteContextInner {
1370    tx: ClientTransmitter<ExecuteResponse>,
1371    internal_cmd_tx: mpsc::UnboundedSender<Message>,
1372    session: Session,
1373    extra: ExecuteContextExtra,
1374}
1375
1376impl ExecuteContext {
1377    pub fn session(&self) -> &Session {
1378        &self.session
1379    }
1380
1381    pub fn session_mut(&mut self) -> &mut Session {
1382        &mut self.session
1383    }
1384
1385    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1386        &self.tx
1387    }
1388
1389    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1390        &mut self.tx
1391    }
1392
1393    pub fn from_parts(
1394        tx: ClientTransmitter<ExecuteResponse>,
1395        internal_cmd_tx: mpsc::UnboundedSender<Message>,
1396        session: Session,
1397        extra: ExecuteContextExtra,
1398    ) -> Self {
1399        Self {
1400            inner: ExecuteContextInner {
1401                tx,
1402                session,
1403                extra,
1404                internal_cmd_tx,
1405            }
1406            .into(),
1407        }
1408    }
1409
1410    /// By calling this function, the caller takes responsibility for
1411    /// dealing with the instance of `ExecuteContextExtra`. This is
1412    /// intended to support protocols (like `COPY FROM`) that involve
1413    /// multiple passes of sending the session back and forth between
1414    /// the coordinator and the pgwire layer. As part of any such
1415    /// protocol, we must ensure that the `ExecuteContextExtra`
1416    /// (possibly wrapped in a new `ExecuteContext`) is passed back to the coordinator for
1417    /// eventual retirement.
1418    pub fn into_parts(
1419        self,
1420    ) -> (
1421        ClientTransmitter<ExecuteResponse>,
1422        mpsc::UnboundedSender<Message>,
1423        Session,
1424        ExecuteContextExtra,
1425    ) {
1426        let ExecuteContextInner {
1427            tx,
1428            internal_cmd_tx,
1429            session,
1430            extra,
1431        } = *self.inner;
1432        (tx, internal_cmd_tx, session, extra)
1433    }
1434
1435    /// Retire the execution, by sending a message to the coordinator.
1436    #[instrument(level = "debug")]
1437    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1438        let ExecuteContextInner {
1439            tx,
1440            internal_cmd_tx,
1441            session,
1442            extra,
1443        } = *self.inner;
1444        let reason = if extra.is_trivial() {
1445            None
1446        } else {
1447            Some((&result).into())
1448        };
1449        tx.send(result, session);
1450        if let Some(reason) = reason {
1451            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1452                otel_ctx: OpenTelemetryContext::obtain(),
1453                data: extra,
1454                reason,
1455            }) {
1456                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1457            }
1458        }
1459    }
1460
1461    pub fn extra(&self) -> &ExecuteContextExtra {
1462        &self.extra
1463    }
1464
1465    pub fn extra_mut(&mut self) -> &mut ExecuteContextExtra {
1466        &mut self.extra
1467    }
1468}
1469
1470#[derive(Debug)]
1471struct ClusterReplicaStatuses(
1472    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
1473);
1474
1475impl ClusterReplicaStatuses {
1476    pub(crate) fn new() -> ClusterReplicaStatuses {
1477        ClusterReplicaStatuses(BTreeMap::new())
1478    }
1479
1480    /// Initializes the statuses of the specified cluster.
1481    ///
1482    /// Panics if the cluster statuses are already initialized.
1483    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1484        let prev = self.0.insert(cluster_id, BTreeMap::new());
1485        assert_eq!(
1486            prev, None,
1487            "cluster {cluster_id} statuses already initialized"
1488        );
1489    }
1490
1491    /// Initializes the statuses of the specified cluster replica.
1492    ///
1493    /// Panics if the cluster replica statuses are already initialized.
1494    pub(crate) fn initialize_cluster_replica_statuses(
1495        &mut self,
1496        cluster_id: ClusterId,
1497        replica_id: ReplicaId,
1498        num_processes: usize,
1499        time: DateTime<Utc>,
1500    ) {
1501        tracing::info!(
1502            ?cluster_id,
1503            ?replica_id,
1504            ?time,
1505            "initializing cluster replica status"
1506        );
1507        let replica_statuses = self.0.entry(cluster_id).or_default();
1508        let process_statuses = (0..num_processes)
1509            .map(|process_id| {
1510                let status = ClusterReplicaProcessStatus {
1511                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1512                    time: time.clone(),
1513                };
1514                (u64::cast_from(process_id), status)
1515            })
1516            .collect();
1517        let prev = replica_statuses.insert(replica_id, process_statuses);
1518        assert_none!(
1519            prev,
1520            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1521        );
1522    }
1523
1524    /// Removes the statuses of the specified cluster.
1525    ///
1526    /// Panics if the cluster does not exist.
1527    pub(crate) fn remove_cluster_statuses(
1528        &mut self,
1529        cluster_id: &ClusterId,
1530    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1531        let prev = self.0.remove(cluster_id);
1532        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1533    }
1534
1535    /// Removes the statuses of the specified cluster replica.
1536    ///
1537    /// Panics if the cluster or replica does not exist.
1538    pub(crate) fn remove_cluster_replica_statuses(
1539        &mut self,
1540        cluster_id: &ClusterId,
1541        replica_id: &ReplicaId,
1542    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1543        let replica_statuses = self
1544            .0
1545            .get_mut(cluster_id)
1546            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1547        let prev = replica_statuses.remove(replica_id);
1548        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1549    }
1550
1551    /// Inserts or updates the status of the specified cluster replica process.
1552    ///
1553    /// Panics if the cluster or replica does not exist.
1554    pub(crate) fn ensure_cluster_status(
1555        &mut self,
1556        cluster_id: ClusterId,
1557        replica_id: ReplicaId,
1558        process_id: ProcessId,
1559        status: ClusterReplicaProcessStatus,
1560    ) {
1561        let replica_statuses = self
1562            .0
1563            .get_mut(&cluster_id)
1564            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1565            .get_mut(&replica_id)
1566            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1567        replica_statuses.insert(process_id, status);
1568    }
1569
1570    /// Computes the status of the cluster replica as a whole.
1571    ///
1572    /// Panics if `cluster_id` or `replica_id` don't exist.
1573    pub fn get_cluster_replica_status(
1574        &self,
1575        cluster_id: ClusterId,
1576        replica_id: ReplicaId,
1577    ) -> ClusterStatus {
1578        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1579        Self::cluster_replica_status(process_status)
1580    }
1581
1582    /// Computes the status of the cluster replica as a whole.
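    ///
    /// The replica is reported `Online` only if every process is `Online`;
    /// otherwise it is `Offline`, carrying the first known not-ready reason.
    /// For example, one `Online` process and one
    /// `Offline(Some(OfflineReason::Initializing))` process roll up to
    /// `Offline(Some(OfflineReason::Initializing))`.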
1583    pub fn cluster_replica_status(
1584        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1585    ) -> ClusterStatus {
1586        process_status
1587            .values()
1588            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1589                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1590                (x, y) => {
1591                    let reason_x = match x {
1592                        ClusterStatus::Offline(reason) => reason,
1593                        ClusterStatus::Online => None,
1594                    };
1595                    let reason_y = match y {
1596                        ClusterStatus::Offline(reason) => reason,
1597                        ClusterStatus::Online => None,
1598                    };
1599                    // Arbitrarily pick the first known not-ready reason.
1600                    ClusterStatus::Offline(reason_x.or(reason_y))
1601                }
1602            })
1603    }
1604
1605    /// Gets the statuses of the given cluster replica.
1606    ///
1607    /// Panics if the cluster or replica does not exist.
1608    pub(crate) fn get_cluster_replica_statuses(
1609        &self,
1610        cluster_id: ClusterId,
1611        replica_id: ReplicaId,
1612    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1613        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1614            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1615    }
1616
1617    /// Gets the statuses of the given cluster replica.
1618    pub(crate) fn try_get_cluster_replica_statuses(
1619        &self,
1620        cluster_id: ClusterId,
1621        replica_id: ReplicaId,
1622    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1623        self.try_get_cluster_statuses(cluster_id)
1624            .and_then(|statuses| statuses.get(&replica_id))
1625    }
1626
1627    /// Gets the statuses of the given cluster.
1628    pub(crate) fn try_get_cluster_statuses(
1629        &self,
1630        cluster_id: ClusterId,
1631    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1632        self.0.get(&cluster_id)
1633    }
1634}
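
// A minimal usage sketch of `ClusterReplicaStatuses` (the identifiers here are
// hypothetical; the real call sites are in `Coordinator::bootstrap` and friends):
//
//     let mut statuses = ClusterReplicaStatuses::new();
//     statuses.initialize_cluster_statuses(cluster_id);
//     statuses.initialize_cluster_replica_statuses(cluster_id, replica_id, 2, now);
//     // Processes start as Offline(Some(Initializing)), so the rollup stays
//     // Offline until every process reports Online via `ensure_cluster_status`.
//     let _status = statuses.get_cluster_replica_status(cluster_id, replica_id);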
1635
1636/// Glues the external world to the Timely workers.
1637#[derive(Derivative)]
1638#[derivative(Debug)]
1639pub struct Coordinator {
1640    /// The controller for the storage and compute layers.
1641    #[derivative(Debug = "ignore")]
1642    controller: mz_controller::Controller,
1643    /// The catalog in an Arc suitable for readonly references. The Arc allows
1644    /// us to hand out cheap copies of the catalog to functions that can use it
1645    /// off of the main coordinator thread. If the coordinator needs to mutate
1646    /// the catalog, call [`Self::catalog_mut`], which will clone this struct member,
1647    /// allowing it to be mutated here while the other off-thread references can
1648    /// read their catalog as long as needed. In the future we would like this
1649    /// to be a pTVC, but for now this is sufficient.
1650    catalog: Arc<Catalog>,
1651
1652    /// A client for persist. Initially, this is only used for reading stashed
1653    /// peek responses out of batches.
1654    persist_client: PersistClient,
1655
1656    /// Channel to manage internal commands from the coordinator to itself.
1657    internal_cmd_tx: mpsc::UnboundedSender<Message>,
1658    /// Notification that triggers a group commit.
1659    group_commit_tx: appends::GroupCommitNotifier,
1660
1661    /// Channel for strict serializable reads ready to commit.
1662    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,
1663
1664    /// Mechanism for totally ordering write and read timestamps, so that all reads
1665    /// reflect exactly the set of writes that precede them, and no writes that follow.
1666    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
1667
1668    /// A generator for transient [`GlobalId`]s, shareable with other threads.
1669    transient_id_gen: Arc<TransientIdGen>,
1670    /// A map from connection ID to metadata about that connection for all
1671    /// active connections.
1672    active_conns: BTreeMap<ConnectionId, ConnMeta>,
1673
1674    /// For each transaction, the read holds taken to support any performed reads.
1675    ///
1676    /// Upon completing a transaction, these read holds should be dropped.
1677    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,
1678
1679    /// Access to the peek fields should be restricted to methods in the [`peek`] API.
1680    /// A map from pending peek ids to the queue into which responses are sent, and
1681    /// the connection id of the client that initiated the peek.
1682    pending_peeks: BTreeMap<Uuid, PendingPeek>,
1683    /// A map from client connection ids to a set of all pending peeks for that client.
1684    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
1685
1686    /// A map from client connection ids to pending linearize read transaction.
1687    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
1688
1689    /// A map from the compute sink ID to its state description.
1690    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
1691    /// A map from active webhooks to their invalidation handle.
1692    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
1693    /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
1694    /// to stage Batches in Persist that we will then link into the shard.
1695    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,
1696
1697    /// A map from connection ids to a watch channel that is set to `true` if the connection
1698    /// received a cancel request.
1699    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
1700    /// Active introspection subscribes.
1701    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,
1702
1703    /// Locks that grant access to a specific object, populated lazily as objects are written to.
1704    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
1705    /// Plans that are currently deferred and waiting on a write lock.
1706    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,
1707
1708    /// Pending writes waiting for a group commit.
1709    pending_writes: Vec<PendingWriteTxn>,
1710
1711    /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
1712    /// table's timestamps, but there are cases where timestamps are not bumped yet
1713    /// we expect the closed timestamps to advance (`AS OF X`, SUBSCRIBing views over
1714    /// RT sources and tables). To address these, spawn a task that forces table
1715    /// timestamps to close on a regular interval. This roughly tracks the behavior
1716    /// of realtime sources that close off timestamps on an interval.
1717    ///
1718    /// For non-realtime timelines, nothing pushes the timestamps forward, so we must do
1719    /// it manually.
1720    advance_timelines_interval: Interval,
1721
1722    /// Serialized DDL. DDL must be serialized because:
1723    /// - Many DDL statements do off-thread work and need to verify the catalog is in a valid state, but
1724    ///   [`PlanValidity`] does not currently support tracking all changes. Doing that correctly
1725    ///   seems to be more difficult than it's worth, so we would instead re-plan and re-sequence
1726    ///   the statements.
1727    /// - Re-planning a statement is hard because Coordinator and Session state is mutated at
1728    ///   various points, and we would need to correctly reset those changes before re-planning and
1729    ///   re-sequencing.
1730    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
1731
1732    /// Handle to secret manager that can create and delete secrets from
1733    /// an arbitrary secret storage engine.
1734    secrets_controller: Arc<dyn SecretsController>,
1735    /// A secrets reader that maintains an in-memory cache, where values have a set TTL.
1736    caching_secrets_reader: CachingSecretsReader,
1737
1738    /// Handle to a manager that can create and delete Kubernetes resources
1739    /// (i.e., `VpcEndpoint` objects).
1740    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1741
1742    /// Persist client for fetching storage metadata such as size metrics.
1743    storage_usage_client: StorageUsageClient,
1744    /// The interval at which to collect storage usage information.
1745    storage_usage_collection_interval: Duration,
1746
1747    /// Segment analytics client.
1748    #[derivative(Debug = "ignore")]
1749    segment_client: Option<mz_segment::Client>,
1750
1751    /// Coordinator metrics.
1752    metrics: Metrics,
1753    /// Optimizer metrics.
1754    optimizer_metrics: OptimizerMetrics,
1755
1756    /// Tracing handle.
1757    tracing_handle: TracingHandle,
1758
1759    /// Data used by the statement logging feature.
1760    statement_logging: StatementLogging,
1761
1762    /// Limit for how many concurrent webhook requests we allow.
1763    webhook_concurrency_limit: WebhookConcurrencyLimiter,
1764
1765    /// Optional config for the Postgres-backed timestamp oracle. This is
1766    /// _required_ when the `timestamp_oracle` system variable is set to
1767    /// `postgres`.
1768    pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,
1769
1770    /// Periodically asks cluster scheduling policies to make their decisions.
1771    check_cluster_scheduling_policies_interval: Interval,
1772
1773    /// This keeps the last On/Off decision for each cluster and each scheduling policy.
1774    /// (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are
1775    /// periodically cleaned up from this map.)
1776    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,
1777
1778    /// When doing 0dt upgrades/in read-only mode, periodically ask all known
1779    /// clusters/collections whether they are caught up.
1780    caught_up_check_interval: Interval,
1781
1782    /// Context needed to check whether all clusters/collections have caught up.
1783    /// Only used during 0dt deployment, while in read-only mode.
1784    caught_up_check: Option<CaughtUpCheckContext>,
1785
1786    /// Tracks the state associated with the currently installed watchsets.
1787    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,
1788
1789    /// Tracks the currently installed watchsets for each connection.
1790    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,
1791
1792    /// Tracks the statuses of all cluster replicas.
1793    cluster_replica_statuses: ClusterReplicaStatuses,
1794
1795    /// Whether or not to start controllers in read-only mode. This is only
1796    /// meant for use during development of read-only clusters and 0dt upgrades
1797    /// and should go away once we have proper orchestration during upgrades.
1798    read_only_controllers: bool,
1799
1800    /// Updates to builtin tables that are being buffered while we are in
1801    /// read-only mode. We apply these all at once when coming out of read-only
1802    /// mode.
1803    ///
1804    /// This is a `Some` while in read-only mode and will be replaced by a
1805    /// `None` when we transition out of read-only mode and write out any
1806    /// buffered updates.
1807    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,
1808
1809    license_key: ValidatedLicenseKey,
1810}
1811
1812impl Coordinator {
1813    /// Initializes coordinator state based on the contained catalog. Must be
1814    /// called after creating the coordinator and before calling the
1815    /// `Coordinator::serve` method.
1816    #[instrument(name = "coord::bootstrap")]
1817    pub(crate) async fn bootstrap(
1818        &mut self,
1819        boot_ts: Timestamp,
1820        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1821        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1822        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1823        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1824        audit_logs_iterator: AuditLogIterator,
1825    ) -> Result<(), AdapterError> {
1826        let bootstrap_start = Instant::now();
1827        info!("startup: coordinator init: bootstrap beginning");
1828        info!("startup: coordinator init: bootstrap: preamble beginning");
1829
1830        // Initialize cluster replica statuses.
1831        // Collecting into an intermediate Vec avoids partial borrow issues.
1832        let cluster_statuses: Vec<(_, Vec<_>)> = self
1833            .catalog()
1834            .clusters()
1835            .map(|cluster| {
1836                (
1837                    cluster.id(),
1838                    cluster
1839                        .replicas()
1840                        .map(|replica| {
1841                            (replica.replica_id, replica.config.location.num_processes())
1842                        })
1843                        .collect(),
1844                )
1845            })
1846            .collect();
1847        let now = self.now_datetime();
1848        for (cluster_id, replica_statuses) in cluster_statuses {
1849            self.cluster_replica_statuses
1850                .initialize_cluster_statuses(cluster_id);
1851            for (replica_id, num_processes) in replica_statuses {
1852                self.cluster_replica_statuses
1853                    .initialize_cluster_replica_statuses(
1854                        cluster_id,
1855                        replica_id,
1856                        num_processes,
1857                        now,
1858                    );
1859            }
1860        }
1861
1862        let system_config = self.catalog().system_config();
1863
1864        // Inform metrics about the initial system configuration.
1865        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1866
1867        // Inform the controllers about their initial configuration.
1868        let compute_config = flags::compute_config(system_config);
1869        let storage_config = flags::storage_config(system_config);
1870        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1871        let dyncfg_updates = system_config.dyncfg_updates();
1872        self.controller.compute.update_configuration(compute_config);
1873        self.controller.storage.update_parameters(storage_config);
1874        self.controller
1875            .update_orchestrator_scheduling_config(scheduling_config);
1876        self.controller.update_configuration(dyncfg_updates);
1877
1878        self.validate_resource_limit_numeric(
1879            Numeric::zero(),
1880            self.current_credit_consumption_rate(),
1881            |system_vars| {
1882                self.license_key
1883                    .max_credit_consumption_rate()
1884                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1885            },
1886            "cluster replica",
1887            MAX_CREDIT_CONSUMPTION_RATE.name(),
1888        )?;
1889
1890        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1891            Default::default();
1892
1893        let enable_worker_core_affinity =
1894            self.catalog().system_config().enable_worker_core_affinity();
1895        for instance in self.catalog.clusters() {
1896            self.controller.create_cluster(
1897                instance.id,
1898                ClusterConfig {
1899                    arranged_logs: instance.log_indexes.clone(),
1900                    workload_class: instance.config.workload_class.clone(),
1901                },
1902            )?;
1903            for replica in instance.replicas() {
1904                let role = instance.role();
1905                self.controller.create_replica(
1906                    instance.id,
1907                    replica.replica_id,
1908                    instance.name.clone(),
1909                    replica.name.clone(),
1910                    role,
1911                    replica.config.clone(),
1912                    enable_worker_core_affinity,
1913                )?;
1914            }
1915        }
1916
1917        info!(
1918            "startup: coordinator init: bootstrap: preamble complete in {:?}",
1919            bootstrap_start.elapsed()
1920        );
1921
1922        let init_storage_collections_start = Instant::now();
1923        info!("startup: coordinator init: bootstrap: storage collections init beginning");
1924        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
1925            .await;
1926        info!(
1927            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
1928            init_storage_collections_start.elapsed()
1929        );
1930
1931        // The storage controller knows about the introspection collections now, so we can start
1932        // sinking introspection updates in the compute controller. It makes sense to do that as
1933        // soon as possible, to avoid updates piling up in the compute controller's internal
1934        // buffers.
1935        self.controller.start_compute_introspection_sink();
1936
1937        let optimize_dataflows_start = Instant::now();
1938        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
1939        let entries: Vec<_> = self.catalog().entries().cloned().collect();
1940        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
1941        info!(
1942            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
1943            optimize_dataflows_start.elapsed()
1944        );
1945
1946        // We don't need to wait for the cache to update.
1947        let _fut = self.catalog().update_expression_cache(
1948            uncached_local_exprs.into_iter().collect(),
1949            uncached_global_exps.into_iter().collect(),
1950        );
1951
1952        // Select dataflow as-ofs. This step relies on the storage collections created by
1953        // `bootstrap_storage_collections` and the dataflow plans created by
1954        // `bootstrap_dataflow_plans`.
1955        let bootstrap_as_ofs_start = Instant::now();
1956        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
1957        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
1958        info!(
1959            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
1960            bootstrap_as_ofs_start.elapsed()
1961        );
1962
1963        let postamble_start = Instant::now();
1964        info!("startup: coordinator init: bootstrap: postamble beginning");
1965
1966        let logs: BTreeSet<_> = BUILTINS::logs()
1967            .map(|log| self.catalog().resolve_builtin_log(log))
1968            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
1969            .collect();
1970
1971        let mut privatelink_connections = BTreeMap::new();
1972
1973        for entry in &entries {
1974            debug!(
1975                "coordinator init: installing {} {}",
1976                entry.item().typ(),
1977                entry.id()
1978            );
1979            let mut policy = entry.item().initial_logical_compaction_window();
1980            match entry.item() {
1981                // Currently catalog item rebuild assumes that sinks and
1982                // indexes are always built individually and does not store information
1983                // about how they were built. If we start building multiple sinks and/or indexes
1984                // using a single dataflow, we have to make sure the rebuild process re-runs
1985                // the same multiple-build dataflow.
1986                CatalogItem::Source(source) => {
1987                    // Propagate source compaction windows to subsources if needed.
1988                    if source.custom_logical_compaction_window.is_none() {
1989                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
1990                            source.data_source
1991                        {
1992                            policy = Some(
1993                                self.catalog()
1994                                    .get_entry(&ingestion_id)
1995                                    .source()
1996                                    .expect("must be source")
1997                                    .custom_logical_compaction_window
1998                                    .unwrap_or_default(),
1999                            );
2000                        }
2001                    }
2002                    policies_to_set
2003                        .entry(policy.expect("sources have a compaction window"))
2004                        .or_insert_with(Default::default)
2005                        .storage_ids
2006                        .insert(source.global_id());
2007                }
2008                CatalogItem::Table(table) => {
2009                    policies_to_set
2010                        .entry(policy.expect("tables have a compaction window"))
2011                        .or_insert_with(Default::default)
2012                        .storage_ids
2013                        .extend(table.global_ids());
2014                }
2015                CatalogItem::Index(idx) => {
2016                    let policy_entry = policies_to_set
2017                        .entry(policy.expect("indexes have a compaction window"))
2018                        .or_insert_with(Default::default);
2019
2020                    if logs.contains(&idx.on) {
2021                        policy_entry
2022                            .compute_ids
2023                            .entry(idx.cluster_id)
2024                            .or_insert_with(BTreeSet::new)
2025                            .insert(idx.global_id());
2026                    } else {
2027                        let df_desc = self
2028                            .catalog()
2029                            .try_get_physical_plan(&idx.global_id())
2030                            .expect("added in `bootstrap_dataflow_plans`")
2031                            .clone();
2032
2033                        let df_meta = self
2034                            .catalog()
2035                            .try_get_dataflow_metainfo(&idx.global_id())
2036                            .expect("added in `bootstrap_dataflow_plans`");
2037
2038                        if self.catalog().state().system_config().enable_mz_notices() {
2039                            // Collect optimization hint updates.
2040                            self.catalog().state().pack_optimizer_notices(
2041                                &mut builtin_table_updates,
2042                                df_meta.optimizer_notices.iter(),
2043                                Diff::ONE,
2044                            );
2045                        }
2046
2047                        // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
2048                        // but we cannot call that as it will also downgrade the read hold on the index.
2049                        policy_entry
2050                            .compute_ids
2051                            .entry(idx.cluster_id)
2052                            .or_insert_with(Default::default)
2053                            .extend(df_desc.export_ids());
2054
2055                        self.controller
2056                            .compute
2057                            .create_dataflow(idx.cluster_id, df_desc, None)
2058                            .unwrap_or_terminate("cannot fail to create dataflows");
2059                    }
2060                }
2061                CatalogItem::View(_) => (),
2062                CatalogItem::MaterializedView(mview) => {
2063                    policies_to_set
2064                        .entry(policy.expect("materialized views have a compaction window"))
2065                        .or_insert_with(Default::default)
2066                        .storage_ids
2067                        .insert(mview.global_id());
2068
2069                    let mut df_desc = self
2070                        .catalog()
2071                        .try_get_physical_plan(&mview.global_id())
2072                        .expect("added in `bootstrap_dataflow_plans`")
2073                        .clone();
2074
2075                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
2076                        df_desc.set_initial_as_of(initial_as_of);
2077                    }
2078
2079                    // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
2080                    let until = mview
2081                        .refresh_schedule
2082                        .as_ref()
2083                        .and_then(|s| s.last_refresh())
2084                        .and_then(|r| r.try_step_forward());
2085                    if let Some(until) = until {
2086                        df_desc.until.meet_assign(&Antichain::from_elem(until));
2087                    }
2088
2089                    let df_meta = self
2090                        .catalog()
2091                        .try_get_dataflow_metainfo(&mview.global_id())
2092                        .expect("added in `bootstrap_dataflow_plans`");
2093
2094                    if self.catalog().state().system_config().enable_mz_notices() {
2095                        // Collect optimization hint updates.
2096                        self.catalog().state().pack_optimizer_notices(
2097                            &mut builtin_table_updates,
2098                            df_meta.optimizer_notices.iter(),
2099                            Diff::ONE,
2100                        );
2101                    }
2102
2103                    self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2104                    self.allow_writes(mview.cluster_id, mview.global_id());
2105                }
2106                CatalogItem::Sink(sink) => {
2107                    policies_to_set
2108                        .entry(CompactionWindow::Default)
2109                        .or_insert_with(Default::default)
2110                        .storage_ids
2111                        .insert(sink.global_id());
2112                }
2113                CatalogItem::Connection(catalog_connection) => {
2114                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2115                        privatelink_connections.insert(
2116                            entry.id(),
2117                            VpcEndpointConfig {
2118                                aws_service_name: conn.service_name.clone(),
2119                                availability_zone_ids: conn.availability_zones.clone(),
2120                            },
2121                        );
2122                    }
2123                }
2124                CatalogItem::ContinualTask(ct) => {
2125                    policies_to_set
2126                        .entry(policy.expect("continual tasks have a compaction window"))
2127                        .or_insert_with(Default::default)
2128                        .storage_ids
2129                        .insert(ct.global_id());
2130
2131                    let mut df_desc = self
2132                        .catalog()
2133                        .try_get_physical_plan(&ct.global_id())
2134                        .expect("added in `bootstrap_dataflow_plans`")
2135                        .clone();
2136
2137                    if let Some(initial_as_of) = ct.initial_as_of.clone() {
2138                        df_desc.set_initial_as_of(initial_as_of);
2139                    }
2140
2141                    let df_meta = self
2142                        .catalog()
2143                        .try_get_dataflow_metainfo(&ct.global_id())
2144                        .expect("added in `bootstrap_dataflow_plans`");
2145
2146                    if self.catalog().state().system_config().enable_mz_notices() {
2147                        // Collect optimization hint updates.
2148                        self.catalog().state().pack_optimizer_notices(
2149                            &mut builtin_table_updates,
2150                            df_meta.optimizer_notices.iter(),
2151                            Diff::ONE,
2152                        );
2153                    }
2154
2155                    self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2156                    self.allow_writes(ct.cluster_id, ct.global_id());
2157                }
2158                // Nothing to do for these cases
2159                CatalogItem::Log(_)
2160                | CatalogItem::Type(_)
2161                | CatalogItem::Func(_)
2162                | CatalogItem::Secret(_) => {}
2163            }
2164        }
2165
2166        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2167            // Clean up any extraneous VpcEndpoints that shouldn't exist.
2168            let existing_vpc_endpoints = cloud_resource_controller
2169                .list_vpc_endpoints()
2170                .await
2171                .context("list vpc endpoints")?;
2172            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2173            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2174            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2175            for id in vpc_endpoints_to_remove {
2176                cloud_resource_controller
2177                    .delete_vpc_endpoint(*id)
2178                    .await
2179                    .context("deleting extraneous vpc endpoint")?;
2180            }
2181
2182            // Ensure desired VpcEndpoints are up to date.
2183            for (id, spec) in privatelink_connections {
2184                cloud_resource_controller
2185                    .ensure_vpc_endpoint(id, spec)
2186                    .await
2187                    .context("ensuring vpc endpoint")?;
2188            }
2189        }
2190
2191        // Having installed all entries and created all constraints, we can now drop read holds
2192        // and relax read policies.
2193        drop(dataflow_read_holds);
2194        // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
2195        for (cw, policies) in policies_to_set {
2196            self.initialize_read_policies(&policies, cw).await;
2197        }
2198
2199        // Expose the mapping from T-shirt sizes to actual sizes.
2200        builtin_table_updates.extend(
2201            self.catalog().state().resolve_builtin_table_updates(
2202                self.catalog().state().pack_all_replica_size_updates(),
2203            ),
2204        );
2205
2206        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2207        // When 0dt is enabled, we create new shards for any migrated builtin storage collections.
2208        // In read-only mode, the migrated builtin tables (which are a subset of migrated builtin
2209        // storage collections) need to be back-filled so that any dependent dataflow can be
2210        // hydrated. Additionally, these shards are not registered with the txn-shard, and cannot
2211        // be registered while in read-only mode, so they are written to directly.
2212        let migrated_updates_fut = if self.controller.read_only() {
2213            let min_timestamp = Timestamp::minimum();
2214            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2215                .extract_if(.., |update| {
2216                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2217                    migrated_storage_collections_0dt.contains(&update.id)
2218                        && self
2219                            .controller
2220                            .storage_collections
2221                            .collection_frontiers(gid)
2222                            .expect("all tables are registered")
2223                            .write_frontier
2224                            .elements()
2225                            == &[min_timestamp]
2226                })
2227                .collect();
2228            if migrated_builtin_table_updates.is_empty() {
2229                futures::future::ready(()).boxed()
2230            } else {
2231                // Group all updates per-table.
2232                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2233                for update in migrated_builtin_table_updates {
2234                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2235                    grouped_appends.entry(gid).or_default().push(update.data);
2236                }
2237                info!(
2238                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2239                    grouped_appends.keys().collect::<Vec<_>>()
2240                );
2241
2242                // Consolidate Row data; staged batches must already be consolidated.
2243                let mut all_appends = Vec::with_capacity(grouped_appends.len());
2244                for (item_id, table_data) in grouped_appends.into_iter() {
2245                    let mut all_rows = Vec::new();
2246                    let mut all_data = Vec::new();
2247                    for data in table_data {
2248                        match data {
2249                            TableData::Rows(rows) => all_rows.extend(rows),
2250                            TableData::Batches(_) => all_data.push(data),
2251                        }
2252                    }
2253                    differential_dataflow::consolidation::consolidate(&mut all_rows);
2254                    all_data.push(TableData::Rows(all_rows));
2255
2256                    // TODO(parkmycar): Use SmallVec throughout.
2257                    all_appends.push((item_id, all_data));
2258                }
2259
2260                let fut = self
2261                    .controller
2262                    .storage
2263                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2264                    .expect("cannot fail to append");
2265                async {
2266                    fut.await
2267                        .expect("One-shot shouldn't be dropped during bootstrap")
2268                        .unwrap_or_terminate("cannot fail to append")
2269                }
2270                .boxed()
2271            }
2272        } else {
2273            futures::future::ready(()).boxed()
2274        };
2275
2276        info!(
2277            "startup: coordinator init: bootstrap: postamble complete in {:?}",
2278            postamble_start.elapsed()
2279        );
2280
2281        let builtin_update_start = Instant::now();
2282        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2283
2284        if self.controller.read_only() {
2285            info!(
2286                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2287            );
2288
2289            // TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
2290            let audit_join_start = Instant::now();
2291            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2292            let audit_log_updates: Vec<_> = audit_logs_iterator
2293                .map(|(audit_log, ts)| StateUpdate {
2294                    kind: StateUpdateKind::AuditLog(audit_log),
2295                    ts,
2296                    diff: StateDiff::Addition,
2297                })
2298                .collect();
2299            let audit_log_builtin_table_updates = self
2300                .catalog()
2301                .state()
2302                .generate_builtin_table_updates(audit_log_updates);
2303            builtin_table_updates.extend(audit_log_builtin_table_updates);
2304            info!(
2305                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2306                audit_join_start.elapsed()
2307            );
2308            self.buffered_builtin_table_updates
2309                .as_mut()
2310                .expect("in read-only mode")
2311                .append(&mut builtin_table_updates);
2312        } else {
2313            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2314                .await;
2315        };
2316        info!(
2317            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2318            builtin_update_start.elapsed()
2319        );
2320
2321        let cleanup_secrets_start = Instant::now();
2322        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2323        // Cleanup orphaned secrets. Errors during list() or delete() do not
2324        // need to prevent bootstrap from succeeding; we will retry next
2325        // startup.
2326        {
2327            // Destructure Self so we can selectively move fields into the async
2328            // task.
2329            let Self {
2330                secrets_controller,
2331                catalog,
2332                ..
2333            } = self;
2334
2335            let next_user_item_id = catalog.get_next_user_item_id().await?;
2336            let next_system_item_id = catalog.get_next_system_item_id().await?;
2337            let read_only = self.controller.read_only();
2338            // Fetch all IDs from the catalog to future-proof against other
2339            // things using secrets. Today, SECRET and CONNECTION objects use
2340            // secrets_controller.ensure, but more things could use secrets in the
2341            // future, and those would be easy to miss adding here.
2342            let catalog_ids: BTreeSet<CatalogItemId> =
2343                catalog.entries().map(|entry| entry.id()).collect();
2344            let secrets_controller = Arc::clone(secrets_controller);
2345
2346            spawn(|| "cleanup-orphaned-secrets", async move {
2347                if read_only {
2348                    info!(
2349                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2350                    );
2351                    return;
2352                }
2353                info!("coordinator init: cleaning up orphaned secrets");
2354
2355                match secrets_controller.list().await {
2356                    Ok(controller_secrets) => {
2357                        let controller_secrets: BTreeSet<CatalogItemId> =
2358                            controller_secrets.into_iter().collect();
2359                        let orphaned = controller_secrets.difference(&catalog_ids);
2360                        for id in orphaned {
2361                            let id_too_large = match id {
2362                                CatalogItemId::System(id) => *id >= next_system_item_id,
2363                                CatalogItemId::User(id) => *id >= next_user_item_id,
2364                                CatalogItemId::IntrospectionSourceIndex(_)
2365                                | CatalogItemId::Transient(_) => false,
2366                            };
2367                            if id_too_large {
2368                                info!(
2369                                    %next_user_item_id, %next_system_item_id,
2370                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2371                                );
2372                            } else {
2373                                info!("coordinator init: deleting orphaned secret {id}");
2374                                fail_point!("orphan_secrets");
2375                                if let Err(e) = secrets_controller.delete(*id).await {
2376                                    warn!(
2377                                        "Dropping orphaned secret has encountered an error: {}",
2378                                        e
2379                                    );
2380                                }
2381                            }
2382                        }
2383                    }
2384                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2385                }
2386            });
2387        }
2388        info!(
2389            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2390            cleanup_secrets_start.elapsed()
2391        );
2392
2393        // Run all of our final steps concurrently.
2394        let final_steps_start = Instant::now();
2395        info!(
2396            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2397        );
2398        migrated_updates_fut
2399            .instrument(info_span!("coord::bootstrap::final"))
2400            .await;
2401
2402        debug!(
2403            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2404        );
2405        // Announce the completion of initialization.
2406        self.controller.initialization_complete();
2407
2408        // Initialize unified introspection.
2409        self.bootstrap_introspection_subscribes().await;
2410
2411        info!(
2412            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2413            final_steps_start.elapsed()
2414        );
2415
2416        info!(
2417            "startup: coordinator init: bootstrap complete in {:?}",
2418            bootstrap_start.elapsed()
2419        );
2420        Ok(())
2421    }
2422
2423    /// Prepares tables for writing by resetting them to a known state and
2424    /// appending the given builtin table updates. The timestamp oracle
2425    /// will be advanced to the write timestamp of the append when this
2426    /// method returns.
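    ///
    /// At a high level this method: appends empty batches to advance every
    /// table to a fresh write timestamp, retracts the current contents of the
    /// system tables (except `mz_storage_usage_by_shard` and the append-only
    /// `mz_audit_events`, which are handled specially), and then appends the
    /// accumulated builtin table updates.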
2427    #[allow(clippy::async_yields_async)]
2428    #[instrument]
2429    async fn bootstrap_tables(
2430        &mut self,
2431        entries: &[CatalogEntry],
2432        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2433        audit_logs_iterator: AuditLogIterator,
2434    ) {
2435        /// A small helper struct of metadata for bootstrapping tables.
2436        struct TableMetadata<'a> {
2437            id: CatalogItemId,
2438            name: &'a QualifiedItemName,
2439            table: &'a Table,
2440        }
2441
2442        // Filter our entries down to just tables.
2443        let table_metas: Vec<_> = entries
2444            .into_iter()
2445            .filter_map(|entry| {
2446                entry.table().map(|table| TableMetadata {
2447                    id: entry.id(),
2448                    name: entry.name(),
2449                    table,
2450                })
2451            })
2452            .collect();
2453
2454        // Append empty batches to advance the timestamp of all tables.
2455        debug!("coordinator init: advancing all tables to current timestamp");
2456        let WriteTimestamp {
2457            timestamp: write_ts,
2458            advance_to,
2459        } = self.get_local_write_ts().await;
2460        let appends = table_metas
2461            .iter()
2462            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2463            .collect();
2464        // Append the tables in the background. We apply the write timestamp before getting a read
2465        // timestamp and reading a snapshot of each table, so the snapshots will block on their own
2466        // until the appends are complete.
2467        let table_fence_rx = self
2468            .controller
2469            .storage
2470            .append_table(write_ts.clone(), advance_to, appends)
2471            .expect("invalid updates");
2472
2473        self.apply_local_write(write_ts).await;
2474
2475        // Add builtin table updates that clear the contents of all system tables.
2476        debug!("coordinator init: resetting system tables");
2477        let read_ts = self.get_local_read_ts().await;
2478
2479        // Filter out the 'mz_storage_usage_by_shard' table since we need to retain that info for
2480        // billing purposes.
2481        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2482            .catalog()
2483            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2484            .into();
2485        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2486            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2487                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2488        };
2489
2490        let mut retraction_tasks = Vec::new();
2491        let mut system_tables: Vec<_> = table_metas
2492            .iter()
2493            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2494            .collect();
2495
2496        // Special-case audit events because that table is append-only.
2497        let (audit_events_idx, _) = system_tables
2498            .iter()
2499            .find_position(|table| {
2500                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2501            })
2502            .expect("mz_audit_events must exist");
2503        let audit_events = system_tables.remove(audit_events_idx);
2504        let audit_log_task = self.bootstrap_audit_log_table(
2505            audit_events.id,
2506            audit_events.name,
2507            audit_events.table,
2508            audit_logs_iterator,
2509            read_ts,
2510        );
2511
2512        for system_table in system_tables {
2513            let table_id = system_table.id;
2514            let full_name = self.catalog().resolve_full_name(system_table.name, None);
2515            debug!("coordinator init: resetting system table {full_name} ({table_id})");
2516
2517            // Fetch the current contents of the table for retraction.
2518            let snapshot_fut = self
2519                .controller
2520                .storage_collections
2521                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2522            let batch_fut = self
2523                .controller
2524                .storage_collections
2525                .create_update_builder(system_table.table.global_id_writes());
2526
2527            let task = spawn(|| format!("snapshot-{table_id}"), async move {
2528                // Create a TimestamplessUpdateBuilder.
2529                let mut batch = batch_fut
2530                    .await
2531                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2532                tracing::info!(?table_id, "starting snapshot");
2533                // Get a cursor which will emit a consolidated snapshot.
2534                let mut snapshot_cursor = snapshot_fut
2535                    .await
2536                    .unwrap_or_terminate("cannot fail to snapshot");
2537
2538                // Retract the current contents, spilling into our builder.
2539                while let Some(values) = snapshot_cursor.next().await {
2540                    for ((key, _val), _t, d) in values {
2541                        let key = key.expect("builtin table had errors");
2542                        let d_invert = d.neg();
2543                        batch.add(&key, &(), &d_invert).await;
2544                    }
2545                }
2546                tracing::info!(?table_id, "finished snapshot");
2547
2548                let batch = batch.finish().await;
2549                BuiltinTableUpdate::batch(table_id, batch)
2550            });
2551            retraction_tasks.push(task);
2552        }
2553
2554        let retractions_res = futures::future::join_all(retraction_tasks).await;
2555        for retractions in retractions_res {
2556            let retractions = retractions.expect("cannot fail to fetch snapshot");
2557            builtin_table_updates.push(retractions);
2558        }
2559
2560        let audit_join_start = Instant::now();
2561        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2562        let audit_log_updates = audit_log_task
2563            .await
2564            .expect("cannot fail to fetch audit log updates");
2565        let audit_log_builtin_table_updates = self
2566            .catalog()
2567            .state()
2568            .generate_builtin_table_updates(audit_log_updates);
2569        builtin_table_updates.extend(audit_log_builtin_table_updates);
2570        info!(
2571            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2572            audit_join_start.elapsed()
2573        );
2574
2575        // Now that the snapshots are complete, the appends must also be complete.
2576        table_fence_rx
2577            .await
2578            .expect("One-shot shouldn't be dropped during bootstrap")
2579            .unwrap_or_terminate("cannot fail to append");
2580
2581        info!("coordinator init: sending builtin table updates");
2582        let (_builtin_updates_fut, write_ts) = self
2583            .builtin_table_update()
2584            .execute(builtin_table_updates)
2585            .await;
2586        info!(?write_ts, "builtin table updates write ts");
2587        if let Some(write_ts) = write_ts {
2588            self.apply_local_write(write_ts).await;
2589        }
2590    }
2591
2592    /// Prepare updates to the audit log table. The audit log table is append-only and very large,
2593    /// so we only need to find the events present in `audit_logs_iterator` but not yet in the
2594    /// audit log table.
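    ///
    /// For example, if the largest event ID already present in the table is
    /// 42, the `take_while` below keeps only iterator entries whose
    /// `sortable_id()` is greater than 42.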
2595    #[instrument]
2596    fn bootstrap_audit_log_table<'a>(
2597        &mut self,
2598        table_id: CatalogItemId,
2599        name: &'a QualifiedItemName,
2600        table: &'a Table,
2601        audit_logs_iterator: AuditLogIterator,
2602        read_ts: Timestamp,
2603    ) -> JoinHandle<Vec<StateUpdate>> {
2604        let full_name = self.catalog().resolve_full_name(name, None);
2605        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2606        let current_contents_fut = self
2607            .controller
2608            .storage_collections
2609            .snapshot(table.global_id_writes(), read_ts);
2610        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2611            let current_contents = current_contents_fut
2612                .await
2613                .unwrap_or_terminate("cannot fail to fetch snapshot");
2614            let contents_len = current_contents.len();
2615            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2616
2617            // Fetch the largest audit log event ID that has been written to the table.
2618            let max_table_id = current_contents
2619                .into_iter()
2620                .filter(|(_, diff)| *diff == 1)
2621                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2622                .sorted()
2623                .rev()
2624                .next();
2625
2626            // Filter audit log catalog updates to those that are not present in the table.
2627            audit_logs_iterator
2628                .take_while(|(audit_log, _)| match max_table_id {
2629                    Some(id) => audit_log.event.sortable_id() > id,
2630                    None => true,
2631                })
2632                .map(|(audit_log, ts)| StateUpdate {
2633                    kind: StateUpdateKind::AuditLog(audit_log),
2634                    ts,
2635                    diff: StateDiff::Addition,
2636                })
2637                .collect::<Vec<_>>()
2638        })
2639    }
2640
2641    /// Initializes all storage collections required by catalog objects in the storage controller.
2642    ///
2643    /// This method takes care of collection creation, as well as migration of existing
2644    /// collections.
2645    ///
2646    /// Creating all storage collections in a single `create_collections` call, rather than on
2647    /// demand, is more efficient as it reduces the number of writes to durable storage. It also
2648    /// allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
2649    /// storage collections, without needing to worry about dependency order.
2650    ///
2651    /// `migrated_storage_collections` is a set of builtin storage collections that have been
2652    /// migrated and should be handled specially.
2653    #[instrument]
2654    async fn bootstrap_storage_collections(
2655        &mut self,
2656        migrated_storage_collections: &BTreeSet<CatalogItemId>,
2657    ) {
2658        let catalog = self.catalog();
2659        let source_status_collection_id = catalog
2660            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2661        let source_status_collection_id = catalog
2662            .get_entry(&source_status_collection_id)
2663            .latest_global_id();
2664
2665        let source_desc = |object_id: GlobalId,
2666                           data_source: &DataSourceDesc,
2667                           desc: &RelationDesc,
2668                           timeline: &Timeline| {
2669            let (data_source, status_collection_id) = match data_source.clone() {
2670                // Re-announce the source description.
2671                DataSourceDesc::Ingestion { desc, cluster_id } => {
2672                    let desc = desc.into_inline_connection(catalog.state());
2673                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2674
2675                    (
2676                        DataSource::Ingestion(ingestion),
2677                        Some(source_status_collection_id),
2678                    )
2679                }
2680                DataSourceDesc::OldSyntaxIngestion {
2681                    desc,
2682                    progress_subsource,
2683                    data_config,
2684                    details,
2685                    cluster_id,
2686                } => {
2687                    let desc = desc.into_inline_connection(catalog.state());
2688                    let data_config = data_config.into_inline_connection(catalog.state());
2689                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2690                    // this will always be a Source or a Table.
2691                    let progress_subsource =
2692                        catalog.get_entry(&progress_subsource).latest_global_id();
2693                    let mut ingestion =
2694                        IngestionDescription::new(desc, cluster_id, progress_subsource);
2695                    let legacy_export = SourceExport {
2696                        storage_metadata: (),
2697                        data_config,
2698                        details,
2699                    };
2700                    ingestion.source_exports.insert(object_id, legacy_export);
2701
2702                    (
2703                        DataSource::Ingestion(ingestion),
2704                        Some(source_status_collection_id),
2705                    )
2706                }
2707                DataSourceDesc::IngestionExport {
2708                    ingestion_id,
2709                    external_reference: _,
2710                    details,
2711                    data_config,
2712                } => {
2713                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2714                    // this will always be a Source or a Table.
2715                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2716                    (
2717                        DataSource::IngestionExport {
2718                            ingestion_id,
2719                            details,
2720                            data_config: data_config.into_inline_connection(catalog.state()),
2721                        },
2722                        Some(source_status_collection_id),
2723                    )
2724                }
2725                DataSourceDesc::Webhook { .. } => {
2726                    (DataSource::Webhook, Some(source_status_collection_id))
2727                }
2728                DataSourceDesc::Progress => (DataSource::Progress, None),
2729                DataSourceDesc::Introspection(introspection) => {
2730                    (DataSource::Introspection(introspection), None)
2731                }
2732            };
2733            CollectionDescription {
2734                desc: desc.clone(),
2735                data_source,
2736                since: None,
2737                status_collection_id,
2738                timeline: Some(timeline.clone()),
2739            }
2740        };
2741
2742        let mut compute_collections = vec![];
2743        let mut collections = vec![];
2744        let mut new_builtin_continual_tasks = vec![];
2745        for entry in catalog.entries() {
2746            match entry.item() {
2747                CatalogItem::Source(source) => {
2748                    collections.push((
2749                        source.global_id(),
2750                        source_desc(
2751                            source.global_id(),
2752                            &source.data_source,
2753                            &source.desc,
2754                            &source.timeline,
2755                        ),
2756                    ));
2757                }
2758                CatalogItem::Table(table) => {
2759                    match &table.data_source {
2760                        TableDataSource::TableWrites { defaults: _ } => {
2761                            let versions: BTreeMap<_, _> = table
2762                                .collection_descs()
2763                                .map(|(gid, version, desc)| (version, (gid, desc)))
2764                                .collect();
2765                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2766                                let next_version = version.bump();
2767                                let primary_collection =
2768                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2769                                let collection_desc = CollectionDescription::for_table(
2770                                    desc.clone(),
2771                                    primary_collection,
2772                                );
2773
2774                                (*gid, collection_desc)
2775                            });
2776                            collections.extend(collection_descs);
2777                        }
2778                        TableDataSource::DataSource {
2779                            desc: data_source_desc,
2780                            timeline,
2781                        } => {
2782                            // TODO(alter_table): Support versioning tables that read from sources.
2783                            soft_assert_eq_or_log!(table.collections.len(), 1);
2784                            let collection_descs =
2785                                table.collection_descs().map(|(gid, _version, desc)| {
2786                                    (
2787                                        gid,
2788                                        source_desc(
2789                                            entry.latest_global_id(),
2790                                            data_source_desc,
2791                                            &desc,
2792                                            timeline,
2793                                        ),
2794                                    )
2795                                });
2796                            collections.extend(collection_descs);
2797                        }
2798                    };
2799                }
2800                CatalogItem::MaterializedView(mv) => {
2801                    let collection_desc = CollectionDescription {
2802                        desc: mv.desc.clone(),
2803                        data_source: DataSource::Other,
2804                        since: mv.initial_as_of.clone(),
2805                        status_collection_id: None,
2806                        timeline: None,
2807                    };
2808                    compute_collections.push((mv.global_id(), mv.desc.clone()));
2809                    collections.push((mv.global_id(), collection_desc));
2810                }
2811                CatalogItem::ContinualTask(ct) => {
2812                    let collection_desc = CollectionDescription {
2813                        desc: ct.desc.clone(),
2814                        data_source: DataSource::Other,
2815                        since: ct.initial_as_of.clone(),
2816                        status_collection_id: None,
2817                        timeline: None,
2818                    };
2819                    if ct.global_id().is_system() && collection_desc.since.is_none() {
2820                        // We need a non-0 since to make as_of selection work. Fill it in below with
2821                        // the `bootstrap_builtin_continual_tasks` call, which can only be run after
2822                        // `create_collections_for_bootstrap`.
2823                        new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2824                    } else {
2825                        compute_collections.push((ct.global_id(), ct.desc.clone()));
2826                        collections.push((ct.global_id(), collection_desc));
2827                    }
2828                }
2829                CatalogItem::Sink(sink) => {
2830                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2831                    let from_desc = storage_sink_from_entry
2832                        .desc(&self.catalog().resolve_full_name(
2833                            storage_sink_from_entry.name(),
2834                            storage_sink_from_entry.conn_id(),
2835                        ))
2836                        .expect("sinks can only be built on items with descs")
2837                        .into_owned();
2838                    let collection_desc = CollectionDescription {
2839                        // TODO(sinks): make generic once we have more than one sink type.
2840                        desc: KAFKA_PROGRESS_DESC.clone(),
2841                        data_source: DataSource::Sink {
2842                            desc: ExportDescription {
2843                                sink: StorageSinkDesc {
2844                                    from: sink.from,
2845                                    from_desc,
2846                                    connection: sink
2847                                        .connection
2848                                        .clone()
2849                                        .into_inline_connection(self.catalog().state()),
2850                                    envelope: sink.envelope,
2851                                    as_of: Antichain::from_elem(Timestamp::minimum()),
2852                                    with_snapshot: sink.with_snapshot,
2853                                    version: sink.version,
2854                                    from_storage_metadata: (),
2855                                    to_storage_metadata: (),
2856                                },
2857                                instance_id: sink.cluster_id,
2858                            },
2859                        },
2860                        since: None,
2861                        status_collection_id: None,
2862                        timeline: None,
2863                    };
2864                    collections.push((sink.global_id, collection_desc));
2865                }
2866                _ => (),
2867            }
2868        }
2869
2870        let register_ts = if self.controller.read_only() {
2871            self.get_local_read_ts().await
2872        } else {
2873            // Getting a write timestamp bumps the write timestamp in the
2874            // oracle, which we're not allowed to do in read-only mode.
2875            self.get_local_write_ts().await.timestamp
2876        };
2877
2878        let storage_metadata = self.catalog.state().storage_metadata();
2879        let migrated_storage_collections = migrated_storage_collections
2880            .into_iter()
2881            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2882            .collect();
2883
2884        // Before possibly creating collections, make sure their schemas are correct.
2885        //
2886        // Across different versions of Materialize the nullability of columns can change based on
2887        // updates to our optimizer.
2888        self.controller
2889            .storage
2890            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2891            .await
2892            .unwrap_or_terminate("cannot fail to evolve collections");
2893
2894        self.controller
2895            .storage
2896            .create_collections_for_bootstrap(
2897                storage_metadata,
2898                Some(register_ts),
2899                collections,
2900                &migrated_storage_collections,
2901            )
2902            .await
2903            .unwrap_or_terminate("cannot fail to create collections");
2904
2905        self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2906            .await;
2907
2908        if !self.controller.read_only() {
2909            self.apply_local_write(register_ts).await;
2910        }
2911    }
2912
2913    /// Make as_of selection happy for builtin CTs. Ideally we'd write the
2914    /// initial as_of down in the durable catalog, but that's hard because of
2915    /// boot ordering. Instead, we set the since of the storage collection to
2916    /// something that's a reasonable lower bound for the as_of. Then, if the
2917    /// upper is 0, the as_of selection code will allow us to jump it forward to
2918    /// this since.
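    ///
    /// A sketch of the lower bound chosen here, with plain `u64` times standing in for the real
    /// frontier types: the least valid read across the task's inputs is the maximum of their
    /// `since`s, i.e. the earliest time at which every input can still be read accurately.
    ///
    /// ```ignore
    /// fn least_valid_read(input_sinces: &[u64]) -> u64 {
    ///     input_sinces.iter().copied().max().unwrap_or(0)
    /// }
    ///
    /// assert_eq!(least_valid_read(&[3, 7, 5]), 7);
    /// ```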
2919    async fn bootstrap_builtin_continual_tasks(
2920        &mut self,
2921        // TODO(alter_table): Switch to CatalogItemId.
2922        mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
2923    ) {
2924        for (id, collection) in &mut collections {
2925            let entry = self.catalog.get_entry_by_global_id(id);
2926            let ct = match &entry.item {
2927                CatalogItem::ContinualTask(ct) => ct.clone(),
2928                _ => unreachable!("only called with continual task builtins"),
2929            };
2930            let debug_name = self
2931                .catalog()
2932                .resolve_full_name(entry.name(), None)
2933                .to_string();
2934            let (_optimized_plan, physical_plan, _metainfo) = self
2935                .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
2936                .expect("builtin CT should optimize successfully");
2937
2938            // Determine an as-of for the new continual task.
2939            let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
2940            // Can't acquire a read hold on ourselves because we don't exist yet.
2941            id_bundle.storage_ids.remove(id);
2942            let read_holds = self.acquire_read_holds(&id_bundle);
2943            let as_of = read_holds.least_valid_read();
2944
2945            collection.since = Some(as_of.clone());
2946        }
2947        self.controller
2948            .storage
2949            .create_collections(self.catalog.state().storage_metadata(), None, collections)
2950            .await
2951            .unwrap_or_terminate("cannot fail to create collections");
2952    }
2953
2954    /// Invokes the optimizer on all indexes and materialized views in the catalog and inserts the
2955    /// resulting dataflow plans into the catalog state.
2956    ///
2957    /// `ordered_catalog_entries` must be sorted in dependency order, with dependencies ordered
2958    /// before their dependants.
2959    ///
2960    /// This method does not perform timestamp selection for the dataflows, nor does it create them
2961    /// in the compute controller. Both of these steps happen later during bootstrapping.
2962    ///
2963    /// Returns a map of expressions that were not cached.
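    ///
    /// A cached plan is reused only if it was produced with the same optimizer features; otherwise
    /// the dataflow is re-optimized and the fresh result is returned as uncached. A minimal,
    /// std-only sketch of that pattern, with hypothetical names and `String`s standing in for the
    /// real plan and feature types:
    ///
    /// ```ignore
    /// use std::collections::BTreeMap;
    ///
    /// fn plan_with_cache(
    ///     id: u64,
    ///     current_features: &str,
    ///     cache: &mut BTreeMap<u64, (String, String)>, // id -> (features, plan)
    ///     uncached: &mut BTreeMap<u64, (String, String)>,
    /// ) -> String {
    ///     match cache.remove(&id) {
    ///         Some((features, plan)) if features == current_features => plan,
    ///         _ => {
    ///             let plan = format!("replanned-{id}");
    ///             uncached.insert(id, (current_features.to_string(), plan.clone()));
    ///             plan
    ///         }
    ///     }
    /// }
    /// ```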
2964    #[instrument]
2965    fn bootstrap_dataflow_plans(
2966        &mut self,
2967        ordered_catalog_entries: &[CatalogEntry],
2968        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2969    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
2970        // The optimizer expects to be able to query its `ComputeInstanceSnapshot` for
2971        // collections the current dataflow can depend on. But since we don't yet install anything
2972        // on compute instances, the snapshot information is incomplete. We fix that by manually
2973        // updating `ComputeInstanceSnapshot` objects to ensure they contain previously optimized
2974        // collections.
2975        let mut instance_snapshots = BTreeMap::new();
2976        let mut uncached_expressions = BTreeMap::new();
2977
2978        let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
2979
2980        for entry in ordered_catalog_entries {
2981            match entry.item() {
2982                CatalogItem::Index(idx) => {
2983                    // Collect optimizer parameters.
2984                    let compute_instance =
2985                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
2986                            self.instance_snapshot(idx.cluster_id)
2987                                .expect("compute instance exists")
2988                        });
2989                    let global_id = idx.global_id();
2990
2991                    // The index may already be installed on the compute instance. For example,
2992                    // this is the case for introspection indexes.
2993                    if compute_instance.contains_collection(&global_id) {
2994                        continue;
2995                    }
2996
2997                    let (optimized_plan, physical_plan, metainfo) =
2998                        match cached_global_exprs.remove(&global_id) {
2999                            Some(global_expressions)
3000                                if global_expressions.optimizer_features
3001                                    == optimizer_config.features =>
3002                            {
3003                                debug!("global expression cache hit for {global_id:?}");
3004                                (
3005                                    global_expressions.global_mir,
3006                                    global_expressions.physical_plan,
3007                                    global_expressions.dataflow_metainfos,
3008                                )
3009                            }
3010                            Some(_) | None => {
3011                                let (optimized_plan, global_lir_plan) = {
3012                                    // Build an optimizer for this INDEX.
3013                                    let mut optimizer = optimize::index::Optimizer::new(
3014                                        self.owned_catalog(),
3015                                        compute_instance.clone(),
3016                                        global_id,
3017                                        optimizer_config.clone(),
3018                                        self.optimizer_metrics(),
3019                                    );
3020
3021                                    // MIR ⇒ MIR optimization (global)
3022                                    let index_plan = optimize::index::Index::new(
3023                                        entry.name().clone(),
3024                                        idx.on,
3025                                        idx.keys.to_vec(),
3026                                    );
3027                                    let global_mir_plan = optimizer.optimize(index_plan)?;
3028                                    let optimized_plan = global_mir_plan.df_desc().clone();
3029
3030                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3031                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3032
3033                                    (optimized_plan, global_lir_plan)
3034                                };
3035
3036                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3037                                let metainfo = {
3038                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3039                                    let notice_ids =
3040                                        std::iter::repeat_with(|| self.allocate_transient_id())
3041                                            .map(|(_item_id, gid)| gid)
3042                                            .take(metainfo.optimizer_notices.len())
3043                                            .collect::<Vec<_>>();
3044                                    // Return a metainfo with rendered notices.
3045                                    self.catalog().render_notices(
3046                                        metainfo,
3047                                        notice_ids,
3048                                        Some(idx.global_id()),
3049                                    )
3050                                };
3051                                uncached_expressions.insert(
3052                                    global_id,
3053                                    GlobalExpressions {
3054                                        global_mir: optimized_plan.clone(),
3055                                        physical_plan: physical_plan.clone(),
3056                                        dataflow_metainfos: metainfo.clone(),
3057                                        optimizer_features: OptimizerFeatures::from(
3058                                            self.catalog().system_config(),
3059                                        ),
3060                                    },
3061                                );
3062                                (optimized_plan, physical_plan, metainfo)
3063                            }
3064                        };
3065
3066                    let catalog = self.catalog_mut();
3067                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3068                    catalog.set_physical_plan(idx.global_id(), physical_plan);
3069                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3070
3071                    compute_instance.insert_collection(idx.global_id());
3072                }
3073                CatalogItem::MaterializedView(mv) => {
3074                    // Collect optimizer parameters.
3075                    let compute_instance =
3076                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3077                            self.instance_snapshot(mv.cluster_id)
3078                                .expect("compute instance exists")
3079                        });
3080                    let global_id = mv.global_id();
3081
3082                    let (optimized_plan, physical_plan, metainfo) =
3083                        match cached_global_exprs.remove(&global_id) {
3084                            Some(global_expressions)
3085                                if global_expressions.optimizer_features
3086                                    == optimizer_config.features =>
3087                            {
3088                                debug!("global expression cache hit for {global_id:?}");
3089                                (
3090                                    global_expressions.global_mir,
3091                                    global_expressions.physical_plan,
3092                                    global_expressions.dataflow_metainfos,
3093                                )
3094                            }
3095                            Some(_) | None => {
3096                                let (_, internal_view_id) = self.allocate_transient_id();
3097                                let debug_name = self
3098                                    .catalog()
3099                                    .resolve_full_name(entry.name(), None)
3100                                    .to_string();
3101                                let force_non_monotonic = Default::default();
3102
3103                                let (optimized_plan, global_lir_plan) = {
3104                                    // Build an optimizer for this MATERIALIZED VIEW.
3105                                    let mut optimizer = optimize::materialized_view::Optimizer::new(
3106                                        self.owned_catalog().as_optimizer_catalog(),
3107                                        compute_instance.clone(),
3108                                        global_id,
3109                                        internal_view_id,
3110                                        mv.desc.iter_names().cloned().collect(),
3111                                        mv.non_null_assertions.clone(),
3112                                        mv.refresh_schedule.clone(),
3113                                        debug_name,
3114                                        optimizer_config.clone(),
3115                                        self.optimizer_metrics(),
3116                                        force_non_monotonic,
3117                                    );
3118
3119                                    // MIR ⇒ MIR optimization (global)
3120                                    let global_mir_plan =
3121                                        optimizer.optimize(mv.optimized_expr.as_ref().clone())?;
3122                                    let optimized_plan = global_mir_plan.df_desc().clone();
3123
3124                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3125                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3126
3127                                    (optimized_plan, global_lir_plan)
3128                                };
3129
3130                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3131                                let metainfo = {
3132                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3133                                    let notice_ids =
3134                                        std::iter::repeat_with(|| self.allocate_transient_id())
3135                                            .map(|(_item_id, global_id)| global_id)
3136                                            .take(metainfo.optimizer_notices.len())
3137                                            .collect::<Vec<_>>();
3138                                    // Return a metainfo with rendered notices.
3139                                    self.catalog().render_notices(
3140                                        metainfo,
3141                                        notice_ids,
3142                                        Some(mv.global_id()),
3143                                    )
3144                                };
3145                                uncached_expressions.insert(
3146                                    global_id,
3147                                    GlobalExpressions {
3148                                        global_mir: optimized_plan.clone(),
3149                                        physical_plan: physical_plan.clone(),
3150                                        dataflow_metainfos: metainfo.clone(),
3151                                        optimizer_features: OptimizerFeatures::from(
3152                                            self.catalog().system_config(),
3153                                        ),
3154                                    },
3155                                );
3156                                (optimized_plan, physical_plan, metainfo)
3157                            }
3158                        };
3159
3160                    let catalog = self.catalog_mut();
3161                    catalog.set_optimized_plan(mv.global_id(), optimized_plan);
3162                    catalog.set_physical_plan(mv.global_id(), physical_plan);
3163                    catalog.set_dataflow_metainfo(mv.global_id(), metainfo);
3164
3165                    compute_instance.insert_collection(mv.global_id());
3166                }
3167                CatalogItem::ContinualTask(ct) => {
3168                    let compute_instance =
3169                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3170                            self.instance_snapshot(ct.cluster_id)
3171                                .expect("compute instance exists")
3172                        });
3173                    let global_id = ct.global_id();
3174
3175                    let (optimized_plan, physical_plan, metainfo) =
3176                        match cached_global_exprs.remove(&global_id) {
3177                            Some(global_expressions)
3178                                if global_expressions.optimizer_features
3179                                    == optimizer_config.features =>
3180                            {
3181                                debug!("global expression cache hit for {global_id:?}");
3182                                (
3183                                    global_expressions.global_mir,
3184                                    global_expressions.physical_plan,
3185                                    global_expressions.dataflow_metainfos,
3186                                )
3187                            }
3188                            Some(_) | None => {
3189                                let debug_name = self
3190                                    .catalog()
3191                                    .resolve_full_name(entry.name(), None)
3192                                    .to_string();
3193                                let (optimized_plan, physical_plan, metainfo) = self
3194                                    .optimize_create_continual_task(
3195                                        ct,
3196                                        global_id,
3197                                        self.owned_catalog(),
3198                                        debug_name,
3199                                    )?;
3200                                uncached_expressions.insert(
3201                                    global_id,
3202                                    GlobalExpressions {
3203                                        global_mir: optimized_plan.clone(),
3204                                        physical_plan: physical_plan.clone(),
3205                                        dataflow_metainfos: metainfo.clone(),
3206                                        optimizer_features: OptimizerFeatures::from(
3207                                            self.catalog().system_config(),
3208                                        ),
3209                                    },
3210                                );
3211                                (optimized_plan, physical_plan, metainfo)
3212                            }
3213                        };
3214
3215                    let catalog = self.catalog_mut();
3216                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3217                    catalog.set_physical_plan(ct.global_id(), physical_plan);
3218                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3219
3220                    compute_instance.insert_collection(ct.global_id());
3221                }
3222                _ => (),
3223            }
3224        }
3225
3226        Ok(uncached_expressions)
3227    }
3228
3229    /// Selects for each compute dataflow an as-of suitable for bootstrapping it.
3230    ///
3231    /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
3232    /// in place and that must not be dropped before all compute dataflows have been created with
3233    /// the compute controller.
3234    ///
3235    /// This method expects all storage collections and dataflow plans to be available, so it must
3236    /// run after [`Coordinator::bootstrap_storage_collections`] and
3237    /// [`Coordinator::bootstrap_dataflow_plans`].
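    ///
    /// A sketch of why the returned holds matter, with plain `u64` times and hypothetical names
    /// standing in for the real frontier types: while holds are outstanding, a collection may only
    /// compact up to the earliest held time, so the selected as-ofs remain readable.
    ///
    /// ```ignore
    /// fn allowed_compaction(requested: u64, outstanding_holds: &[u64]) -> u64 {
    ///     let earliest_hold = outstanding_holds.iter().copied().min().unwrap_or(u64::MAX);
    ///     requested.min(earliest_hold)
    /// }
    ///
    /// assert_eq!(allowed_compaction(10, &[4, 8]), 4);
    /// ```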
3238    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3239        let mut catalog_ids = Vec::new();
3240        let mut dataflows = Vec::new();
3241        let mut read_policies = BTreeMap::new();
3242        for entry in self.catalog.entries() {
3243            let gid = match entry.item() {
3244                CatalogItem::Index(idx) => idx.global_id(),
3245                CatalogItem::MaterializedView(mv) => mv.global_id(),
3246                CatalogItem::ContinualTask(ct) => ct.global_id(),
3247                CatalogItem::Table(_)
3248                | CatalogItem::Source(_)
3249                | CatalogItem::Log(_)
3250                | CatalogItem::View(_)
3251                | CatalogItem::Sink(_)
3252                | CatalogItem::Type(_)
3253                | CatalogItem::Func(_)
3254                | CatalogItem::Secret(_)
3255                | CatalogItem::Connection(_) => continue,
3256            };
3257            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3258                catalog_ids.push(gid);
3259                dataflows.push(plan.clone());
3260
3261                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3262                    read_policies.insert(gid, compaction_window.into());
3263                }
3264            }
3265        }
3266
3267        let read_ts = self.get_local_read_ts().await;
3268        let read_holds = as_of_selection::run(
3269            &mut dataflows,
3270            &read_policies,
3271            &*self.controller.storage_collections,
3272            read_ts,
3273            self.controller.read_only(),
3274        );
3275
3276        let catalog = self.catalog_mut();
3277        for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3278            catalog.set_physical_plan(id, plan);
3279        }
3280
3281        read_holds
3282    }
3283
3284    /// Serves the coordinator, receiving commands from users over `cmd_rx`
3285    /// and feedback from dataflow workers over `feedback_rx`.
3286    ///
3287    /// You must call `bootstrap` before calling this method.
3288    ///
3289    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would
3290    /// get stored on the stack, which is bad for runtime performance and blows up our stack usage.
3291    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
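    ///
    /// A minimal sketch of the boxing technique, using the `futures` crate this module already
    /// imports:
    ///
    /// ```ignore
    /// use futures::future::{FutureExt, LocalBoxFuture};
    ///
    /// fn make() -> LocalBoxFuture<'static, ()> {
    ///     // The async block's (potentially large) state machine is allocated on the heap.
    ///     async move { /* ... */ }.boxed_local()
    /// }
    /// ```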
3292    fn serve(
3293        mut self,
3294        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3295        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3296        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3297        group_commit_rx: appends::GroupCommitWaiter,
3298    ) -> LocalBoxFuture<'static, ()> {
3299        async move {
3300            // Watcher that listens for and reports cluster service status changes.
3301            let mut cluster_events = self.controller.events_stream();
3302            let last_message = Arc::new(Mutex::new(LastMessage {
3303                kind: "none",
3304                stmt: None,
3305            }));
3306
3307            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3308            let idle_metric = self.metrics.queue_busy_seconds.clone();
3309            let last_message_watchdog = Arc::clone(&last_message);
3310
3311            spawn(|| "coord watchdog", async move {
3312                // Every 5 seconds, attempt to measure how long it takes for the
3313                // coord select loop to drain, because the idle message is processed
3314                // at the lowest priority, after everything else. If the loop is idle,
3315                // the measurement will only be a few microseconds.
3316                let mut interval = tokio::time::interval(Duration::from_secs(5));
3317                // If we end up having to wait more than 5 seconds for the coord to respond, then the
3318                // behavior of Delay results in the interval "restarting" from whenever we yield
3319                // instead of trying to catch up.
3320                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3321
3322                // Track if we become stuck to de-dupe error reporting.
3323                let mut coord_stuck = false;
3324
3325                loop {
3326                    interval.tick().await;
3327
3328                    // Wait for space in the channel; if we time out, the coordinator is stuck!
3329                    let duration = tokio::time::Duration::from_secs(30);
3330                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3331                    let Ok(maybe_permit) = timeout else {
3332                        // Only log if we're newly stuck, to prevent logging repeatedly.
3333                        if !coord_stuck {
3334                            let last_message = last_message_watchdog.lock().expect("poisoned");
3335                            tracing::warn!(
3336                                last_message_kind = %last_message.kind,
3337                                last_message_sql = %last_message.stmt_to_string(),
3338                                "coordinator stuck for {duration:?}",
3339                            );
3340                        }
3341                        coord_stuck = true;
3342
3343                        continue;
3344                    };
3345
3346                    // We got a permit, we're not stuck!
3347                    if coord_stuck {
3348                        tracing::info!("Coordinator became unstuck");
3349                    }
3350                    coord_stuck = false;
3351
3352                    // If we failed to acquire a permit it's because we're shutting down.
3353                    let Ok(permit) = maybe_permit else {
3354                        break;
3355                    };
3356
3357                    permit.send(idle_metric.start_timer());
3358                }
3359            });
3360
3361            self.schedule_storage_usage_collection().await;
3362            self.spawn_privatelink_vpc_endpoints_watch_task();
3363            self.spawn_statement_logging_task();
3364            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3365
3366            // Report if the handling of a single message takes longer than this threshold.
3367            let warn_threshold = self
3368                .catalog()
3369                .system_config()
3370                .coord_slow_message_warn_threshold();
3371
3372            // How many messages we'd like to batch up before processing them. Must be > 0.
3373            const MESSAGE_BATCH: usize = 64;
3374            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3375            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3376
3377            let message_batch = self.metrics.message_batch.clone();
3378
3379            loop {
3380                // Before adding a branch to this select loop, please ensure that the branch is
3381                // cancellation safe and add a comment explaining why. You can refer here for more
3382                // info: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
3383                select! {
3384                    // We prioritize internal commands over other commands. However, we work through
3385                    // batches of commands in some branches of this select, which means that even if
3386                    // a command generates internal commands, we will work through the current batch
3387                    // before receiving a new batch of commands.
3388                    biased;
3389
3390                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3391                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3392                    // Receive a batch of commands.
3393                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3394                    // `next()` on any stream is cancel-safe:
3395                    // https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
3396                    // Receive a single command.
3397                    Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
3398                    // See [`mz_controller::Controller::ready`] for notes
3399                    // on why this is cancel-safe.
3400                    // Receive a single command.
3401                    () = self.controller.ready() => {
3402                        // NOTE: We don't get a `Readiness` back from `ready()`
3403                        // because the controller wants to keep it and it's not
3404                        // trivially `Clone` or `Copy`. Hence this accessor.
3405                        let controller = match self.controller.get_readiness() {
3406                            Readiness::Storage => ControllerReadiness::Storage,
3407                            Readiness::Compute => ControllerReadiness::Compute,
3408                            Readiness::Metrics(_) => ControllerReadiness::Metrics,
3409                            Readiness::Internal(_) => ControllerReadiness::Internal,
3410                            Readiness::NotReady => unreachable!("just signaled as ready"),
3411                        };
3412                        messages.push(Message::ControllerReady { controller });
3413                    }
3414                    // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
3415                    // Receive a single command.
3416                    permit = group_commit_rx.ready() => {
3417                        // If we happen to have batched exactly one user write, use
3418                        // that span so the `emit_trace_id_notice` hooks up.
3419                        // Otherwise, the best we can do is invent a new root span
3420                        // and make it follow from all the Spans in the pending
3421                        // writes.
3422                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3423                            PendingWriteTxn::User{span, ..} => Some(span),
3424                            PendingWriteTxn::System{..} => None,
3425                        });
3426                        let span = match user_write_spans.exactly_one() {
3427                            Ok(span) => span.clone(),
3428                            Err(user_write_spans) => {
3429                                let span = info_span!(parent: None, "group_commit_notify");
3430                                for s in user_write_spans {
3431                                    span.follows_from(s);
3432                                }
3433                                span
3434                            }
3435                        };
3436                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3437                    },
3438                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3439                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3440                    // Receive a batch of commands.
3441                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3442                        if count == 0 {
3443                            break;
3444                        } else {
3445                            messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
3446                        }
3447                    },
3448                    // `recv()` on `UnboundedReceiver` is cancellation safe:
3449                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
3450                    // Receive a single command.
3451                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3452                        let mut pending_read_txns = vec![pending_read_txn];
3453                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3454                            pending_read_txns.push(pending_read_txn);
3455                        }
3456                        for (conn_id, pending_read_txn) in pending_read_txns {
3457                            let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
3458                            soft_assert_or_log!(
3459                                prev.is_none(),
3460                                "connections can not have multiple concurrent reads, prev: {prev:?}"
3461                            )
3462                        }
3463                        messages.push(Message::LinearizeReads);
3464                    }
3465                    // `tick()` on `Interval` is cancel-safe:
3466                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3467                    // Receive a single command.
3468                    _ = self.advance_timelines_interval.tick() => {
3469                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
3470                        span.follows_from(Span::current());
3471
3472                        // Group commit sends an `AdvanceTimelines` message when
3473                        // done, which is what downgrades read holds. In
3474                        // read-only mode we send this message directly because
3475                        // we're not doing group commits.
3476                        if self.controller.read_only() {
3477                            messages.push(Message::AdvanceTimelines);
3478                        } else {
3479                            messages.push(Message::GroupCommitInitiate(span, None));
3480                        }
3481                    },
3482                    // `tick()` on `Interval` is cancel-safe:
3483                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3484                    // Receive a single command.
3485                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
3486                        messages.push(Message::CheckSchedulingPolicies);
3487                    },
3488
3489                    // `tick()` on `Interval` is cancel-safe:
3490                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3491                    // Receive a single command.
3492                    _ = self.caught_up_check_interval.tick() => {
3493                        // We do this directly on the main loop instead of
3494                        // firing off a message. We are still in read-only mode,
3495                        // so optimizing for latency by not blocking the main
3496                        // loop is not that important.
3497                        self.maybe_check_caught_up().await;
3498
3499                        continue;
3500                    },
3501
3502                    // Process the idle metric at the lowest priority to sample queue non-idle time.
3503                    // `recv()` on `Receiver` is cancellation safe:
3504                    // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety
3505                    // Receive a single command.
3506                    timer = idle_rx.recv() => {
3507                        timer.expect("does not drop").observe_duration();
3508                        self.metrics
3509                            .message_handling
3510                            .with_label_values(&["watchdog"])
3511                            .observe(0.0);
3512                        continue;
3513                    }
3514                };
3515
3516                // Observe the number of messages we're processing at once.
3517                message_batch.observe(f64::cast_lossy(messages.len()));
3518
3519                for msg in messages.drain(..) {
3520                    // All message processing functions trace. Start a parent span
3521                    // for them to make it easy to find slow messages.
3522                    let msg_kind = msg.kind();
3523                    let span = span!(
3524                        target: "mz_adapter::coord::handle_message_loop",
3525                        Level::INFO,
3526                        "coord::handle_message",
3527                        kind = msg_kind
3528                    );
3529                    let otel_context = span.context().span().span_context().clone();
3530
3531                    // Record the last kind of message in case we get stuck. For
3532                    // execute commands, we additionally stash the user's SQL
3533                    // statement so that we can log it as well.
3534                    *last_message.lock().expect("poisoned") = LastMessage {
3535                        kind: msg_kind,
3536                        stmt: match &msg {
3537                            Message::Command(
3538                                _,
3539                                Command::Execute {
3540                                    portal_name,
3541                                    session,
3542                                    ..
3543                                },
3544                            ) => session
3545                                .get_portal_unverified(portal_name)
3546                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3547                            _ => None,
3548                        },
3549                    };
3550
3551                    let start = Instant::now();
3552                    self.handle_message(msg).instrument(span).await;
3553                    let duration = start.elapsed();
3554
3555                    self.metrics
3556                        .message_handling
3557                        .with_label_values(&[msg_kind])
3558                        .observe(duration.as_secs_f64());
3559
3560                    // If something is _really_ slow, print a trace id for debugging, if OTEL is enabled.
3561                    if duration > warn_threshold {
3562                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3563                        tracing::error!(
3564                            ?msg_kind,
3565                            ?trace_id,
3566                            ?duration,
3567                            "very slow coordinator message"
3568                        );
3569                    }
3570                }
3571            }
3572            // Try to clean up as a best effort. There may be some async tasks out there holding a
3573            // reference that prevents us from cleaning up.
3574            if let Some(catalog) = Arc::into_inner(self.catalog) {
3575                catalog.expire().await;
3576            }
3577        }
3578        .boxed_local()
3579    }
3580
3581    /// Obtain a read-only Catalog reference.
3582    fn catalog(&self) -> &Catalog {
3583        &self.catalog
3584    }
3585
3586    /// Obtain a read-only Catalog snapshot, suitable for giving out to
3587    /// non-Coordinator thread tasks.
3588    fn owned_catalog(&self) -> Arc<Catalog> {
3589        Arc::clone(&self.catalog)
3590    }
3591
3592    /// Obtain a handle to the optimizer metrics, suitable for giving
3593    /// out to non-Coordinator thread tasks.
3594    fn optimizer_metrics(&self) -> OptimizerMetrics {
3595        self.optimizer_metrics.clone()
3596    }
3597
3598    /// Obtain a writeable Catalog reference.
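    ///
    /// The copy-on-write behavior relied on here is that of [`Arc::make_mut`]; a minimal sketch
    /// with a plain integer standing in for the `Catalog`:
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let mut a = Arc::new(1);
    /// let b = Arc::clone(&a);      // a read-only snapshot, like `owned_catalog()`
    /// *Arc::make_mut(&mut a) += 1; // clones, because `b` still references the old value
    /// assert_eq!((*a, *b), (2, 1));
    /// ```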
3599    fn catalog_mut(&mut self) -> &mut Catalog {
3600        // make_mut will cause any other Arc references (from owned_catalog) to
3601        // continue to be valid by cloning the catalog, putting it in a new Arc,
3602        // which lives at self.catalog. If there are no other Arc references,
3603        // then no clone is made, and it returns a reference to the existing
3604        // object. This makes this method and owned_catalog both very cheap: at
3605        // most one clone per catalog mutation, but only if there's a read-only
3606        // reference to it.
3607        Arc::make_mut(&mut self.catalog)
3608    }
3609
3610    /// Obtain a reference to the coordinator's connection context.
3611    fn connection_context(&self) -> &ConnectionContext {
3612        self.controller.connection_context()
3613    }
3614
3615    /// Obtain a reference to the coordinator's secret reader, in an `Arc`.
3616    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3617        &self.connection_context().secrets_reader
3618    }
3619
3620    /// Publishes a notice message to all sessions.
3621    ///
3622    /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
3623    /// so we keep it around.
3624    #[allow(dead_code)]
3625    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3626        for meta in self.active_conns.values() {
3627            let _ = meta.notice_tx.send(notice.clone());
3628        }
3629    }
3630
3631    /// Returns a closure that will publish a notice to all sessions that were active at the time
3632    /// this method was called.
3633    pub(crate) fn broadcast_notice_tx(
3634        &self,
3635    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3636        let senders: Vec<_> = self
3637            .active_conns
3638            .values()
3639            .map(|meta| meta.notice_tx.clone())
3640            .collect();
3641        Box::new(move |notice| {
3642            for tx in senders {
3643                let _ = tx.send(notice.clone());
3644            }
3645        })
3646    }
3647
3648    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3649        &self.active_conns
3650    }
3651
3652    #[instrument(level = "debug")]
3653    pub(crate) fn retire_execution(
3654        &mut self,
3655        reason: StatementEndedExecutionReason,
3656        ctx_extra: ExecuteContextExtra,
3657    ) {
3658        if let Some(uuid) = ctx_extra.retire() {
3659            self.end_statement_execution(uuid, reason);
3660        }
3661    }
3662
3663    /// Creates a new dataflow builder from the catalog and indexes in `self`.
3664    #[instrument(level = "debug")]
3665    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3666        let compute = self
3667            .instance_snapshot(instance)
3668            .expect("compute instance does not exist");
3669        DataflowBuilder::new(self.catalog().state(), compute)
3670    }
3671
3672    /// Return a reference-less snapshot to the indicated compute instance.
3673    pub fn instance_snapshot(
3674        &self,
3675        id: ComputeInstanceId,
3676    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3677        ComputeInstanceSnapshot::new(&self.controller, id)
3678    }
3679
3680    /// Call into the compute controller to install a finalized dataflow, and
3681    /// initialize the read policies for its exported readable objects.
3682    pub(crate) async fn ship_dataflow(
3683        &mut self,
3684        dataflow: DataflowDescription<Plan>,
3685        instance: ComputeInstanceId,
3686        subscribe_target_replica: Option<ReplicaId>,
3687    ) {
3688        // We must only install read policies for indexes, not for sinks.
3689        // Sinks are write-only compute collections that don't have read policies.
3690        let export_ids = dataflow.exported_index_ids().collect();
3691
3692        self.controller
3693            .compute
3694            .create_dataflow(instance, dataflow, subscribe_target_replica)
3695            .unwrap_or_terminate("dataflow creation cannot fail");
3696
3697        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3698            .await;
3699    }
3700
3701    /// Call into the compute controller to allow writes to the specified ID
3702    /// from the specified instance. Calling this function multiple times has no
3703    /// additional effect, and calling it on a read-only instance has no effect.
3704    pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
3705        self.controller
3706            .compute
3707            .allow_writes(instance, id)
3708            .unwrap_or_terminate("allow_writes cannot fail");
3709    }
3710
3711    /// Like `ship_dataflow`, but additionally awaits the provided builtin table updates, if any.
3712    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3713        &mut self,
3714        dataflow: DataflowDescription<Plan>,
3715        instance: ComputeInstanceId,
3716        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3717    ) {
3718        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3719            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3720            let ((), ()) =
3721                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3722        } else {
3723            self.ship_dataflow(dataflow, instance, None).await;
3724        }
3725    }
3726
3727    /// Install a _watch set_ in the controller that is automatically associated with the given
3728    /// connection id. The watchset will be automatically cleared if the connection terminates
3729    /// before the watchset completes.
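    ///
    /// Illustrative sketch (not compiled; `conn_id`, `ids`, `frontier_ts`, and `response` are
    /// assumed bindings, with `response` being the [`WatchSetResponse`] to deliver on
    /// completion):
    ///
    /// ```ignore
    /// let objects: BTreeSet<GlobalId> = ids.into_iter().collect();
    /// coord.install_compute_watch_set(conn_id, objects, frontier_ts, response);
    /// ```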
3730    pub fn install_compute_watch_set(
3731        &mut self,
3732        conn_id: ConnectionId,
3733        objects: BTreeSet<GlobalId>,
3734        t: Timestamp,
3735        state: WatchSetResponse,
3736    ) {
3737        let ws_id = self.controller.install_compute_watch_set(objects, t);
3738        self.connection_watch_sets
3739            .entry(conn_id.clone())
3740            .or_default()
3741            .insert(ws_id);
3742        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3743    }
3744
3745    /// Install a _watch set_ in the controller that is automatically associated with the given
3746    /// connection id. The watchset will be automatically cleared if the connection terminates
3747    /// before the watchset completes.
3748    pub fn install_storage_watch_set(
3749        &mut self,
3750        conn_id: ConnectionId,
3751        objects: BTreeSet<GlobalId>,
3752        t: Timestamp,
3753        state: WatchSetResponse,
3754    ) {
3755        let ws_id = self.controller.install_storage_watch_set(objects, t);
3756        self.connection_watch_sets
3757            .entry(conn_id.clone())
3758            .or_default()
3759            .insert(ws_id);
3760        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3761    }
3762
3763    /// Cancels pending watchsets associated with the provided connection id.
3764    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3765        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3766            for ws_id in ws_ids {
3767                self.installed_watch_sets.remove(&ws_id);
3768            }
3769        }
3770    }
3771
3772    /// Returns the state of the [`Coordinator`] formatted as JSON.
3773    ///
3774    /// The returned value is not guaranteed to be stable and may change at any point in time.
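    ///
    /// Debugging sketch (not compiled; `coord` is assumed to be a `&Coordinator`):
    ///
    /// ```ignore
    /// let state = coord.dump().await?;
    /// println!("{}", serde_json::to_string_pretty(&state).expect("valid JSON"));
    /// ```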
3775    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3776        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3777        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3778        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3779        // prevents a future unrelated change from silently breaking this method.
3780
3781        let global_timelines: BTreeMap<_, _> = self
3782            .global_timelines
3783            .iter()
3784            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3785            .collect();
3786        let active_conns: BTreeMap<_, _> = self
3787            .active_conns
3788            .iter()
3789            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3790            .collect();
3791        let txn_read_holds: BTreeMap<_, _> = self
3792            .txn_read_holds
3793            .iter()
3794            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3795            .collect();
3796        let pending_peeks: BTreeMap<_, _> = self
3797            .pending_peeks
3798            .iter()
3799            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3800            .collect();
3801        let client_pending_peeks: BTreeMap<_, _> = self
3802            .client_pending_peeks
3803            .iter()
3804            .map(|(id, peek)| {
3805                let peek: BTreeMap<_, _> = peek
3806                    .iter()
3807                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3808                    .collect();
3809                (id.to_string(), peek)
3810            })
3811            .collect();
3812        let pending_linearize_read_txns: BTreeMap<_, _> = self
3813            .pending_linearize_read_txns
3814            .iter()
3815            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3816            .collect();
3817
3818        let map = serde_json::Map::from_iter([
3819            (
3820                "global_timelines".to_string(),
3821                serde_json::to_value(global_timelines)?,
3822            ),
3823            (
3824                "active_conns".to_string(),
3825                serde_json::to_value(active_conns)?,
3826            ),
3827            (
3828                "txn_read_holds".to_string(),
3829                serde_json::to_value(txn_read_holds)?,
3830            ),
3831            (
3832                "pending_peeks".to_string(),
3833                serde_json::to_value(pending_peeks)?,
3834            ),
3835            (
3836                "client_pending_peeks".to_string(),
3837                serde_json::to_value(client_pending_peeks)?,
3838            ),
3839            (
3840                "pending_linearize_read_txns".to_string(),
3841                serde_json::to_value(pending_linearize_read_txns)?,
3842            ),
3843            ("controller".to_string(), self.controller.dump().await?),
3844        ]);
3845        Ok(serde_json::Value::Object(map))
3846    }
3847
3848    /// Prune all storage usage events from the [`MZ_STORAGE_USAGE_BY_SHARD`] table that are older
3849    /// than `retention_period`.
3850    ///
3851    /// This method will read the entire contents of [`MZ_STORAGE_USAGE_BY_SHARD`] into memory,
3852    /// which can be expensive.
3853    ///
3854    /// DO NOT call this method outside of startup. The safety of reading at the current oracle read
3855    /// timestamp and then writing at whatever the current write timestamp is (instead of
3856    /// `read_ts + 1`) relies on the fact that there are no outstanding writes during startup.
3857    ///
3858    /// Group commit, which this method uses to write the retractions, has builtin fencing, and we
3859    /// never commit retractions to [`MZ_STORAGE_USAGE_BY_SHARD`] outside of this method, which is
3860    /// only called once during startup. So we don't have to worry about double/invalid retractions.
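    ///
    /// The cutoff is plain millisecond arithmetic on the oracle read timestamp, mirroring the
    /// computation in the body below. For example, with a read timestamp of
    /// `1_700_000_000_000` and a 30-day retention period (`2_592_000_000` ms), rows collected
    /// before `1_697_408_000_000` are retracted:
    ///
    /// ```ignore
    /// let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
    /// ```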
3861    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
3862        let item_id = self
3863            .catalog()
3864            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3865        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3866        let read_ts = self.get_local_read_ts().await;
3867        let current_contents_fut = self
3868            .controller
3869            .storage_collections
3870            .snapshot(global_id, read_ts);
3871        let internal_cmd_tx = self.internal_cmd_tx.clone();
3872        spawn(|| "storage_usage_prune", async move {
3873            let mut current_contents = current_contents_fut
3874                .await
3875                .unwrap_or_terminate("cannot fail to fetch snapshot");
3876            differential_dataflow::consolidation::consolidate(&mut current_contents);
3877
3878            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3879            let mut expired = Vec::new();
3880            for (row, diff) in current_contents {
3881                assert_eq!(
3882                    diff, 1,
3883                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3884                );
3885                // This logic relies on the definition of `mz_storage_usage_by_shard` not changing.
3886                let collection_timestamp = row
3887                    .unpack()
3888                    .get(3)
3889                    .expect("definition of mz_storage_usage_by_shard changed")
3890                    .unwrap_timestamptz();
3891                let collection_timestamp = collection_timestamp.timestamp_millis();
3892                let collection_timestamp: u128 = collection_timestamp
3893                    .try_into()
3894                    .expect("all collections happen after Jan 1 1970");
3895                if collection_timestamp < cutoff_ts {
3896                    debug!("pruning storage event {row:?}");
3897                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3898                    expired.push(builtin_update);
3899                }
3900            }
3901
3902            // Ignore any send error; it only means the main coordinator thread has shut down.
3903            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3904        });
3905    }
3906
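    /// Returns the rate at which this environment is currently consuming credits: the sum of
    /// `credits_per_hour` across all managed user cluster replicas. Unmanaged replicas have no
    /// billed size and are excluded.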
3907    fn current_credit_consumption_rate(&self) -> Numeric {
3908        self.catalog()
3909            .user_cluster_replicas()
3910            .filter_map(|replica| match &replica.config.location {
3911                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
3912                ReplicaLocation::Unmanaged(_) => None,
3913            })
3914            .map(|size| {
3915                self.catalog()
3916                    .cluster_replica_sizes()
3917                    .0
3918                    .get(size)
3919                    .expect("location size is validated against the cluster replica sizes")
3920                    .credits_per_hour
3921            })
3922            .sum()
3923    }
3924}
3925
3926#[cfg(test)]
3927impl Coordinator {
3928    #[allow(dead_code)]
3929    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
3930        // `ship_dataflow` is not allowed to have a `Result` return because this function is
3931        // called after `catalog_transact`, after which no errors are allowed. This test exists to
3932        // prevent us from incorrectly teaching those functions how to return errors (which has
3933        // happened twice and is the motivation for this test).
3934
3935        // An arbitrary compute instance ID to satisfy the function calls below. Note that
3936        // this only works because this function will never run.
3937        let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
3938
3939        let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
3940    }
3941}
3942
3943/// Contains information about the last message the [`Coordinator`] processed.
3944struct LastMessage {
3945    kind: &'static str,
3946    stmt: Option<Arc<Statement<Raw>>>,
3947}
3948
3949impl LastMessage {
3950    /// Returns a redacted version of the statement that is safe for logs.
3951    fn stmt_to_string(&self) -> Cow<'static, str> {
3952        self.stmt
3953            .as_ref()
3954            .map(|stmt| stmt.to_ast_string_redacted().into())
3955            .unwrap_or(Cow::Borrowed("<none>"))
3956    }
3957}
3958
3959impl fmt::Debug for LastMessage {
3960    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3961        f.debug_struct("LastMessage")
3962            .field("kind", &self.kind)
3963            .field("stmt", &self.stmt_to_string())
3964            .finish()
3965    }
3966}
3967
3968impl Drop for LastMessage {
3969    fn drop(&mut self) {
3970        // Only print the last message if we're currently panicking, otherwise we'd spam our logs.
3971        if std::thread::panicking() {
3972            // If we're panicking there's no guarantee `tracing` still works, so print to stderr.
3973            eprintln!("Coordinator panicking, dumping last message\n{self:?}");
3974        }
3975    }
3976}
3977
3978/// Serves the coordinator based on the provided configuration.
3979///
3980/// For a high-level description of the coordinator, see the [crate
3981/// documentation](crate).
3982///
3983/// Returns a handle to the coordinator and a client to communicate with the
3984/// coordinator.
3985///
3986/// BOXED FUTURE: As of Nov 2023 the Future returned by this function was 42KB. Storing it on
3987/// the stack is bad for runtime performance and could blow up our stack usage. Because of that
3988/// we purposefully move this Future onto the heap (i.e. Box it).
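///
/// A rough sketch of how an embedder drives this function (not compiled; `config` is an assumed,
/// fully populated [`Config`]):
///
/// ```ignore
/// let (handle, client) = serve(config).await?;
/// // `client` is what SQL frontends use to create sessions and submit commands;
/// // dropping `handle` joins the coordinator thread.
/// ```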
3989pub fn serve(
3990    Config {
3991        controller_config,
3992        controller_envd_epoch,
3993        mut storage,
3994        audit_logs_iterator,
3995        timestamp_oracle_url,
3996        unsafe_mode,
3997        all_features,
3998        build_info,
3999        environment_id,
4000        metrics_registry,
4001        now,
4002        secrets_controller,
4003        cloud_resource_controller,
4004        cluster_replica_sizes,
4005        builtin_system_cluster_config,
4006        builtin_catalog_server_cluster_config,
4007        builtin_probe_cluster_config,
4008        builtin_support_cluster_config,
4009        builtin_analytics_cluster_config,
4010        system_parameter_defaults,
4011        availability_zones,
4012        storage_usage_client,
4013        storage_usage_collection_interval,
4014        storage_usage_retention_period,
4015        segment_client,
4016        egress_addresses,
4017        aws_account_id,
4018        aws_privatelink_availability_zones,
4019        connection_context,
4020        connection_limit_callback,
4021        remote_system_parameters,
4022        webhook_concurrency_limit,
4023        http_host_name,
4024        tracing_handle,
4025        read_only_controllers,
4026        caught_up_trigger: clusters_caught_up_trigger,
4027        helm_chart_version,
4028        license_key,
4029        external_login_password_mz_system,
4030        force_builtin_schema_migration,
4031    }: Config,
4032) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4033    async move {
4034        let coord_start = Instant::now();
4035        info!("startup: coordinator init: beginning");
4036        info!("startup: coordinator init: preamble beginning");
4037
4038        // Initializing the builtins can be expensive and consume a lot of memory, so we force
4039        // initialization early, while the stack is relatively empty, to avoid stack overflows
4040        // later.
4041        let _builtins = LazyLock::force(&BUILTINS_STATIC);
4042
4043        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4044        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4045        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4046            mpsc::unbounded_channel();
4047
4048        // Validate and process availability zones.
4049        if !availability_zones.iter().all_unique() {
4050            coord_bail!("availability zones must be unique");
4051        }
4052
4053        let aws_principal_context = match (
4054            aws_account_id,
4055            connection_context.aws_external_id_prefix.clone(),
4056        ) {
4057            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4058                aws_account_id,
4059                aws_external_id_prefix,
4060            }),
4061            _ => None,
4062        };
4063
4064        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4065            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4066
4067        info!(
4068            "startup: coordinator init: preamble complete in {:?}",
4069            coord_start.elapsed()
4070        );
4071        let oracle_init_start = Instant::now();
4072        info!("startup: coordinator init: timestamp oracle init beginning");
4073
4074        let pg_timestamp_oracle_config = timestamp_oracle_url
4075            .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
4076        let mut initial_timestamps =
4077            get_initial_oracle_timestamps(&pg_timestamp_oracle_config).await?;
4078
4079        // Insert an entry for the `EpochMilliseconds` timeline if one doesn't exist,
4080        // which will ensure that the timeline is initialized since it's required
4081        // by the system.
4082        initial_timestamps
4083            .entry(Timeline::EpochMilliseconds)
4084            .or_insert_with(mz_repr::Timestamp::minimum);
4085        let mut timestamp_oracles = BTreeMap::new();
4086        for (timeline, initial_timestamp) in initial_timestamps {
4087            Coordinator::ensure_timeline_state_with_initial_time(
4088                &timeline,
4089                initial_timestamp,
4090                now.clone(),
4091                pg_timestamp_oracle_config.clone(),
4092                &mut timestamp_oracles,
4093                read_only_controllers,
4094            )
4095            .await;
4096        }
4097
4098        // Opening the durable catalog uses one or more timestamps without communicating with
4099        // the timestamp oracle. Here we make sure to apply the catalog upper with the timestamp
4100        // oracle to linearize future operations with opening the catalog.
4101        let catalog_upper = storage.current_upper().await;
4102        // Choose a time at which to boot. This is used, for example, to prune
4103        // old storage usage data or migrate audit log entries.
4104        //
4105        // This time is usually the current system time, but with protection
4106        // against backwards time jumps, even across restarts.
4107        let epoch_millis_oracle = &timestamp_oracles
4108            .get(&Timeline::EpochMilliseconds)
4109            .expect("inserted above")
4110            .oracle;
4111
4112        let mut boot_ts = if read_only_controllers {
4113            let read_ts = epoch_millis_oracle.read_ts().await;
4114            std::cmp::max(read_ts, catalog_upper)
4115        } else {
4116            // Getting/applying a write timestamp bumps the write timestamp in the
4117            // oracle, which we're not allowed to do in read-only mode.
4118            epoch_millis_oracle.apply_write(catalog_upper).await;
4119            epoch_millis_oracle.write_ts().await.timestamp
4120        };
4121
4122        info!(
4123            "startup: coordinator init: timestamp oracle init complete in {:?}",
4124            oracle_init_start.elapsed()
4125        );
4126
4127        let catalog_open_start = Instant::now();
4128        info!("startup: coordinator init: catalog open beginning");
4129        let persist_client = controller_config
4130            .persist_clients
4131            .open(controller_config.persist_location.clone())
4132            .await
4133            .context("opening persist client")?;
4134        let builtin_item_migration_config = BuiltinItemMigrationConfig {
4135            persist_client: persist_client.clone(),
4136            read_only: read_only_controllers,
4137            force_migration: force_builtin_schema_migration,
4138        };
4141        let OpenCatalogResult {
4142            mut catalog,
4143            migrated_storage_collections_0dt,
4144            new_builtin_collections,
4145            builtin_table_updates,
4146            cached_global_exprs,
4147            uncached_local_exprs,
4148        } = Catalog::open(mz_catalog::config::Config {
4149            storage,
4150            metrics_registry: &metrics_registry,
4151            state: mz_catalog::config::StateConfig {
4152                unsafe_mode,
4153                all_features,
4154                build_info,
4155                deploy_generation: controller_config.deploy_generation,
4156                environment_id: environment_id.clone(),
4157                read_only: read_only_controllers,
4158                now: now.clone(),
4159                boot_ts: boot_ts.clone(),
4160                skip_migrations: false,
4161                cluster_replica_sizes,
4162                builtin_system_cluster_config,
4163                builtin_catalog_server_cluster_config,
4164                builtin_probe_cluster_config,
4165                builtin_support_cluster_config,
4166                builtin_analytics_cluster_config,
4167                system_parameter_defaults,
4168                remote_system_parameters,
4169                availability_zones,
4170                egress_addresses,
4171                aws_principal_context,
4172                aws_privatelink_availability_zones,
4173                connection_context,
4174                http_host_name,
4175                builtin_item_migration_config,
4176                persist_client: persist_client.clone(),
4177                enable_expression_cache_override: None,
4178                helm_chart_version,
4179                external_login_password_mz_system,
4180                license_key: license_key.clone(),
4181            },
4182        })
4183        .await?;
4184
4185        // Opening the catalog uses one or more timestamps, so push the boot timestamp up to the
4186        // current catalog upper.
4187        let catalog_upper = catalog.current_upper().await;
4188        boot_ts = std::cmp::max(boot_ts, catalog_upper);
4189
4190        if !read_only_controllers {
4191            epoch_millis_oracle.apply_write(boot_ts).await;
4192        }
4193
4194        info!(
4195            "startup: coordinator init: catalog open complete in {:?}",
4196            catalog_open_start.elapsed()
4197        );
4198
4199        let coord_thread_start = Instant::now();
4200        info!("startup: coordinator init: coordinator thread start beginning");
4201
4202        let session_id = catalog.config().session_id;
4203        let start_instant = catalog.config().start_instant;
4204
4205        // In order for the coordinator to support Rc and RefCell types, it cannot be
4206        // sent across threads. Spawn it in a dedicated thread and have this parent thread
4207        // wait for bootstrap completion before proceeding.
4208        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4209        let handle = TokioHandle::current();
4210
4211        let metrics = Metrics::register_into(&metrics_registry);
4212        let metrics_clone = metrics.clone();
4213        let optimizer_metrics = OptimizerMetrics::register_into(
4214            &metrics_registry,
4215            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4216        );
4217        let segment_client_clone = segment_client.clone();
4218        let coord_now = now.clone();
4219        let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4220        let mut check_scheduling_policies_interval = tokio::time::interval(
4221            catalog
4222                .system_config()
4223                .cluster_check_scheduling_policies_interval(),
4224        );
4225        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4226
4227        let clusters_caught_up_check_interval = if read_only_controllers {
4228            let dyncfgs = catalog.system_config().dyncfgs();
4229            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4230
4231            let mut interval = tokio::time::interval(interval);
4232            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4233            interval
4234        } else {
4235            // When not in read-only mode, we don't do hydration checks. But we
4236            // still have to provide _some_ interval. This is large enough that
4237            // it doesn't matter.
4238            //
4239            // TODO(aljoscha): We cannot use Duration::MAX right now because of
4240            // https://github.com/tokio-rs/tokio/issues/6634. Use that once it's
4241            // fixed for good.
4242            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4243            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4244            interval
4245        };
4246
4247        let clusters_caught_up_check =
4248            clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
4249                trigger,
4250                exclude_collections: new_builtin_collections.into_iter().collect(),
4251            });
4252
4253        if let Some(config) = pg_timestamp_oracle_config.as_ref() {
4254            // Apply settings from system vars as early as possible because some
4255            // of them are locked in right when an oracle is first opened!
4256            let pg_timestamp_oracle_params =
4257                flags::pg_timstamp_oracle_config(catalog.system_config());
4258            pg_timestamp_oracle_params.apply(config);
4259        }
4260
4261        // Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
4262        // system variables change, we update our connection limits.
4263        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4264            Arc::new(move |system_vars: &SystemVars| {
4265                let limit: u64 = system_vars.max_connections().cast_into();
4266                let superuser_reserved: u64 =
4267                    system_vars.superuser_reserved_connections().cast_into();
4268
4269                // If superuser_reserved >= max_connections, clamp it to max_connections.
4270                //
4271                // In that scenario all normal users would be locked out because every connection
4272                // would be reserved for superusers, so also log a warning.
4273                let superuser_reserved = if superuser_reserved >= limit {
4274                    tracing::warn!(
4275                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
4276                    );
4277                    limit
4278                } else {
4279                    superuser_reserved
4280                };
4281
4282                (connection_limit_callback)(limit, superuser_reserved);
4283            });
4284        catalog.system_config_mut().register_callback(
4285            &mz_sql::session::vars::MAX_CONNECTIONS,
4286            Arc::clone(&connection_limit_callback),
4287        );
4288        catalog.system_config_mut().register_callback(
4289            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4290            connection_limit_callback,
4291        );
4292
4293        let (group_commit_tx, group_commit_rx) = appends::notifier();
4294
4295        let parent_span = tracing::Span::current();
4296        let thread = thread::Builder::new()
4297            // The Coordinator thread tends to keep a lot of data on its stack. To
4298            // prevent a stack overflow we allocate a stack three times as big as the default
4299            // stack.
4300            .stack_size(3 * stack::STACK_SIZE)
4301            .name("coordinator".to_string())
4302            .spawn(move || {
4303                let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4304
4305                let controller = handle
4306                    .block_on({
4307                        catalog.initialize_controller(
4308                            controller_config,
4309                            controller_envd_epoch,
4310                            read_only_controllers,
4311                        )
4312                    })
4313                    .unwrap_or_terminate("failed to initialize storage_controller");
4314                // Initializing the controller uses one or more timestamps, so push the boot timestamp up to the
4315                // current catalog upper.
4316                let catalog_upper = handle.block_on(catalog.current_upper());
4317                boot_ts = std::cmp::max(boot_ts, catalog_upper);
4318                if !read_only_controllers {
4319                    let epoch_millis_oracle = &timestamp_oracles
4320                        .get(&Timeline::EpochMilliseconds)
4321                        .expect("inserted above")
4322                        .oracle;
4323                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4324                }
4325
4326                let catalog = Arc::new(catalog);
4327
4328                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4329                let mut coord = Coordinator {
4330                    controller,
4331                    catalog,
4332                    internal_cmd_tx,
4333                    group_commit_tx,
4334                    strict_serializable_reads_tx,
4335                    global_timelines: timestamp_oracles,
4336                    transient_id_gen: Arc::new(TransientIdGen::new()),
4337                    active_conns: BTreeMap::new(),
4338                    txn_read_holds: Default::default(),
4339                    pending_peeks: BTreeMap::new(),
4340                    client_pending_peeks: BTreeMap::new(),
4341                    pending_linearize_read_txns: BTreeMap::new(),
4342                    serialized_ddl: LockedVecDeque::new(),
4343                    active_compute_sinks: BTreeMap::new(),
4344                    active_webhooks: BTreeMap::new(),
4345                    active_copies: BTreeMap::new(),
4346                    staged_cancellation: BTreeMap::new(),
4347                    introspection_subscribes: BTreeMap::new(),
4348                    write_locks: BTreeMap::new(),
4349                    deferred_write_ops: BTreeMap::new(),
4350                    pending_writes: Vec::new(),
4351                    advance_timelines_interval,
4352                    secrets_controller,
4353                    caching_secrets_reader,
4354                    cloud_resource_controller,
4355                    storage_usage_client,
4356                    storage_usage_collection_interval,
4357                    segment_client,
4358                    metrics,
4359                    optimizer_metrics,
4360                    tracing_handle,
4361                    statement_logging: StatementLogging::new(coord_now.clone()),
4362                    webhook_concurrency_limit,
4363                    pg_timestamp_oracle_config,
4364                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4365                    cluster_scheduling_decisions: BTreeMap::new(),
4366                    caught_up_check_interval: clusters_caught_up_check_interval,
4367                    caught_up_check: clusters_caught_up_check,
4368                    installed_watch_sets: BTreeMap::new(),
4369                    connection_watch_sets: BTreeMap::new(),
4370                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
4371                    read_only_controllers,
4372                    buffered_builtin_table_updates: Some(Vec::new()),
4373                    license_key,
4374                    persist_client,
4375                };
4376                let bootstrap = handle.block_on(async {
4377                    coord
4378                        .bootstrap(
4379                            boot_ts,
4380                            migrated_storage_collections_0dt,
4381                            builtin_table_updates,
4382                            cached_global_exprs,
4383                            uncached_local_exprs,
4384                            audit_logs_iterator,
4385                        )
4386                        .await?;
4387                    coord
4388                        .controller
4389                        .remove_orphaned_replicas(
4390                            coord.catalog().get_next_user_replica_id().await?,
4391                            coord.catalog().get_next_system_replica_id().await?,
4392                        )
4393                        .await
4394                        .map_err(AdapterError::Orchestrator)?;
4395
4396                    if let Some(retention_period) = storage_usage_retention_period {
4397                        coord
4398                            .prune_storage_usage_events_on_startup(retention_period)
4399                            .await;
4400                    }
4401
4402                    Ok(())
4403                });
4404                let ok = bootstrap.is_ok();
4405                drop(span);
4406                bootstrap_tx
4407                    .send(bootstrap)
4408                    .expect("bootstrap_rx is not dropped until it receives this message");
4409                if ok {
4410                    handle.block_on(coord.serve(
4411                        internal_cmd_rx,
4412                        strict_serializable_reads_rx,
4413                        cmd_rx,
4414                        group_commit_rx,
4415                    ));
4416                }
4417            })
4418            .expect("failed to create coordinator thread");
4419        match bootstrap_rx
4420            .await
4421            .expect("bootstrap_tx always sends a message or panics/halts")
4422        {
4423            Ok(()) => {
4424                info!(
4425                    "startup: coordinator init: coordinator thread start complete in {:?}",
4426                    coord_thread_start.elapsed()
4427                );
4428                info!(
4429                    "startup: coordinator init: complete in {:?}",
4430                    coord_start.elapsed()
4431                );
4432                let handle = Handle {
4433                    session_id,
4434                    start_instant,
4435                    _thread: thread.join_on_drop(),
4436                };
4437                let client = Client::new(
4438                    build_info,
4439                    cmd_tx.clone(),
4440                    metrics_clone,
4441                    now,
4442                    environment_id,
4443                    segment_client_clone,
4444                );
4445                Ok((handle, client))
4446            }
4447            Err(e) => Err(e),
4448        }
4449    }
4450    .boxed()
4451}
4452
4453// Determines and returns the highest timestamp for each timeline, for all known
4454// timestamp oracle implementations.
4455//
4456// Initially, we did this so that we could switch between implementations of
4457// timestamp oracle, but now we also do this to determine a monotonic boot
4458// timestamp, a timestamp that does not regress across reboots.
4459//
4460// This mostly works, but there can be linearizability violations, because there
4461// is no central moment where we do distributed coordination for all oracle
4462// types. Working around this seems prohibitively hard, maybe even impossible, so
4463// we have to live with this window of potential violations during the upgrade
4464// window (which is the only point where we should switch oracle
4465// implementations).
4466async fn get_initial_oracle_timestamps(
4467    pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
4468) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4469    let mut initial_timestamps = BTreeMap::new();
4470
4471    if let Some(pg_timestamp_oracle_config) = pg_timestamp_oracle_config {
4472        let postgres_oracle_timestamps =
4473            PostgresTimestampOracle::<NowFn>::get_all_timelines(pg_timestamp_oracle_config.clone())
4474                .await?;
4475
4476        let debug_msg = || {
4477            postgres_oracle_timestamps
4478                .iter()
4479                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4480                .join(", ")
4481        };
4482        info!(
4483            "current timestamps from the postgres-backed timestamp oracle: {}",
4484            debug_msg()
4485        );
4486
4487        for (timeline, ts) in postgres_oracle_timestamps {
4488            let entry = initial_timestamps
4489                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4490
4491            entry
4492                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4493                .or_insert(ts);
4494        }
4495    } else {
4496        info!("no postgres url for postgres-backed timestamp oracle configured!");
4497    };
4498
4499    let debug_msg = || {
4500        initial_timestamps
4501            .iter()
4502            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4503            .join(", ")
4504    };
4505    info!("initial oracle timestamps: {}", debug_msg());
4506
4507    Ok(initial_timestamps)
4508}
4509
4510#[instrument]
4511pub async fn load_remote_system_parameters(
4512    storage: &mut Box<dyn OpenableDurableCatalogState>,
4513    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4514    system_parameter_sync_timeout: Duration,
4515) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4516    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4517        tracing::info!("parameter sync on boot: start sync");
4518
4519        // We intentionally block initial startup, potentially forever,
4520        // on initializing LaunchDarkly. This may seem scary, but the
4521        // alternative is even scarier. Over time, we expect that the
4522        // compiled-in default values for the system parameters will
4523        // drift substantially from the defaults configured in
4524        // LaunchDarkly, to the point that starting an environment
4525        // without loading the latest values from LaunchDarkly will
4526        // result in running an untested configuration.
4527        //
4528        // Note this only applies during initial startup. Restarting
4529        // after we've synced once only blocks for a maximum of
4530        // `FRONTEND_SYNC_TIMEOUT` on LaunchDarkly, as it seems
4531        // reasonable to assume that the last-synced configuration was
4532        // valid enough.
4533        //
4534        // This philosophy appears to provide a good balance between not
4535        // running untested configurations in production while also not
4536        // making LaunchDarkly a "tier 1" dependency for existing
4537        // environments.
4538        //
4539        // If this proves to be an issue, we could seek to address the
4540        // configuration drift in a different way--for example, by
4541        // writing a script that runs in CI nightly and checks for
4542        // deviation between the compiled Rust code and LaunchDarkly.
4543        //
4544        // If it is absolutely necessary to bring up a new environment
4545        // while LaunchDarkly is down, the following manual mitigation
4546        // can be performed:
4547        //
4548        //    1. Edit the environmentd startup parameters to omit the
4549        //       LaunchDarkly configuration.
4550        //    2. Boot environmentd.
4551        //    3. Use the catalog-debug tool to run `edit config "{\"key\":\"system_config_synced\"}" "{\"value\": 1}"`.
4552        //    4. Adjust any other parameters as necessary to avoid
4553        //       running a nonstandard configuration in production.
4554        //    5. Edit the environmentd startup parameters to restore the
4555        //       LaunchDarkly configuration, for when LaunchDarkly comes
4556        //       back online.
4557        //    6. Reboot environmentd.
4558        let mut params = SynchronizedParameters::new(SystemVars::default());
4559        let frontend_sync = async {
4560            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4561            frontend.pull(&mut params);
4562            let ops = params
4563                .modified()
4564                .into_iter()
4565                .map(|param| {
4566                    let name = param.name;
4567                    let value = param.value;
4568                    tracing::info!(name, value, initial = true, "sync parameter");
4569                    (name, value)
4570                })
4571                .collect();
4572            tracing::info!("parameter sync on boot: end sync");
4573            Ok(Some(ops))
4574        };
4575        if !storage.has_system_config_synced_once().await? {
4576            frontend_sync.await
4577        } else {
4578            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4579                Ok(ops) => Ok(ops),
4580                Err(TimeoutError::Inner(e)) => Err(e),
4581                Err(TimeoutError::DeadlineElapsed) => {
4582                    tracing::info!("parameter sync on boot: sync has timed out");
4583                    Ok(None)
4584                }
4585            }
4586        }
4587    } else {
4588        Ok(None)
4589    }
4590}
4591
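/// What the coordinator should do once a watch set it installed completes.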
4592#[derive(Debug)]
4593pub enum WatchSetResponse {
4594    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
4595    AlterSinkReady(AlterSinkReadyContext),
4596}
4597
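/// Context carried by [`WatchSetResponse::AlterSinkReady`] for finishing an in-flight
/// `ALTER SINK`. If this is dropped before [`AlterSinkReadyContext::retire`] is called, the
/// wrapped execute context is retired with `AdapterError::Canceled`.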
4598#[derive(Debug)]
4599pub struct AlterSinkReadyContext {
4600    ctx: Option<ExecuteContext>,
4601    otel_ctx: OpenTelemetryContext,
4602    plan: AlterSinkPlan,
4603    plan_validity: PlanValidity,
4604    read_hold: ReadHolds<Timestamp>,
4605}
4606
4607impl AlterSinkReadyContext {
4608    fn ctx(&mut self) -> &mut ExecuteContext {
4609        self.ctx.as_mut().expect("only cleared on drop")
4610    }
4611
4612    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4613        self.ctx
4614            .take()
4615            .expect("only cleared on drop")
4616            .retire(result);
4617    }
4618}
4619
4620impl Drop for AlterSinkReadyContext {
4621    fn drop(&mut self) {
4622        if let Some(ctx) = self.ctx.take() {
4623            ctx.retire(Err(AdapterError::Canceled));
4624        }
4625    }
4626}
4627
4628/// A struct that tracks ownership of a lock together with a `VecDeque` of work to be performed
4629/// once the lock is freed.
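///
/// A sketch of the intended pattern (not compiled; `queue` is assumed to be a
/// `LockedVecDeque<DeferredPlanStatement>` and `work` a `DeferredPlanStatement`):
///
/// ```ignore
/// match queue.try_lock_owned() {
///     // Lock acquired: do the work now, while holding `_guard`.
///     Ok(_guard) => { /* ... */ }
///     // Someone else holds the lock: queue the work to be drained later.
///     Err(_) => queue.push_back(work),
/// }
/// ```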
4630#[derive(Debug)]
4631struct LockedVecDeque<T> {
4632    items: VecDeque<T>,
4633    lock: Arc<tokio::sync::Mutex<()>>,
4634}
4635
4636impl<T> LockedVecDeque<T> {
4637    pub fn new() -> Self {
4638        Self {
4639            items: VecDeque::new(),
4640            lock: Arc::new(tokio::sync::Mutex::new(())),
4641        }
4642    }
4643
4644    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
4645        Arc::clone(&self.lock).try_lock_owned()
4646    }
4647
4648    pub fn is_empty(&self) -> bool {
4649        self.items.is_empty()
4650    }
4651
4652    pub fn push_back(&mut self, value: T) {
4653        self.items.push_back(value)
4654    }
4655
4656    pub fn pop_front(&mut self) -> Option<T> {
4657        self.items.pop_front()
4658    }
4659
4660    pub fn remove(&mut self, index: usize) -> Option<T> {
4661        self.items.remove(index)
4662    }
4663
4664    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
4665        self.items.iter()
4666    }
4667}
4668
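/// A deferred statement, either still raw or already planned, together with the
/// [`ExecuteContext`] needed to resume executing it.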
4669#[derive(Debug)]
4670struct DeferredPlanStatement {
4671    ctx: ExecuteContext,
4672    ps: PlanStatement,
4673}
4674
4675#[derive(Debug)]
4676enum PlanStatement {
4677    Statement {
4678        stmt: Arc<Statement<Raw>>,
4679        params: Params,
4680    },
4681    Plan {
4682        plan: mz_sql::plan::Plan,
4683        resolved_ids: ResolvedIds,
4684    },
4685}
4686
4687#[derive(Debug, Error)]
4688pub enum NetworkPolicyError {
4689    #[error("Access denied for address {0}")]
4690    AddressDenied(IpAddr),
4691    #[error("Access denied missing IP address")]
4692    MissingIp,
4693}
4694
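/// Returns `Ok(())` if `ip` matches at least one of the policy `rules`, and
/// [`NetworkPolicyError::AddressDenied`] otherwise.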
4695pub(crate) fn validate_ip_with_policy_rules(
4696    ip: &IpAddr,
4697    rules: &Vec<NetworkPolicyRule>,
4698) -> Result<(), NetworkPolicyError> {
4699    // At the moment we're not handling action or direction, as those can only be
4700    // "allow" and "ingress", respectively.
4701    if rules.iter().any(|r| r.address.0.contains(ip)) {
4702        Ok(())
4703    } else {
4704        Err(NetworkPolicyError::AddressDenied(ip.clone()))
4705    }
4706}