mz_adapter/coord.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Translation of SQL commands into timestamped `Controller` commands.
11//!
12//! The various SQL commands instruct the system to take actions that are not
13//! yet explicitly timestamped. On the other hand, the underlying data continually
14//! change as time moves forward. On the third hand, we greatly benefit from the
15//! information that some times are no longer of interest, so that we may
16//! compact the representation of the continually changing collections.
17//!
18//! The [`Coordinator`] curates these interactions by observing the progress
19//! collections make through time, choosing timestamps for its own commands,
20//! and eventually communicating that certain times have irretrievably "passed".
21//!
22//! ## Frontiers another way
23//!
24//! If the above description of frontiers left you with questions, this
25//! repackaged explanation might help.
26//!
27//! - `since` is the least recent time (i.e. oldest time) that you can read
28//!   from sources and be guaranteed that the returned data is accurate as of
29//!   that time.
30//!
31//!   Reads at times less than `since` may return values that were not actually
32//!   seen at the specified time, but arrived later (i.e. the results are
33//!   compacted).
34//!
35//!   For correctness' sake, the coordinator never chooses to read at a time
36//!   less than an arrangement's `since`.
37//!
38//! - `upper` is the first time after the most recent time that you can read
39//! from sources and receive an immediate response. Alternatively, it is the
40//!   least time at which the data may still change (that is the reason we may
41//!   not be able to respond immediately).
42//!
43//!   Reads at times >= `upper` may not immediately return because the answer
44//!   isn't known yet. However, once the `upper` is > the specified read time,
45//!   the read can return.
46//!
47//!   For the sake of returned values' freshness, the coordinator prefers
48//!   performing reads at an arrangement's `upper`. However, because we more
49//!   strongly prefer correctness, the coordinator will choose timestamps
50//!   greater than an object's `upper` if it is also being accessed alongside
51//!   objects whose `since` times are >= its `upper`.
52//!
53//! This illustration attempts to show, with time moving left to right, the
54//! relationship between `since` and `upper`.
55//!
56//! - `#`: possibly inaccurate results
57//! - `-`: immediate, correct response
58//! - `?`: not yet known
59//! - `s`: since
60//! - `u`: upper
61//! - `|`: eligible for coordinator to select
62//!
63//! ```nofmt
64//! ####s----u?????
65//!     |||||||||||
66//! ```
67//!
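//!
//! As a minimal, illustrative sketch (not the coordinator's actual API), the
//! status of a read against a single collection reduces to comparing the read
//! time with `since` and `upper`; plain `u64` timestamps stand in for the real
//! timestamp type here:
//!
//! ```ignore
//! /// Illustrative only: reads at `t < since` may be inaccurate (compacted),
//! /// reads with `since <= t < upper` return immediately and correctly, and
//! /// reads at `t >= upper` must wait for `upper` to advance past `t`.
//! fn read_status(since: u64, upper: u64, t: u64) -> &'static str {
//!     if t < since {
//!         "possibly inaccurate results"
//!     } else if t < upper {
//!         "immediate, correct response"
//!     } else {
//!         "not yet known"
//!     }
//! }
//! ```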
68
69use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::fmt;
72use std::net::IpAddr;
73use std::num::NonZeroI64;
74use std::ops::Neg;
75use std::str::FromStr;
76use std::sync::LazyLock;
77use std::sync::{Arc, Mutex};
78use std::thread;
79use std::time::{Duration, Instant};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::{Either, Itertools};
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
95use mz_auth::password::Password;
96use mz_build_info::BuildInfo;
97use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
98use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
99use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
100use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
101use mz_catalog::memory::objects::{
102    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
103    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
104};
105use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
106use mz_compute_client::as_of_selection;
107use mz_compute_client::controller::error::{
108    CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
109};
110use mz_compute_types::ComputeInstanceId;
111use mz_compute_types::dataflows::DataflowDescription;
112use mz_compute_types::plan::Plan;
113use mz_controller::clusters::{
114    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
115};
116use mz_controller::{ControllerConfig, Readiness};
117use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
118use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
119use mz_license_keys::ValidatedLicenseKey;
120use mz_orchestrator::OfflineReason;
121use mz_ore::cast::{CastFrom, CastInto, CastLossy};
122use mz_ore::channel::trigger::Trigger;
123use mz_ore::future::TimeoutError;
124use mz_ore::metrics::MetricsRegistry;
125use mz_ore::now::{EpochMillis, NowFn};
126use mz_ore::task::{JoinHandle, spawn};
127use mz_ore::thread::JoinHandleExt;
128use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
129use mz_ore::url::SensitiveUrl;
130use mz_ore::{assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, stack};
131use mz_persist_client::PersistClient;
132use mz_persist_client::batch::ProtoBatch;
133use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
134use mz_repr::adt::numeric::Numeric;
135use mz_repr::explain::{ExplainConfig, ExplainFormat};
136use mz_repr::global_id::TransientIdGen;
137use mz_repr::optimize::OptimizerFeatures;
138use mz_repr::role_id::RoleId;
139use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
140use mz_secrets::cache::CachingSecretsReader;
141use mz_secrets::{SecretsController, SecretsReader};
142use mz_sql::ast::{Raw, Statement};
143use mz_sql::catalog::{CatalogCluster, EnvironmentId};
144use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
145use mz_sql::optimizer_metrics::OptimizerMetrics;
146use mz_sql::plan::{
147    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
148    NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
149};
150use mz_sql::session::user::User;
151use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
152use mz_sql_parser::ast::ExplainStage;
153use mz_sql_parser::ast::display::AstDisplay;
154use mz_storage_client::client::TableData;
155use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
156use mz_storage_types::connections::Connection as StorageConnection;
157use mz_storage_types::connections::ConnectionContext;
158use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
159use mz_storage_types::read_holds::ReadHold;
160use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
161use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
162use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
163use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
164use mz_transform::dataflow::DataflowMetainfo;
165use opentelemetry::trace::TraceContextExt;
166use serde::Serialize;
167use thiserror::Error;
168use timely::progress::{Antichain, Timestamp as _};
169use tokio::runtime::Handle as TokioHandle;
170use tokio::select;
171use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
172use tokio::time::{Interval, MissedTickBehavior};
173use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
174use tracing_opentelemetry::OpenTelemetrySpanExt;
175use uuid::Uuid;
176
177use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
178use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
179use crate::client::{Client, Handle};
180use crate::command::{Command, ExecuteResponse};
181use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
182use crate::coord::appends::{
183    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
184};
185use crate::coord::caught_up::CaughtUpCheckContext;
186use crate::coord::cluster_scheduling::SchedulingDecision;
187use crate::coord::id_bundle::CollectionIdBundle;
188use crate::coord::introspection::IntrospectionSubscribe;
189use crate::coord::peek::PendingPeek;
190use crate::coord::statement_logging::StatementLogging;
191use crate::coord::timeline::{TimelineContext, TimelineState};
192use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
193use crate::coord::validity::PlanValidity;
194use crate::error::AdapterError;
195use crate::explain::insights::PlanInsightsContext;
196use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
197use crate::metrics::Metrics;
198use crate::optimize::dataflows::{
199    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
200};
201use crate::optimize::{self, Optimize, OptimizerConfig};
202use crate::session::{EndTransactionAction, Session};
203use crate::statement_logging::{
204    StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
205};
206use crate::util::{ClientTransmitter, ResultExt};
207use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
208use crate::{AdapterNotice, ReadHolds, flags};
209
210pub(crate) mod appends;
211pub(crate) mod catalog_serving;
212pub(crate) mod cluster_scheduling;
213pub(crate) mod consistency;
214pub(crate) mod id_bundle;
215pub(crate) mod in_memory_oracle;
216pub(crate) mod peek;
217pub(crate) mod read_policy;
218pub(crate) mod sequencer;
219pub(crate) mod statement_logging;
220pub(crate) mod timeline;
221pub(crate) mod timestamp_selection;
222
223pub mod catalog_implications;
224mod caught_up;
225mod command_handler;
226mod ddl;
227mod indexes;
228mod introspection;
229mod message_handler;
230mod privatelink_status;
231mod sql;
232mod validity;
233
234#[derive(Debug)]
235pub enum Message {
236    Command(OpenTelemetryContext, Command),
237    ControllerReady {
238        controller: ControllerReadiness,
239    },
240    PurifiedStatementReady(PurifiedStatementReady),
241    CreateConnectionValidationReady(CreateConnectionValidationReady),
242    AlterConnectionValidationReady(AlterConnectionValidationReady),
243    TryDeferred {
244        /// The connection that created this op.
245        conn_id: ConnectionId,
246        /// The write lock that notified us our deferred op might be able to run.
247        ///
248        /// Note: While we never want to hold a partial set of locks, it can be important to hold
249        /// onto the _one_ that notified us our op might be ready. If there are multiple operations
250        /// waiting on a single collection, and we don't hold this lock through retrying the op,
251        /// then everything waiting on this collection will get retried, causing traffic in the
252        /// Coordinator's message queue.
253        ///
254        /// See [`DeferredOp::can_be_optimistically_retried`] for more detail.
255        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
256    },
257    /// Initiates a group commit.
258    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
259    DeferredStatementReady,
260    AdvanceTimelines,
261    ClusterEvent(ClusterEvent),
262    CancelPendingPeeks {
263        conn_id: ConnectionId,
264    },
265    LinearizeReads,
266    StagedBatches {
267        conn_id: ConnectionId,
268        table_id: CatalogItemId,
269        batches: Vec<Result<ProtoBatch, String>>,
270    },
271    StorageUsageSchedule,
272    StorageUsageFetch,
273    StorageUsageUpdate(ShardsUsageReferenced),
274    StorageUsagePrune(Vec<BuiltinTableUpdate>),
275    /// Performs any cleanup and logging actions necessary for
276    /// finalizing a statement execution.
277    RetireExecute {
278        data: ExecuteContextExtra,
279        otel_ctx: OpenTelemetryContext,
280        reason: StatementEndedExecutionReason,
281    },
282    ExecuteSingleStatementTransaction {
283        ctx: ExecuteContext,
284        otel_ctx: OpenTelemetryContext,
285        stmt: Arc<Statement<Raw>>,
286        params: mz_sql::plan::Params,
287    },
288    PeekStageReady {
289        ctx: ExecuteContext,
290        span: Span,
291        stage: PeekStage,
292    },
293    CreateIndexStageReady {
294        ctx: ExecuteContext,
295        span: Span,
296        stage: CreateIndexStage,
297    },
298    CreateViewStageReady {
299        ctx: ExecuteContext,
300        span: Span,
301        stage: CreateViewStage,
302    },
303    CreateMaterializedViewStageReady {
304        ctx: ExecuteContext,
305        span: Span,
306        stage: CreateMaterializedViewStage,
307    },
308    SubscribeStageReady {
309        ctx: ExecuteContext,
310        span: Span,
311        stage: SubscribeStage,
312    },
313    IntrospectionSubscribeStageReady {
314        span: Span,
315        stage: IntrospectionSubscribeStage,
316    },
317    SecretStageReady {
318        ctx: ExecuteContext,
319        span: Span,
320        stage: SecretStage,
321    },
322    ClusterStageReady {
323        ctx: ExecuteContext,
324        span: Span,
325        stage: ClusterStage,
326    },
327    ExplainTimestampStageReady {
328        ctx: ExecuteContext,
329        span: Span,
330        stage: ExplainTimestampStage,
331    },
332    DrainStatementLog,
333    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
334    CheckSchedulingPolicies,
335
336    /// Scheduling policy decisions about turning clusters On/Off.
337    /// `Vec<(policy name, Vec of decisions by the policy)>`
338    /// A cluster will be On if and only if there is at least one On decision for it.
339    /// Scheduling decisions for clusters that have `SCHEDULE = MANUAL` are ignored.
340    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
341}
342
343impl Message {
344    /// Returns a string to identify the kind of [`Message`], useful for logging.
345    pub const fn kind(&self) -> &'static str {
346        match self {
347            Message::Command(_, msg) => match msg {
348                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
349                Command::Startup { .. } => "command-startup",
350                Command::Execute { .. } => "command-execute",
351                Command::Commit { .. } => "command-commit",
352                Command::CancelRequest { .. } => "command-cancel_request",
353                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
354                Command::GetWebhook { .. } => "command-get_webhook",
355                Command::GetSystemVars { .. } => "command-get_system_vars",
356                Command::SetSystemVars { .. } => "command-set_system_vars",
357                Command::Terminate { .. } => "command-terminate",
358                Command::RetireExecute { .. } => "command-retire_execute",
359                Command::CheckConsistency { .. } => "command-check_consistency",
360                Command::Dump { .. } => "command-dump",
361                Command::AuthenticatePassword { .. } => "command-auth_check",
362                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
363                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
364                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
365                Command::GetOracle { .. } => "get-oracle",
366                Command::DetermineRealTimeRecentTimestamp { .. } => {
367                    "determine-real-time-recent-timestamp"
368                }
369                Command::GetTransactionReadHoldsBundle { .. } => {
370                    "get-transaction-read-holds-bundle"
371                }
372                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
373                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
374                Command::CopyToPreflight { .. } => "copy-to-preflight",
375                Command::ExecuteCopyTo { .. } => "execute-copy-to",
376                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
377                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
378                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
379                Command::ExplainTimestamp { .. } => "explain-timestamp",
380                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
381            },
382            Message::ControllerReady {
383                controller: ControllerReadiness::Compute,
384            } => "controller_ready(compute)",
385            Message::ControllerReady {
386                controller: ControllerReadiness::Storage,
387            } => "controller_ready(storage)",
388            Message::ControllerReady {
389                controller: ControllerReadiness::Metrics,
390            } => "controller_ready(metrics)",
391            Message::ControllerReady {
392                controller: ControllerReadiness::Internal,
393            } => "controller_ready(internal)",
394            Message::PurifiedStatementReady(_) => "purified_statement_ready",
395            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
396            Message::TryDeferred { .. } => "try_deferred",
397            Message::GroupCommitInitiate(..) => "group_commit_initiate",
398            Message::AdvanceTimelines => "advance_timelines",
399            Message::ClusterEvent(_) => "cluster_event",
400            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
401            Message::LinearizeReads => "linearize_reads",
402            Message::StagedBatches { .. } => "staged_batches",
403            Message::StorageUsageSchedule => "storage_usage_schedule",
404            Message::StorageUsageFetch => "storage_usage_fetch",
405            Message::StorageUsageUpdate(_) => "storage_usage_update",
406            Message::StorageUsagePrune(_) => "storage_usage_prune",
407            Message::RetireExecute { .. } => "retire_execute",
408            Message::ExecuteSingleStatementTransaction { .. } => {
409                "execute_single_statement_transaction"
410            }
411            Message::PeekStageReady { .. } => "peek_stage_ready",
412            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
413            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
414            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
415            Message::CreateMaterializedViewStageReady { .. } => {
416                "create_materialized_view_stage_ready"
417            }
418            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
419            Message::IntrospectionSubscribeStageReady { .. } => {
420                "introspection_subscribe_stage_ready"
421            }
422            Message::SecretStageReady { .. } => "secret_stage_ready",
423            Message::ClusterStageReady { .. } => "cluster_stage_ready",
424            Message::DrainStatementLog => "drain_statement_log",
425            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
426            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
427            Message::CheckSchedulingPolicies => "check_scheduling_policies",
428            Message::SchedulingDecisions { .. } => "scheduling_decision",
429            Message::DeferredStatementReady => "deferred_statement_ready",
430        }
431    }
432}
433
434/// The reason for why a controller needs processing on the main loop.
435#[derive(Debug)]
436pub enum ControllerReadiness {
437    /// The storage controller is ready.
438    Storage,
439    /// The compute controller is ready.
440    Compute,
441    /// A batch of metric data is ready.
442    Metrics,
443    /// An internally-generated message is ready to be returned.
444    Internal,
445}
446
447#[derive(Derivative)]
448#[derivative(Debug)]
449pub struct BackgroundWorkResult<T> {
450    #[derivative(Debug = "ignore")]
451    pub ctx: ExecuteContext,
452    pub result: Result<T, AdapterError>,
453    pub params: Params,
454    pub plan_validity: PlanValidity,
455    pub original_stmt: Arc<Statement<Raw>>,
456    pub otel_ctx: OpenTelemetryContext,
457}
458
459pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
460
461#[derive(Derivative)]
462#[derivative(Debug)]
463pub struct ValidationReady<T> {
464    #[derivative(Debug = "ignore")]
465    pub ctx: ExecuteContext,
466    pub result: Result<T, AdapterError>,
467    pub resolved_ids: ResolvedIds,
468    pub connection_id: CatalogItemId,
469    pub connection_gid: GlobalId,
470    pub plan_validity: PlanValidity,
471    pub otel_ctx: OpenTelemetryContext,
472}
473
474pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
475pub type AlterConnectionValidationReady = ValidationReady<Connection>;
476
477#[derive(Debug)]
478pub enum PeekStage {
479    /// Common stages across SELECT, EXPLAIN and COPY TO queries.
480    LinearizeTimestamp(PeekStageLinearizeTimestamp),
481    RealTimeRecency(PeekStageRealTimeRecency),
482    TimestampReadHold(PeekStageTimestampReadHold),
483    Optimize(PeekStageOptimize),
484    /// Final stage for a peek.
485    Finish(PeekStageFinish),
486    /// Final stage for an explain.
487    ExplainPlan(PeekStageExplainPlan),
488    ExplainPushdown(PeekStageExplainPushdown),
489    /// Preflight checks for a copy to operation.
490    CopyToPreflight(PeekStageCopyTo),
491    /// Final stage for a copy to which involves shipping the dataflow.
492    CopyToDataflow(PeekStageCopyTo),
493}
494
495#[derive(Debug)]
496pub struct CopyToContext {
497    /// The `RelationDesc` of the data to be copied.
498    pub desc: RelationDesc,
499    /// The destination uri of the external service where the data will be copied.
500    pub uri: Uri,
501    /// Connection information required to connect to the external service to copy the data.
502    pub connection: StorageConnection<ReferencedConnection>,
503    /// The ID of the CONNECTION object to be used for copying the data.
504    pub connection_id: CatalogItemId,
505    /// Format params to format the data.
506    pub format: S3SinkFormat,
507    /// Approximate max file size of each uploaded file.
508    pub max_file_size: u64,
509    /// Number of batches the output of the COPY TO will be partitioned into
510    /// to distribute the load across workers deterministically.
511    /// This is an `Option` only because it is not set when `CopyToContext` is instantiated,
512    /// but immediately afterward, in the `PeekStageValidate` stage.
513    pub output_batch_count: Option<u64>,
514}
515
516#[derive(Debug)]
517pub struct PeekStageLinearizeTimestamp {
518    validity: PlanValidity,
519    plan: mz_sql::plan::SelectPlan,
520    max_query_result_size: Option<u64>,
521    source_ids: BTreeSet<GlobalId>,
522    target_replica: Option<ReplicaId>,
523    timeline_context: TimelineContext,
524    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
525    /// An optional context set iff the state machine is initiated from
526    /// sequencing an EXPLAIN for this statement.
527    explain_ctx: ExplainContext,
528}
529
530#[derive(Debug)]
531pub struct PeekStageRealTimeRecency {
532    validity: PlanValidity,
533    plan: mz_sql::plan::SelectPlan,
534    max_query_result_size: Option<u64>,
535    source_ids: BTreeSet<GlobalId>,
536    target_replica: Option<ReplicaId>,
537    timeline_context: TimelineContext,
538    oracle_read_ts: Option<Timestamp>,
539    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
540    /// An optional context set iff the state machine is initiated from
541    /// sequencing an EXPLAIN for this statement.
542    explain_ctx: ExplainContext,
543}
544
545#[derive(Debug)]
546pub struct PeekStageTimestampReadHold {
547    validity: PlanValidity,
548    plan: mz_sql::plan::SelectPlan,
549    max_query_result_size: Option<u64>,
550    source_ids: BTreeSet<GlobalId>,
551    target_replica: Option<ReplicaId>,
552    timeline_context: TimelineContext,
553    oracle_read_ts: Option<Timestamp>,
554    real_time_recency_ts: Option<mz_repr::Timestamp>,
555    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
556    /// An optional context set iff the state machine is initiated from
557    /// sequencing an EXPLAIN for this statement.
558    explain_ctx: ExplainContext,
559}
560
561#[derive(Debug)]
562pub struct PeekStageOptimize {
563    validity: PlanValidity,
564    plan: mz_sql::plan::SelectPlan,
565    max_query_result_size: Option<u64>,
566    source_ids: BTreeSet<GlobalId>,
567    id_bundle: CollectionIdBundle,
568    target_replica: Option<ReplicaId>,
569    determination: TimestampDetermination<mz_repr::Timestamp>,
570    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
571    /// An optional context set iff the state machine is initiated from
572    /// sequencing an EXPLAIN for this statement.
573    explain_ctx: ExplainContext,
574}
575
576#[derive(Debug)]
577pub struct PeekStageFinish {
578    validity: PlanValidity,
579    plan: mz_sql::plan::SelectPlan,
580    max_query_result_size: Option<u64>,
581    id_bundle: CollectionIdBundle,
582    target_replica: Option<ReplicaId>,
583    source_ids: BTreeSet<GlobalId>,
584    determination: TimestampDetermination<mz_repr::Timestamp>,
585    cluster_id: ComputeInstanceId,
586    finishing: RowSetFinishing,
587    /// When present, an optimizer trace to be used for emitting a plan insights
588    /// notice.
589    plan_insights_optimizer_trace: Option<OptimizerTrace>,
590    insights_ctx: Option<Box<PlanInsightsContext>>,
591    global_lir_plan: optimize::peek::GlobalLirPlan,
592    optimization_finished_at: EpochMillis,
593}
594
595#[derive(Debug)]
596pub struct PeekStageCopyTo {
597    validity: PlanValidity,
598    optimizer: optimize::copy_to::Optimizer,
599    global_lir_plan: optimize::copy_to::GlobalLirPlan,
600    optimization_finished_at: EpochMillis,
601    source_ids: BTreeSet<GlobalId>,
602}
603
604#[derive(Debug)]
605pub struct PeekStageExplainPlan {
606    validity: PlanValidity,
607    optimizer: optimize::peek::Optimizer,
608    df_meta: DataflowMetainfo,
609    explain_ctx: ExplainPlanContext,
610    insights_ctx: Option<Box<PlanInsightsContext>>,
611}
612
613#[derive(Debug)]
614pub struct PeekStageExplainPushdown {
615    validity: PlanValidity,
616    determination: TimestampDetermination<mz_repr::Timestamp>,
617    imports: BTreeMap<GlobalId, MapFilterProject>,
618}
619
620#[derive(Debug)]
621pub enum CreateIndexStage {
622    Optimize(CreateIndexOptimize),
623    Finish(CreateIndexFinish),
624    Explain(CreateIndexExplain),
625}
626
627#[derive(Debug)]
628pub struct CreateIndexOptimize {
629    validity: PlanValidity,
630    plan: plan::CreateIndexPlan,
631    resolved_ids: ResolvedIds,
632    /// An optional context set iff the state machine is initiated from
633    /// sequencing an EXPLAIN for this statement.
634    explain_ctx: ExplainContext,
635}
636
637#[derive(Debug)]
638pub struct CreateIndexFinish {
639    validity: PlanValidity,
640    item_id: CatalogItemId,
641    global_id: GlobalId,
642    plan: plan::CreateIndexPlan,
643    resolved_ids: ResolvedIds,
644    global_mir_plan: optimize::index::GlobalMirPlan,
645    global_lir_plan: optimize::index::GlobalLirPlan,
646}
647
648#[derive(Debug)]
649pub struct CreateIndexExplain {
650    validity: PlanValidity,
651    exported_index_id: GlobalId,
652    plan: plan::CreateIndexPlan,
653    df_meta: DataflowMetainfo,
654    explain_ctx: ExplainPlanContext,
655}
656
657#[derive(Debug)]
658pub enum CreateViewStage {
659    Optimize(CreateViewOptimize),
660    Finish(CreateViewFinish),
661    Explain(CreateViewExplain),
662}
663
664#[derive(Debug)]
665pub struct CreateViewOptimize {
666    validity: PlanValidity,
667    plan: plan::CreateViewPlan,
668    resolved_ids: ResolvedIds,
669    /// An optional context set iff the state machine is initiated from
670    /// sequencing an EXPLAIN for this statement.
671    explain_ctx: ExplainContext,
672}
673
674#[derive(Debug)]
675pub struct CreateViewFinish {
676    validity: PlanValidity,
677    /// ID of this item in the Catalog.
678    item_id: CatalogItemId,
679    /// ID by which Compute will reference this View.
680    global_id: GlobalId,
681    plan: plan::CreateViewPlan,
682    /// IDs of objects resolved during name resolution.
683    resolved_ids: ResolvedIds,
684    optimized_expr: OptimizedMirRelationExpr,
685}
686
687#[derive(Debug)]
688pub struct CreateViewExplain {
689    validity: PlanValidity,
690    id: GlobalId,
691    plan: plan::CreateViewPlan,
692    explain_ctx: ExplainPlanContext,
693}
694
695#[derive(Debug)]
696pub enum ExplainTimestampStage {
697    Optimize(ExplainTimestampOptimize),
698    RealTimeRecency(ExplainTimestampRealTimeRecency),
699    Finish(ExplainTimestampFinish),
700}
701
702#[derive(Debug)]
703pub struct ExplainTimestampOptimize {
704    validity: PlanValidity,
705    plan: plan::ExplainTimestampPlan,
706    cluster_id: ClusterId,
707}
708
709#[derive(Debug)]
710pub struct ExplainTimestampRealTimeRecency {
711    validity: PlanValidity,
712    format: ExplainFormat,
713    optimized_plan: OptimizedMirRelationExpr,
714    cluster_id: ClusterId,
715    when: QueryWhen,
716}
717
718#[derive(Debug)]
719pub struct ExplainTimestampFinish {
720    validity: PlanValidity,
721    format: ExplainFormat,
722    optimized_plan: OptimizedMirRelationExpr,
723    cluster_id: ClusterId,
724    source_ids: BTreeSet<GlobalId>,
725    when: QueryWhen,
726    real_time_recency_ts: Option<Timestamp>,
727}
728
729#[derive(Debug)]
730pub enum ClusterStage {
731    Alter(AlterCluster),
732    WaitForHydrated(AlterClusterWaitForHydrated),
733    Finalize(AlterClusterFinalize),
734}
735
736#[derive(Debug)]
737pub struct AlterCluster {
738    validity: PlanValidity,
739    plan: plan::AlterClusterPlan,
740}
741
742#[derive(Debug)]
743pub struct AlterClusterWaitForHydrated {
744    validity: PlanValidity,
745    plan: plan::AlterClusterPlan,
746    new_config: ClusterVariantManaged,
747    timeout_time: Instant,
748    on_timeout: OnTimeoutAction,
749}
750
751#[derive(Debug)]
752pub struct AlterClusterFinalize {
753    validity: PlanValidity,
754    plan: plan::AlterClusterPlan,
755    new_config: ClusterVariantManaged,
756}
757
758#[derive(Debug)]
759pub enum ExplainContext {
760    /// The ordinary, non-explain variant of the statement.
761    None,
762    /// The `EXPLAIN <level> PLAN FOR <explainee>` version of the statement.
763    Plan(ExplainPlanContext),
764    /// Generate a notice containing the `EXPLAIN PLAN INSIGHTS` output
765    /// alongside the query's normal output.
766    PlanInsightsNotice(OptimizerTrace),
767    /// `EXPLAIN FILTER PUSHDOWN`
768    Pushdown,
769}
770
771impl ExplainContext {
772    /// If available for this context, wrap the [`OptimizerTrace`] into a
773    /// [`tracing::Dispatch`] and set it as default, returning the resulting
774    /// guard in a `Some(guard)` option.
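    ///
    /// A minimal usage sketch (hypothetical call site, not taken from this module):
    ///
    /// ```ignore
    /// // Keep the guard alive while the optimizer runs so trace output is captured;
    /// // contexts that don't carry an `OptimizerTrace` simply return `None`.
    /// let _guard = explain_ctx.dispatch_guard();
    /// run_optimizer();
    /// ```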
775    pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
776        let optimizer_trace = match self {
777            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
778            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
779            _ => None,
780        };
781        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
782    }
783
784    pub(crate) fn needs_cluster(&self) -> bool {
785        match self {
786            ExplainContext::None => true,
787            ExplainContext::Plan(..) => false,
788            ExplainContext::PlanInsightsNotice(..) => true,
789            ExplainContext::Pushdown => false,
790        }
791    }
792
793    pub(crate) fn needs_plan_insights(&self) -> bool {
794        matches!(
795            self,
796            ExplainContext::Plan(ExplainPlanContext {
797                stage: ExplainStage::PlanInsights,
798                ..
799            }) | ExplainContext::PlanInsightsNotice(_)
800        )
801    }
802}
803
804#[derive(Debug)]
805pub struct ExplainPlanContext {
806    /// EXPLAIN BROKEN is internal syntax for showing EXPLAIN output despite an internal error in
807    /// the optimizer: we don't immediately bail out from peek sequencing when an internal optimizer
808    /// error happens, but go on with trying to show the requested EXPLAIN stage. This can still
809    /// succeed if the requested EXPLAIN stage is before the point where the error happened.
810    pub broken: bool,
811    pub config: ExplainConfig,
812    pub format: ExplainFormat,
813    pub stage: ExplainStage,
814    pub replan: Option<GlobalId>,
815    pub desc: Option<RelationDesc>,
816    pub optimizer_trace: OptimizerTrace,
817}
818
819#[derive(Debug)]
820pub enum CreateMaterializedViewStage {
821    Optimize(CreateMaterializedViewOptimize),
822    Finish(CreateMaterializedViewFinish),
823    Explain(CreateMaterializedViewExplain),
824}
825
826#[derive(Debug)]
827pub struct CreateMaterializedViewOptimize {
828    validity: PlanValidity,
829    plan: plan::CreateMaterializedViewPlan,
830    resolved_ids: ResolvedIds,
831    /// An optional context set iff the state machine is initiated from
832    /// sequencing an EXPLAIN for this statement.
833    explain_ctx: ExplainContext,
834}
835
836#[derive(Debug)]
837pub struct CreateMaterializedViewFinish {
838    /// The ID of this Materialized View in the Catalog.
839    item_id: CatalogItemId,
840    /// The ID of the durable pTVC backing this Materialized View.
841    global_id: GlobalId,
842    validity: PlanValidity,
843    plan: plan::CreateMaterializedViewPlan,
844    resolved_ids: ResolvedIds,
845    local_mir_plan: optimize::materialized_view::LocalMirPlan,
846    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
847    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
848}
849
850#[derive(Debug)]
851pub struct CreateMaterializedViewExplain {
852    global_id: GlobalId,
853    validity: PlanValidity,
854    plan: plan::CreateMaterializedViewPlan,
855    df_meta: DataflowMetainfo,
856    explain_ctx: ExplainPlanContext,
857}
858
859#[derive(Debug)]
860pub enum SubscribeStage {
861    OptimizeMir(SubscribeOptimizeMir),
862    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
863    Finish(SubscribeFinish),
864    Explain(SubscribeExplain),
865}
866
867#[derive(Debug)]
868pub struct SubscribeOptimizeMir {
869    validity: PlanValidity,
870    plan: plan::SubscribePlan,
871    timeline: TimelineContext,
872    dependency_ids: BTreeSet<GlobalId>,
873    cluster_id: ComputeInstanceId,
874    replica_id: Option<ReplicaId>,
875    /// An optional context set iff the state machine is initiated from
876    /// sequencing an EXPLAIN for this statement.
877    explain_ctx: ExplainContext,
878}
879
880#[derive(Debug)]
881pub struct SubscribeTimestampOptimizeLir {
882    validity: PlanValidity,
883    plan: plan::SubscribePlan,
884    timeline: TimelineContext,
885    optimizer: optimize::subscribe::Optimizer,
886    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
887    dependency_ids: BTreeSet<GlobalId>,
888    replica_id: Option<ReplicaId>,
889    /// An optional context set iff the state machine is initiated from
890    /// sequencing an EXPLAIN for this statement.
891    explain_ctx: ExplainContext,
892}
893
894#[derive(Debug)]
895pub struct SubscribeFinish {
896    validity: PlanValidity,
897    cluster_id: ComputeInstanceId,
898    replica_id: Option<ReplicaId>,
899    plan: plan::SubscribePlan,
900    global_lir_plan: optimize::subscribe::GlobalLirPlan,
901    dependency_ids: BTreeSet<GlobalId>,
902}
903
904#[derive(Debug)]
905pub struct SubscribeExplain {
906    validity: PlanValidity,
907    optimizer: optimize::subscribe::Optimizer,
908    df_meta: DataflowMetainfo,
909    cluster_id: ComputeInstanceId,
910    explain_ctx: ExplainPlanContext,
911}
912
913#[derive(Debug)]
914pub enum IntrospectionSubscribeStage {
915    OptimizeMir(IntrospectionSubscribeOptimizeMir),
916    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
917    Finish(IntrospectionSubscribeFinish),
918}
919
920#[derive(Debug)]
921pub struct IntrospectionSubscribeOptimizeMir {
922    validity: PlanValidity,
923    plan: plan::SubscribePlan,
924    subscribe_id: GlobalId,
925    cluster_id: ComputeInstanceId,
926    replica_id: ReplicaId,
927}
928
929#[derive(Debug)]
930pub struct IntrospectionSubscribeTimestampOptimizeLir {
931    validity: PlanValidity,
932    optimizer: optimize::subscribe::Optimizer,
933    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
934    cluster_id: ComputeInstanceId,
935    replica_id: ReplicaId,
936}
937
938#[derive(Debug)]
939pub struct IntrospectionSubscribeFinish {
940    validity: PlanValidity,
941    global_lir_plan: optimize::subscribe::GlobalLirPlan,
942    read_holds: ReadHolds<Timestamp>,
943    cluster_id: ComputeInstanceId,
944    replica_id: ReplicaId,
945}
946
947#[derive(Debug)]
948pub enum SecretStage {
949    CreateEnsure(CreateSecretEnsure),
950    CreateFinish(CreateSecretFinish),
951    RotateKeysEnsure(RotateKeysSecretEnsure),
952    RotateKeysFinish(RotateKeysSecretFinish),
953    Alter(AlterSecret),
954}
955
956#[derive(Debug)]
957pub struct CreateSecretEnsure {
958    validity: PlanValidity,
959    plan: plan::CreateSecretPlan,
960}
961
962#[derive(Debug)]
963pub struct CreateSecretFinish {
964    validity: PlanValidity,
965    item_id: CatalogItemId,
966    global_id: GlobalId,
967    plan: plan::CreateSecretPlan,
968}
969
970#[derive(Debug)]
971pub struct RotateKeysSecretEnsure {
972    validity: PlanValidity,
973    id: CatalogItemId,
974}
975
976#[derive(Debug)]
977pub struct RotateKeysSecretFinish {
978    validity: PlanValidity,
979    ops: Vec<crate::catalog::Op>,
980}
981
982#[derive(Debug)]
983pub struct AlterSecret {
984    validity: PlanValidity,
985    plan: plan::AlterSecretPlan,
986}
987
988/// An enum describing which cluster to run a statement on.
989///
990/// For example, if a query depends only on system tables, we might automatically
991/// run it on the catalog server cluster to benefit from the indexes that exist there.
992#[derive(Debug, Copy, Clone, PartialEq, Eq)]
993pub enum TargetCluster {
994    /// The catalog server cluster.
995    CatalogServer,
996    /// The current user's active cluster.
997    Active,
998    /// The cluster selected at the start of a transaction.
999    Transaction(ClusterId),
1000}
1001
1002/// Result types for each stage of a sequence.
1003pub(crate) enum StageResult<T> {
1004    /// A task was spawned that will return the next stage.
1005    Handle(JoinHandle<Result<T, AdapterError>>),
1006    /// A task was spawned that will return a response for the client.
1007    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
1008    /// The next stage is immediately ready and will execute.
1009    Immediate(T),
1010    /// The final stage was executed and is ready to respond to the client.
1011    Response(ExecuteResponse),
1012}
1013
1014/// Common functionality for [Coordinator::sequence_staged].
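///
/// A schematic sketch of the driving loop (this is not the actual
/// `sequence_staged` implementation; names and control flow are simplified, and
/// in the real coordinator spawned stages report back via [`Staged::message`]
/// rather than being awaited inline):
///
/// ```ignore
/// async fn drive<S: Staged>(coord: &mut Coordinator, mut ctx: S::Ctx, mut stage: S) {
///     loop {
///         match stage.stage(coord, &mut ctx).await {
///             // The next stage is already available; keep looping.
///             Ok(StageResult::Immediate(next)) => stage = *next,
///             // Off-thread work was spawned to produce the next stage.
///             Ok(StageResult::Handle(handle)) => match handle.await {
///                 Ok(Ok(next)) => stage = *next,
///                 Ok(Err(err)) => return ctx.retire(Err(err)),
///                 Err(_) => return, // task panicked or was cancelled
///             },
///             // Off-thread work was spawned to produce the final response.
///             Ok(StageResult::HandleRetire(handle)) => {
///                 if let Ok(result) = handle.await {
///                     ctx.retire(result);
///                 }
///                 return;
///             }
///             // The final stage produced the client response.
///             Ok(StageResult::Response(resp)) => return ctx.retire(Ok(resp)),
///             Err(err) => return ctx.retire(Err(err)),
///         }
///     }
/// }
/// ```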
1015pub(crate) trait Staged: Send {
1016    type Ctx: StagedContext;
1017
1018    fn validity(&mut self) -> &mut PlanValidity;
1019
1020    /// Returns the next stage or final result.
1021    async fn stage(
1022        self,
1023        coord: &mut Coordinator,
1024        ctx: &mut Self::Ctx,
1025    ) -> Result<StageResult<Box<Self>>, AdapterError>;
1026
1027    /// Prepares a message for the Coordinator.
1028    fn message(self, ctx: Self::Ctx, span: Span) -> Message;
1029
1030    /// Whether it is safe to SQL cancel this stage.
1031    fn cancel_enabled(&self) -> bool;
1032}
1033
1034pub trait StagedContext {
1035    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
1036    fn session(&self) -> Option<&Session>;
1037}
1038
1039impl StagedContext for ExecuteContext {
1040    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1041        self.retire(result);
1042    }
1043
1044    fn session(&self) -> Option<&Session> {
1045        Some(self.session())
1046    }
1047}
1048
1049impl StagedContext for () {
1050    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}
1051
1052    fn session(&self) -> Option<&Session> {
1053        None
1054    }
1055}
1056
1057/// Configures a coordinator.
1058pub struct Config {
1059    pub controller_config: ControllerConfig,
1060    pub controller_envd_epoch: NonZeroI64,
1061    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
1062    pub audit_logs_iterator: AuditLogIterator,
1063    pub timestamp_oracle_url: Option<SensitiveUrl>,
1064    pub unsafe_mode: bool,
1065    pub all_features: bool,
1066    pub build_info: &'static BuildInfo,
1067    pub environment_id: EnvironmentId,
1068    pub metrics_registry: MetricsRegistry,
1069    pub now: NowFn,
1070    pub secrets_controller: Arc<dyn SecretsController>,
1071    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1072    pub availability_zones: Vec<String>,
1073    pub cluster_replica_sizes: ClusterReplicaSizeMap,
1074    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
1075    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
1076    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
1077    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
1078    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
1079    pub system_parameter_defaults: BTreeMap<String, String>,
1080    pub storage_usage_client: StorageUsageClient,
1081    pub storage_usage_collection_interval: Duration,
1082    pub storage_usage_retention_period: Option<Duration>,
1083    pub segment_client: Option<mz_segment::Client>,
1084    pub egress_addresses: Vec<IpNet>,
1085    pub remote_system_parameters: Option<BTreeMap<String, String>>,
1086    pub aws_account_id: Option<String>,
1087    pub aws_privatelink_availability_zones: Option<Vec<String>>,
1088    pub connection_context: ConnectionContext,
1089    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
1090    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
1091    pub http_host_name: Option<String>,
1092    pub tracing_handle: TracingHandle,
1093    /// Whether or not to start controllers in read-only mode. This is only
1094    /// meant for use during development of read-only clusters and 0dt upgrades
1095    /// and should go away once we have proper orchestration during upgrades.
1096    pub read_only_controllers: bool,
1097
1098    /// A trigger that signals that the current deployment has caught up with a
1099    /// previous deployment. Only used during 0dt deployment, while in read-only
1100    /// mode.
1101    pub caught_up_trigger: Option<Trigger>,
1102
1103    pub helm_chart_version: Option<String>,
1104    pub license_key: ValidatedLicenseKey,
1105    pub external_login_password_mz_system: Option<Password>,
1106    pub force_builtin_schema_migration: Option<String>,
1107}
1108
1109/// Metadata about an active connection.
1110#[derive(Debug, Serialize)]
1111pub struct ConnMeta {
1112    /// Pgwire specifies that every connection have a 32-bit secret associated
1113    /// with it, which is known to both the client and the server. Cancellation
1114    /// requests are required to authenticate with the secret of the connection
1115    /// that they are targeting.
1116    secret_key: u32,
1117    /// The time when the session's connection was initiated.
1118    connected_at: EpochMillis,
1119    user: User,
1120    application_name: String,
1121    uuid: Uuid,
1122    conn_id: ConnectionId,
1123    client_ip: Option<IpAddr>,
1124
1125    /// Sinks that will need to be dropped when the current transaction, if
1126    /// any, is cleared.
1127    drop_sinks: BTreeSet<GlobalId>,
1128
1129    /// Lock for the Coordinator's deferred statements that is dropped on transaction clear.
1130    #[serde(skip)]
1131    deferred_lock: Option<OwnedMutexGuard<()>>,
1132
1133    /// Cluster reconfigurations that will need to be
1134    /// cleaned up when the current transaction is cleared
1135    pending_cluster_alters: BTreeSet<ClusterId>,
1136
1137    /// Channel on which to send notices to a session.
1138    #[serde(skip)]
1139    notice_tx: mpsc::UnboundedSender<AdapterNotice>,
1140
1141    /// The role that initiated the database context. Fixed for the duration of the connection.
1142    /// WARNING: This role reference is not updated when the role is dropped.
1143    /// Consumers should not assume that this role exist.
1144    authenticated_role: RoleId,
1145}
1146
1147impl ConnMeta {
1148    pub fn conn_id(&self) -> &ConnectionId {
1149        &self.conn_id
1150    }
1151
1152    pub fn user(&self) -> &User {
1153        &self.user
1154    }
1155
1156    pub fn application_name(&self) -> &str {
1157        &self.application_name
1158    }
1159
1160    pub fn authenticated_role_id(&self) -> &RoleId {
1161        &self.authenticated_role
1162    }
1163
1164    pub fn uuid(&self) -> Uuid {
1165        self.uuid
1166    }
1167
1168    pub fn client_ip(&self) -> Option<IpAddr> {
1169        self.client_ip
1170    }
1171
1172    pub fn connected_at(&self) -> EpochMillis {
1173        self.connected_at
1174    }
1175}
1176
1177#[derive(Debug)]
1178/// A pending transaction waiting to be committed.
1179pub struct PendingTxn {
1180    /// Context used to send a response back to the client.
1181    ctx: ExecuteContext,
1182    /// Client response for transaction.
1183    response: Result<PendingTxnResponse, AdapterError>,
1184    /// The action to take at the end of the transaction.
1185    action: EndTransactionAction,
1186}
1187
1188#[derive(Debug)]
1189/// The response we'll send for a [`PendingTxn`].
1190pub enum PendingTxnResponse {
1191    /// The transaction will be committed.
1192    Committed {
1193        /// Parameters that will change, and their values, once this transaction is complete.
1194        params: BTreeMap<&'static str, String>,
1195    },
1196    /// The transaction will be rolled back.
1197    Rolledback {
1198        /// Parameters that will change, and their values, once this transaction is complete.
1199        params: BTreeMap<&'static str, String>,
1200    },
1201}
1202
1203impl PendingTxnResponse {
1204    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1205        match self {
1206            PendingTxnResponse::Committed { params }
1207            | PendingTxnResponse::Rolledback { params } => params.extend(p),
1208        }
1209    }
1210}
1211
1212impl From<PendingTxnResponse> for ExecuteResponse {
1213    fn from(value: PendingTxnResponse) -> Self {
1214        match value {
1215            PendingTxnResponse::Committed { params } => {
1216                ExecuteResponse::TransactionCommitted { params }
1217            }
1218            PendingTxnResponse::Rolledback { params } => {
1219                ExecuteResponse::TransactionRolledBack { params }
1220            }
1221        }
1222    }
1223}
1224
1225#[derive(Debug)]
1226/// A pending read transaction waiting to be linearized, along with metadata about its state.
1227pub struct PendingReadTxn {
1228    /// The transaction type
1229    txn: PendingRead,
1230    /// The timestamp context of the transaction.
1231    timestamp_context: TimestampContext<mz_repr::Timestamp>,
1232    /// When we created this pending txn, i.e., when the transaction ended. Only used for metrics.
1233    created: Instant,
1234    /// Number of times we requeued the processing of this pending read txn.
1235    /// Requeueing is necessary if the time we executed the query is after the current oracle time;
1236    /// see [`Coordinator::message_linearize_reads`] for more details.
1237    num_requeues: u64,
1238    /// Telemetry context.
1239    otel_ctx: OpenTelemetryContext,
1240}
1241
1242impl PendingReadTxn {
1243    /// Return the timestamp context of the pending read transaction.
1244    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
1245        &self.timestamp_context
1246    }
1247
1248    pub(crate) fn take_context(self) -> ExecuteContext {
1249        self.txn.take_context()
1250    }
1251}
1252
1253#[derive(Debug)]
1254/// A pending read transaction waiting to be linearized.
1255enum PendingRead {
1256    Read {
1257        /// The inner transaction.
1258        txn: PendingTxn,
1259    },
1260    ReadThenWrite {
1261        /// Context used to send a response back to the client.
1262        ctx: ExecuteContext,
1263        /// Channel used to alert the transaction that the read has been linearized and send back
1264        /// `ctx`.
1265        tx: oneshot::Sender<Option<ExecuteContext>>,
1266    },
1267}
1268
1269impl PendingRead {
1270    /// Alert the client that the read has been linearized.
1271    ///
1272    /// If it is necessary to finalize an execute, return the state necessary to do so
1273    /// (execution context and result)
1274    #[instrument(level = "debug")]
1275    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1276        match self {
1277            PendingRead::Read {
1278                txn:
1279                    PendingTxn {
1280                        mut ctx,
1281                        response,
1282                        action,
1283                    },
1284                ..
1285            } => {
1286                let changed = ctx.session_mut().vars_mut().end_transaction(action);
1287                // Append any parameters that changed to the response.
1288                let response = response.map(|mut r| {
1289                    r.extend_params(changed);
1290                    ExecuteResponse::from(r)
1291                });
1292
1293                Some((ctx, response))
1294            }
1295            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1296                // Ignore errors if the caller has hung up.
1297                let _ = tx.send(Some(ctx));
1298                None
1299            }
1300        }
1301    }
1302
1303    fn label(&self) -> &'static str {
1304        match self {
1305            PendingRead::Read { .. } => "read",
1306            PendingRead::ReadThenWrite { .. } => "read_then_write",
1307        }
1308    }
1309
1310    pub(crate) fn take_context(self) -> ExecuteContext {
1311        match self {
1312            PendingRead::Read { txn, .. } => txn.ctx,
1313            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1314                // Inform the transaction that we've taken their context.
1315                // Ignore errors if the caller has hung up.
1316                let _ = tx.send(None);
1317                ctx
1318            }
1319        }
1320    }
1321}
1322
1323/// State that the coordinator must process as part of retiring
1324/// command execution. `ExecuteContextExtra::default()` is guaranteed
1325/// to produce a value that will cause the coordinator to do nothing, and
1326/// is intended for use by code that invokes the execution processing flow
1327/// (i.e., `sequence_plan`) without actually being a statement execution.
1328///
1329/// This is a pure data struct containing only the statement logging ID.
1330/// For auto-retire-on-drop behavior, use `ExecuteContextGuard` which wraps
1331/// this struct and owns the channel for sending retirement messages.
1332#[derive(Debug, Default)]
1333#[must_use]
1334pub struct ExecuteContextExtra {
1335    statement_uuid: Option<StatementLoggingId>,
1336}
1337
1338impl ExecuteContextExtra {
1339    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1340        Self { statement_uuid }
1341    }
1342    pub fn is_trivial(&self) -> bool {
1343        self.statement_uuid.is_none()
1344    }
1345    pub fn contents(&self) -> Option<StatementLoggingId> {
1346        self.statement_uuid
1347    }
1348    /// Consume this extra and return the statement UUID for retirement.
1349    /// This should only be called from code that knows what to do to finish
1350    /// up logging based on the inner value.
1351    #[must_use]
1352    pub(crate) fn retire(self) -> Option<StatementLoggingId> {
1353        self.statement_uuid
1354    }
1355}
1356
1357/// A guard that wraps `ExecuteContextExtra` and owns a channel for sending
1358/// retirement messages to the coordinator.
1359///
1360/// If this guard is dropped with a `Some` `statement_uuid` in its inner
1361/// `ExecuteContextExtra`, the `Drop` implementation will automatically send a
1362/// `Message::RetireExecute` to log the statement ending.
1363/// This handles cases like connection drops where the context cannot be
1364/// explicitly retired.
1365/// See <https://github.com/MaterializeInc/database-issues/issues/7304>
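///
/// A usage sketch (variable names are hypothetical; the behavior follows the
/// `defuse` and `Drop` implementations below):
///
/// ```ignore
/// // Explicit path: take the inner `ExecuteContextExtra` and retire it yourself;
/// // the guard will not auto-send `RetireExecute` afterwards.
/// let extra = guard.defuse();
///
/// // Implicit path: dropping a guard that still holds a statement ID (e.g. because
/// // the connection went away) sends `Message::RetireExecute` with reason `Aborted`.
/// drop(other_guard);
/// ```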
1366#[derive(Debug)]
1367#[must_use]
1368pub struct ExecuteContextGuard {
1369    extra: ExecuteContextExtra,
1370    /// Channel for sending messages to the coordinator. Used for auto-retiring on drop.
1371    /// For `Default` instances, this is a dummy sender (receiver already dropped), so
1372    /// sends will fail silently, which is the desired behavior since Default instances
1373    /// should only be used for non-logged statements.
1374    coordinator_tx: mpsc::UnboundedSender<Message>,
1375}
1376
1377impl Default for ExecuteContextGuard {
1378    fn default() -> Self {
1379        // Create a dummy sender by immediately dropping the receiver.
1380        // Any send on this channel will fail silently, which is the desired
1381        // behavior for Default instances (non-logged statements).
1382        let (tx, _rx) = mpsc::unbounded_channel();
1383        Self {
1384            extra: ExecuteContextExtra::default(),
1385            coordinator_tx: tx,
1386        }
1387    }
1388}
1389
1390impl ExecuteContextGuard {
1391    pub(crate) fn new(
1392        statement_uuid: Option<StatementLoggingId>,
1393        coordinator_tx: mpsc::UnboundedSender<Message>,
1394    ) -> Self {
1395        Self {
1396            extra: ExecuteContextExtra::new(statement_uuid),
1397            coordinator_tx,
1398        }
1399    }
1400    pub fn is_trivial(&self) -> bool {
1401        self.extra.is_trivial()
1402    }
1403    pub fn contents(&self) -> Option<StatementLoggingId> {
1404        self.extra.contents()
1405    }
1406    /// Take responsibility for the contents.  This should only be
1407    /// called from code that knows what to do to finish up logging
1408    /// based on the inner value.
1409    ///
1410    /// Returns the inner `ExecuteContextExtra`, consuming the guard without
1411    /// triggering the auto-retire behavior.
1412    pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
1413        // Taking statement_uuid prevents the Drop impl from sending a retire message
1414        std::mem::take(&mut self.extra)
1415    }
1416}
1417
1418impl Drop for ExecuteContextGuard {
1419    fn drop(&mut self) {
1420        if let Some(statement_uuid) = self.extra.statement_uuid.take() {
1421            // Auto-retire since the guard was dropped without explicit retirement (likely due
1422            // to connection drop).
1423            let msg = Message::RetireExecute {
1424                data: ExecuteContextExtra {
1425                    statement_uuid: Some(statement_uuid),
1426                },
1427                otel_ctx: OpenTelemetryContext::obtain(),
1428                reason: StatementEndedExecutionReason::Aborted,
1429            };
1430            // Send may fail for Default instances (dummy sender), which is fine since
1431            // Default instances should only be used for non-logged statements.
1432            let _ = self.coordinator_tx.send(msg);
1433        }
1434    }
1435}
1436
1437/// Bundle of state related to statement execution.
1438///
1439/// This struct collects a bundle of state that needs to be threaded
1440/// through various functions as part of statement execution.
1441/// It is used to finalize execution, by calling `retire`. Finalizing execution
1442/// involves sending the session back to the pgwire layer so that it
1443/// may be used to process further commands. It also involves
1444/// performing some work on the main coordinator thread
1445/// (e.g., recording the time at which the statement finished
1446/// executing). The state necessary to perform this work is bundled in
1447/// the `ExecuteContextGuard` object.
1448#[derive(Debug)]
1449pub struct ExecuteContext {
1450    inner: Box<ExecuteContextInner>,
1451}
1452
1453impl std::ops::Deref for ExecuteContext {
1454    type Target = ExecuteContextInner;
1455    fn deref(&self) -> &Self::Target {
1456        &*self.inner
1457    }
1458}
1459
1460impl std::ops::DerefMut for ExecuteContext {
1461    fn deref_mut(&mut self) -> &mut Self::Target {
1462        &mut *self.inner
1463    }
1464}
1465
1466#[derive(Debug)]
1467pub struct ExecuteContextInner {
1468    tx: ClientTransmitter<ExecuteResponse>,
1469    internal_cmd_tx: mpsc::UnboundedSender<Message>,
1470    session: Session,
1471    extra: ExecuteContextGuard,
1472}
1473
1474impl ExecuteContext {
1475    pub fn session(&self) -> &Session {
1476        &self.session
1477    }
1478
1479    pub fn session_mut(&mut self) -> &mut Session {
1480        &mut self.session
1481    }
1482
1483    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1484        &self.tx
1485    }
1486
1487    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1488        &mut self.tx
1489    }
1490
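    /// Assembles an `ExecuteContext` from its constituent parts, the inverse of
    /// [`Self::into_parts`].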
1491    pub fn from_parts(
1492        tx: ClientTransmitter<ExecuteResponse>,
1493        internal_cmd_tx: mpsc::UnboundedSender<Message>,
1494        session: Session,
1495        extra: ExecuteContextGuard,
1496    ) -> Self {
1497        Self {
1498            inner: ExecuteContextInner {
1499                tx,
1500                session,
1501                extra,
1502                internal_cmd_tx,
1503            }
1504            .into(),
1505        }
1506    }
1507
1508    /// By calling this function, the caller takes responsibility for
1509    /// dealing with the instance of `ExecuteContextGuard`. This is
1510    /// intended to support protocols (like `COPY FROM`) that involve
1511    /// multiple passes of sending the session back and forth between
1512    /// the coordinator and the pgwire layer. As part of any such
1513    /// protocol, we must ensure that the `ExecuteContextGuard`
1514    /// (possibly wrapped in a new `ExecuteContext`) is passed back to the coordinator for
1515    /// eventual retirement.
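    ///
    /// A rough sketch of such a round trip (not compiled as a doctest; `result`
    /// is assumed to be produced by the caller):
    ///
    /// ```ignore
    /// let (tx, internal_cmd_tx, session, extra) = ctx.into_parts();
    /// // ... lend `session` to the pgwire layer and get it back ...
    /// let ctx = ExecuteContext::from_parts(tx, internal_cmd_tx, session, extra);
    /// ctx.retire(result);
    /// ```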
1516    pub fn into_parts(
1517        self,
1518    ) -> (
1519        ClientTransmitter<ExecuteResponse>,
1520        mpsc::UnboundedSender<Message>,
1521        Session,
1522        ExecuteContextGuard,
1523    ) {
1524        let ExecuteContextInner {
1525            tx,
1526            internal_cmd_tx,
1527            session,
1528            extra,
1529        } = *self.inner;
1530        (tx, internal_cmd_tx, session, extra)
1531    }
1532
1533    /// Retire the execution by sending a message to the coordinator.
1534    #[instrument(level = "debug")]
1535    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1536        let ExecuteContextInner {
1537            tx,
1538            internal_cmd_tx,
1539            session,
1540            extra,
1541        } = *self.inner;
1542        let reason = if extra.is_trivial() {
1543            None
1544        } else {
1545            Some((&result).into())
1546        };
1547        tx.send(result, session);
1548        if let Some(reason) = reason {
1549            // Defuse the guard to get the inner `ExecuteContextExtra` without triggering auto-retire.
1550            let extra = extra.defuse();
1551            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1552                otel_ctx: OpenTelemetryContext::obtain(),
1553                data: extra,
1554                reason,
1555            }) {
1556                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1557            }
1558        }
1559    }
1560
1561    pub fn extra(&self) -> &ExecuteContextGuard {
1562        &self.extra
1563    }
1564
1565    pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
1566        &mut self.extra
1567    }
1568}
1569
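/// The last known status of every process of every cluster replica, keyed by
/// cluster ID, then replica ID, then process ID.
///
/// A rough sketch of the expected lifecycle (not compiled as a doctest; the IDs,
/// process count, timestamps, and new status are assumed to come from the caller):
///
/// ```ignore
/// let mut statuses = ClusterReplicaStatuses::new();
/// statuses.initialize_cluster_statuses(cluster_id);
/// statuses.initialize_cluster_replica_statuses(cluster_id, replica_id, num_processes, now);
/// // Later, when a process's status changes:
/// statuses.ensure_cluster_status(cluster_id, replica_id, process_id, new_status);
/// let overall = statuses.get_cluster_replica_status(cluster_id, replica_id);
/// ```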
1570#[derive(Debug)]
1571struct ClusterReplicaStatuses(
1572    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
1573);
1574
1575impl ClusterReplicaStatuses {
1576    pub(crate) fn new() -> ClusterReplicaStatuses {
1577        ClusterReplicaStatuses(BTreeMap::new())
1578    }
1579
1580    /// Initializes the statuses of the specified cluster.
1581    ///
1582    /// Panics if the cluster statuses are already initialized.
1583    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1584        let prev = self.0.insert(cluster_id, BTreeMap::new());
1585        assert_eq!(
1586            prev, None,
1587            "cluster {cluster_id} statuses already initialized"
1588        );
1589    }
1590
1591    /// Initializes the statuses of the specified cluster replica.
1592    ///
1593    /// Panics if the cluster replica statuses are already initialized.
1594    pub(crate) fn initialize_cluster_replica_statuses(
1595        &mut self,
1596        cluster_id: ClusterId,
1597        replica_id: ReplicaId,
1598        num_processes: usize,
1599        time: DateTime<Utc>,
1600    ) {
1601        tracing::info!(
1602            ?cluster_id,
1603            ?replica_id,
1604            ?time,
1605            "initializing cluster replica status"
1606        );
1607        let replica_statuses = self.0.entry(cluster_id).or_default();
1608        let process_statuses = (0..num_processes)
1609            .map(|process_id| {
1610                let status = ClusterReplicaProcessStatus {
1611                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1612                    time: time.clone(),
1613                };
1614                (u64::cast_from(process_id), status)
1615            })
1616            .collect();
1617        let prev = replica_statuses.insert(replica_id, process_statuses);
1618        assert_none!(
1619            prev,
1620            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1621        );
1622    }
1623
1624    /// Removes the statuses of the specified cluster.
1625    ///
1626    /// Panics if the cluster does not exist.
1627    pub(crate) fn remove_cluster_statuses(
1628        &mut self,
1629        cluster_id: &ClusterId,
1630    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1631        let prev = self.0.remove(cluster_id);
1632        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1633    }
1634
1635    /// Removes the statuses of the specified cluster replica.
1636    ///
1637    /// Panics if the cluster or replica does not exist.
1638    pub(crate) fn remove_cluster_replica_statuses(
1639        &mut self,
1640        cluster_id: &ClusterId,
1641        replica_id: &ReplicaId,
1642    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1643        let replica_statuses = self
1644            .0
1645            .get_mut(cluster_id)
1646            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1647        let prev = replica_statuses.remove(replica_id);
1648        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1649    }
1650
1651    /// Inserts or updates the status of the specified cluster replica process.
1652    ///
1653    /// Panics if the cluster or replica does not exist.
1654    pub(crate) fn ensure_cluster_status(
1655        &mut self,
1656        cluster_id: ClusterId,
1657        replica_id: ReplicaId,
1658        process_id: ProcessId,
1659        status: ClusterReplicaProcessStatus,
1660    ) {
1661        let replica_statuses = self
1662            .0
1663            .get_mut(&cluster_id)
1664            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1665            .get_mut(&replica_id)
1666            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1667        replica_statuses.insert(process_id, status);
1668    }
1669
1670    /// Computes the status of the cluster replica as a whole.
1671    ///
1672    /// Panics if `cluster_id` or `replica_id` don't exist.
1673    pub fn get_cluster_replica_status(
1674        &self,
1675        cluster_id: ClusterId,
1676        replica_id: ReplicaId,
1677    ) -> ClusterStatus {
1678        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1679        Self::cluster_replica_status(process_status)
1680    }
1681
1682    /// Computes the status of the cluster replica as a whole.
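    ///
    /// The replica is considered `Online` only if every process is `Online`;
    /// otherwise it is `Offline`, carrying the first known not-ready reason.
    ///
    /// A small illustration (not compiled as a doctest; `now` is assumed to be a
    /// `DateTime<Utc>` supplied by the caller):
    ///
    /// ```ignore
    /// let statuses = BTreeMap::from([
    ///     (0u64, ClusterReplicaProcessStatus { status: ClusterStatus::Online, time: now }),
    ///     (1u64, ClusterReplicaProcessStatus {
    ///         status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
    ///         time: now,
    ///     }),
    /// ]);
    /// // Yields `ClusterStatus::Offline(Some(OfflineReason::Initializing))`.
    /// let status = ClusterReplicaStatuses::cluster_replica_status(&statuses);
    /// ```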
1683    pub fn cluster_replica_status(
1684        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1685    ) -> ClusterStatus {
1686        process_status
1687            .values()
1688            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1689                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1690                (x, y) => {
1691                    let reason_x = match x {
1692                        ClusterStatus::Offline(reason) => reason,
1693                        ClusterStatus::Online => None,
1694                    };
1695                    let reason_y = match y {
1696                        ClusterStatus::Offline(reason) => reason,
1697                        ClusterStatus::Online => None,
1698                    };
1699                    // Arbitrarily pick the first known not-ready reason.
1700                    ClusterStatus::Offline(reason_x.or(reason_y))
1701                }
1702            })
1703    }
1704
1705    /// Gets the statuses of the given cluster replica.
1706    ///
1707    /// Panics if the cluster or replica does not exist.
1708    pub(crate) fn get_cluster_replica_statuses(
1709        &self,
1710        cluster_id: ClusterId,
1711        replica_id: ReplicaId,
1712    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1713        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1714            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1715    }
1716
1717    /// Gets the statuses of the given cluster replica.
1718    pub(crate) fn try_get_cluster_replica_statuses(
1719        &self,
1720        cluster_id: ClusterId,
1721        replica_id: ReplicaId,
1722    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1723        self.try_get_cluster_statuses(cluster_id)
1724            .and_then(|statuses| statuses.get(&replica_id))
1725    }
1726
1727    /// Gets the statuses of the given cluster.
1728    pub(crate) fn try_get_cluster_statuses(
1729        &self,
1730        cluster_id: ClusterId,
1731    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1732        self.0.get(&cluster_id)
1733    }
1734}
1735
1736/// Glues the external world to the Timely workers.
1737#[derive(Derivative)]
1738#[derivative(Debug)]
1739pub struct Coordinator {
1740    /// The controller for the storage and compute layers.
1741    #[derivative(Debug = "ignore")]
1742    controller: mz_controller::Controller,
1743    /// The catalog in an Arc suitable for readonly references. The Arc allows
1744    /// us to hand out cheap copies of the catalog to functions that can use it
1745    /// off of the main coordinator thread. If the coordinator needs to mutate
1746    /// the catalog, call [`Self::catalog_mut`], which will clone this struct member,
1747    /// allowing it to be mutated here while the other off-thread references can
1748    /// read their catalog as long as needed. In the future we would like this
1749    /// to be a pTVC, but for now this is sufficient.
1750    catalog: Arc<Catalog>,
1751
1752    /// A client for persist. Initially, this is only used for reading stashed
1753    /// peek responses out of batches.
1754    persist_client: PersistClient,
1755
1756    /// Channel to manage internal commands from the coordinator to itself.
1757    internal_cmd_tx: mpsc::UnboundedSender<Message>,
1758    /// Notification that triggers a group commit.
1759    group_commit_tx: appends::GroupCommitNotifier,
1760
1761    /// Channel for strict serializable reads ready to commit.
1762    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,
1763
1764    /// Mechanism for totally ordering write and read timestamps, so that all reads
1765    /// reflect exactly the set of writes that precede them, and no writes that follow.
1766    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
1767
1768    /// A generator for transient [`GlobalId`]s, shareable with other threads.
1769    transient_id_gen: Arc<TransientIdGen>,
1770    /// A map from connection ID to metadata about that connection for all
1771    /// active connections.
1772    active_conns: BTreeMap<ConnectionId, ConnMeta>,
1773
1774    /// For each transaction, the read holds taken to support any performed reads.
1775    ///
1776    /// Upon completing a transaction, these read holds should be dropped.
1777    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,
1778
1779    /// Access to the peek fields should be restricted to methods in the [`peek`] API.
1780    /// A map from pending peek ids to the queue into which responses are sent, and
1781    /// the connection id of the client that initiated the peek.
1782    pending_peeks: BTreeMap<Uuid, PendingPeek>,
1783    /// A map from client connection ids to a set of all pending peeks for that client.
1784    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
1785
1786    /// A map from client connection ids to pending linearize read transaction.
1787    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
1788
1789    /// A map from the compute sink ID to its state description.
1790    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
1791    /// A map from active webhooks to their invalidation handle.
1792    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
1793    /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
1794    /// to stage Batches in Persist that we will then link into the shard.
1795    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,
1796
1797    /// A map from connection ids to a watch channel that is set to `true` if the connection
1798    /// received a cancel request.
1799    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
1800    /// Active introspection subscribes.
1801    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,
1802
1803    /// Locks that grant access to a specific object, populated lazily as objects are written to.
1804    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
1805    /// Plans that are currently deferred and waiting on a write lock.
1806    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,
1807
1808    /// Pending writes waiting for a group commit.
1809    pending_writes: Vec<PendingWriteTxn>,
1810
1811    /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
1812    /// table's timestamps, but there are cases where timestamps are not bumped yet
1813    /// we expect the closed timestamps to advance (`AS OF X`, SUBSCRIBing views over
1814    /// RT sources and tables). To address these, spawn a task that forces table
1815    /// timestamps to close on a regular interval. This roughly tracks the behavior
1816    /// of realtime sources that close off timestamps on an interval.
1817    ///
1818    /// For non-realtime timelines, nothing pushes the timestamps forward, so we must do
1819    /// it manually.
1820    advance_timelines_interval: Interval,
1821
1822    /// Serialized DDL. DDL must be serialized because:
1823    /// - Many of them do off-thread work and need to verify the catalog is in a valid state, but
1824    ///   [`PlanValidity`] does not currently support tracking all changes. Doing that correctly
1825    ///   seems to be more difficult than it's worth, so we would instead re-plan and re-sequence
1826    ///   the statements.
1827    /// - Re-planning a statement is hard because Coordinator and Session state is mutated at
1828    ///   various points, and we would need to correctly reset those changes before re-planning and
1829    ///   re-sequencing.
1830    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
1831
1832    /// Handle to secret manager that can create and delete secrets from
1833    /// an arbitrary secret storage engine.
1834    secrets_controller: Arc<dyn SecretsController>,
1835    /// A secrets reader that maintains an in-memory cache, where values have a set TTL.
1836    caching_secrets_reader: CachingSecretsReader,
1837
1838    /// Handle to a manager that can create and delete Kubernetes resources
1839    /// (i.e., VpcEndpoint objects).
1840    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1841
1842    /// Persist client for fetching storage metadata such as size metrics.
1843    storage_usage_client: StorageUsageClient,
1844    /// The interval at which to collect storage usage information.
1845    storage_usage_collection_interval: Duration,
1846
1847    /// Segment analytics client.
1848    #[derivative(Debug = "ignore")]
1849    segment_client: Option<mz_segment::Client>,
1850
1851    /// Coordinator metrics.
1852    metrics: Metrics,
1853    /// Optimizer metrics.
1854    optimizer_metrics: OptimizerMetrics,
1855
1856    /// Tracing handle.
1857    tracing_handle: TracingHandle,
1858
1859    /// Data used by the statement logging feature.
1860    statement_logging: StatementLogging,
1861
1862    /// Limit for how many concurrent webhook requests we allow.
1863    webhook_concurrency_limit: WebhookConcurrencyLimiter,
1864
1865    /// Optional config for the timestamp oracle. This is _required_ when
1866    /// a timestamp oracle backend is configured.
1867    timestamp_oracle_config: Option<TimestampOracleConfig>,
1868
1869    /// Periodically asks cluster scheduling policies to make their decisions.
1870    check_cluster_scheduling_policies_interval: Interval,
1871
1872    /// This keeps the last On/Off decision for each cluster and each scheduling policy.
1873    /// (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are
1874    /// periodically cleaned up from this Map.)
1875    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,
1876
1877    /// When doing 0dt upgrades/in read-only mode, periodically ask all known
1878    /// clusters/collections whether they are caught up.
1879    caught_up_check_interval: Interval,
1880
1881    /// Context needed to check whether all clusters/collections have caught up.
1882    /// Only used during 0dt deployment, while in read-only mode.
1883    caught_up_check: Option<CaughtUpCheckContext>,
1884
1885    /// Tracks the state associated with the currently installed watchsets.
1886    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,
1887
1888    /// Tracks the currently installed watchsets for each connection.
1889    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,
1890
1891    /// Tracks the statuses of all cluster replicas.
1892    cluster_replica_statuses: ClusterReplicaStatuses,
1893
1894    /// Whether or not to start controllers in read-only mode. This is only
1895    /// meant for use during development of read-only clusters and 0dt upgrades
1896    /// and should go away once we have proper orchestration during upgrades.
1897    read_only_controllers: bool,
1898
1899    /// Updates to builtin tables that are being buffered while we are in
1900    /// read-only mode. We apply these all at once when coming out of read-only
1901    /// mode.
1902    ///
1903    /// This is a `Some` while in read-only mode and will be replaced by a
1904    /// `None` when we transition out of read-only mode and write out any
1905    /// buffered updates.
1906    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,
1907
1908    license_key: ValidatedLicenseKey,
1909}
1910
1911impl Coordinator {
1912    /// Initializes coordinator state based on the contained catalog. Must be
1913    /// called after creating the coordinator and before calling the
1914    /// `Coordinator::serve` method.
1915    #[instrument(name = "coord::bootstrap")]
1916    pub(crate) async fn bootstrap(
1917        &mut self,
1918        boot_ts: Timestamp,
1919        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1920        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1921        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1922        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1923        audit_logs_iterator: AuditLogIterator,
1924    ) -> Result<(), AdapterError> {
1925        let bootstrap_start = Instant::now();
1926        info!("startup: coordinator init: bootstrap beginning");
1927        info!("startup: coordinator init: bootstrap: preamble beginning");
1928
1929        // Initialize cluster replica statuses.
1930        // Gross iterator is to avoid partial borrow issues.
1931        let cluster_statuses: Vec<(_, Vec<_>)> = self
1932            .catalog()
1933            .clusters()
1934            .map(|cluster| {
1935                (
1936                    cluster.id(),
1937                    cluster
1938                        .replicas()
1939                        .map(|replica| {
1940                            (replica.replica_id, replica.config.location.num_processes())
1941                        })
1942                        .collect(),
1943                )
1944            })
1945            .collect();
1946        let now = self.now_datetime();
1947        for (cluster_id, replica_statuses) in cluster_statuses {
1948            self.cluster_replica_statuses
1949                .initialize_cluster_statuses(cluster_id);
1950            for (replica_id, num_processes) in replica_statuses {
1951                self.cluster_replica_statuses
1952                    .initialize_cluster_replica_statuses(
1953                        cluster_id,
1954                        replica_id,
1955                        num_processes,
1956                        now,
1957                    );
1958            }
1959        }
1960
1961        let system_config = self.catalog().system_config();
1962
1963        // Inform metrics about the initial system configuration.
1964        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1965
1966        // Inform the controllers about their initial configuration.
1967        let compute_config = flags::compute_config(system_config);
1968        let storage_config = flags::storage_config(system_config);
1969        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1970        let dyncfg_updates = system_config.dyncfg_updates();
1971        self.controller.compute.update_configuration(compute_config);
1972        self.controller.storage.update_parameters(storage_config);
1973        self.controller
1974            .update_orchestrator_scheduling_config(scheduling_config);
1975        self.controller.update_configuration(dyncfg_updates);
1976
1977        self.validate_resource_limit_numeric(
1978            Numeric::zero(),
1979            self.current_credit_consumption_rate(),
1980            |system_vars| {
1981                self.license_key
1982                    .max_credit_consumption_rate()
1983                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1984            },
1985            "cluster replica",
1986            MAX_CREDIT_CONSUMPTION_RATE.name(),
1987        )?;
1988
1989        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1990            Default::default();
1991
1992        let enable_worker_core_affinity =
1993            self.catalog().system_config().enable_worker_core_affinity();
1994        for instance in self.catalog.clusters() {
1995            self.controller.create_cluster(
1996                instance.id,
1997                ClusterConfig {
1998                    arranged_logs: instance.log_indexes.clone(),
1999                    workload_class: instance.config.workload_class.clone(),
2000                },
2001            )?;
2002            for replica in instance.replicas() {
2003                let role = instance.role();
2004                self.controller.create_replica(
2005                    instance.id,
2006                    replica.replica_id,
2007                    instance.name.clone(),
2008                    replica.name.clone(),
2009                    role,
2010                    replica.config.clone(),
2011                    enable_worker_core_affinity,
2012                )?;
2013            }
2014        }
2015
2016        info!(
2017            "startup: coordinator init: bootstrap: preamble complete in {:?}",
2018            bootstrap_start.elapsed()
2019        );
2020
2021        let init_storage_collections_start = Instant::now();
2022        info!("startup: coordinator init: bootstrap: storage collections init beginning");
2023        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2024            .await;
2025        info!(
2026            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2027            init_storage_collections_start.elapsed()
2028        );
2029
2030        // The storage controller knows about the introspection collections now, so we can start
2031        // sinking introspection updates in the compute controller. It makes sense to do that as
2032        // soon as possible, to avoid updates piling up in the compute controller's internal
2033        // buffers.
2034        self.controller.start_compute_introspection_sink();
2035
2036        let optimize_dataflows_start = Instant::now();
2037        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2038        let entries: Vec<_> = self.catalog().entries().cloned().collect();
2039        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2040        info!(
2041            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2042            optimize_dataflows_start.elapsed()
2043        );
2044
2045        // We don't need to wait for the cache to update.
2046        let _fut = self.catalog().update_expression_cache(
2047            uncached_local_exprs.into_iter().collect(),
2048            uncached_global_exps.into_iter().collect(),
2049        );
2050
2051        // Select dataflow as-ofs. This step relies on the storage collections created by
2052        // `bootstrap_storage_collections` and the dataflow plans created by
2053        // `bootstrap_dataflow_plans`.
2054        let bootstrap_as_ofs_start = Instant::now();
2055        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2056        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2057        info!(
2058            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2059            bootstrap_as_ofs_start.elapsed()
2060        );
2061
2062        let postamble_start = Instant::now();
2063        info!("startup: coordinator init: bootstrap: postamble beginning");
2064
2065        let logs: BTreeSet<_> = BUILTINS::logs()
2066            .map(|log| self.catalog().resolve_builtin_log(log))
2067            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2068            .collect();
2069
2070        let mut privatelink_connections = BTreeMap::new();
2071
2072        for entry in &entries {
2073            debug!(
2074                "coordinator init: installing {} {}",
2075                entry.item().typ(),
2076                entry.id()
2077            );
2078            let mut policy = entry.item().initial_logical_compaction_window();
2079            match entry.item() {
2080                // Currently catalog item rebuild assumes that sinks and
2081                // indexes are always built individually and does not store information
2082                // about how they were built. If we start building multiple sinks and/or indexes
2083                // using a single dataflow, we have to make sure the rebuild process re-runs
2084                // the same multiple-build dataflow.
2085                CatalogItem::Source(source) => {
2086                    // Propagate source compaction windows to subsources if needed.
2087                    if source.custom_logical_compaction_window.is_none() {
2088                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2089                            source.data_source
2090                        {
2091                            policy = Some(
2092                                self.catalog()
2093                                    .get_entry(&ingestion_id)
2094                                    .source()
2095                                    .expect("must be source")
2096                                    .custom_logical_compaction_window
2097                                    .unwrap_or_default(),
2098                            );
2099                        }
2100                    }
2101                    policies_to_set
2102                        .entry(policy.expect("sources have a compaction window"))
2103                        .or_insert_with(Default::default)
2104                        .storage_ids
2105                        .insert(source.global_id());
2106                }
2107                CatalogItem::Table(table) => {
2108                    policies_to_set
2109                        .entry(policy.expect("tables have a compaction window"))
2110                        .or_insert_with(Default::default)
2111                        .storage_ids
2112                        .extend(table.global_ids());
2113                }
2114                CatalogItem::Index(idx) => {
2115                    let policy_entry = policies_to_set
2116                        .entry(policy.expect("indexes have a compaction window"))
2117                        .or_insert_with(Default::default);
2118
2119                    if logs.contains(&idx.on) {
2120                        policy_entry
2121                            .compute_ids
2122                            .entry(idx.cluster_id)
2123                            .or_insert_with(BTreeSet::new)
2124                            .insert(idx.global_id());
2125                    } else {
2126                        let df_desc = self
2127                            .catalog()
2128                            .try_get_physical_plan(&idx.global_id())
2129                            .expect("added in `bootstrap_dataflow_plans`")
2130                            .clone();
2131
2132                        let df_meta = self
2133                            .catalog()
2134                            .try_get_dataflow_metainfo(&idx.global_id())
2135                            .expect("added in `bootstrap_dataflow_plans`");
2136
2137                        if self.catalog().state().system_config().enable_mz_notices() {
2138                            // Collect optimization hint updates.
2139                            self.catalog().state().pack_optimizer_notices(
2140                                &mut builtin_table_updates,
2141                                df_meta.optimizer_notices.iter(),
2142                                Diff::ONE,
2143                            );
2144                        }
2145
2146                        // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
2147                        // but we cannot call that as it will also downgrade the read hold on the index.
2148                        policy_entry
2149                            .compute_ids
2150                            .entry(idx.cluster_id)
2151                            .or_insert_with(Default::default)
2152                            .extend(df_desc.export_ids());
2153
2154                        self.controller
2155                            .compute
2156                            .create_dataflow(idx.cluster_id, df_desc, None)
2157                            .unwrap_or_terminate("cannot fail to create dataflows");
2158                    }
2159                }
2160                CatalogItem::View(_) => (),
2161                CatalogItem::MaterializedView(mview) => {
2162                    policies_to_set
2163                        .entry(policy.expect("materialized views have a compaction window"))
2164                        .or_insert_with(Default::default)
2165                        .storage_ids
2166                        .insert(mview.global_id_writes());
2167
2168                    let mut df_desc = self
2169                        .catalog()
2170                        .try_get_physical_plan(&mview.global_id_writes())
2171                        .expect("added in `bootstrap_dataflow_plans`")
2172                        .clone();
2173
2174                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
2175                        df_desc.set_initial_as_of(initial_as_of);
2176                    }
2177
2178                    // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
2179                    let until = mview
2180                        .refresh_schedule
2181                        .as_ref()
2182                        .and_then(|s| s.last_refresh())
2183                        .and_then(|r| r.try_step_forward());
2184                    if let Some(until) = until {
2185                        df_desc.until.meet_assign(&Antichain::from_elem(until));
2186                    }
2187
2188                    let df_meta = self
2189                        .catalog()
2190                        .try_get_dataflow_metainfo(&mview.global_id_writes())
2191                        .expect("added in `bootstrap_dataflow_plans`");
2192
2193                    if self.catalog().state().system_config().enable_mz_notices() {
2194                        // Collect optimization hint updates.
2195                        self.catalog().state().pack_optimizer_notices(
2196                            &mut builtin_table_updates,
2197                            df_meta.optimizer_notices.iter(),
2198                            Diff::ONE,
2199                        );
2200                    }
2201
2202                    self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2203
2204                    // If this is a replacement MV, it must remain read-only until the replacement
2205                    // gets applied.
2206                    if mview.replacement_target.is_none() {
2207                        self.allow_writes(mview.cluster_id, mview.global_id_writes());
2208                    }
2209                }
2210                CatalogItem::Sink(sink) => {
2211                    policies_to_set
2212                        .entry(CompactionWindow::Default)
2213                        .or_insert_with(Default::default)
2214                        .storage_ids
2215                        .insert(sink.global_id());
2216                }
2217                CatalogItem::Connection(catalog_connection) => {
2218                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2219                        privatelink_connections.insert(
2220                            entry.id(),
2221                            VpcEndpointConfig {
2222                                aws_service_name: conn.service_name.clone(),
2223                                availability_zone_ids: conn.availability_zones.clone(),
2224                            },
2225                        );
2226                    }
2227                }
2228                CatalogItem::ContinualTask(ct) => {
2229                    policies_to_set
2230                        .entry(policy.expect("continual tasks have a compaction window"))
2231                        .or_insert_with(Default::default)
2232                        .storage_ids
2233                        .insert(ct.global_id());
2234
2235                    let mut df_desc = self
2236                        .catalog()
2237                        .try_get_physical_plan(&ct.global_id())
2238                        .expect("added in `bootstrap_dataflow_plans`")
2239                        .clone();
2240
2241                    if let Some(initial_as_of) = ct.initial_as_of.clone() {
2242                        df_desc.set_initial_as_of(initial_as_of);
2243                    }
2244
2245                    let df_meta = self
2246                        .catalog()
2247                        .try_get_dataflow_metainfo(&ct.global_id())
2248                        .expect("added in `bootstrap_dataflow_plans`");
2249
2250                    if self.catalog().state().system_config().enable_mz_notices() {
2251                        // Collect optimization hint updates.
2252                        self.catalog().state().pack_optimizer_notices(
2253                            &mut builtin_table_updates,
2254                            df_meta.optimizer_notices.iter(),
2255                            Diff::ONE,
2256                        );
2257                    }
2258
2259                    self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2260                    self.allow_writes(ct.cluster_id, ct.global_id());
2261                }
2262                // Nothing to do for these cases
2263                CatalogItem::Log(_)
2264                | CatalogItem::Type(_)
2265                | CatalogItem::Func(_)
2266                | CatalogItem::Secret(_) => {}
2267            }
2268        }
2269
2270        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2271            // Clean up any extraneous VpcEndpoints that shouldn't exist.
2272            let existing_vpc_endpoints = cloud_resource_controller
2273                .list_vpc_endpoints()
2274                .await
2275                .context("list vpc endpoints")?;
2276            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2277            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2278            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2279            for id in vpc_endpoints_to_remove {
2280                cloud_resource_controller
2281                    .delete_vpc_endpoint(*id)
2282                    .await
2283                    .context("deleting extraneous vpc endpoint")?;
2284            }
2285
2286            // Ensure desired VpcEndpoints are up to date.
2287            for (id, spec) in privatelink_connections {
2288                cloud_resource_controller
2289                    .ensure_vpc_endpoint(id, spec)
2290                    .await
2291                    .context("ensuring vpc endpoint")?;
2292            }
2293        }
2294
2295        // Having installed all entries and created all constraints, we can now drop read holds and
2296        // relax read policies.
2297        drop(dataflow_read_holds);
2298        // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
2299        for (cw, policies) in policies_to_set {
2300            self.initialize_read_policies(&policies, cw).await;
2301        }
2302
2303        // Expose mapping from T-shirt sizes to actual sizes
2304        builtin_table_updates.extend(
2305            self.catalog().state().resolve_builtin_table_updates(
2306                self.catalog().state().pack_all_replica_size_updates(),
2307            ),
2308        );
2309
2310        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2311        // When 0dt is enabled, we create new shards for any migrated builtin storage collections.
2312        // In read-only mode, the migrated builtin tables (which are a subset of migrated builtin
2313        // storage collections) need to be back-filled so that any dependent dataflow can be
2314        // hydrated. Additionally, these shards are not registered with the txn-shard, and cannot
2315        // be registered while in read-only, so they are written to directly.
2316        let migrated_updates_fut = if self.controller.read_only() {
2317            let min_timestamp = Timestamp::minimum();
2318            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2319                .extract_if(.., |update| {
2320                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2321                    migrated_storage_collections_0dt.contains(&update.id)
2322                        && self
2323                            .controller
2324                            .storage_collections
2325                            .collection_frontiers(gid)
2326                            .expect("all tables are registered")
2327                            .write_frontier
2328                            .elements()
2329                            == &[min_timestamp]
2330                })
2331                .collect();
2332            if migrated_builtin_table_updates.is_empty() {
2333                futures::future::ready(()).boxed()
2334            } else {
2335                // Group all updates per-table.
2336                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2337                for update in migrated_builtin_table_updates {
2338                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2339                    grouped_appends.entry(gid).or_default().push(update.data);
2340                }
2341                info!(
2342                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2343                    grouped_appends.keys().collect::<Vec<_>>()
2344                );
2345
2346                // Consolidate Row data; staged batches must already be consolidated.
2347                let mut all_appends = Vec::with_capacity(grouped_appends.len());
2348                for (item_id, table_data) in grouped_appends.into_iter() {
2349                    let mut all_rows = Vec::new();
2350                    let mut all_data = Vec::new();
2351                    for data in table_data {
2352                        match data {
2353                            TableData::Rows(rows) => all_rows.extend(rows),
2354                            TableData::Batches(_) => all_data.push(data),
2355                        }
2356                    }
2357                    differential_dataflow::consolidation::consolidate(&mut all_rows);
2358                    all_data.push(TableData::Rows(all_rows));
2359
2360                    // TODO(parkmycar): Use SmallVec throughout.
2361                    all_appends.push((item_id, all_data));
2362                }
2363
2364                let fut = self
2365                    .controller
2366                    .storage
2367                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2368                    .expect("cannot fail to append");
2369                async {
2370                    fut.await
2371                        .expect("One-shot shouldn't be dropped during bootstrap")
2372                        .unwrap_or_terminate("cannot fail to append")
2373                }
2374                .boxed()
2375            }
2376        } else {
2377            futures::future::ready(()).boxed()
2378        };
2379
2380        info!(
2381            "startup: coordinator init: bootstrap: postamble complete in {:?}",
2382            postamble_start.elapsed()
2383        );
2384
2385        let builtin_update_start = Instant::now();
2386        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2387
2388        if self.controller.read_only() {
2389            info!(
2390                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2391            );
2392
2393            // TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
2394            let audit_join_start = Instant::now();
2395            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2396            let audit_log_updates: Vec<_> = audit_logs_iterator
2397                .map(|(audit_log, ts)| StateUpdate {
2398                    kind: StateUpdateKind::AuditLog(audit_log),
2399                    ts,
2400                    diff: StateDiff::Addition,
2401                })
2402                .collect();
2403            let audit_log_builtin_table_updates = self
2404                .catalog()
2405                .state()
2406                .generate_builtin_table_updates(audit_log_updates);
2407            builtin_table_updates.extend(audit_log_builtin_table_updates);
2408            info!(
2409                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2410                audit_join_start.elapsed()
2411            );
2412            self.buffered_builtin_table_updates
2413                .as_mut()
2414                .expect("in read-only mode")
2415                .append(&mut builtin_table_updates);
2416        } else {
2417            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2418                .await;
2419        };
2420        info!(
2421            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2422            builtin_update_start.elapsed()
2423        );
2424
2425        let cleanup_secrets_start = Instant::now();
2426        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2427        // Cleanup orphaned secrets. Errors during list() or delete() do not
2428        // need to prevent bootstrap from succeeding; we will retry next
2429        // startup.
2430        {
2431            // Destructure Self so we can selectively move fields into the async
2432            // task.
2433            let Self {
2434                secrets_controller,
2435                catalog,
2436                ..
2437            } = self;
2438
2439            let next_user_item_id = catalog.get_next_user_item_id().await?;
2440            let next_system_item_id = catalog.get_next_system_item_id().await?;
2441            let read_only = self.controller.read_only();
2442            // Fetch all IDs from the catalog to future-proof against other
2443            // things using secrets. Today, SECRET and CONNECTION objects use
2444            // secrets_controller.ensure, but more things could use secrets in the
2445            // future, and those would be easy to miss adding here.
2446            let catalog_ids: BTreeSet<CatalogItemId> =
2447                catalog.entries().map(|entry| entry.id()).collect();
2448            let secrets_controller = Arc::clone(secrets_controller);
2449
2450            spawn(|| "cleanup-orphaned-secrets", async move {
2451                if read_only {
2452                    info!(
2453                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2454                    );
2455                    return;
2456                }
2457                info!("coordinator init: cleaning up orphaned secrets");
2458
2459                match secrets_controller.list().await {
2460                    Ok(controller_secrets) => {
2461                        let controller_secrets: BTreeSet<CatalogItemId> =
2462                            controller_secrets.into_iter().collect();
2463                        let orphaned = controller_secrets.difference(&catalog_ids);
2464                        for id in orphaned {
2465                            let id_too_large = match id {
2466                                CatalogItemId::System(id) => *id >= next_system_item_id,
2467                                CatalogItemId::User(id) => *id >= next_user_item_id,
2468                                CatalogItemId::IntrospectionSourceIndex(_)
2469                                | CatalogItemId::Transient(_) => false,
2470                            };
2471                            if id_too_large {
2472                                info!(
2473                                    %next_user_item_id, %next_system_item_id,
2474                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2475                                );
2476                            } else {
2477                                info!("coordinator init: deleting orphaned secret {id}");
2478                                fail_point!("orphan_secrets");
2479                                if let Err(e) = secrets_controller.delete(*id).await {
2480                                    warn!(
2481                                        "Dropping orphaned secret has encountered an error: {}",
2482                                        e
2483                                    );
2484                                }
2485                            }
2486                        }
2487                    }
2488                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2489                }
2490            });
2491        }
2492        info!(
2493            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2494            cleanup_secrets_start.elapsed()
2495        );
2496
2497        // Run all of our final steps concurrently.
2498        let final_steps_start = Instant::now();
2499        info!(
2500            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2501        );
2502        migrated_updates_fut
2503            .instrument(info_span!("coord::bootstrap::final"))
2504            .await;
2505
2506        debug!(
2507            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2508        );
2509        // Announce the completion of initialization.
2510        self.controller.initialization_complete();
2511
2512        // Initialize unified introspection.
2513        self.bootstrap_introspection_subscribes().await;
2514
2515        info!(
2516            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2517            final_steps_start.elapsed()
2518        );
2519
2520        info!(
2521            "startup: coordinator init: bootstrap complete in {:?}",
2522            bootstrap_start.elapsed()
2523        );
2524        Ok(())
2525    }
2526
2527    /// Prepares tables for writing by resetting them to a known state and
2528    /// appending the given builtin table updates. The timestamp oracle
2529    /// will be advanced to the write timestamp of the append when this
2530    /// method returns.
2531    #[allow(clippy::async_yields_async)]
2532    #[instrument]
2533    async fn bootstrap_tables(
2534        &mut self,
2535        entries: &[CatalogEntry],
2536        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2537        audit_logs_iterator: AuditLogIterator,
2538    ) {
2539        /// Smaller helper struct of metadata for bootstrapping tables.
2540        struct TableMetadata<'a> {
2541            id: CatalogItemId,
2542            name: &'a QualifiedItemName,
2543            table: &'a Table,
2544        }
2545
2546        // Filter our entries down to just tables.
2547        let table_metas: Vec<_> = entries
2548            .into_iter()
2549            .filter_map(|entry| {
2550                entry.table().map(|table| TableMetadata {
2551                    id: entry.id(),
2552                    name: entry.name(),
2553                    table,
2554                })
2555            })
2556            .collect();
2557
2558        // Append empty batches to advance the timestamp of all tables.
2559        debug!("coordinator init: advancing all tables to current timestamp");
2560        let WriteTimestamp {
2561            timestamp: write_ts,
2562            advance_to,
2563        } = self.get_local_write_ts().await;
2564        let appends = table_metas
2565            .iter()
2566            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2567            .collect();
2568        // Append the tables in the background. We apply the write timestamp before getting a read
2569        // timestamp and reading a snapshot of each table, so the snapshots will block on their own
2570        // until the appends are complete.
2571        let table_fence_rx = self
2572            .controller
2573            .storage
2574            .append_table(write_ts.clone(), advance_to, appends)
2575            .expect("invalid updates");
2576
2577        self.apply_local_write(write_ts).await;
2578
2579        // Add builtin table updates that clear the contents of all system tables.
2580        debug!("coordinator init: resetting system tables");
2581        let read_ts = self.get_local_read_ts().await;
2582
2583        // Filter out the 'mz_storage_usage_by_shard' table since we need to retain that info for
2584        // billing purposes.
2585        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2586            .catalog()
2587            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2588            .into();
2589        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2590            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2591                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2592        };
2593
2594        let mut retraction_tasks = Vec::new();
2595        let mut system_tables: Vec<_> = table_metas
2596            .iter()
2597            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2598            .collect();
2599
2600        // Special case audit events because it's append only.
2601        let (audit_events_idx, _) = system_tables
2602            .iter()
2603            .find_position(|table| {
2604                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2605            })
2606            .expect("mz_audit_events must exist");
2607        let audit_events = system_tables.remove(audit_events_idx);
2608        let audit_log_task = self.bootstrap_audit_log_table(
2609            audit_events.id,
2610            audit_events.name,
2611            audit_events.table,
2612            audit_logs_iterator,
2613            read_ts,
2614        );
2615
2616        for system_table in system_tables {
2617            let table_id = system_table.id;
2618            let full_name = self.catalog().resolve_full_name(system_table.name, None);
2619            debug!("coordinator init: resetting system table {full_name} ({table_id})");
2620
2621            // Fetch the current contents of the table for retraction.
2622            let snapshot_fut = self
2623                .controller
2624                .storage_collections
2625                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2626            let batch_fut = self
2627                .controller
2628                .storage_collections
2629                .create_update_builder(system_table.table.global_id_writes());
2630
2631            let task = spawn(|| format!("snapshot-{table_id}"), async move {
2632                // Create a TimestamplessUpdateBuilder.
2633                let mut batch = batch_fut
2634                    .await
2635                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2636                tracing::info!(?table_id, "starting snapshot");
2637                // Get a cursor which will emit a consolidated snapshot.
2638                let mut snapshot_cursor = snapshot_fut
2639                    .await
2640                    .unwrap_or_terminate("cannot fail to snapshot");
2641
2642                // Retract the current contents, spilling into our builder.
2643                while let Some(values) = snapshot_cursor.next().await {
2644                    for (key, _t, d) in values {
2645                        let d_invert = d.neg();
2646                        batch.add(&key, &(), &d_invert).await;
2647                    }
2648                }
2649                tracing::info!(?table_id, "finished snapshot");
2650
2651                let batch = batch.finish().await;
2652                BuiltinTableUpdate::batch(table_id, batch)
2653            });
2654            retraction_tasks.push(task);
2655        }
2656
2657        let retractions_res = futures::future::join_all(retraction_tasks).await;
2658        for retractions in retractions_res {
2659            builtin_table_updates.push(retractions);
2660        }
2661
2662        let audit_join_start = Instant::now();
2663        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2664        let audit_log_updates = audit_log_task.await;
2665        let audit_log_builtin_table_updates = self
2666            .catalog()
2667            .state()
2668            .generate_builtin_table_updates(audit_log_updates);
2669        builtin_table_updates.extend(audit_log_builtin_table_updates);
2670        info!(
2671            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2672            audit_join_start.elapsed()
2673        );
2674
2675        // Now that the snapshots are complete, the appends must also be complete.
2676        table_fence_rx
2677            .await
2678            .expect("One-shot shouldn't be dropped during bootstrap")
2679            .unwrap_or_terminate("cannot fail to append");
2680
2681        info!("coordinator init: sending builtin table updates");
2682        let (_builtin_updates_fut, write_ts) = self
2683            .builtin_table_update()
2684            .execute(builtin_table_updates)
2685            .await;
2686        info!(?write_ts, "our write ts");
2687        if let Some(write_ts) = write_ts {
2688            self.apply_local_write(write_ts).await;
2689        }
2690    }
2691
2692    /// Prepare updates to the audit log table. The audit log table is append-only and very large,
2693    /// so we only need to find the events that are present in `audit_logs_iterator` but not yet in
2694    /// the audit log table.
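    ///
    /// Illustrative sketch (hypothetical IDs): if the largest event ID already in the table
    /// is `3` and `audit_logs_iterator` yields events with IDs `5, 4, 3, 2, 1` newest-first
    /// (which the `take_while` below relies on), only the events with IDs `5` and `4` are
    /// returned as additions.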
2695    #[instrument]
2696    fn bootstrap_audit_log_table<'a>(
2697        &self,
2698        table_id: CatalogItemId,
2699        name: &'a QualifiedItemName,
2700        table: &'a Table,
2701        audit_logs_iterator: AuditLogIterator,
2702        read_ts: Timestamp,
2703    ) -> JoinHandle<Vec<StateUpdate>> {
2704        let full_name = self.catalog().resolve_full_name(name, None);
2705        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2706        let current_contents_fut = self
2707            .controller
2708            .storage_collections
2709            .snapshot(table.global_id_writes(), read_ts);
2710        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2711            let current_contents = current_contents_fut
2712                .await
2713                .unwrap_or_terminate("cannot fail to fetch snapshot");
2714            let contents_len = current_contents.len();
2715            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2716
2717            // Fetch the largest audit log event ID that has been written to the table.
2718            let max_table_id = current_contents
2719                .into_iter()
2720                .filter(|(_, diff)| *diff == 1)
2721                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2722                .sorted()
2723                .rev()
2724                .next();
2725
2726            // Filter audit log catalog updates to those that are not present in the table.
2727            audit_logs_iterator
2728                .take_while(|(audit_log, _)| match max_table_id {
2729                    Some(id) => audit_log.event.sortable_id() > id,
2730                    None => true,
2731                })
2732                .map(|(audit_log, ts)| StateUpdate {
2733                    kind: StateUpdateKind::AuditLog(audit_log),
2734                    ts,
2735                    diff: StateDiff::Addition,
2736                })
2737                .collect::<Vec<_>>()
2738        })
2739    }
2740
2741    /// Initializes all storage collections required by catalog objects in the storage controller.
2742    ///
2743    /// This method takes care of collection creation, as well as migration of existing
2744    /// collections.
2745    ///
2746    /// Creating all storage collections in a single `create_collections` call, rather than on
2747    /// demand, is more efficient as it reduces the number of writes to durable storage. It also
2748    /// allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
2749    /// storage collections, without needing to worry about dependency order.
2750    ///
2751    /// `migrated_storage_collections` is a set of builtin storage collections that have been
2752    /// migrated and should be handled specially.
2753    #[instrument]
2754    async fn bootstrap_storage_collections(
2755        &mut self,
2756        migrated_storage_collections: &BTreeSet<CatalogItemId>,
2757    ) {
2758        let catalog = self.catalog();
2759
2760        let source_desc = |object_id: GlobalId,
2761                           data_source: &DataSourceDesc,
2762                           desc: &RelationDesc,
2763                           timeline: &Timeline| {
2764            let data_source = match data_source.clone() {
2765                // Re-announce the source description.
2766                DataSourceDesc::Ingestion { desc, cluster_id } => {
2767                    let desc = desc.into_inline_connection(catalog.state());
2768                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2769                    DataSource::Ingestion(ingestion)
2770                }
2771                DataSourceDesc::OldSyntaxIngestion {
2772                    desc,
2773                    progress_subsource,
2774                    data_config,
2775                    details,
2776                    cluster_id,
2777                } => {
2778                    let desc = desc.into_inline_connection(catalog.state());
2779                    let data_config = data_config.into_inline_connection(catalog.state());
2780                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2781                    // this will always be a Source or a Table.
2782                    let progress_subsource =
2783                        catalog.get_entry(&progress_subsource).latest_global_id();
2784                    let mut ingestion =
2785                        IngestionDescription::new(desc, cluster_id, progress_subsource);
2786                    let legacy_export = SourceExport {
2787                        storage_metadata: (),
2788                        data_config,
2789                        details,
2790                    };
2791                    ingestion.source_exports.insert(object_id, legacy_export);
2792
2793                    DataSource::Ingestion(ingestion)
2794                }
2795                DataSourceDesc::IngestionExport {
2796                    ingestion_id,
2797                    external_reference: _,
2798                    details,
2799                    data_config,
2800                } => {
2801                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2802                    // this will always be a Source or a Table.
2803                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2804
2805                    DataSource::IngestionExport {
2806                        ingestion_id,
2807                        details,
2808                        data_config: data_config.into_inline_connection(catalog.state()),
2809                    }
2810                }
2811                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
2812                DataSourceDesc::Progress => DataSource::Progress,
2813                DataSourceDesc::Introspection(introspection) => {
2814                    DataSource::Introspection(introspection)
2815                }
2816            };
2817            CollectionDescription {
2818                desc: desc.clone(),
2819                data_source,
2820                since: None,
2821                timeline: Some(timeline.clone()),
2822                primary: None,
2823            }
2824        };
2825
2826        let mut compute_collections = vec![];
2827        let mut collections = vec![];
2828        let mut new_builtin_continual_tasks = vec![];
2829        for entry in catalog.entries() {
2830            match entry.item() {
2831                CatalogItem::Source(source) => {
2832                    collections.push((
2833                        source.global_id(),
2834                        source_desc(
2835                            source.global_id(),
2836                            &source.data_source,
2837                            &source.desc,
2838                            &source.timeline,
2839                        ),
2840                    ));
2841                }
2842                CatalogItem::Table(table) => {
2843                    match &table.data_source {
2844                        TableDataSource::TableWrites { defaults: _ } => {
2845                            let versions: BTreeMap<_, _> = table
2846                                .collection_descs()
2847                                .map(|(gid, version, desc)| (version, (gid, desc)))
2848                                .collect();
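                            // Illustrative example (hypothetical IDs): with versions
                            // {v0 -> g1, v1 -> g2, v2 -> g3}, g1's primary is g2, g2's
                            // primary is g3, and g3, the latest version, has no primary.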
2849                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2850                                let next_version = version.bump();
2851                                let primary_collection =
2852                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2853                                let mut collection_desc =
2854                                    CollectionDescription::for_table(desc.clone());
2855                                collection_desc.primary = primary_collection;
2856
2857                                (*gid, collection_desc)
2858                            });
2859                            collections.extend(collection_descs);
2860                        }
2861                        TableDataSource::DataSource {
2862                            desc: data_source_desc,
2863                            timeline,
2864                        } => {
2865                            // TODO(alter_table): Support versioning tables that read from sources.
2866                            soft_assert_eq_or_log!(table.collections.len(), 1);
2867                            let collection_descs =
2868                                table.collection_descs().map(|(gid, _version, desc)| {
2869                                    (
2870                                        gid,
2871                                        source_desc(
2872                                            entry.latest_global_id(),
2873                                            data_source_desc,
2874                                            &desc,
2875                                            timeline,
2876                                        ),
2877                                    )
2878                                });
2879                            collections.extend(collection_descs);
2880                        }
2881                    };
2882                }
2883                CatalogItem::MaterializedView(mv) => {
2884                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
2885                        let collection_desc =
2886                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
2887                        (gid, collection_desc)
2888                    });
2889
2890                    collections.extend(collection_descs);
2891                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
2892                }
2893                CatalogItem::ContinualTask(ct) => {
2894                    let collection_desc =
2895                        CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
2896                    if ct.global_id().is_system() && collection_desc.since.is_none() {
2897                        // We need a non-0 since to make as_of selection work. Fill it in below with
2898                        // the `bootstrap_builtin_continual_tasks` call, which can only be run after
2899                        // `create_collections_for_bootstrap`.
2900                        new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2901                    } else {
2902                        compute_collections.push((ct.global_id(), ct.desc.clone()));
2903                        collections.push((ct.global_id(), collection_desc));
2904                    }
2905                }
2906                CatalogItem::Sink(sink) => {
2907                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2908                    let from_desc = storage_sink_from_entry
2909                        .relation_desc()
2910                        .expect("sinks can only be built on items with descs")
2911                        .into_owned();
2912                    let collection_desc = CollectionDescription {
2913                        // TODO(sinks): make generic once we have more than one sink type.
2914                        desc: KAFKA_PROGRESS_DESC.clone(),
2915                        data_source: DataSource::Sink {
2916                            desc: ExportDescription {
2917                                sink: StorageSinkDesc {
2918                                    from: sink.from,
2919                                    from_desc,
2920                                    connection: sink
2921                                        .connection
2922                                        .clone()
2923                                        .into_inline_connection(self.catalog().state()),
2924                                    envelope: sink.envelope,
2925                                    as_of: Antichain::from_elem(Timestamp::minimum()),
2926                                    with_snapshot: sink.with_snapshot,
2927                                    version: sink.version,
2928                                    from_storage_metadata: (),
2929                                    to_storage_metadata: (),
2930                                    commit_interval: sink.commit_interval,
2931                                },
2932                                instance_id: sink.cluster_id,
2933                            },
2934                        },
2935                        since: None,
2936                        timeline: None,
2937                        primary: None,
2938                    };
2939                    collections.push((sink.global_id, collection_desc));
2940                }
2941                _ => (),
2942            }
2943        }
2944
2945        let register_ts = if self.controller.read_only() {
2946            self.get_local_read_ts().await
2947        } else {
2948            // Getting a write timestamp bumps the write timestamp in the
2949            // oracle, which we're not allowed to do in read-only mode.
2950            self.get_local_write_ts().await.timestamp
2951        };
2952
2953        let storage_metadata = self.catalog.state().storage_metadata();
2954        let migrated_storage_collections = migrated_storage_collections
2955            .into_iter()
2956            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2957            .collect();
2958
2959        // Before possibly creating collections, make sure their schemas are correct.
2960        //
2961        // Across different versions of Materialize the nullability of columns can change based on
2962        // updates to our optimizer.
2963        self.controller
2964            .storage
2965            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2966            .await
2967            .unwrap_or_terminate("cannot fail to evolve collections");
2968
2969        self.controller
2970            .storage
2971            .create_collections_for_bootstrap(
2972                storage_metadata,
2973                Some(register_ts),
2974                collections,
2975                &migrated_storage_collections,
2976            )
2977            .await
2978            .unwrap_or_terminate("cannot fail to create collections");
2979
2980        self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2981            .await;
2982
2983        if !self.controller.read_only() {
2984            self.apply_local_write(register_ts).await;
2985        }
2986    }
2987
2988    /// Make as_of selection happy for builtin CTs. Ideally we'd write the
2989    /// initial as_of down in the durable catalog, but that's hard because of
2990    /// boot ordering. Instead, we set the since of the storage collection to
2991    /// something that's a reasonable lower bound for the as_of. Then, if the
2992    /// upper is 0, the as_of selection code will allow us to jump it forward to
2993    /// this since.
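    ///
    /// For example (illustrative timestamps): if the CT's inputs have a least valid read of
    /// `42`, we set the collection's since to `[42]`; if the collection's upper is still
    /// `[0]`, as_of selection may then jump the CT's as_of forward to `42`.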
2994    async fn bootstrap_builtin_continual_tasks(
2995        &mut self,
2996        // TODO(alter_table): Switch to CatalogItemId.
2997        mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
2998    ) {
2999        for (id, collection) in &mut collections {
3000            let entry = self.catalog.get_entry_by_global_id(id);
3001            let ct = match &entry.item {
3002                CatalogItem::ContinualTask(ct) => ct.clone(),
3003                _ => unreachable!("only called with continual task builtins"),
3004            };
3005            let debug_name = self
3006                .catalog()
3007                .resolve_full_name(entry.name(), None)
3008                .to_string();
3009            let (_optimized_plan, physical_plan, _metainfo) = self
3010                .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
3011                .expect("builtin CT should optimize successfully");
3012
3013            // Determine an as of for the new continual task.
3014            let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
3015            // Can't acquire a read hold on ourselves because we don't exist yet.
3016            id_bundle.storage_ids.remove(id);
3017            let read_holds = self.acquire_read_holds(&id_bundle);
3018            let as_of = read_holds.least_valid_read();
3019
3020            collection.since = Some(as_of.clone());
3021        }
3022        self.controller
3023            .storage
3024            .create_collections(self.catalog.state().storage_metadata(), None, collections)
3025            .await
3026            .unwrap_or_terminate("cannot fail to create collections");
3027    }
3028
3029    /// Invokes the optimizer on all indexes, materialized views, and continual tasks in the
3030    /// catalog and inserts the resulting dataflow plans into the catalog state.
3031    ///
3032    /// `ordered_catalog_entries` must be sorted in dependency order, with dependencies ordered
3033    /// before their dependants.
3034    ///
3035    /// This method does not perform timestamp selection for the dataflows, nor does it create them
3036    /// in the compute controller. Both of these steps happen later during bootstrapping.
3037    ///
3038    /// Returns a map of expressions that were not cached.
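    ///
    /// For example (an illustrative dependency): if index `B` is built on top of index `A`,
    /// `ordered_catalog_entries` must list `A` before `B`, so that `A`'s collection is
    /// already present in the `ComputeInstanceSnapshot` when `B` is optimized.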
3039    #[instrument]
3040    fn bootstrap_dataflow_plans(
3041        &mut self,
3042        ordered_catalog_entries: &[CatalogEntry],
3043        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
3044    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
3045        // The optimizer expects to be able to query its `ComputeInstanceSnapshot` for
3046        // collections the current dataflow can depend on. But since we don't yet install anything
3047        // on compute instances, the snapshot information is incomplete. We fix that by manually
3048        // updating `ComputeInstanceSnapshot` objects to ensure they contain collections previously
3049        // optimized.
3050        let mut instance_snapshots = BTreeMap::new();
3051        let mut uncached_expressions = BTreeMap::new();
3052
3053        let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
3054
3055        for entry in ordered_catalog_entries {
3056            match entry.item() {
3057                CatalogItem::Index(idx) => {
3058                    // Collect optimizer parameters.
3059                    let compute_instance =
3060                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
3061                            self.instance_snapshot(idx.cluster_id)
3062                                .expect("compute instance exists")
3063                        });
3064                    let global_id = idx.global_id();
3065
3066                    // The index may already be installed on the compute instance. For example,
3067                    // this is the case for introspection indexes.
3068                    if compute_instance.contains_collection(&global_id) {
3069                        continue;
3070                    }
3071
3072                    let (optimized_plan, physical_plan, metainfo) =
3073                        match cached_global_exprs.remove(&global_id) {
3074                            Some(global_expressions)
3075                                if global_expressions.optimizer_features
3076                                    == optimizer_config.features =>
3077                            {
3078                                debug!("global expression cache hit for {global_id:?}");
3079                                (
3080                                    global_expressions.global_mir,
3081                                    global_expressions.physical_plan,
3082                                    global_expressions.dataflow_metainfos,
3083                                )
3084                            }
3085                            Some(_) | None => {
3086                                let (optimized_plan, global_lir_plan) = {
3087                                    // Build an optimizer for this INDEX.
3088                                    let mut optimizer = optimize::index::Optimizer::new(
3089                                        self.owned_catalog(),
3090                                        compute_instance.clone(),
3091                                        global_id,
3092                                        optimizer_config.clone(),
3093                                        self.optimizer_metrics(),
3094                                    );
3095
3096                                    // MIR ⇒ MIR optimization (global)
3097                                    let index_plan = optimize::index::Index::new(
3098                                        entry.name().clone(),
3099                                        idx.on,
3100                                        idx.keys.to_vec(),
3101                                    );
3102                                    let global_mir_plan = optimizer.optimize(index_plan)?;
3103                                    let optimized_plan = global_mir_plan.df_desc().clone();
3104
3105                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3106                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3107
3108                                    (optimized_plan, global_lir_plan)
3109                                };
3110
3111                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3112                                let metainfo = {
3113                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3114                                    let notice_ids =
3115                                        std::iter::repeat_with(|| self.allocate_transient_id())
3116                                            .map(|(_item_id, gid)| gid)
3117                                            .take(metainfo.optimizer_notices.len())
3118                                            .collect::<Vec<_>>();
3119                                    // Return a metainfo with rendered notices.
3120                                    self.catalog().render_notices(
3121                                        metainfo,
3122                                        notice_ids,
3123                                        Some(idx.global_id()),
3124                                    )
3125                                };
3126                                uncached_expressions.insert(
3127                                    global_id,
3128                                    GlobalExpressions {
3129                                        global_mir: optimized_plan.clone(),
3130                                        physical_plan: physical_plan.clone(),
3131                                        dataflow_metainfos: metainfo.clone(),
3132                                        optimizer_features: OptimizerFeatures::from(
3133                                            self.catalog().system_config(),
3134                                        ),
3135                                    },
3136                                );
3137                                (optimized_plan, physical_plan, metainfo)
3138                            }
3139                        };
3140
3141                    let catalog = self.catalog_mut();
3142                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3143                    catalog.set_physical_plan(idx.global_id(), physical_plan);
3144                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3145
3146                    compute_instance.insert_collection(idx.global_id());
3147                }
3148                CatalogItem::MaterializedView(mv) => {
3149                    // Collect optimizer parameters.
3150                    let compute_instance =
3151                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3152                            self.instance_snapshot(mv.cluster_id)
3153                                .expect("compute instance exists")
3154                        });
3155                    let global_id = mv.global_id_writes();
3156
3157                    let (optimized_plan, physical_plan, metainfo) =
3158                        match cached_global_exprs.remove(&global_id) {
3159                            Some(global_expressions)
3160                                if global_expressions.optimizer_features
3161                                    == optimizer_config.features =>
3162                            {
3163                                debug!("global expression cache hit for {global_id:?}");
3164                                (
3165                                    global_expressions.global_mir,
3166                                    global_expressions.physical_plan,
3167                                    global_expressions.dataflow_metainfos,
3168                                )
3169                            }
3170                            Some(_) | None => {
3171                                let (_, internal_view_id) = self.allocate_transient_id();
3172                                let debug_name = self
3173                                    .catalog()
3174                                    .resolve_full_name(entry.name(), None)
3175                                    .to_string();
3176                                let force_non_monotonic = Default::default();
3177
3178                                let (optimized_plan, global_lir_plan) = {
3179                                    // Build an optimizer for this MATERIALIZED VIEW.
3180                                    let mut optimizer = optimize::materialized_view::Optimizer::new(
3181                                        self.owned_catalog().as_optimizer_catalog(),
3182                                        compute_instance.clone(),
3183                                        global_id,
3184                                        internal_view_id,
3185                                        mv.desc.latest().iter_names().cloned().collect(),
3186                                        mv.non_null_assertions.clone(),
3187                                        mv.refresh_schedule.clone(),
3188                                        debug_name,
3189                                        optimizer_config.clone(),
3190                                        self.optimizer_metrics(),
3191                                        force_non_monotonic,
3192                                    );
3193
3194                                    // MIR ⇒ MIR optimization (global)
3195                                    // We make sure to use the HIR SQL type (since MIR SQL types may not be coherent).
3196                                    let typ = infer_sql_type_for_catalog(
3197                                        &mv.raw_expr,
3198                                        &mv.optimized_expr.as_ref().clone(),
3199                                    );
3200                                    let global_mir_plan = optimizer
3201                                        .optimize((mv.optimized_expr.as_ref().clone(), typ))?;
3202                                    let optimized_plan = global_mir_plan.df_desc().clone();
3203
3204                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3205                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3206
3207                                    (optimized_plan, global_lir_plan)
3208                                };
3209
3210                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3211                                let metainfo = {
3212                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3213                                    let notice_ids =
3214                                        std::iter::repeat_with(|| self.allocate_transient_id())
3215                                            .map(|(_item_id, global_id)| global_id)
3216                                            .take(metainfo.optimizer_notices.len())
3217                                            .collect::<Vec<_>>();
3218                                    // Return a metainfo with rendered notices.
3219                                    self.catalog().render_notices(
3220                                        metainfo,
3221                                        notice_ids,
3222                                        Some(mv.global_id_writes()),
3223                                    )
3224                                };
3225                                uncached_expressions.insert(
3226                                    global_id,
3227                                    GlobalExpressions {
3228                                        global_mir: optimized_plan.clone(),
3229                                        physical_plan: physical_plan.clone(),
3230                                        dataflow_metainfos: metainfo.clone(),
3231                                        optimizer_features: OptimizerFeatures::from(
3232                                            self.catalog().system_config(),
3233                                        ),
3234                                    },
3235                                );
3236                                (optimized_plan, physical_plan, metainfo)
3237                            }
3238                        };
3239
3240                    let catalog = self.catalog_mut();
3241                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
3242                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
3243                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);
3244
3245                    compute_instance.insert_collection(mv.global_id_writes());
3246                }
3247                CatalogItem::ContinualTask(ct) => {
3248                    let compute_instance =
3249                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3250                            self.instance_snapshot(ct.cluster_id)
3251                                .expect("compute instance exists")
3252                        });
3253                    let global_id = ct.global_id();
3254
3255                    let (optimized_plan, physical_plan, metainfo) =
3256                        match cached_global_exprs.remove(&global_id) {
3257                            Some(global_expressions)
3258                                if global_expressions.optimizer_features
3259                                    == optimizer_config.features =>
3260                            {
3261                                debug!("global expression cache hit for {global_id:?}");
3262                                (
3263                                    global_expressions.global_mir,
3264                                    global_expressions.physical_plan,
3265                                    global_expressions.dataflow_metainfos,
3266                                )
3267                            }
3268                            Some(_) | None => {
3269                                let debug_name = self
3270                                    .catalog()
3271                                    .resolve_full_name(entry.name(), None)
3272                                    .to_string();
3273                                let (optimized_plan, physical_plan, metainfo) = self
3274                                    .optimize_create_continual_task(
3275                                        ct,
3276                                        global_id,
3277                                        self.owned_catalog(),
3278                                        debug_name,
3279                                    )?;
3280                                uncached_expressions.insert(
3281                                    global_id,
3282                                    GlobalExpressions {
3283                                        global_mir: optimized_plan.clone(),
3284                                        physical_plan: physical_plan.clone(),
3285                                        dataflow_metainfos: metainfo.clone(),
3286                                        optimizer_features: OptimizerFeatures::from(
3287                                            self.catalog().system_config(),
3288                                        ),
3289                                    },
3290                                );
3291                                (optimized_plan, physical_plan, metainfo)
3292                            }
3293                        };
3294
3295                    let catalog = self.catalog_mut();
3296                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3297                    catalog.set_physical_plan(ct.global_id(), physical_plan);
3298                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3299
3300                    compute_instance.insert_collection(ct.global_id());
3301                }
3302                _ => (),
3303            }
3304        }
3305
3306        Ok(uncached_expressions)
3307    }
3308
3309    /// Selects for each compute dataflow an as-of suitable for bootstrapping it.
3310    ///
3311    /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
3312    /// in place and that must not be dropped before all compute dataflows have been created with
3313    /// the compute controller.
3314    ///
3315    /// This method expects all storage collections and dataflow plans to be available, so it must
3316    /// run after [`Coordinator::bootstrap_storage_collections`] and
3317    /// [`Coordinator::bootstrap_dataflow_plans`].
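    ///
    /// A minimal sketch of the intended usage (illustrative, not an exact call sequence):
    ///
    /// ```ignore
    /// let read_holds = self.bootstrap_dataflow_as_ofs().await;
    /// // ... create all compute dataflows with the compute controller ...
    /// drop(read_holds); // only now may the involved read frontiers advance
    /// ```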
3318    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3319        let mut catalog_ids = Vec::new();
3320        let mut dataflows = Vec::new();
3321        let mut read_policies = BTreeMap::new();
3322        for entry in self.catalog.entries() {
3323            let gid = match entry.item() {
3324                CatalogItem::Index(idx) => idx.global_id(),
3325                CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3326                CatalogItem::ContinualTask(ct) => ct.global_id(),
3327                CatalogItem::Table(_)
3328                | CatalogItem::Source(_)
3329                | CatalogItem::Log(_)
3330                | CatalogItem::View(_)
3331                | CatalogItem::Sink(_)
3332                | CatalogItem::Type(_)
3333                | CatalogItem::Func(_)
3334                | CatalogItem::Secret(_)
3335                | CatalogItem::Connection(_) => continue,
3336            };
3337            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3338                catalog_ids.push(gid);
3339                dataflows.push(plan.clone());
3340
3341                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3342                    read_policies.insert(gid, compaction_window.into());
3343                }
3344            }
3345        }
3346
3347        let read_ts = self.get_local_read_ts().await;
3348        let read_holds = as_of_selection::run(
3349            &mut dataflows,
3350            &read_policies,
3351            &*self.controller.storage_collections,
3352            read_ts,
3353            self.controller.read_only(),
3354        );
3355
3356        let catalog = self.catalog_mut();
3357        for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3358            catalog.set_physical_plan(id, plan);
3359        }
3360
3361        read_holds
3362    }
3363
3364    /// Serves the coordinator, receiving commands from users over `cmd_rx`
3365    /// and feedback from dataflow workers over `feedback_rx`.
3366    ///
3367    /// You must call `bootstrap` before calling this method.
3368    ///
3369    /// BOXED FUTURE: As of Nov 2023 the Future returned by this function was 92KB. This would
3370    /// get stored on the stack, which is bad for runtime performance and blows up our stack usage.
3371    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
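    ///
    /// A minimal sketch of the boxing pattern used below (illustrative):
    ///
    /// ```ignore
    /// fn serve(self, /* ... */) -> LocalBoxFuture<'static, ()> {
    ///     async move {
    ///         // the large coordinator state machine lives on the heap...
    ///     }
    ///     .boxed_local() // ...because we box the future instead of returning it inline
    /// }
    /// ```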
3372    fn serve(
3373        mut self,
3374        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3375        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3376        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3377        group_commit_rx: appends::GroupCommitWaiter,
3378    ) -> LocalBoxFuture<'static, ()> {
3379        async move {
3380            // Watcher that listens for and reports cluster service status changes.
3381            let mut cluster_events = self.controller.events_stream();
3382            let last_message = Arc::new(Mutex::new(LastMessage {
3383                kind: "none",
3384                stmt: None,
3385            }));
3386
3387            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3388            let idle_metric = self.metrics.queue_busy_seconds.clone();
3389            let last_message_watchdog = Arc::clone(&last_message);
3390
3391            spawn(|| "coord watchdog", async move {
3392                // Every 5 seconds, attempt to measure how long it takes for the
3393                // coord select loop to become empty, because this message is the
3394                // last one processed. If the loop is idle, the measurement will be
3395                // only a few microseconds.
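                // (Mechanism: `idle_tx` has capacity 1 and the main select loop drains it at
                // its lowest priority, so the time between acquiring a permit here and the
                // timer being observed there approximates how busy the loop currently is.)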
3396                let mut interval = tokio::time::interval(Duration::from_secs(5));
3397                // If we end up having to wait more than 5 seconds for the coord to respond, then the
3398                // behavior of Delay results in the interval "restarting" from whenever we yield
3399                // instead of trying to catch up.
3400                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3401
3402                // Track if we become stuck to de-dupe error reporting.
3403                let mut coord_stuck = false;
3404
3405                loop {
3406                    interval.tick().await;
3407
3408                    // Wait for space in the channel; if we time out, the coordinator is stuck!
3409                    let duration = tokio::time::Duration::from_secs(30);
3410                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3411                    let Ok(maybe_permit) = timeout else {
3412                        // Only log if we're newly stuck, to prevent logging repeatedly.
3413                        if !coord_stuck {
3414                            let last_message = last_message_watchdog.lock().expect("poisoned");
3415                            tracing::warn!(
3416                                last_message_kind = %last_message.kind,
3417                                last_message_sql = %last_message.stmt_to_string(),
3418                                "coordinator stuck for {duration:?}",
3419                            );
3420                        }
3421                        coord_stuck = true;
3422
3423                        continue;
3424                    };
3425
3426                    // We got a permit, we're not stuck!
3427                    if coord_stuck {
3428                        tracing::info!("Coordinator became unstuck");
3429                    }
3430                    coord_stuck = false;
3431
3432                    // If we failed to acquire a permit it's because we're shutting down.
3433                    let Ok(permit) = maybe_permit else {
3434                        break;
3435                    };
3436
3437                    permit.send(idle_metric.start_timer());
3438                }
3439            });
3440
3441            self.schedule_storage_usage_collection().await;
3442            self.spawn_privatelink_vpc_endpoints_watch_task();
3443            self.spawn_statement_logging_task();
3444            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3445
3446            // Report if the handling of a single message takes longer than this threshold.
3447            let warn_threshold = self
3448                .catalog()
3449                .system_config()
3450                .coord_slow_message_warn_threshold();
3451
3452            // How many messages we'd like to batch up before processing them. Must be > 0.
3453            const MESSAGE_BATCH: usize = 64;
3454            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3455            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3456
3457            let message_batch = self.metrics.message_batch.clone();
3458
3459            loop {
3460                // Before adding a branch to this select loop, please ensure that the branch is
3461                // cancellation safe and add a comment explaining why. You can refer here for more
3462                // info: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
3463                select! {
3464                    // We prioritize internal commands over other commands. However, we work through
3465                    // batches of commands in some branches of this select, which means that even if
3466                    // a command generates internal commands, we will work through the current batch
3467                    // before receiving a new batch of commands.
3468                    biased;
3469
3470                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3471                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3472                    // Receive a batch of commands.
3473                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3474                    // `next()` on any stream is cancel-safe:
3475                    // https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
3476                    // Receive a single command.
3477                    Some(event) = cluster_events.next() => {
3478                        messages.push(Message::ClusterEvent(event))
3479                    },
3480                    // See [`mz_controller::Controller::ready`] for notes
3481                    // on why this is cancel-safe.
3482                    // Receive a single command.
3483                    () = self.controller.ready() => {
3484                        // NOTE: We don't get a `Readiness` back from `ready()`
3485                        // because the controller wants to keep it and it's not
3486                        // trivially `Clone` or `Copy`. Hence this accessor.
3487                        let controller = match self.controller.get_readiness() {
3488                            Readiness::Storage => ControllerReadiness::Storage,
3489                            Readiness::Compute => ControllerReadiness::Compute,
3490                            Readiness::Metrics(_) => ControllerReadiness::Metrics,
3491                            Readiness::Internal(_) => ControllerReadiness::Internal,
3492                            Readiness::NotReady => unreachable!("just signaled as ready"),
3493                        };
3494                        messages.push(Message::ControllerReady { controller });
3495                    }
3496                    // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
3497                    // Receive a single command.
3498                    permit = group_commit_rx.ready() => {
3499                        // If we happen to have batched exactly one user write, use
3500                        // that span so the `emit_trace_id_notice` hooks up.
3501                        // Otherwise, the best we can do is invent a new root span
3502                        // and make it follow from all the Spans in the pending
3503                        // writes.
3504                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3505                            PendingWriteTxn::User{span, ..} => Some(span),
3506                            PendingWriteTxn::System{..} => None,
3507                        });
3508                        let span = match user_write_spans.exactly_one() {
3509                            Ok(span) => span.clone(),
3510                            Err(user_write_spans) => {
3511                                let span = info_span!(parent: None, "group_commit_notify");
3512                                for s in user_write_spans {
3513                                    span.follows_from(s);
3514                                }
3515                                span
3516                            }
3517                        };
3518                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3519                    },
3520                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3521                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3522                    // Receive a batch of commands.
3523                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3524                        if count == 0 {
3525                            break;
3526                        } else {
3527                            messages.extend(cmd_messages.drain(..).map(
3528                                |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
3529                            ));
3530                        }
3531                    },
3532                    // `recv()` on `UnboundedReceiver` is cancellation safe:
3533                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
3534                    // Receive a single command.
3535                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3536                        let mut pending_read_txns = vec![pending_read_txn];
3537                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3538                            pending_read_txns.push(pending_read_txn);
3539                        }
3540                        for (conn_id, pending_read_txn) in pending_read_txns {
3541                            let prev = self
3542                                .pending_linearize_read_txns
3543                                .insert(conn_id, pending_read_txn);
3544                            soft_assert_or_log!(
3545                                prev.is_none(),
3546                                "connections can not have multiple concurrent reads, prev: {prev:?}"
3547                            )
3548                        }
3549                        messages.push(Message::LinearizeReads);
3550                    }
3551                    // `tick()` on `Interval` is cancel-safe:
3552                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3553                    // Receive a single command.
3554                    _ = self.advance_timelines_interval.tick() => {
3555                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
3556                        span.follows_from(Span::current());
3557
3558                        // Group commit sends an `AdvanceTimelines` message when
3559                        // done, which is what downgrades read holds. In
3560                        // read-only mode we send this message directly because
3561                        // we're not doing group commits.
3562                        if self.controller.read_only() {
3563                            messages.push(Message::AdvanceTimelines);
3564                        } else {
3565                            messages.push(Message::GroupCommitInitiate(span, None));
3566                        }
3567                    },
3568                    // `tick()` on `Interval` is cancel-safe:
3569                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3570                    // Receive a single command.
3571                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
3572                        messages.push(Message::CheckSchedulingPolicies);
3573                    },
3574
3575                    // `tick()` on `Interval` is cancel-safe:
3576                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3577                    // Receive a single command.
3578                    _ = self.caught_up_check_interval.tick() => {
3579                        // We do this directly on the main loop instead of
3580                        // firing off a message. We are still in read-only mode,
3581                        // so optimizing for latency by not blocking the main loop
3582                        // is not that important.
3583                        self.maybe_check_caught_up().await;
3584
3585                        continue;
3586                    },
3587
3588                    // Process the idle metric at the lowest priority to sample queue non-idle time.
3589                    // `recv()` on `Receiver` is cancellation safe:
3590                    // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety
3591                    // Receive a single command.
3592                    timer = idle_rx.recv() => {
3593                        timer.expect("does not drop").observe_duration();
3594                        self.metrics
3595                            .message_handling
3596                            .with_label_values(&["watchdog"])
3597                            .observe(0.0);
3598                        continue;
3599                    }
3600                };
3601
3602                // Observe the number of messages we're processing at once.
3603                message_batch.observe(f64::cast_lossy(messages.len()));
3604
3605                for msg in messages.drain(..) {
3606                    // All message processing functions trace. Start a parent span
3607                    // for them to make it easy to find slow messages.
3608                    let msg_kind = msg.kind();
3609                    let span = span!(
3610                        target: "mz_adapter::coord::handle_message_loop",
3611                        Level::INFO,
3612                        "coord::handle_message",
3613                        kind = msg_kind
3614                    );
3615                    let otel_context = span.context().span().span_context().clone();
3616
3617                    // Record the kind of the last message in case we get stuck. For
3618                    // execute commands, we additionally stash the user's SQL
3619                    // statement, so we can log it if we do get stuck.
3620                    *last_message.lock().expect("poisoned") = LastMessage {
3621                        kind: msg_kind,
3622                        stmt: match &msg {
3623                            Message::Command(
3624                                _,
3625                                Command::Execute {
3626                                    portal_name,
3627                                    session,
3628                                    ..
3629                                },
3630                            ) => session
3631                                .get_portal_unverified(portal_name)
3632                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3633                            _ => None,
3634                        },
3635                    };
3636
3637                    let start = Instant::now();
3638                    self.handle_message(msg).instrument(span).await;
3639                    let duration = start.elapsed();
3640
3641                    self.metrics
3642                        .message_handling
3643                        .with_label_values(&[msg_kind])
3644                        .observe(duration.as_secs_f64());
3645
3646                    // If something is _really_ slow, print a trace id for debugging, if OTEL is enabled.
3647                    if duration > warn_threshold {
3648                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3649                        tracing::error!(
3650                            ?msg_kind,
3651                            ?trace_id,
3652                            ?duration,
3653                            "very slow coordinator message"
3654                        );
3655                    }
3656                }
3657            }
3658            // Try to clean up as a best effort. There may be some async tasks out there holding a
3659            // reference that prevents us from cleaning up.
3660            if let Some(catalog) = Arc::into_inner(self.catalog) {
3661                catalog.expire().await;
3662            }
3663        }
3664        .boxed_local()
3665    }
3666
3667    /// Obtain a read-only Catalog reference.
3668    fn catalog(&self) -> &Catalog {
3669        &self.catalog
3670    }
3671
3672    /// Obtain a read-only Catalog snapshot, suitable for giving out to
3673    /// non-Coordinator thread tasks.
3674    fn owned_catalog(&self) -> Arc<Catalog> {
3675        Arc::clone(&self.catalog)
3676    }
3677
3678    /// Obtain a handle to the optimizer metrics, suitable for giving
3679    /// out to non-Coordinator thread tasks.
3680    fn optimizer_metrics(&self) -> OptimizerMetrics {
3681        self.optimizer_metrics.clone()
3682    }
3683
3684    /// Obtain a writeable Catalog reference.
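    ///
    /// Internally this relies on standard `Arc::make_mut` copy-on-write
    /// semantics. A minimal sketch of that behavior, with a plain `String`
    /// standing in for the catalog (not the real `Catalog` type):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let mut primary = Arc::new(String::from("catalog"));
    /// let snapshot = Arc::clone(&primary); // read-only handle, as in `owned_catalog`
    ///
    /// // Because `snapshot` is still alive, `make_mut` clones the inner value...
    /// Arc::make_mut(&mut primary).push_str(" v2");
    ///
    /// // ...so the snapshot keeps observing the old state.
    /// assert_eq!(*snapshot, "catalog");
    /// assert_eq!(*primary, "catalog v2");
    /// ```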
3685    fn catalog_mut(&mut self) -> &mut Catalog {
3686        // make_mut will cause any other Arc references (from owned_catalog) to
3687        // continue to be valid by cloning the catalog, putting it in a new Arc,
3688        // which lives at self.catalog. If there are no other Arc references,
3689        // then no clone is made, and it returns a reference to the existing
3690        // object. This makes this method and owned_catalog both very cheap: at
3691        // most one clone per catalog mutation, and then only if there's an
3692        // outstanding read-only reference to it.
3693        Arc::make_mut(&mut self.catalog)
3694    }
3695
3696    /// Obtain a reference to the coordinator's connection context.
3697    fn connection_context(&self) -> &ConnectionContext {
3698        self.controller.connection_context()
3699    }
3700
3701    /// Obtain a reference to the coordinator's secret reader, in an `Arc`.
3702    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3703        &self.connection_context().secrets_reader
3704    }
3705
3706    /// Publishes a notice message to all sessions.
3707    ///
3708    /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
3709    /// so we keep it around.
3710    #[allow(dead_code)]
3711    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3712        for meta in self.active_conns.values() {
3713            let _ = meta.notice_tx.send(notice.clone());
3714        }
3715    }
3716
3717    /// Returns a closure that will publish a notice to all sessions that were active at the time
3718    /// this method was called.
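    ///
    /// A usage sketch (the `notice` value and the spawned task are hypothetical):
    ///
    /// ```ignore
    /// let broadcast = coord.broadcast_notice_tx();
    /// // The returned closure owns clones of the notice senders, so it can be
    /// // moved into work that outlives the borrow of the coordinator.
    /// spawn(|| "notify_sessions", async move {
    ///     broadcast(notice);
    /// });
    /// ```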
3719    pub(crate) fn broadcast_notice_tx(
3720        &self,
3721    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3722        let senders: Vec<_> = self
3723            .active_conns
3724            .values()
3725            .map(|meta| meta.notice_tx.clone())
3726            .collect();
3727        Box::new(move |notice| {
3728            for tx in senders {
3729                let _ = tx.send(notice.clone());
3730            }
3731        })
3732    }
3733
3734    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3735        &self.active_conns
3736    }
3737
3738    #[instrument(level = "debug")]
3739    pub(crate) fn retire_execution(
3740        &mut self,
3741        reason: StatementEndedExecutionReason,
3742        ctx_extra: ExecuteContextExtra,
3743    ) {
3744        if let Some(uuid) = ctx_extra.retire() {
3745            self.end_statement_execution(uuid, reason);
3746        }
3747    }
3748
3749    /// Creates a new dataflow builder from the catalog and indexes in `self`.
3750    #[instrument(level = "debug")]
3751    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3752        let compute = self
3753            .instance_snapshot(instance)
3754            .expect("compute instance does not exist");
3755        DataflowBuilder::new(self.catalog().state(), compute)
3756    }
3757
3758    /// Return a reference-less snapshot to the indicated compute instance.
3759    pub fn instance_snapshot(
3760        &self,
3761        id: ComputeInstanceId,
3762    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3763        ComputeInstanceSnapshot::new(&self.controller, id)
3764    }
3765
3766    /// Call into the compute controller to install a finalized dataflow, and
3767    /// initialize the read policies for its exported readable objects.
3768    ///
3769    /// # Panics
3770    ///
3771    /// Panics if dataflow creation fails.
3772    pub(crate) async fn ship_dataflow(
3773        &mut self,
3774        dataflow: DataflowDescription<Plan>,
3775        instance: ComputeInstanceId,
3776        subscribe_target_replica: Option<ReplicaId>,
3777    ) {
3778        self.try_ship_dataflow(dataflow, instance, subscribe_target_replica)
3779            .await
3780            .unwrap_or_terminate("dataflow creation cannot fail");
3781    }
3782
3783    /// Call into the compute controller to install a finalized dataflow, and
3784    /// initialize the read policies for its exported readable objects.
3785    pub(crate) async fn try_ship_dataflow(
3786        &mut self,
3787        dataflow: DataflowDescription<Plan>,
3788        instance: ComputeInstanceId,
3789        subscribe_target_replica: Option<ReplicaId>,
3790    ) -> Result<(), DataflowCreationError> {
3791        // We must only install read policies for indexes, not for sinks.
3792        // Sinks are write-only compute collections that don't have read policies.
3793        let export_ids = dataflow.exported_index_ids().collect();
3794
3795        self.controller
3796            .compute
3797            .create_dataflow(instance, dataflow, subscribe_target_replica)?;
3798
3799        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3800            .await;
3801
3802        Ok(())
3803    }
3804
3805    /// Call into the compute controller to allow writes to the specified ID
3806    /// from the specified instance. Calling this function more than once, or
3807    /// calling it on a read-only instance, has no effect.
3808    pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
3809        self.controller
3810            .compute
3811            .allow_writes(instance, id)
3812            .unwrap_or_terminate("allow_writes cannot fail");
3813    }
3814
3815    /// Like `ship_dataflow`, but also awaits the builtin table updates.
3816    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3817        &mut self,
3818        dataflow: DataflowDescription<Plan>,
3819        instance: ComputeInstanceId,
3820        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3821    ) {
3822        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3823            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3824            let ((), ()) =
3825                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3826        } else {
3827            self.ship_dataflow(dataflow, instance, None).await;
3828        }
3829    }
3830
3831    /// Install a _watch set_ in the controller that is automatically associated with the given
3832    /// connection id. The watchset will be automatically cleared if the connection terminates
3833    /// before the watchset completes.
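    ///
    /// A usage sketch (the object ID, timestamp, and response payload are hypothetical):
    ///
    /// ```ignore
    /// // Ask to be notified once all of the objects have advanced past `t`; the
    /// // response is routed back to the session identified by `conn_id`.
    /// coord.install_compute_watch_set(
    ///     conn_id.clone(),
    ///     BTreeSet::from([object_id]),
    ///     t,
    ///     WatchSetResponse::StatementDependenciesReady(logging_id, event),
    /// )?;
    /// ```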
3834    pub fn install_compute_watch_set(
3835        &mut self,
3836        conn_id: ConnectionId,
3837        objects: BTreeSet<GlobalId>,
3838        t: Timestamp,
3839        state: WatchSetResponse,
3840    ) -> Result<(), CollectionLookupError> {
3841        let ws_id = self.controller.install_compute_watch_set(objects, t)?;
3842        self.connection_watch_sets
3843            .entry(conn_id.clone())
3844            .or_default()
3845            .insert(ws_id);
3846        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3847        Ok(())
3848    }
3849
3850    /// Install a _watch set_ in the controller that is automatically associated with the given
3851    /// connection id. The watchset will be automatically cleared if the connection terminates
3852    /// before the watchset completes.
3853    pub fn install_storage_watch_set(
3854        &mut self,
3855        conn_id: ConnectionId,
3856        objects: BTreeSet<GlobalId>,
3857        t: Timestamp,
3858        state: WatchSetResponse,
3859    ) -> Result<(), CollectionMissing> {
3860        let ws_id = self.controller.install_storage_watch_set(objects, t)?;
3861        self.connection_watch_sets
3862            .entry(conn_id.clone())
3863            .or_default()
3864            .insert(ws_id);
3865        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3866        Ok(())
3867    }
3868
3869    /// Cancels pending watchsets associated with the provided connection id.
3870    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3871        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3872            for ws_id in ws_ids {
3873                self.installed_watch_sets.remove(&ws_id);
3874            }
3875        }
3876    }
3877
3878    /// Returns the state of the [`Coordinator`] formatted as JSON.
3879    ///
3880    /// The returned value is not guaranteed to be stable and may change at any point in time.
3881    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3882        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3883        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3884        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3885        // prevents a future unrelated change from silently breaking this method.
3886
3887        let global_timelines: BTreeMap<_, _> = self
3888            .global_timelines
3889            .iter()
3890            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3891            .collect();
3892        let active_conns: BTreeMap<_, _> = self
3893            .active_conns
3894            .iter()
3895            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3896            .collect();
3897        let txn_read_holds: BTreeMap<_, _> = self
3898            .txn_read_holds
3899            .iter()
3900            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3901            .collect();
3902        let pending_peeks: BTreeMap<_, _> = self
3903            .pending_peeks
3904            .iter()
3905            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3906            .collect();
3907        let client_pending_peeks: BTreeMap<_, _> = self
3908            .client_pending_peeks
3909            .iter()
3910            .map(|(id, peek)| {
3911                let peek: BTreeMap<_, _> = peek
3912                    .iter()
3913                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3914                    .collect();
3915                (id.to_string(), peek)
3916            })
3917            .collect();
3918        let pending_linearize_read_txns: BTreeMap<_, _> = self
3919            .pending_linearize_read_txns
3920            .iter()
3921            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3922            .collect();
3923
3924        Ok(serde_json::json!({
3925            "global_timelines": global_timelines,
3926            "active_conns": active_conns,
3927            "txn_read_holds": txn_read_holds,
3928            "pending_peeks": pending_peeks,
3929            "client_pending_peeks": client_pending_peeks,
3930            "pending_linearize_read_txns": pending_linearize_read_txns,
3931            "controller": self.controller.dump().await?,
3932        }))
3933    }
3934
3935    /// Prune all storage usage events from the [`MZ_STORAGE_USAGE_BY_SHARD`] table that are older
3936    /// than `retention_period`.
3937    ///
3938    /// This method will read the entire contents of [`MZ_STORAGE_USAGE_BY_SHARD`] into memory,
3939    /// which can be expensive.
3940    ///
3941    /// DO NOT call this method outside of startup. The safety of reading at the current oracle read
3942    /// timestamp and then writing at whatever the current write timestamp is (instead of
3943    /// `read_ts + 1`) relies on the fact that there are no outstanding writes during startup.
3944    ///
3945    /// Group commit, which this method uses to write the retractions, has builtin fencing, and we
3946    /// never commit retractions to [`MZ_STORAGE_USAGE_BY_SHARD`] outside of this method, which is
3947    /// only called once during startup. So we don't have to worry about double/invalid retractions.
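    ///
    /// A sketch of the cutoff arithmetic performed below, with hypothetical
    /// numbers (a read timestamp of 1,700,000,000,000 ms and a 30-day retention
    /// period):
    ///
    /// ```ignore
    /// let read_ts: u128 = 1_700_000_000_000;
    /// let retention_ms: u128 = 30 * 24 * 60 * 60 * 1000; // 2_592_000_000 ms
    /// let cutoff_ts = read_ts.saturating_sub(retention_ms);
    /// assert_eq!(cutoff_ts, 1_697_408_000_000);
    /// // Any usage event collected strictly before `cutoff_ts` is retracted.
    /// ```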
3948    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
3949        let item_id = self
3950            .catalog()
3951            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3952        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3953        let read_ts = self.get_local_read_ts().await;
3954        let current_contents_fut = self
3955            .controller
3956            .storage_collections
3957            .snapshot(global_id, read_ts);
3958        let internal_cmd_tx = self.internal_cmd_tx.clone();
3959        spawn(|| "storage_usage_prune", async move {
3960            let mut current_contents = current_contents_fut
3961                .await
3962                .unwrap_or_terminate("cannot fail to fetch snapshot");
3963            differential_dataflow::consolidation::consolidate(&mut current_contents);
3964
3965            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3966            let mut expired = Vec::new();
3967            for (row, diff) in current_contents {
3968                assert_eq!(
3969                    diff, 1,
3970                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3971                );
3972                // This logic relies on the definition of `mz_storage_usage_by_shard` not changing.
3973                let collection_timestamp = row
3974                    .unpack()
3975                    .get(3)
3976                    .expect("definition of mz_storage_usage_by_shard changed")
3977                    .unwrap_timestamptz();
3978                let collection_timestamp = collection_timestamp.timestamp_millis();
3979                let collection_timestamp: u128 = collection_timestamp
3980                    .try_into()
3981                    .expect("all collections happen after Jan 1 1970");
3982                if collection_timestamp < cutoff_ts {
3983                    debug!("pruning storage event {row:?}");
3984                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3985                    expired.push(builtin_update);
3986                }
3987            }
3988
3989            // If this send fails, the coordinator main thread has shut down; ignore the error.
3990            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3991        });
3992    }
3993
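    /// Returns the rate, in credits per hour, at which the current set of managed
    /// user cluster replicas consumes credits; unmanaged replicas are excluded.
    ///
    /// A sketch of the aggregation with hypothetical per-replica rates:
    ///
    /// ```ignore
    /// // e.g. two replicas billed at 0.5 credits/hour and one at 1.0:
    /// let rates = [0.5, 0.5, 1.0];
    /// let total: f64 = rates.iter().sum();
    /// assert_eq!(total, 2.0);
    /// ```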
3994    fn current_credit_consumption_rate(&self) -> Numeric {
3995        self.catalog()
3996            .user_cluster_replicas()
3997            .filter_map(|replica| match &replica.config.location {
3998                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
3999                ReplicaLocation::Unmanaged(_) => None,
4000            })
4001            .map(|size| {
4002                self.catalog()
4003                    .cluster_replica_sizes()
4004                    .0
4005                    .get(size)
4006                    .expect("location size is validated against the cluster replica sizes")
4007                    .credits_per_hour
4008            })
4009            .sum()
4010    }
4011}
4012
4013#[cfg(test)]
4014impl Coordinator {
4015    #[allow(dead_code)]
4016    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
4017        // `ship_dataflow` is not allowed to return a `Result` because it is called after
4018        // `catalog_transact`, after which no errors are allowed. This test exists to prevent us
4019        // from incorrectly teaching those functions how to return errors (which has happened
4020        // twice before).
4021
4022        // An arbitrary compute instance ID to satisfy the function calls below. Note that
4023        // this only works because this function will never run.
4024        let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
4025
4026        let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
4027    }
4028}
4029
4030/// Contains information about the last message the [`Coordinator`] processed.
4031struct LastMessage {
4032    kind: &'static str,
4033    stmt: Option<Arc<Statement<Raw>>>,
4034}
4035
4036impl LastMessage {
4037    /// Returns a redacted version of the statement that is safe for logs.
4038    fn stmt_to_string(&self) -> Cow<'static, str> {
4039        self.stmt
4040            .as_ref()
4041            .map(|stmt| stmt.to_ast_string_redacted().into())
4042            .unwrap_or(Cow::Borrowed("<none>"))
4043    }
4044}
4045
4046impl fmt::Debug for LastMessage {
4047    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4048        f.debug_struct("LastMessage")
4049            .field("kind", &self.kind)
4050            .field("stmt", &self.stmt_to_string())
4051            .finish()
4052    }
4053}
4054
4055impl Drop for LastMessage {
4056    fn drop(&mut self) {
4057        // Only print the last message if we're currently panicking, otherwise we'd spam our logs.
4058        if std::thread::panicking() {
4059            // If we're panicking there's no guarantee `tracing` still works, so print to stderr.
4060            eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4061        }
4062    }
4063}
4064
4065/// Serves the coordinator based on the provided configuration.
4066///
4067/// For a high-level description of the coordinator, see the [crate
4068/// documentation](crate).
4069///
4070/// Returns a handle to the coordinator and a client to communicate with the
4071/// coordinator.
4072///
4073/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 42KB. This would
4074/// get stored on the stack, which is bad for runtime performance and could blow up our stack
4075/// usage. Because of that we purposefully move this Future onto the heap (i.e. Box it).
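///
/// A sketch of the boxing pattern used here (simplified, with a unit return
/// type standing in for the real result):
///
/// ```ignore
/// use futures::future::{BoxFuture, FutureExt};
///
/// fn make_big_future() -> BoxFuture<'static, ()> {
///     async move {
///         // ...a future that captures a lot of state...
///     }
///     .boxed() // move that state to the heap instead of the caller's stack
/// }
/// ```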
4076pub fn serve(
4077    Config {
4078        controller_config,
4079        controller_envd_epoch,
4080        mut storage,
4081        audit_logs_iterator,
4082        timestamp_oracle_url,
4083        unsafe_mode,
4084        all_features,
4085        build_info,
4086        environment_id,
4087        metrics_registry,
4088        now,
4089        secrets_controller,
4090        cloud_resource_controller,
4091        cluster_replica_sizes,
4092        builtin_system_cluster_config,
4093        builtin_catalog_server_cluster_config,
4094        builtin_probe_cluster_config,
4095        builtin_support_cluster_config,
4096        builtin_analytics_cluster_config,
4097        system_parameter_defaults,
4098        availability_zones,
4099        storage_usage_client,
4100        storage_usage_collection_interval,
4101        storage_usage_retention_period,
4102        segment_client,
4103        egress_addresses,
4104        aws_account_id,
4105        aws_privatelink_availability_zones,
4106        connection_context,
4107        connection_limit_callback,
4108        remote_system_parameters,
4109        webhook_concurrency_limit,
4110        http_host_name,
4111        tracing_handle,
4112        read_only_controllers,
4113        caught_up_trigger: clusters_caught_up_trigger,
4114        helm_chart_version,
4115        license_key,
4116        external_login_password_mz_system,
4117        force_builtin_schema_migration,
4118    }: Config,
4119) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4120    async move {
4121        let coord_start = Instant::now();
4122        info!("startup: coordinator init: beginning");
4123        info!("startup: coordinator init: preamble beginning");
4124
4125        // Initializing the builtins can be an expensive process and consume a lot of memory. We
4126        // forcibly initialize them early, while the stack is relatively empty, to avoid stack
4127        // overflows later.
4128        let _builtins = LazyLock::force(&BUILTINS_STATIC);
4129
4130        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4131        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4132        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4133            mpsc::unbounded_channel();
4134
4135        // Validate and process availability zones.
4136        if !availability_zones.iter().all_unique() {
4137            coord_bail!("availability zones must be unique");
4138        }
4139
4140        let aws_principal_context = match (
4141            aws_account_id,
4142            connection_context.aws_external_id_prefix.clone(),
4143        ) {
4144            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4145                aws_account_id,
4146                aws_external_id_prefix,
4147            }),
4148            _ => None,
4149        };
4150
4151        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4152            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4153
4154        info!(
4155            "startup: coordinator init: preamble complete in {:?}",
4156            coord_start.elapsed()
4157        );
4158        let oracle_init_start = Instant::now();
4159        info!("startup: coordinator init: timestamp oracle init beginning");
4160
4161        let timestamp_oracle_config = timestamp_oracle_url
4162            .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
4163            .transpose()?;
4164        let mut initial_timestamps =
4165            get_initial_oracle_timestamps(&timestamp_oracle_config).await?;
4166
4167        // Insert an entry for the `EpochMilliseconds` timeline if one doesn't exist,
4168        // which will ensure that the timeline is initialized since it's required
4169        // by the system.
4170        initial_timestamps
4171            .entry(Timeline::EpochMilliseconds)
4172            .or_insert_with(mz_repr::Timestamp::minimum);
4173        let mut timestamp_oracles = BTreeMap::new();
4174        for (timeline, initial_timestamp) in initial_timestamps {
4175            Coordinator::ensure_timeline_state_with_initial_time(
4176                &timeline,
4177                initial_timestamp,
4178                now.clone(),
4179                timestamp_oracle_config.clone(),
4180                &mut timestamp_oracles,
4181                read_only_controllers,
4182            )
4183            .await;
4184        }
4185
4186        // Opening the durable catalog uses one or more timestamps without communicating with
4187        // the timestamp oracle. Here we make sure to apply the catalog upper with the timestamp
4188        // oracle to linearize future operations with opening the catalog.
4189        let catalog_upper = storage.current_upper().await;
4190        // Choose a time at which to boot. This is used, for example, to prune
4191        // old storage usage data or migrate audit log entries.
4192        //
4193        // This time is usually the current system time, but with protection
4194        // against backwards time jumps, even across restarts.
4195        let epoch_millis_oracle = &timestamp_oracles
4196            .get(&Timeline::EpochMilliseconds)
4197            .expect("inserted above")
4198            .oracle;
4199
4200        let mut boot_ts = if read_only_controllers {
4201            let read_ts = epoch_millis_oracle.read_ts().await;
4202            std::cmp::max(read_ts, catalog_upper)
4203        } else {
4204            // Getting/applying a write timestamp bumps the write timestamp in the
4205            // oracle, which we're not allowed to do in read-only mode.
4206            epoch_millis_oracle.apply_write(catalog_upper).await;
4207            epoch_millis_oracle.write_ts().await.timestamp
4208        };
4209
4210        info!(
4211            "startup: coordinator init: timestamp oracle init complete in {:?}",
4212            oracle_init_start.elapsed()
4213        );
4214
4215        let catalog_open_start = Instant::now();
4216        info!("startup: coordinator init: catalog open beginning");
4217        let persist_client = controller_config
4218            .persist_clients
4219            .open(controller_config.persist_location.clone())
4220            .await
4221            .context("opening persist client")?;
4222        let builtin_item_migration_config =
4223            BuiltinItemMigrationConfig {
4224                persist_client: persist_client.clone(),
4225                read_only: read_only_controllers,
4226                force_migration: force_builtin_schema_migration,
4227            };
4229        let OpenCatalogResult {
4230            mut catalog,
4231            migrated_storage_collections_0dt,
4232            new_builtin_collections,
4233            builtin_table_updates,
4234            cached_global_exprs,
4235            uncached_local_exprs,
4236        } = Catalog::open(mz_catalog::config::Config {
4237            storage,
4238            metrics_registry: &metrics_registry,
4239            state: mz_catalog::config::StateConfig {
4240                unsafe_mode,
4241                all_features,
4242                build_info,
4243                deploy_generation: controller_config.deploy_generation,
4244                environment_id: environment_id.clone(),
4245                read_only: read_only_controllers,
4246                now: now.clone(),
4247                boot_ts: boot_ts.clone(),
4248                skip_migrations: false,
4249                cluster_replica_sizes,
4250                builtin_system_cluster_config,
4251                builtin_catalog_server_cluster_config,
4252                builtin_probe_cluster_config,
4253                builtin_support_cluster_config,
4254                builtin_analytics_cluster_config,
4255                system_parameter_defaults,
4256                remote_system_parameters,
4257                availability_zones,
4258                egress_addresses,
4259                aws_principal_context,
4260                aws_privatelink_availability_zones,
4261                connection_context,
4262                http_host_name,
4263                builtin_item_migration_config,
4264                persist_client: persist_client.clone(),
4265                enable_expression_cache_override: None,
4266                helm_chart_version,
4267                external_login_password_mz_system,
4268                license_key: license_key.clone(),
4269            },
4270        })
4271        .await?;
4272
4273        // Opening the catalog uses one or more timestamps, so push the boot timestamp up to the
4274        // current catalog upper.
4275        let catalog_upper = catalog.current_upper().await;
4276        boot_ts = std::cmp::max(boot_ts, catalog_upper);
4277
4278        if !read_only_controllers {
4279            epoch_millis_oracle.apply_write(boot_ts).await;
4280        }
4281
4282        info!(
4283            "startup: coordinator init: catalog open complete in {:?}",
4284            catalog_open_start.elapsed()
4285        );
4286
4287        let coord_thread_start = Instant::now();
4288        info!("startup: coordinator init: coordinator thread start beginning");
4289
4290        let session_id = catalog.config().session_id;
4291        let start_instant = catalog.config().start_instant;
4292
4293        // In order for the coordinator to support Rc and RefCell types, it cannot be
4294        // sent across threads. Spawn it in a thread and have this parent thread wait
4295        // for bootstrap completion before proceeding.
4296        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4297        let handle = TokioHandle::current();
4298
4299        let metrics = Metrics::register_into(&metrics_registry);
4300        let metrics_clone = metrics.clone();
4301        let optimizer_metrics = OptimizerMetrics::register_into(
4302            &metrics_registry,
4303            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4304        );
4305        let segment_client_clone = segment_client.clone();
4306        let coord_now = now.clone();
4307        let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4308        let mut check_scheduling_policies_interval = tokio::time::interval(
4309            catalog
4310                .system_config()
4311                .cluster_check_scheduling_policies_interval(),
4312        );
4313        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4314
4315        let clusters_caught_up_check_interval = if read_only_controllers {
4316            let dyncfgs = catalog.system_config().dyncfgs();
4317            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4318
4319            let mut interval = tokio::time::interval(interval);
4320            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4321            interval
4322        } else {
4323            // When not in read-only mode, we don't do hydration checks. But we
4324            // still have to provide _some_ interval. This is large enough that
4325            // it doesn't matter.
4326            //
4327            // TODO(aljoscha): We cannot use Duration::MAX right now because of
4328            // https://github.com/tokio-rs/tokio/issues/6634. Use that once it's
4329            // fixed for good.
4330            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4331            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4332            interval
4333        };
4334
4335        let clusters_caught_up_check =
4336            clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
4337                trigger,
4338                exclude_collections: new_builtin_collections.into_iter().collect(),
4339            });
4340
4341        if let Some(TimestampOracleConfig::Postgres(pg_config)) =
4342            timestamp_oracle_config.as_ref()
4343        {
4344            // Apply settings from system vars as early as possible because some
4345            // of them are locked in right when an oracle is first opened!
4346            let pg_timestamp_oracle_params =
4347                flags::timestamp_oracle_config(catalog.system_config());
4348            pg_timestamp_oracle_params.apply(pg_config);
4349        }
4350
4351        // Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
4352        // system variables change, we update our connection limits.
4353        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4354            Arc::new(move |system_vars: &SystemVars| {
4355                let limit: u64 = system_vars.max_connections().cast_into();
4356                let superuser_reserved: u64 =
4357                    system_vars.superuser_reserved_connections().cast_into();
4358
4359                // If superuser_reserved >= max_connections, prefer max_connections.
4360                //
4361                // In that scenario all normal users would be locked out because every
4362                // connection would be reserved for superusers, so complain if this is the case.
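                //
                // For example (hypothetical values): with max_connections = 100 and
                // superuser_reserved_connections = 150, we warn and clamp the effective
                // reserved count to 100.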
4363                let superuser_reserved = if superuser_reserved >= limit {
4364                    tracing::warn!(
4365                        "superuser_reserved ({superuser_reserved}) is greater than or equal to max connections ({limit})!"
4366                    );
4367                    limit
4368                } else {
4369                    superuser_reserved
4370                };
4371
4372                (connection_limit_callback)(limit, superuser_reserved);
4373            });
4374        catalog.system_config_mut().register_callback(
4375            &mz_sql::session::vars::MAX_CONNECTIONS,
4376            Arc::clone(&connection_limit_callback),
4377        );
4378        catalog.system_config_mut().register_callback(
4379            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4380            connection_limit_callback,
4381        );
4382
4383        let (group_commit_tx, group_commit_rx) = appends::notifier();
4384
4385        let parent_span = tracing::Span::current();
4386        let thread = thread::Builder::new()
4387            // The Coordinator thread tends to keep a lot of data on its stack. To
4388            // prevent a stack overflow we allocate a stack three times as big as the default
4389            // stack.
4390            .stack_size(3 * stack::STACK_SIZE)
4391            .name("coordinator".to_string())
4392            .spawn(move || {
4393                let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4394
4395                let controller = handle
4396                    .block_on({
4397                        catalog.initialize_controller(
4398                            controller_config,
4399                            controller_envd_epoch,
4400                            read_only_controllers,
4401                        )
4402                    })
4403                    .unwrap_or_terminate("failed to initialize storage_controller");
4404                // Initializing the controller uses one or more timestamps, so push the boot timestamp up to the
4405                // current catalog upper.
4406                let catalog_upper = handle.block_on(catalog.current_upper());
4407                boot_ts = std::cmp::max(boot_ts, catalog_upper);
4408                if !read_only_controllers {
4409                    let epoch_millis_oracle = &timestamp_oracles
4410                        .get(&Timeline::EpochMilliseconds)
4411                        .expect("inserted above")
4412                        .oracle;
4413                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4414                }
4415
4416                let catalog = Arc::new(catalog);
4417
4418                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4419                let mut coord = Coordinator {
4420                    controller,
4421                    catalog,
4422                    internal_cmd_tx,
4423                    group_commit_tx,
4424                    strict_serializable_reads_tx,
4425                    global_timelines: timestamp_oracles,
4426                    transient_id_gen: Arc::new(TransientIdGen::new()),
4427                    active_conns: BTreeMap::new(),
4428                    txn_read_holds: Default::default(),
4429                    pending_peeks: BTreeMap::new(),
4430                    client_pending_peeks: BTreeMap::new(),
4431                    pending_linearize_read_txns: BTreeMap::new(),
4432                    serialized_ddl: LockedVecDeque::new(),
4433                    active_compute_sinks: BTreeMap::new(),
4434                    active_webhooks: BTreeMap::new(),
4435                    active_copies: BTreeMap::new(),
4436                    staged_cancellation: BTreeMap::new(),
4437                    introspection_subscribes: BTreeMap::new(),
4438                    write_locks: BTreeMap::new(),
4439                    deferred_write_ops: BTreeMap::new(),
4440                    pending_writes: Vec::new(),
4441                    advance_timelines_interval,
4442                    secrets_controller,
4443                    caching_secrets_reader,
4444                    cloud_resource_controller,
4445                    storage_usage_client,
4446                    storage_usage_collection_interval,
4447                    segment_client,
4448                    metrics,
4449                    optimizer_metrics,
4450                    tracing_handle,
4451                    statement_logging: StatementLogging::new(coord_now.clone()),
4452                    webhook_concurrency_limit,
4453                    timestamp_oracle_config,
4454                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4455                    cluster_scheduling_decisions: BTreeMap::new(),
4456                    caught_up_check_interval: clusters_caught_up_check_interval,
4457                    caught_up_check: clusters_caught_up_check,
4458                    installed_watch_sets: BTreeMap::new(),
4459                    connection_watch_sets: BTreeMap::new(),
4460                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
4461                    read_only_controllers,
4462                    buffered_builtin_table_updates: Some(Vec::new()),
4463                    license_key,
4464                    persist_client,
4465                };
4466                let bootstrap = handle.block_on(async {
4467                    coord
4468                        .bootstrap(
4469                            boot_ts,
4470                            migrated_storage_collections_0dt,
4471                            builtin_table_updates,
4472                            cached_global_exprs,
4473                            uncached_local_exprs,
4474                            audit_logs_iterator,
4475                        )
4476                        .await?;
4477                    coord
4478                        .controller
4479                        .remove_orphaned_replicas(
4480                            coord.catalog().get_next_user_replica_id().await?,
4481                            coord.catalog().get_next_system_replica_id().await?,
4482                        )
4483                        .await
4484                        .map_err(AdapterError::Orchestrator)?;
4485
4486                    if let Some(retention_period) = storage_usage_retention_period {
4487                        coord
4488                            .prune_storage_usage_events_on_startup(retention_period)
4489                            .await;
4490                    }
4491
4492                    Ok(())
4493                });
4494                let ok = bootstrap.is_ok();
4495                drop(span);
4496                bootstrap_tx
4497                    .send(bootstrap)
4498                    .expect("bootstrap_rx is not dropped until it receives this message");
4499                if ok {
4500                    handle.block_on(coord.serve(
4501                        internal_cmd_rx,
4502                        strict_serializable_reads_rx,
4503                        cmd_rx,
4504                        group_commit_rx,
4505                    ));
4506                }
4507            })
4508            .expect("failed to create coordinator thread");
4509        match bootstrap_rx
4510            .await
4511            .expect("bootstrap_tx always sends a message or panics/halts")
4512        {
4513            Ok(()) => {
4514                info!(
4515                    "startup: coordinator init: coordinator thread start complete in {:?}",
4516                    coord_thread_start.elapsed()
4517                );
4518                info!(
4519                    "startup: coordinator init: complete in {:?}",
4520                    coord_start.elapsed()
4521                );
4522                let handle = Handle {
4523                    session_id,
4524                    start_instant,
4525                    _thread: thread.join_on_drop(),
4526                };
4527                let client = Client::new(
4528                    build_info,
4529                    cmd_tx,
4530                    metrics_clone,
4531                    now,
4532                    environment_id,
4533                    segment_client_clone,
4534                );
4535                Ok((handle, client))
4536            }
4537            Err(e) => Err(e),
4538        }
4539    }
4540    .boxed()
4541}
4542
4543// Determines and returns the highest timestamp for each timeline, for all known
4544// timestamp oracle implementations.
4545//
4546// Initially, we did this so that we could switch between implementations of
4547// timestamp oracle, but now we also do this to determine a monotonic boot
4548// timestamp, a timestamp that does not regress across reboots.
4549//
4550// This mostly works, but there can be linearizability violations, because there
4551// is no central moment where we do distributed coordination for all oracle
4552// types. Working around this seems prohibitively hard, maybe even impossible, so
4553// we have to live with this window of potential violations during the upgrade
4554// window (which is the only point where we should switch oracle
4555// implementations).
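//
// A sketch of the per-timeline merge performed below (hypothetical values): if
// one oracle implementation reports `EpochMilliseconds -> 100` and another
// reports `EpochMilliseconds -> 250`, the merged map ends up with
// `EpochMilliseconds -> 250`, i.e. each timeline keeps the maximum timestamp
// seen across implementations.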
4556async fn get_initial_oracle_timestamps(
4557    timestamp_oracle_config: &Option<TimestampOracleConfig>,
4558) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4559    let mut initial_timestamps = BTreeMap::new();
4560
4561    if let Some(config) = timestamp_oracle_config {
4562        let oracle_timestamps = config.get_all_timelines().await?;
4563
4564        let debug_msg = || {
4565            oracle_timestamps
4566                .iter()
4567                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4568                .join(", ")
4569        };
4570        info!(
4571            "current timestamps from the timestamp oracle: {}",
4572            debug_msg()
4573        );
4574
4575        for (timeline, ts) in oracle_timestamps {
4576            let entry = initial_timestamps
4577                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4578
4579            entry
4580                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4581                .or_insert(ts);
4582        }
4583    } else {
4584        info!("no timestamp oracle configured!");
4585    };
4586
4587    let debug_msg = || {
4588        initial_timestamps
4589            .iter()
4590            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4591            .join(", ")
4592    };
4593    info!("initial oracle timestamps: {}", debug_msg());
4594
4595    Ok(initial_timestamps)
4596}
4597
4598#[instrument]
4599pub async fn load_remote_system_parameters(
4600    storage: &mut Box<dyn OpenableDurableCatalogState>,
4601    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4602    system_parameter_sync_timeout: Duration,
4603) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4604    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4605        tracing::info!("parameter sync on boot: start sync");
4606
4607        // We intentionally block initial startup, potentially forever,
4608        // on initializing LaunchDarkly. This may seem scary, but the
4609        // alternative is even scarier. Over time, we expect that the
4610        // compiled-in default values for the system parameters will
4611        // drift substantially from the defaults configured in
4612        // LaunchDarkly, to the point that starting an environment
4613        // without loading the latest values from LaunchDarkly will
4614        // result in running an untested configuration.
4615        //
4616        // Note this only applies during initial startup. Restarting
4617        // after we've synced once only blocks for a maximum of
4618        // `FRONTEND_SYNC_TIMEOUT` on LaunchDarkly, as it seems
4619        // reasonable to assume that the last-synced configuration was
4620        // valid enough.
4621        //
4622        // This philosophy appears to provide a good balance between not
4623        // running untested configurations in production while also not
4624        // making LaunchDarkly a "tier 1" dependency for existing
4625        // environments.
4626        //
4627        // If this proves to be an issue, we could seek to address the
4628        // configuration drift in a different way--for example, by
4629        // writing a script that runs in CI nightly and checks for
4630        // deviation between the compiled Rust code and LaunchDarkly.
4631        //
4632        // If it is absolutely necessary to bring up a new environment
4633        // while LaunchDarkly is down, the following manual mitigation
4634        // can be performed:
4635        //
4636        //    1. Edit the environmentd startup parameters to omit the
4637        //       LaunchDarkly configuration.
4638        //    2. Boot environmentd.
4639        //    3. Use the catalog-debug tool to run `edit config "{\"key\":\"system_config_synced\"}" "{\"value\": 1}"`.
4640        //    4. Adjust any other parameters as necessary to avoid
4641        //       running a nonstandard configuration in production.
4642        //    5. Edit the environmentd startup parameters to restore the
4643        //       LaunchDarkly configuration, for when LaunchDarkly comes
4644        //       back online.
4645        //    6. Reboot environmentd.
4646        let mut params = SynchronizedParameters::new(SystemVars::default());
4647        let frontend_sync = async {
4648            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4649            frontend.pull(&mut params);
4650            let ops = params
4651                .modified()
4652                .into_iter()
4653                .map(|param| {
4654                    let name = param.name;
4655                    let value = param.value;
4656                    tracing::info!(name, value, initial = true, "sync parameter");
4657                    (name, value)
4658                })
4659                .collect();
4660            tracing::info!("parameter sync on boot: end sync");
4661            Ok(Some(ops))
4662        };
4663        if !storage.has_system_config_synced_once().await? {
4664            frontend_sync.await
4665        } else {
4666            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4667                Ok(ops) => Ok(ops),
4668                Err(TimeoutError::Inner(e)) => Err(e),
4669                Err(TimeoutError::DeadlineElapsed) => {
4670                    tracing::info!("parameter sync on boot: sync has timed out");
4671                    Ok(None)
4672                }
4673            }
4674        }
4675    } else {
4676        Ok(None)
4677    }
4678}
4679
4680#[derive(Debug)]
4681pub enum WatchSetResponse {
4682    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
4683    AlterSinkReady(AlterSinkReadyContext),
4684    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
4685}
4686
4687#[derive(Debug)]
4688pub struct AlterSinkReadyContext {
4689    ctx: Option<ExecuteContext>,
4690    otel_ctx: OpenTelemetryContext,
4691    plan: AlterSinkPlan,
4692    plan_validity: PlanValidity,
4693    read_hold: ReadHolds<Timestamp>,
4694}
4695
4696impl AlterSinkReadyContext {
4697    fn ctx(&mut self) -> &mut ExecuteContext {
4698        self.ctx.as_mut().expect("only cleared on drop")
4699    }
4700
4701    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4702        self.ctx
4703            .take()
4704            .expect("only cleared on drop")
4705            .retire(result);
4706    }
4707}
4708
4709impl Drop for AlterSinkReadyContext {
4710    fn drop(&mut self) {
4711        if let Some(ctx) = self.ctx.take() {
4712            ctx.retire(Err(AdapterError::Canceled));
4713        }
4714    }
4715}
4716
4717#[derive(Debug)]
4718pub struct AlterMaterializedViewReadyContext {
4719    ctx: Option<ExecuteContext>,
4720    otel_ctx: OpenTelemetryContext,
4721    plan: plan::AlterMaterializedViewApplyReplacementPlan,
4722    plan_validity: PlanValidity,
4723}
4724
4725impl AlterMaterializedViewReadyContext {
4726    fn ctx(&mut self) -> &mut ExecuteContext {
4727        self.ctx.as_mut().expect("only cleared on drop")
4728    }
4729
4730    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4731        self.ctx
4732            .take()
4733            .expect("only cleared on drop")
4734            .retire(result);
4735    }
4736}
4737
4738impl Drop for AlterMaterializedViewReadyContext {
4739    fn drop(&mut self) {
4740        if let Some(ctx) = self.ctx.take() {
4741            ctx.retire(Err(AdapterError::Canceled));
4742        }
4743    }
4744}
4745
4746/// A struct for tracking ownership of a lock, plus a VecDeque to store work to be done after
4747/// the lock is freed.
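///
/// A minimal usage sketch (the work item is hypothetical):
///
/// ```ignore
/// let mut queue: LockedVecDeque<&str> = LockedVecDeque::new();
/// match queue.try_lock_owned() {
///     // We own the lock: do the work right away and hold `_guard` until done.
///     Ok(_guard) => { /* perform the work */ }
///     // Someone else holds the lock: defer the work until it is freed.
///     Err(_) => queue.push_back("deferred work item"),
/// }
/// ```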
4748#[derive(Debug)]
4749struct LockedVecDeque<T> {
4750    items: VecDeque<T>,
4751    lock: Arc<tokio::sync::Mutex<()>>,
4752}
4753
4754impl<T> LockedVecDeque<T> {
4755    pub fn new() -> Self {
4756        Self {
4757            items: VecDeque::new(),
4758            lock: Arc::new(tokio::sync::Mutex::new(())),
4759        }
4760    }
4761
4762    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
4763        Arc::clone(&self.lock).try_lock_owned()
4764    }
4765
4766    pub fn is_empty(&self) -> bool {
4767        self.items.is_empty()
4768    }
4769
4770    pub fn push_back(&mut self, value: T) {
4771        self.items.push_back(value)
4772    }
4773
4774    pub fn pop_front(&mut self) -> Option<T> {
4775        self.items.pop_front()
4776    }
4777
4778    pub fn remove(&mut self, index: usize) -> Option<T> {
4779        self.items.remove(index)
4780    }
4781
4782    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
4783        self.items.iter()
4784    }
4785}
4786
4787#[derive(Debug)]
4788struct DeferredPlanStatement {
4789    ctx: ExecuteContext,
4790    ps: PlanStatement,
4791}
4792
4793#[derive(Debug)]
4794enum PlanStatement {
4795    Statement {
4796        stmt: Arc<Statement<Raw>>,
4797        params: Params,
4798    },
4799    Plan {
4800        plan: mz_sql::plan::Plan,
4801        resolved_ids: ResolvedIds,
4802    },
4803}
4804
4805#[derive(Debug, Error)]
4806pub enum NetworkPolicyError {
4807    #[error("Access denied for address {0}")]
4808    AddressDenied(IpAddr),
4809    #[error("Access denied missing IP address")]
4810    MissingIp,
4811}
4812
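/// Returns `Ok(())` if any of `rules` contains `ip`, and a
/// [`NetworkPolicyError::AddressDenied`] error otherwise.
///
/// A sketch of the containment check this performs, using plain `ipnet` types in
/// place of [`NetworkPolicyRule`]s (hypothetical addresses, assuming the `ipnet`
/// crate):
///
/// ```ignore
/// use std::net::IpAddr;
/// use std::str::FromStr;
///
/// let allowed: ipnet::IpNet = "10.0.0.0/8".parse().unwrap();
/// assert!(allowed.contains(&IpAddr::from_str("10.1.2.3").unwrap()));
/// assert!(!allowed.contains(&IpAddr::from_str("192.168.0.1").unwrap()));
/// ```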
4813pub(crate) fn validate_ip_with_policy_rules(
4814    ip: &IpAddr,
4815    rules: &Vec<NetworkPolicyRule>,
4816) -> Result<(), NetworkPolicyError> {
4817    // At the moment we're not handling action or direction,
4818    // as those can only be "allow" and "ingress", respectively.
4819    if rules.iter().any(|r| r.address.0.contains(ip)) {
4820        Ok(())
4821    } else {
4822        Err(NetworkPolicyError::AddressDenied(ip.clone()))
4823    }
4824}
4825
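/// Computes the SQL-level relation type to record in the catalog: the HIR
/// expression's top-level type, with nullability and unique-key information
/// backported from the optimized MIR expression's type.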
4826pub(crate) fn infer_sql_type_for_catalog(
4827    hir_expr: &HirRelationExpr,
4828    mir_expr: &MirRelationExpr,
4829) -> SqlRelationType {
4830    let mut typ = hir_expr.top_level_typ();
4831    typ.backport_nullability_and_keys(&mir_expr.typ());
4832    typ
4833}