mz_adapter/coord.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Translation of SQL commands into timestamped `Controller` commands.
11//!
12//! The various SQL commands instruct the system to take actions that are not
13//! yet explicitly timestamped. On the other hand, the underlying data continually
14//! change as time moves forward. On the third hand, we greatly benefit from the
15//! information that some times are no longer of interest, so that we may
16//! compact the representation of the continually changing collections.
17//!
18//! The [`Coordinator`] curates these interactions by observing the progress
19//! collections make through time, choosing timestamps for its own commands,
20//! and eventually communicating that certain times have irretrievably "passed".
21//!
22//! ## Frontiers another way
23//!
24//! If the above description of frontiers left you with questions, this
25//! repackaged explanation might help.
26//!
27//! - `since` is the least recent time (i.e. oldest time) that you can read
28//!   from sources and be guaranteed that the returned data is accurate as of
29//!   that time.
30//!
31//!   Reads at times less than `since` may return values that were not actually
32//!   seen at the specified time, but arrived later (i.e. the results are
33//!   compacted).
34//!
35//!   For correctness' sake, the coordinator never chooses to read at a time
36//!   less than an arrangement's `since`.
37//!
38//! - `upper` is the first time after the most recent time that you can read
39//!   from sources and receive an immediate response. Alternately, it is the
40//!   least time at which the data may still change (that is the reason we may
41//!   not be able to respond immediately).
42//!
43//!   Reads at times >= `upper` may not immediately return because the answer
44//!   isn't known yet. However, once the `upper` is > the specified read time,
45//!   the read can return.
46//!
47//!   For the sake of returned values' freshness, the coordinator prefers
48//!   performing reads at an arrangement's `upper`. However, because we more
49//!   strongly prefer correctness, the coordinator will choose timestamps
50//!   greater than an object's `upper` if it is also being accessed alongside
51//!   objects whose `since` times are >= its `upper`.
52//!
53//! This illustration attempts to show, with time moving left to right, the
54//! relationship between `since` and `upper`.
55//!
56//! - `#`: possibly inaccurate results
57//! - `-`: immediate, correct response
58//! - `?`: not yet known
59//! - `s`: since
60//! - `u`: upper
61//! - `|`: eligible for coordinator to select
62//!
63//! ```nofmt
64//! ####s----u?????
65//!     |||||||||||
66//! ```
67//!
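//! As a minimal, illustrative sketch (using `timely`'s `Antichain` with plain
//! `u64` timestamps rather than the coordinator's real types, and hypothetical
//! helper names used only for this illustration), the rules above boil down to
//! two checks: a read at `t` is correct when `since <= t`, and it can be
//! answered immediately when additionally `t < upper`.
//!
//! ```ignore
//! use timely::progress::Antichain;
//!
//! /// A read at `t` never observes compacted (possibly inaccurate) data as
//! /// long as `since <= t`.
//! fn read_is_correct(since: &Antichain<u64>, t: u64) -> bool {
//!     since.less_equal(&t)
//! }
//!
//! /// A read at `t` can additionally be answered without waiting if the data
//! /// up to `t` is complete, i.e. `t` has not yet reached `upper`.
//! fn read_is_immediate(since: &Antichain<u64>, upper: &Antichain<u64>, t: u64) -> bool {
//!     read_is_correct(since, t) && !upper.less_equal(&t)
//! }
//!
//! let since = Antichain::from_elem(4u64);
//! let upper = Antichain::from_elem(9u64);
//! assert!(!read_is_correct(&since, 3)); // the `#` region
//! assert!(read_is_immediate(&since, &upper, 5)); // the `-` region
//! assert!(read_is_correct(&since, 12) && !read_is_immediate(&since, &upper, 12)); // the `?` region
//! ```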
68
69use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::fmt;
72use std::net::IpAddr;
73use std::num::NonZeroI64;
74use std::ops::Neg;
75use std::str::FromStr;
76use std::sync::LazyLock;
77use std::sync::{Arc, Mutex};
78use std::thread;
79use std::time::{Duration, Instant};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::{Either, Itertools};
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
95use mz_auth::password::Password;
96use mz_build_info::BuildInfo;
97use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
98use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
99use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
100use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
101use mz_catalog::memory::objects::{
102    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
103    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
104};
105use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
106use mz_compute_client::as_of_selection;
107use mz_compute_client::controller::error::InstanceMissing;
108use mz_compute_types::ComputeInstanceId;
109use mz_compute_types::dataflows::DataflowDescription;
110use mz_compute_types::plan::Plan;
111use mz_controller::clusters::{ClusterConfig, ClusterEvent, ClusterStatus, ProcessId};
112use mz_controller::{ControllerConfig, Readiness};
113use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
114use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
115use mz_license_keys::ValidatedLicenseKey;
116use mz_orchestrator::OfflineReason;
117use mz_ore::cast::{CastFrom, CastInto, CastLossy};
118use mz_ore::channel::trigger::Trigger;
119use mz_ore::future::TimeoutError;
120use mz_ore::metrics::MetricsRegistry;
121use mz_ore::now::{EpochMillis, NowFn};
122use mz_ore::task::{JoinHandle, spawn};
123use mz_ore::thread::JoinHandleExt;
124use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
125use mz_ore::url::SensitiveUrl;
126use mz_ore::{
127    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
128};
129use mz_persist_client::PersistClient;
130use mz_persist_client::batch::ProtoBatch;
131use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
132use mz_repr::explain::{ExplainConfig, ExplainFormat};
133use mz_repr::global_id::TransientIdGen;
134use mz_repr::optimize::OptimizerFeatures;
135use mz_repr::role_id::RoleId;
136use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
137use mz_secrets::cache::CachingSecretsReader;
138use mz_secrets::{SecretsController, SecretsReader};
139use mz_sql::ast::{Raw, Statement};
140use mz_sql::catalog::{CatalogCluster, EnvironmentId};
141use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
142use mz_sql::optimizer_metrics::OptimizerMetrics;
143use mz_sql::plan::{
144    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, NetworkPolicyRule,
145    OnTimeoutAction, Params, QueryWhen,
146};
147use mz_sql::session::user::User;
148use mz_sql::session::vars::SystemVars;
149use mz_sql_parser::ast::ExplainStage;
150use mz_sql_parser::ast::display::AstDisplay;
151use mz_storage_client::client::TableData;
152use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
153use mz_storage_types::connections::Connection as StorageConnection;
154use mz_storage_types::connections::ConnectionContext;
155use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
156use mz_storage_types::read_holds::ReadHold;
157use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
158use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
159use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
160use mz_timestamp_oracle::WriteTimestamp;
161use mz_timestamp_oracle::postgres_oracle::{
162    PostgresTimestampOracle, PostgresTimestampOracleConfig,
163};
164use mz_transform::dataflow::DataflowMetainfo;
165use opentelemetry::trace::TraceContextExt;
166use serde::Serialize;
167use thiserror::Error;
168use timely::progress::{Antichain, Timestamp as _};
169use tokio::runtime::Handle as TokioHandle;
170use tokio::select;
171use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
172use tokio::time::{Interval, MissedTickBehavior};
173use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
174use tracing_opentelemetry::OpenTelemetrySpanExt;
175use uuid::Uuid;
176
177use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
178use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
179use crate::client::{Client, Handle};
180use crate::command::{Command, ExecuteResponse};
181use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
182use crate::coord::appends::{
183    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
184};
185use crate::coord::caught_up::CaughtUpCheckContext;
186use crate::coord::cluster_scheduling::SchedulingDecision;
187use crate::coord::id_bundle::CollectionIdBundle;
188use crate::coord::introspection::IntrospectionSubscribe;
189use crate::coord::peek::PendingPeek;
190use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
191use crate::coord::timeline::{TimelineContext, TimelineState};
192use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
193use crate::coord::validity::PlanValidity;
194use crate::error::AdapterError;
195use crate::explain::insights::PlanInsightsContext;
196use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
197use crate::metrics::Metrics;
198use crate::optimize::dataflows::{
199    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
200};
201use crate::optimize::{self, Optimize, OptimizerConfig};
202use crate::session::{EndTransactionAction, Session};
203use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
204use crate::util::{ClientTransmitter, ResultExt};
205use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
206use crate::{AdapterNotice, ReadHolds, flags};
207
208pub(crate) mod id_bundle;
209pub(crate) mod in_memory_oracle;
210pub(crate) mod peek;
211pub(crate) mod statement_logging;
212pub(crate) mod timeline;
213pub(crate) mod timestamp_selection;
214
215pub mod appends;
216mod catalog_serving;
217mod caught_up;
218pub mod cluster_scheduling;
219mod command_handler;
220pub mod consistency;
221mod ddl;
222mod indexes;
223mod introspection;
224mod message_handler;
225mod privatelink_status;
226pub mod read_policy;
227mod sequencer;
228mod sql;
229mod validity;
230
231#[derive(Debug)]
232pub enum Message {
233    Command(OpenTelemetryContext, Command),
234    ControllerReady {
235        controller: ControllerReadiness,
236    },
237    PurifiedStatementReady(PurifiedStatementReady),
238    CreateConnectionValidationReady(CreateConnectionValidationReady),
239    AlterConnectionValidationReady(AlterConnectionValidationReady),
240    TryDeferred {
241        /// The connection that created this op.
242        conn_id: ConnectionId,
243        /// The write lock that notified us our deferred op might be able to run.
244        ///
245        /// Note: While we never want to hold a partial set of locks, it can be important to hold
246        /// onto the _one_ that notified us our op might be ready. If there are multiple operations
247        /// waiting on a single collection, and we don't hold this lock through retrying the op,
248        /// then everything waiting on this collection will get retried, causing traffic in the
249        /// Coordinator's message queue.
250        ///
251        /// See [`DeferredOp::can_be_optimistically_retried`] for more detail.
252        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
253    },
254    /// Initiates a group commit.
255    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
256    DeferredStatementReady,
257    AdvanceTimelines,
258    ClusterEvent(ClusterEvent),
259    CancelPendingPeeks {
260        conn_id: ConnectionId,
261    },
262    LinearizeReads,
263    StagedBatches {
264        conn_id: ConnectionId,
265        table_id: CatalogItemId,
266        batches: Vec<Result<ProtoBatch, String>>,
267    },
268    StorageUsageSchedule,
269    StorageUsageFetch,
270    StorageUsageUpdate(ShardsUsageReferenced),
271    StorageUsagePrune(Vec<BuiltinTableUpdate>),
272    /// Performs any cleanup and logging actions necessary for
273    /// finalizing a statement execution.
274    RetireExecute {
275        data: ExecuteContextExtra,
276        otel_ctx: OpenTelemetryContext,
277        reason: StatementEndedExecutionReason,
278    },
279    ExecuteSingleStatementTransaction {
280        ctx: ExecuteContext,
281        otel_ctx: OpenTelemetryContext,
282        stmt: Arc<Statement<Raw>>,
283        params: mz_sql::plan::Params,
284    },
285    PeekStageReady {
286        ctx: ExecuteContext,
287        span: Span,
288        stage: PeekStage,
289    },
290    CreateIndexStageReady {
291        ctx: ExecuteContext,
292        span: Span,
293        stage: CreateIndexStage,
294    },
295    CreateViewStageReady {
296        ctx: ExecuteContext,
297        span: Span,
298        stage: CreateViewStage,
299    },
300    CreateMaterializedViewStageReady {
301        ctx: ExecuteContext,
302        span: Span,
303        stage: CreateMaterializedViewStage,
304    },
305    SubscribeStageReady {
306        ctx: ExecuteContext,
307        span: Span,
308        stage: SubscribeStage,
309    },
310    IntrospectionSubscribeStageReady {
311        span: Span,
312        stage: IntrospectionSubscribeStage,
313    },
314    SecretStageReady {
315        ctx: ExecuteContext,
316        span: Span,
317        stage: SecretStage,
318    },
319    ClusterStageReady {
320        ctx: ExecuteContext,
321        span: Span,
322        stage: ClusterStage,
323    },
324    ExplainTimestampStageReady {
325        ctx: ExecuteContext,
326        span: Span,
327        stage: ExplainTimestampStage,
328    },
329    DrainStatementLog,
330    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
331    CheckSchedulingPolicies,
332
333    /// Scheduling policy decisions about turning clusters On/Off.
334    /// `Vec<(policy name, Vec of decisions by the policy)>`
335    /// A cluster will be On if and only if there is at least one On decision for it.
336    /// Scheduling decisions for clusters that have `SCHEDULE = MANUAL` are ignored.
337    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
338}
339
340impl Message {
341    /// Returns a string to identify the kind of [`Message`], useful for logging.
342    pub const fn kind(&self) -> &'static str {
343        match self {
344            Message::Command(_, msg) => match msg {
345                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
346                Command::Startup { .. } => "command-startup",
347                Command::Execute { .. } => "command-execute",
348                Command::Commit { .. } => "command-commit",
349                Command::CancelRequest { .. } => "command-cancel_request",
350                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
351                Command::GetWebhook { .. } => "command-get_webhook",
352                Command::GetSystemVars { .. } => "command-get_system_vars",
353                Command::SetSystemVars { .. } => "command-set_system_vars",
354                Command::Terminate { .. } => "command-terminate",
355                Command::RetireExecute { .. } => "command-retire_execute",
356                Command::CheckConsistency { .. } => "command-check_consistency",
357                Command::Dump { .. } => "command-dump",
358                Command::AuthenticatePassword { .. } => "command-auth_check",
359                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
360                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
361            },
362            Message::ControllerReady {
363                controller: ControllerReadiness::Compute,
364            } => "controller_ready(compute)",
365            Message::ControllerReady {
366                controller: ControllerReadiness::Storage,
367            } => "controller_ready(storage)",
368            Message::ControllerReady {
369                controller: ControllerReadiness::Metrics,
370            } => "controller_ready(metrics)",
371            Message::ControllerReady {
372                controller: ControllerReadiness::Internal,
373            } => "controller_ready(internal)",
374            Message::PurifiedStatementReady(_) => "purified_statement_ready",
375            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
376            Message::TryDeferred { .. } => "try_deferred",
377            Message::GroupCommitInitiate(..) => "group_commit_initiate",
378            Message::AdvanceTimelines => "advance_timelines",
379            Message::ClusterEvent(_) => "cluster_event",
380            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
381            Message::LinearizeReads => "linearize_reads",
382            Message::StagedBatches { .. } => "staged_batches",
383            Message::StorageUsageSchedule => "storage_usage_schedule",
384            Message::StorageUsageFetch => "storage_usage_fetch",
385            Message::StorageUsageUpdate(_) => "storage_usage_update",
386            Message::StorageUsagePrune(_) => "storage_usage_prune",
387            Message::RetireExecute { .. } => "retire_execute",
388            Message::ExecuteSingleStatementTransaction { .. } => {
389                "execute_single_statement_transaction"
390            }
391            Message::PeekStageReady { .. } => "peek_stage_ready",
392            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
393            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
394            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
395            Message::CreateMaterializedViewStageReady { .. } => {
396                "create_materialized_view_stage_ready"
397            }
398            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
399            Message::IntrospectionSubscribeStageReady { .. } => {
400                "introspection_subscribe_stage_ready"
401            }
402            Message::SecretStageReady { .. } => "secret_stage_ready",
403            Message::ClusterStageReady { .. } => "cluster_stage_ready",
404            Message::DrainStatementLog => "drain_statement_log",
405            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
406            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
407            Message::CheckSchedulingPolicies => "check_scheduling_policies",
408            Message::SchedulingDecisions { .. } => "scheduling_decision",
409            Message::DeferredStatementReady => "deferred_statement_ready",
410        }
411    }
412}
413
414/// The reason why a controller needs processing on the main loop.
415#[derive(Debug)]
416pub enum ControllerReadiness {
417    /// The storage controller is ready.
418    Storage,
419    /// The compute controller is ready.
420    Compute,
421    /// A batch of metric data is ready.
422    Metrics,
423    /// An internally-generated message is ready to be returned.
424    Internal,
425}
426
427#[derive(Derivative)]
428#[derivative(Debug)]
429pub struct BackgroundWorkResult<T> {
430    #[derivative(Debug = "ignore")]
431    pub ctx: ExecuteContext,
432    pub result: Result<T, AdapterError>,
433    pub params: Params,
434    pub plan_validity: PlanValidity,
435    pub original_stmt: Arc<Statement<Raw>>,
436    pub otel_ctx: OpenTelemetryContext,
437}
438
439pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
440
441#[derive(Derivative)]
442#[derivative(Debug)]
443pub struct ValidationReady<T> {
444    #[derivative(Debug = "ignore")]
445    pub ctx: ExecuteContext,
446    pub result: Result<T, AdapterError>,
447    pub resolved_ids: ResolvedIds,
448    pub connection_id: CatalogItemId,
449    pub connection_gid: GlobalId,
450    pub plan_validity: PlanValidity,
451    pub otel_ctx: OpenTelemetryContext,
452}
453
454pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
455pub type AlterConnectionValidationReady = ValidationReady<Connection>;
456
457#[derive(Debug)]
458pub enum PeekStage {
459    /// Common stages across SELECT, EXPLAIN and COPY TO queries.
460    LinearizeTimestamp(PeekStageLinearizeTimestamp),
461    RealTimeRecency(PeekStageRealTimeRecency),
462    TimestampReadHold(PeekStageTimestampReadHold),
463    Optimize(PeekStageOptimize),
464    /// Final stage for a peek.
465    Finish(PeekStageFinish),
466    /// Final stage for an explain.
467    ExplainPlan(PeekStageExplainPlan),
468    ExplainPushdown(PeekStageExplainPushdown),
469    /// Preflight checks for a copy to operation.
470    CopyToPreflight(PeekStageCopyTo),
471    /// Final stage for a copy to, which involves shipping the dataflow.
472    CopyToDataflow(PeekStageCopyTo),
473}
474
475#[derive(Debug)]
476pub struct CopyToContext {
477    /// The `RelationDesc` of the data to be copied.
478    pub desc: RelationDesc,
479    /// The destination uri of the external service where the data will be copied.
480    pub uri: Uri,
481    /// Connection information required to connect to the external service to copy the data.
482    pub connection: StorageConnection<ReferencedConnection>,
483    /// The ID of the CONNECTION object to be used for copying the data.
484    pub connection_id: CatalogItemId,
485    /// Format params to format the data.
486    pub format: S3SinkFormat,
487    /// Approximate max file size of each uploaded file.
488    pub max_file_size: u64,
489    /// Number of batches the output of the COPY TO will be partitioned into
490    /// to distribute the load across workers deterministically.
491    /// This is an `Option` only because it's not set when `CopyToContext` is instantiated,
492    /// but immediately afterward, in the PeekStageValidate stage.
493    pub output_batch_count: Option<u64>,
494}
495
496#[derive(Debug)]
497pub struct PeekStageLinearizeTimestamp {
498    validity: PlanValidity,
499    plan: mz_sql::plan::SelectPlan,
500    max_query_result_size: Option<u64>,
501    source_ids: BTreeSet<GlobalId>,
502    target_replica: Option<ReplicaId>,
503    timeline_context: TimelineContext,
504    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
505    /// An optional context set iff the state machine is initiated from
506    /// sequencing an EXPLAIN for this statement.
507    explain_ctx: ExplainContext,
508}
509
510#[derive(Debug)]
511pub struct PeekStageRealTimeRecency {
512    validity: PlanValidity,
513    plan: mz_sql::plan::SelectPlan,
514    max_query_result_size: Option<u64>,
515    source_ids: BTreeSet<GlobalId>,
516    target_replica: Option<ReplicaId>,
517    timeline_context: TimelineContext,
518    oracle_read_ts: Option<Timestamp>,
519    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
520    /// An optional context set iff the state machine is initiated from
521    /// sequencing an EXPLAIN for this statement.
522    explain_ctx: ExplainContext,
523}
524
525#[derive(Debug)]
526pub struct PeekStageTimestampReadHold {
527    validity: PlanValidity,
528    plan: mz_sql::plan::SelectPlan,
529    max_query_result_size: Option<u64>,
530    source_ids: BTreeSet<GlobalId>,
531    target_replica: Option<ReplicaId>,
532    timeline_context: TimelineContext,
533    oracle_read_ts: Option<Timestamp>,
534    real_time_recency_ts: Option<mz_repr::Timestamp>,
535    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
536    /// An optional context set iff the state machine is initiated from
537    /// sequencing an EXPLAIN for this statement.
538    explain_ctx: ExplainContext,
539}
540
541#[derive(Debug)]
542pub struct PeekStageOptimize {
543    validity: PlanValidity,
544    plan: mz_sql::plan::SelectPlan,
545    max_query_result_size: Option<u64>,
546    source_ids: BTreeSet<GlobalId>,
547    id_bundle: CollectionIdBundle,
548    target_replica: Option<ReplicaId>,
549    determination: TimestampDetermination<mz_repr::Timestamp>,
550    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
551    /// An optional context set iff the state machine is initiated from
552    /// sequencing an EXPLAIN for this statement.
553    explain_ctx: ExplainContext,
554}
555
556#[derive(Debug)]
557pub struct PeekStageFinish {
558    validity: PlanValidity,
559    plan: mz_sql::plan::SelectPlan,
560    max_query_result_size: Option<u64>,
561    id_bundle: CollectionIdBundle,
562    target_replica: Option<ReplicaId>,
563    source_ids: BTreeSet<GlobalId>,
564    determination: TimestampDetermination<mz_repr::Timestamp>,
565    cluster_id: ComputeInstanceId,
566    finishing: RowSetFinishing,
567    /// When present, an optimizer trace to be used for emitting a plan insights
568    /// notice.
569    plan_insights_optimizer_trace: Option<OptimizerTrace>,
570    insights_ctx: Option<Box<PlanInsightsContext>>,
571    global_lir_plan: optimize::peek::GlobalLirPlan,
572    optimization_finished_at: EpochMillis,
573}
574
575#[derive(Debug)]
576pub struct PeekStageCopyTo {
577    validity: PlanValidity,
578    optimizer: optimize::copy_to::Optimizer,
579    global_lir_plan: optimize::copy_to::GlobalLirPlan,
580    optimization_finished_at: EpochMillis,
581    source_ids: BTreeSet<GlobalId>,
582}
583
584#[derive(Debug)]
585pub struct PeekStageExplainPlan {
586    validity: PlanValidity,
587    optimizer: optimize::peek::Optimizer,
588    df_meta: DataflowMetainfo,
589    explain_ctx: ExplainPlanContext,
590    insights_ctx: Option<Box<PlanInsightsContext>>,
591}
592
593#[derive(Debug)]
594pub struct PeekStageExplainPushdown {
595    validity: PlanValidity,
596    determination: TimestampDetermination<mz_repr::Timestamp>,
597    imports: BTreeMap<GlobalId, MapFilterProject>,
598}
599
600#[derive(Debug)]
601pub enum CreateIndexStage {
602    Optimize(CreateIndexOptimize),
603    Finish(CreateIndexFinish),
604    Explain(CreateIndexExplain),
605}
606
607#[derive(Debug)]
608pub struct CreateIndexOptimize {
609    validity: PlanValidity,
610    plan: plan::CreateIndexPlan,
611    resolved_ids: ResolvedIds,
612    /// An optional context set iff the state machine is initiated from
613    /// sequencing an EXPLAIN for this statement.
614    explain_ctx: ExplainContext,
615}
616
617#[derive(Debug)]
618pub struct CreateIndexFinish {
619    validity: PlanValidity,
620    item_id: CatalogItemId,
621    global_id: GlobalId,
622    plan: plan::CreateIndexPlan,
623    resolved_ids: ResolvedIds,
624    global_mir_plan: optimize::index::GlobalMirPlan,
625    global_lir_plan: optimize::index::GlobalLirPlan,
626}
627
628#[derive(Debug)]
629pub struct CreateIndexExplain {
630    validity: PlanValidity,
631    exported_index_id: GlobalId,
632    plan: plan::CreateIndexPlan,
633    df_meta: DataflowMetainfo,
634    explain_ctx: ExplainPlanContext,
635}
636
637#[derive(Debug)]
638pub enum CreateViewStage {
639    Optimize(CreateViewOptimize),
640    Finish(CreateViewFinish),
641    Explain(CreateViewExplain),
642}
643
644#[derive(Debug)]
645pub struct CreateViewOptimize {
646    validity: PlanValidity,
647    plan: plan::CreateViewPlan,
648    resolved_ids: ResolvedIds,
649    /// An optional context set iff the state machine is initiated from
650    /// sequencing an EXPLAIN for this statement.
651    explain_ctx: ExplainContext,
652}
653
654#[derive(Debug)]
655pub struct CreateViewFinish {
656    validity: PlanValidity,
657    /// ID of this item in the Catalog.
658    item_id: CatalogItemId,
659    /// ID by which Compute will reference this View.
660    global_id: GlobalId,
661    plan: plan::CreateViewPlan,
662    /// IDs of objects resolved during name resolution.
663    resolved_ids: ResolvedIds,
664    optimized_expr: OptimizedMirRelationExpr,
665}
666
667#[derive(Debug)]
668pub struct CreateViewExplain {
669    validity: PlanValidity,
670    id: GlobalId,
671    plan: plan::CreateViewPlan,
672    explain_ctx: ExplainPlanContext,
673}
674
675#[derive(Debug)]
676pub enum ExplainTimestampStage {
677    Optimize(ExplainTimestampOptimize),
678    RealTimeRecency(ExplainTimestampRealTimeRecency),
679    Finish(ExplainTimestampFinish),
680}
681
682#[derive(Debug)]
683pub struct ExplainTimestampOptimize {
684    validity: PlanValidity,
685    plan: plan::ExplainTimestampPlan,
686    cluster_id: ClusterId,
687}
688
689#[derive(Debug)]
690pub struct ExplainTimestampRealTimeRecency {
691    validity: PlanValidity,
692    format: ExplainFormat,
693    optimized_plan: OptimizedMirRelationExpr,
694    cluster_id: ClusterId,
695    when: QueryWhen,
696}
697
698#[derive(Debug)]
699pub struct ExplainTimestampFinish {
700    validity: PlanValidity,
701    format: ExplainFormat,
702    optimized_plan: OptimizedMirRelationExpr,
703    cluster_id: ClusterId,
704    source_ids: BTreeSet<GlobalId>,
705    when: QueryWhen,
706    real_time_recency_ts: Option<Timestamp>,
707}
708
709#[derive(Debug)]
710pub enum ClusterStage {
711    Alter(AlterCluster),
712    WaitForHydrated(AlterClusterWaitForHydrated),
713    Finalize(AlterClusterFinalize),
714}
715
716#[derive(Debug)]
717pub struct AlterCluster {
718    validity: PlanValidity,
719    plan: plan::AlterClusterPlan,
720}
721
722#[derive(Debug)]
723pub struct AlterClusterWaitForHydrated {
724    validity: PlanValidity,
725    plan: plan::AlterClusterPlan,
726    new_config: ClusterVariantManaged,
727    timeout_time: Instant,
728    on_timeout: OnTimeoutAction,
729}
730
731#[derive(Debug)]
732pub struct AlterClusterFinalize {
733    validity: PlanValidity,
734    plan: plan::AlterClusterPlan,
735    new_config: ClusterVariantManaged,
736}
737
738#[derive(Debug)]
739pub enum ExplainContext {
740    /// The ordinary, non-explain variant of the statement.
741    None,
742    /// The `EXPLAIN <level> PLAN FOR <explainee>` version of the statement.
743    Plan(ExplainPlanContext),
744    /// Generate a notice containing the `EXPLAIN PLAN INSIGHTS` output
745    /// alongside the query's normal output.
746    PlanInsightsNotice(OptimizerTrace),
747    /// `EXPLAIN FILTER PUSHDOWN`
748    Pushdown,
749}
750
751impl ExplainContext {
752    /// If available for this context, wrap the [`OptimizerTrace`] into a
753    /// [`tracing::Dispatch`] and set it as default, returning the resulting
754    /// guard in a `Some(guard)` option.
755    fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
756        let optimizer_trace = match self {
757            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
758            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
759            _ => None,
760        };
761        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
762    }
763
764    fn needs_cluster(&self) -> bool {
765        match self {
766            ExplainContext::None => true,
767            ExplainContext::Plan(..) => false,
768            ExplainContext::PlanInsightsNotice(..) => true,
769            ExplainContext::Pushdown => false,
770        }
771    }
772
773    fn needs_plan_insights(&self) -> bool {
774        matches!(
775            self,
776            ExplainContext::Plan(ExplainPlanContext {
777                stage: ExplainStage::PlanInsights,
778                ..
779            }) | ExplainContext::PlanInsightsNotice(_)
780        )
781    }
782}
783
784#[derive(Debug)]
785pub struct ExplainPlanContext {
786    pub broken: bool,
787    pub config: ExplainConfig,
788    pub format: ExplainFormat,
789    pub stage: ExplainStage,
790    pub replan: Option<GlobalId>,
791    pub desc: Option<RelationDesc>,
792    pub optimizer_trace: OptimizerTrace,
793}
794
795#[derive(Debug)]
796pub enum CreateMaterializedViewStage {
797    Optimize(CreateMaterializedViewOptimize),
798    Finish(CreateMaterializedViewFinish),
799    Explain(CreateMaterializedViewExplain),
800}
801
802#[derive(Debug)]
803pub struct CreateMaterializedViewOptimize {
804    validity: PlanValidity,
805    plan: plan::CreateMaterializedViewPlan,
806    resolved_ids: ResolvedIds,
807    /// An optional context set iff the state machine is initiated from
808    /// sequencing an EXPLAIN for this statement.
809    explain_ctx: ExplainContext,
810}
811
812#[derive(Debug)]
813pub struct CreateMaterializedViewFinish {
814    /// The ID of this Materialized View in the Catalog.
815    item_id: CatalogItemId,
816    /// The ID of the durable pTVC backing this Materialized View.
817    global_id: GlobalId,
818    validity: PlanValidity,
819    plan: plan::CreateMaterializedViewPlan,
820    resolved_ids: ResolvedIds,
821    local_mir_plan: optimize::materialized_view::LocalMirPlan,
822    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
823    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
824}
825
826#[derive(Debug)]
827pub struct CreateMaterializedViewExplain {
828    global_id: GlobalId,
829    validity: PlanValidity,
830    plan: plan::CreateMaterializedViewPlan,
831    df_meta: DataflowMetainfo,
832    explain_ctx: ExplainPlanContext,
833}
834
835#[derive(Debug)]
836pub enum SubscribeStage {
837    OptimizeMir(SubscribeOptimizeMir),
838    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
839    Finish(SubscribeFinish),
840}
841
842#[derive(Debug)]
843pub struct SubscribeOptimizeMir {
844    validity: PlanValidity,
845    plan: plan::SubscribePlan,
846    timeline: TimelineContext,
847    dependency_ids: BTreeSet<GlobalId>,
848    cluster_id: ComputeInstanceId,
849    replica_id: Option<ReplicaId>,
850}
851
852#[derive(Debug)]
853pub struct SubscribeTimestampOptimizeLir {
854    validity: PlanValidity,
855    plan: plan::SubscribePlan,
856    timeline: TimelineContext,
857    optimizer: optimize::subscribe::Optimizer,
858    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
859    dependency_ids: BTreeSet<GlobalId>,
860    replica_id: Option<ReplicaId>,
861}
862
863#[derive(Debug)]
864pub struct SubscribeFinish {
865    validity: PlanValidity,
866    cluster_id: ComputeInstanceId,
867    replica_id: Option<ReplicaId>,
868    plan: plan::SubscribePlan,
869    global_lir_plan: optimize::subscribe::GlobalLirPlan,
870    dependency_ids: BTreeSet<GlobalId>,
871}
872
873#[derive(Debug)]
874pub enum IntrospectionSubscribeStage {
875    OptimizeMir(IntrospectionSubscribeOptimizeMir),
876    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
877    Finish(IntrospectionSubscribeFinish),
878}
879
880#[derive(Debug)]
881pub struct IntrospectionSubscribeOptimizeMir {
882    validity: PlanValidity,
883    plan: plan::SubscribePlan,
884    subscribe_id: GlobalId,
885    cluster_id: ComputeInstanceId,
886    replica_id: ReplicaId,
887}
888
889#[derive(Debug)]
890pub struct IntrospectionSubscribeTimestampOptimizeLir {
891    validity: PlanValidity,
892    optimizer: optimize::subscribe::Optimizer,
893    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
894    cluster_id: ComputeInstanceId,
895    replica_id: ReplicaId,
896}
897
898#[derive(Debug)]
899pub struct IntrospectionSubscribeFinish {
900    validity: PlanValidity,
901    global_lir_plan: optimize::subscribe::GlobalLirPlan,
902    read_holds: ReadHolds<Timestamp>,
903    cluster_id: ComputeInstanceId,
904    replica_id: ReplicaId,
905}
906
907#[derive(Debug)]
908pub enum SecretStage {
909    CreateEnsure(CreateSecretEnsure),
910    CreateFinish(CreateSecretFinish),
911    RotateKeysEnsure(RotateKeysSecretEnsure),
912    RotateKeysFinish(RotateKeysSecretFinish),
913    Alter(AlterSecret),
914}
915
916#[derive(Debug)]
917pub struct CreateSecretEnsure {
918    validity: PlanValidity,
919    plan: plan::CreateSecretPlan,
920}
921
922#[derive(Debug)]
923pub struct CreateSecretFinish {
924    validity: PlanValidity,
925    item_id: CatalogItemId,
926    global_id: GlobalId,
927    plan: plan::CreateSecretPlan,
928}
929
930#[derive(Debug)]
931pub struct RotateKeysSecretEnsure {
932    validity: PlanValidity,
933    id: CatalogItemId,
934}
935
936#[derive(Debug)]
937pub struct RotateKeysSecretFinish {
938    validity: PlanValidity,
939    ops: Vec<crate::catalog::Op>,
940}
941
942#[derive(Debug)]
943pub struct AlterSecret {
944    validity: PlanValidity,
945    plan: plan::AlterSecretPlan,
946}
947
948/// An enum describing which cluster to run a statement on.
949///
950/// One example usage would be that if a query depends only on system tables, we might
951/// automatically run it on the catalog server cluster to benefit from indexes that exist there.
952#[derive(Debug, Copy, Clone, PartialEq, Eq)]
953pub enum TargetCluster {
954    /// The catalog server cluster.
955    CatalogServer,
956    /// The current user's active cluster.
957    Active,
958    /// The cluster selected at the start of a transaction.
959    Transaction(ClusterId),
960}
961
962/// Result types for each stage of a sequence.
963pub(crate) enum StageResult<T> {
964    /// A task was spawned that will return the next stage.
965    Handle(JoinHandle<Result<T, AdapterError>>),
966    /// A task was spawned that will return a response for the client.
967    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
968    /// The next stage is immediately ready and will execute.
969    Immediate(T),
970    /// The final stage was executed and is ready to respond to the client.
971    Response(ExecuteResponse),
972}
973
974/// Common functionality for [Coordinator::sequence_staged].
975pub(crate) trait Staged: Send {
976    type Ctx: StagedContext;
977
978    fn validity(&mut self) -> &mut PlanValidity;
979
980    /// Returns the next stage or final result.
981    async fn stage(
982        self,
983        coord: &mut Coordinator,
984        ctx: &mut Self::Ctx,
985    ) -> Result<StageResult<Box<Self>>, AdapterError>;
986
987    /// Prepares a message for the Coordinator.
988    fn message(self, ctx: Self::Ctx, span: Span) -> Message;
989
990    /// Whether it is safe to SQL cancel this stage.
991    fn cancel_enabled(&self) -> bool;
992}
993
994pub trait StagedContext {
995    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
996    fn session(&self) -> Option<&Session>;
997}
998
999impl StagedContext for ExecuteContext {
1000    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1001        self.retire(result);
1002    }
1003
1004    fn session(&self) -> Option<&Session> {
1005        Some(self.session())
1006    }
1007}
1008
1009impl StagedContext for () {
1010    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}
1011
1012    fn session(&self) -> Option<&Session> {
1013        None
1014    }
1015}
1016
1017/// Configures a coordinator.
1018pub struct Config {
1019    pub controller_config: ControllerConfig,
1020    pub controller_envd_epoch: NonZeroI64,
1021    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
1022    pub audit_logs_iterator: AuditLogIterator,
1023    pub timestamp_oracle_url: Option<SensitiveUrl>,
1024    pub unsafe_mode: bool,
1025    pub all_features: bool,
1026    pub build_info: &'static BuildInfo,
1027    pub environment_id: EnvironmentId,
1028    pub metrics_registry: MetricsRegistry,
1029    pub now: NowFn,
1030    pub secrets_controller: Arc<dyn SecretsController>,
1031    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1032    pub availability_zones: Vec<String>,
1033    pub cluster_replica_sizes: ClusterReplicaSizeMap,
1034    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
1035    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
1036    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
1037    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
1038    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
1039    pub system_parameter_defaults: BTreeMap<String, String>,
1040    pub storage_usage_client: StorageUsageClient,
1041    pub storage_usage_collection_interval: Duration,
1042    pub storage_usage_retention_period: Option<Duration>,
1043    pub segment_client: Option<mz_segment::Client>,
1044    pub egress_addresses: Vec<IpNet>,
1045    pub remote_system_parameters: Option<BTreeMap<String, String>>,
1046    pub aws_account_id: Option<String>,
1047    pub aws_privatelink_availability_zones: Option<Vec<String>>,
1048    pub connection_context: ConnectionContext,
1049    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
1050    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
1051    pub http_host_name: Option<String>,
1052    pub tracing_handle: TracingHandle,
1053    /// Whether or not to start controllers in read-only mode. This is only
1054    /// meant for use during development of read-only clusters and 0dt upgrades
1055    /// and should go away once we have proper orchestration during upgrades.
1056    pub read_only_controllers: bool,
1057
1058    /// A trigger that signals that the current deployment has caught up with a
1059    /// previous deployment. Only used during 0dt deployment, while in read-only
1060    /// mode.
1061    pub caught_up_trigger: Option<Trigger>,
1062
1063    pub helm_chart_version: Option<String>,
1064    pub license_key: ValidatedLicenseKey,
1065    pub external_login_password_mz_system: Option<Password>,
1066}
1067
1068/// Metadata about an active connection.
1069#[derive(Debug, Serialize)]
1070pub struct ConnMeta {
1071    /// Pgwire specifies that every connection have a 32-bit secret associated
1072    /// with it, that is known to both the client and the server. Cancellation
1073    /// requests are required to authenticate with the secret of the connection
1074    /// that they are targeting.
1075    secret_key: u32,
1076    /// The time when the session's connection was initiated.
1077    connected_at: EpochMillis,
1078    user: User,
1079    application_name: String,
1080    uuid: Uuid,
1081    conn_id: ConnectionId,
1082    client_ip: Option<IpAddr>,
1083
1084    /// Sinks that will need to be dropped when the current transaction, if
1085    /// any, is cleared.
1086    drop_sinks: BTreeSet<GlobalId>,
1087
1088    /// Lock for the Coordinator's deferred statements that is dropped on transaction clear.
1089    #[serde(skip)]
1090    deferred_lock: Option<OwnedMutexGuard<()>>,
1091
1092    /// Cluster reconfigurations that will need to be
1093    /// cleaned up when the current transaction is cleared
1094    pending_cluster_alters: BTreeSet<ClusterId>,
1095
1096    /// Channel on which to send notices to a session.
1097    #[serde(skip)]
1098    notice_tx: mpsc::UnboundedSender<AdapterNotice>,
1099
1100    /// The role that initiated the database context. Fixed for the duration of the connection.
1101    /// WARNING: This role reference is not updated when the role is dropped.
1102    /// Consumers should not assume that this role exists.
1103    authenticated_role: RoleId,
1104}
1105
1106impl ConnMeta {
1107    pub fn conn_id(&self) -> &ConnectionId {
1108        &self.conn_id
1109    }
1110
1111    pub fn user(&self) -> &User {
1112        &self.user
1113    }
1114
1115    pub fn application_name(&self) -> &str {
1116        &self.application_name
1117    }
1118
1119    pub fn authenticated_role_id(&self) -> &RoleId {
1120        &self.authenticated_role
1121    }
1122
1123    pub fn uuid(&self) -> Uuid {
1124        self.uuid
1125    }
1126
1127    pub fn client_ip(&self) -> Option<IpAddr> {
1128        self.client_ip
1129    }
1130
1131    pub fn connected_at(&self) -> EpochMillis {
1132        self.connected_at
1133    }
1134}
1135
1136#[derive(Debug)]
1137/// A pending transaction waiting to be committed.
1138pub struct PendingTxn {
1139    /// Context used to send a response back to the client.
1140    ctx: ExecuteContext,
1141    /// Client response for transaction.
1142    response: Result<PendingTxnResponse, AdapterError>,
1143    /// The action to take at the end of the transaction.
1144    action: EndTransactionAction,
1145}
1146
1147#[derive(Debug)]
1148/// The response we'll send for a [`PendingTxn`].
1149pub enum PendingTxnResponse {
1150    /// The transaction will be committed.
1151    Committed {
1152        /// Parameters that will change, and their values, once this transaction is complete.
1153        params: BTreeMap<&'static str, String>,
1154    },
1155    /// The transaction will be rolled back.
1156    Rolledback {
1157        /// Parameters that will change, and their values, once this transaction is complete.
1158        params: BTreeMap<&'static str, String>,
1159    },
1160}
1161
1162impl PendingTxnResponse {
1163    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1164        match self {
1165            PendingTxnResponse::Committed { params }
1166            | PendingTxnResponse::Rolledback { params } => params.extend(p),
1167        }
1168    }
1169}
1170
1171impl From<PendingTxnResponse> for ExecuteResponse {
1172    fn from(value: PendingTxnResponse) -> Self {
1173        match value {
1174            PendingTxnResponse::Committed { params } => {
1175                ExecuteResponse::TransactionCommitted { params }
1176            }
1177            PendingTxnResponse::Rolledback { params } => {
1178                ExecuteResponse::TransactionRolledBack { params }
1179            }
1180        }
1181    }
1182}
1183
1184#[derive(Debug)]
1185/// A pending read transaction waiting to be linearized, along with metadata about its state.
1186pub struct PendingReadTxn {
1187    /// The transaction type
1188    txn: PendingRead,
1189    /// The timestamp context of the transaction.
1190    timestamp_context: TimestampContext<mz_repr::Timestamp>,
1191    /// When this pending txn was created, i.e., when the transaction ended. Only used for metrics.
1192    created: Instant,
1193    /// Number of times we requeued the processing of this pending read txn.
1194    /// Requeueing is necessary if the time we executed the query is after the current oracle time;
1195    /// see [`Coordinator::message_linearize_reads`] for more details.
1196    num_requeues: u64,
1197    /// Telemetry context.
1198    otel_ctx: OpenTelemetryContext,
1199}
1200
1201impl PendingReadTxn {
1202    /// Return the timestamp context of the pending read transaction.
1203    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
1204        &self.timestamp_context
1205    }
1206
1207    pub(crate) fn take_context(self) -> ExecuteContext {
1208        self.txn.take_context()
1209    }
1210}
1211
1212#[derive(Debug)]
1213/// A pending read transaction waiting to be linearized.
1214enum PendingRead {
1215    Read {
1216        /// The inner transaction.
1217        txn: PendingTxn,
1218    },
1219    ReadThenWrite {
1220        /// Context used to send a response back to the client.
1221        ctx: ExecuteContext,
1222        /// Channel used to alert the transaction that the read has been linearized and send back
1223        /// `ctx`.
1224        tx: oneshot::Sender<Option<ExecuteContext>>,
1225    },
1226}
1227
1228impl PendingRead {
1229    /// Alert the client that the read has been linearized.
1230    ///
1231    /// If it is necessary to finalize an execute, return the state necessary to do so
1232    /// (execution context and result)
1233    #[instrument(level = "debug")]
1234    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1235        match self {
1236            PendingRead::Read {
1237                txn:
1238                    PendingTxn {
1239                        mut ctx,
1240                        response,
1241                        action,
1242                    },
1243                ..
1244            } => {
1245                let changed = ctx.session_mut().vars_mut().end_transaction(action);
1246                // Append any parameters that changed to the response.
1247                let response = response.map(|mut r| {
1248                    r.extend_params(changed);
1249                    ExecuteResponse::from(r)
1250                });
1251
1252                Some((ctx, response))
1253            }
1254            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1255                // Ignore errors if the caller has hung up.
1256                let _ = tx.send(Some(ctx));
1257                None
1258            }
1259        }
1260    }
1261
1262    fn label(&self) -> &'static str {
1263        match self {
1264            PendingRead::Read { .. } => "read",
1265            PendingRead::ReadThenWrite { .. } => "read_then_write",
1266        }
1267    }
1268
1269    pub(crate) fn take_context(self) -> ExecuteContext {
1270        match self {
1271            PendingRead::Read { txn, .. } => txn.ctx,
1272            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1273                // Inform the transaction that we've taken their context.
1274                // Ignore errors if the caller has hung up.
1275                let _ = tx.send(None);
1276                ctx
1277            }
1278        }
1279    }
1280}
1281
1282/// State that the coordinator must process as part of retiring
1283/// command execution. `ExecuteContextExtra::default()` is guaranteed
1284/// to produce a value that will cause the coordinator to do nothing, and
1285/// is intended for use by code that invokes the execution processing flow
1286/// (i.e., `sequence_plan`) without actually being a statement execution.
1287///
1288/// This struct must not be dropped if it contains non-trivial
1289/// state. The only valid way to get rid of it is to pass it to the
1290/// coordinator for retirement. To enforce this, we assert in the
1291/// `Drop` implementation.
1292#[derive(Debug, Default)]
1293#[must_use]
1294pub struct ExecuteContextExtra {
1295    statement_uuid: Option<StatementLoggingId>,
1296}
1297
1298impl ExecuteContextExtra {
1299    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1300        Self { statement_uuid }
1301    }
1302    pub fn is_trivial(&self) -> bool {
1303        let Self { statement_uuid } = self;
1304        statement_uuid.is_none()
1305    }
1306    pub fn contents(&self) -> Option<StatementLoggingId> {
1307        let Self { statement_uuid } = self;
1308        *statement_uuid
1309    }
1310    /// Take responsibility for the contents.  This should only be
1311    /// called from code that knows what to do to finish up logging
1312    /// based on the inner value.
1313    #[must_use]
1314    fn retire(mut self) -> Option<StatementLoggingId> {
1315        let Self { statement_uuid } = &mut self;
1316        statement_uuid.take()
1317    }
1318}
1319
1320impl Drop for ExecuteContextExtra {
1321    fn drop(&mut self) {
1322        let Self { statement_uuid } = &*self;
1323        if let Some(statement_uuid) = statement_uuid {
1324            // Note: the impact when this error hits
1325            // is that the statement will never be marked
1326            // as finished in the statement log.
1327            soft_panic_or_log!(
1328                "execute context for statement {statement_uuid:?} dropped without being properly retired."
1329            );
1330        }
1331    }
1332}
1333
1334/// Bundle of state related to statement execution.
1335///
1336/// This struct collects a bundle of state that needs to be threaded
1337/// through various functions as part of statement execution.
1338/// Currently, it is only used to finalize execution, by calling one
1339/// of the methods `retire` or `retire_async`. Finalizing execution
1340/// involves sending the session back to the pgwire layer so that it
1341/// may be used to process further commands. In the future, it will
1342/// also involve performing some work on the main coordinator thread
1343/// (e.g., recording the time at which the statement finished
1344/// executing); the state necessary to perform this work is bundled in
1345/// the `ExecuteContextExtra` object (today, it is simply empty).
1346#[derive(Debug)]
1347pub struct ExecuteContext {
1348    inner: Box<ExecuteContextInner>,
1349}
1350
1351impl std::ops::Deref for ExecuteContext {
1352    type Target = ExecuteContextInner;
1353    fn deref(&self) -> &Self::Target {
1354        &*self.inner
1355    }
1356}
1357
1358impl std::ops::DerefMut for ExecuteContext {
1359    fn deref_mut(&mut self) -> &mut Self::Target {
1360        &mut *self.inner
1361    }
1362}
1363
1364#[derive(Debug)]
1365pub struct ExecuteContextInner {
1366    tx: ClientTransmitter<ExecuteResponse>,
1367    internal_cmd_tx: mpsc::UnboundedSender<Message>,
1368    session: Session,
1369    extra: ExecuteContextExtra,
1370}
1371
1372impl ExecuteContext {
1373    pub fn session(&self) -> &Session {
1374        &self.session
1375    }
1376
1377    pub fn session_mut(&mut self) -> &mut Session {
1378        &mut self.session
1379    }
1380
1381    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1382        &self.tx
1383    }
1384
1385    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1386        &mut self.tx
1387    }
1388
1389    pub fn from_parts(
1390        tx: ClientTransmitter<ExecuteResponse>,
1391        internal_cmd_tx: mpsc::UnboundedSender<Message>,
1392        session: Session,
1393        extra: ExecuteContextExtra,
1394    ) -> Self {
1395        Self {
1396            inner: ExecuteContextInner {
1397                tx,
1398                session,
1399                extra,
1400                internal_cmd_tx,
1401            }
1402            .into(),
1403        }
1404    }
1405
1406    /// By calling this function, the caller takes responsibility for
1407    /// dealing with the instance of `ExecuteContextExtra`. This is
1408    /// intended to support protocols (like `COPY FROM`) that involve
1409    /// multiple passes of sending the session back and forth between
1410    /// the coordinator and the pgwire layer. As part of any such
1411    /// protocol, we must ensure that the `ExecuteContextExtra`
1412    /// (possibly wrapped in a new `ExecuteContext`) is passed back to the coordinator for
1413    /// eventual retirement.
1414    pub fn into_parts(
1415        self,
1416    ) -> (
1417        ClientTransmitter<ExecuteResponse>,
1418        mpsc::UnboundedSender<Message>,
1419        Session,
1420        ExecuteContextExtra,
1421    ) {
1422        let ExecuteContextInner {
1423            tx,
1424            internal_cmd_tx,
1425            session,
1426            extra,
1427        } = *self.inner;
1428        (tx, internal_cmd_tx, session, extra)
1429    }
1430
1431    /// Retire the execution, by sending a message to the coordinator.
1432    #[instrument(level = "debug")]
1433    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1434        let ExecuteContextInner {
1435            tx,
1436            internal_cmd_tx,
1437            session,
1438            extra,
1439        } = *self.inner;
1440        let reason = if extra.is_trivial() {
1441            None
1442        } else {
1443            Some((&result).into())
1444        };
1445        tx.send(result, session);
1446        if let Some(reason) = reason {
1447            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1448                otel_ctx: OpenTelemetryContext::obtain(),
1449                data: extra,
1450                reason,
1451            }) {
1452                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1453            }
1454        }
1455    }
1456
1457    pub fn extra(&self) -> &ExecuteContextExtra {
1458        &self.extra
1459    }
1460
1461    pub fn extra_mut(&mut self) -> &mut ExecuteContextExtra {
1462        &mut self.extra
1463    }
1464}
1465
1466#[derive(Debug)]
1467struct ClusterReplicaStatuses(
1468    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
1469);
1470
1471impl ClusterReplicaStatuses {
1472    pub(crate) fn new() -> ClusterReplicaStatuses {
1473        ClusterReplicaStatuses(BTreeMap::new())
1474    }
1475
1476    /// Initializes the statuses of the specified cluster.
1477    ///
1478    /// Panics if the cluster statuses are already initialized.
1479    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1480        let prev = self.0.insert(cluster_id, BTreeMap::new());
1481        assert_eq!(
1482            prev, None,
1483            "cluster {cluster_id} statuses already initialized"
1484        );
1485    }
1486
1487    /// Initializes the statuses of the specified cluster replica.
1488    ///
1489    /// Panics if the cluster replica statuses are already initialized.
1490    pub(crate) fn initialize_cluster_replica_statuses(
1491        &mut self,
1492        cluster_id: ClusterId,
1493        replica_id: ReplicaId,
1494        num_processes: usize,
1495        time: DateTime<Utc>,
1496    ) {
1497        tracing::info!(
1498            ?cluster_id,
1499            ?replica_id,
1500            ?time,
1501            "initializing cluster replica status"
1502        );
1503        let replica_statuses = self.0.entry(cluster_id).or_default();
1504        let process_statuses = (0..num_processes)
1505            .map(|process_id| {
1506                let status = ClusterReplicaProcessStatus {
1507                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1508                    time: time.clone(),
1509                };
1510                (u64::cast_from(process_id), status)
1511            })
1512            .collect();
1513        let prev = replica_statuses.insert(replica_id, process_statuses);
1514        assert_none!(
1515            prev,
1516            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1517        );
1518    }
1519
1520    /// Removes the statuses of the specified cluster.
1521    ///
1522    /// Panics if the cluster does not exist.
1523    pub(crate) fn remove_cluster_statuses(
1524        &mut self,
1525        cluster_id: &ClusterId,
1526    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1527        let prev = self.0.remove(cluster_id);
1528        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1529    }
1530
1531    /// Removes the statuses of the specified cluster replica.
1532    ///
1533    /// Panics if the cluster or replica does not exist.
1534    pub(crate) fn remove_cluster_replica_statuses(
1535        &mut self,
1536        cluster_id: &ClusterId,
1537        replica_id: &ReplicaId,
1538    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1539        let replica_statuses = self
1540            .0
1541            .get_mut(cluster_id)
1542            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1543        let prev = replica_statuses.remove(replica_id);
1544        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1545    }
1546
1547    /// Inserts or updates the status of the specified cluster replica process.
1548    ///
1549    /// Panics if the cluster or replica does not exist.
1550    pub(crate) fn ensure_cluster_status(
1551        &mut self,
1552        cluster_id: ClusterId,
1553        replica_id: ReplicaId,
1554        process_id: ProcessId,
1555        status: ClusterReplicaProcessStatus,
1556    ) {
1557        let replica_statuses = self
1558            .0
1559            .get_mut(&cluster_id)
1560            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1561            .get_mut(&replica_id)
1562            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1563        replica_statuses.insert(process_id, status);
1564    }
1565
1566    /// Computes the status of the cluster replica as a whole.
1567    ///
1568    /// Panics if `cluster_id` or `replica_id` don't exist.
1569    pub fn get_cluster_replica_status(
1570        &self,
1571        cluster_id: ClusterId,
1572        replica_id: ReplicaId,
1573    ) -> ClusterStatus {
1574        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1575        Self::cluster_replica_status(process_status)
1576    }
1577
1578    /// Computes the status of the cluster replica as a whole.
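    ///
    /// The replica is `Online` only if every process is `Online`; otherwise it
    /// is `Offline`, carrying the first known offline reason (if any). For
    /// example, a two-process replica with one `Online` process and one
    /// `Offline(Some(OfflineReason::Initializing))` process is reported as
    /// `Offline(Some(OfflineReason::Initializing))`.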
1579    pub fn cluster_replica_status(
1580        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1581    ) -> ClusterStatus {
1582        process_status
1583            .values()
1584            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1585                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1586                (x, y) => {
1587                    let reason_x = match x {
1588                        ClusterStatus::Offline(reason) => reason,
1589                        ClusterStatus::Online => None,
1590                    };
1591                    let reason_y = match y {
1592                        ClusterStatus::Offline(reason) => reason,
1593                        ClusterStatus::Online => None,
1594                    };
1595                    // Arbitrarily pick the first known not-ready reason.
1596                    ClusterStatus::Offline(reason_x.or(reason_y))
1597                }
1598            })
1599    }
1600
1601    /// Gets the statuses of the given cluster replica.
1602    ///
1603    /// Panics if the cluster or replica does not exist.
1604    pub(crate) fn get_cluster_replica_statuses(
1605        &self,
1606        cluster_id: ClusterId,
1607        replica_id: ReplicaId,
1608    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1609        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1610            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1611    }
1612
1613    /// Gets the statuses of the given cluster replica.
1614    pub(crate) fn try_get_cluster_replica_statuses(
1615        &self,
1616        cluster_id: ClusterId,
1617        replica_id: ReplicaId,
1618    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1619        self.try_get_cluster_statuses(cluster_id)
1620            .and_then(|statuses| statuses.get(&replica_id))
1621    }
1622
1623    /// Gets the statuses of the given cluster.
1624    pub(crate) fn try_get_cluster_statuses(
1625        &self,
1626        cluster_id: ClusterId,
1627    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1628        self.0.get(&cluster_id)
1629    }
1630}
1631
1632/// Glues the external world to the Timely workers.
1633#[derive(Derivative)]
1634#[derivative(Debug)]
1635pub struct Coordinator {
1636    /// The controller for the storage and compute layers.
1637    #[derivative(Debug = "ignore")]
1638    controller: mz_controller::Controller,
1639    /// The catalog in an Arc suitable for readonly references. The Arc allows
1640    /// us to hand out cheap copies of the catalog to functions that can use it
1641    /// off of the main coordinator thread. If the coordinator needs to mutate
1642    /// the catalog, call [`Self::catalog_mut`], which will clone this struct member,
1643    /// allowing it to be mutated here while the other off-thread references can
1644    /// read their catalog as long as needed. In the future we would like this
1645    /// to be a pTVC, but for now this is sufficient.
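    ///
    /// A minimal sketch of handing a cheap copy to an off-thread task
    /// (hypothetical task, not a doctest):
    ///
    /// ```ignore
    /// let catalog = Arc::clone(&self.catalog);
    /// spawn(|| "catalog-reader", async move {
    ///     // Read-only access, off the main coordinator thread.
    ///     let _num_clusters = catalog.clusters().count();
    /// });
    /// ```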
1646    catalog: Arc<Catalog>,
1647
1648    /// A client for persist. Initially, this is only used for reading stashed
1649    /// peek responses out of batches.
1650    persist_client: PersistClient,
1651
1652    /// Channel to manage internal commands from the coordinator to itself.
1653    internal_cmd_tx: mpsc::UnboundedSender<Message>,
1654    /// Notification that triggers a group commit.
1655    group_commit_tx: appends::GroupCommitNotifier,
1656
1657    /// Channel for strict serializable reads ready to commit.
1658    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,
1659
1660    /// Mechanism for totally ordering write and read timestamps, so that all reads
1661    /// reflect exactly the set of writes that precede them, and no writes that follow.
1662    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
1663
1664    /// A generator for transient [`GlobalId`]s, shareable with other threads.
1665    transient_id_gen: Arc<TransientIdGen>,
1666    /// A map from connection ID to metadata about that connection for all
1667    /// active connections.
1668    active_conns: BTreeMap<ConnectionId, ConnMeta>,
1669
1670    /// For each transaction, the read holds taken to support any performed reads.
1671    ///
1672    /// Upon completing a transaction, these read holds should be dropped.
1673    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,
1674
1675    /// Access to the peek fields should be restricted to methods in the [`peek`] API.
1676    /// A map from pending peek ids to the queue into which responses are sent, and
1677    /// the connection id of the client that initiated the peek.
1678    pending_peeks: BTreeMap<Uuid, PendingPeek>,
1679    /// A map from client connection ids to a set of all pending peeks for that client.
1680    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
1681
1682    /// A map from client connection ids to their pending linearize read transaction.
1683    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
1684
1685    /// A map from the compute sink ID to its state description.
1686    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
1687    /// A map from active webhooks to their invalidation handle.
1688    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
1689    /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
1690    /// to stage Batches in Persist that we will then link into the shard.
1691    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,
1692
1693    /// A map from connection ids to a watch channel that is set to `true` if the connection
1694    /// received a cancel request.
1695    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
1696    /// Active introspection subscribes.
1697    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,
1698
1699    /// Locks that grant access to a specific object, populated lazily as objects are written to.
1700    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
1701    /// Plans that are currently deferred and waiting on a write lock.
1702    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,
1703
1704    /// Pending writes waiting for a group commit.
1705    pending_writes: Vec<PendingWriteTxn>,
1706
1707    /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
1708    /// table's timestamps, but there are cases where timestamps are not bumped even
1709    /// though we expect the closed timestamps to advance (`AS OF X`, SUBSCRIBing views over
1710    /// RT sources and tables). To address these, spawn a task that forces table
1711    /// timestamps to close on a regular interval. This roughly tracks the behavior
1712    /// of realtime sources that close off timestamps on an interval.
1713    ///
1714    /// For non-realtime timelines, nothing pushes the timestamps forward, so we must do
1715    /// it manually.
1716    advance_timelines_interval: Interval,
1717
1718    /// Serialized DDL. DDL must be serialized because:
1719    /// - Many of them do off-thread work and need to verify the catalog is in a valid state, but
1720    ///   [`PlanValidity`] does not currently support tracking all changes. Doing that correctly
1721    ///   seems to be more difficult than it's worth, so we would instead re-plan and re-sequence
1722    ///   the statements.
1723    /// - Re-planning a statement is hard because Coordinator and Session state is mutated at
1724    ///   various points, and we would need to correctly reset those changes before re-planning and
1725    ///   re-sequencing.
1726    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
1727
1728    /// Handle to secret manager that can create and delete secrets from
1729    /// an arbitrary secret storage engine.
1730    secrets_controller: Arc<dyn SecretsController>,
1731    /// A secrets reader that maintains an in-memory cache, where values have a set TTL.
1732    caching_secrets_reader: CachingSecretsReader,
1733
1734    /// Handle to a manager that can create and delete Kubernetes resources
1735    /// (i.e., VpcEndpoint objects).
1736    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1737
1738    /// Persist client for fetching storage metadata such as size metrics.
1739    storage_usage_client: StorageUsageClient,
1740    /// The interval at which to collect storage usage information.
1741    storage_usage_collection_interval: Duration,
1742
1743    /// Segment analytics client.
1744    #[derivative(Debug = "ignore")]
1745    segment_client: Option<mz_segment::Client>,
1746
1747    /// Coordinator metrics.
1748    metrics: Metrics,
1749    /// Optimizer metrics.
1750    optimizer_metrics: OptimizerMetrics,
1751
1752    /// Tracing handle.
1753    tracing_handle: TracingHandle,
1754
1755    /// Data used by the statement logging feature.
1756    statement_logging: StatementLogging,
1757
1758    /// Limit for how many concurrent webhook requests we allow.
1759    webhook_concurrency_limit: WebhookConcurrencyLimiter,
1760
1761    /// Optional config for the Postgres-backed timestamp oracle. This is
1762    /// _required_ when the `timestamp_oracle` system variable is set to
1763    /// `postgres`.
1764    pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,
1765
1766    /// Periodically asks cluster scheduling policies to make their decisions.
1767    check_cluster_scheduling_policies_interval: Interval,
1768
1769    /// This keeps the last On/Off decision for each cluster and each scheduling policy.
1770    /// (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are
1771    /// periodically cleaned up from this Map.)
1772    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,
1773
1774    /// When doing 0dt upgrades/in read-only mode, periodically ask all known
1775    /// clusters/collections whether they are caught up.
1776    caught_up_check_interval: Interval,
1777
1778    /// Context needed to check whether all clusters/collections have caught up.
1779    /// Only used during 0dt deployment, while in read-only mode.
1780    caught_up_check: Option<CaughtUpCheckContext>,
1781
1782    /// Tracks the state associated with the currently installed watchsets.
1783    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,
1784
1785    /// Tracks the currently installed watchsets for each connection.
1786    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,
1787
1788    /// Tracks the statuses of all cluster replicas.
1789    cluster_replica_statuses: ClusterReplicaStatuses,
1790
1791    /// Whether or not to start controllers in read-only mode. This is only
1792    /// meant for use during development of read-only clusters and 0dt upgrades
1793    /// and should go away once we have proper orchestration during upgrades.
1794    read_only_controllers: bool,
1795
1796    /// Updates to builtin tables that are being buffered while we are in
1797    /// read-only mode. We apply these all at once when coming out of read-only
1798    /// mode.
1799    ///
1800    /// This is a `Some` while in read-only mode and will be replaced by a
1801    /// `None` when we transition out of read-only mode and write out any
1802    /// buffered updates.
1803    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,
1804
1805    license_key: ValidatedLicenseKey,
1806}
1807
1808impl Coordinator {
1809    /// Initializes coordinator state based on the contained catalog. Must be
1810    /// called after creating the coordinator and before calling the
1811    /// `Coordinator::serve` method.
1812    #[instrument(name = "coord::bootstrap")]
1813    pub(crate) async fn bootstrap(
1814        &mut self,
1815        boot_ts: Timestamp,
1816        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1817        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1818        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1819        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1820        audit_logs_iterator: AuditLogIterator,
1821    ) -> Result<(), AdapterError> {
1822        let bootstrap_start = Instant::now();
1823        info!("startup: coordinator init: bootstrap beginning");
1824        info!("startup: coordinator init: bootstrap: preamble beginning");
1825
1826        // Initialize cluster replica statuses.
1827        // Gross iterator is to avoid partial borrow issues.
1828        let cluster_statuses: Vec<(_, Vec<_>)> = self
1829            .catalog()
1830            .clusters()
1831            .map(|cluster| {
1832                (
1833                    cluster.id(),
1834                    cluster
1835                        .replicas()
1836                        .map(|replica| {
1837                            (replica.replica_id, replica.config.location.num_processes())
1838                        })
1839                        .collect(),
1840                )
1841            })
1842            .collect();
1843        let now = self.now_datetime();
1844        for (cluster_id, replica_statuses) in cluster_statuses {
1845            self.cluster_replica_statuses
1846                .initialize_cluster_statuses(cluster_id);
1847            for (replica_id, num_processes) in replica_statuses {
1848                self.cluster_replica_statuses
1849                    .initialize_cluster_replica_statuses(
1850                        cluster_id,
1851                        replica_id,
1852                        num_processes,
1853                        now,
1854                    );
1855            }
1856        }
1857
1858        let system_config = self.catalog().system_config();
1859
1860        // Inform metrics about the initial system configuration.
1861        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1862
1863        // Inform the controllers about their initial configuration.
1864        let compute_config = flags::compute_config(system_config);
1865        let storage_config = flags::storage_config(system_config);
1866        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1867        let dyncfg_updates = system_config.dyncfg_updates();
1868        self.controller.compute.update_configuration(compute_config);
1869        self.controller.storage.update_parameters(storage_config);
1870        self.controller
1871            .update_orchestrator_scheduling_config(scheduling_config);
1872        self.controller.update_configuration(dyncfg_updates);
1873
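        // Compaction-window policies to apply, accumulated while installing
        // catalog items below and applied via `initialize_read_policies` once
        // all items have been installed.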
1874        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1875            Default::default();
1876
1877        let enable_worker_core_affinity =
1878            self.catalog().system_config().enable_worker_core_affinity();
1879        for instance in self.catalog.clusters() {
1880            self.controller.create_cluster(
1881                instance.id,
1882                ClusterConfig {
1883                    arranged_logs: instance.log_indexes.clone(),
1884                    workload_class: instance.config.workload_class.clone(),
1885                },
1886            )?;
1887            for replica in instance.replicas() {
1888                let role = instance.role();
1889                self.controller.create_replica(
1890                    instance.id,
1891                    replica.replica_id,
1892                    instance.name.clone(),
1893                    replica.name.clone(),
1894                    role,
1895                    replica.config.clone(),
1896                    enable_worker_core_affinity,
1897                )?;
1898            }
1899        }
1900
1901        info!(
1902            "startup: coordinator init: bootstrap: preamble complete in {:?}",
1903            bootstrap_start.elapsed()
1904        );
1905
1906        let init_storage_collections_start = Instant::now();
1907        info!("startup: coordinator init: bootstrap: storage collections init beginning");
1908        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
1909            .await;
1910        info!(
1911            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
1912            init_storage_collections_start.elapsed()
1913        );
1914
1915        // The storage controller knows about the introspection collections now, so we can start
1916        // sinking introspection updates in the compute controller. It makes sense to do that as
1917        // soon as possible, to avoid updates piling up in the compute controller's internal
1918        // buffers.
1919        self.controller.start_compute_introspection_sink();
1920
1921        let optimize_dataflows_start = Instant::now();
1922        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
1923        let entries: Vec<_> = self.catalog().entries().cloned().collect();
1924        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
1925        info!(
1926            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
1927            optimize_dataflows_start.elapsed()
1928        );
1929
1930        // We don't need to wait for the cache to update.
1931        let _fut = self.catalog().update_expression_cache(
1932            uncached_local_exprs.into_iter().collect(),
1933            uncached_global_exps.into_iter().collect(),
1934        );
1935
1936        // Select dataflow as-ofs. This step relies on the storage collections created by
1937        // `bootstrap_storage_collections` and the dataflow plans created by
1938        // `bootstrap_dataflow_plans`.
1939        let bootstrap_as_ofs_start = Instant::now();
1940        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
1941        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
1942        info!(
1943            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
1944            bootstrap_as_ofs_start.elapsed()
1945        );
1946
1947        let postamble_start = Instant::now();
1948        info!("startup: coordinator init: bootstrap: postamble beginning");
1949
1950        let logs: BTreeSet<_> = BUILTINS::logs()
1951            .map(|log| self.catalog().resolve_builtin_log(log))
1952            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
1953            .collect();
1954
1955        let mut privatelink_connections = BTreeMap::new();
1956
1957        for entry in &entries {
1958            debug!(
1959                "coordinator init: installing {} {}",
1960                entry.item().typ(),
1961                entry.id()
1962            );
1963            let mut policy = entry.item().initial_logical_compaction_window();
1964            match entry.item() {
1965                // Currently catalog item rebuild assumes that sinks and
1966                // indexes are always built individually and does not store information
1967                // about how it was built. If we start building multiple sinks and/or indexes
1968                // using a single dataflow, we have to make sure the rebuild process re-runs
1969                // the same multiple-build dataflow.
1970                CatalogItem::Source(source) => {
1971                    // Propagate source compaction windows to subsources if needed.
1972                    if source.custom_logical_compaction_window.is_none() {
1973                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
1974                            source.data_source
1975                        {
1976                            policy = Some(
1977                                self.catalog()
1978                                    .get_entry(&ingestion_id)
1979                                    .source()
1980                                    .expect("must be source")
1981                                    .custom_logical_compaction_window
1982                                    .unwrap_or_default(),
1983                            );
1984                        }
1985                    }
1986                    policies_to_set
1987                        .entry(policy.expect("sources have a compaction window"))
1988                        .or_insert_with(Default::default)
1989                        .storage_ids
1990                        .insert(source.global_id());
1991                }
1992                CatalogItem::Table(table) => {
1993                    policies_to_set
1994                        .entry(policy.expect("tables have a compaction window"))
1995                        .or_insert_with(Default::default)
1996                        .storage_ids
1997                        .extend(table.global_ids());
1998                }
1999                CatalogItem::Index(idx) => {
2000                    let policy_entry = policies_to_set
2001                        .entry(policy.expect("indexes have a compaction window"))
2002                        .or_insert_with(Default::default);
2003
2004                    if logs.contains(&idx.on) {
2005                        policy_entry
2006                            .compute_ids
2007                            .entry(idx.cluster_id)
2008                            .or_insert_with(BTreeSet::new)
2009                            .insert(idx.global_id());
2010                    } else {
2011                        let df_desc = self
2012                            .catalog()
2013                            .try_get_physical_plan(&idx.global_id())
2014                            .expect("added in `bootstrap_dataflow_plans`")
2015                            .clone();
2016
2017                        let df_meta = self
2018                            .catalog()
2019                            .try_get_dataflow_metainfo(&idx.global_id())
2020                            .expect("added in `bootstrap_dataflow_plans`");
2021
2022                        if self.catalog().state().system_config().enable_mz_notices() {
2023                            // Collect optimization hint updates.
2024                            self.catalog().state().pack_optimizer_notices(
2025                                &mut builtin_table_updates,
2026                                df_meta.optimizer_notices.iter(),
2027                                Diff::ONE,
2028                            );
2029                        }
2030
2031                        // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
2032                        // but we cannot call that as it will also downgrade the read hold on the index.
2033                        policy_entry
2034                            .compute_ids
2035                            .entry(idx.cluster_id)
2036                            .or_insert_with(Default::default)
2037                            .extend(df_desc.export_ids());
2038
2039                        self.controller
2040                            .compute
2041                            .create_dataflow(idx.cluster_id, df_desc, None)
2042                            .unwrap_or_terminate("cannot fail to create dataflows");
2043                    }
2044                }
2045                CatalogItem::View(_) => (),
2046                CatalogItem::MaterializedView(mview) => {
2047                    policies_to_set
2048                        .entry(policy.expect("materialized views have a compaction window"))
2049                        .or_insert_with(Default::default)
2050                        .storage_ids
2051                        .insert(mview.global_id());
2052
2053                    let mut df_desc = self
2054                        .catalog()
2055                        .try_get_physical_plan(&mview.global_id())
2056                        .expect("added in `bootstrap_dataflow_plans`")
2057                        .clone();
2058
2059                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
2060                        df_desc.set_initial_as_of(initial_as_of);
2061                    }
2062
2063                    // If the refresh schedule has a last refresh, bound the dataflow's `until` to just past the last refresh.
2064                    let until = mview
2065                        .refresh_schedule
2066                        .as_ref()
2067                        .and_then(|s| s.last_refresh())
2068                        .and_then(|r| r.try_step_forward());
2069                    if let Some(until) = until {
2070                        df_desc.until.meet_assign(&Antichain::from_elem(until));
2071                    }
2072
2073                    let df_meta = self
2074                        .catalog()
2075                        .try_get_dataflow_metainfo(&mview.global_id())
2076                        .expect("added in `bootstrap_dataflow_plans`");
2077
2078                    if self.catalog().state().system_config().enable_mz_notices() {
2079                        // Collect optimization hint updates.
2080                        self.catalog().state().pack_optimizer_notices(
2081                            &mut builtin_table_updates,
2082                            df_meta.optimizer_notices.iter(),
2083                            Diff::ONE,
2084                        );
2085                    }
2086
2087                    self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2088                }
2089                CatalogItem::Sink(sink) => {
2090                    policies_to_set
2091                        .entry(CompactionWindow::Default)
2092                        .or_insert_with(Default::default)
2093                        .storage_ids
2094                        .insert(sink.global_id());
2095                }
2096                CatalogItem::Connection(catalog_connection) => {
2097                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2098                        privatelink_connections.insert(
2099                            entry.id(),
2100                            VpcEndpointConfig {
2101                                aws_service_name: conn.service_name.clone(),
2102                                availability_zone_ids: conn.availability_zones.clone(),
2103                            },
2104                        );
2105                    }
2106                }
2107                CatalogItem::ContinualTask(ct) => {
2108                    policies_to_set
2109                        .entry(policy.expect("continual tasks have a compaction window"))
2110                        .or_insert_with(Default::default)
2111                        .storage_ids
2112                        .insert(ct.global_id());
2113
2114                    let mut df_desc = self
2115                        .catalog()
2116                        .try_get_physical_plan(&ct.global_id())
2117                        .expect("added in `bootstrap_dataflow_plans`")
2118                        .clone();
2119
2120                    if let Some(initial_as_of) = ct.initial_as_of.clone() {
2121                        df_desc.set_initial_as_of(initial_as_of);
2122                    }
2123
2124                    let df_meta = self
2125                        .catalog()
2126                        .try_get_dataflow_metainfo(&ct.global_id())
2127                        .expect("added in `bootstrap_dataflow_plans`");
2128
2129                    if self.catalog().state().system_config().enable_mz_notices() {
2130                        // Collect optimization hint updates.
2131                        self.catalog().state().pack_optimizer_notices(
2132                            &mut builtin_table_updates,
2133                            df_meta.optimizer_notices.iter(),
2134                            Diff::ONE,
2135                        );
2136                    }
2137
2138                    self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2139                }
2140                // Nothing to do for these cases
2141                CatalogItem::Log(_)
2142                | CatalogItem::Type(_)
2143                | CatalogItem::Func(_)
2144                | CatalogItem::Secret(_) => {}
2145            }
2146        }
2147
2148        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2149            // Clean up any extraneous VpcEndpoints that shouldn't exist.
2150            let existing_vpc_endpoints = cloud_resource_controller
2151                .list_vpc_endpoints()
2152                .await
2153                .context("list vpc endpoints")?;
2154            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2155            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2156            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2157            for id in vpc_endpoints_to_remove {
2158                cloud_resource_controller
2159                    .delete_vpc_endpoint(*id)
2160                    .await
2161                    .context("deleting extraneous vpc endpoint")?;
2162            }
2163
2164            // Ensure desired VpcEndpoints are up to date.
2165            for (id, spec) in privatelink_connections {
2166                cloud_resource_controller
2167                    .ensure_vpc_endpoint(id, spec)
2168                    .await
2169                    .context("ensuring vpc endpoint")?;
2170            }
2171        }
2172
2173        // Having installed all entries and created all constraints, we can now drop read
2174        // holds and relax read policies.
2175        drop(dataflow_read_holds);
2176        // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
2177        for (cw, policies) in policies_to_set {
2178            self.initialize_read_policies(&policies, cw).await;
2179        }
2180
2181        // Expose mapping from T-shirt sizes to actual sizes
2182        builtin_table_updates.extend(
2183            self.catalog().state().resolve_builtin_table_updates(
2184                self.catalog().state().pack_all_replica_size_updates(),
2185            ),
2186        );
2187
2188        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2189        // When 0dt is enabled, we create new shards for any migrated builtin storage collections.
2190        // In read-only mode, the migrated builtin tables (which are a subset of migrated builtin
2191        // storage collections) need to be back-filled so that any dependent dataflow can be
2192        // hydrated. Additionally, these shards are not registered with the txn-shard, and cannot
2193        // be registered while in read-only mode, so they are written to directly.
2194        let migrated_updates_fut = if self.controller.read_only() {
2195            let min_timestamp = Timestamp::minimum();
2196            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2197                .extract_if(.., |update| {
2198                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2199                    migrated_storage_collections_0dt.contains(&update.id)
2200                        && self
2201                            .controller
2202                            .storage_collections
2203                            .collection_frontiers(gid)
2204                            .expect("all tables are registered")
2205                            .write_frontier
2206                            .elements()
2207                            == &[min_timestamp]
2208                })
2209                .collect();
2210            if migrated_builtin_table_updates.is_empty() {
2211                futures::future::ready(()).boxed()
2212            } else {
2213                // Group all updates per-table.
2214                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2215                for update in migrated_builtin_table_updates {
2216                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2217                    grouped_appends.entry(gid).or_default().push(update.data);
2218                }
2219                info!(
2220                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2221                    grouped_appends.keys().collect::<Vec<_>>()
2222                );
2223
2224                // Consolidate Row data; staged batches must already be consolidated.
2225                let mut all_appends = Vec::with_capacity(grouped_appends.len());
2226                for (item_id, table_data) in grouped_appends.into_iter() {
2227                    let mut all_rows = Vec::new();
2228                    let mut all_data = Vec::new();
2229                    for data in table_data {
2230                        match data {
2231                            TableData::Rows(rows) => all_rows.extend(rows),
2232                            TableData::Batches(_) => all_data.push(data),
2233                        }
2234                    }
2235                    differential_dataflow::consolidation::consolidate(&mut all_rows);
2236                    all_data.push(TableData::Rows(all_rows));
2237
2238                    // TODO(parkmycar): Use SmallVec throughout.
2239                    all_appends.push((item_id, all_data));
2240                }
2241
2242                let fut = self
2243                    .controller
2244                    .storage
2245                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2246                    .expect("cannot fail to append");
2247                async {
2248                    fut.await
2249                        .expect("One-shot shouldn't be dropped during bootstrap")
2250                        .unwrap_or_terminate("cannot fail to append")
2251                }
2252                .boxed()
2253            }
2254        } else {
2255            futures::future::ready(()).boxed()
2256        };
2257
2258        info!(
2259            "startup: coordinator init: bootstrap: postamble complete in {:?}",
2260            postamble_start.elapsed()
2261        );
2262
2263        let builtin_update_start = Instant::now();
2264        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2265
2266        if self.controller.read_only() {
2267            info!(
2268                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2269            );
2270
2271            // TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
2272            let audit_join_start = Instant::now();
2273            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2274            let audit_log_updates: Vec<_> = audit_logs_iterator
2275                .map(|(audit_log, ts)| StateUpdate {
2276                    kind: StateUpdateKind::AuditLog(audit_log),
2277                    ts,
2278                    diff: StateDiff::Addition,
2279                })
2280                .collect();
2281            let audit_log_builtin_table_updates = self
2282                .catalog()
2283                .state()
2284                .generate_builtin_table_updates(audit_log_updates);
2285            builtin_table_updates.extend(audit_log_builtin_table_updates);
2286            info!(
2287                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2288                audit_join_start.elapsed()
2289            );
2290            self.buffered_builtin_table_updates
2291                .as_mut()
2292                .expect("in read-only mode")
2293                .append(&mut builtin_table_updates);
2294        } else {
2295            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2296                .await;
2297        };
2298        info!(
2299            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2300            builtin_update_start.elapsed()
2301        );
2302
2303        let cleanup_secrets_start = Instant::now();
2304        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2305        // Clean up orphaned secrets. Errors during list() or delete() do not
2306        // need to prevent bootstrap from succeeding; we will retry next
2307        // startup.
2308        {
2309            // Destructure Self so we can selectively move fields into the async
2310            // task.
2311            let Self {
2312                secrets_controller,
2313                catalog,
2314                ..
2315            } = self;
2316
2317            let next_user_item_id = catalog.get_next_user_item_id().await?;
2318            let next_system_item_id = catalog.get_next_system_item_id().await?;
2319            let read_only = self.controller.read_only();
2320            // Fetch all IDs from the catalog to future-proof against other
2321            // things using secrets. Today, SECRET and CONNECTION objects use
2322            // secrets_controller.ensure, but more object types could use secrets in
2323            // the future and would be easy to miss adding here.
2324            let catalog_ids: BTreeSet<CatalogItemId> =
2325                catalog.entries().map(|entry| entry.id()).collect();
2326            let secrets_controller = Arc::clone(secrets_controller);
2327
2328            spawn(|| "cleanup-orphaned-secrets", async move {
2329                if read_only {
2330                    info!(
2331                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2332                    );
2333                    return;
2334                }
2335                info!("coordinator init: cleaning up orphaned secrets");
2336
2337                match secrets_controller.list().await {
2338                    Ok(controller_secrets) => {
2339                        let controller_secrets: BTreeSet<CatalogItemId> =
2340                            controller_secrets.into_iter().collect();
2341                        let orphaned = controller_secrets.difference(&catalog_ids);
2342                        for id in orphaned {
2343                            let id_too_large = match id {
2344                                CatalogItemId::System(id) => *id >= next_system_item_id,
2345                                CatalogItemId::User(id) => *id >= next_user_item_id,
2346                                CatalogItemId::IntrospectionSourceIndex(_)
2347                                | CatalogItemId::Transient(_) => false,
2348                            };
2349                            if id_too_large {
2350                                info!(
2351                                    %next_user_item_id, %next_system_item_id,
2352                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2353                                );
2354                            } else {
2355                                info!("coordinator init: deleting orphaned secret {id}");
2356                                fail_point!("orphan_secrets");
2357                                if let Err(e) = secrets_controller.delete(*id).await {
2358                                    warn!(
2359                                        "Dropping orphaned secret has encountered an error: {}",
2360                                        e
2361                                    );
2362                                }
2363                            }
2364                        }
2365                    }
2366                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2367                }
2368            });
2369        }
2370        info!(
2371            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2372            cleanup_secrets_start.elapsed()
2373        );
2374
2375        // Run all of our final steps concurrently.
2376        let final_steps_start = Instant::now();
2377        info!(
2378            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2379        );
2380        migrated_updates_fut
2381            .instrument(info_span!("coord::bootstrap::final"))
2382            .await;
2383
2384        debug!(
2385            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2386        );
2387        // Announce the completion of initialization.
2388        self.controller.initialization_complete();
2389
2390        // Initialize unified introspection.
2391        self.bootstrap_introspection_subscribes().await;
2392
2393        info!(
2394            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2395            final_steps_start.elapsed()
2396        );
2397
2398        info!(
2399            "startup: coordinator init: bootstrap complete in {:?}",
2400            bootstrap_start.elapsed()
2401        );
2402        Ok(())
2403    }
2404
2405    /// Prepares tables for writing by resetting them to a known state and
2406    /// appending the given builtin table updates. The timestamp oracle
2407    /// will be advanced to the write timestamp of the append when this
2408    /// method returns.
2409    #[allow(clippy::async_yields_async)]
2410    #[instrument]
2411    async fn bootstrap_tables(
2412        &mut self,
2413        entries: &[CatalogEntry],
2414        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2415        audit_logs_iterator: AuditLogIterator,
2416    ) {
2417        /// Small helper struct holding metadata for bootstrapping tables.
2418        struct TableMetadata<'a> {
2419            id: CatalogItemId,
2420            name: &'a QualifiedItemName,
2421            table: &'a Table,
2422        }
2423
2424        // Filter our entries down to just tables.
2425        let table_metas: Vec<_> = entries
2426            .into_iter()
2427            .filter_map(|entry| {
2428                entry.table().map(|table| TableMetadata {
2429                    id: entry.id(),
2430                    name: entry.name(),
2431                    table,
2432                })
2433            })
2434            .collect();
2435
2436        // Append empty batches to advance the timestamp of all tables.
2437        debug!("coordinator init: advancing all tables to current timestamp");
2438        let WriteTimestamp {
2439            timestamp: write_ts,
2440            advance_to,
2441        } = self.get_local_write_ts().await;
2442        let appends = table_metas
2443            .iter()
2444            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2445            .collect();
2446        // Append the tables in the background. We apply the write timestamp before getting a read
2447        // timestamp and reading a snapshot of each table, so the snapshots will block on their own
2448        // until the appends are complete.
2449        let table_fence_rx = self
2450            .controller
2451            .storage
2452            .append_table(write_ts.clone(), advance_to, appends)
2453            .expect("invalid updates");
2454
2455        self.apply_local_write(write_ts).await;
2456
2457        // Add builtin table updates that clear the contents of all system tables.
2458        debug!("coordinator init: resetting system tables");
2459        let read_ts = self.get_local_read_ts().await;
2460
2461        // Filter out the 'mz_storage_usage_by_shard' table since we need to retain that info for
2462        // billing purposes.
2463        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2464            .catalog()
2465            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2466            .into();
2467        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2468            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2469                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2470        };
2471
2472        let mut retraction_tasks = Vec::new();
2473        let mut system_tables: Vec<_> = table_metas
2474            .iter()
2475            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2476            .collect();
2477
2478        // Special case audit events because it's append only.
2479        let (audit_events_idx, _) = system_tables
2480            .iter()
2481            .find_position(|table| {
2482                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2483            })
2484            .expect("mz_audit_events must exist");
2485        let audit_events = system_tables.remove(audit_events_idx);
2486        let audit_log_task = self.bootstrap_audit_log_table(
2487            audit_events.id,
2488            audit_events.name,
2489            audit_events.table,
2490            audit_logs_iterator,
2491            read_ts,
2492        );
2493
2494        for system_table in system_tables {
2495            let table_id = system_table.id;
2496            let full_name = self.catalog().resolve_full_name(system_table.name, None);
2497            debug!("coordinator init: resetting system table {full_name} ({table_id})");
2498
2499            // Fetch the current contents of the table for retraction.
2500            let snapshot_fut = self
2501                .controller
2502                .storage_collections
2503                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2504            let batch_fut = self
2505                .controller
2506                .storage_collections
2507                .create_update_builder(system_table.table.global_id_writes());
2508
2509            let task = spawn(|| format!("snapshot-{table_id}"), async move {
2510                // Create a TimestamplessUpdateBuilder.
2511                let mut batch = batch_fut
2512                    .await
2513                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2514                tracing::info!(?table_id, "starting snapshot");
2515                // Get a cursor which will emit a consolidated snapshot.
2516                let mut snapshot_cursor = snapshot_fut
2517                    .await
2518                    .unwrap_or_terminate("cannot fail to snapshot");
2519
2520                // Retract the current contents, spilling into our builder.
2521                while let Some(values) = snapshot_cursor.next().await {
2522                    for ((key, _val), _t, d) in values {
2523                        let key = key.expect("builtin table had errors");
2524                        let d_invert = d.neg();
2525                        batch.add(&key, &(), &d_invert).await;
2526                    }
2527                }
2528                tracing::info!(?table_id, "finished snapshot");
2529
2530                let batch = batch.finish().await;
2531                BuiltinTableUpdate::batch(table_id, batch)
2532            });
2533            retraction_tasks.push(task);
2534        }
2535
2536        let retractions_res = futures::future::join_all(retraction_tasks).await;
2537        for retractions in retractions_res {
2538            let retractions = retractions.expect("cannot fail to fetch snapshot");
2539            builtin_table_updates.push(retractions);
2540        }
2541
2542        let audit_join_start = Instant::now();
2543        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2544        let audit_log_updates = audit_log_task
2545            .await
2546            .expect("cannot fail to fetch audit log updates");
2547        let audit_log_builtin_table_updates = self
2548            .catalog()
2549            .state()
2550            .generate_builtin_table_updates(audit_log_updates);
2551        builtin_table_updates.extend(audit_log_builtin_table_updates);
2552        info!(
2553            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2554            audit_join_start.elapsed()
2555        );
2556
2557        // Now that the snapshots are complete, the appends must also be complete.
2558        table_fence_rx
2559            .await
2560            .expect("One-shot shouldn't be dropped during bootstrap")
2561            .unwrap_or_terminate("cannot fail to append");
2562
2563        info!("coordinator init: sending builtin table updates");
2564        let (_builtin_updates_fut, write_ts) = self
2565            .builtin_table_update()
2566            .execute(builtin_table_updates)
2567            .await;
2568        info!(?write_ts, "our write ts");
2569        if let Some(write_ts) = write_ts {
2570            self.apply_local_write(write_ts).await;
2571        }
2572    }
2573
2574    /// Prepares updates to the audit log table. The audit log table is append-only and very
2575    /// large, so we only need to find the events present in `audit_logs_iterator` but not in
2576    /// the audit log table.
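    ///
    /// For example, if the largest event ID already present in the table is 42, only catalog
    /// audit events with sortable IDs greater than 42 are returned for appending.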
2577    #[instrument]
2578    fn bootstrap_audit_log_table<'a>(
2579        &mut self,
2580        table_id: CatalogItemId,
2581        name: &'a QualifiedItemName,
2582        table: &'a Table,
2583        audit_logs_iterator: AuditLogIterator,
2584        read_ts: Timestamp,
2585    ) -> JoinHandle<Vec<StateUpdate>> {
2586        let full_name = self.catalog().resolve_full_name(name, None);
2587        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2588        let current_contents_fut = self
2589            .controller
2590            .storage_collections
2591            .snapshot(table.global_id_writes(), read_ts);
2592        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2593            let current_contents = current_contents_fut
2594                .await
2595                .unwrap_or_terminate("cannot fail to fetch snapshot");
2596            let contents_len = current_contents.len();
2597            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2598
2599            // Fetch the largest audit log event ID that has been written to the table.
2600            let max_table_id = current_contents
2601                .into_iter()
2602                .filter(|(_, diff)| *diff == 1)
2603                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2604                .sorted()
2605                .rev()
2606                .next();
2607
2608            // Filter audit log catalog updates to those that are not present in the table.
2609            audit_logs_iterator
2610                .take_while(|(audit_log, _)| match max_table_id {
2611                    Some(id) => audit_log.event.sortable_id() > id,
2612                    None => true,
2613                })
2614                .map(|(audit_log, ts)| StateUpdate {
2615                    kind: StateUpdateKind::AuditLog(audit_log),
2616                    ts,
2617                    diff: StateDiff::Addition,
2618                })
2619                .collect::<Vec<_>>()
2620        })
2621    }
2622
2623    /// Initializes all storage collections required by catalog objects in the storage controller.
2624    ///
2625    /// This method takes care of collection creation, as well as migration of existing
2626    /// collections.
2627    ///
2628    /// Creating all storage collections in a single `create_collections` call, rather than on
2629    /// demand, is more efficient as it reduces the number of writes to durable storage. It also
2630    /// allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
2631    /// storage collections, without needing to worry about dependency order.
2632    ///
2633    /// `migrated_storage_collections` is a set of builtin storage collections that have been
2634    /// migrated and should be handled specially.
2635    #[instrument]
2636    async fn bootstrap_storage_collections(
2637        &mut self,
2638        migrated_storage_collections: &BTreeSet<CatalogItemId>,
2639    ) {
2640        let catalog = self.catalog();
2641        let source_status_collection_id = catalog
2642            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2643        let source_status_collection_id = catalog
2644            .get_entry(&source_status_collection_id)
2645            .latest_global_id();
2646
2647        let source_desc = |object_id: GlobalId,
2648                           data_source: &DataSourceDesc,
2649                           desc: &RelationDesc,
2650                           timeline: &Timeline| {
2651            let (data_source, status_collection_id) = match data_source.clone() {
2652                // Re-announce the source description.
2653                DataSourceDesc::Ingestion { desc, cluster_id } => {
2654                    let desc = desc.into_inline_connection(catalog.state());
2655                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2656
2657                    (
2658                        DataSource::Ingestion(ingestion),
2659                        Some(source_status_collection_id),
2660                    )
2661                }
2662                DataSourceDesc::OldSyntaxIngestion {
2663                    desc,
2664                    progress_subsource,
2665                    data_config,
2666                    details,
2667                    cluster_id,
2668                } => {
2669                    let desc = desc.into_inline_connection(catalog.state());
2670                    let data_config = data_config.into_inline_connection(catalog.state());
2671                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2672                    // this will always be a Source or a Table.
2673                    let progress_subsource =
2674                        catalog.get_entry(&progress_subsource).latest_global_id();
2675                    let mut ingestion =
2676                        IngestionDescription::new(desc, cluster_id, progress_subsource);
2677                    let legacy_export = SourceExport {
2678                        storage_metadata: (),
2679                        data_config,
2680                        details,
2681                    };
2682                    ingestion.source_exports.insert(object_id, legacy_export);
2683
2684                    (
2685                        DataSource::Ingestion(ingestion),
2686                        Some(source_status_collection_id),
2687                    )
2688                }
2689                DataSourceDesc::IngestionExport {
2690                    ingestion_id,
2691                    external_reference: _,
2692                    details,
2693                    data_config,
2694                } => {
2695                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2696                    // this will always be a Source or a Table.
2697                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2698                    (
2699                        DataSource::IngestionExport {
2700                            ingestion_id,
2701                            details,
2702                            data_config: data_config.into_inline_connection(catalog.state()),
2703                        },
2704                        Some(source_status_collection_id),
2705                    )
2706                }
2707                DataSourceDesc::Webhook { .. } => {
2708                    (DataSource::Webhook, Some(source_status_collection_id))
2709                }
2710                DataSourceDesc::Progress => (DataSource::Progress, None),
2711                DataSourceDesc::Introspection(introspection) => {
2712                    (DataSource::Introspection(introspection), None)
2713                }
2714            };
2715            CollectionDescription {
2716                desc: desc.clone(),
2717                data_source,
2718                since: None,
2719                status_collection_id,
2720                timeline: Some(timeline.clone()),
2721            }
2722        };
2723
2724        let mut compute_collections = vec![];
2725        let mut collections = vec![];
2726        let mut new_builtin_continual_tasks = vec![];
2727        for entry in catalog.entries() {
2728            match entry.item() {
2729                CatalogItem::Source(source) => {
2730                    collections.push((
2731                        source.global_id(),
2732                        source_desc(
2733                            source.global_id(),
2734                            &source.data_source,
2735                            &source.desc,
2736                            &source.timeline,
2737                        ),
2738                    ));
2739                }
2740                CatalogItem::Table(table) => {
2741                    match &table.data_source {
2742                        TableDataSource::TableWrites { defaults: _ } => {
2743                            let versions: BTreeMap<_, _> = table
2744                                .collection_descs()
2745                                .map(|(gid, version, desc)| (version, (gid, desc)))
2746                                .collect();
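                            // Point each collection version at the next version (if any)
                            // as its primary; the newest version is the one without a
                            // primary.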
2747                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2748                                let next_version = version.bump();
2749                                let primary_collection =
2750                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2751                                let collection_desc = CollectionDescription::for_table(
2752                                    desc.clone(),
2753                                    primary_collection,
2754                                );
2755
2756                                (*gid, collection_desc)
2757                            });
2758                            collections.extend(collection_descs);
2759                        }
2760                        TableDataSource::DataSource {
2761                            desc: data_source_desc,
2762                            timeline,
2763                        } => {
2764                            // TODO(alter_table): Support versioning tables that read from sources.
2765                            soft_assert_eq_or_log!(table.collections.len(), 1);
2766                            let collection_descs =
2767                                table.collection_descs().map(|(gid, _version, desc)| {
2768                                    (
2769                                        gid,
2770                                        source_desc(
2771                                            entry.latest_global_id(),
2772                                            data_source_desc,
2773                                            &desc,
2774                                            timeline,
2775                                        ),
2776                                    )
2777                                });
2778                            collections.extend(collection_descs);
2779                        }
2780                    };
2781                }
2782                CatalogItem::MaterializedView(mv) => {
2783                    let collection_desc = CollectionDescription {
2784                        desc: mv.desc.clone(),
2785                        data_source: DataSource::Other,
2786                        since: mv.initial_as_of.clone(),
2787                        status_collection_id: None,
2788                        timeline: None,
2789                    };
2790                    compute_collections.push((mv.global_id(), mv.desc.clone()));
2791                    collections.push((mv.global_id(), collection_desc));
2792                }
2793                CatalogItem::ContinualTask(ct) => {
2794                    let collection_desc = CollectionDescription {
2795                        desc: ct.desc.clone(),
2796                        data_source: DataSource::Other,
2797                        since: ct.initial_as_of.clone(),
2798                        status_collection_id: None,
2799                        timeline: None,
2800                    };
2801                    if ct.global_id().is_system() && collection_desc.since.is_none() {
2802                        // We need a non-0 since to make as_of selection work. Fill it in below with
2803                        // the `bootstrap_builtin_continual_tasks` call, which can only be run after
2804                        // `create_collections_for_bootstrap`.
2805                        new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2806                    } else {
2807                        compute_collections.push((ct.global_id(), ct.desc.clone()));
2808                        collections.push((ct.global_id(), collection_desc));
2809                    }
2810                }
2811                CatalogItem::Sink(sink) => {
2812                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2813                    let from_desc = storage_sink_from_entry
2814                        .desc(&self.catalog().resolve_full_name(
2815                            storage_sink_from_entry.name(),
2816                            storage_sink_from_entry.conn_id(),
2817                        ))
2818                        .expect("sinks can only be built on items with descs")
2819                        .into_owned();
2820                    let collection_desc = CollectionDescription {
2821                        // TODO(sinks): make generic once we have more than one sink type.
2822                        desc: KAFKA_PROGRESS_DESC.clone(),
2823                        data_source: DataSource::Sink {
2824                            desc: ExportDescription {
2825                                sink: StorageSinkDesc {
2826                                    from: sink.from,
2827                                    from_desc,
2828                                    connection: sink
2829                                        .connection
2830                                        .clone()
2831                                        .into_inline_connection(self.catalog().state()),
2832                                    envelope: sink.envelope,
2833                                    as_of: Antichain::from_elem(Timestamp::minimum()),
2834                                    with_snapshot: sink.with_snapshot,
2835                                    version: sink.version,
2836                                    from_storage_metadata: (),
2837                                    to_storage_metadata: (),
2838                                },
2839                                instance_id: sink.cluster_id,
2840                            },
2841                        },
2842                        since: None,
2843                        status_collection_id: None,
2844                        timeline: None,
2845                    };
2846                    collections.push((sink.global_id, collection_desc));
2847                }
2848                _ => (),
2849            }
2850        }
2851
2852        let register_ts = if self.controller.read_only() {
2853            self.get_local_read_ts().await
2854        } else {
2855            // Getting a write timestamp bumps the write timestamp in the
2856            // oracle, which we're not allowed to do in read-only mode.
2857            self.get_local_write_ts().await.timestamp
2858        };
2859
2860        let storage_metadata = self.catalog.state().storage_metadata();
2861        let migrated_storage_collections = migrated_storage_collections
2862            .into_iter()
2863            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2864            .collect();
2865
2866        // Before possibly creating collections, make sure their schemas are correct.
2867        //
2868        // Across different versions of Materialize the nullability of columns can change based on
2869        // updates to our optimizer.
2870        self.controller
2871            .storage
2872            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2873            .await
2874            .unwrap_or_terminate("cannot fail to evolve collections");
2875
2876        self.controller
2877            .storage
2878            .create_collections_for_bootstrap(
2879                storage_metadata,
2880                Some(register_ts),
2881                collections,
2882                &migrated_storage_collections,
2883            )
2884            .await
2885            .unwrap_or_terminate("cannot fail to create collections");
2886
2887        self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2888            .await;
2889
2890        if !self.controller.read_only() {
2891            self.apply_local_write(register_ts).await;
2892        }
2893    }
2894
2895    /// Make as_of selection happy for builtin CTs. Ideally we'd write the
2896    /// initial as_of down in the durable catalog, but that's hard because of
2897    /// boot ordering. Instead, we set the since of the storage collection to
2898    /// something that's a reasonable lower bound for the as_of. Then, if the
2899    /// upper is 0, the as_of selection code will allow us to jump it forward to
2900    /// this since.
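    ///
    /// In timestamp terms, the rule this enables is roughly: the as_of may not be less than
    /// the since, and if the collection has never been written to (its upper is still the
    /// minimum timestamp), the as_of is free to jump forward to the since installed here.
    /// A hedged sketch over plain `u64` timestamps, not the actual as_of-selection code:
    ///
    /// ```ignore
    /// fn choose_as_of(since: u64, upper: u64) -> u64 {
    ///     if upper == 0 {
    ///         // Nothing has been written yet: jump forward to the installed since.
    ///         since
    ///     } else {
    ///         // Otherwise read as close to the upper as the since allows.
    ///         since.max(upper.saturating_sub(1))
    ///     }
    /// }
    ///
    /// assert_eq!(choose_as_of(100, 0), 100);
    /// assert_eq!(choose_as_of(100, 250), 249);
    /// ```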
2901    async fn bootstrap_builtin_continual_tasks(
2902        &mut self,
2903        // TODO(alter_table): Switch to CatalogItemId.
2904        mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
2905    ) {
2906        for (id, collection) in &mut collections {
2907            let entry = self.catalog.get_entry_by_global_id(id);
2908            let ct = match &entry.item {
2909                CatalogItem::ContinualTask(ct) => ct.clone(),
2910                _ => unreachable!("only called with continual task builtins"),
2911            };
2912            let debug_name = self
2913                .catalog()
2914                .resolve_full_name(entry.name(), None)
2915                .to_string();
2916            let (_optimized_plan, physical_plan, _metainfo) = self
2917                .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
2918                .expect("builtin CT should optimize successfully");
2919
2920            // Determine an as-of for the new continual task.
2921            let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
2922            // Can't acquire a read hold on ourselves because we don't exist yet.
2923            id_bundle.storage_ids.remove(id);
2924            let read_holds = self.acquire_read_holds(&id_bundle);
2925            let as_of = read_holds.least_valid_read();
2926
2927            collection.since = Some(as_of.clone());
2928        }
2929        self.controller
2930            .storage
2931            .create_collections(self.catalog.state().storage_metadata(), None, collections)
2932            .await
2933            .unwrap_or_terminate("cannot fail to create collections");
2934    }
2935
2936    /// Invokes the optimizer on all indexes and materialized views in the catalog and inserts the
2937    /// resulting dataflow plans into the catalog state.
2938    ///
2939    /// `ordered_catalog_entries` must be sorted in dependency order, with dependencies ordered
2940    /// before their dependants.
2941    ///
2942    /// This method does not perform timestamp selection for the dataflows, nor does it create them
2943    /// in the compute controller. Both of these steps happen later during bootstrapping.
2944    ///
2945    /// Returns a map of expressions that were not cached.
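    ///
    /// Each object follows the same cache-or-optimize pattern: reuse a cached global
    /// expression if its optimizer features still match the current configuration, and
    /// otherwise re-optimize and record the result in the returned "uncached" map. A
    /// simplified sketch of that control flow, where `optimize(entry)` stands in for the
    /// per-object optimizer pipelines below:
    ///
    /// ```ignore
    /// let (global_mir, physical_plan, metainfo) = match cached_global_exprs.remove(&global_id) {
    ///     Some(exprs) if exprs.optimizer_features == optimizer_config.features => {
    ///         // Cache hit: reuse the previously optimized plans.
    ///         (exprs.global_mir, exprs.physical_plan, exprs.dataflow_metainfos)
    ///     }
    ///     Some(_) | None => {
    ///         // Cache miss (or stale features): optimize from scratch and remember the
    ///         // result so the caller can refresh the expression cache.
    ///         let exprs = optimize(entry)?;
    ///         uncached_expressions.insert(global_id, exprs.clone());
    ///         (exprs.global_mir, exprs.physical_plan, exprs.dataflow_metainfos)
    ///     }
    /// };
    /// ```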
2946    #[instrument]
2947    fn bootstrap_dataflow_plans(
2948        &mut self,
2949        ordered_catalog_entries: &[CatalogEntry],
2950        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2951    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
2952        // The optimizer expects to be able to query its `ComputeInstanceSnapshot` for
2953        // collections the current dataflow can depend on. But since we don't yet install anything
2954        // on compute instances, the snapshot information is incomplete. We fix that by manually
2955        // updating `ComputeInstanceSnapshot` objects to ensure they contain the collections we
2956        // have already optimized.
2957        let mut instance_snapshots = BTreeMap::new();
2958        let mut uncached_expressions = BTreeMap::new();
2959
2960        let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
2961
2962        for entry in ordered_catalog_entries {
2963            match entry.item() {
2964                CatalogItem::Index(idx) => {
2965                    // Collect optimizer parameters.
2966                    let compute_instance =
2967                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
2968                            self.instance_snapshot(idx.cluster_id)
2969                                .expect("compute instance exists")
2970                        });
2971                    let global_id = idx.global_id();
2972
2973                    // The index may already be installed on the compute instance. For example,
2974                    // this is the case for introspection indexes.
2975                    if compute_instance.contains_collection(&global_id) {
2976                        continue;
2977                    }
2978
2979                    let (optimized_plan, physical_plan, metainfo) =
2980                        match cached_global_exprs.remove(&global_id) {
2981                            Some(global_expressions)
2982                                if global_expressions.optimizer_features
2983                                    == optimizer_config.features =>
2984                            {
2985                                debug!("global expression cache hit for {global_id:?}");
2986                                (
2987                                    global_expressions.global_mir,
2988                                    global_expressions.physical_plan,
2989                                    global_expressions.dataflow_metainfos,
2990                                )
2991                            }
2992                            Some(_) | None => {
2993                                let (optimized_plan, global_lir_plan) = {
2994                                    // Build an optimizer for this INDEX.
2995                                    let mut optimizer = optimize::index::Optimizer::new(
2996                                        self.owned_catalog(),
2997                                        compute_instance.clone(),
2998                                        global_id,
2999                                        optimizer_config.clone(),
3000                                        self.optimizer_metrics(),
3001                                    );
3002
3003                                    // MIR ⇒ MIR optimization (global)
3004                                    let index_plan = optimize::index::Index::new(
3005                                        entry.name().clone(),
3006                                        idx.on,
3007                                        idx.keys.to_vec(),
3008                                    );
3009                                    let global_mir_plan = optimizer.optimize(index_plan)?;
3010                                    let optimized_plan = global_mir_plan.df_desc().clone();
3011
3012                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3013                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3014
3015                                    (optimized_plan, global_lir_plan)
3016                                };
3017
3018                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3019                                let metainfo = {
3020                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3021                                    let notice_ids =
3022                                        std::iter::repeat_with(|| self.allocate_transient_id())
3023                                            .map(|(_item_id, gid)| gid)
3024                                            .take(metainfo.optimizer_notices.len())
3025                                            .collect::<Vec<_>>();
3026                                    // Return a metainfo with rendered notices.
3027                                    self.catalog().render_notices(
3028                                        metainfo,
3029                                        notice_ids,
3030                                        Some(idx.global_id()),
3031                                    )
3032                                };
3033                                uncached_expressions.insert(
3034                                    global_id,
3035                                    GlobalExpressions {
3036                                        global_mir: optimized_plan.clone(),
3037                                        physical_plan: physical_plan.clone(),
3038                                        dataflow_metainfos: metainfo.clone(),
3039                                        optimizer_features: OptimizerFeatures::from(
3040                                            self.catalog().system_config(),
3041                                        ),
3042                                    },
3043                                );
3044                                (optimized_plan, physical_plan, metainfo)
3045                            }
3046                        };
3047
3048                    let catalog = self.catalog_mut();
3049                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3050                    catalog.set_physical_plan(idx.global_id(), physical_plan);
3051                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3052
3053                    compute_instance.insert_collection(idx.global_id());
3054                }
3055                CatalogItem::MaterializedView(mv) => {
3056                    // Collect optimizer parameters.
3057                    let compute_instance =
3058                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3059                            self.instance_snapshot(mv.cluster_id)
3060                                .expect("compute instance exists")
3061                        });
3062                    let global_id = mv.global_id();
3063
3064                    let (optimized_plan, physical_plan, metainfo) =
3065                        match cached_global_exprs.remove(&global_id) {
3066                            Some(global_expressions)
3067                                if global_expressions.optimizer_features
3068                                    == optimizer_config.features =>
3069                            {
3070                                debug!("global expression cache hit for {global_id:?}");
3071                                (
3072                                    global_expressions.global_mir,
3073                                    global_expressions.physical_plan,
3074                                    global_expressions.dataflow_metainfos,
3075                                )
3076                            }
3077                            Some(_) | None => {
3078                                let (_, internal_view_id) = self.allocate_transient_id();
3079                                let debug_name = self
3080                                    .catalog()
3081                                    .resolve_full_name(entry.name(), None)
3082                                    .to_string();
3083                                let force_non_monotonic = Default::default();
3084
3085                                let (optimized_plan, global_lir_plan) = {
3086                                    // Build an optimizer for this MATERIALIZED VIEW.
3087                                    let mut optimizer = optimize::materialized_view::Optimizer::new(
3088                                        self.owned_catalog().as_optimizer_catalog(),
3089                                        compute_instance.clone(),
3090                                        global_id,
3091                                        internal_view_id,
3092                                        mv.desc.iter_names().cloned().collect(),
3093                                        mv.non_null_assertions.clone(),
3094                                        mv.refresh_schedule.clone(),
3095                                        debug_name,
3096                                        optimizer_config.clone(),
3097                                        self.optimizer_metrics(),
3098                                        force_non_monotonic,
3099                                    );
3100
3101                                    // MIR ⇒ MIR optimization (global)
3102                                    let global_mir_plan =
3103                                        optimizer.optimize(mv.optimized_expr.as_ref().clone())?;
3104                                    let optimized_plan = global_mir_plan.df_desc().clone();
3105
3106                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3107                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3108
3109                                    (optimized_plan, global_lir_plan)
3110                                };
3111
3112                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3113                                let metainfo = {
3114                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3115                                    let notice_ids =
3116                                        std::iter::repeat_with(|| self.allocate_transient_id())
3117                                            .map(|(_item_id, global_id)| global_id)
3118                                            .take(metainfo.optimizer_notices.len())
3119                                            .collect::<Vec<_>>();
3120                                    // Return a metainfo with rendered notices.
3121                                    self.catalog().render_notices(
3122                                        metainfo,
3123                                        notice_ids,
3124                                        Some(mv.global_id()),
3125                                    )
3126                                };
3127                                uncached_expressions.insert(
3128                                    global_id,
3129                                    GlobalExpressions {
3130                                        global_mir: optimized_plan.clone(),
3131                                        physical_plan: physical_plan.clone(),
3132                                        dataflow_metainfos: metainfo.clone(),
3133                                        optimizer_features: OptimizerFeatures::from(
3134                                            self.catalog().system_config(),
3135                                        ),
3136                                    },
3137                                );
3138                                (optimized_plan, physical_plan, metainfo)
3139                            }
3140                        };
3141
3142                    let catalog = self.catalog_mut();
3143                    catalog.set_optimized_plan(mv.global_id(), optimized_plan);
3144                    catalog.set_physical_plan(mv.global_id(), physical_plan);
3145                    catalog.set_dataflow_metainfo(mv.global_id(), metainfo);
3146
3147                    compute_instance.insert_collection(mv.global_id());
3148                }
3149                CatalogItem::ContinualTask(ct) => {
3150                    let compute_instance =
3151                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3152                            self.instance_snapshot(ct.cluster_id)
3153                                .expect("compute instance exists")
3154                        });
3155                    let global_id = ct.global_id();
3156
3157                    let (optimized_plan, physical_plan, metainfo) =
3158                        match cached_global_exprs.remove(&global_id) {
3159                            Some(global_expressions)
3160                                if global_expressions.optimizer_features
3161                                    == optimizer_config.features =>
3162                            {
3163                                debug!("global expression cache hit for {global_id:?}");
3164                                (
3165                                    global_expressions.global_mir,
3166                                    global_expressions.physical_plan,
3167                                    global_expressions.dataflow_metainfos,
3168                                )
3169                            }
3170                            Some(_) | None => {
3171                                let debug_name = self
3172                                    .catalog()
3173                                    .resolve_full_name(entry.name(), None)
3174                                    .to_string();
3175                                let (optimized_plan, physical_plan, metainfo) = self
3176                                    .optimize_create_continual_task(
3177                                        ct,
3178                                        global_id,
3179                                        self.owned_catalog(),
3180                                        debug_name,
3181                                    )?;
3182                                uncached_expressions.insert(
3183                                    global_id,
3184                                    GlobalExpressions {
3185                                        global_mir: optimized_plan.clone(),
3186                                        physical_plan: physical_plan.clone(),
3187                                        dataflow_metainfos: metainfo.clone(),
3188                                        optimizer_features: OptimizerFeatures::from(
3189                                            self.catalog().system_config(),
3190                                        ),
3191                                    },
3192                                );
3193                                (optimized_plan, physical_plan, metainfo)
3194                            }
3195                        };
3196
3197                    let catalog = self.catalog_mut();
3198                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3199                    catalog.set_physical_plan(ct.global_id(), physical_plan);
3200                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3201
3202                    compute_instance.insert_collection(ct.global_id());
3203                }
3204                _ => (),
3205            }
3206        }
3207
3208        Ok(uncached_expressions)
3209    }
3210
3211    /// Selects for each compute dataflow an as-of suitable for bootstrapping it.
3212    ///
3213    /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
3214    /// in place and that must not be dropped before all compute dataflows have been created with
3215    /// the compute controller.
3216    ///
3217    /// This method expects all storage collections and dataflow plans to be available, so it must
3218    /// run after [`Coordinator::bootstrap_storage_collections`] and
3219    /// [`Coordinator::bootstrap_dataflow_plans`].
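    ///
    /// The returned holds follow the usual drop-to-release pattern, so the caller must keep
    /// them alive until every dataflow has been created. A hedged sketch of the intended
    /// usage, where `instance_of` and the plan iteration are illustrative stand-ins:
    ///
    /// ```ignore
    /// let read_holds = coord.bootstrap_dataflow_as_ofs().await;
    /// for (id, plan) in planned_dataflows {
    ///     compute.create_dataflow(instance_of(id), plan, None)?;
    /// }
    /// // Only now is it safe to let the involved read frontiers advance again.
    /// drop(read_holds);
    /// ```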
3220    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3221        let mut catalog_ids = Vec::new();
3222        let mut dataflows = Vec::new();
3223        let mut read_policies = BTreeMap::new();
3224        for entry in self.catalog.entries() {
3225            let gid = match entry.item() {
3226                CatalogItem::Index(idx) => idx.global_id(),
3227                CatalogItem::MaterializedView(mv) => mv.global_id(),
3228                CatalogItem::ContinualTask(ct) => ct.global_id(),
3229                CatalogItem::Table(_)
3230                | CatalogItem::Source(_)
3231                | CatalogItem::Log(_)
3232                | CatalogItem::View(_)
3233                | CatalogItem::Sink(_)
3234                | CatalogItem::Type(_)
3235                | CatalogItem::Func(_)
3236                | CatalogItem::Secret(_)
3237                | CatalogItem::Connection(_) => continue,
3238            };
3239            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3240                catalog_ids.push(gid);
3241                dataflows.push(plan.clone());
3242
3243                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3244                    read_policies.insert(gid, compaction_window.into());
3245                }
3246            }
3247        }
3248
3249        let read_ts = self.get_local_read_ts().await;
3250        let read_holds = as_of_selection::run(
3251            &mut dataflows,
3252            &read_policies,
3253            &*self.controller.storage_collections,
3254            read_ts,
3255            self.controller.read_only(),
3256        );
3257
3258        let catalog = self.catalog_mut();
3259        for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3260            catalog.set_physical_plan(id, plan);
3261        }
3262
3263        read_holds
3264    }
3265
3266    /// Serves the coordinator, receiving commands from users over `cmd_rx`
3267    /// and feedback from dataflow workers over `feedback_rx`.
3268    ///
3269    /// You must call `bootstrap` before calling this method.
3270    ///
3271    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would
3272    /// get stored on the stack, which is bad for runtime performance and would blow up our stack usage.
3273    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
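    ///
    /// The boxing itself is the `FutureExt::boxed_local` combinator applied to the
    /// `async move` block; a minimal sketch of the pattern (not this function's actual
    /// body):
    ///
    /// ```ignore
    /// use futures::future::{FutureExt, LocalBoxFuture};
    ///
    /// fn big_loop() -> LocalBoxFuture<'static, ()> {
    ///     async move {
    ///         // ... a large state machine lives here ...
    ///     }
    ///     // Move the (potentially very large) future onto the heap so that callers only
    ///     // keep a pointer-sized handle on their stack.
    ///     .boxed_local()
    /// }
    /// ```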
3274    fn serve(
3275        mut self,
3276        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3277        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3278        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3279        group_commit_rx: appends::GroupCommitWaiter,
3280    ) -> LocalBoxFuture<'static, ()> {
3281        async move {
3282            // Watcher that listens for and reports cluster service status changes.
3283            let mut cluster_events = self.controller.events_stream();
3284            let last_message = Arc::new(Mutex::new(LastMessage {
3285                kind: "none",
3286                stmt: None,
3287            }));
3288
3289            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3290            let idle_metric = self.metrics.queue_busy_seconds.with_label_values(&[]);
3291            let last_message_watchdog = Arc::clone(&last_message);
3292
3293            spawn(|| "coord watchdog", async move {
3294                // Every 5 seconds, attempt to measure how long it takes for the
3295                // coord select loop to process this message, which is handled at the
3296                // lowest priority (i.e. last). If the loop is idle, the measurement
3297                // will be only a few microseconds.
3298                let mut interval = tokio::time::interval(Duration::from_secs(5));
3299                // If we end up having to wait more than 5 seconds for the coord to respond, then the
3300                // behavior of Delay results in the interval "restarting" from whenever we yield
3301                // instead of trying to catch up.
3302                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3303
3304                // Track if we become stuck to de-dupe error reporting.
3305                let mut coord_stuck = false;
3306
3307                loop {
3308                    interval.tick().await;
3309
3310                    // Wait for space in the channel; if we time out, then the coordinator is stuck!
3311                    let duration = tokio::time::Duration::from_secs(30);
3312                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3313                    let Ok(maybe_permit) = timeout else {
3314                        // Only log if we're newly stuck, to prevent logging repeatedly.
3315                        if !coord_stuck {
3316                            let last_message = last_message_watchdog.lock().expect("poisoned");
3317                            tracing::warn!(
3318                                last_message_kind = %last_message.kind,
3319                                last_message_sql = %last_message.stmt_to_string(),
3320                                "coordinator stuck for {duration:?}",
3321                            );
3322                        }
3323                        coord_stuck = true;
3324
3325                        continue;
3326                    };
3327
3328                    // We got a permit, we're not stuck!
3329                    if coord_stuck {
3330                        tracing::info!("Coordinator became unstuck");
3331                    }
3332                    coord_stuck = false;
3333
3334                    // If we failed to acquire a permit it's because we're shutting down.
3335                    let Ok(permit) = maybe_permit else {
3336                        break;
3337                    };
3338
3339                    permit.send(idle_metric.start_timer());
3340                }
3341            });
3342
3343            self.schedule_storage_usage_collection().await;
3344            self.spawn_privatelink_vpc_endpoints_watch_task();
3345            self.spawn_statement_logging_task();
3346            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3347
3348            // Report if the handling of a single message takes longer than this threshold.
3349            let warn_threshold = self
3350                .catalog()
3351                .system_config()
3352                .coord_slow_message_warn_threshold();
3353
3354            // How many messages we'd like to batch up before processing them. Must be > 0.
3355            const MESSAGE_BATCH: usize = 64;
3356            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3357            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3358
3359            let message_batch = self.metrics
3360                .message_batch
3361                .with_label_values(&[]);
3362
3363            loop {
3364                // Before adding a branch to this select loop, please ensure that the branch is
3365                // cancellation safe and add a comment explaining why. You can refer here for more
3366                // info: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
3367                select! {
3368                    // We prioritize internal commands over other commands. However, we work through
3369                    // batches of commands in some branches of this select, which means that even if
3370                    // a command generates internal commands, we will work through the current batch
3371                    // before receiving a new batch of commands.
3372                    biased;
3373
3374                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3375                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3376                    // Receive a batch of commands.
3377                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3378                    // `next()` on any stream is cancel-safe:
3379                    // https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
3380                    // Receive a single command.
3381                    Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
3382                    // See [`mz_controller::Controller::ready`] for notes
3383                    // on why this is cancel-safe.
3384                    // Receive a single command.
3385                    () = self.controller.ready() => {
3386                        // NOTE: We don't get a `Readiness` back from `ready()`
3387                        // because the controller wants to keep it and it's not
3388                        // trivially `Clone` or `Copy`. Hence this accessor.
3389                        let controller = match self.controller.get_readiness() {
3390                            Readiness::Storage => ControllerReadiness::Storage,
3391                            Readiness::Compute => ControllerReadiness::Compute,
3392                            Readiness::Metrics(_) => ControllerReadiness::Metrics,
3393                            Readiness::Internal(_) => ControllerReadiness::Internal,
3394                            Readiness::NotReady => unreachable!("just signaled as ready"),
3395                        };
3396                        messages.push(Message::ControllerReady { controller });
3397                    }
3398                    // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
3399                    // Receive a single command.
3400                    permit = group_commit_rx.ready() => {
3401                        // If we happen to have batched exactly one user write, use
3402                        // that span so that `emit_trace_id_notice` hooks up correctly.
3403                        // Otherwise, the best we can do is invent a new root span
3404                        // and make it follow from all the Spans in the pending
3405                        // writes.
3406                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3407                            PendingWriteTxn::User{span, ..} => Some(span),
3408                            PendingWriteTxn::System{..} => None,
3409                        });
3410                        let span = match user_write_spans.exactly_one() {
3411                            Ok(span) => span.clone(),
3412                            Err(user_write_spans) => {
3413                                let span = info_span!(parent: None, "group_commit_notify");
3414                                for s in user_write_spans {
3415                                    span.follows_from(s);
3416                                }
3417                                span
3418                            }
3419                        };
3420                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3421                    },
3422                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3423                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3424                    // Receive a batch of commands.
3425                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3426                        if count == 0 {
3427                            break;
3428                        } else {
3429                            messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
3430                        }
3431                    },
3432                    // `recv()` on `UnboundedReceiver` is cancellation safe:
3433                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
3434                    // Receive a single command.
3435                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3436                        let mut pending_read_txns = vec![pending_read_txn];
3437                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3438                            pending_read_txns.push(pending_read_txn);
3439                        }
3440                        for (conn_id, pending_read_txn) in pending_read_txns {
3441                            let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
3442                            soft_assert_or_log!(
3443                                prev.is_none(),
3444                                "connections can not have multiple concurrent reads, prev: {prev:?}"
3445                            )
3446                        }
3447                        messages.push(Message::LinearizeReads);
3448                    }
3449                    // `tick()` on `Interval` is cancel-safe:
3450                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3451                    // Receive a single command.
3452                    _ = self.advance_timelines_interval.tick() => {
3453                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
3454                        span.follows_from(Span::current());
3455
3456                        // Group commit sends an `AdvanceTimelines` message when
3457                        // done, which is what downgrades read holds. In
3458                        // read-only mode we send this message directly because
3459                        // we're not doing group commits.
3460                        if self.controller.read_only() {
3461                            messages.push(Message::AdvanceTimelines);
3462                        } else {
3463                            messages.push(Message::GroupCommitInitiate(span, None));
3464                        }
3465                    },
3466                    // `tick()` on `Interval` is cancel-safe:
3467                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3468                    // Receive a single command.
3469                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
3470                        messages.push(Message::CheckSchedulingPolicies);
3471                    },
3472
3473                    // `tick()` on `Interval` is cancel-safe:
3474                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3475                    // Receive a single command.
3476                    _ = self.caught_up_check_interval.tick() => {
3477                        // We do this directly on the main loop instead of
3478                        // firing off a message. We are still in read-only mode,
3479                        // so optimizing for latency (by not blocking the main loop)
3480                        // is not that important.
3481                        self.maybe_check_caught_up().await;
3482
3483                        continue;
3484                    },
3485
3486                    // Process the idle metric at the lowest priority to sample queue non-idle time.
3487                    // `recv()` on `Receiver` is cancellation safe:
3488                    // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety
3489                    // Receive a single command.
3490                    timer = idle_rx.recv() => {
3491                        timer.expect("does not drop").observe_duration();
3492                        self.metrics
3493                            .message_handling
3494                            .with_label_values(&["watchdog"])
3495                            .observe(0.0);
3496                        continue;
3497                    }
3498                };
3499
3500                // Observe the number of messages we're processing at once.
3501                message_batch.observe(f64::cast_lossy(messages.len()));
3502
3503                for msg in messages.drain(..) {
3504                    // All message processing functions trace. Start a parent span
3505                    // for them to make it easy to find slow messages.
3506                    let msg_kind = msg.kind();
3507                    let span = span!(
3508                        target: "mz_adapter::coord::handle_message_loop",
3509                        Level::INFO,
3510                        "coord::handle_message",
3511                        kind = msg_kind
3512                    );
3513                    let otel_context = span.context().span().span_context().clone();
3514
3515                    // Record the last kind of message in case we get stuck. For
3516                    // execute commands, we additionally stash the user's SQL
3517                    // statement, so we can log that as well.
3518                    *last_message.lock().expect("poisoned") = LastMessage {
3519                        kind: msg_kind,
3520                        stmt: match &msg {
3521                            Message::Command(
3522                                _,
3523                                Command::Execute {
3524                                    portal_name,
3525                                    session,
3526                                    ..
3527                                },
3528                            ) => session
3529                                .get_portal_unverified(portal_name)
3530                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3531                            _ => None,
3532                        },
3533                    };
3534
3535                    let start = Instant::now();
3536                    self.handle_message(msg).instrument(span).await;
3537                    let duration = start.elapsed();
3538
3539                    self.metrics
3540                        .message_handling
3541                        .with_label_values(&[msg_kind])
3542                        .observe(duration.as_secs_f64());
3543
3544                    // If something is _really_ slow, print a trace id for debugging (if OTEL is enabled).
3545                    if duration > warn_threshold {
3546                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3547                        tracing::error!(
3548                            ?msg_kind,
3549                            ?trace_id,
3550                            ?duration,
3551                            "very slow coordinator message"
3552                        );
3553                    }
3554                }
3555            }
3556            // Try to clean up as a best effort. There may be some async tasks out there holding a
3557            // reference that prevents us from cleaning up.
3558            if let Some(catalog) = Arc::into_inner(self.catalog) {
3559                catalog.expire().await;
3560            }
3561        }
3562        .boxed_local()
3563    }
3564
3565    /// Obtain a read-only Catalog reference.
3566    fn catalog(&self) -> &Catalog {
3567        &self.catalog
3568    }
3569
3570    /// Obtain a read-only Catalog snapshot, suitable for giving out to
3571    /// non-Coordinator thread tasks.
3572    fn owned_catalog(&self) -> Arc<Catalog> {
3573        Arc::clone(&self.catalog)
3574    }
3575
3576    /// Obtain a handle to the optimizer metrics, suitable for giving
3577    /// out to non-Coordinator thread tasks.
3578    fn optimizer_metrics(&self) -> OptimizerMetrics {
3579        self.optimizer_metrics.clone()
3580    }
3581
3582    /// Obtain a writeable Catalog reference.
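    ///
    /// The clone-on-write behavior described in the comment below is that of
    /// [`Arc::make_mut`]; a std-only illustration, unrelated to the catalog types:
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let mut shared = Arc::new(vec![1, 2, 3]);
    /// let snapshot = Arc::clone(&shared); // like `owned_catalog()`
    ///
    /// // Another strong reference exists, so this clones the Vec into a fresh Arc.
    /// Arc::make_mut(&mut shared).push(4);
    /// assert_eq!(*shared, vec![1, 2, 3, 4]);
    /// assert_eq!(*snapshot, vec![1, 2, 3]); // the old snapshot is untouched
    ///
    /// // With no other references, `make_mut` mutates in place without cloning.
    /// drop(snapshot);
    /// Arc::make_mut(&mut shared).push(5);
    /// ```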
3583    fn catalog_mut(&mut self) -> &mut Catalog {
3584        // make_mut will cause any other Arc references (from owned_catalog) to
3585        // continue to be valid by cloning the catalog, putting it in a new Arc,
3586        // which lives at self.catalog. If there are no other Arc references,
3587        // then no clone is made, and it returns a reference to the existing
3588        // object. This makes this method and owned_catalog both very cheap: at
3589        // most one clone per catalog mutation, but only if there's a read-only
3590        // reference to it.
3591        Arc::make_mut(&mut self.catalog)
3592    }
3593
3594    /// Obtain a reference to the coordinator's connection context.
3595    fn connection_context(&self) -> &ConnectionContext {
3596        self.controller.connection_context()
3597    }
3598
3599    /// Obtain a reference to the coordinator's secret reader, in an `Arc`.
3600    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3601        &self.connection_context().secrets_reader
3602    }
3603
3604    /// Publishes a notice message to all sessions.
3605    ///
3606    /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
3607    /// so we keep it around.
3608    #[allow(dead_code)]
3609    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3610        for meta in self.active_conns.values() {
3611            let _ = meta.notice_tx.send(notice.clone());
3612        }
3613    }
3614
3615    /// Returns a closure that will publish a notice to all sessions that were active at the time
3616    /// this method was called.
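    ///
    /// Note that the set of recipients is captured when this method is called; sessions
    /// that connect afterwards will not receive the notice. A minimal sketch of the
    /// capture-then-send pattern, using std channels as stand-ins for the per-connection
    /// notice senders:
    ///
    /// ```ignore
    /// use std::sync::mpsc;
    ///
    /// let (tx_a, rx_a) = mpsc::channel::<String>();
    /// let (tx_b, rx_b) = mpsc::channel::<String>();
    ///
    /// // Snapshot the currently known senders.
    /// let senders = vec![tx_a.clone(), tx_b.clone()];
    /// let broadcast = move |notice: String| {
    ///     for tx in senders {
    ///         let _ = tx.send(notice.clone());
    ///     }
    /// };
    ///
    /// broadcast("cluster dropped".to_string());
    /// assert_eq!(rx_a.recv().unwrap(), "cluster dropped");
    /// assert_eq!(rx_b.recv().unwrap(), "cluster dropped");
    /// ```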
3617    pub(crate) fn broadcast_notice_tx(
3618        &self,
3619    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3620        let senders: Vec<_> = self
3621            .active_conns
3622            .values()
3623            .map(|meta| meta.notice_tx.clone())
3624            .collect();
3625        Box::new(move |notice| {
3626            for tx in senders {
3627                let _ = tx.send(notice.clone());
3628            }
3629        })
3630    }
3631
3632    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3633        &self.active_conns
3634    }
3635
3636    #[instrument(level = "debug")]
3637    pub(crate) fn retire_execution(
3638        &mut self,
3639        reason: StatementEndedExecutionReason,
3640        ctx_extra: ExecuteContextExtra,
3641    ) {
3642        if let Some(uuid) = ctx_extra.retire() {
3643            self.end_statement_execution(uuid, reason);
3644        }
3645    }
3646
3647    /// Creates a new dataflow builder from the catalog and indexes in `self`.
3648    #[instrument(level = "debug")]
3649    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3650        let compute = self
3651            .instance_snapshot(instance)
3652            .expect("compute instance does not exist");
3653        DataflowBuilder::new(self.catalog().state(), compute)
3654    }
3655
3656    /// Return a reference-less snapshot of the indicated compute instance.
3657    pub fn instance_snapshot(
3658        &self,
3659        id: ComputeInstanceId,
3660    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3661        ComputeInstanceSnapshot::new(&self.controller, id)
3662    }
3663
3664    /// Call into the compute controller to install a finalized dataflow, and
3665    /// initialize the read policies for its exported readable objects.
3666    pub(crate) async fn ship_dataflow(
3667        &mut self,
3668        dataflow: DataflowDescription<Plan>,
3669        instance: ComputeInstanceId,
3670        subscribe_target_replica: Option<ReplicaId>,
3671    ) {
3672        // We must only install read policies for indexes, not for sinks.
3673        // Sinks are write-only compute collections that don't have read policies.
3674        let export_ids = dataflow.exported_index_ids().collect();
3675
3676        self.controller
3677            .compute
3678            .create_dataflow(instance, dataflow, subscribe_target_replica)
3679            .unwrap_or_terminate("dataflow creation cannot fail");
3680
3681        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3682            .await;
3683    }
3684
3685    /// Like `ship_dataflow`, but also await on builtin table updates.
3686    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3687        &mut self,
3688        dataflow: DataflowDescription<Plan>,
3689        instance: ComputeInstanceId,
3690        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3691    ) {
3692        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3693            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3694            let ((), ()) =
3695                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3696        } else {
3697            self.ship_dataflow(dataflow, instance, None).await;
3698        }
3699    }
3700
3701    /// Installs a _watch set_ in the controller and associates it with the given
3702    /// connection id. The watch set is automatically cleared if the connection terminates
3703    /// before the watch set completes.
3704    pub fn install_compute_watch_set(
3705        &mut self,
3706        conn_id: ConnectionId,
3707        objects: BTreeSet<GlobalId>,
3708        t: Timestamp,
3709        state: WatchSetResponse,
3710    ) {
3711        let ws_id = self.controller.install_compute_watch_set(objects, t);
3712        self.connection_watch_sets
3713            .entry(conn_id.clone())
3714            .or_default()
3715            .insert(ws_id);
3716        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3717    }
3718
3719    /// Installs a _watch set_ in the controller and associates it with the given
3720    /// connection id. The watch set is automatically cleared if the connection terminates
3721    /// before the watch set completes.
3722    pub fn install_storage_watch_set(
3723        &mut self,
3724        conn_id: ConnectionId,
3725        objects: BTreeSet<GlobalId>,
3726        t: Timestamp,
3727        state: WatchSetResponse,
3728    ) {
3729        let ws_id = self.controller.install_storage_watch_set(objects, t);
3730        self.connection_watch_sets
3731            .entry(conn_id.clone())
3732            .or_default()
3733            .insert(ws_id);
3734        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3735    }
3736
3737    /// Cancels pending watch sets associated with the provided connection id.
3738    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3739        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3740            for ws_id in ws_ids {
3741                self.installed_watch_sets.remove(&ws_id);
3742            }
3743        }
3744    }
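    // Bookkeeping sketch (editor's illustration): installation and cancellation
    // keep the two maps in sync in both directions, roughly
    //
    //     connection_watch_sets[conn_id].insert(ws_id);
    //     installed_watch_sets.insert(ws_id, (conn_id, state));
    //
    // so cancelling a connection removes every watch set it installed, and a
    // watch set that later completes for a cancelled connection is presumably
    // just not found in `installed_watch_sets` and ignored.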
3745
3746    /// Returns the state of the [`Coordinator`] formatted as JSON.
3747    ///
3748    /// The returned value is not guaranteed to be stable and may change at any point in time.
3749    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3750        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3751        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3752        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3753        // prevents a future unrelated change from silently breaking this method.
3754
3755        let global_timelines: BTreeMap<_, _> = self
3756            .global_timelines
3757            .iter()
3758            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3759            .collect();
3760        let active_conns: BTreeMap<_, _> = self
3761            .active_conns
3762            .iter()
3763            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3764            .collect();
3765        let txn_read_holds: BTreeMap<_, _> = self
3766            .txn_read_holds
3767            .iter()
3768            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3769            .collect();
3770        let pending_peeks: BTreeMap<_, _> = self
3771            .pending_peeks
3772            .iter()
3773            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3774            .collect();
3775        let client_pending_peeks: BTreeMap<_, _> = self
3776            .client_pending_peeks
3777            .iter()
3778            .map(|(id, peek)| {
3779                let peek: BTreeMap<_, _> = peek
3780                    .iter()
3781                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3782                    .collect();
3783                (id.to_string(), peek)
3784            })
3785            .collect();
3786        let pending_linearize_read_txns: BTreeMap<_, _> = self
3787            .pending_linearize_read_txns
3788            .iter()
3789            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3790            .collect();
3791
3792        let map = serde_json::Map::from_iter([
3793            (
3794                "global_timelines".to_string(),
3795                serde_json::to_value(global_timelines)?,
3796            ),
3797            (
3798                "active_conns".to_string(),
3799                serde_json::to_value(active_conns)?,
3800            ),
3801            (
3802                "txn_read_holds".to_string(),
3803                serde_json::to_value(txn_read_holds)?,
3804            ),
3805            (
3806                "pending_peeks".to_string(),
3807                serde_json::to_value(pending_peeks)?,
3808            ),
3809            (
3810                "client_pending_peeks".to_string(),
3811                serde_json::to_value(client_pending_peeks)?,
3812            ),
3813            (
3814                "pending_linearize_read_txns".to_string(),
3815                serde_json::to_value(pending_linearize_read_txns)?,
3816            ),
3817            ("controller".to_string(), self.controller.dump().await?),
3818        ]);
3819        Ok(serde_json::Value::Object(map))
3820    }
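    // A usage sketch for `dump` (editor's illustration): the returned JSON is
    // meant for debugging only, e.g.
    //
    //     let state = coord.dump().await?;
    //     println!("{}", serde_json::to_string_pretty(&state)?);
    //
    // where `coord` is a hypothetical handle on the `Coordinator`; callers must
    // not rely on the shape of the output, per the doc comment above.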
3821
3822    /// Prune all storage usage events from the [`MZ_STORAGE_USAGE_BY_SHARD`] table that are older
3823    /// than `retention_period`.
3824    ///
3825    /// This method will read the entire contents of [`MZ_STORAGE_USAGE_BY_SHARD`] into memory
3826    /// which can be expensive.
3827    ///
3828    /// DO NOT call this method outside of startup. The safety of reading at the current oracle read
3829    /// timestamp and then writing at whatever the current write timestamp is (instead of
3830    /// `read_ts + 1`) relies on the fact that there are no outstanding writes during startup.
3831    ///
3832    /// Group commit, which this method uses to write the retractions, has built-in fencing, and we
3833    /// never commit retractions to [`MZ_STORAGE_USAGE_BY_SHARD`] outside of this method, which is
3834    /// only called once during startup. So we don't have to worry about double/invalid retractions.
3835    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
3836        let item_id = self
3837            .catalog()
3838            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3839        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3840        let read_ts = self.get_local_read_ts().await;
3841        let current_contents_fut = self
3842            .controller
3843            .storage_collections
3844            .snapshot(global_id, read_ts);
3845        let internal_cmd_tx = self.internal_cmd_tx.clone();
3846        spawn(|| "storage_usage_prune", async move {
3847            let mut current_contents = current_contents_fut
3848                .await
3849                .unwrap_or_terminate("cannot fail to fetch snapshot");
3850            differential_dataflow::consolidation::consolidate(&mut current_contents);
3851
3852            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3853            let mut expired = Vec::new();
3854            for (row, diff) in current_contents {
3855                assert_eq!(
3856                    diff, 1,
3857                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3858                );
3859                // This logic relies on the definition of `mz_storage_usage_by_shard` not changing.
3860                let collection_timestamp = row
3861                    .unpack()
3862                    .get(3)
3863                    .expect("definition of mz_storage_usage_by_shard changed")
3864                    .unwrap_timestamptz();
3865                let collection_timestamp = collection_timestamp.timestamp_millis();
3866                let collection_timestamp: u128 = collection_timestamp
3867                    .try_into()
3868                    .expect("all collections happen after Jan 1 1970");
3869                if collection_timestamp < cutoff_ts {
3870                    debug!("pruning storage event {row:?}");
3871                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3872                    expired.push(builtin_update);
3873                }
3874            }
3875
3876            // Ignore the error if the send fails because the main thread has shut down.
3877            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3878        });
3879    }
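    // Worked example of the cutoff arithmetic above (editor's illustration):
    // with `read_ts = 1_700_000_000_000` (milliseconds since the Unix epoch)
    // and a 30-day retention period,
    //
    //     cutoff_ts = 1_700_000_000_000 - 30 * 24 * 60 * 60 * 1000
    //               = 1_697_408_000_000
    //
    // and every row whose collection timestamp is strictly less than that
    // value is retracted via `Diff::MINUS_ONE`.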
3880}
3881
3882#[cfg(test)]
3883impl Coordinator {
3884    #[allow(dead_code)]
3885    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
3886        // `ship_dataflow` is not allowed to have a `Result` return because this function is
3887        // called after `catalog_transact`, after which no errors are allowed. This test exists to
3888        // prevent us from incorrectly teaching those functions how to return errors (which has
3889        // happened twice and is the motivation for this test).
3890
3891        // An arbitrary compute instance ID to satisfy the function calls below. Note that
3892        // this only works because this function will never run.
3893        let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
3894
3895        let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
3896    }
3897}
3898
3899/// Contains information about the last message the [`Coordinator`] processed.
3900struct LastMessage {
3901    kind: &'static str,
3902    stmt: Option<Arc<Statement<Raw>>>,
3903}
3904
3905impl LastMessage {
3906    /// Returns a redacted version of the statement that is safe for logs.
3907    fn stmt_to_string(&self) -> Cow<'static, str> {
3908        self.stmt
3909            .as_ref()
3910            .map(|stmt| stmt.to_ast_string_redacted().into())
3911            .unwrap_or(Cow::Borrowed("<none>"))
3912    }
3913}
3914
3915impl fmt::Debug for LastMessage {
3916    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3917        f.debug_struct("LastMessage")
3918            .field("kind", &self.kind)
3919            .field("stmt", &self.stmt_to_string())
3920            .finish()
3921    }
3922}
3923
3924impl Drop for LastMessage {
3925    fn drop(&mut self) {
3926        // Only print the last message if we're currently panicking, otherwise we'd spam our logs.
3927        if std::thread::panicking() {
3928            // If we're panicking there's no guarantee `tracing` still works, so print to stderr.
3929            eprintln!("Coordinator panicking, dumping last message\n{self:?}");
3930        }
3931    }
3932}
3933
3934/// Serves the coordinator based on the provided configuration.
3935///
3936/// For a high-level description of the coordinator, see the [crate
3937/// documentation](crate).
3938///
3939/// Returns a handle to the coordinator and a client to communicate with the
3940/// coordinator.
3941///
3942/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 42KB. Storing it on
3943/// the stack would be bad for runtime performance and would blow up our stack usage.
3944/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
3945pub fn serve(
3946    Config {
3947        controller_config,
3948        controller_envd_epoch,
3949        mut storage,
3950        audit_logs_iterator,
3951        timestamp_oracle_url,
3952        unsafe_mode,
3953        all_features,
3954        build_info,
3955        environment_id,
3956        metrics_registry,
3957        now,
3958        secrets_controller,
3959        cloud_resource_controller,
3960        cluster_replica_sizes,
3961        builtin_system_cluster_config,
3962        builtin_catalog_server_cluster_config,
3963        builtin_probe_cluster_config,
3964        builtin_support_cluster_config,
3965        builtin_analytics_cluster_config,
3966        system_parameter_defaults,
3967        availability_zones,
3968        storage_usage_client,
3969        storage_usage_collection_interval,
3970        storage_usage_retention_period,
3971        segment_client,
3972        egress_addresses,
3973        aws_account_id,
3974        aws_privatelink_availability_zones,
3975        connection_context,
3976        connection_limit_callback,
3977        remote_system_parameters,
3978        webhook_concurrency_limit,
3979        http_host_name,
3980        tracing_handle,
3981        read_only_controllers,
3982        caught_up_trigger: clusters_caught_up_trigger,
3983        helm_chart_version,
3984        license_key,
3985        external_login_password_mz_system,
3986    }: Config,
3987) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
3988    async move {
3989        let coord_start = Instant::now();
3990        info!("startup: coordinator init: beginning");
3991        info!("startup: coordinator init: preamble beginning");
3992
3993        // Initializing the builtins can be an expensive process and consume a lot of memory, so
3994        // we forcibly initialize them early, while the stack is relatively empty, to avoid stack
3995        // overflows later.
3996        let _builtins = LazyLock::force(&BUILTINS_STATIC);
3997
3998        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
3999        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4000        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4001            mpsc::unbounded_channel();
4002
4003        // Validate and process availability zones.
4004        if !availability_zones.iter().all_unique() {
4005            coord_bail!("availability zones must be unique");
4006        }
4007
4008        let aws_principal_context = match (
4009            aws_account_id,
4010            connection_context.aws_external_id_prefix.clone(),
4011        ) {
4012            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4013                aws_account_id,
4014                aws_external_id_prefix,
4015            }),
4016            _ => None,
4017        };
4018
4019        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4020            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4021
4022        info!(
4023            "startup: coordinator init: preamble complete in {:?}",
4024            coord_start.elapsed()
4025        );
4026        let oracle_init_start = Instant::now();
4027        info!("startup: coordinator init: timestamp oracle init beginning");
4028
4029        let pg_timestamp_oracle_config = timestamp_oracle_url
4030            .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
4031        let mut initial_timestamps =
4032            get_initial_oracle_timestamps(&pg_timestamp_oracle_config).await?;
4033
4034        // Insert an entry for the `EpochMilliseconds` timeline if one doesn't exist,
4035        // which will ensure that the timeline is initialized since it's required
4036        // by the system.
4037        initial_timestamps
4038            .entry(Timeline::EpochMilliseconds)
4039            .or_insert_with(mz_repr::Timestamp::minimum);
4040        let mut timestamp_oracles = BTreeMap::new();
4041        for (timeline, initial_timestamp) in initial_timestamps {
4042            Coordinator::ensure_timeline_state_with_initial_time(
4043                &timeline,
4044                initial_timestamp,
4045                now.clone(),
4046                pg_timestamp_oracle_config.clone(),
4047                &mut timestamp_oracles,
4048                read_only_controllers,
4049            )
4050            .await;
4051        }
4052
4053        // Opening the durable catalog uses one or more timestamps without communicating with
4054        // the timestamp oracle. Here we make sure to apply the catalog upper to the timestamp
4055        // oracle so that future operations are linearized with opening the catalog.
4056        let catalog_upper = storage.current_upper().await;
4057        // Choose a time at which to boot. This is used, for example, to prune
4058        // old storage usage data or migrate audit log entries.
4059        //
4060        // This time is usually the current system time, but with protection
4061        // against backwards time jumps, even across restarts.
4062        let epoch_millis_oracle = &timestamp_oracles
4063            .get(&Timeline::EpochMilliseconds)
4064            .expect("inserted above")
4065            .oracle;
4066
4067        let mut boot_ts = if read_only_controllers {
4068            let read_ts = epoch_millis_oracle.read_ts().await;
4069            std::cmp::max(read_ts, catalog_upper)
4070        } else {
4071            // Getting/applying a write timestamp bumps the write timestamp in the
4072            // oracle, which we're not allowed to do in read-only mode.
4073            epoch_millis_oracle.apply_write(catalog_upper).await;
4074            epoch_millis_oracle.write_ts().await.timestamp
4075        };
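        // Monotonicity sketch (editor's illustration): the boot timestamp only
        // ever moves forward from here, roughly
        //
        //     boot_ts = max(oracle timestamp, catalog upper)        // here
        //     boot_ts = max(boot_ts, upper after catalog open)      // below
        //     boot_ts = max(boot_ts, upper after controller init)   // in the coordinator thread
        //
        // so even if the system clock jumps backwards across a restart, the
        // chosen boot time does not regress below previously written times.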
4076
4077        info!(
4078            "startup: coordinator init: timestamp oracle init complete in {:?}",
4079            oracle_init_start.elapsed()
4080        );
4081
4082        let catalog_open_start = Instant::now();
4083        info!("startup: coordinator init: catalog open beginning");
4084        let persist_client = controller_config
4085            .persist_clients
4086            .open(controller_config.persist_location.clone())
4087            .await
4088            .context("opening persist client")?;
4089        let builtin_item_migration_config = BuiltinItemMigrationConfig {
4090            persist_client: persist_client.clone(),
4091            read_only: read_only_controllers,
4092        };
4095        let OpenCatalogResult {
4096            mut catalog,
4097            migrated_storage_collections_0dt,
4098            new_builtin_collections,
4099            builtin_table_updates,
4100            cached_global_exprs,
4101            uncached_local_exprs,
4102        } = Catalog::open(mz_catalog::config::Config {
4103            storage,
4104            metrics_registry: &metrics_registry,
4105            state: mz_catalog::config::StateConfig {
4106                unsafe_mode,
4107                all_features,
4108                build_info,
4109                environment_id: environment_id.clone(),
4110                read_only: read_only_controllers,
4111                now: now.clone(),
4112                boot_ts: boot_ts.clone(),
4113                skip_migrations: false,
4114                cluster_replica_sizes,
4115                builtin_system_cluster_config,
4116                builtin_catalog_server_cluster_config,
4117                builtin_probe_cluster_config,
4118                builtin_support_cluster_config,
4119                builtin_analytics_cluster_config,
4120                system_parameter_defaults,
4121                remote_system_parameters,
4122                availability_zones,
4123                egress_addresses,
4124                aws_principal_context,
4125                aws_privatelink_availability_zones,
4126                connection_context,
4127                http_host_name,
4128                builtin_item_migration_config,
4129                persist_client: persist_client.clone(),
4130                enable_expression_cache_override: None,
4131                helm_chart_version,
4132                external_login_password_mz_system,
4133                license_key: license_key.clone(),
4134            },
4135        })
4136        .await?;
4137
4138        // Opening the catalog uses one or more timestamps, so push the boot timestamp up to the
4139        // current catalog upper.
4140        let catalog_upper = catalog.current_upper().await;
4141        boot_ts = std::cmp::max(boot_ts, catalog_upper);
4142
4143        if !read_only_controllers {
4144            epoch_millis_oracle.apply_write(boot_ts).await;
4145        }
4146
4147        info!(
4148            "startup: coordinator init: catalog open complete in {:?}",
4149            catalog_open_start.elapsed()
4150        );
4151
4152        let coord_thread_start = Instant::now();
4153        info!("startup: coordinator init: coordinator thread start beginning");
4154
4155        let session_id = catalog.config().session_id;
4156        let start_instant = catalog.config().start_instant;
4157
4158        // Because the coordinator uses `Rc` and `RefCell` types internally, it cannot be
4159        // sent across threads. Spawn it in a dedicated thread and have this parent thread
4160        // wait for bootstrap completion before proceeding.
4161        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4162        let handle = TokioHandle::current();
4163
4164        let metrics = Metrics::register_into(&metrics_registry);
4165        let metrics_clone = metrics.clone();
4166        let optimizer_metrics = OptimizerMetrics::register_into(
4167            &metrics_registry,
4168            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4169        );
4170        let segment_client_clone = segment_client.clone();
4171        let coord_now = now.clone();
4172        let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4173        let mut check_scheduling_policies_interval = tokio::time::interval(
4174            catalog
4175                .system_config()
4176                .cluster_check_scheduling_policies_interval(),
4177        );
4178        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4179
4180        let clusters_caught_up_check_interval = if read_only_controllers {
4181            let dyncfgs = catalog.system_config().dyncfgs();
4182            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4183
4184            let mut interval = tokio::time::interval(interval);
4185            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4186            interval
4187        } else {
4188            // When not in read-only mode, we don't do hydration checks. But we
4189            // still have to provide _some_ interval. This is large enough that
4190            // it doesn't matter.
4191            //
4192            // TODO(aljoscha): We cannot use Duration::MAX right now because of
4193            // https://github.com/tokio-rs/tokio/issues/6634. Use that once it's
4194            // fixed for good.
4195            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4196            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4197            interval
4198        };
4199
4200        let clusters_caught_up_check =
4201            clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
4202                trigger,
4203                exclude_collections: new_builtin_collections.into_iter().collect(),
4204            });
4205
4206        if let Some(config) = pg_timestamp_oracle_config.as_ref() {
4207            // Apply settings from system vars as early as possible because some
4208            // of them are locked in right when an oracle is first opened!
4209            let pg_timestamp_oracle_params =
4210                flags::pg_timstamp_oracle_config(catalog.system_config());
4211            pg_timestamp_oracle_params.apply(config);
4212        }
4213
4214        // Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
4215        // system variables change, we update our connection limits.
4216        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4217            Arc::new(move |system_vars: &SystemVars| {
4218                let limit: u64 = system_vars.max_connections().cast_into();
4219                let superuser_reserved: u64 =
4220                    system_vars.superuser_reserved_connections().cast_into();
4221
4222                // If superuser_reserved >= max_connections, prefer max_connections.
4223                //
4224                // In this scenario all normal users would be locked out because every connection
4225                // would be reserved for superusers, so complain if this is the case.
4226                let superuser_reserved = if superuser_reserved >= limit {
4227                    tracing::warn!(
4228                        "superuser_reserved ({superuser_reserved}) is greater than or equal to max connections ({limit})!"
4229                    );
4230                    limit
4231                } else {
4232                    superuser_reserved
4233                };
4234
4235                (connection_limit_callback)(limit, superuser_reserved);
4236            });
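        // Worked example (editor's illustration): with `max_connections = 100`
        // and `superuser_reserved_connections = 10`, the callback receives
        // `(100, 10)`, leaving 90 slots for normal users. If the reserved count
        // were raised to 150 it would be clamped to 100 and a warning logged,
        // since otherwise no normal user could connect at all.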
4237        catalog.system_config_mut().register_callback(
4238            &mz_sql::session::vars::MAX_CONNECTIONS,
4239            Arc::clone(&connection_limit_callback),
4240        );
4241        catalog.system_config_mut().register_callback(
4242            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4243            connection_limit_callback,
4244        );
4245
4246        let (group_commit_tx, group_commit_rx) = appends::notifier();
4247
4248        let parent_span = tracing::Span::current();
4249        let thread = thread::Builder::new()
4250            // The Coordinator thread tends to keep a lot of data on its stack. To
4251            // prevent a stack overflow we allocate a stack three times as big as the default
4252            // stack.
4253            .stack_size(3 * stack::STACK_SIZE)
4254            .name("coordinator".to_string())
4255            .spawn(move || {
4256                let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4257
4258                let controller = handle
4259                    .block_on({
4260                        catalog.initialize_controller(
4261                            controller_config,
4262                            controller_envd_epoch,
4263                            read_only_controllers,
4264                        )
4265                    })
4266                    .unwrap_or_terminate("failed to initialize storage_controller");
4267                // Initializing the controller uses one or more timestamps, so push the boot timestamp up to the
4268                // current catalog upper.
4269                let catalog_upper = handle.block_on(catalog.current_upper());
4270                boot_ts = std::cmp::max(boot_ts, catalog_upper);
4271                if !read_only_controllers {
4272                    let epoch_millis_oracle = &timestamp_oracles
4273                        .get(&Timeline::EpochMilliseconds)
4274                        .expect("inserted above")
4275                        .oracle;
4276                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4277                }
4278
4279                let catalog = Arc::new(catalog);
4280
4281                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4282                let mut coord = Coordinator {
4283                    controller,
4284                    catalog,
4285                    internal_cmd_tx,
4286                    group_commit_tx,
4287                    strict_serializable_reads_tx,
4288                    global_timelines: timestamp_oracles,
4289                    transient_id_gen: Arc::new(TransientIdGen::new()),
4290                    active_conns: BTreeMap::new(),
4291                    txn_read_holds: Default::default(),
4292                    pending_peeks: BTreeMap::new(),
4293                    client_pending_peeks: BTreeMap::new(),
4294                    pending_linearize_read_txns: BTreeMap::new(),
4295                    serialized_ddl: LockedVecDeque::new(),
4296                    active_compute_sinks: BTreeMap::new(),
4297                    active_webhooks: BTreeMap::new(),
4298                    active_copies: BTreeMap::new(),
4299                    staged_cancellation: BTreeMap::new(),
4300                    introspection_subscribes: BTreeMap::new(),
4301                    write_locks: BTreeMap::new(),
4302                    deferred_write_ops: BTreeMap::new(),
4303                    pending_writes: Vec::new(),
4304                    advance_timelines_interval,
4305                    secrets_controller,
4306                    caching_secrets_reader,
4307                    cloud_resource_controller,
4308                    storage_usage_client,
4309                    storage_usage_collection_interval,
4310                    segment_client,
4311                    metrics,
4312                    optimizer_metrics,
4313                    tracing_handle,
4314                    statement_logging: StatementLogging::new(coord_now.clone()),
4315                    webhook_concurrency_limit,
4316                    pg_timestamp_oracle_config,
4317                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4318                    cluster_scheduling_decisions: BTreeMap::new(),
4319                    caught_up_check_interval: clusters_caught_up_check_interval,
4320                    caught_up_check: clusters_caught_up_check,
4321                    installed_watch_sets: BTreeMap::new(),
4322                    connection_watch_sets: BTreeMap::new(),
4323                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
4324                    read_only_controllers,
4325                    buffered_builtin_table_updates: Some(Vec::new()),
4326                    license_key,
4327                    persist_client,
4328                };
4329                let bootstrap = handle.block_on(async {
4330                    coord
4331                        .bootstrap(
4332                            boot_ts,
4333                            migrated_storage_collections_0dt,
4334                            builtin_table_updates,
4335                            cached_global_exprs,
4336                            uncached_local_exprs,
4337                            audit_logs_iterator,
4338                        )
4339                        .await?;
4340                    coord
4341                        .controller
4342                        .remove_orphaned_replicas(
4343                            coord.catalog().get_next_user_replica_id().await?,
4344                            coord.catalog().get_next_system_replica_id().await?,
4345                        )
4346                        .await
4347                        .map_err(AdapterError::Orchestrator)?;
4348
4349                    if let Some(retention_period) = storage_usage_retention_period {
4350                        coord
4351                            .prune_storage_usage_events_on_startup(retention_period)
4352                            .await;
4353                    }
4354
4355                    Ok(())
4356                });
4357                let ok = bootstrap.is_ok();
4358                drop(span);
4359                bootstrap_tx
4360                    .send(bootstrap)
4361                    .expect("bootstrap_rx is not dropped until it receives this message");
4362                if ok {
4363                    handle.block_on(coord.serve(
4364                        internal_cmd_rx,
4365                        strict_serializable_reads_rx,
4366                        cmd_rx,
4367                        group_commit_rx,
4368                    ));
4369                }
4370            })
4371            .expect("failed to create coordinator thread");
4372        match bootstrap_rx
4373            .await
4374            .expect("bootstrap_tx always sends a message or panics/halts")
4375        {
4376            Ok(()) => {
4377                info!(
4378                    "startup: coordinator init: coordinator thread start complete in {:?}",
4379                    coord_thread_start.elapsed()
4380                );
4381                info!(
4382                    "startup: coordinator init: complete in {:?}",
4383                    coord_start.elapsed()
4384                );
4385                let handle = Handle {
4386                    session_id,
4387                    start_instant,
4388                    _thread: thread.join_on_drop(),
4389                };
4390                let client = Client::new(
4391                    build_info,
4392                    cmd_tx.clone(),
4393                    metrics_clone,
4394                    now,
4395                    environment_id,
4396                    segment_client_clone,
4397                );
4398                Ok((handle, client))
4399            }
4400            Err(e) => Err(e),
4401        }
4402    }
4403    .boxed()
4404}
4405
4406// Determines and returns the highest timestamp for each timeline, for all known
4407// timestamp oracle implementations.
4408//
4409// Initially, we did this so that we could switch between implementations of
4410// the timestamp oracle, but now we also do this to determine a monotonic boot
4411// timestamp, a timestamp that does not regress across reboots.
4412//
4413// This mostly works, but there can be linearizability violations, because there
4414// is no central moment where we do distributed coordination for all oracle
4415// types. Working around this seems prohibitively hard, maybe even impossible, so
4416// we have to live with this window of potential violations during the upgrade
4417// window (which is the only point where we should switch oracle
4418// implementations).
4419async fn get_initial_oracle_timestamps(
4420    pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
4421) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4422    let mut initial_timestamps = BTreeMap::new();
4423
4424    if let Some(pg_timestamp_oracle_config) = pg_timestamp_oracle_config {
4425        let postgres_oracle_timestamps =
4426            PostgresTimestampOracle::<NowFn>::get_all_timelines(pg_timestamp_oracle_config.clone())
4427                .await?;
4428
4429        let debug_msg = || {
4430            postgres_oracle_timestamps
4431                .iter()
4432                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4433                .join(", ")
4434        };
4435        info!(
4436            "current timestamps from the postgres-backed timestamp oracle: {}",
4437            debug_msg()
4438        );
4439
4440        for (timeline, ts) in postgres_oracle_timestamps {
4441            let entry = initial_timestamps
4442                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4443
4444            entry
4445                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4446                .or_insert(ts);
4447        }
4448    } else {
4449        info!("no postgres url for postgres-backed timestamp oracle configured!");
4450    };
4451
4452    let debug_msg = || {
4453        initial_timestamps
4454            .iter()
4455            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4456            .join(", ")
4457    };
4458    info!("initial oracle timestamps: {}", debug_msg());
4459
4460    Ok(initial_timestamps)
4461}
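// Merging sketch for `get_initial_oracle_timestamps` (editor's illustration):
// if more than one oracle implementation reported a timestamp for the same
// timeline, say `EpochMilliseconds -> 1650` and `EpochMilliseconds -> 1700`,
// the `and_modify`/`or_insert` pair above keeps the maximum (`1700`), which is
// what makes the resulting boot timestamp monotonic across implementations.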
4462
4463#[instrument]
4464pub async fn load_remote_system_parameters(
4465    storage: &mut Box<dyn OpenableDurableCatalogState>,
4466    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4467    system_parameter_sync_timeout: Duration,
4468) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4469    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4470        tracing::info!("parameter sync on boot: start sync");
4471
4472        // We intentionally block initial startup, potentially forever,
4473        // on initializing LaunchDarkly. This may seem scary, but the
4474        // alternative is even scarier. Over time, we expect that the
4475        // compiled-in default values for the system parameters will
4476        // drift substantially from the defaults configured in
4477        // LaunchDarkly, to the point that starting an environment
4478        // without loading the latest values from LaunchDarkly will
4479        // result in running an untested configuration.
4480        //
4481        // Note this only applies during initial startup. Restarting
4482        // after we've synced once only blocks for a maximum of
4483        // `FRONTEND_SYNC_TIMEOUT` on LaunchDarkly, as it seems
4484        // reasonable to assume that the last-synced configuration was
4485        // valid enough.
4486        //
4487        // This philosophy appears to provide a good balance between not
4488        // running untested configurations in production while also not
4489        // making LaunchDarkly a "tier 1" dependency for existing
4490        // environments.
4491        //
4492        // If this proves to be an issue, we could seek to address the
4493        // configuration drift in a different way--for example, by
4494        // writing a script that runs in CI nightly and checks for
4495        // deviation between the compiled Rust code and LaunchDarkly.
4496        //
4497        // If it is absolutely necessary to bring up a new environment
4498        // while LaunchDarkly is down, the following manual mitigation
4499        // can be performed:
4500        //
4501        //    1. Edit the environmentd startup parameters to omit the
4502        //       LaunchDarkly configuration.
4503        //    2. Boot environmentd.
4504        //    3. Use the catalog-debug tool to run `edit config "{\"key\":\"system_config_synced\"}" "{\"value\": 1}"`.
4505        //    4. Adjust any other parameters as necessary to avoid
4506        //       running a nonstandard configuration in production.
4507        //    5. Edit the environmentd startup parameters to restore the
4508        //       LaunchDarkly configuration, for when LaunchDarkly comes
4509        //       back online.
4510        //    6. Reboot environmentd.
4511        let mut params = SynchronizedParameters::new(SystemVars::default());
4512        let frontend_sync = async {
4513            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4514            frontend.pull(&mut params);
4515            let ops = params
4516                .modified()
4517                .into_iter()
4518                .map(|param| {
4519                    let name = param.name;
4520                    let value = param.value;
4521                    tracing::info!(name, value, initial = true, "sync parameter");
4522                    (name, value)
4523                })
4524                .collect();
4525            tracing::info!("parameter sync on boot: end sync");
4526            Ok(Some(ops))
4527        };
4528        if !storage.has_system_config_synced_once().await? {
4529            frontend_sync.await
4530        } else {
4531            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4532                Ok(ops) => Ok(ops),
4533                Err(TimeoutError::Inner(e)) => Err(e),
4534                Err(TimeoutError::DeadlineElapsed) => {
4535                    tracing::info!("parameter sync on boot: sync has timed out");
4536                    Ok(None)
4537                }
4538            }
4539        }
4540    } else {
4541        Ok(None)
4542    }
4543}
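// Behavior sketch for `load_remote_system_parameters` (editor's illustration):
// on the very first boot the sync is awaited without a timeout; on subsequent
// boots it races against `system_parameter_sync_timeout`, and hitting the
// deadline is treated as "no remote overrides" (`Ok(None)`) rather than as an
// error.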
4544
4545#[derive(Debug)]
4546pub enum WatchSetResponse {
4547    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
4548    AlterSinkReady(AlterSinkReadyContext),
4549}
4550
4551#[derive(Debug)]
4552pub struct AlterSinkReadyContext {
4553    ctx: Option<ExecuteContext>,
4554    otel_ctx: OpenTelemetryContext,
4555    plan: AlterSinkPlan,
4556    plan_validity: PlanValidity,
4557    resolved_ids: ResolvedIds,
4558    read_hold: ReadHolds<Timestamp>,
4559}
4560
4561impl AlterSinkReadyContext {
4562    fn ctx(&mut self) -> &mut ExecuteContext {
4563        self.ctx.as_mut().expect("only cleared on drop")
4564    }
4565
4566    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4567        self.ctx
4568            .take()
4569            .expect("only cleared on drop")
4570            .retire(result);
4571    }
4572}
4573
4574impl Drop for AlterSinkReadyContext {
4575    fn drop(&mut self) {
4576        if let Some(ctx) = self.ctx.take() {
4577            ctx.retire(Err(AdapterError::Canceled));
4578        }
4579    }
4580}
4581
4582/// A struct that tracks ownership of a lock and keeps a `VecDeque` of to-be-done work to run
4583/// once the lock is freed.
4584#[derive(Debug)]
4585struct LockedVecDeque<T> {
4586    items: VecDeque<T>,
4587    lock: Arc<tokio::sync::Mutex<()>>,
4588}
4589
4590impl<T> LockedVecDeque<T> {
4591    pub fn new() -> Self {
4592        Self {
4593            items: VecDeque::new(),
4594            lock: Arc::new(tokio::sync::Mutex::new(())),
4595        }
4596    }
4597
4598    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
4599        Arc::clone(&self.lock).try_lock_owned()
4600    }
4601
4602    pub fn is_empty(&self) -> bool {
4603        self.items.is_empty()
4604    }
4605
4606    pub fn push_back(&mut self, value: T) {
4607        self.items.push_back(value)
4608    }
4609
4610    pub fn pop_front(&mut self) -> Option<T> {
4611        self.items.pop_front()
4612    }
4613
4614    pub fn remove(&mut self, index: usize) -> Option<T> {
4615        self.items.remove(index)
4616    }
4617
4618    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
4619        self.items.iter()
4620    }
4621}
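// A usage sketch for `LockedVecDeque` (editor's illustration, assuming a queue
// of deferred work items):
//
//     match queue.try_lock_owned() {
//         // Got the lock: do the work now, holding `_guard` until it is done.
//         Ok(_guard) => { /* perform the statement */ }
//         // Lock is busy: queue the work to be picked up when the lock frees.
//         Err(_) => queue.push_back(deferred_plan_statement),
//     }
//
// The guard is an `OwnedMutexGuard`, so it can be moved into whatever task
// eventually performs the work.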
4622
4623#[derive(Debug)]
4624struct DeferredPlanStatement {
4625    ctx: ExecuteContext,
4626    ps: PlanStatement,
4627}
4628
4629#[derive(Debug)]
4630enum PlanStatement {
4631    Statement {
4632        stmt: Arc<Statement<Raw>>,
4633        params: Params,
4634    },
4635    Plan {
4636        plan: mz_sql::plan::Plan,
4637        resolved_ids: ResolvedIds,
4638    },
4639}
4640
4641#[derive(Debug, Error)]
4642pub enum NetworkPolicyError {
4643    #[error("Access denied for address {0}")]
4644    AddressDenied(IpAddr),
4645    #[error("Access denied: missing IP address")]
4646    MissingIp,
4647}
4648
4649pub(crate) fn validate_ip_with_policy_rules(
4650    ip: &IpAddr,
4651    rules: &Vec<NetworkPolicyRule>,
4652) -> Result<(), NetworkPolicyError> {
4653    // At the moment we're not handling action or direction,
4654    // as those can only be "allow" and "ingress", respectively.
4655    if rules.iter().any(|r| r.address.0.contains(ip)) {
4656        Ok(())
4657    } else {
4658        Err(NetworkPolicyError::AddressDenied(ip.clone()))
4659    }
4660}
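// A usage sketch for `validate_ip_with_policy_rules` (editor's illustration):
// with rules whose address ranges cover `10.0.0.0/8`, a client at `10.1.2.3`
// is admitted while one at `192.168.0.1` is rejected with
// `NetworkPolicyError::AddressDenied(192.168.0.1)`:
//
//     match validate_ip_with_policy_rules(&client_ip, &rules) {
//         Ok(()) => { /* proceed with the connection */ }
//         Err(err) => return Err(err.into()),
//     }
//
// where `client_ip` and `rules` are hypothetical stand-ins for however the
// caller obtains the peer address and the active `Vec<NetworkPolicyRule>`.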