mz_adapter/
coord.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Translation of SQL commands into timestamped `Controller` commands.
//!
//! The various SQL commands instruct the system to take actions that are not
//! yet explicitly timestamped. On the other hand, the underlying data continually
//! change as time moves forward. On the third hand, we greatly benefit from the
//! information that some times are no longer of interest, so that we may
//! compact the representation of the continually changing collections.
//!
//! The [`Coordinator`] curates these interactions by observing the progress
//! collections make through time, choosing timestamps for its own commands,
//! and eventually communicating that certain times have irretrievably "passed".
//!
//! ## Frontiers another way
//!
//! If the above description of frontiers left you with questions, this
//! repackaged explanation might help.
//!
//! - `since` is the least recent time (i.e. oldest time) that you can read
//!   from sources and be guaranteed that the returned data is accurate as of
//!   that time.
//!
//!   Reads at times less than `since` may return values that were not actually
//!   seen at the specified time, but arrived later (i.e. the results are
//!   compacted).
//!
//!   For correctness' sake, the coordinator never chooses to read at a time
//!   less than an arrangement's `since`.
//!
//! - `upper` is the first time after the most recent time that you can read
//!   from sources and receive an immediate response. Alternately, it is the
//!   least time at which the data may still change (that is the reason we may
//!   not be able to respond immediately).
//!
//!   Reads at times >= `upper` may not immediately return because the answer
//!   isn't known yet. However, once the `upper` is > the specified read time,
//!   the read can return.
//!
//!   For the sake of returned values' freshness, the coordinator prefers
//!   performing reads at an arrangement's `upper`. However, because we more
//!   strongly prefer correctness, the coordinator will choose timestamps
//!   greater than an object's `upper` if it is also being accessed alongside
//!   objects whose `since` times are >= its `upper`.
//!
//! This illustration attempts to show, with time moving left to right, the
//! relationship between `since` and `upper`.
//!
//! - `#`: possibly inaccurate results
//! - `-`: immediate, correct response
//! - `?`: not yet known
//! - `s`: since
//! - `u`: upper
//! - `|`: eligible for coordinator to select
//!
//! ```nofmt
//! ####s----u?????
//!     |||||||||||
//! ```
//!
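//! ### A small sketch of timestamp eligibility
//!
//! The following sketch is illustrative only (not code from this module): it
//! phrases the rule above with `Antichain` comparisons. A time `t` yields
//! accurate results when compaction has not passed it (`since <= t`), and an
//! immediate, complete answer once `upper > t`.
//!
//! ```ignore
//! use timely::progress::Antichain;
//!
//! // Hypothetical frontiers for a single collection.
//! let since = Antichain::from_elem(10u64);
//! let upper = Antichain::from_elem(20u64);
//!
//! let t = 15u64;
//! let not_compacted = since.less_equal(&t); // `since <= t`: results are accurate
//! let complete = !upper.less_equal(&t);     // `upper > t`: data up to `t` is final
//! assert!(not_compacted && complete);       // `t` falls in the `-` region above
//! ```
//!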

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroI64;
use std::ops::Neg;
use std::str::FromStr;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::Context;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
use http::Uri;
use ipnet::IpNet;
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
use mz_auth::password::Password;
use mz_build_info::BuildInfo;
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::{
    CollectionLookupError, DataflowCreationError, InstanceMissing,
};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_controller::clusters::{
    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
};
use mz_controller::{ControllerConfig, Readiness};
use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::OfflineReason;
use mz_ore::cast::{CastFrom, CastInto, CastLossy};
use mz_ore::channel::trigger::Trigger;
use mz_ore::future::TimeoutError;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::{JoinHandle, spawn};
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_ore::url::SensitiveUrl;
use mz_ore::{assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, stack};
use mz_persist_client::PersistClient;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
use mz_repr::adt::numeric::Numeric;
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
use mz_secrets::cache::CachingSecretsReader;
use mz_secrets::{SecretsController, SecretsReader};
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::{CatalogCluster, EnvironmentId};
use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::{
    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, NetworkPolicyRule,
    OnTimeoutAction, Params, QueryWhen,
};
use mz_sql::session::user::User;
use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
use mz_sql_parser::ast::ExplainStage;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
use mz_timestamp_oracle::WriteTimestamp;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use serde::Serialize;
use thiserror::Error;
use timely::progress::{Antichain, Timestamp as _};
use tokio::runtime::Handle as TokioHandle;
use tokio::select;
use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
use crate::client::{Client, Handle};
use crate::command::{Command, ExecuteResponse};
use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
use crate::coord::appends::{
    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
};
use crate::coord::caught_up::CaughtUpCheckContext;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::introspection::IntrospectionSubscribe;
use crate::coord::peek::PendingPeek;
use crate::coord::statement_logging::StatementLogging;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::validity::PlanValidity;
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
use crate::metrics::Metrics;
use crate::optimize::dataflows::{
    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session};
use crate::statement_logging::{
    StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{AdapterNotice, ReadHolds, flags};

pub(crate) mod appends;
pub(crate) mod catalog_serving;
pub(crate) mod cluster_scheduling;
pub(crate) mod consistency;
pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod read_policy;
pub(crate) mod sequencer;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;

pub mod catalog_implications;
mod caught_up;
mod command_handler;
mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
mod sql;
mod validity;

#[derive(Debug)]
pub enum Message {
    Command(OpenTelemetryContext, Command),
    ControllerReady {
        controller: ControllerReadiness,
    },
    PurifiedStatementReady(PurifiedStatementReady),
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    TryDeferred {
        /// The connection that created this op.
        conn_id: ConnectionId,
        /// The write lock that notified us our deferred op might be able to run.
        ///
        /// Note: While we never want to hold a partial set of locks, it can be important to hold
        /// onto the _one_ that notified us our op might be ready. If there are multiple operations
        /// waiting on a single collection, and we don't hold this lock through retrying the op,
        /// then everything waiting on this collection will get retried, causing traffic in the
        /// Coordinator's message queue.
        ///
        /// See [`DeferredOp::can_be_optimistically_retried`] for more detail.
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    /// Initiates a group commit.
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    AdvanceTimelines,
    ClusterEvent(ClusterEvent),
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    LinearizeReads,
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    /// Performs any cleanup and logging actions necessary for
    /// finalizing a statement execution.
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    DrainStatementLog,
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    /// Scheduling policy decisions about turning clusters On/Off.
    /// `Vec<(policy name, Vec of decisions by the policy)>`
    /// A cluster will be On if and only if there is at least one On decision for it.
    /// Scheduling decisions for clusters that have `SCHEDULE = MANUAL` are ignored.
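    ///
    /// An illustrative sketch (not code from this module) of that merge rule,
    /// assuming a hypothetical `is_on` predicate on [`SchedulingDecision`]:
    ///
    /// ```ignore
    /// let cluster_on = decisions
    ///     .iter()
    ///     .flat_map(|(_policy_name, policy_decisions)| policy_decisions.iter())
    ///     .any(|(id, decision)| *id == cluster_id && is_on(decision));
    /// ```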
    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}

impl Message {
    /// Returns a string to identify the kind of [`Message`], useful for logging.
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
                Command::GetOracle { .. } => "get-oracle",
                Command::DetermineRealTimeRecentTimestamp { .. } => {
                    "determine-real-time-recent-timestamp"
                }
                Command::GetTransactionReadHoldsBundle { .. } => {
                    "get-transaction-read-holds-bundle"
                }
                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
                Command::CopyToPreflight { .. } => "copy-to-preflight",
                Command::ExecuteCopyTo { .. } => "execute-copy-to",
                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
                Command::ExplainTimestamp { .. } => "explain-timestamp",
                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
            },
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}

/// The reason why a controller needs processing on the main loop.
#[derive(Debug)]
pub enum ControllerReadiness {
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A batch of metric data is ready.
    Metrics,
    /// An internally-generated message is ready to be returned.
    Internal,
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub params: Params,
    pub plan_validity: PlanValidity,
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}

pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;

#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    pub connection_id: CatalogItemId,
    pub connection_gid: GlobalId,
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}

pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
pub type AlterConnectionValidationReady = ValidationReady<Connection>;

#[derive(Debug)]
pub enum PeekStage {
    /// Common stages across SELECT, EXPLAIN and COPY TO queries.
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    /// Final stage for a peek.
    Finish(PeekStageFinish),
    /// Final stage for an explain.
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    /// Preflight checks for a copy to operation.
    CopyToPreflight(PeekStageCopyTo),
    /// Final stage for a copy to which involves shipping the dataflow.
    CopyToDataflow(PeekStageCopyTo),
}

#[derive(Debug)]
pub struct CopyToContext {
    /// The `RelationDesc` of the data to be copied.
    pub desc: RelationDesc,
    /// The destination uri of the external service where the data will be copied.
    pub uri: Uri,
    /// Connection information required to connect to the external service to copy the data.
    pub connection: StorageConnection<ReferencedConnection>,
    /// The ID of the CONNECTION object to be used for copying the data.
    pub connection_id: CatalogItemId,
    /// Format params to format the data.
    pub format: S3SinkFormat,
    /// Approximate max file size of each uploaded file.
    pub max_file_size: u64,
    /// Number of batches the output of the COPY TO will be partitioned into
    /// to distribute the load across workers deterministically.
    /// This is an `Option` only because it is not set when `CopyToContext` is instantiated,
    /// but immediately afterwards, in the `PeekStageValidate` stage.
    pub output_batch_count: Option<u64>,
}

#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    /// When present, an optimizer trace to be used for emitting a plan insights
    /// notice.
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
}

#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    imports: BTreeMap<GlobalId, MapFilterProject>,
}

#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}

#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}

#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    /// ID of this item in the Catalog.
    item_id: CatalogItemId,
    /// ID by which Compute will reference this View.
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    /// IDs of objects resolved during name resolution.
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}

#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    real_time_recency_ts: Option<Timestamp>,
}

#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}

#[derive(Debug)]
pub enum ExplainContext {
    /// The ordinary, non-explain variant of the statement.
    None,
    /// The `EXPLAIN <level> PLAN FOR <explainee>` version of the statement.
    Plan(ExplainPlanContext),
    /// Generate a notice containing the `EXPLAIN PLAN INSIGHTS` output
    /// alongside the query's normal output.
    PlanInsightsNotice(OptimizerTrace),
    /// `EXPLAIN FILTER PUSHDOWN`
    Pushdown,
}

impl ExplainContext {
    /// If available for this context, wrap the [`OptimizerTrace`] into a
    /// [`tracing::Dispatch`] and set it as default, returning the resulting
    /// guard in a `Some(guard)` option.
    pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
        let optimizer_trace = match self {
            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
            _ => None,
        };
        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
    }

    pub(crate) fn needs_cluster(&self) -> bool {
        match self {
            ExplainContext::None => true,
            ExplainContext::Plan(..) => false,
            ExplainContext::PlanInsightsNotice(..) => true,
            ExplainContext::Pushdown => false,
        }
    }

    pub(crate) fn needs_plan_insights(&self) -> bool {
        matches!(
            self,
            ExplainContext::Plan(ExplainPlanContext {
                stage: ExplainStage::PlanInsights,
                ..
            }) | ExplainContext::PlanInsightsNotice(_)
        )
    }
}

#[derive(Debug)]
pub struct ExplainPlanContext {
    /// EXPLAIN BROKEN is internal syntax for showing EXPLAIN output despite an internal error in
    /// the optimizer: we don't immediately bail out from peek sequencing when an internal optimizer
    /// error happens, but go on with trying to show the requested EXPLAIN stage. This can still
    /// succeed if the requested EXPLAIN stage is before the point where the error happened.
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    pub stage: ExplainStage,
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    pub optimizer_trace: OptimizerTrace,
}

#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    /// The ID of this Materialized View in the Catalog.
    item_id: CatalogItemId,
    /// The ID of the durable pTVC backing this Materialized View.
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
}

#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}

#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}

/// An enum describing which cluster to run a statement on.
///
/// One example usage would be that if a query depends only on system tables, we might
/// automatically run it on the catalog server cluster to benefit from indexes that exist there.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The catalog server cluster.
    CatalogServer,
    /// The current user's active cluster.
    Active,
    /// The cluster selected at the start of a transaction.
    Transaction(ClusterId),
}

/// Result types for each stage of a sequence.
pub(crate) enum StageResult<T> {
    /// A task was spawned that will return the next stage.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task was spawned that will return a response for the client.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage is immediately ready and will execute.
    Immediate(T),
    /// The final stage was executed and is ready to respond to the client.
    Response(ExecuteResponse),
}

/// Common functionality for [Coordinator::sequence_staged].
pub(crate) trait Staged: Send {
    type Ctx: StagedContext;

    fn validity(&mut self) -> &mut PlanValidity;

    /// Returns the next stage or final result.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Prepares a message for the Coordinator.
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether it is safe to SQL cancel this stage.
    fn cancel_enabled(&self) -> bool;
}

pub trait StagedContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    fn session(&self) -> Option<&Session>;
}

impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}

impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}

/// Configures a coordinator.
pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    /// Whether or not to start controllers in read-only mode. This is only
    /// meant for use during development of read-only clusters and 0dt upgrades
    /// and should go away once we have proper orchestration during upgrades.
    pub read_only_controllers: bool,

    /// A trigger that signals that the current deployment has caught up with a
    /// previous deployment. Only used during 0dt deployment, while in read-only
    /// mode.
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}

/// Metadata about an active connection.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Pgwire specifies that every connection have a 32-bit secret associated
    /// with it, that is known to both the client and the server. Cancellation
    /// requests are required to authenticate with the secret of the connection
    /// that they are targeting.
    secret_key: u32,
    /// The time when the session's connection was initiated.
    connected_at: EpochMillis,
    user: User,
    application_name: String,
    uuid: Uuid,
    conn_id: ConnectionId,
    client_ip: Option<IpAddr>,

    /// Sinks that will need to be dropped when the current transaction, if
    /// any, is cleared.
    drop_sinks: BTreeSet<GlobalId>,

    /// Lock for the Coordinator's deferred statements that is dropped on transaction clear.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Cluster reconfigurations that will need to be
    /// cleaned up when the current transaction is cleared
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Channel on which to send notices to a session.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role that initiated the database context. Fixed for the duration of the connection.
    /// WARNING: This role reference is not updated when the role is dropped.
    /// Consumers should not assume that this role exists.
    authenticated_role: RoleId,
}

impl ConnMeta {
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    pub fn user(&self) -> &User {
        &self.user
    }

    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}

#[derive(Debug)]
/// A pending transaction waiting to be committed.
pub struct PendingTxn {
    /// Context used to send a response back to the client.
    ctx: ExecuteContext,
    /// Client response for transaction.
    response: Result<PendingTxnResponse, AdapterError>,
    /// The action to take at the end of the transaction.
    action: EndTransactionAction,
}

#[derive(Debug)]
/// The response we'll send for a [`PendingTxn`].
pub enum PendingTxnResponse {
    /// The transaction will be committed.
    Committed {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction will be rolled back.
    Rolledback {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        match self {
            PendingTxnResponse::Committed { params }
            | PendingTxnResponse::Rolledback { params } => params.extend(p),
        }
    }
}

impl From<PendingTxnResponse> for ExecuteResponse {
    fn from(value: PendingTxnResponse) -> Self {
        match value {
            PendingTxnResponse::Committed { params } => {
                ExecuteResponse::TransactionCommitted { params }
            }
            PendingTxnResponse::Rolledback { params } => {
                ExecuteResponse::TransactionRolledBack { params }
            }
        }
    }
}

#[derive(Debug)]
/// A pending read transaction waiting to be linearized, along with metadata about its state.
pub struct PendingReadTxn {
    /// The transaction type
    txn: PendingRead,
    /// The timestamp context of the transaction.
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    /// When this pending txn was created, so that when the transaction ends we can
    /// report how long it waited. Only used for metrics.
1220    created: Instant,
1221    /// Number of times we requeued the processing of this pending read txn.
1222    /// Requeueing is necessary if the time we executed the query is after the current oracle time;
1223    /// see [`Coordinator::message_linearize_reads`] for more details.
1224    num_requeues: u64,
1225    /// Telemetry context.
1226    otel_ctx: OpenTelemetryContext,
1227}
1228
1229impl PendingReadTxn {
1230    /// Return the timestamp context of the pending read transaction.
1231    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
1232        &self.timestamp_context
1233    }
1234
1235    pub(crate) fn take_context(self) -> ExecuteContext {
1236        self.txn.take_context()
1237    }
1238}
1239
1240#[derive(Debug)]
1241/// A pending read transaction waiting to be linearized.
1242enum PendingRead {
1243    Read {
1244        /// The inner transaction.
1245        txn: PendingTxn,
1246    },
1247    ReadThenWrite {
1248        /// Context used to send a response back to the client.
1249        ctx: ExecuteContext,
1250        /// Channel used to alert the transaction that the read has been linearized and send back
1251        /// `ctx`.
1252        tx: oneshot::Sender<Option<ExecuteContext>>,
1253    },
1254}
1255
1256impl PendingRead {
1257    /// Alert the client that the read has been linearized.
1258    ///
1259    /// If it is necessary to finalize an execute, return the state necessary to do so
1260    /// (execution context and result)
1261    #[instrument(level = "debug")]
1262    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1263        match self {
1264            PendingRead::Read {
1265                txn:
1266                    PendingTxn {
1267                        mut ctx,
1268                        response,
1269                        action,
1270                    },
1271                ..
1272            } => {
1273                let changed = ctx.session_mut().vars_mut().end_transaction(action);
1274                // Append any parameters that changed to the response.
1275                let response = response.map(|mut r| {
1276                    r.extend_params(changed);
1277                    ExecuteResponse::from(r)
1278                });
1279
1280                Some((ctx, response))
1281            }
1282            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1283                // Ignore errors if the caller has hung up.
1284                let _ = tx.send(Some(ctx));
1285                None
1286            }
1287        }
1288    }
1289
1290    fn label(&self) -> &'static str {
1291        match self {
1292            PendingRead::Read { .. } => "read",
1293            PendingRead::ReadThenWrite { .. } => "read_then_write",
1294        }
1295    }
1296
1297    pub(crate) fn take_context(self) -> ExecuteContext {
1298        match self {
1299            PendingRead::Read { txn, .. } => txn.ctx,
1300            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1301                // Inform the transaction that we've taken their context.
1302                // Ignore errors if the caller has hung up.
1303                let _ = tx.send(None);
1304                ctx
1305            }
1306        }
1307    }
1308}
1309
1310/// State that the coordinator must process as part of retiring
1311/// command execution.  `ExecuteContextExtra::Default` is guaranteed
1312/// to produce a value that will cause the coordinator to do nothing, and
1313/// is intended for use by code that invokes the execution processing flow
1314/// (i.e., `sequence_plan`) without actually being a statement execution.
1315///
1316/// This is a pure data struct containing only the statement logging ID.
1317/// For auto-retire-on-drop behavior, use `ExecuteContextGuard` which wraps
1318/// this struct and owns the channel for sending retirement messages.
1319#[derive(Debug, Default)]
1320#[must_use]
1321pub struct ExecuteContextExtra {
1322    statement_uuid: Option<StatementLoggingId>,
1323}
1324
1325impl ExecuteContextExtra {
1326    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1327        Self { statement_uuid }
1328    }
1329    pub fn is_trivial(&self) -> bool {
1330        self.statement_uuid.is_none()
1331    }
1332    pub fn contents(&self) -> Option<StatementLoggingId> {
1333        self.statement_uuid
1334    }
1335    /// Consume this extra and return the statement UUID for retirement.
1336    /// This should only be called from code that knows what to do to finish
1337    /// up logging based on the inner value.
1338    #[must_use]
1339    pub(crate) fn retire(self) -> Option<StatementLoggingId> {
1340        self.statement_uuid
1341    }
1342}
1343
1344/// A guard that wraps `ExecuteContextExtra` and owns a channel for sending
1345/// retirement messages to the coordinator.
1346///
1347/// If this guard is dropped with a `Some` `statement_uuid` in its inner
1348/// `ExecuteContextExtra`, the `Drop` implementation will automatically send a
1349/// `Message::RetireExecute` to log the statement ending.
1350/// This handles cases like connection drops where the context cannot be
1351/// explicitly retired.
1352/// See <https://github.com/MaterializeInc/database-issues/issues/7304>
1353#[derive(Debug)]
1354#[must_use]
1355pub struct ExecuteContextGuard {
1356    extra: ExecuteContextExtra,
1357    /// Channel for sending messages to the coordinator. Used for auto-retiring on drop.
1358    /// For `Default` instances, this is a dummy sender (receiver already dropped), so
1359    /// sends will fail silently - which is the desired behavior since Default instances
1360    /// should only be used for non-logged statements.
1361    coordinator_tx: mpsc::UnboundedSender<Message>,
1362}
1363
1364impl Default for ExecuteContextGuard {
1365    fn default() -> Self {
1366        // Create a dummy sender by immediately dropping the receiver.
1367        // Any send on this channel will fail silently, which is the desired
1368        // behavior for Default instances (non-logged statements).
1369        let (tx, _rx) = mpsc::unbounded_channel();
1370        Self {
1371            extra: ExecuteContextExtra::default(),
1372            coordinator_tx: tx,
1373        }
1374    }
1375}
1376
1377impl ExecuteContextGuard {
1378    pub(crate) fn new(
1379        statement_uuid: Option<StatementLoggingId>,
1380        coordinator_tx: mpsc::UnboundedSender<Message>,
1381    ) -> Self {
1382        Self {
1383            extra: ExecuteContextExtra::new(statement_uuid),
1384            coordinator_tx,
1385        }
1386    }
1387    pub fn is_trivial(&self) -> bool {
1388        self.extra.is_trivial()
1389    }
1390    pub fn contents(&self) -> Option<StatementLoggingId> {
1391        self.extra.contents()
1392    }
1393    /// Take responsibility for the contents.  This should only be
1394    /// called from code that knows what to do to finish up logging
1395    /// based on the inner value.
1396    ///
1397    /// Returns the inner `ExecuteContextExtra`, consuming the guard without
1398    /// triggering the auto-retire behavior.
1399    pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
1400        // Taking statement_uuid prevents the Drop impl from sending a retire message
1401        std::mem::take(&mut self.extra)
1402    }
1403}
1404
1405impl Drop for ExecuteContextGuard {
1406    fn drop(&mut self) {
1407        if let Some(statement_uuid) = self.extra.statement_uuid.take() {
1408            // Auto-retire since the guard was dropped without explicit retirement (likely due
1409            // to connection drop).
1410            let msg = Message::RetireExecute {
1411                data: ExecuteContextExtra {
1412                    statement_uuid: Some(statement_uuid),
1413                },
1414                otel_ctx: OpenTelemetryContext::obtain(),
1415                reason: StatementEndedExecutionReason::Aborted,
1416            };
1417            // Send may fail for Default instances (dummy sender), which is fine since
1418            // Default instances should only be used for non-logged statements.
1419            let _ = self.coordinator_tx.send(msg);
1420        }
1421    }
1422}
1423
1424/// Bundle of state related to statement execution.
1425///
1426/// This struct collects a bundle of state that needs to be threaded
1427/// through various functions as part of statement execution.
1428/// It is used to finalize execution, by calling `retire`. Finalizing execution
1429/// involves sending the session back to the pgwire layer so that it
1430/// may be used to process further commands. It also involves
1431/// performing some work on the main coordinator thread
1432/// (e.g., recording the time at which the statement finished
1433/// executing). The state necessary to perform this work is bundled in
1434/// the `ExecuteContextGuard` object.
1435#[derive(Debug)]
1436pub struct ExecuteContext {
1437    inner: Box<ExecuteContextInner>,
1438}
1439
1440impl std::ops::Deref for ExecuteContext {
1441    type Target = ExecuteContextInner;
1442    fn deref(&self) -> &Self::Target {
1443        &*self.inner
1444    }
1445}
1446
1447impl std::ops::DerefMut for ExecuteContext {
1448    fn deref_mut(&mut self) -> &mut Self::Target {
1449        &mut *self.inner
1450    }
1451}
1452
1453#[derive(Debug)]
1454pub struct ExecuteContextInner {
1455    tx: ClientTransmitter<ExecuteResponse>,
1456    internal_cmd_tx: mpsc::UnboundedSender<Message>,
1457    session: Session,
1458    extra: ExecuteContextGuard,
1459}
1460
1461impl ExecuteContext {
1462    pub fn session(&self) -> &Session {
1463        &self.session
1464    }
1465
1466    pub fn session_mut(&mut self) -> &mut Session {
1467        &mut self.session
1468    }
1469
1470    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1471        &self.tx
1472    }
1473
1474    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1475        &mut self.tx
1476    }
1477
1478    pub fn from_parts(
1479        tx: ClientTransmitter<ExecuteResponse>,
1480        internal_cmd_tx: mpsc::UnboundedSender<Message>,
1481        session: Session,
1482        extra: ExecuteContextGuard,
1483    ) -> Self {
1484        Self {
1485            inner: ExecuteContextInner {
1486                tx,
1487                session,
1488                extra,
1489                internal_cmd_tx,
1490            }
1491            .into(),
1492        }
1493    }
1494
1495    /// By calling this function, the caller takes responsibility for
1496    /// dealing with the instance of `ExecuteContextGuard`. This is
1497    /// intended to support protocols (like `COPY FROM`) that involve
1498    /// multiple passes of sending the session back and forth between
1499    /// the coordinator and the pgwire layer. As part of any such
1500    /// protocol, we must ensure that the `ExecuteContextGuard`
1501    /// (possibly wrapped in a new `ExecuteContext`) is passed back to the coordinator for
1502    /// eventual retirement.
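    ///
    /// A sketch of such a round trip (illustrative; `ctx` and `result` are assumed to
    /// be in scope):
    ///
    /// ```ignore
    /// let (tx, internal_cmd_tx, session, extra) = ctx.into_parts();
    /// // ... hand `session` back to the pgwire layer, perform more work ...
    /// let ctx = ExecuteContext::from_parts(tx, internal_cmd_tx, session, extra);
    /// ctx.retire(result);
    /// ```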
1503    pub fn into_parts(
1504        self,
1505    ) -> (
1506        ClientTransmitter<ExecuteResponse>,
1507        mpsc::UnboundedSender<Message>,
1508        Session,
1509        ExecuteContextGuard,
1510    ) {
1511        let ExecuteContextInner {
1512            tx,
1513            internal_cmd_tx,
1514            session,
1515            extra,
1516        } = *self.inner;
1517        (tx, internal_cmd_tx, session, extra)
1518    }
1519
1520    /// Retire the execution by sending a message to the coordinator.
1521    #[instrument(level = "debug")]
1522    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1523        let ExecuteContextInner {
1524            tx,
1525            internal_cmd_tx,
1526            session,
1527            extra,
1528        } = *self.inner;
1529        let reason = if extra.is_trivial() {
1530            None
1531        } else {
1532            Some((&result).into())
1533        };
1534        tx.send(result, session);
1535        if let Some(reason) = reason {
1536            // Retire the guard to get the inner ExecuteContextExtra without triggering auto-retire
1537            let extra = extra.defuse();
1538            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1539                otel_ctx: OpenTelemetryContext::obtain(),
1540                data: extra,
1541                reason,
1542            }) {
1543                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1544            }
1545        }
1546    }
1547
1548    pub fn extra(&self) -> &ExecuteContextGuard {
1549        &self.extra
1550    }
1551
1552    pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
1553        &mut self.extra
1554    }
1555}
1556
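/// The last known status of every cluster replica process, keyed by cluster ID, then
/// replica ID, then process ID.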
1557#[derive(Debug)]
1558struct ClusterReplicaStatuses(
1559    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
1560);
1561
1562impl ClusterReplicaStatuses {
1563    pub(crate) fn new() -> ClusterReplicaStatuses {
1564        ClusterReplicaStatuses(BTreeMap::new())
1565    }
1566
1567    /// Initializes the statuses of the specified cluster.
1568    ///
1569    /// Panics if the cluster statuses are already initialized.
1570    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1571        let prev = self.0.insert(cluster_id, BTreeMap::new());
1572        assert_eq!(
1573            prev, None,
1574            "cluster {cluster_id} statuses already initialized"
1575        );
1576    }
1577
1578    /// Initializes the statuses of the specified cluster replica.
1579    ///
1580    /// Panics if the cluster replica statuses are already initialized.
1581    pub(crate) fn initialize_cluster_replica_statuses(
1582        &mut self,
1583        cluster_id: ClusterId,
1584        replica_id: ReplicaId,
1585        num_processes: usize,
1586        time: DateTime<Utc>,
1587    ) {
1588        tracing::info!(
1589            ?cluster_id,
1590            ?replica_id,
1591            ?time,
1592            "initializing cluster replica status"
1593        );
1594        let replica_statuses = self.0.entry(cluster_id).or_default();
1595        let process_statuses = (0..num_processes)
1596            .map(|process_id| {
1597                let status = ClusterReplicaProcessStatus {
1598                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1599                    time: time.clone(),
1600                };
1601                (u64::cast_from(process_id), status)
1602            })
1603            .collect();
1604        let prev = replica_statuses.insert(replica_id, process_statuses);
1605        assert_none!(
1606            prev,
1607            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1608        );
1609    }
1610
1611    /// Removes the statuses of the specified cluster.
1612    ///
1613    /// Panics if the cluster does not exist.
1614    pub(crate) fn remove_cluster_statuses(
1615        &mut self,
1616        cluster_id: &ClusterId,
1617    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1618        let prev = self.0.remove(cluster_id);
1619        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1620    }
1621
1622    /// Removes the statuses of the specified cluster replica.
1623    ///
1624    /// Panics if the cluster or replica does not exist.
1625    pub(crate) fn remove_cluster_replica_statuses(
1626        &mut self,
1627        cluster_id: &ClusterId,
1628        replica_id: &ReplicaId,
1629    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1630        let replica_statuses = self
1631            .0
1632            .get_mut(cluster_id)
1633            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1634        let prev = replica_statuses.remove(replica_id);
1635        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1636    }
1637
1638    /// Inserts or updates the status of the specified cluster replica process.
1639    ///
1640    /// Panics if the cluster or replica does not exist.
1641    pub(crate) fn ensure_cluster_status(
1642        &mut self,
1643        cluster_id: ClusterId,
1644        replica_id: ReplicaId,
1645        process_id: ProcessId,
1646        status: ClusterReplicaProcessStatus,
1647    ) {
1648        let replica_statuses = self
1649            .0
1650            .get_mut(&cluster_id)
1651            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1652            .get_mut(&replica_id)
1653            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1654        replica_statuses.insert(process_id, status);
1655    }
1656
1657    /// Computes the status of the cluster replica as a whole.
1658    ///
1659    /// Panics if `cluster_id` or `replica_id` don't exist.
1660    pub fn get_cluster_replica_status(
1661        &self,
1662        cluster_id: ClusterId,
1663        replica_id: ReplicaId,
1664    ) -> ClusterStatus {
1665        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1666        Self::cluster_replica_status(process_status)
1667    }
1668
1669    /// Computes the status of the cluster replica as a whole.
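    ///
    /// A replica is `Online` only if all of its processes are `Online`; otherwise it is
    /// `Offline`, carrying the first known not-ready reason. Sketch (assuming
    /// `process_statuses` holds one `Online` process and one
    /// `Offline(Some(OfflineReason::Initializing))` process):
    ///
    /// ```ignore
    /// let status = ClusterReplicaStatuses::cluster_replica_status(&process_statuses);
    /// assert!(matches!(
    ///     status,
    ///     ClusterStatus::Offline(Some(OfflineReason::Initializing))
    /// ));
    /// ```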
1670    pub fn cluster_replica_status(
1671        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1672    ) -> ClusterStatus {
1673        process_status
1674            .values()
1675            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1676                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1677                (x, y) => {
1678                    let reason_x = match x {
1679                        ClusterStatus::Offline(reason) => reason,
1680                        ClusterStatus::Online => None,
1681                    };
1682                    let reason_y = match y {
1683                        ClusterStatus::Offline(reason) => reason,
1684                        ClusterStatus::Online => None,
1685                    };
1686                    // Arbitrarily pick the first known not-ready reason.
1687                    ClusterStatus::Offline(reason_x.or(reason_y))
1688                }
1689            })
1690    }
1691
1692    /// Gets the statuses of the given cluster replica.
1693    ///
1694    /// Panics if the cluster or replica does not exist.
1695    pub(crate) fn get_cluster_replica_statuses(
1696        &self,
1697        cluster_id: ClusterId,
1698        replica_id: ReplicaId,
1699    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1700        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1701            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1702    }
1703
1704    /// Gets the statuses of the given cluster replica.
1705    pub(crate) fn try_get_cluster_replica_statuses(
1706        &self,
1707        cluster_id: ClusterId,
1708        replica_id: ReplicaId,
1709    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1710        self.try_get_cluster_statuses(cluster_id)
1711            .and_then(|statuses| statuses.get(&replica_id))
1712    }
1713
1714    /// Gets the statuses of the given cluster.
1715    pub(crate) fn try_get_cluster_statuses(
1716        &self,
1717        cluster_id: ClusterId,
1718    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1719        self.0.get(&cluster_id)
1720    }
1721}
1722
1723/// Glues the external world to the Timely workers.
1724#[derive(Derivative)]
1725#[derivative(Debug)]
1726pub struct Coordinator {
1727    /// The controller for the storage and compute layers.
1728    #[derivative(Debug = "ignore")]
1729    controller: mz_controller::Controller,
1730    /// The catalog in an Arc suitable for readonly references. The Arc allows
1731    /// us to hand out cheap copies of the catalog to functions that can use it
1732    /// off of the main coordinator thread. If the coordinator needs to mutate
1733    /// the catalog, call [`Self::catalog_mut`], which will clone this struct member,
1734    /// allowing it to be mutated here while the other off-thread references can
1735    /// read their catalog as long as needed. In the future we would like this
1736    /// to be a pTVC, but for now this is sufficient.
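    ///
    /// A sketch of the intended access pattern (illustrative, not a complete call
    /// site):
    ///
    /// ```ignore
    /// // Cheap, read-only copy for use off the main coordinator thread.
    /// let catalog = Arc::clone(&self.catalog);
    /// // Mutation clones the inner catalog, so existing readers keep their snapshot.
    /// let catalog = self.catalog_mut();
    /// ```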
1737    catalog: Arc<Catalog>,
1738
1739    /// A client for persist. Initially, this is only used for reading stashed
1740    /// peek responses out of batches.
1741    persist_client: PersistClient,
1742
1743    /// Channel to manage internal commands from the coordinator to itself.
1744    internal_cmd_tx: mpsc::UnboundedSender<Message>,
1745    /// Notification that triggers a group commit.
1746    group_commit_tx: appends::GroupCommitNotifier,
1747
1748    /// Channel for strict serializable reads ready to commit.
1749    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,
1750
1751    /// Mechanism for totally ordering write and read timestamps, so that all reads
1752    /// reflect exactly the set of writes that precede them, and no writes that follow.
1753    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
1754
1755    /// A generator for transient [`GlobalId`]s, shareable with other threads.
1756    transient_id_gen: Arc<TransientIdGen>,
1757    /// A map from connection ID to metadata about that connection for all
1758    /// active connections.
1759    active_conns: BTreeMap<ConnectionId, ConnMeta>,
1760
1761    /// For each transaction, the read holds taken to support any performed reads.
1762    ///
1763    /// Upon completing a transaction, these read holds should be dropped.
1764    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,
1765
1766    /// Access to the peek fields should be restricted to methods in the [`peek`] API.
1767    /// A map from pending peek ids to the queue into which responses are sent, and
1768    /// the connection id of the client that initiated the peek.
1769    pending_peeks: BTreeMap<Uuid, PendingPeek>,
1770    /// A map from client connection ids to a set of all pending peeks for that client.
1771    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
1772
1773    /// A map from client connection ids to pending linearize read transaction.
1774    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
1775
1776    /// A map from the compute sink ID to its state description.
1777    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
1778    /// A map from active webhooks to their invalidation handle.
1779    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
1780    /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
1781    /// to stage Batches in Persist that we will then link into the shard.
1782    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,
1783
1784    /// A map from connection ids to a watch channel that is set to `true` if the connection
1785    /// received a cancel request.
1786    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
1787    /// Active introspection subscribes.
1788    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,
1789
1790    /// Locks that grant access to a specific object, populated lazily as objects are written to.
1791    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
1792    /// Plans that are currently deferred and waiting on a write lock.
1793    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,
1794
1795    /// Pending writes waiting for a group commit.
1796    pending_writes: Vec<PendingWriteTxn>,
1797
1798    /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
1799    /// table's timestamps, but there are cases where timestamps are not bumped but
1800    /// we expect the closed timestamps to advance (`AS OF X`, SUBSCRIBing views over
1801    /// RT sources and tables). To address these, spawn a task that forces table
1802    /// timestamps to close on a regular interval. This roughly tracks the behavior
1803    /// of realtime sources that close off timestamps on an interval.
1804    ///
1805    /// For non-realtime timelines, nothing pushes the timestamps forward, so we must do
1806    /// it manually.
1807    advance_timelines_interval: Interval,
1808
1809    /// Serialized DDL. DDL must be serialized because:
1810    /// - Many DDL statements do off-thread work and need to verify the catalog is in a valid state, but
1811    ///   [`PlanValidity`] does not currently support tracking all changes. Doing that correctly
1812    ///   seems to be more difficult than it's worth, so we would instead re-plan and re-sequence
1813    ///   the statements.
1814    /// - Re-planning a statement is hard because Coordinator and Session state is mutated at
1815    ///   various points, and we would need to correctly reset those changes before re-planning and
1816    ///   re-sequencing.
1817    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
1818
1819    /// Handle to secret manager that can create and delete secrets from
1820    /// an arbitrary secret storage engine.
1821    secrets_controller: Arc<dyn SecretsController>,
1822    /// A secrets reader that maintains an in-memory cache, where values have a set TTL.
1823    caching_secrets_reader: CachingSecretsReader,
1824
1825    /// Handle to a manager that can create and delete Kubernetes resources
1826    /// (i.e., VpcEndpoint objects).
1827    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1828
1829    /// Persist client for fetching storage metadata such as size metrics.
1830    storage_usage_client: StorageUsageClient,
1831    /// The interval at which to collect storage usage information.
1832    storage_usage_collection_interval: Duration,
1833
1834    /// Segment analytics client.
1835    #[derivative(Debug = "ignore")]
1836    segment_client: Option<mz_segment::Client>,
1837
1838    /// Coordinator metrics.
1839    metrics: Metrics,
1840    /// Optimizer metrics.
1841    optimizer_metrics: OptimizerMetrics,
1842
1843    /// Tracing handle.
1844    tracing_handle: TracingHandle,
1845
1846    /// Data used by the statement logging feature.
1847    statement_logging: StatementLogging,
1848
1849    /// Limit for how many concurrent webhook requests we allow.
1850    webhook_concurrency_limit: WebhookConcurrencyLimiter,
1851
1852    /// Optional config for the Postgres-backed timestamp oracle. This is
1853    /// _required_ when the `timestamp_oracle` system variable is set to
1854    /// `postgres`.
1855    pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,
1856
1857    /// Periodically asks cluster scheduling policies to make their decisions.
1858    check_cluster_scheduling_policies_interval: Interval,
1859
1860    /// This keeps the last On/Off decision for each cluster and each scheduling policy.
1861    /// (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are
1862    /// periodically cleaned up from this Map.)
1863    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,
1864
1865    /// When doing 0dt upgrades/in read-only mode, periodically ask all known
1866    /// clusters/collections whether they are caught up.
1867    caught_up_check_interval: Interval,
1868
1869    /// Context needed to check whether all clusters/collections have caught up.
1870    /// Only used during 0dt deployment, while in read-only mode.
1871    caught_up_check: Option<CaughtUpCheckContext>,
1872
1873    /// Tracks the state associated with the currently installed watchsets.
1874    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,
1875
1876    /// Tracks the currently installed watchsets for each connection.
1877    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,
1878
1879    /// Tracks the statuses of all cluster replicas.
1880    cluster_replica_statuses: ClusterReplicaStatuses,
1881
1882    /// Whether or not to start controllers in read-only mode. This is only
1883    /// meant for use during development of read-only clusters and 0dt upgrades
1884    /// and should go away once we have proper orchestration during upgrades.
1885    read_only_controllers: bool,
1886
1887    /// Updates to builtin tables that are being buffered while we are in
1888    /// read-only mode. We apply these all at once when coming out of read-only
1889    /// mode.
1890    ///
1891    /// This is a `Some` while in read-only mode and will be replaced by a
1892    /// `None` when we transition out of read-only mode and write out any
1893    /// buffered updates.
1894    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,
1895
1896    license_key: ValidatedLicenseKey,
1897}
1898
1899impl Coordinator {
1900    /// Initializes coordinator state based on the contained catalog. Must be
1901    /// called after creating the coordinator and before calling the
1902    /// `Coordinator::serve` method.
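    ///
    /// At a high level, bootstrap proceeds through the phases logged below: a preamble
    /// (initializing replica statuses, propagating configuration, and recreating
    /// clusters and replicas in the controller), storage collection initialization,
    /// dataflow plan optimization, dataflow as-of selection, a postamble that installs
    /// catalog entries and read policies, builtin table updates, orphaned secret
    /// cleanup, and a set of final steps.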
1903    #[instrument(name = "coord::bootstrap")]
1904    pub(crate) async fn bootstrap(
1905        &mut self,
1906        boot_ts: Timestamp,
1907        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1908        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1909        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1910        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1911        audit_logs_iterator: AuditLogIterator,
1912    ) -> Result<(), AdapterError> {
1913        let bootstrap_start = Instant::now();
1914        info!("startup: coordinator init: bootstrap beginning");
1915        info!("startup: coordinator init: bootstrap: preamble beginning");
1916
1917        // Initialize cluster replica statuses.
1918        // Gross iterator is to avoid partial borrow issues.
1919        let cluster_statuses: Vec<(_, Vec<_>)> = self
1920            .catalog()
1921            .clusters()
1922            .map(|cluster| {
1923                (
1924                    cluster.id(),
1925                    cluster
1926                        .replicas()
1927                        .map(|replica| {
1928                            (replica.replica_id, replica.config.location.num_processes())
1929                        })
1930                        .collect(),
1931                )
1932            })
1933            .collect();
1934        let now = self.now_datetime();
1935        for (cluster_id, replica_statuses) in cluster_statuses {
1936            self.cluster_replica_statuses
1937                .initialize_cluster_statuses(cluster_id);
1938            for (replica_id, num_processes) in replica_statuses {
1939                self.cluster_replica_statuses
1940                    .initialize_cluster_replica_statuses(
1941                        cluster_id,
1942                        replica_id,
1943                        num_processes,
1944                        now,
1945                    );
1946            }
1947        }
1948
1949        let system_config = self.catalog().system_config();
1950
1951        // Inform metrics about the initial system configuration.
1952        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1953
1954        // Inform the controllers about their initial configuration.
1955        let compute_config = flags::compute_config(system_config);
1956        let storage_config = flags::storage_config(system_config);
1957        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1958        let dyncfg_updates = system_config.dyncfg_updates();
1959        self.controller.compute.update_configuration(compute_config);
1960        self.controller.storage.update_parameters(storage_config);
1961        self.controller
1962            .update_orchestrator_scheduling_config(scheduling_config);
1963        self.controller.update_configuration(dyncfg_updates);
1964
1965        self.validate_resource_limit_numeric(
1966            Numeric::zero(),
1967            self.current_credit_consumption_rate(),
1968            |system_vars| {
1969                self.license_key
1970                    .max_credit_consumption_rate()
1971                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1972            },
1973            "cluster replica",
1974            MAX_CREDIT_CONSUMPTION_RATE.name(),
1975        )?;
1976
1977        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1978            Default::default();
1979
1980        let enable_worker_core_affinity =
1981            self.catalog().system_config().enable_worker_core_affinity();
1982        for instance in self.catalog.clusters() {
1983            self.controller.create_cluster(
1984                instance.id,
1985                ClusterConfig {
1986                    arranged_logs: instance.log_indexes.clone(),
1987                    workload_class: instance.config.workload_class.clone(),
1988                },
1989            )?;
1990            for replica in instance.replicas() {
1991                let role = instance.role();
1992                self.controller.create_replica(
1993                    instance.id,
1994                    replica.replica_id,
1995                    instance.name.clone(),
1996                    replica.name.clone(),
1997                    role,
1998                    replica.config.clone(),
1999                    enable_worker_core_affinity,
2000                )?;
2001            }
2002        }
2003
2004        info!(
2005            "startup: coordinator init: bootstrap: preamble complete in {:?}",
2006            bootstrap_start.elapsed()
2007        );
2008
2009        let init_storage_collections_start = Instant::now();
2010        info!("startup: coordinator init: bootstrap: storage collections init beginning");
2011        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2012            .await;
2013        info!(
2014            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2015            init_storage_collections_start.elapsed()
2016        );
2017
2018        // The storage controller knows about the introspection collections now, so we can start
2019        // sinking introspection updates in the compute controller. It makes sense to do that as
2020        // soon as possible, to avoid updates piling up in the compute controller's internal
2021        // buffers.
2022        self.controller.start_compute_introspection_sink();
2023
2024        let optimize_dataflows_start = Instant::now();
2025        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2026        let entries: Vec<_> = self.catalog().entries().cloned().collect();
2027        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2028        info!(
2029            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2030            optimize_dataflows_start.elapsed()
2031        );
2032
2033        // We don't need to wait for the cache to update.
2034        let _fut = self.catalog().update_expression_cache(
2035            uncached_local_exprs.into_iter().collect(),
2036            uncached_global_exps.into_iter().collect(),
2037        );
2038
2039        // Select dataflow as-ofs. This step relies on the storage collections created by
2040        // `bootstrap_storage_collections` and the dataflow plans created by
2041        // `bootstrap_dataflow_plans`.
2042        let bootstrap_as_ofs_start = Instant::now();
2043        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2044        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2045        info!(
2046            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2047            bootstrap_as_ofs_start.elapsed()
2048        );
2049
2050        let postamble_start = Instant::now();
2051        info!("startup: coordinator init: bootstrap: postamble beginning");
2052
2053        let logs: BTreeSet<_> = BUILTINS::logs()
2054            .map(|log| self.catalog().resolve_builtin_log(log))
2055            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2056            .collect();
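        // Indexes directly on these log collections are special-cased in the loop
        // below: no dataflow is created for them; they only receive read policies.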
2057
2058        let mut privatelink_connections = BTreeMap::new();
2059
2060        for entry in &entries {
2061            debug!(
2062                "coordinator init: installing {} {}",
2063                entry.item().typ(),
2064                entry.id()
2065            );
2066            let mut policy = entry.item().initial_logical_compaction_window();
2067            match entry.item() {
2068                // Currently catalog item rebuild assumes that sinks and
2069                // indexes are always built individually and does not store information
2070                // about how they were built. If we start building multiple sinks and/or indexes
2071                // using a single dataflow, we have to make sure the rebuild process re-runs
2072                // the same multiple-build dataflow.
2073                CatalogItem::Source(source) => {
2074                    // Propagate source compaction windows to subsources if needed.
2075                    if source.custom_logical_compaction_window.is_none() {
2076                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2077                            source.data_source
2078                        {
2079                            policy = Some(
2080                                self.catalog()
2081                                    .get_entry(&ingestion_id)
2082                                    .source()
2083                                    .expect("must be source")
2084                                    .custom_logical_compaction_window
2085                                    .unwrap_or_default(),
2086                            );
2087                        }
2088                    }
2089                    policies_to_set
2090                        .entry(policy.expect("sources have a compaction window"))
2091                        .or_insert_with(Default::default)
2092                        .storage_ids
2093                        .insert(source.global_id());
2094                }
2095                CatalogItem::Table(table) => {
2096                    policies_to_set
2097                        .entry(policy.expect("tables have a compaction window"))
2098                        .or_insert_with(Default::default)
2099                        .storage_ids
2100                        .extend(table.global_ids());
2101                }
2102                CatalogItem::Index(idx) => {
2103                    let policy_entry = policies_to_set
2104                        .entry(policy.expect("indexes have a compaction window"))
2105                        .or_insert_with(Default::default);
2106
2107                    if logs.contains(&idx.on) {
2108                        policy_entry
2109                            .compute_ids
2110                            .entry(idx.cluster_id)
2111                            .or_insert_with(BTreeSet::new)
2112                            .insert(idx.global_id());
2113                    } else {
2114                        let df_desc = self
2115                            .catalog()
2116                            .try_get_physical_plan(&idx.global_id())
2117                            .expect("added in `bootstrap_dataflow_plans`")
2118                            .clone();
2119
2120                        let df_meta = self
2121                            .catalog()
2122                            .try_get_dataflow_metainfo(&idx.global_id())
2123                            .expect("added in `bootstrap_dataflow_plans`");
2124
2125                        if self.catalog().state().system_config().enable_mz_notices() {
2126                            // Collect optimization hint updates.
2127                            self.catalog().state().pack_optimizer_notices(
2128                                &mut builtin_table_updates,
2129                                df_meta.optimizer_notices.iter(),
2130                                Diff::ONE,
2131                            );
2132                        }
2133
2134                        // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
2135                        // but we cannot call that as it will also downgrade the read hold on the index.
2136                        policy_entry
2137                            .compute_ids
2138                            .entry(idx.cluster_id)
2139                            .or_insert_with(Default::default)
2140                            .extend(df_desc.export_ids());
2141
2142                        self.controller
2143                            .compute
2144                            .create_dataflow(idx.cluster_id, df_desc, None)
2145                            .unwrap_or_terminate("cannot fail to create dataflows");
2146                    }
2147                }
2148                CatalogItem::View(_) => (),
2149                CatalogItem::MaterializedView(mview) => {
2150                    policies_to_set
2151                        .entry(policy.expect("materialized views have a compaction window"))
2152                        .or_insert_with(Default::default)
2153                        .storage_ids
2154                        .insert(mview.global_id_writes());
2155
2156                    let mut df_desc = self
2157                        .catalog()
2158                        .try_get_physical_plan(&mview.global_id_writes())
2159                        .expect("added in `bootstrap_dataflow_plans`")
2160                        .clone();
2161
2162                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
2163                        df_desc.set_initial_as_of(initial_as_of);
2164                    }
2165
2166                    // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
2167                    let until = mview
2168                        .refresh_schedule
2169                        .as_ref()
2170                        .and_then(|s| s.last_refresh())
2171                        .and_then(|r| r.try_step_forward());
2172                    if let Some(until) = until {
2173                        df_desc.until.meet_assign(&Antichain::from_elem(until));
2174                    }
2175
2176                    let df_meta = self
2177                        .catalog()
2178                        .try_get_dataflow_metainfo(&mview.global_id_writes())
2179                        .expect("added in `bootstrap_dataflow_plans`");
2180
2181                    if self.catalog().state().system_config().enable_mz_notices() {
2182                        // Collect optimization hint updates.
2183                        self.catalog().state().pack_optimizer_notices(
2184                            &mut builtin_table_updates,
2185                            df_meta.optimizer_notices.iter(),
2186                            Diff::ONE,
2187                        );
2188                    }
2189
2190                    self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2191
2192                    // If this is a replacement MV, it must remain read-only until the replacement
2193                    // gets applied.
2194                    if mview.replacement_target.is_none() {
2195                        self.allow_writes(mview.cluster_id, mview.global_id_writes());
2196                    }
2197                }
2198                CatalogItem::Sink(sink) => {
2199                    policies_to_set
2200                        .entry(CompactionWindow::Default)
2201                        .or_insert_with(Default::default)
2202                        .storage_ids
2203                        .insert(sink.global_id());
2204                }
2205                CatalogItem::Connection(catalog_connection) => {
2206                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2207                        privatelink_connections.insert(
2208                            entry.id(),
2209                            VpcEndpointConfig {
2210                                aws_service_name: conn.service_name.clone(),
2211                                availability_zone_ids: conn.availability_zones.clone(),
2212                            },
2213                        );
2214                    }
2215                }
2216                CatalogItem::ContinualTask(ct) => {
2217                    policies_to_set
2218                        .entry(policy.expect("continual tasks have a compaction window"))
2219                        .or_insert_with(Default::default)
2220                        .storage_ids
2221                        .insert(ct.global_id());
2222
2223                    let mut df_desc = self
2224                        .catalog()
2225                        .try_get_physical_plan(&ct.global_id())
2226                        .expect("added in `bootstrap_dataflow_plans`")
2227                        .clone();
2228
2229                    if let Some(initial_as_of) = ct.initial_as_of.clone() {
2230                        df_desc.set_initial_as_of(initial_as_of);
2231                    }
2232
2233                    let df_meta = self
2234                        .catalog()
2235                        .try_get_dataflow_metainfo(&ct.global_id())
2236                        .expect("added in `bootstrap_dataflow_plans`");
2237
2238                    if self.catalog().state().system_config().enable_mz_notices() {
2239                        // Collect optimization hint updates.
2240                        self.catalog().state().pack_optimizer_notices(
2241                            &mut builtin_table_updates,
2242                            df_meta.optimizer_notices.iter(),
2243                            Diff::ONE,
2244                        );
2245                    }
2246
2247                    self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2248                    self.allow_writes(ct.cluster_id, ct.global_id());
2249                }
2250                // Nothing to do for these cases
2251                CatalogItem::Log(_)
2252                | CatalogItem::Type(_)
2253                | CatalogItem::Func(_)
2254                | CatalogItem::Secret(_) => {}
2255            }
2256        }
2257
2258        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2259            // Clean up any extraneous VpcEndpoints that shouldn't exist.
2260            let existing_vpc_endpoints = cloud_resource_controller
2261                .list_vpc_endpoints()
2262                .await
2263                .context("list vpc endpoints")?;
2264            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2265            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2266            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2267            for id in vpc_endpoints_to_remove {
2268                cloud_resource_controller
2269                    .delete_vpc_endpoint(*id)
2270                    .await
2271                    .context("deleting extraneous vpc endpoint")?;
2272            }
2273
2274            // Ensure desired VpcEndpoints are up to date.
2275            for (id, spec) in privatelink_connections {
2276                cloud_resource_controller
2277                    .ensure_vpc_endpoint(id, spec)
2278                    .await
2279                    .context("ensuring vpc endpoint")?;
2280            }
2281        }
2282
2283        // Having installed all entries and created all constraints, we can now drop read holds and
2284        // relax read policies.
2285        drop(dataflow_read_holds);
2286        // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
2287        for (cw, policies) in policies_to_set {
2288            self.initialize_read_policies(&policies, cw).await;
2289        }
2290
2291        // Expose mapping from T-shirt sizes to actual sizes
2292        builtin_table_updates.extend(
2293            self.catalog().state().resolve_builtin_table_updates(
2294                self.catalog().state().pack_all_replica_size_updates(),
2295            ),
2296        );
2297
2298        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2299        // When 0dt is enabled, we create new shards for any migrated builtin storage collections.
2300        // In read-only mode, the migrated builtin tables (which are a subset of migrated builtin
2301        // storage collections) need to be back-filled so that any dependent dataflow can be
2302        // hydrated. Additionally, these shards are not registered with the txn-shard, and cannot
2303        // be registered while in read-only mode, so they are written to directly.
2304        let migrated_updates_fut = if self.controller.read_only() {
2305            let min_timestamp = Timestamp::minimum();
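            // A buffered update qualifies for direct rehydration here only if its table
            // was migrated and its new shard has never been written to, i.e. the write
            // frontier is still the minimum timestamp.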
2306            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2307                .extract_if(.., |update| {
2308                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2309                    migrated_storage_collections_0dt.contains(&update.id)
2310                        && self
2311                            .controller
2312                            .storage_collections
2313                            .collection_frontiers(gid)
2314                            .expect("all tables are registered")
2315                            .write_frontier
2316                            .elements()
2317                            == &[min_timestamp]
2318                })
2319                .collect();
2320            if migrated_builtin_table_updates.is_empty() {
2321                futures::future::ready(()).boxed()
2322            } else {
2323                // Group all updates per-table.
2324                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2325                for update in migrated_builtin_table_updates {
2326                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2327                    grouped_appends.entry(gid).or_default().push(update.data);
2328                }
2329                info!(
2330                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2331                    grouped_appends.keys().collect::<Vec<_>>()
2332                );
2333
2334                // Consolidate Row data; staged batches must already be consolidated.
2335                let mut all_appends = Vec::with_capacity(grouped_appends.len());
2336                for (item_id, table_data) in grouped_appends.into_iter() {
2337                    let mut all_rows = Vec::new();
2338                    let mut all_data = Vec::new();
2339                    for data in table_data {
2340                        match data {
2341                            TableData::Rows(rows) => all_rows.extend(rows),
2342                            TableData::Batches(_) => all_data.push(data),
2343                        }
2344                    }
2345                    differential_dataflow::consolidation::consolidate(&mut all_rows);
2346                    all_data.push(TableData::Rows(all_rows));
2347
2348                    // TODO(parkmycar): Use SmallVec throughout.
2349                    all_appends.push((item_id, all_data));
2350                }
2351
2352                let fut = self
2353                    .controller
2354                    .storage
2355                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2356                    .expect("cannot fail to append");
2357                async {
2358                    fut.await
2359                        .expect("One-shot shouldn't be dropped during bootstrap")
2360                        .unwrap_or_terminate("cannot fail to append")
2361                }
2362                .boxed()
2363            }
2364        } else {
2365            futures::future::ready(()).boxed()
2366        };
2367
2368        info!(
2369            "startup: coordinator init: bootstrap: postamble complete in {:?}",
2370            postamble_start.elapsed()
2371        );
2372
2373        let builtin_update_start = Instant::now();
2374        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2375
2376        if self.controller.read_only() {
2377            info!(
2378                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2379            );
2380
2381            // TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
2382            let audit_join_start = Instant::now();
2383            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2384            let audit_log_updates: Vec<_> = audit_logs_iterator
2385                .map(|(audit_log, ts)| StateUpdate {
2386                    kind: StateUpdateKind::AuditLog(audit_log),
2387                    ts,
2388                    diff: StateDiff::Addition,
2389                })
2390                .collect();
2391            let audit_log_builtin_table_updates = self
2392                .catalog()
2393                .state()
2394                .generate_builtin_table_updates(audit_log_updates);
2395            builtin_table_updates.extend(audit_log_builtin_table_updates);
2396            info!(
2397                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2398                audit_join_start.elapsed()
2399            );
2400            self.buffered_builtin_table_updates
2401                .as_mut()
2402                .expect("in read-only mode")
2403                .append(&mut builtin_table_updates);
2404        } else {
2405            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2406                .await;
2407        };
2408        info!(
2409            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2410            builtin_update_start.elapsed()
2411        );
2412
2413        let cleanup_secrets_start = Instant::now();
2414        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2415        // Clean up orphaned secrets. Errors during list() or delete() do not
2416        // need to prevent bootstrap from succeeding; we will retry next
2417        // startup.
2418        {
2419            // Destructure Self so we can selectively move fields into the async
2420            // task.
2421            let Self {
2422                secrets_controller,
2423                catalog,
2424                ..
2425            } = self;
2426
2427            let next_user_item_id = catalog.get_next_user_item_id().await?;
2428            let next_system_item_id = catalog.get_next_system_item_id().await?;
2429            let read_only = self.controller.read_only();
2430            // Fetch all IDs from the catalog to future-proof against other
2431            // things using secrets. Today, SECRET and CONNECTION objects use
2432            // secrets_controller.ensure, but more things could use them in the future
2433            // and would be easy to miss adding here.
2434            let catalog_ids: BTreeSet<CatalogItemId> =
2435                catalog.entries().map(|entry| entry.id()).collect();
2436            let secrets_controller = Arc::clone(secrets_controller);
2437
2438            spawn(|| "cleanup-orphaned-secrets", async move {
2439                if read_only {
2440                    info!(
2441                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2442                    );
2443                    return;
2444                }
2445                info!("coordinator init: cleaning up orphaned secrets");
2446
2447                match secrets_controller.list().await {
2448                    Ok(controller_secrets) => {
2449                        let controller_secrets: BTreeSet<CatalogItemId> =
2450                            controller_secrets.into_iter().collect();
2451                        let orphaned = controller_secrets.difference(&catalog_ids);
2452                        for id in orphaned {
2453                            let id_too_large = match id {
2454                                CatalogItemId::System(id) => *id >= next_system_item_id,
2455                                CatalogItemId::User(id) => *id >= next_user_item_id,
2456                                CatalogItemId::IntrospectionSourceIndex(_)
2457                                | CatalogItemId::Transient(_) => false,
2458                            };
2459                            if id_too_large {
2460                                info!(
2461                                    %next_user_item_id, %next_system_item_id,
2462                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2463                                );
2464                            } else {
2465                                info!("coordinator init: deleting orphaned secret {id}");
2466                                fail_point!("orphan_secrets");
2467                                if let Err(e) = secrets_controller.delete(*id).await {
2468                                    warn!(
2469                                        "Dropping orphaned secret has encountered an error: {}",
2470                                        e
2471                                    );
2472                                }
2473                            }
2474                        }
2475                    }
2476                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2477                }
2478            });
2479        }
2480        info!(
2481            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2482            cleanup_secrets_start.elapsed()
2483        );
2484
2485        // Run all of our final steps concurrently.
2486        let final_steps_start = Instant::now();
2487        info!(
2488            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2489        );
2490        migrated_updates_fut
2491            .instrument(info_span!("coord::bootstrap::final"))
2492            .await;
2493
2494        debug!(
2495            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2496        );
2497        // Announce the completion of initialization.
2498        self.controller.initialization_complete();
2499
2500        // Initialize unified introspection.
2501        self.bootstrap_introspection_subscribes().await;
2502
2503        info!(
2504            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2505            final_steps_start.elapsed()
2506        );
2507
2508        info!(
2509            "startup: coordinator init: bootstrap complete in {:?}",
2510            bootstrap_start.elapsed()
2511        );
2512        Ok(())
2513    }
2514
2515    /// Prepares tables for writing by resetting them to a known state and
2516    /// appending the given builtin table updates. The timestamp oracle
2517    /// will be advanced to the write timestamp of the append when this
2518    /// method returns.
2519    #[allow(clippy::async_yields_async)]
2520    #[instrument]
2521    async fn bootstrap_tables(
2522        &mut self,
2523        entries: &[CatalogEntry],
2524        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2525        audit_logs_iterator: AuditLogIterator,
2526    ) {
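        // High-level flow: (1) append empty batches to advance every table to a fresh
        // write timestamp, (2) snapshot each system table at a read timestamp and build
        // retractions of its current contents, and (3) append those retractions together
        // with the given builtin table updates.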
2527        /// Smaller helper struct of metadata for bootstrapping tables.
2528        struct TableMetadata<'a> {
2529            id: CatalogItemId,
2530            name: &'a QualifiedItemName,
2531            table: &'a Table,
2532        }
2533
2534        // Filter our entries down to just tables.
2535        let table_metas: Vec<_> = entries
2536            .into_iter()
2537            .filter_map(|entry| {
2538                entry.table().map(|table| TableMetadata {
2539                    id: entry.id(),
2540                    name: entry.name(),
2541                    table,
2542                })
2543            })
2544            .collect();
2545
2546        // Append empty batches to advance the timestamp of all tables.
2547        debug!("coordinator init: advancing all tables to current timestamp");
2548        let WriteTimestamp {
2549            timestamp: write_ts,
2550            advance_to,
2551        } = self.get_local_write_ts().await;
2552        let appends = table_metas
2553            .iter()
2554            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2555            .collect();
2556        // Append the tables in the background. We apply the write timestamp before getting a read
2557        // timestamp and reading a snapshot of each table, so the snapshots will block on their own
2558        // until the appends are complete.
2559        let table_fence_rx = self
2560            .controller
2561            .storage
2562            .append_table(write_ts.clone(), advance_to, appends)
2563            .expect("invalid updates");
2564
2565        self.apply_local_write(write_ts).await;
2566
2567        // Add builtin table updates that clear the contents of all system tables.
2568        debug!("coordinator init: resetting system tables");
2569        let read_ts = self.get_local_read_ts().await;
2570
2571        // Filter out the 'mz_storage_usage_by_shard' table since we need to retain that info for
2572        // billing purposes.
2573        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2574            .catalog()
2575            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2576            .into();
2577        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2578            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2579                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2580        };
2581
2582        let mut retraction_tasks = Vec::new();
2583        let mut system_tables: Vec<_> = table_metas
2584            .iter()
2585            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2586            .collect();
2587
2588        // Special-case audit events because it's append-only.
2589        let (audit_events_idx, _) = system_tables
2590            .iter()
2591            .find_position(|table| {
2592                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2593            })
2594            .expect("mz_audit_events must exist");
2595        let audit_events = system_tables.remove(audit_events_idx);
2596        let audit_log_task = self.bootstrap_audit_log_table(
2597            audit_events.id,
2598            audit_events.name,
2599            audit_events.table,
2600            audit_logs_iterator,
2601            read_ts,
2602        );
2603
2604        for system_table in system_tables {
2605            let table_id = system_table.id;
2606            let full_name = self.catalog().resolve_full_name(system_table.name, None);
2607            debug!("coordinator init: resetting system table {full_name} ({table_id})");
2608
2609            // Fetch the current contents of the table for retraction.
2610            let snapshot_fut = self
2611                .controller
2612                .storage_collections
2613                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2614            let batch_fut = self
2615                .controller
2616                .storage_collections
2617                .create_update_builder(system_table.table.global_id_writes());
2618
2619            let task = spawn(|| format!("snapshot-{table_id}"), async move {
2620                // Create a TimestamplessUpdateBuilder.
2621                let mut batch = batch_fut
2622                    .await
2623                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2624                tracing::info!(?table_id, "starting snapshot");
2625                // Get a cursor which will emit a consolidated snapshot.
2626                let mut snapshot_cursor = snapshot_fut
2627                    .await
2628                    .unwrap_or_terminate("cannot fail to snapshot");
2629
2630                // Retract the current contents, spilling into our builder.
2631                while let Some(values) = snapshot_cursor.next().await {
2632                    for (key, _t, d) in values {
2633                        let d_invert = d.neg();
2634                        batch.add(&key, &(), &d_invert).await;
2635                    }
2636                }
2637                tracing::info!(?table_id, "finished snapshot");
2638
2639                let batch = batch.finish().await;
2640                BuiltinTableUpdate::batch(table_id, batch)
2641            });
2642            retraction_tasks.push(task);
2643        }
2644
2645        let retractions_res = futures::future::join_all(retraction_tasks).await;
2646        for retractions in retractions_res {
2647            builtin_table_updates.push(retractions);
2648        }
2649
2650        let audit_join_start = Instant::now();
2651        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2652        let audit_log_updates = audit_log_task.await;
2653        let audit_log_builtin_table_updates = self
2654            .catalog()
2655            .state()
2656            .generate_builtin_table_updates(audit_log_updates);
2657        builtin_table_updates.extend(audit_log_builtin_table_updates);
2658        info!(
2659            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2660            audit_join_start.elapsed()
2661        );
2662
2663        // Now that the snapshots are complete, the appends must also be complete.
2664        table_fence_rx
2665            .await
2666            .expect("One-shot shouldn't be dropped during bootstrap")
2667            .unwrap_or_terminate("cannot fail to append");
2668
2669        info!("coordinator init: sending builtin table updates");
2670        let (_builtin_updates_fut, write_ts) = self
2671            .builtin_table_update()
2672            .execute(builtin_table_updates)
2673            .await;
2674        info!(?write_ts, "our write ts");
2675        if let Some(write_ts) = write_ts {
2676            self.apply_local_write(write_ts).await;
2677        }
2678    }
2679
2680    /// Prepare updates to the audit log table. The audit log table is append-only and very
2681    /// large, so we only need to find the events that are present in `audit_logs_iterator` but
2682    /// not yet in the audit log table.
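    ///
    /// A minimal sketch of the dedup step, using made-up integer event IDs (the real code below
    /// compares `sortable_id()`s and relies on the iterator yielding newest events first, which is
    /// what makes the `take_while` correct):
    ///
    /// ```ignore
    /// // The table already holds events up to ID 10; only strictly newer events are kept.
    /// let max_table_id = Some(10);
    /// let new_events: Vec<_> = [12, 11, 10, 9]
    ///     .into_iter()
    ///     .take_while(|id| match max_table_id {
    ///         Some(max) => *id > max,
    ///         None => true,
    ///     })
    ///     .collect();
    /// assert_eq!(new_events, vec![12, 11]);
    /// ```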
2683    #[instrument]
2684    fn bootstrap_audit_log_table<'a>(
2685        &self,
2686        table_id: CatalogItemId,
2687        name: &'a QualifiedItemName,
2688        table: &'a Table,
2689        audit_logs_iterator: AuditLogIterator,
2690        read_ts: Timestamp,
2691    ) -> JoinHandle<Vec<StateUpdate>> {
2692        let full_name = self.catalog().resolve_full_name(name, None);
2693        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2694        let current_contents_fut = self
2695            .controller
2696            .storage_collections
2697            .snapshot(table.global_id_writes(), read_ts);
2698        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2699            let current_contents = current_contents_fut
2700                .await
2701                .unwrap_or_terminate("cannot fail to fetch snapshot");
2702            let contents_len = current_contents.len();
2703            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2704
2705            // Fetch the largest audit log event ID that has been written to the table.
2706            let max_table_id = current_contents
2707                .into_iter()
2708                .filter(|(_, diff)| *diff == 1)
2709                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2710                .sorted()
2711                .rev()
2712                .next();
2713
2714            // Filter audit log catalog updates to those that are not present in the table.
2715            audit_logs_iterator
2716                .take_while(|(audit_log, _)| match max_table_id {
2717                    Some(id) => audit_log.event.sortable_id() > id,
2718                    None => true,
2719                })
2720                .map(|(audit_log, ts)| StateUpdate {
2721                    kind: StateUpdateKind::AuditLog(audit_log),
2722                    ts,
2723                    diff: StateDiff::Addition,
2724                })
2725                .collect::<Vec<_>>()
2726        })
2727    }
2728
2729    /// Initializes all storage collections required by catalog objects in the storage controller.
2730    ///
2731    /// This method takes care of collection creation, as well as migration of existing
2732    /// collections.
2733    ///
2734    /// Creating all storage collections in a single `create_collections` call, rather than on
2735    /// demand, is more efficient as it reduces the number of writes to durable storage. It also
2736    /// allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
2737    /// storage collections, without needing to worry about dependency order.
2738    ///
2739    /// `migrated_storage_collections` is a set of builtin storage collections that have been
2740    /// migrated and should be handled specially.
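    ///
    /// A rough sketch of the call shape this method builds up to (`gather_from_catalog` is a
    /// hypothetical stand-in for the loop over catalog entries in the body below):
    ///
    /// ```ignore
    /// // Describe every storage collection required by the catalog...
    /// let collections: Vec<(GlobalId, CollectionDescription<Timestamp>)> = gather_from_catalog();
    /// // ...then register all of them with the storage controller in a single call, rather than
    /// // issuing one call per collection.
    /// storage
    ///     .create_collections_for_bootstrap(metadata, Some(register_ts), collections, &migrated)
    ///     .await;
    /// ```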
2741    #[instrument]
2742    async fn bootstrap_storage_collections(
2743        &mut self,
2744        migrated_storage_collections: &BTreeSet<CatalogItemId>,
2745    ) {
2746        let catalog = self.catalog();
2747        let source_status_collection_id = catalog
2748            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2749        let source_status_collection_id = catalog
2750            .get_entry(&source_status_collection_id)
2751            .latest_global_id();
2752
2753        let source_desc = |object_id: GlobalId,
2754                           data_source: &DataSourceDesc,
2755                           desc: &RelationDesc,
2756                           timeline: &Timeline| {
2757            let (data_source, status_collection_id) = match data_source.clone() {
2758                // Re-announce the source description.
2759                DataSourceDesc::Ingestion { desc, cluster_id } => {
2760                    let desc = desc.into_inline_connection(catalog.state());
2761                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2762
2763                    (
2764                        DataSource::Ingestion(ingestion),
2765                        Some(source_status_collection_id),
2766                    )
2767                }
2768                DataSourceDesc::OldSyntaxIngestion {
2769                    desc,
2770                    progress_subsource,
2771                    data_config,
2772                    details,
2773                    cluster_id,
2774                } => {
2775                    let desc = desc.into_inline_connection(catalog.state());
2776                    let data_config = data_config.into_inline_connection(catalog.state());
2777                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2778                    // this will always be a Source or a Table.
2779                    let progress_subsource =
2780                        catalog.get_entry(&progress_subsource).latest_global_id();
2781                    let mut ingestion =
2782                        IngestionDescription::new(desc, cluster_id, progress_subsource);
2783                    let legacy_export = SourceExport {
2784                        storage_metadata: (),
2785                        data_config,
2786                        details,
2787                    };
2788                    ingestion.source_exports.insert(object_id, legacy_export);
2789
2790                    (
2791                        DataSource::Ingestion(ingestion),
2792                        Some(source_status_collection_id),
2793                    )
2794                }
2795                DataSourceDesc::IngestionExport {
2796                    ingestion_id,
2797                    external_reference: _,
2798                    details,
2799                    data_config,
2800                } => {
2801                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2802                    // this will always be a Source or a Table.
2803                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2804                    (
2805                        DataSource::IngestionExport {
2806                            ingestion_id,
2807                            details,
2808                            data_config: data_config.into_inline_connection(catalog.state()),
2809                        },
2810                        Some(source_status_collection_id),
2811                    )
2812                }
2813                DataSourceDesc::Webhook { .. } => {
2814                    (DataSource::Webhook, Some(source_status_collection_id))
2815                }
2816                DataSourceDesc::Progress => (DataSource::Progress, None),
2817                DataSourceDesc::Introspection(introspection) => {
2818                    (DataSource::Introspection(introspection), None)
2819                }
2820            };
2821            CollectionDescription {
2822                desc: desc.clone(),
2823                data_source,
2824                since: None,
2825                status_collection_id,
2826                timeline: Some(timeline.clone()),
2827                primary: None,
2828            }
2829        };
2830
2831        let mut compute_collections = vec![];
2832        let mut collections = vec![];
2833        let mut new_builtin_continual_tasks = vec![];
2834        for entry in catalog.entries() {
2835            match entry.item() {
2836                CatalogItem::Source(source) => {
2837                    collections.push((
2838                        source.global_id(),
2839                        source_desc(
2840                            source.global_id(),
2841                            &source.data_source,
2842                            &source.desc,
2843                            &source.timeline,
2844                        ),
2845                    ));
2846                }
2847                CatalogItem::Table(table) => {
2848                    match &table.data_source {
2849                        TableDataSource::TableWrites { defaults: _ } => {
2850                            let versions: BTreeMap<_, _> = table
2851                                .collection_descs()
2852                                .map(|(gid, version, desc)| (version, (gid, desc)))
2853                                .collect();
2854                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2855                                let next_version = version.bump();
2856                                let primary_collection =
2857                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2858                                let mut collection_desc =
2859                                    CollectionDescription::for_table(desc.clone());
2860                                collection_desc.primary = primary_collection;
2861
2862                                (*gid, collection_desc)
2863                            });
2864                            collections.extend(collection_descs);
2865                        }
2866                        TableDataSource::DataSource {
2867                            desc: data_source_desc,
2868                            timeline,
2869                        } => {
2870                            // TODO(alter_table): Support versioning tables that read from sources.
2871                            soft_assert_eq_or_log!(table.collections.len(), 1);
2872                            let collection_descs =
2873                                table.collection_descs().map(|(gid, _version, desc)| {
2874                                    (
2875                                        gid,
2876                                        source_desc(
2877                                            entry.latest_global_id(),
2878                                            data_source_desc,
2879                                            &desc,
2880                                            timeline,
2881                                        ),
2882                                    )
2883                                });
2884                            collections.extend(collection_descs);
2885                        }
2886                    };
2887                }
2888                CatalogItem::MaterializedView(mv) => {
2889                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
2890                        let collection_desc =
2891                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
2892                        (gid, collection_desc)
2893                    });
2894
2895                    collections.extend(collection_descs);
2896                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
2897                }
2898                CatalogItem::ContinualTask(ct) => {
2899                    let collection_desc =
2900                        CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
2901                    if ct.global_id().is_system() && collection_desc.since.is_none() {
2902                        // We need a non-0 since to make as_of selection work. Fill it in below with
2903                        // the `bootstrap_builtin_continual_tasks` call, which can only be run after
2904                        // `create_collections_for_bootstrap`.
2905                        new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2906                    } else {
2907                        compute_collections.push((ct.global_id(), ct.desc.clone()));
2908                        collections.push((ct.global_id(), collection_desc));
2909                    }
2910                }
2911                CatalogItem::Sink(sink) => {
2912                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2913                    let from_desc = storage_sink_from_entry
2914                        .relation_desc()
2915                        .expect("sinks can only be built on items with descs")
2916                        .into_owned();
2917                    let collection_desc = CollectionDescription {
2918                        // TODO(sinks): make generic once we have more than one sink type.
2919                        desc: KAFKA_PROGRESS_DESC.clone(),
2920                        data_source: DataSource::Sink {
2921                            desc: ExportDescription {
2922                                sink: StorageSinkDesc {
2923                                    from: sink.from,
2924                                    from_desc,
2925                                    connection: sink
2926                                        .connection
2927                                        .clone()
2928                                        .into_inline_connection(self.catalog().state()),
2929                                    envelope: sink.envelope,
2930                                    as_of: Antichain::from_elem(Timestamp::minimum()),
2931                                    with_snapshot: sink.with_snapshot,
2932                                    version: sink.version,
2933                                    from_storage_metadata: (),
2934                                    to_storage_metadata: (),
2935                                    commit_interval: sink.commit_interval,
2936                                },
2937                                instance_id: sink.cluster_id,
2938                            },
2939                        },
2940                        since: None,
2941                        status_collection_id: None,
2942                        timeline: None,
2943                        primary: None,
2944                    };
2945                    collections.push((sink.global_id, collection_desc));
2946                }
2947                _ => (),
2948            }
2949        }
2950
2951        let register_ts = if self.controller.read_only() {
2952            self.get_local_read_ts().await
2953        } else {
2954            // Getting a write timestamp bumps the write timestamp in the
2955            // oracle, which we're not allowed to do in read-only mode.
2956            self.get_local_write_ts().await.timestamp
2957        };
2958
2959        let storage_metadata = self.catalog.state().storage_metadata();
2960        let migrated_storage_collections = migrated_storage_collections
2961            .into_iter()
2962            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2963            .collect();
2964
2965        // Before possibly creating collections, make sure their schemas are correct.
2966        //
2967        // Across different versions of Materialize the nullability of columns can change based on
2968        // updates to our optimizer.
2969        self.controller
2970            .storage
2971            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2972            .await
2973            .unwrap_or_terminate("cannot fail to evolve collections");
2974
2975        self.controller
2976            .storage
2977            .create_collections_for_bootstrap(
2978                storage_metadata,
2979                Some(register_ts),
2980                collections,
2981                &migrated_storage_collections,
2982            )
2983            .await
2984            .unwrap_or_terminate("cannot fail to create collections");
2985
2986        self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2987            .await;
2988
2989        if !self.controller.read_only() {
2990            self.apply_local_write(register_ts).await;
2991        }
2992    }
2993
2994    /// Make as_of selection happy for builtin CTs. Ideally we'd write the
2995    /// initial as_of down in the durable catalog, but that's hard because of
2996    /// boot ordering. Instead, we set the since of the storage collection to
2997    /// something that's a reasonable lower bound for the as_of. Then, if the
2998    /// upper is 0, the as_of selection code will allow us to jump it forward to
2999    /// this since.
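    ///
    /// A toy illustration of the intent, with a made-up timestamp (the real since comes from read
    /// holds on the CT's inputs, as in the body below):
    ///
    /// ```ignore
    /// // A freshly created builtin CT shard starts out with since = 0 and upper = 0. Bumping the
    /// // since to the least valid read of the CT's inputs (say, 42) lets as_of selection pick 42
    /// // instead of being pinned to 0.
    /// collection_desc.since = Some(Antichain::from_elem(Timestamp::from(42u64)));
    /// ```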
3000    async fn bootstrap_builtin_continual_tasks(
3001        &mut self,
3002        // TODO(alter_table): Switch to CatalogItemId.
3003        mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
3004    ) {
3005        for (id, collection) in &mut collections {
3006            let entry = self.catalog.get_entry_by_global_id(id);
3007            let ct = match &entry.item {
3008                CatalogItem::ContinualTask(ct) => ct.clone(),
3009                _ => unreachable!("only called with continual task builtins"),
3010            };
3011            let debug_name = self
3012                .catalog()
3013                .resolve_full_name(entry.name(), None)
3014                .to_string();
3015            let (_optimized_plan, physical_plan, _metainfo) = self
3016                .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
3017                .expect("builtin CT should optimize successfully");
3018
3019            // Determine an as of for the new continual task.
3020            let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
3021            // Can't acquire a read hold on ourselves because we don't exist yet.
3022            id_bundle.storage_ids.remove(id);
3023            let read_holds = self.acquire_read_holds(&id_bundle);
3024            let as_of = read_holds.least_valid_read();
3025
3026            collection.since = Some(as_of.clone());
3027        }
3028        self.controller
3029            .storage
3030            .create_collections(self.catalog.state().storage_metadata(), None, collections)
3031            .await
3032            .unwrap_or_terminate("cannot fail to create collections");
3033    }
3034
3035    /// Invokes the optimizer on all indexes and materialized views in the catalog and inserts the
3036    /// resulting dataflow plans into the catalog state.
3037    ///
3038    /// `ordered_catalog_entries` must be sorted in dependency order, with dependencies ordered
3039    /// before their dependants.
3040    ///
3041    /// This method does not perform timestamp selection for the dataflows, nor does it create them
3042    /// in the compute controller. Both of these steps happen later during bootstrapping.
3043    ///
3044    /// Returns a map of expressions that were not cached.
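    ///
    /// Cached expressions are reused only if they were produced under the same optimizer features
    /// that are active now; a sketch of the per-dataflow check performed below (names as in the
    /// body):
    ///
    /// ```ignore
    /// match cached_global_exprs.remove(&global_id) {
    ///     // Cache hit: reuse the cached global MIR, physical plan, and metainfo.
    ///     Some(exprs) if exprs.optimizer_features == optimizer_config.features => { /* ... */ }
    ///     // Cache miss (or stale features): re-optimize and record the result in
    ///     // `uncached_expressions` so it can be written back to the cache.
    ///     Some(_) | None => { /* ... */ }
    /// }
    /// ```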
3045    #[instrument]
3046    fn bootstrap_dataflow_plans(
3047        &mut self,
3048        ordered_catalog_entries: &[CatalogEntry],
3049        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
3050    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
3051        // The optimizer expects to be able to query its `ComputeInstanceSnapshot` for
3052        // collections the current dataflow can depend on. But since we don't yet install anything
3053        // on compute instances, the snapshot information is incomplete. We fix that by manually
3054        // updating `ComputeInstanceSnapshot` objects to ensure they contain collections previously
3055        // optimized.
3056        let mut instance_snapshots = BTreeMap::new();
3057        let mut uncached_expressions = BTreeMap::new();
3058
3059        let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
3060
3061        for entry in ordered_catalog_entries {
3062            match entry.item() {
3063                CatalogItem::Index(idx) => {
3064                    // Collect optimizer parameters.
3065                    let compute_instance =
3066                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
3067                            self.instance_snapshot(idx.cluster_id)
3068                                .expect("compute instance exists")
3069                        });
3070                    let global_id = idx.global_id();
3071
3072                    // The index may already be installed on the compute instance. For example,
3073                    // this is the case for introspection indexes.
3074                    if compute_instance.contains_collection(&global_id) {
3075                        continue;
3076                    }
3077
3078                    let (optimized_plan, physical_plan, metainfo) =
3079                        match cached_global_exprs.remove(&global_id) {
3080                            Some(global_expressions)
3081                                if global_expressions.optimizer_features
3082                                    == optimizer_config.features =>
3083                            {
3084                                debug!("global expression cache hit for {global_id:?}");
3085                                (
3086                                    global_expressions.global_mir,
3087                                    global_expressions.physical_plan,
3088                                    global_expressions.dataflow_metainfos,
3089                                )
3090                            }
3091                            Some(_) | None => {
3092                                let (optimized_plan, global_lir_plan) = {
3093                                    // Build an optimizer for this INDEX.
3094                                    let mut optimizer = optimize::index::Optimizer::new(
3095                                        self.owned_catalog(),
3096                                        compute_instance.clone(),
3097                                        global_id,
3098                                        optimizer_config.clone(),
3099                                        self.optimizer_metrics(),
3100                                    );
3101
3102                                    // MIR ⇒ MIR optimization (global)
3103                                    let index_plan = optimize::index::Index::new(
3104                                        entry.name().clone(),
3105                                        idx.on,
3106                                        idx.keys.to_vec(),
3107                                    );
3108                                    let global_mir_plan = optimizer.optimize(index_plan)?;
3109                                    let optimized_plan = global_mir_plan.df_desc().clone();
3110
3111                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3112                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3113
3114                                    (optimized_plan, global_lir_plan)
3115                                };
3116
3117                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3118                                let metainfo = {
3119                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3120                                    let notice_ids =
3121                                        std::iter::repeat_with(|| self.allocate_transient_id())
3122                                            .map(|(_item_id, gid)| gid)
3123                                            .take(metainfo.optimizer_notices.len())
3124                                            .collect::<Vec<_>>();
3125                                    // Return a metainfo with rendered notices.
3126                                    self.catalog().render_notices(
3127                                        metainfo,
3128                                        notice_ids,
3129                                        Some(idx.global_id()),
3130                                    )
3131                                };
3132                                uncached_expressions.insert(
3133                                    global_id,
3134                                    GlobalExpressions {
3135                                        global_mir: optimized_plan.clone(),
3136                                        physical_plan: physical_plan.clone(),
3137                                        dataflow_metainfos: metainfo.clone(),
3138                                        optimizer_features: OptimizerFeatures::from(
3139                                            self.catalog().system_config(),
3140                                        ),
3141                                    },
3142                                );
3143                                (optimized_plan, physical_plan, metainfo)
3144                            }
3145                        };
3146
3147                    let catalog = self.catalog_mut();
3148                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3149                    catalog.set_physical_plan(idx.global_id(), physical_plan);
3150                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3151
3152                    compute_instance.insert_collection(idx.global_id());
3153                }
3154                CatalogItem::MaterializedView(mv) => {
3155                    // Collect optimizer parameters.
3156                    let compute_instance =
3157                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3158                            self.instance_snapshot(mv.cluster_id)
3159                                .expect("compute instance exists")
3160                        });
3161                    let global_id = mv.global_id_writes();
3162
3163                    let (optimized_plan, physical_plan, metainfo) =
3164                        match cached_global_exprs.remove(&global_id) {
3165                            Some(global_expressions)
3166                                if global_expressions.optimizer_features
3167                                    == optimizer_config.features =>
3168                            {
3169                                debug!("global expression cache hit for {global_id:?}");
3170                                (
3171                                    global_expressions.global_mir,
3172                                    global_expressions.physical_plan,
3173                                    global_expressions.dataflow_metainfos,
3174                                )
3175                            }
3176                            Some(_) | None => {
3177                                let (_, internal_view_id) = self.allocate_transient_id();
3178                                let debug_name = self
3179                                    .catalog()
3180                                    .resolve_full_name(entry.name(), None)
3181                                    .to_string();
3182                                let force_non_monotonic = Default::default();
3183
3184                                let (optimized_plan, global_lir_plan) = {
3185                                    // Build an optimizer for this MATERIALIZED VIEW.
3186                                    let mut optimizer = optimize::materialized_view::Optimizer::new(
3187                                        self.owned_catalog().as_optimizer_catalog(),
3188                                        compute_instance.clone(),
3189                                        global_id,
3190                                        internal_view_id,
3191                                        mv.desc.latest().iter_names().cloned().collect(),
3192                                        mv.non_null_assertions.clone(),
3193                                        mv.refresh_schedule.clone(),
3194                                        debug_name,
3195                                        optimizer_config.clone(),
3196                                        self.optimizer_metrics(),
3197                                        force_non_monotonic,
3198                                    );
3199
3200                                    // MIR ⇒ MIR optimization (global)
3201                                    let global_mir_plan =
3202                                        optimizer.optimize(mv.optimized_expr.as_ref().clone())?;
3203                                    let optimized_plan = global_mir_plan.df_desc().clone();
3204
3205                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3206                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3207
3208                                    (optimized_plan, global_lir_plan)
3209                                };
3210
3211                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3212                                let metainfo = {
3213                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3214                                    let notice_ids =
3215                                        std::iter::repeat_with(|| self.allocate_transient_id())
3216                                            .map(|(_item_id, global_id)| global_id)
3217                                            .take(metainfo.optimizer_notices.len())
3218                                            .collect::<Vec<_>>();
3219                                    // Return a metainfo with rendered notices.
3220                                    self.catalog().render_notices(
3221                                        metainfo,
3222                                        notice_ids,
3223                                        Some(mv.global_id_writes()),
3224                                    )
3225                                };
3226                                uncached_expressions.insert(
3227                                    global_id,
3228                                    GlobalExpressions {
3229                                        global_mir: optimized_plan.clone(),
3230                                        physical_plan: physical_plan.clone(),
3231                                        dataflow_metainfos: metainfo.clone(),
3232                                        optimizer_features: OptimizerFeatures::from(
3233                                            self.catalog().system_config(),
3234                                        ),
3235                                    },
3236                                );
3237                                (optimized_plan, physical_plan, metainfo)
3238                            }
3239                        };
3240
3241                    let catalog = self.catalog_mut();
3242                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
3243                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
3244                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);
3245
3246                    compute_instance.insert_collection(mv.global_id_writes());
3247                }
3248                CatalogItem::ContinualTask(ct) => {
3249                    let compute_instance =
3250                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3251                            self.instance_snapshot(ct.cluster_id)
3252                                .expect("compute instance exists")
3253                        });
3254                    let global_id = ct.global_id();
3255
3256                    let (optimized_plan, physical_plan, metainfo) =
3257                        match cached_global_exprs.remove(&global_id) {
3258                            Some(global_expressions)
3259                                if global_expressions.optimizer_features
3260                                    == optimizer_config.features =>
3261                            {
3262                                debug!("global expression cache hit for {global_id:?}");
3263                                (
3264                                    global_expressions.global_mir,
3265                                    global_expressions.physical_plan,
3266                                    global_expressions.dataflow_metainfos,
3267                                )
3268                            }
3269                            Some(_) | None => {
3270                                let debug_name = self
3271                                    .catalog()
3272                                    .resolve_full_name(entry.name(), None)
3273                                    .to_string();
3274                                let (optimized_plan, physical_plan, metainfo) = self
3275                                    .optimize_create_continual_task(
3276                                        ct,
3277                                        global_id,
3278                                        self.owned_catalog(),
3279                                        debug_name,
3280                                    )?;
3281                                uncached_expressions.insert(
3282                                    global_id,
3283                                    GlobalExpressions {
3284                                        global_mir: optimized_plan.clone(),
3285                                        physical_plan: physical_plan.clone(),
3286                                        dataflow_metainfos: metainfo.clone(),
3287                                        optimizer_features: OptimizerFeatures::from(
3288                                            self.catalog().system_config(),
3289                                        ),
3290                                    },
3291                                );
3292                                (optimized_plan, physical_plan, metainfo)
3293                            }
3294                        };
3295
3296                    let catalog = self.catalog_mut();
3297                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3298                    catalog.set_physical_plan(ct.global_id(), physical_plan);
3299                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3300
3301                    compute_instance.insert_collection(ct.global_id());
3302                }
3303                _ => (),
3304            }
3305        }
3306
3307        Ok(uncached_expressions)
3308    }
3309
3310    /// Selects for each compute dataflow an as-of suitable for bootstrapping it.
3311    ///
3312    /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
3313    /// in place and that must not be dropped before all compute dataflows have been created with
3314    /// the compute controller.
3315    ///
3316    /// This method expects all storage collections and dataflow plans to be available, so it must
3317    /// run after [`Coordinator::bootstrap_storage_collections`] and
3318    /// [`Coordinator::bootstrap_dataflow_plans`].
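    ///
    /// A sketch of the expected call pattern during bootstrap (simplified):
    ///
    /// ```ignore
    /// let read_holds = self.bootstrap_dataflow_as_ofs().await;
    /// // ... create all compute dataflows with the compute controller ...
    /// drop(read_holds); // only now is it safe to let the read frontiers advance
    /// ```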
3319    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3320        let mut catalog_ids = Vec::new();
3321        let mut dataflows = Vec::new();
3322        let mut read_policies = BTreeMap::new();
3323        for entry in self.catalog.entries() {
3324            let gid = match entry.item() {
3325                CatalogItem::Index(idx) => idx.global_id(),
3326                CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3327                CatalogItem::ContinualTask(ct) => ct.global_id(),
3328                CatalogItem::Table(_)
3329                | CatalogItem::Source(_)
3330                | CatalogItem::Log(_)
3331                | CatalogItem::View(_)
3332                | CatalogItem::Sink(_)
3333                | CatalogItem::Type(_)
3334                | CatalogItem::Func(_)
3335                | CatalogItem::Secret(_)
3336                | CatalogItem::Connection(_) => continue,
3337            };
3338            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3339                catalog_ids.push(gid);
3340                dataflows.push(plan.clone());
3341
3342                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3343                    read_policies.insert(gid, compaction_window.into());
3344                }
3345            }
3346        }
3347
3348        let read_ts = self.get_local_read_ts().await;
3349        let read_holds = as_of_selection::run(
3350            &mut dataflows,
3351            &read_policies,
3352            &*self.controller.storage_collections,
3353            read_ts,
3354            self.controller.read_only(),
3355        );
3356
3357        let catalog = self.catalog_mut();
3358        for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3359            catalog.set_physical_plan(id, plan);
3360        }
3361
3362        read_holds
3363    }
3364
3365    /// Serves the coordinator, receiving commands from users over `cmd_rx`, internal
3366    /// commands over `internal_cmd_rx`, and other feedback over the remaining channels.
3367    ///
3368    /// You must call `bootstrap` before calling this method.
3369    ///
3370    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would
3371    /// get stored on the stack, which is bad for runtime performance and blows up our stack usage.
3372    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
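    ///
    /// A sketch of the boxing described above (illustrative only):
    ///
    /// ```ignore
    /// // Without boxing, the ~92KB future would live inline in the caller's stack frame;
    /// // `boxed_local()` moves it to the heap and leaves only a pointer-sized handle behind.
    /// let fut: LocalBoxFuture<'static, ()> = async move { /* select loop */ }.boxed_local();
    /// ```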
3373    fn serve(
3374        mut self,
3375        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3376        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3377        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3378        group_commit_rx: appends::GroupCommitWaiter,
3379    ) -> LocalBoxFuture<'static, ()> {
3380        async move {
3381            // Watcher that listens for and reports cluster service status changes.
3382            let mut cluster_events = self.controller.events_stream();
3383            let last_message = Arc::new(Mutex::new(LastMessage {
3384                kind: "none",
3385                stmt: None,
3386            }));
3387
3388            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3389            let idle_metric = self.metrics.queue_busy_seconds.clone();
3390            let last_message_watchdog = Arc::clone(&last_message);
3391
3392            spawn(|| "coord watchdog", async move {
3393                // Every 5 seconds, attempt to measure how long it takes for the
3394                // coord select loop to drain, since the timer we send is received
3395                // at the lowest priority, after all other messages. If the loop is
3396                // already idle, the measurement is only a few microseconds.
3397                let mut interval = tokio::time::interval(Duration::from_secs(5));
3398                // If we end up having to wait more than 5 seconds for the coord to respond, then the
3399                // behavior of Delay results in the interval "restarting" from whenever we yield
3400                // instead of trying to catch up.
3401                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3402
3403                // Track if we become stuck to de-dupe error reporting.
3404                let mut coord_stuck = false;
3405
3406                loop {
3407                    interval.tick().await;
3408
3409                    // Wait for space in the channel; if we time out, the coordinator is stuck!
3410                    let duration = tokio::time::Duration::from_secs(30);
3411                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3412                    let Ok(maybe_permit) = timeout else {
3413                        // Only log if we're newly stuck, to prevent logging repeatedly.
3414                        if !coord_stuck {
3415                            let last_message = last_message_watchdog.lock().expect("poisoned");
3416                            tracing::warn!(
3417                                last_message_kind = %last_message.kind,
3418                                last_message_sql = %last_message.stmt_to_string(),
3419                                "coordinator stuck for {duration:?}",
3420                            );
3421                        }
3422                        coord_stuck = true;
3423
3424                        continue;
3425                    };
3426
3427                    // We got a permit, we're not stuck!
3428                    if coord_stuck {
3429                        tracing::info!("Coordinator became unstuck");
3430                    }
3431                    coord_stuck = false;
3432
3433                    // If we failed to acquire a permit it's because we're shutting down.
3434                    let Ok(permit) = maybe_permit else {
3435                        break;
3436                    };
3437
3438                    permit.send(idle_metric.start_timer());
3439                }
3440            });
3441
3442            self.schedule_storage_usage_collection().await;
3443            self.spawn_privatelink_vpc_endpoints_watch_task();
3444            self.spawn_statement_logging_task();
3445            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3446
3447            // Report if the handling of a single message takes longer than this threshold.
3448            let warn_threshold = self
3449                .catalog()
3450                .system_config()
3451                .coord_slow_message_warn_threshold();
3452
3453            // How many messages we'd like to batch up before processing them. Must be > 0.
3454            const MESSAGE_BATCH: usize = 64;
3455            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3456            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3457
3458            let message_batch = self.metrics.message_batch.clone();
3459
3460            loop {
3461                // Before adding a branch to this select loop, please ensure that the branch is
3462                // cancellation safe and add a comment explaining why. You can refer here for more
3463                // info: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
3464                select! {
3465                    // We prioritize internal commands over other commands. However, we work through
3466                    // batches of commands in some branches of this select, which means that even if
3467                    // a command generates internal commands, we will work through the current batch
3468                    // before receiving a new batch of commands.
3469                    biased;
3470
3471                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3472                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3473                    // Receive a batch of commands.
3474                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3475                    // `next()` on any stream is cancel-safe:
3476                    // https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
3477                    // Receive a single command.
3478                    Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
3479                    // See [`mz_controller::Controller::ready`] for notes
3480                    // on why this is cancel-safe.
3481                    // Receive a single command.
3482                    () = self.controller.ready() => {
3483                        // NOTE: We don't get a `Readiness` back from `ready()`
3484                        // because the controller wants to keep it and it's not
3485                        // trivially `Clone` or `Copy`. Hence this accessor.
3486                        let controller = match self.controller.get_readiness() {
3487                            Readiness::Storage => ControllerReadiness::Storage,
3488                            Readiness::Compute => ControllerReadiness::Compute,
3489                            Readiness::Metrics(_) => ControllerReadiness::Metrics,
3490                            Readiness::Internal(_) => ControllerReadiness::Internal,
3491                            Readiness::NotReady => unreachable!("just signaled as ready"),
3492                        };
3493                        messages.push(Message::ControllerReady { controller });
3494                    }
3495                    // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
3496                    // Receive a single command.
3497                    permit = group_commit_rx.ready() => {
3498                        // If we happen to have batched exactly one user write, use
3499                        // that span so the `emit_trace_id_notice` hooks up.
3500                        // Otherwise, the best we can do is invent a new root span
3501                        // and make it follow from all the Spans in the pending
3502                        // writes.
3503                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3504                            PendingWriteTxn::User{span, ..} => Some(span),
3505                            PendingWriteTxn::System{..} => None,
3506                        });
3507                        let span = match user_write_spans.exactly_one() {
3508                            Ok(span) => span.clone(),
3509                            Err(user_write_spans) => {
3510                                let span = info_span!(parent: None, "group_commit_notify");
3511                                for s in user_write_spans {
3512                                    span.follows_from(s);
3513                                }
3514                                span
3515                            }
3516                        };
3517                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3518                    },
3519                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3520                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3521                    // Receive a batch of commands.
3522                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3523                        if count == 0 {
3524                            break;
3525                        } else {
3526                            messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
3527                        }
3528                    },
3529                    // `recv()` on `UnboundedReceiver` is cancellation safe:
3530                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
3531                    // Receive a single command.
3532                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3533                        let mut pending_read_txns = vec![pending_read_txn];
3534                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3535                            pending_read_txns.push(pending_read_txn);
3536                        }
3537                        for (conn_id, pending_read_txn) in pending_read_txns {
3538                            let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
3539                            soft_assert_or_log!(
3540                                prev.is_none(),
3541                                "connections can not have multiple concurrent reads, prev: {prev:?}"
3542                            )
3543                        }
3544                        messages.push(Message::LinearizeReads);
3545                    }
3546                    // `tick()` on `Interval` is cancel-safe:
3547                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3548                    // Receive a single command.
3549                    _ = self.advance_timelines_interval.tick() => {
3550                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
3551                        span.follows_from(Span::current());
3552
3553                        // Group commit sends an `AdvanceTimelines` message when
3554                        // done, which is what downgrades read holds. In
3555                        // read-only mode we send this message directly because
3556                        // we're not doing group commits.
3557                        if self.controller.read_only() {
3558                            messages.push(Message::AdvanceTimelines);
3559                        } else {
3560                            messages.push(Message::GroupCommitInitiate(span, None));
3561                        }
3562                    },
3563                    // `tick()` on `Interval` is cancel-safe:
3564                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3565                    // Receive a single command.
3566                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
3567                        messages.push(Message::CheckSchedulingPolicies);
3568                    },
3569
3570                    // `tick()` on `Interval` is cancel-safe:
3571                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3572                    // Receive a single command.
3573                    _ = self.caught_up_check_interval.tick() => {
3574                        // We do this directly on the main loop instead of
3575                        // firing off a message. We are still in read-only mode,
3576                        // so optimizing for latency by not blocking the main
3577                        // loop is not that important.
3578                        self.maybe_check_caught_up().await;
3579
3580                        continue;
3581                    },
3582
3583                    // Process the idle metric at the lowest priority to sample queue non-idle time.
3584                    // `recv()` on `Receiver` is cancellation safe:
3585                    // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety
3586                    // Receive a single command.
3587                    timer = idle_rx.recv() => {
3588                        timer.expect("does not drop").observe_duration();
3589                        self.metrics
3590                            .message_handling
3591                            .with_label_values(&["watchdog"])
3592                            .observe(0.0);
3593                        continue;
3594                    }
3595                };
3596
3597                // Observe the number of messages we're processing at once.
3598                message_batch.observe(f64::cast_lossy(messages.len()));
3599
3600                for msg in messages.drain(..) {
3601                    // All message processing functions trace. Start a parent span
3602                    // for them to make it easy to find slow messages.
3603                    let msg_kind = msg.kind();
3604                    let span = span!(
3605                        target: "mz_adapter::coord::handle_message_loop",
3606                        Level::INFO,
3607                        "coord::handle_message",
3608                        kind = msg_kind
3609                    );
3610                    let otel_context = span.context().span().span_context().clone();
3611
3612                    // Record the last kind of message in case we get stuck. For
3613                    // execute commands, we additionally stash the user's SQL
3614                    // statement, so we can log it as well.
3615                    *last_message.lock().expect("poisoned") = LastMessage {
3616                        kind: msg_kind,
3617                        stmt: match &msg {
3618                            Message::Command(
3619                                _,
3620                                Command::Execute {
3621                                    portal_name,
3622                                    session,
3623                                    ..
3624                                },
3625                            ) => session
3626                                .get_portal_unverified(portal_name)
3627                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3628                            _ => None,
3629                        },
3630                    };
3631
3632                    let start = Instant::now();
3633                    self.handle_message(msg).instrument(span).await;
3634                    let duration = start.elapsed();
3635
3636                    self.metrics
3637                        .message_handling
3638                        .with_label_values(&[msg_kind])
3639                        .observe(duration.as_secs_f64());
3640
3641                    // If something is _really_ slow, print a trace id for debugging (if OTEL is enabled).
3642                    if duration > warn_threshold {
3643                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3644                        tracing::error!(
3645                            ?msg_kind,
3646                            ?trace_id,
3647                            ?duration,
3648                            "very slow coordinator message"
3649                        );
3650                    }
3651                }
3652            }
3653            // Try to clean up as a best effort. There may be some async tasks out there holding a
3654            // reference that prevents us from cleaning up.
3655            if let Some(catalog) = Arc::into_inner(self.catalog) {
3656                catalog.expire().await;
3657            }
3658        }
3659        .boxed_local()
3660    }
3661
3662    /// Obtain a read-only Catalog reference.
3663    fn catalog(&self) -> &Catalog {
3664        &self.catalog
3665    }
3666
3667    /// Obtain a read-only Catalog snapshot, suitable for giving out to
3668    /// non-Coordinator thread tasks.
3669    fn owned_catalog(&self) -> Arc<Catalog> {
3670        Arc::clone(&self.catalog)
3671    }
3672
3673    /// Obtain a handle to the optimizer metrics, suitable for giving
3674    /// out to non-Coordinator thread tasks.
3675    fn optimizer_metrics(&self) -> OptimizerMetrics {
3676        self.optimizer_metrics.clone()
3677    }
3678
3679    /// Obtain a writeable Catalog reference.
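    ///
    /// A minimal sketch (std types only, not the real `Catalog`) of the
    /// copy-on-write behavior this relies on:
    ///
    /// ```
    /// use std::sync::Arc;
    ///
    /// let mut shared = Arc::new(vec![1, 2, 3]);
    /// let snapshot = Arc::clone(&shared); // e.g. what `owned_catalog` hands out
    ///
    /// // Because another reference exists, `make_mut` clones the inner value...
    /// Arc::make_mut(&mut shared).push(4);
    ///
    /// // ...so the existing snapshot still sees the old contents.
    /// assert_eq!(*snapshot, vec![1, 2, 3]);
    /// assert_eq!(*shared, vec![1, 2, 3, 4]);
    ///
    /// // With no other references, `make_mut` mutates in place without cloning.
    /// drop(snapshot);
    /// Arc::make_mut(&mut shared).push(5);
    /// assert_eq!(*shared, vec![1, 2, 3, 4, 5]);
    /// ```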
3680    fn catalog_mut(&mut self) -> &mut Catalog {
        // If there are other Arc references (from owned_catalog), make_mut
        // clones the catalog into a new Arc stored at self.catalog, so those
        // existing references remain valid and unchanged. If there are no
        // other Arc references, no clone is made and a reference to the
        // existing object is returned. This makes this method and
        // owned_catalog both very cheap: at most one clone per catalog
        // mutation, and only if a read-only reference is still outstanding.
3688        Arc::make_mut(&mut self.catalog)
3689    }
3690
3691    /// Obtain a reference to the coordinator's connection context.
3692    fn connection_context(&self) -> &ConnectionContext {
3693        self.controller.connection_context()
3694    }
3695
3696    /// Obtain a reference to the coordinator's secret reader, in an `Arc`.
3697    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3698        &self.connection_context().secrets_reader
3699    }
3700
3701    /// Publishes a notice message to all sessions.
3702    ///
3703    /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
3704    /// so we keep it around.
3705    #[allow(dead_code)]
3706    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3707        for meta in self.active_conns.values() {
3708            let _ = meta.notice_tx.send(notice.clone());
3709        }
3710    }
3711
3712    /// Returns a closure that will publish a notice to all sessions that were active at the time
3713    /// this method was called.
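    ///
    /// A minimal sketch of the "snapshot the senders now, send later" pattern,
    /// using `std::sync::mpsc` channels as stand-ins for the real session
    /// notice channels:
    ///
    /// ```
    /// use std::sync::mpsc;
    ///
    /// let (tx1, rx1) = mpsc::channel::<String>();
    /// let (tx2, rx2) = mpsc::channel::<String>();
    ///
    /// // Capture the senders that exist *now*; sessions that connect later
    /// // are not included in the broadcast.
    /// let senders = vec![tx1, tx2];
    /// let broadcast = move |notice: String| {
    ///     for tx in senders {
    ///         let _ = tx.send(notice.clone());
    ///     }
    /// };
    ///
    /// broadcast("notice".to_string());
    /// assert_eq!(rx1.recv().unwrap(), "notice");
    /// assert_eq!(rx2.recv().unwrap(), "notice");
    /// ```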
3714    pub(crate) fn broadcast_notice_tx(
3715        &self,
3716    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3717        let senders: Vec<_> = self
3718            .active_conns
3719            .values()
3720            .map(|meta| meta.notice_tx.clone())
3721            .collect();
3722        Box::new(move |notice| {
3723            for tx in senders {
3724                let _ = tx.send(notice.clone());
3725            }
3726        })
3727    }
3728
3729    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3730        &self.active_conns
3731    }
3732
3733    #[instrument(level = "debug")]
3734    pub(crate) fn retire_execution(
3735        &mut self,
3736        reason: StatementEndedExecutionReason,
3737        ctx_extra: ExecuteContextExtra,
3738    ) {
3739        if let Some(uuid) = ctx_extra.retire() {
3740            self.end_statement_execution(uuid, reason);
3741        }
3742    }
3743
3744    /// Creates a new dataflow builder from the catalog and indexes in `self`.
3745    #[instrument(level = "debug")]
3746    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3747        let compute = self
3748            .instance_snapshot(instance)
3749            .expect("compute instance does not exist");
3750        DataflowBuilder::new(self.catalog().state(), compute)
3751    }
3752
    /// Return a reference-less snapshot of the indicated compute instance.
3754    pub fn instance_snapshot(
3755        &self,
3756        id: ComputeInstanceId,
3757    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3758        ComputeInstanceSnapshot::new(&self.controller, id)
3759    }
3760
3761    /// Call into the compute controller to install a finalized dataflow, and
3762    /// initialize the read policies for its exported readable objects.
3763    ///
3764    /// # Panics
3765    ///
3766    /// Panics if dataflow creation fails.
3767    pub(crate) async fn ship_dataflow(
3768        &mut self,
3769        dataflow: DataflowDescription<Plan>,
3770        instance: ComputeInstanceId,
3771        subscribe_target_replica: Option<ReplicaId>,
3772    ) {
3773        self.try_ship_dataflow(dataflow, instance, subscribe_target_replica)
3774            .await
3775            .unwrap_or_terminate("dataflow creation cannot fail");
3776    }
3777
3778    /// Call into the compute controller to install a finalized dataflow, and
3779    /// initialize the read policies for its exported readable objects.
3780    pub(crate) async fn try_ship_dataflow(
3781        &mut self,
3782        dataflow: DataflowDescription<Plan>,
3783        instance: ComputeInstanceId,
3784        subscribe_target_replica: Option<ReplicaId>,
3785    ) -> Result<(), DataflowCreationError> {
3786        // We must only install read policies for indexes, not for sinks.
3787        // Sinks are write-only compute collections that don't have read policies.
3788        let export_ids = dataflow.exported_index_ids().collect();
3789
3790        self.controller
3791            .compute
3792            .create_dataflow(instance, dataflow, subscribe_target_replica)?;
3793
3794        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3795            .await;
3796
3797        Ok(())
3798    }
3799
    /// Call into the compute controller to allow writes to the specified ID
    /// on the specified instance. Calling this function multiple times, or
    /// calling it on a read-only instance, has no effect.
3803    pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
3804        self.controller
3805            .compute
3806            .allow_writes(instance, id)
3807            .unwrap_or_terminate("allow_writes cannot fail");
3808    }
3809
    /// Like `ship_dataflow`, but also awaits the builtin table updates.
3811    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3812        &mut self,
3813        dataflow: DataflowDescription<Plan>,
3814        instance: ComputeInstanceId,
3815        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3816    ) {
3817        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3818            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3819            let ((), ()) =
3820                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3821        } else {
3822            self.ship_dataflow(dataflow, instance, None).await;
3823        }
3824    }
3825
3826    /// Install a _watch set_ in the controller that is automatically associated with the given
3827    /// connection id. The watchset will be automatically cleared if the connection terminates
3828    /// before the watchset completes.
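    ///
    /// A small sketch (plain std maps and hypothetical IDs, not the real
    /// controller types) of the two-map bookkeeping this maintains so that
    /// `cancel_pending_watchsets` can later clean up by connection:
    ///
    /// ```
    /// use std::collections::{BTreeMap, BTreeSet};
    ///
    /// let mut by_conn: BTreeMap<&str, BTreeSet<u64>> = BTreeMap::new();
    /// let mut installed: BTreeMap<u64, &str> = BTreeMap::new();
    ///
    /// // Installing watch set 7 for connection "c1" records it in both maps.
    /// by_conn.entry("c1").or_default().insert(7);
    /// installed.insert(7, "c1");
    ///
    /// // If "c1" terminates, every watch set it owns is removed.
    /// if let Some(ws_ids) = by_conn.remove("c1") {
    ///     for ws_id in ws_ids {
    ///         installed.remove(&ws_id);
    ///     }
    /// }
    /// assert!(installed.is_empty());
    /// ```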
3829    pub fn install_compute_watch_set(
3830        &mut self,
3831        conn_id: ConnectionId,
3832        objects: BTreeSet<GlobalId>,
3833        t: Timestamp,
3834        state: WatchSetResponse,
3835    ) -> Result<(), CollectionLookupError> {
3836        let ws_id = self.controller.install_compute_watch_set(objects, t)?;
3837        self.connection_watch_sets
3838            .entry(conn_id.clone())
3839            .or_default()
3840            .insert(ws_id);
3841        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3842        Ok(())
3843    }
3844
3845    /// Install a _watch set_ in the controller that is automatically associated with the given
3846    /// connection id. The watchset will be automatically cleared if the connection terminates
3847    /// before the watchset completes.
3848    pub fn install_storage_watch_set(
3849        &mut self,
3850        conn_id: ConnectionId,
3851        objects: BTreeSet<GlobalId>,
3852        t: Timestamp,
3853        state: WatchSetResponse,
3854    ) -> Result<(), CollectionLookupError> {
3855        let ws_id = self.controller.install_storage_watch_set(objects, t)?;
3856        self.connection_watch_sets
3857            .entry(conn_id.clone())
3858            .or_default()
3859            .insert(ws_id);
3860        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3861        Ok(())
3862    }
3863
3864    /// Cancels pending watchsets associated with the provided connection id.
3865    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3866        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3867            for ws_id in ws_ids {
3868                self.installed_watch_sets.remove(&ws_id);
3869            }
3870        }
3871    }
3872
3873    /// Returns the state of the [`Coordinator`] formatted as JSON.
3874    ///
3875    /// The returned value is not guaranteed to be stable and may change at any point in time.
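    ///
    /// A self-contained illustration (std types only) of the "Debug-format
    /// everything into strings" approach used to build the dump:
    ///
    /// ```
    /// use std::collections::BTreeMap;
    ///
    /// #[derive(Debug)]
    /// struct Peek { id: u64 }
    ///
    /// let peeks = BTreeMap::from([(1u32, Peek { id: 7 })]);
    /// // Render both keys and values as strings so any JSON serializer can
    /// // handle them, regardless of the underlying types.
    /// let dumped: BTreeMap<String, String> = peeks
    ///     .iter()
    ///     .map(|(k, v)| (k.to_string(), format!("{v:?}")))
    ///     .collect();
    /// assert_eq!(dumped["1"], "Peek { id: 7 }");
    /// ```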
3876    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3877        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3878        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3879        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3880        // prevents a future unrelated change from silently breaking this method.
3881
3882        let global_timelines: BTreeMap<_, _> = self
3883            .global_timelines
3884            .iter()
3885            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3886            .collect();
3887        let active_conns: BTreeMap<_, _> = self
3888            .active_conns
3889            .iter()
3890            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3891            .collect();
3892        let txn_read_holds: BTreeMap<_, _> = self
3893            .txn_read_holds
3894            .iter()
3895            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3896            .collect();
3897        let pending_peeks: BTreeMap<_, _> = self
3898            .pending_peeks
3899            .iter()
3900            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3901            .collect();
3902        let client_pending_peeks: BTreeMap<_, _> = self
3903            .client_pending_peeks
3904            .iter()
3905            .map(|(id, peek)| {
3906                let peek: BTreeMap<_, _> = peek
3907                    .iter()
3908                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3909                    .collect();
3910                (id.to_string(), peek)
3911            })
3912            .collect();
3913        let pending_linearize_read_txns: BTreeMap<_, _> = self
3914            .pending_linearize_read_txns
3915            .iter()
3916            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3917            .collect();
3918
3919        Ok(serde_json::json!({
3920            "global_timelines": global_timelines,
3921            "active_conns": active_conns,
3922            "txn_read_holds": txn_read_holds,
3923            "pending_peeks": pending_peeks,
3924            "client_pending_peeks": client_pending_peeks,
3925            "pending_linearize_read_txns": pending_linearize_read_txns,
3926            "controller": self.controller.dump().await?,
3927        }))
3928    }
3929
3930    /// Prune all storage usage events from the [`MZ_STORAGE_USAGE_BY_SHARD`] table that are older
3931    /// than `retention_period`.
3932    ///
    /// This method will read the entire contents of [`MZ_STORAGE_USAGE_BY_SHARD`] into memory,
    /// which can be expensive.
3935    ///
3936    /// DO NOT call this method outside of startup. The safety of reading at the current oracle read
3937    /// timestamp and then writing at whatever the current write timestamp is (instead of
3938    /// `read_ts + 1`) relies on the fact that there are no outstanding writes during startup.
3939    ///
3940    /// Group commit, which this method uses to write the retractions, has builtin fencing, and we
3941    /// never commit retractions to [`MZ_STORAGE_USAGE_BY_SHARD`] outside of this method, which is
3942    /// only called once during startup. So we don't have to worry about double/invalid retractions.
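    ///
    /// A toy illustration (plain integers standing in for real timestamps) of
    /// the retention cutoff arithmetic used below:
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// let read_ts: u128 = 1_700_000_000_000; // "now", in ms since the Unix epoch
    /// let retention_period = Duration::from_secs(30 * 24 * 60 * 60); // 30 days
    /// let cutoff_ts = read_ts.saturating_sub(retention_period.as_millis());
    ///
    /// // An event collected before the cutoff is retracted...
    /// assert!(1_600_000_000_000u128 < cutoff_ts);
    /// // ...while one collected at the read timestamp itself is kept.
    /// assert!(read_ts >= cutoff_ts);
    /// ```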
3943    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
3944        let item_id = self
3945            .catalog()
3946            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3947        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3948        let read_ts = self.get_local_read_ts().await;
3949        let current_contents_fut = self
3950            .controller
3951            .storage_collections
3952            .snapshot(global_id, read_ts);
3953        let internal_cmd_tx = self.internal_cmd_tx.clone();
3954        spawn(|| "storage_usage_prune", async move {
3955            let mut current_contents = current_contents_fut
3956                .await
3957                .unwrap_or_terminate("cannot fail to fetch snapshot");
3958            differential_dataflow::consolidation::consolidate(&mut current_contents);
3959
3960            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3961            let mut expired = Vec::new();
3962            for (row, diff) in current_contents {
3963                assert_eq!(
3964                    diff, 1,
3965                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3966                );
3967                // This logic relies on the definition of `mz_storage_usage_by_shard` not changing.
3968                let collection_timestamp = row
3969                    .unpack()
3970                    .get(3)
                    .expect("definition of mz_storage_usage_by_shard changed")
3972                    .unwrap_timestamptz();
3973                let collection_timestamp = collection_timestamp.timestamp_millis();
3974                let collection_timestamp: u128 = collection_timestamp
3975                    .try_into()
3976                    .expect("all collections happen after Jan 1 1970");
3977                if collection_timestamp < cutoff_ts {
3978                    debug!("pruning storage event {row:?}");
3979                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3980                    expired.push(builtin_update);
3981                }
3982            }
3983
            // Ignore send errors; they just mean the coordinator's main thread has shut down.
3985            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3986        });
3987    }
3988
3989    fn current_credit_consumption_rate(&self) -> Numeric {
3990        self.catalog()
3991            .user_cluster_replicas()
3992            .filter_map(|replica| match &replica.config.location {
3993                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
3994                ReplicaLocation::Unmanaged(_) => None,
3995            })
3996            .map(|size| {
3997                self.catalog()
3998                    .cluster_replica_sizes()
3999                    .0
4000                    .get(size)
4001                    .expect("location size is validated against the cluster replica sizes")
4002                    .credits_per_hour
4003            })
4004            .sum()
4005    }
4006}
4007
4008#[cfg(test)]
4009impl Coordinator {
4010    #[allow(dead_code)]
4011    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        // `ship_dataflow` is not allowed to have a `Result` return because this function is
4013        // called after `catalog_transact`, after which no errors are allowed. This test exists to
4014        // prevent us from incorrectly teaching those functions how to return errors (which has
4015        // happened twice and is the motivation for this test).
4016
4017        // An arbitrary compute instance ID to satisfy the function calls below. Note that
4018        // this only works because this function will never run.
4019        let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
4020
4021        let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
4022    }
4023}
4024
4025/// Contains information about the last message the [`Coordinator`] processed.
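///
/// A minimal sketch of the print-only-while-panicking pattern used by the
/// `Drop` impl below (`OnPanic` is a stand-in type, not part of this crate):
///
/// ```
/// struct OnPanic(&'static str);
///
/// impl Drop for OnPanic {
///     fn drop(&mut self) {
///         // `tracing` may be unusable mid-panic, so fall back to stderr.
///         if std::thread::panicking() {
///             eprintln!("dropped while panicking: {}", self.0);
///         }
///     }
/// }
///
/// let _guard = OnPanic("context for the crash");
/// ```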
4026struct LastMessage {
4027    kind: &'static str,
4028    stmt: Option<Arc<Statement<Raw>>>,
4029}
4030
4031impl LastMessage {
4032    /// Returns a redacted version of the statement that is safe for logs.
4033    fn stmt_to_string(&self) -> Cow<'static, str> {
4034        self.stmt
4035            .as_ref()
4036            .map(|stmt| stmt.to_ast_string_redacted().into())
4037            .unwrap_or(Cow::Borrowed("<none>"))
4038    }
4039}
4040
4041impl fmt::Debug for LastMessage {
4042    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4043        f.debug_struct("LastMessage")
4044            .field("kind", &self.kind)
4045            .field("stmt", &self.stmt_to_string())
4046            .finish()
4047    }
4048}
4049
4050impl Drop for LastMessage {
4051    fn drop(&mut self) {
        // Only print the last message if we're currently panicking; otherwise we'd spam our logs.
4053        if std::thread::panicking() {
            // If we're panicking there's no guarantee `tracing` still works, so print to stderr.
4055            eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4056        }
4057    }
4058}
4059
4060/// Serves the coordinator based on the provided configuration.
4061///
4062/// For a high-level description of the coordinator, see the [crate
4063/// documentation](crate).
4064///
4065/// Returns a handle to the coordinator and a client to communicate with the
4066/// coordinator.
4067///
/// BOXED FUTURE: As of Nov 2023 the Future returned by this function was 42KB. This would
/// get stored on the stack, which is bad for runtime performance and could blow up our stack usage.
4070/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
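///
/// A self-contained sketch (std types only) of why boxing helps: the future's
/// potentially large state moves to the heap, and only a small handle remains
/// on the caller's stack.
///
/// ```
/// use std::future::Future;
/// use std::pin::Pin;
///
/// // A future that captures a lot of state; boxing it heap-allocates that
/// // state so the caller only holds a pointer-sized handle.
/// fn make_big_future() -> Pin<Box<dyn Future<Output = ()> + Send>> {
///     let big = [0u8; 16 * 1024];
///     Box::pin(async move {
///         let _ = big.len();
///     })
/// }
///
/// let fut = make_big_future();
/// // The handle is just a (fat) pointer, not 16 KiB of captured state.
/// assert!(std::mem::size_of_val(&fut) <= 2 * std::mem::size_of::<usize>());
/// ```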
4071pub fn serve(
4072    Config {
4073        controller_config,
4074        controller_envd_epoch,
4075        mut storage,
4076        audit_logs_iterator,
4077        timestamp_oracle_url,
4078        unsafe_mode,
4079        all_features,
4080        build_info,
4081        environment_id,
4082        metrics_registry,
4083        now,
4084        secrets_controller,
4085        cloud_resource_controller,
4086        cluster_replica_sizes,
4087        builtin_system_cluster_config,
4088        builtin_catalog_server_cluster_config,
4089        builtin_probe_cluster_config,
4090        builtin_support_cluster_config,
4091        builtin_analytics_cluster_config,
4092        system_parameter_defaults,
4093        availability_zones,
4094        storage_usage_client,
4095        storage_usage_collection_interval,
4096        storage_usage_retention_period,
4097        segment_client,
4098        egress_addresses,
4099        aws_account_id,
4100        aws_privatelink_availability_zones,
4101        connection_context,
4102        connection_limit_callback,
4103        remote_system_parameters,
4104        webhook_concurrency_limit,
4105        http_host_name,
4106        tracing_handle,
4107        read_only_controllers,
4108        caught_up_trigger: clusters_caught_up_trigger,
4109        helm_chart_version,
4110        license_key,
4111        external_login_password_mz_system,
4112        force_builtin_schema_migration,
4113    }: Config,
4114) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4115    async move {
4116        let coord_start = Instant::now();
4117        info!("startup: coordinator init: beginning");
4118        info!("startup: coordinator init: preamble beginning");
4119
        // Initializing the builtins can be an expensive process and consume a lot of memory. We
        // forcibly initialize them early, while the stack is relatively empty, to avoid stack
        // overflows later.
4123        let _builtins = LazyLock::force(&BUILTINS_STATIC);
4124
4125        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4126        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4127        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4128            mpsc::unbounded_channel();
4129
4130        // Validate and process availability zones.
4131        if !availability_zones.iter().all_unique() {
4132            coord_bail!("availability zones must be unique");
4133        }
4134
4135        let aws_principal_context = match (
4136            aws_account_id,
4137            connection_context.aws_external_id_prefix.clone(),
4138        ) {
4139            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4140                aws_account_id,
4141                aws_external_id_prefix,
4142            }),
4143            _ => None,
4144        };
4145
4146        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4147            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4148
4149        info!(
4150            "startup: coordinator init: preamble complete in {:?}",
4151            coord_start.elapsed()
4152        );
4153        let oracle_init_start = Instant::now();
4154        info!("startup: coordinator init: timestamp oracle init beginning");
4155
4156        let pg_timestamp_oracle_config = timestamp_oracle_url
4157            .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
4158        let mut initial_timestamps =
4159            get_initial_oracle_timestamps(&pg_timestamp_oracle_config).await?;
4160
4161        // Insert an entry for the `EpochMilliseconds` timeline if one doesn't exist,
4162        // which will ensure that the timeline is initialized since it's required
4163        // by the system.
4164        initial_timestamps
4165            .entry(Timeline::EpochMilliseconds)
4166            .or_insert_with(mz_repr::Timestamp::minimum);
4167        let mut timestamp_oracles = BTreeMap::new();
4168        for (timeline, initial_timestamp) in initial_timestamps {
4169            Coordinator::ensure_timeline_state_with_initial_time(
4170                &timeline,
4171                initial_timestamp,
4172                now.clone(),
4173                pg_timestamp_oracle_config.clone(),
4174                &mut timestamp_oracles,
4175                read_only_controllers,
4176            )
4177            .await;
4178        }
4179
        // Opening the durable catalog uses one or more timestamps without communicating with
        // the timestamp oracle. Here we apply the catalog upper to the timestamp oracle so
        // that future operations are linearized with the opening of the catalog.
4183        let catalog_upper = storage.current_upper().await;
4184        // Choose a time at which to boot. This is used, for example, to prune
4185        // old storage usage data or migrate audit log entries.
4186        //
4187        // This time is usually the current system time, but with protection
4188        // against backwards time jumps, even across restarts.
4189        let epoch_millis_oracle = &timestamp_oracles
4190            .get(&Timeline::EpochMilliseconds)
4191            .expect("inserted above")
4192            .oracle;
4193
4194        let mut boot_ts = if read_only_controllers {
4195            let read_ts = epoch_millis_oracle.read_ts().await;
4196            std::cmp::max(read_ts, catalog_upper)
4197        } else {
            // Getting/applying a write timestamp bumps the write timestamp in the
            // oracle, which we're not allowed to do in read-only mode.
4200            epoch_millis_oracle.apply_write(catalog_upper).await;
4201            epoch_millis_oracle.write_ts().await.timestamp
4202        };
4203
4204        info!(
4205            "startup: coordinator init: timestamp oracle init complete in {:?}",
4206            oracle_init_start.elapsed()
4207        );
4208
4209        let catalog_open_start = Instant::now();
4210        info!("startup: coordinator init: catalog open beginning");
4211        let persist_client = controller_config
4212            .persist_clients
4213            .open(controller_config.persist_location.clone())
4214            .await
4215            .context("opening persist client")?;
        let builtin_item_migration_config = BuiltinItemMigrationConfig {
            persist_client: persist_client.clone(),
            read_only: read_only_controllers,
            force_migration: force_builtin_schema_migration,
        };
4223        let OpenCatalogResult {
4224            mut catalog,
4225            migrated_storage_collections_0dt,
4226            new_builtin_collections,
4227            builtin_table_updates,
4228            cached_global_exprs,
4229            uncached_local_exprs,
4230        } = Catalog::open(mz_catalog::config::Config {
4231            storage,
4232            metrics_registry: &metrics_registry,
4233            state: mz_catalog::config::StateConfig {
4234                unsafe_mode,
4235                all_features,
4236                build_info,
4237                deploy_generation: controller_config.deploy_generation,
4238                environment_id: environment_id.clone(),
4239                read_only: read_only_controllers,
4240                now: now.clone(),
4241                boot_ts: boot_ts.clone(),
4242                skip_migrations: false,
4243                cluster_replica_sizes,
4244                builtin_system_cluster_config,
4245                builtin_catalog_server_cluster_config,
4246                builtin_probe_cluster_config,
4247                builtin_support_cluster_config,
4248                builtin_analytics_cluster_config,
4249                system_parameter_defaults,
4250                remote_system_parameters,
4251                availability_zones,
4252                egress_addresses,
4253                aws_principal_context,
4254                aws_privatelink_availability_zones,
4255                connection_context,
4256                http_host_name,
4257                builtin_item_migration_config,
4258                persist_client: persist_client.clone(),
4259                enable_expression_cache_override: None,
4260                helm_chart_version,
4261                external_login_password_mz_system,
4262                license_key: license_key.clone(),
4263            },
4264        })
4265        .await?;
4266
4267        // Opening the catalog uses one or more timestamps, so push the boot timestamp up to the
4268        // current catalog upper.
4269        let catalog_upper = catalog.current_upper().await;
4270        boot_ts = std::cmp::max(boot_ts, catalog_upper);
4271
4272        if !read_only_controllers {
4273            epoch_millis_oracle.apply_write(boot_ts).await;
4274        }
4275
4276        info!(
4277            "startup: coordinator init: catalog open complete in {:?}",
4278            catalog_open_start.elapsed()
4279        );
4280
4281        let coord_thread_start = Instant::now();
4282        info!("startup: coordinator init: coordinator thread start beginning");
4283
4284        let session_id = catalog.config().session_id;
4285        let start_instant = catalog.config().start_instant;
4286
        // In order for the coordinator to support Rc and RefCell types, it cannot be
4288        // sent across threads. Spawn it in a thread and have this parent thread wait
4289        // for bootstrap completion before proceeding.
4290        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4291        let handle = TokioHandle::current();
4292
4293        let metrics = Metrics::register_into(&metrics_registry);
4294        let metrics_clone = metrics.clone();
4295        let optimizer_metrics = OptimizerMetrics::register_into(
4296            &metrics_registry,
4297            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4298        );
4299        let segment_client_clone = segment_client.clone();
4300        let coord_now = now.clone();
4301        let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4302        let mut check_scheduling_policies_interval = tokio::time::interval(
4303            catalog
4304                .system_config()
4305                .cluster_check_scheduling_policies_interval(),
4306        );
4307        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4308
4309        let clusters_caught_up_check_interval = if read_only_controllers {
4310            let dyncfgs = catalog.system_config().dyncfgs();
4311            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4312
4313            let mut interval = tokio::time::interval(interval);
4314            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4315            interval
4316        } else {
4317            // When not in read-only mode, we don't do hydration checks. But we
4318            // still have to provide _some_ interval. This is large enough that
4319            // it doesn't matter.
4320            //
4321            // TODO(aljoscha): We cannot use Duration::MAX right now because of
4322            // https://github.com/tokio-rs/tokio/issues/6634. Use that once it's
4323            // fixed for good.
4324            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4325            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4326            interval
4327        };
4328
4329        let clusters_caught_up_check =
4330            clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
4331                trigger,
4332                exclude_collections: new_builtin_collections.into_iter().collect(),
4333            });
4334
4335        if let Some(config) = pg_timestamp_oracle_config.as_ref() {
4336            // Apply settings from system vars as early as possible because some
4337            // of them are locked in right when an oracle is first opened!
4338            let pg_timestamp_oracle_params =
4339                flags::pg_timstamp_oracle_config(catalog.system_config());
4340            pg_timestamp_oracle_params.apply(config);
4341        }
4342
4343        // Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
4344        // system variables change, we update our connection limits.
4345        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4346            Arc::new(move |system_vars: &SystemVars| {
4347                let limit: u64 = system_vars.max_connections().cast_into();
4348                let superuser_reserved: u64 =
4349                    system_vars.superuser_reserved_connections().cast_into();
4350
                // If superuser_reserved >= max_connections, prefer max_connections.
                //
                // In this scenario all normal users would be locked out because all connections
                // would be reserved for superusers, so warn if this is the case.
4355                let superuser_reserved = if superuser_reserved >= limit {
4356                    tracing::warn!(
                        "superuser_reserved ({superuser_reserved}) is greater than or equal to max connections ({limit})!"
4358                    );
4359                    limit
4360                } else {
4361                    superuser_reserved
4362                };
4363
4364                (connection_limit_callback)(limit, superuser_reserved);
4365            });
4366        catalog.system_config_mut().register_callback(
4367            &mz_sql::session::vars::MAX_CONNECTIONS,
4368            Arc::clone(&connection_limit_callback),
4369        );
4370        catalog.system_config_mut().register_callback(
4371            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4372            connection_limit_callback,
4373        );
4374
4375        let (group_commit_tx, group_commit_rx) = appends::notifier();
4376
4377        let parent_span = tracing::Span::current();
4378        let thread = thread::Builder::new()
4379            // The Coordinator thread tends to keep a lot of data on its stack. To
4380            // prevent a stack overflow we allocate a stack three times as big as the default
4381            // stack.
4382            .stack_size(3 * stack::STACK_SIZE)
4383            .name("coordinator".to_string())
4384            .spawn(move || {
4385                let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4386
4387                let controller = handle
4388                    .block_on({
4389                        catalog.initialize_controller(
4390                            controller_config,
4391                            controller_envd_epoch,
4392                            read_only_controllers,
4393                        )
4394                    })
4395                    .unwrap_or_terminate("failed to initialize storage_controller");
4396                // Initializing the controller uses one or more timestamps, so push the boot timestamp up to the
4397                // current catalog upper.
4398                let catalog_upper = handle.block_on(catalog.current_upper());
4399                boot_ts = std::cmp::max(boot_ts, catalog_upper);
4400                if !read_only_controllers {
4401                    let epoch_millis_oracle = &timestamp_oracles
4402                        .get(&Timeline::EpochMilliseconds)
4403                        .expect("inserted above")
4404                        .oracle;
4405                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4406                }
4407
4408                let catalog = Arc::new(catalog);
4409
4410                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4411                let mut coord = Coordinator {
4412                    controller,
4413                    catalog,
4414                    internal_cmd_tx,
4415                    group_commit_tx,
4416                    strict_serializable_reads_tx,
4417                    global_timelines: timestamp_oracles,
4418                    transient_id_gen: Arc::new(TransientIdGen::new()),
4419                    active_conns: BTreeMap::new(),
4420                    txn_read_holds: Default::default(),
4421                    pending_peeks: BTreeMap::new(),
4422                    client_pending_peeks: BTreeMap::new(),
4423                    pending_linearize_read_txns: BTreeMap::new(),
4424                    serialized_ddl: LockedVecDeque::new(),
4425                    active_compute_sinks: BTreeMap::new(),
4426                    active_webhooks: BTreeMap::new(),
4427                    active_copies: BTreeMap::new(),
4428                    staged_cancellation: BTreeMap::new(),
4429                    introspection_subscribes: BTreeMap::new(),
4430                    write_locks: BTreeMap::new(),
4431                    deferred_write_ops: BTreeMap::new(),
4432                    pending_writes: Vec::new(),
4433                    advance_timelines_interval,
4434                    secrets_controller,
4435                    caching_secrets_reader,
4436                    cloud_resource_controller,
4437                    storage_usage_client,
4438                    storage_usage_collection_interval,
4439                    segment_client,
4440                    metrics,
4441                    optimizer_metrics,
4442                    tracing_handle,
4443                    statement_logging: StatementLogging::new(coord_now.clone()),
4444                    webhook_concurrency_limit,
4445                    pg_timestamp_oracle_config,
4446                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4447                    cluster_scheduling_decisions: BTreeMap::new(),
4448                    caught_up_check_interval: clusters_caught_up_check_interval,
4449                    caught_up_check: clusters_caught_up_check,
4450                    installed_watch_sets: BTreeMap::new(),
4451                    connection_watch_sets: BTreeMap::new(),
4452                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
4453                    read_only_controllers,
4454                    buffered_builtin_table_updates: Some(Vec::new()),
4455                    license_key,
4456                    persist_client,
4457                };
4458                let bootstrap = handle.block_on(async {
4459                    coord
4460                        .bootstrap(
4461                            boot_ts,
4462                            migrated_storage_collections_0dt,
4463                            builtin_table_updates,
4464                            cached_global_exprs,
4465                            uncached_local_exprs,
4466                            audit_logs_iterator,
4467                        )
4468                        .await?;
4469                    coord
4470                        .controller
4471                        .remove_orphaned_replicas(
4472                            coord.catalog().get_next_user_replica_id().await?,
4473                            coord.catalog().get_next_system_replica_id().await?,
4474                        )
4475                        .await
4476                        .map_err(AdapterError::Orchestrator)?;
4477
4478                    if let Some(retention_period) = storage_usage_retention_period {
4479                        coord
4480                            .prune_storage_usage_events_on_startup(retention_period)
4481                            .await;
4482                    }
4483
4484                    Ok(())
4485                });
4486                let ok = bootstrap.is_ok();
4487                drop(span);
4488                bootstrap_tx
4489                    .send(bootstrap)
4490                    .expect("bootstrap_rx is not dropped until it receives this message");
4491                if ok {
4492                    handle.block_on(coord.serve(
4493                        internal_cmd_rx,
4494                        strict_serializable_reads_rx,
4495                        cmd_rx,
4496                        group_commit_rx,
4497                    ));
4498                }
4499            })
4500            .expect("failed to create coordinator thread");
4501        match bootstrap_rx
4502            .await
4503            .expect("bootstrap_tx always sends a message or panics/halts")
4504        {
4505            Ok(()) => {
4506                info!(
4507                    "startup: coordinator init: coordinator thread start complete in {:?}",
4508                    coord_thread_start.elapsed()
4509                );
4510                info!(
4511                    "startup: coordinator init: complete in {:?}",
4512                    coord_start.elapsed()
4513                );
4514                let handle = Handle {
4515                    session_id,
4516                    start_instant,
4517                    _thread: thread.join_on_drop(),
4518                };
4519                let client = Client::new(
4520                    build_info,
4521                    cmd_tx,
4522                    metrics_clone,
4523                    now,
4524                    environment_id,
4525                    segment_client_clone,
4526                );
4527                Ok((handle, client))
4528            }
4529            Err(e) => Err(e),
4530        }
4531    }
4532    .boxed()
4533}
4534
4535// Determines and returns the highest timestamp for each timeline, for all known
4536// timestamp oracle implementations.
4537//
4538// Initially, we did this so that we can switch between implementations of
4539// timestamp oracle, but now we also do this to determine a monotonic boot
4540// timestamp, a timestamp that does not regress across reboots.
4541//
4542// This mostly works, but there can be linearizability violations, because there
4543// is no central moment where we do distributed coordination for all oracle
// types. Working around this seems prohibitively hard, maybe even impossible, so
4545// we have to live with this window of potential violations during the upgrade
4546// window (which is the only point where we should switch oracle
4547// implementations).
4548async fn get_initial_oracle_timestamps(
4549    pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
4550) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4551    let mut initial_timestamps = BTreeMap::new();
4552
4553    if let Some(pg_timestamp_oracle_config) = pg_timestamp_oracle_config {
4554        let postgres_oracle_timestamps =
4555            PostgresTimestampOracle::<NowFn>::get_all_timelines(pg_timestamp_oracle_config.clone())
4556                .await?;
4557
4558        let debug_msg = || {
4559            postgres_oracle_timestamps
4560                .iter()
4561                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4562                .join(", ")
4563        };
4564        info!(
4565            "current timestamps from the postgres-backed timestamp oracle: {}",
4566            debug_msg()
4567        );
4568
4569        for (timeline, ts) in postgres_oracle_timestamps {
4570            let entry = initial_timestamps
4571                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4572
4573            entry
4574                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4575                .or_insert(ts);
4576        }
4577    } else {
4578        info!("no postgres url for postgres-backed timestamp oracle configured!");
4579    };
4580
4581    let debug_msg = || {
4582        initial_timestamps
4583            .iter()
4584            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4585            .join(", ")
4586    };
4587    info!("initial oracle timestamps: {}", debug_msg());
4588
4589    Ok(initial_timestamps)
4590}
4591
4592#[instrument]
4593pub async fn load_remote_system_parameters(
4594    storage: &mut Box<dyn OpenableDurableCatalogState>,
4595    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4596    system_parameter_sync_timeout: Duration,
4597) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4598    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4599        tracing::info!("parameter sync on boot: start sync");
4600
4601        // We intentionally block initial startup, potentially forever,
4602        // on initializing LaunchDarkly. This may seem scary, but the
4603        // alternative is even scarier. Over time, we expect that the
4604        // compiled-in default values for the system parameters will
4605        // drift substantially from the defaults configured in
4606        // LaunchDarkly, to the point that starting an environment
4607        // without loading the latest values from LaunchDarkly will
4608        // result in running an untested configuration.
4609        //
4610        // Note this only applies during initial startup. Restarting
4611        // after we've synced once only blocks for a maximum of
4612        // `FRONTEND_SYNC_TIMEOUT` on LaunchDarkly, as it seems
4613        // reasonable to assume that the last-synced configuration was
4614        // valid enough.
4615        //
        // This philosophy appears to provide a good balance between not
        // running untested configurations in production and not making
        // LaunchDarkly a "tier 1" dependency for existing environments.
4620        //
4621        // If this proves to be an issue, we could seek to address the
4622        // configuration drift in a different way--for example, by
4623        // writing a script that runs in CI nightly and checks for
4624        // deviation between the compiled Rust code and LaunchDarkly.
4625        //
4626        // If it is absolutely necessary to bring up a new environment
4627        // while LaunchDarkly is down, the following manual mitigation
4628        // can be performed:
4629        //
4630        //    1. Edit the environmentd startup parameters to omit the
4631        //       LaunchDarkly configuration.
4632        //    2. Boot environmentd.
4633        //    3. Use the catalog-debug tool to run `edit config "{\"key\":\"system_config_synced\"}" "{\"value\": 1}"`.
4634        //    4. Adjust any other parameters as necessary to avoid
4635        //       running a nonstandard configuration in production.
4636        //    5. Edit the environmentd startup parameters to restore the
4637        //       LaunchDarkly configuration, for when LaunchDarkly comes
4638        //       back online.
4639        //    6. Reboot environmentd.
4640        let mut params = SynchronizedParameters::new(SystemVars::default());
4641        let frontend_sync = async {
4642            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4643            frontend.pull(&mut params);
4644            let ops = params
4645                .modified()
4646                .into_iter()
4647                .map(|param| {
4648                    let name = param.name;
4649                    let value = param.value;
4650                    tracing::info!(name, value, initial = true, "sync parameter");
4651                    (name, value)
4652                })
4653                .collect();
4654            tracing::info!("parameter sync on boot: end sync");
4655            Ok(Some(ops))
4656        };
4657        if !storage.has_system_config_synced_once().await? {
4658            frontend_sync.await
4659        } else {
4660            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4661                Ok(ops) => Ok(ops),
4662                Err(TimeoutError::Inner(e)) => Err(e),
4663                Err(TimeoutError::DeadlineElapsed) => {
4664                    tracing::info!("parameter sync on boot: sync has timed out");
4665                    Ok(None)
4666                }
4667            }
4668        }
4669    } else {
4670        Ok(None)
4671    }
4672}
4673
4674#[derive(Debug)]
4675pub enum WatchSetResponse {
4676    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
4677    AlterSinkReady(AlterSinkReadyContext),
4678}
4679
4680#[derive(Debug)]
4681pub struct AlterSinkReadyContext {
4682    ctx: Option<ExecuteContext>,
4683    otel_ctx: OpenTelemetryContext,
4684    plan: AlterSinkPlan,
4685    plan_validity: PlanValidity,
4686    read_hold: ReadHolds<Timestamp>,
4687}
4688
4689impl AlterSinkReadyContext {
4690    fn ctx(&mut self) -> &mut ExecuteContext {
4691        self.ctx.as_mut().expect("only cleared on drop")
4692    }
4693
4694    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4695        self.ctx
4696            .take()
4697            .expect("only cleared on drop")
4698            .retire(result);
4699    }
4700}
4701
4702impl Drop for AlterSinkReadyContext {
4703    fn drop(&mut self) {
4704        if let Some(ctx) = self.ctx.take() {
4705            ctx.retire(Err(AdapterError::Canceled));
4706        }
4707    }
4708}
4709
/// A struct that pairs a lock with a `VecDeque` of to-be-done work to run once the
/// lock is freed.
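///
/// A minimal sketch of the intended pattern, with `std::sync::Mutex` standing
/// in for the tokio mutex used here: try to take the lock, and if it is
/// already held, queue the work to run once the lock is freed.
///
/// ```
/// use std::collections::VecDeque;
/// use std::sync::Mutex;
///
/// let lock = Mutex::new(());
/// let mut deferred: VecDeque<&str> = VecDeque::new();
///
/// // While the lock is held (e.g. by an in-flight DDL statement)...
/// let held = lock.try_lock().expect("first acquisition succeeds");
/// // ...new work cannot acquire it and is queued instead of blocking.
/// if lock.try_lock().is_err() {
///     deferred.push_back("run this DDL later");
/// }
/// drop(held);
///
/// // Once the lock is free again, queued work is drained in FIFO order.
/// assert_eq!(deferred.pop_front(), Some("run this DDL later"));
/// ```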
4712#[derive(Debug)]
4713struct LockedVecDeque<T> {
4714    items: VecDeque<T>,
4715    lock: Arc<tokio::sync::Mutex<()>>,
4716}
4717
4718impl<T> LockedVecDeque<T> {
4719    pub fn new() -> Self {
4720        Self {
4721            items: VecDeque::new(),
4722            lock: Arc::new(tokio::sync::Mutex::new(())),
4723        }
4724    }
4725
4726    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
4727        Arc::clone(&self.lock).try_lock_owned()
4728    }
4729
4730    pub fn is_empty(&self) -> bool {
4731        self.items.is_empty()
4732    }
4733
4734    pub fn push_back(&mut self, value: T) {
4735        self.items.push_back(value)
4736    }
4737
4738    pub fn pop_front(&mut self) -> Option<T> {
4739        self.items.pop_front()
4740    }
4741
4742    pub fn remove(&mut self, index: usize) -> Option<T> {
4743        self.items.remove(index)
4744    }
4745
4746    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
4747        self.items.iter()
4748    }
4749}
4750
4751#[derive(Debug)]
4752struct DeferredPlanStatement {
4753    ctx: ExecuteContext,
4754    ps: PlanStatement,
4755}
4756
4757#[derive(Debug)]
4758enum PlanStatement {
4759    Statement {
4760        stmt: Arc<Statement<Raw>>,
4761        params: Params,
4762    },
4763    Plan {
4764        plan: mz_sql::plan::Plan,
4765        resolved_ids: ResolvedIds,
4766    },
4767}
4768
4769#[derive(Debug, Error)]
4770pub enum NetworkPolicyError {
4771    #[error("Access denied for address {0}")]
4772    AddressDenied(IpAddr),
4773    #[error("Access denied missing IP address")]
4774    MissingIp,
4775}
4776
4777pub(crate) fn validate_ip_with_policy_rules(
4778    ip: &IpAddr,
4779    rules: &Vec<NetworkPolicyRule>,
4780) -> Result<(), NetworkPolicyError> {
    // At the moment we're not handling action or direction,
    // as those can currently only be "allow" and "ingress", respectively.
4783    if rules.iter().any(|r| r.address.0.contains(ip)) {
4784        Ok(())
4785    } else {
4786        Err(NetworkPolicyError::AddressDenied(ip.clone()))
4787    }
4788}