// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Translation of SQL commands into timestamped `Controller` commands.
//!
//! The various SQL commands instruct the system to take actions that are not
//! yet explicitly timestamped. On the other hand, the underlying data continually
//! change as time moves forward. On the third hand, we greatly benefit from the
//! information that some times are no longer of interest, so that we may
//! compact the representation of the continually changing collections.
//!
//! The [`Coordinator`] curates these interactions by observing the progress
//! collections make through time, choosing timestamps for its own commands,
//! and eventually communicating that certain times have irretrievably "passed".
//!
//! ## Frontiers another way
//!
//! If the above description of frontiers left you with questions, this
//! repackaged explanation might help.
//!
//! - `since` is the least recent time (i.e. oldest time) that you can read
//!   from sources and be guaranteed that the returned data is accurate as of
//!   that time.
//!
//!   Reads at times less than `since` may return values that were not actually
//!   seen at the specified time, but arrived later (i.e. the results are
//!   compacted).
//!
//!   For correctness' sake, the coordinator never chooses to read at a time
//!   less than an arrangement's `since`.
//!
//! - `upper` is the first time after the most recent time that you can read
//!   from sources and receive an immediate response. Alternately, it is the
//!   least time at which the data may still change (that is the reason we may
//!   not be able to respond immediately).
//!
//!   Reads at times >= `upper` may not immediately return because the answer
//!   isn't known yet. However, once the `upper` is > the specified read time,
//!   the read can return.
//!
//!   For the sake of returned values' freshness, the coordinator prefers
//!   performing reads at an arrangement's `upper`. However, because we more
//!   strongly prefer correctness, the coordinator will choose timestamps
//!   greater than an object's `upper` if it is also being accessed alongside
//!   objects whose `since` times are >= its `upper`.
//!
//! This illustration attempts to show, with time moving left to right, the
//! relationship between `since` and `upper`.
//!
//! - `#`: possibly inaccurate results
//! - `-`: immediate, correct response
//! - `?`: not yet known
//! - `s`: since
//! - `u`: upper
//! - `|`: eligible for coordinator to select
//!
//! ```nofmt
//! ####s----u?????
//!     |||||||||||
//! ```
//!
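//! As a rough sketch only (the real selection logic lives in the
//! `timestamp_selection` module and operates on frontiers rather than scalar
//! times), the trade-off above amounts to clamping the freshest complete time
//! to be no earlier than `since`:
//!
//! ```ignore
//! // Hypothetical helper for a single object with scalar frontiers.
//! fn choose_read_ts(since: u64, upper: u64) -> u64 {
//!     // Prefer the most recent complete time (just before `upper`) for
//!     // freshness, but never read before `since`, for correctness.
//!     let freshest_complete = upper.saturating_sub(1);
//!     freshest_complete.max(since)
//! }
//! ```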

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroI64;
use std::ops::Neg;
use std::str::FromStr;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::Context;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
use http::Uri;
use ipnet::IpNet;
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
use mz_auth::password::Password;
use mz_build_info::BuildInfo;
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::{DataflowCreationError, InstanceMissing};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_controller::clusters::{
    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
};
use mz_controller::{ControllerConfig, Readiness};
use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::OfflineReason;
use mz_ore::cast::{CastFrom, CastInto, CastLossy};
use mz_ore::channel::trigger::Trigger;
use mz_ore::future::TimeoutError;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::{JoinHandle, spawn};
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_ore::url::SensitiveUrl;
use mz_ore::{
    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
};
use mz_persist_client::PersistClient;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
use mz_repr::adt::numeric::Numeric;
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
use mz_secrets::cache::CachingSecretsReader;
use mz_secrets::{SecretsController, SecretsReader};
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::{CatalogCluster, EnvironmentId};
use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::{
    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, NetworkPolicyRule,
    OnTimeoutAction, Params, QueryWhen,
};
use mz_sql::session::user::User;
use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
use mz_sql_parser::ast::ExplainStage;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
use mz_timestamp_oracle::WriteTimestamp;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use serde::Serialize;
use thiserror::Error;
use timely::progress::{Antichain, Timestamp as _};
use tokio::runtime::Handle as TokioHandle;
use tokio::select;
use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
use crate::client::{Client, Handle};
use crate::command::{Command, ExecuteResponse};
use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
use crate::coord::appends::{
    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
};
use crate::coord::caught_up::CaughtUpCheckContext;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::introspection::IntrospectionSubscribe;
use crate::coord::peek::PendingPeek;
use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::validity::PlanValidity;
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
use crate::metrics::Metrics;
use crate::optimize::dataflows::{
    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{AdapterNotice, ReadHolds, flags};

pub(crate) mod appends;
pub(crate) mod catalog_serving;
pub(crate) mod cluster_scheduling;
pub(crate) mod consistency;
pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod read_policy;
pub(crate) mod sequencer;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;

pub mod catalog_implications;
mod caught_up;
mod command_handler;
mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
mod sql;
mod validity;

#[derive(Debug)]
pub enum Message {
    Command(OpenTelemetryContext, Command),
    ControllerReady {
        controller: ControllerReadiness,
    },
    PurifiedStatementReady(PurifiedStatementReady),
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    TryDeferred {
        /// The connection that created this op.
        conn_id: ConnectionId,
        /// The write lock that notified us our deferred op might be able to run.
        ///
        /// Note: While we never want to hold a partial set of locks, it can be important to hold
        /// onto the _one_ that notified us our op might be ready. If there are multiple operations
        /// waiting on a single collection, and we don't hold this lock through retrying the op,
        /// then everything waiting on this collection will get retried, causing traffic in the
        /// Coordinator's message queue.
        ///
        /// See [`DeferredOp::can_be_optimistically_retried`] for more detail.
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    /// Initiates a group commit.
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    AdvanceTimelines,
    ClusterEvent(ClusterEvent),
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    LinearizeReads,
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    /// Performs any cleanup and logging actions necessary for
    /// finalizing a statement execution.
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    DrainStatementLog,
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    /// Scheduling policy decisions about turning clusters On/Off.
    /// `Vec<(policy name, Vec of decisions by the policy)>`
    /// A cluster will be On if and only if there is at least one On decision for it.
    /// Scheduling decisions for clusters that have `SCHEDULE = MANUAL` are ignored.
    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}

impl Message {
    /// Returns a string to identify the kind of [`Message`], useful for logging.
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
                Command::GetOracle { .. } => "get-oracle",
                Command::DetermineRealTimeRecentTimestamp { .. } => {
                    "determine-real-time-recent-timestamp"
                }
                Command::GetTransactionReadHoldsBundle { .. } => {
                    "get-transaction-read-holds-bundle"
                }
                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
                Command::ExecuteCopyTo { .. } => "execute-copy-to",
            },
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}

/// The reason why a controller needs processing on the main loop.
#[derive(Debug)]
pub enum ControllerReadiness {
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A batch of metric data is ready.
    Metrics,
    /// An internally-generated message is ready to be returned.
    Internal,
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub params: Params,
    pub plan_validity: PlanValidity,
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}

pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;

#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    pub connection_id: CatalogItemId,
    pub connection_gid: GlobalId,
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}

pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
pub type AlterConnectionValidationReady = ValidationReady<Connection>;

#[derive(Debug)]
pub enum PeekStage {
    /// Common stages across SELECT, EXPLAIN and COPY TO queries.
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    /// Final stage for a peek.
    Finish(PeekStageFinish),
    /// Final stage for an explain.
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    /// Preflight checks for a copy to operation.
    CopyToPreflight(PeekStageCopyTo),
    /// Final stage for a copy to which involves shipping the dataflow.
    CopyToDataflow(PeekStageCopyTo),
}

#[derive(Debug)]
pub struct CopyToContext {
    /// The `RelationDesc` of the data to be copied.
    pub desc: RelationDesc,
    /// The destination URI of the external service where the data will be copied.
    pub uri: Uri,
    /// Connection information required to connect to the external service to copy the data.
    pub connection: StorageConnection<ReferencedConnection>,
    /// The ID of the CONNECTION object to be used for copying the data.
    pub connection_id: CatalogItemId,
    /// Format params to format the data.
    pub format: S3SinkFormat,
    /// Approximate max file size of each uploaded file.
    pub max_file_size: u64,
    /// Number of batches the output of the COPY TO will be partitioned into
    /// to distribute the load across workers deterministically.
    /// This is an `Option` only because it is not set when `CopyToContext` is
    /// instantiated; it is filled in immediately afterwards, in the `PeekStageValidate` stage.
    pub output_batch_count: Option<u64>,
}

#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    /// When present, an optimizer trace to be used for emitting a plan insights
    /// notice.
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
}

#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    imports: BTreeMap<GlobalId, MapFilterProject>,
}

#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}

#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}

#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    /// ID of this item in the Catalog.
    item_id: CatalogItemId,
    /// ID by which Compute will reference this View.
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    /// IDs of objects resolved during name resolution.
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}

#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    real_time_recency_ts: Option<Timestamp>,
}

#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}

#[derive(Debug)]
pub enum ExplainContext {
    /// The ordinary, non-explain variant of the statement.
    None,
    /// The `EXPLAIN <level> PLAN FOR <explainee>` version of the statement.
    Plan(ExplainPlanContext),
    /// Generate a notice containing the `EXPLAIN PLAN INSIGHTS` output
    /// alongside the query's normal output.
    PlanInsightsNotice(OptimizerTrace),
    /// `EXPLAIN FILTER PUSHDOWN`
    Pushdown,
}

impl ExplainContext {
    /// If available for this context, wrap the [`OptimizerTrace`] into a
    /// [`tracing::Dispatch`] and set it as default, returning the resulting
    /// guard in a `Some(guard)` option.
    pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
        let optimizer_trace = match self {
            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
            _ => None,
        };
        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
    }

    pub(crate) fn needs_cluster(&self) -> bool {
        match self {
            ExplainContext::None => true,
            ExplainContext::Plan(..) => false,
            ExplainContext::PlanInsightsNotice(..) => true,
            ExplainContext::Pushdown => false,
        }
    }

    pub(crate) fn needs_plan_insights(&self) -> bool {
        matches!(
            self,
            ExplainContext::Plan(ExplainPlanContext {
                stage: ExplainStage::PlanInsights,
                ..
            }) | ExplainContext::PlanInsightsNotice(_)
        )
    }
}
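
// A hypothetical usage sketch for `ExplainContext::dispatch_guard` (not taken
// from the actual call sites): while the returned guard is alive, optimizer
// tracing is routed into the wrapped `OptimizerTrace`, which the EXPLAIN
// machinery later renders; dropping the guard restores the previous
// subscriber.
//
//     let _guard = explain_ctx.dispatch_guard();
//     // Optimization steps executed here are recorded by the trace, if one
//     // is present for this context.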

#[derive(Debug)]
pub struct ExplainPlanContext {
    /// EXPLAIN BROKEN is internal syntax for showing EXPLAIN output despite an internal error in
    /// the optimizer: we don't immediately bail out from peek sequencing when an internal optimizer
    /// error happens, but go on with trying to show the requested EXPLAIN stage. This can still
    /// succeed if the requested EXPLAIN stage is before the point where the error happened.
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    pub stage: ExplainStage,
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    pub optimizer_trace: OptimizerTrace,
}

#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    /// The ID of this Materialized View in the Catalog.
    item_id: CatalogItemId,
    /// The ID of the durable pTVC backing this Materialized View.
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
}

#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}

#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}

/// An enum describing which cluster to run a statement on.
///
/// For example, if a query depends only on system tables, we might
/// automatically run it on the catalog server cluster to benefit from the
/// indexes that exist there.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The catalog server cluster.
    CatalogServer,
    /// The current user's active cluster.
    Active,
    /// The cluster selected at the start of a transaction.
    Transaction(ClusterId),
}

/// Result types for each stage of a sequence.
pub(crate) enum StageResult<T> {
    /// A task was spawned that will return the next stage.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task was spawned that will return a response for the client.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage is immediately ready and will execute.
    Immediate(T),
    /// The final stage was executed and is ready to respond to the client.
    Response(ExecuteResponse),
}

/// Common functionality for [Coordinator::sequence_staged].
pub(crate) trait Staged: Send {
    type Ctx: StagedContext;

    fn validity(&mut self) -> &mut PlanValidity;

    /// Returns the next stage or final result.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Prepares a message for the Coordinator.
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether it is safe to SQL cancel this stage.
    fn cancel_enabled(&self) -> bool;
}

pub trait StagedContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    fn session(&self) -> Option<&Session>;
}

impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}

impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}
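
// A rough, hypothetical sketch (not the actual implementation) of how a
// `Staged` state machine is driven by `Coordinator::sequence_staged`: each
// stage yields either its successor, a spawned task that will later send
// `Staged::message` back to the coordinator, or a final response that retires
// the context.
//
//     async fn drive<S: Staged>(coord: &mut Coordinator, mut ctx: S::Ctx, mut stage: S) {
//         loop {
//             match stage.stage(coord, &mut ctx).await {
//                 Ok(StageResult::Immediate(next)) => stage = *next,
//                 // The spawned task re-enqueues the next stage (or the final
//                 // response) as a coordinator message instead of looping here.
//                 Ok(StageResult::Handle(_)) | Ok(StageResult::HandleRetire(_)) => return,
//                 Ok(StageResult::Response(resp)) => return ctx.retire(Ok(resp)),
//                 Err(err) => return ctx.retire(Err(err)),
//             }
//         }
//     }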

/// Configures a coordinator.
pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    /// Whether or not to start controllers in read-only mode. This is only
    /// meant for use during development of read-only clusters and 0dt upgrades
    /// and should go away once we have proper orchestration during upgrades.
    pub read_only_controllers: bool,

    /// A trigger that signals that the current deployment has caught up with a
    /// previous deployment. Only used during 0dt deployment, while in read-only
    /// mode.
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}

/// Metadata about an active connection.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Pgwire specifies that every connection have a 32-bit secret associated
    /// with it, that is known to both the client and the server. Cancellation
    /// requests are required to authenticate with the secret of the connection
    /// that they are targeting (see the sketch following this struct).
    secret_key: u32,
    /// The time when the session's connection was initiated.
    connected_at: EpochMillis,
    user: User,
    application_name: String,
    uuid: Uuid,
    conn_id: ConnectionId,
    client_ip: Option<IpAddr>,

    /// Sinks that will need to be dropped when the current transaction, if
    /// any, is cleared.
    drop_sinks: BTreeSet<GlobalId>,

    /// Lock for the Coordinator's deferred statements that is dropped on transaction clear.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Cluster reconfigurations that will need to be
    /// cleaned up when the current transaction is cleared.
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Channel on which to send notices to a session.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role that initiated the database context. Fixed for the duration of the connection.
    /// WARNING: This role reference is not updated when the role is dropped.
    /// Consumers should not assume that this role exists.
    authenticated_role: RoleId,
}
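
// A minimal, hypothetical sketch of the cancellation check described on
// `secret_key` above (illustrative only, not the actual handler): a cancel
// request names a connection and must present that connection's secret before
// the coordinator acts on it.
//
//     fn cancel_request_is_authentic(
//         conn: &ConnMeta,
//         claimed_conn_id: &ConnectionId,
//         presented_secret: u32,
//     ) -> bool {
//         conn.conn_id() == claimed_conn_id && conn.secret_key == presented_secret
//     }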

impl ConnMeta {
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    pub fn user(&self) -> &User {
        &self.user
    }

    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}

#[derive(Debug)]
/// A pending transaction waiting to be committed.
pub struct PendingTxn {
    /// Context used to send a response back to the client.
    ctx: ExecuteContext,
    /// Client response for transaction.
    response: Result<PendingTxnResponse, AdapterError>,
    /// The action to take at the end of the transaction.
    action: EndTransactionAction,
}

#[derive(Debug)]
/// The response we'll send for a [`PendingTxn`].
pub enum PendingTxnResponse {
    /// The transaction will be committed.
    Committed {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction will be rolled back.
    Rolledback {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        match self {
            PendingTxnResponse::Committed { params }
            | PendingTxnResponse::Rolledback { params } => params.extend(p),
        }
    }
}

impl From<PendingTxnResponse> for ExecuteResponse {
    fn from(value: PendingTxnResponse) -> Self {
        match value {
            PendingTxnResponse::Committed { params } => {
                ExecuteResponse::TransactionCommitted { params }
            }
            PendingTxnResponse::Rolledback { params } => {
                ExecuteResponse::TransactionRolledBack { params }
            }
        }
    }
}

#[derive(Debug)]
/// A pending read transaction waiting to be linearized, along with metadata about its state.
pub struct PendingReadTxn {
    /// The transaction type.
    txn: PendingRead,
    /// The timestamp context of the transaction.
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    /// When we created this pending txn, which is when the transaction ends. Only used for metrics.
    created: Instant,
    /// Number of times we requeued the processing of this pending read txn.
    /// Requeueing is necessary if the time we executed the query is after the current oracle time;
    /// see [`Coordinator::message_linearize_reads`] for more details (and the sketch further below).
    num_requeues: u64,
    /// Telemetry context.
    otel_ctx: OpenTelemetryContext,
}

impl PendingReadTxn {
    /// Return the timestamp context of the pending read transaction.
    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
        &self.timestamp_context
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}
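
// A rough sketch of the requeue condition mentioned on `num_requeues` above;
// the real logic lives in `Coordinator::message_linearize_reads`, and the
// `timestamp()` accessor used here is an assumption for illustration: a
// timestamp-dependent read is only released once the oracle's read timestamp
// has caught up to the timestamp the query used; otherwise it is requeued.
//
//     fn can_linearize(oracle_read_ts: Timestamp, txn: &PendingReadTxn) -> bool {
//         match txn.timestamp_context().timestamp() {
//             Some(query_ts) => *query_ts <= oracle_read_ts,
//             // Timestamp-independent reads can be released immediately.
//             None => true,
//         }
//     }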

#[derive(Debug)]
/// A pending read transaction waiting to be linearized.
enum PendingRead {
    Read {
        /// The inner transaction.
        txn: PendingTxn,
    },
    ReadThenWrite {
        /// Context used to send a response back to the client.
        ctx: ExecuteContext,
        /// Channel used to alert the transaction that the read has been linearized and send back
        /// `ctx`.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}

impl PendingRead {
    /// Alert the client that the read has been linearized.
    ///
    /// If it is necessary to finalize an execute, return the state necessary to do so
    /// (execution context and result).
    #[instrument(level = "debug")]
    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
        match self {
            PendingRead::Read {
                txn:
                    PendingTxn {
                        mut ctx,
                        response,
                        action,
                    },
                ..
            } => {
                let changed = ctx.session_mut().vars_mut().end_transaction(action);
                // Append any parameters that changed to the response.
                let response = response.map(|mut r| {
                    r.extend_params(changed);
                    ExecuteResponse::from(r)
                });

                Some((ctx, response))
            }
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // Ignore errors if the caller has hung up.
                let _ = tx.send(Some(ctx));
                None
            }
        }
    }

    fn label(&self) -> &'static str {
        match self {
            PendingRead::Read { .. } => "read",
            PendingRead::ReadThenWrite { .. } => "read_then_write",
        }
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        match self {
            PendingRead::Read { txn, .. } => txn.ctx,
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // Inform the transaction that we've taken their context.
                // Ignore errors if the caller has hung up.
                let _ = tx.send(None);
                ctx
            }
        }
    }
}

/// State that the coordinator must process as part of retiring
/// command execution. `ExecuteContextExtra::default()` is guaranteed
/// to produce a value that will cause the coordinator to do nothing, and
/// is intended for use by code that invokes the execution processing flow
/// (i.e., `sequence_plan`) without actually being a statement execution.
///
/// This struct must not be dropped if it contains non-trivial
/// state. The only valid way to get rid of it is to pass it to the
/// coordinator for retirement. To enforce this, we assert in the
/// `Drop` implementation.
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    statement_uuid: Option<StatementLoggingId>,
}

impl ExecuteContextExtra {
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }
    pub fn is_trivial(&self) -> bool {
        let Self { statement_uuid } = self;
        statement_uuid.is_none()
    }
    pub fn contents(&self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = self;
        *statement_uuid
    }
    /// Take responsibility for the contents. This should only be
    /// called from code that knows what to do to finish up logging
    /// based on the inner value.
    #[must_use]
    fn retire(mut self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = &mut self;
        statement_uuid.take()
    }
}

impl Drop for ExecuteContextExtra {
    fn drop(&mut self) {
        let Self { statement_uuid } = &*self;
        if let Some(statement_uuid) = statement_uuid {
            // Note: the impact when this error hits
            // is that the statement will never be marked
            // as finished in the statement log.
            soft_panic_or_log!(
                "execute context for statement {statement_uuid:?} dropped without being properly retired."
            );
        }
    }
}

/// Bundle of state related to statement execution.
///
/// This struct collects a bundle of state that needs to be threaded
/// through various functions as part of statement execution.
/// Currently, it is only used to finalize execution, by calling one
/// of the methods `retire` or `retire_async` (a usage sketch follows
/// the `ExecuteContext` impl below). Finalizing execution involves
/// sending the session back to the pgwire layer so that it may be
/// used to process further commands. In the future, it will also
/// involve performing some work on the main coordinator thread
/// (e.g., recording the time at which the statement finished
/// executing); the state necessary to perform this work is bundled in
/// the `ExecuteContextExtra` object (today, it carries only an
/// optional [`StatementLoggingId`]).
#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}

impl std::ops::Deref for ExecuteContext {
    type Target = ExecuteContextInner;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}

impl std::ops::DerefMut for ExecuteContext {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}

#[derive(Debug)]
pub struct ExecuteContextInner {
    tx: ClientTransmitter<ExecuteResponse>,
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    session: Session,
    extra: ExecuteContextExtra,
}

impl ExecuteContext {
    pub fn session(&self) -> &Session {
        &self.session
    }

    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.session
    }

    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
        &self.tx
    }

    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
        &mut self.tx
    }

    pub fn from_parts(
        tx: ClientTransmitter<ExecuteResponse>,
        internal_cmd_tx: mpsc::UnboundedSender<Message>,
        session: Session,
        extra: ExecuteContextExtra,
    ) -> Self {
        Self {
            inner: ExecuteContextInner {
                tx,
                session,
                extra,
                internal_cmd_tx,
            }
            .into(),
        }
    }

    /// By calling this function, the caller takes responsibility for
    /// dealing with the instance of `ExecuteContextExtra`. This is
    /// intended to support protocols (like `COPY FROM`) that involve
    /// multiple passes of sending the session back and forth between
    /// the coordinator and the pgwire layer. As part of any such
    /// protocol, we must ensure that the `ExecuteContextExtra`
    /// (possibly wrapped in a new `ExecuteContext`) is passed back to the coordinator for
    /// eventual retirement.
    pub fn into_parts(
        self,
    ) -> (
        ClientTransmitter<ExecuteResponse>,
        mpsc::UnboundedSender<Message>,
        Session,
        ExecuteContextExtra,
    ) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        (tx, internal_cmd_tx, session, extra)
    }

    /// Retire the execution, by sending a message to the coordinator.
    #[instrument(level = "debug")]
    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        let reason = if extra.is_trivial() {
            None
        } else {
            Some((&result).into())
        };
        tx.send(result, session);
        if let Some(reason) = reason {
            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
                otel_ctx: OpenTelemetryContext::obtain(),
                data: extra,
                reason,
            }) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        }
    }

    pub fn extra(&self) -> &ExecuteContextExtra {
        &self.extra
    }

    pub fn extra_mut(&mut self) -> &mut ExecuteContextExtra {
        &mut self.extra
    }
}
1485
1486#[derive(Debug)]
1487struct ClusterReplicaStatuses(
1488    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
1489);
1490
1491impl ClusterReplicaStatuses {
1492    pub(crate) fn new() -> ClusterReplicaStatuses {
1493        ClusterReplicaStatuses(BTreeMap::new())
1494    }
1495
1496    /// Initializes the statuses of the specified cluster.
1497    ///
1498    /// Panics if the cluster statuses are already initialized.
1499    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1500        let prev = self.0.insert(cluster_id, BTreeMap::new());
1501        assert_eq!(
1502            prev, None,
1503            "cluster {cluster_id} statuses already initialized"
1504        );
1505    }
1506
1507    /// Initializes the statuses of the specified cluster replica.
1508    ///
1509    /// Panics if the cluster replica statuses are already initialized.
1510    pub(crate) fn initialize_cluster_replica_statuses(
1511        &mut self,
1512        cluster_id: ClusterId,
1513        replica_id: ReplicaId,
1514        num_processes: usize,
1515        time: DateTime<Utc>,
1516    ) {
1517        tracing::info!(
1518            ?cluster_id,
1519            ?replica_id,
1520            ?time,
1521            "initializing cluster replica status"
1522        );
1523        let replica_statuses = self.0.entry(cluster_id).or_default();
1524        let process_statuses = (0..num_processes)
1525            .map(|process_id| {
1526                let status = ClusterReplicaProcessStatus {
1527                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1528                    time: time.clone(),
1529                };
1530                (u64::cast_from(process_id), status)
1531            })
1532            .collect();
1533        let prev = replica_statuses.insert(replica_id, process_statuses);
1534        assert_none!(
1535            prev,
1536            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1537        );
1538    }
1539
1540    /// Removes the statuses of the specified cluster.
1541    ///
1542    /// Panics if the cluster does not exist.
1543    pub(crate) fn remove_cluster_statuses(
1544        &mut self,
1545        cluster_id: &ClusterId,
1546    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1547        let prev = self.0.remove(cluster_id);
1548        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1549    }
1550
1551    /// Removes the statuses of the specified cluster replica.
1552    ///
1553    /// Panics if the cluster or replica does not exist.
1554    pub(crate) fn remove_cluster_replica_statuses(
1555        &mut self,
1556        cluster_id: &ClusterId,
1557        replica_id: &ReplicaId,
1558    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1559        let replica_statuses = self
1560            .0
1561            .get_mut(cluster_id)
1562            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1563        let prev = replica_statuses.remove(replica_id);
1564        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1565    }
1566
1567    /// Inserts or updates the status of the specified cluster replica process.
1568    ///
1569    /// Panics if the cluster or replica does not exist.
1570    pub(crate) fn ensure_cluster_status(
1571        &mut self,
1572        cluster_id: ClusterId,
1573        replica_id: ReplicaId,
1574        process_id: ProcessId,
1575        status: ClusterReplicaProcessStatus,
1576    ) {
1577        let replica_statuses = self
1578            .0
1579            .get_mut(&cluster_id)
1580            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1581            .get_mut(&replica_id)
1582            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1583        replica_statuses.insert(process_id, status);
1584    }
1585
1586    /// Computes the status of the cluster replica as a whole.
1587    ///
1588    /// Panics if `cluster_id` or `replica_id` don't exist.
1589    pub fn get_cluster_replica_status(
1590        &self,
1591        cluster_id: ClusterId,
1592        replica_id: ReplicaId,
1593    ) -> ClusterStatus {
1594        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1595        Self::cluster_replica_status(process_status)
1596    }
1597
1598    /// Computes the status of the cluster replica as a whole.
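    ///
    /// A replica is `Online` only if every process is `Online`; otherwise the
    /// first known offline reason is reported. A small illustration (the
    /// process statuses here are made up for the example):
    ///
    /// ```ignore
    /// let statuses = BTreeMap::from([
    ///     (0, ClusterReplicaProcessStatus { status: ClusterStatus::Online, time: now }),
    ///     (1, ClusterReplicaProcessStatus {
    ///         status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
    ///         time: now,
    ///     }),
    /// ]);
    /// assert_eq!(
    ///     ClusterReplicaStatuses::cluster_replica_status(&statuses),
    ///     ClusterStatus::Offline(Some(OfflineReason::Initializing)),
    /// );
    /// ```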
1599    pub fn cluster_replica_status(
1600        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1601    ) -> ClusterStatus {
1602        process_status
1603            .values()
1604            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1605                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1606                (x, y) => {
1607                    let reason_x = match x {
1608                        ClusterStatus::Offline(reason) => reason,
1609                        ClusterStatus::Online => None,
1610                    };
1611                    let reason_y = match y {
1612                        ClusterStatus::Offline(reason) => reason,
1613                        ClusterStatus::Online => None,
1614                    };
1615                    // Arbitrarily pick the first known not-ready reason.
1616                    ClusterStatus::Offline(reason_x.or(reason_y))
1617                }
1618            })
1619    }
1620
1621    /// Gets the statuses of the given cluster replica.
1622    ///
1623    /// Panics if the cluster or replica does not exist.
1624    pub(crate) fn get_cluster_replica_statuses(
1625        &self,
1626        cluster_id: ClusterId,
1627        replica_id: ReplicaId,
1628    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1629        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1630            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1631    }
1632
1633    /// Gets the statuses of the given cluster replica.
1634    pub(crate) fn try_get_cluster_replica_statuses(
1635        &self,
1636        cluster_id: ClusterId,
1637        replica_id: ReplicaId,
1638    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1639        self.try_get_cluster_statuses(cluster_id)
1640            .and_then(|statuses| statuses.get(&replica_id))
1641    }
1642
1643    /// Gets the statuses of the given cluster.
1644    pub(crate) fn try_get_cluster_statuses(
1645        &self,
1646        cluster_id: ClusterId,
1647    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1648        self.0.get(&cluster_id)
1649    }
1650}
1651
1652/// Glues the external world to the Timely workers.
1653#[derive(Derivative)]
1654#[derivative(Debug)]
1655pub struct Coordinator {
1656    /// The controller for the storage and compute layers.
1657    #[derivative(Debug = "ignore")]
1658    controller: mz_controller::Controller,
1659    /// The catalog in an Arc suitable for readonly references. The Arc allows
1660    /// us to hand out cheap copies of the catalog to functions that can use it
1661    /// off of the main coordinator thread. If the coordinator needs to mutate
1662    /// the catalog, call [`Self::catalog_mut`], which will clone this struct member,
1663    /// allowing it to be mutated here while the other off-thread references can
1664    /// read their catalog as long as needed. In the future we would like this
1665    /// to be a pTVC, but for now this is sufficient.
1666    catalog: Arc<Catalog>,
1667
1668    /// A client for persist. Initially, this is only used for reading stashed
1669    /// peek responses out of batches.
1670    persist_client: PersistClient,
1671
1672    /// Channel to manage internal commands from the coordinator to itself.
1673    internal_cmd_tx: mpsc::UnboundedSender<Message>,
1674    /// Notification that triggers a group commit.
1675    group_commit_tx: appends::GroupCommitNotifier,
1676
1677    /// Channel for strict serializable reads ready to commit.
1678    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,
1679
1680    /// Mechanism for totally ordering write and read timestamps, so that all reads
1681    /// reflect exactly the set of writes that precede them, and no writes that follow.
1682    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
1683
1684    /// A generator for transient [`GlobalId`]s, shareable with other threads.
1685    transient_id_gen: Arc<TransientIdGen>,
1686    /// A map from connection ID to metadata about that connection for all
1687    /// active connections.
1688    active_conns: BTreeMap<ConnectionId, ConnMeta>,
1689
1690    /// For each transaction, the read holds taken to support any performed reads.
1691    ///
1692    /// Upon completing a transaction, these read holds should be dropped.
1693    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,
1694
1695    /// Access to the peek fields should be restricted to methods in the [`peek`] API.
1696    /// A map from pending peek ids to the queue into which responses are sent, and
1697    /// the connection id of the client that initiated the peek.
1698    pending_peeks: BTreeMap<Uuid, PendingPeek>,
1699    /// A map from client connection ids to a set of all pending peeks for that client.
1700    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
1701
1702    /// A map from client connection ids to pending linearize read transaction.
1703    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
1704
1705    /// A map from the compute sink ID to its state description.
1706    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
1707    /// A map from active webhooks to their invalidation handle.
1708    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
1709    /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
1710    /// to stage Batches in Persist that we will then link into the shard.
1711    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,
1712
1713    /// A map from connection ids to a watch channel that is set to `true` if the connection
1714    /// received a cancel request.
1715    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
1716    /// Active introspection subscribes.
1717    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,
1718
1719    /// Locks that grant access to a specific object, populated lazily as objects are written to.
1720    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
1721    /// Plans that are currently deferred and waiting on a write lock.
1722    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,
1723
1724    /// Pending writes waiting for a group commit.
1725    pending_writes: Vec<PendingWriteTxn>,
1726
1727    /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
1728    /// table's timestamps, but there are cases where timestamps are not bumped but
1729    /// we expect the closed timestamps to advance (`AS OF X`, `SUBSCRIBE`s on views over
1730    /// RT sources and tables). To address these, spawn a task that forces table
1731    /// timestamps to close on a regular interval. This roughly tracks the behavior
1732    /// of realtime sources that close off timestamps on an interval.
1733    ///
1734    /// For non-realtime timelines, nothing pushes the timestamps forward, so we must do
1735    /// it manually.
1736    advance_timelines_interval: Interval,
1737
1738    /// Serialized DDL. DDL must be serialized because:
1739    /// - Many of them do off-thread work and need to verify the catalog is in a valid state, but
1740    ///   [`PlanValidity`] does not currently support tracking all changes. Doing that correctly
1741    ///   seems to be more difficult than it's worth, so we would instead re-plan and re-sequence
1742    ///   the statements.
1743    /// - Re-planning a statement is hard because Coordinator and Session state is mutated at
1744    ///   various points, and we would need to correctly reset those changes before re-planning and
1745    ///   re-sequencing.
1746    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
1747
1748    /// Handle to secret manager that can create and delete secrets from
1749    /// an arbitrary secret storage engine.
1750    secrets_controller: Arc<dyn SecretsController>,
1751    /// A secrets reader that maintains an in-memory cache, where values have a set TTL.
1752    caching_secrets_reader: CachingSecretsReader,
1753
1754    /// Handle to a manager that can create and delete Kubernetes resources
1755    /// (i.e., `VpcEndpoint` objects).
1756    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1757
1758    /// Persist client for fetching storage metadata such as size metrics.
1759    storage_usage_client: StorageUsageClient,
1760    /// The interval at which to collect storage usage information.
1761    storage_usage_collection_interval: Duration,
1762
1763    /// Segment analytics client.
1764    #[derivative(Debug = "ignore")]
1765    segment_client: Option<mz_segment::Client>,
1766
1767    /// Coordinator metrics.
1768    metrics: Metrics,
1769    /// Optimizer metrics.
1770    optimizer_metrics: OptimizerMetrics,
1771
1772    /// Tracing handle.
1773    tracing_handle: TracingHandle,
1774
1775    /// Data used by the statement logging feature.
1776    statement_logging: StatementLogging,
1777
1778    /// Limit for how many concurrent webhook requests we allow.
1779    webhook_concurrency_limit: WebhookConcurrencyLimiter,
1780
1781    /// Optional config for the Postgres-backed timestamp oracle. This is
1782    /// _required_ when `postgres` is configured using the `timestamp_oracle`
1783    /// system variable.
1784    pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,
1785
1786    /// Periodically asks cluster scheduling policies to make their decisions.
1787    check_cluster_scheduling_policies_interval: Interval,
1788
1789    /// This keeps the last On/Off decision for each cluster and each scheduling policy.
1790    /// (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are
1791    /// periodically cleaned up from this Map.)
1792    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,
1793
1794    /// When doing 0dt upgrades/in read-only mode, periodically ask all known
1795    /// clusters/collections whether they are caught up.
1796    caught_up_check_interval: Interval,
1797
1798    /// Context needed to check whether all clusters/collections have caught up.
1799    /// Only used during 0dt deployment, while in read-only mode.
1800    caught_up_check: Option<CaughtUpCheckContext>,
1801
1802    /// Tracks the state associated with the currently installed watchsets.
1803    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,
1804
1805    /// Tracks the currently installed watchsets for each connection.
1806    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,
1807
1808    /// Tracks the statuses of all cluster replicas.
1809    cluster_replica_statuses: ClusterReplicaStatuses,
1810
1811    /// Whether or not to start controllers in read-only mode. This is only
1812    /// meant for use during development of read-only clusters and 0dt upgrades
1813    /// and should go away once we have proper orchestration during upgrades.
1814    read_only_controllers: bool,
1815
1816    /// Updates to builtin tables that are being buffered while we are in
1817    /// read-only mode. We apply these all at once when coming out of read-only
1818    /// mode.
1819    ///
1820    /// This is a `Some` while in read-only mode and will be replaced by a
1821    /// `None` when we transition out of read-only mode and write out any
1822    /// buffered updates.
1823    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,
1824
1825    license_key: ValidatedLicenseKey,
1826}
1827
1828impl Coordinator {
1829    /// Initializes coordinator state based on the contained catalog. Must be
1830    /// called after creating the coordinator and before calling the
1831    /// `Coordinator::serve` method.
1832    #[instrument(name = "coord::bootstrap")]
1833    pub(crate) async fn bootstrap(
1834        &mut self,
1835        boot_ts: Timestamp,
1836        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1837        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1838        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1839        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1840        audit_logs_iterator: AuditLogIterator,
1841    ) -> Result<(), AdapterError> {
1842        let bootstrap_start = Instant::now();
1843        info!("startup: coordinator init: bootstrap beginning");
1844        info!("startup: coordinator init: bootstrap: preamble beginning");
1845
1846        // Initialize cluster replica statuses.
1847        // Gross iterator is to avoid partial borrow issues.
1848        let cluster_statuses: Vec<(_, Vec<_>)> = self
1849            .catalog()
1850            .clusters()
1851            .map(|cluster| {
1852                (
1853                    cluster.id(),
1854                    cluster
1855                        .replicas()
1856                        .map(|replica| {
1857                            (replica.replica_id, replica.config.location.num_processes())
1858                        })
1859                        .collect(),
1860                )
1861            })
1862            .collect();
1863        let now = self.now_datetime();
1864        for (cluster_id, replica_statuses) in cluster_statuses {
1865            self.cluster_replica_statuses
1866                .initialize_cluster_statuses(cluster_id);
1867            for (replica_id, num_processes) in replica_statuses {
1868                self.cluster_replica_statuses
1869                    .initialize_cluster_replica_statuses(
1870                        cluster_id,
1871                        replica_id,
1872                        num_processes,
1873                        now,
1874                    );
1875            }
1876        }
1877
1878        let system_config = self.catalog().system_config();
1879
1880        // Inform metrics about the initial system configuration.
1881        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1882
1883        // Inform the controllers about their initial configuration.
1884        let compute_config = flags::compute_config(system_config);
1885        let storage_config = flags::storage_config(system_config);
1886        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1887        let dyncfg_updates = system_config.dyncfg_updates();
1888        self.controller.compute.update_configuration(compute_config);
1889        self.controller.storage.update_parameters(storage_config);
1890        self.controller
1891            .update_orchestrator_scheduling_config(scheduling_config);
1892        self.controller.update_configuration(dyncfg_updates);
1893
1894        self.validate_resource_limit_numeric(
1895            Numeric::zero(),
1896            self.current_credit_consumption_rate(),
1897            |system_vars| {
1898                self.license_key
1899                    .max_credit_consumption_rate()
1900                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1901            },
1902            "cluster replica",
1903            MAX_CREDIT_CONSUMPTION_RATE.name(),
1904        )?;
1905
1906        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1907            Default::default();
1908
1909        let enable_worker_core_affinity =
1910            self.catalog().system_config().enable_worker_core_affinity();
1911        for instance in self.catalog.clusters() {
1912            self.controller.create_cluster(
1913                instance.id,
1914                ClusterConfig {
1915                    arranged_logs: instance.log_indexes.clone(),
1916                    workload_class: instance.config.workload_class.clone(),
1917                },
1918            )?;
1919            for replica in instance.replicas() {
1920                let role = instance.role();
1921                self.controller.create_replica(
1922                    instance.id,
1923                    replica.replica_id,
1924                    instance.name.clone(),
1925                    replica.name.clone(),
1926                    role,
1927                    replica.config.clone(),
1928                    enable_worker_core_affinity,
1929                )?;
1930            }
1931        }
1932
1933        info!(
1934            "startup: coordinator init: bootstrap: preamble complete in {:?}",
1935            bootstrap_start.elapsed()
1936        );
1937
1938        let init_storage_collections_start = Instant::now();
1939        info!("startup: coordinator init: bootstrap: storage collections init beginning");
1940        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
1941            .await;
1942        info!(
1943            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
1944            init_storage_collections_start.elapsed()
1945        );
1946
1947        // The storage controller knows about the introspection collections now, so we can start
1948        // sinking introspection updates in the compute controller. It makes sense to do that as
1949        // soon as possible, to avoid updates piling up in the compute controller's internal
1950        // buffers.
1951        self.controller.start_compute_introspection_sink();
1952
1953        let optimize_dataflows_start = Instant::now();
1954        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
1955        let entries: Vec<_> = self.catalog().entries().cloned().collect();
1956        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
1957        info!(
1958            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
1959            optimize_dataflows_start.elapsed()
1960        );
1961
1962        // We don't need to wait for the cache to update.
1963        let _fut = self.catalog().update_expression_cache(
1964            uncached_local_exprs.into_iter().collect(),
1965            uncached_global_exps.into_iter().collect(),
1966        );
1967
1968        // Select dataflow as-ofs. This step relies on the storage collections created by
1969        // `bootstrap_storage_collections` and the dataflow plans created by
1970        // `bootstrap_dataflow_plans`.
1971        let bootstrap_as_ofs_start = Instant::now();
1972        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
1973        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
1974        info!(
1975            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
1976            bootstrap_as_ofs_start.elapsed()
1977        );
1978
1979        let postamble_start = Instant::now();
1980        info!("startup: coordinator init: bootstrap: postamble beginning");
1981
1982        let logs: BTreeSet<_> = BUILTINS::logs()
1983            .map(|log| self.catalog().resolve_builtin_log(log))
1984            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
1985            .collect();
1986
1987        let mut privatelink_connections = BTreeMap::new();
1988
1989        for entry in &entries {
1990            debug!(
1991                "coordinator init: installing {} {}",
1992                entry.item().typ(),
1993                entry.id()
1994            );
1995            let mut policy = entry.item().initial_logical_compaction_window();
1996            match entry.item() {
1997                // Currently catalog item rebuild assumes that sinks and
1998                // indexes are always built individually and does not store information
1999                // about how they were built. If we start building multiple sinks and/or indexes
2000                // using a single dataflow, we have to make sure the rebuild process re-runs
2001                // the same multiple-build dataflow.
2002                CatalogItem::Source(source) => {
2003                    // Propagate source compaction windows to subsources if needed.
2004                    if source.custom_logical_compaction_window.is_none() {
2005                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2006                            source.data_source
2007                        {
2008                            policy = Some(
2009                                self.catalog()
2010                                    .get_entry(&ingestion_id)
2011                                    .source()
2012                                    .expect("must be source")
2013                                    .custom_logical_compaction_window
2014                                    .unwrap_or_default(),
2015                            );
2016                        }
2017                    }
2018                    policies_to_set
2019                        .entry(policy.expect("sources have a compaction window"))
2020                        .or_insert_with(Default::default)
2021                        .storage_ids
2022                        .insert(source.global_id());
2023                }
2024                CatalogItem::Table(table) => {
2025                    policies_to_set
2026                        .entry(policy.expect("tables have a compaction window"))
2027                        .or_insert_with(Default::default)
2028                        .storage_ids
2029                        .extend(table.global_ids());
2030                }
2031                CatalogItem::Index(idx) => {
2032                    let policy_entry = policies_to_set
2033                        .entry(policy.expect("indexes have a compaction window"))
2034                        .or_insert_with(Default::default);
2035
2036                    if logs.contains(&idx.on) {
2037                        policy_entry
2038                            .compute_ids
2039                            .entry(idx.cluster_id)
2040                            .or_insert_with(BTreeSet::new)
2041                            .insert(idx.global_id());
2042                    } else {
2043                        let df_desc = self
2044                            .catalog()
2045                            .try_get_physical_plan(&idx.global_id())
2046                            .expect("added in `bootstrap_dataflow_plans`")
2047                            .clone();
2048
2049                        let df_meta = self
2050                            .catalog()
2051                            .try_get_dataflow_metainfo(&idx.global_id())
2052                            .expect("added in `bootstrap_dataflow_plans`");
2053
2054                        if self.catalog().state().system_config().enable_mz_notices() {
2055                            // Collect optimization hint updates.
2056                            self.catalog().state().pack_optimizer_notices(
2057                                &mut builtin_table_updates,
2058                                df_meta.optimizer_notices.iter(),
2059                                Diff::ONE,
2060                            );
2061                        }
2062
2063                        // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
2064                        // but we cannot call that as it will also downgrade the read hold on the index.
2065                        policy_entry
2066                            .compute_ids
2067                            .entry(idx.cluster_id)
2068                            .or_insert_with(Default::default)
2069                            .extend(df_desc.export_ids());
2070
2071                        self.controller
2072                            .compute
2073                            .create_dataflow(idx.cluster_id, df_desc, None)
2074                            .unwrap_or_terminate("cannot fail to create dataflows");
2075                    }
2076                }
2077                CatalogItem::View(_) => (),
2078                CatalogItem::MaterializedView(mview) => {
2079                    policies_to_set
2080                        .entry(policy.expect("materialized views have a compaction window"))
2081                        .or_insert_with(Default::default)
2082                        .storage_ids
2083                        .insert(mview.global_id_writes());
2084
2085                    let mut df_desc = self
2086                        .catalog()
2087                        .try_get_physical_plan(&mview.global_id_writes())
2088                        .expect("added in `bootstrap_dataflow_plans`")
2089                        .clone();
2090
2091                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
2092                        df_desc.set_initial_as_of(initial_as_of);
2093                    }
2094
2095                    // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
2096                    let until = mview
2097                        .refresh_schedule
2098                        .as_ref()
2099                        .and_then(|s| s.last_refresh())
2100                        .and_then(|r| r.try_step_forward());
2101                    if let Some(until) = until {
2102                        df_desc.until.meet_assign(&Antichain::from_elem(until));
2103                    }
2104
2105                    let df_meta = self
2106                        .catalog()
2107                        .try_get_dataflow_metainfo(&mview.global_id_writes())
2108                        .expect("added in `bootstrap_dataflow_plans`");
2109
2110                    if self.catalog().state().system_config().enable_mz_notices() {
2111                        // Collect optimization hint updates.
2112                        self.catalog().state().pack_optimizer_notices(
2113                            &mut builtin_table_updates,
2114                            df_meta.optimizer_notices.iter(),
2115                            Diff::ONE,
2116                        );
2117                    }
2118
2119                    self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2120
2121                    // If this is a replacement MV, it must remain read-only until the replacement
2122                    // gets applied.
2123                    if mview.replacement_target.is_none() {
2124                        self.allow_writes(mview.cluster_id, mview.global_id_writes());
2125                    }
2126                }
2127                CatalogItem::Sink(sink) => {
2128                    policies_to_set
2129                        .entry(CompactionWindow::Default)
2130                        .or_insert_with(Default::default)
2131                        .storage_ids
2132                        .insert(sink.global_id());
2133                }
2134                CatalogItem::Connection(catalog_connection) => {
2135                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2136                        privatelink_connections.insert(
2137                            entry.id(),
2138                            VpcEndpointConfig {
2139                                aws_service_name: conn.service_name.clone(),
2140                                availability_zone_ids: conn.availability_zones.clone(),
2141                            },
2142                        );
2143                    }
2144                }
2145                CatalogItem::ContinualTask(ct) => {
2146                    policies_to_set
2147                        .entry(policy.expect("continual tasks have a compaction window"))
2148                        .or_insert_with(Default::default)
2149                        .storage_ids
2150                        .insert(ct.global_id());
2151
2152                    let mut df_desc = self
2153                        .catalog()
2154                        .try_get_physical_plan(&ct.global_id())
2155                        .expect("added in `bootstrap_dataflow_plans`")
2156                        .clone();
2157
2158                    if let Some(initial_as_of) = ct.initial_as_of.clone() {
2159                        df_desc.set_initial_as_of(initial_as_of);
2160                    }
2161
2162                    let df_meta = self
2163                        .catalog()
2164                        .try_get_dataflow_metainfo(&ct.global_id())
2165                        .expect("added in `bootstrap_dataflow_plans`");
2166
2167                    if self.catalog().state().system_config().enable_mz_notices() {
2168                        // Collect optimization hint updates.
2169                        self.catalog().state().pack_optimizer_notices(
2170                            &mut builtin_table_updates,
2171                            df_meta.optimizer_notices.iter(),
2172                            Diff::ONE,
2173                        );
2174                    }
2175
2176                    self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2177                    self.allow_writes(ct.cluster_id, ct.global_id());
2178                }
2179                // Nothing to do for these cases
2180                CatalogItem::Log(_)
2181                | CatalogItem::Type(_)
2182                | CatalogItem::Func(_)
2183                | CatalogItem::Secret(_) => {}
2184            }
2185        }
2186
2187        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2188            // Clean up any extraneous VpcEndpoints that shouldn't exist.
2189            let existing_vpc_endpoints = cloud_resource_controller
2190                .list_vpc_endpoints()
2191                .await
2192                .context("list vpc endpoints")?;
2193            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2194            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2195            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2196            for id in vpc_endpoints_to_remove {
2197                cloud_resource_controller
2198                    .delete_vpc_endpoint(*id)
2199                    .await
2200                    .context("deleting extraneous vpc endpoint")?;
2201            }
2202
2203            // Ensure desired VpcEndpoints are up to date.
2204            for (id, spec) in privatelink_connections {
2205                cloud_resource_controller
2206                    .ensure_vpc_endpoint(id, spec)
2207                    .await
2208                    .context("ensuring vpc endpoint")?;
2209            }
2210        }
2211
2212        // Having installed all entries and created all constraints, we can now drop read holds and
2213        // relax read policies.
2214        drop(dataflow_read_holds);
2215        // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
2216        for (cw, policies) in policies_to_set {
2217            self.initialize_read_policies(&policies, cw).await;
2218        }
2219
2220        // Expose mapping from T-shirt sizes to actual sizes
2221        builtin_table_updates.extend(
2222            self.catalog().state().resolve_builtin_table_updates(
2223                self.catalog().state().pack_all_replica_size_updates(),
2224            ),
2225        );
2226
2227        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2228        // When 0dt is enabled, we create new shards for any migrated builtin storage collections.
2229        // In read-only mode, the migrated builtin tables (which are a subset of migrated builtin
2230        // storage collections) need to be back-filled so that any dependent dataflow can be
2231        // hydrated. Additionally, these shards are not registered with the txn-shard, and cannot
2232        // be registered while in read-only mode, so they are written to directly.
2233        let migrated_updates_fut = if self.controller.read_only() {
2234            let min_timestamp = Timestamp::minimum();
2235            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2236                .extract_if(.., |update| {
2237                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2238                    migrated_storage_collections_0dt.contains(&update.id)
2239                        && self
2240                            .controller
2241                            .storage_collections
2242                            .collection_frontiers(gid)
2243                            .expect("all tables are registered")
2244                            .write_frontier
2245                            .elements()
2246                            == &[min_timestamp]
2247                })
2248                .collect();
2249            if migrated_builtin_table_updates.is_empty() {
2250                futures::future::ready(()).boxed()
2251            } else {
2252                // Group all updates per-table.
2253                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2254                for update in migrated_builtin_table_updates {
2255                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2256                    grouped_appends.entry(gid).or_default().push(update.data);
2257                }
2258                info!(
2259                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2260                    grouped_appends.keys().collect::<Vec<_>>()
2261                );
2262
2263                // Consolidate Row data; staged batches must already be consolidated.
2264                let mut all_appends = Vec::with_capacity(grouped_appends.len());
2265                for (item_id, table_data) in grouped_appends.into_iter() {
2266                    let mut all_rows = Vec::new();
2267                    let mut all_data = Vec::new();
2268                    for data in table_data {
2269                        match data {
2270                            TableData::Rows(rows) => all_rows.extend(rows),
2271                            TableData::Batches(_) => all_data.push(data),
2272                        }
2273                    }
2274                    differential_dataflow::consolidation::consolidate(&mut all_rows);
2275                    all_data.push(TableData::Rows(all_rows));
2276
2277                    // TODO(parkmycar): Use SmallVec throughout.
2278                    all_appends.push((item_id, all_data));
2279                }
2280
2281                let fut = self
2282                    .controller
2283                    .storage
2284                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2285                    .expect("cannot fail to append");
2286                async {
2287                    fut.await
2288                        .expect("One-shot shouldn't be dropped during bootstrap")
2289                        .unwrap_or_terminate("cannot fail to append")
2290                }
2291                .boxed()
2292            }
2293        } else {
2294            futures::future::ready(()).boxed()
2295        };
2296
2297        info!(
2298            "startup: coordinator init: bootstrap: postamble complete in {:?}",
2299            postamble_start.elapsed()
2300        );
2301
2302        let builtin_update_start = Instant::now();
2303        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2304
2305        if self.controller.read_only() {
2306            info!(
2307                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2308            );
2309
2310            // TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
2311            let audit_join_start = Instant::now();
2312            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2313            let audit_log_updates: Vec<_> = audit_logs_iterator
2314                .map(|(audit_log, ts)| StateUpdate {
2315                    kind: StateUpdateKind::AuditLog(audit_log),
2316                    ts,
2317                    diff: StateDiff::Addition,
2318                })
2319                .collect();
2320            let audit_log_builtin_table_updates = self
2321                .catalog()
2322                .state()
2323                .generate_builtin_table_updates(audit_log_updates);
2324            builtin_table_updates.extend(audit_log_builtin_table_updates);
2325            info!(
2326                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2327                audit_join_start.elapsed()
2328            );
2329            self.buffered_builtin_table_updates
2330                .as_mut()
2331                .expect("in read-only mode")
2332                .append(&mut builtin_table_updates);
2333        } else {
2334            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2335                .await;
2336        };
2337        info!(
2338            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2339            builtin_update_start.elapsed()
2340        );
2341
2342        let cleanup_secrets_start = Instant::now();
2343        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2344        // Cleanup orphaned secrets. Errors during list() or delete() do not
2345        // need to prevent bootstrap from succeeding; we will retry next
2346        // startup.
2347        {
2348            // Destructure Self so we can selectively move fields into the async
2349            // task.
2350            let Self {
2351                secrets_controller,
2352                catalog,
2353                ..
2354            } = self;
2355
2356            let next_user_item_id = catalog.get_next_user_item_id().await?;
2357            let next_system_item_id = catalog.get_next_system_item_id().await?;
2358            let read_only = self.controller.read_only();
2359            // Fetch all IDs from the catalog to future-proof against other
2360            // things using secrets. Today, SECRET and CONNECTION objects use
2361            // secrets_controller.ensure, but more object types could use secrets
2362            // in the future and would be easy to miss adding here.
2363            let catalog_ids: BTreeSet<CatalogItemId> =
2364                catalog.entries().map(|entry| entry.id()).collect();
2365            let secrets_controller = Arc::clone(secrets_controller);
2366
2367            spawn(|| "cleanup-orphaned-secrets", async move {
2368                if read_only {
2369                    info!(
2370                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2371                    );
2372                    return;
2373                }
2374                info!("coordinator init: cleaning up orphaned secrets");
2375
2376                match secrets_controller.list().await {
2377                    Ok(controller_secrets) => {
2378                        let controller_secrets: BTreeSet<CatalogItemId> =
2379                            controller_secrets.into_iter().collect();
2380                        let orphaned = controller_secrets.difference(&catalog_ids);
2381                        for id in orphaned {
2382                            let id_too_large = match id {
2383                                CatalogItemId::System(id) => *id >= next_system_item_id,
2384                                CatalogItemId::User(id) => *id >= next_user_item_id,
2385                                CatalogItemId::IntrospectionSourceIndex(_)
2386                                | CatalogItemId::Transient(_) => false,
2387                            };
2388                            if id_too_large {
2389                                info!(
2390                                    %next_user_item_id, %next_system_item_id,
2391                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2392                                );
2393                            } else {
2394                                info!("coordinator init: deleting orphaned secret {id}");
2395                                fail_point!("orphan_secrets");
2396                                if let Err(e) = secrets_controller.delete(*id).await {
2397                                    warn!(
2398                                        "Dropping orphaned secret has encountered an error: {}",
2399                                        e
2400                                    );
2401                                }
2402                            }
2403                        }
2404                    }
2405                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2406                }
2407            });
2408        }
2409        info!(
2410            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2411            cleanup_secrets_start.elapsed()
2412        );
2413
2414        // Run all of our final steps concurrently.
2415        let final_steps_start = Instant::now();
2416        info!(
2417            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2418        );
2419        migrated_updates_fut
2420            .instrument(info_span!("coord::bootstrap::final"))
2421            .await;
2422
2423        debug!(
2424            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2425        );
2426        // Announce the completion of initialization.
2427        self.controller.initialization_complete();
2428
2429        // Initialize unified introspection.
2430        self.bootstrap_introspection_subscribes().await;
2431
2432        info!(
2433            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2434            final_steps_start.elapsed()
2435        );
2436
2437        info!(
2438            "startup: coordinator init: bootstrap complete in {:?}",
2439            bootstrap_start.elapsed()
2440        );
2441        Ok(())
2442    }
2443
2444    /// Prepares tables for writing by resetting them to a known state and
2445    /// appending the given builtin table updates. The timestamp oracle
2446    /// will be advanced to the write timestamp of the append when this
2447    /// method returns.
2448    #[allow(clippy::async_yields_async)]
2449    #[instrument]
2450    async fn bootstrap_tables(
2451        &mut self,
2452        entries: &[CatalogEntry],
2453        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2454        audit_logs_iterator: AuditLogIterator,
2455    ) {
2456        /// Smaller helper struct of metadata for bootstrapping tables.
2457        struct TableMetadata<'a> {
2458            id: CatalogItemId,
2459            name: &'a QualifiedItemName,
2460            table: &'a Table,
2461        }
2462
2463        // Filter our entries down to just tables.
2464        let table_metas: Vec<_> = entries
2465            .into_iter()
2466            .filter_map(|entry| {
2467                entry.table().map(|table| TableMetadata {
2468                    id: entry.id(),
2469                    name: entry.name(),
2470                    table,
2471                })
2472            })
2473            .collect();
2474
2475        // Append empty batches to advance the timestamp of all tables.
2476        debug!("coordinator init: advancing all tables to current timestamp");
2477        let WriteTimestamp {
2478            timestamp: write_ts,
2479            advance_to,
2480        } = self.get_local_write_ts().await;
2481        let appends = table_metas
2482            .iter()
2483            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2484            .collect();
2485        // Append the tables in the background. We apply the write timestamp before getting a read
2486        // timestamp and reading a snapshot of each table, so the snapshots will block on their own
2487        // until the appends are complete.
2488        let table_fence_rx = self
2489            .controller
2490            .storage
2491            .append_table(write_ts.clone(), advance_to, appends)
2492            .expect("invalid updates");
2493
2494        self.apply_local_write(write_ts).await;
2495
2496        // Add builtin table updates that clear the contents of all system tables.
2497        debug!("coordinator init: resetting system tables");
2498        let read_ts = self.get_local_read_ts().await;
2499
2500        // Filter out the 'mz_storage_usage_by_shard' table since we need to retain that info for
2501        // billing purposes.
2502        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2503            .catalog()
2504            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2505            .into();
2506        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2507            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2508                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2509        };
2510
2511        let mut retraction_tasks = Vec::new();
2512        let mut system_tables: Vec<_> = table_metas
2513            .iter()
2514            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2515            .collect();
2516
2517        // Special-case audit events because it's append-only.
2518        let (audit_events_idx, _) = system_tables
2519            .iter()
2520            .find_position(|table| {
2521                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2522            })
2523            .expect("mz_audit_events must exist");
2524        let audit_events = system_tables.remove(audit_events_idx);
2525        let audit_log_task = self.bootstrap_audit_log_table(
2526            audit_events.id,
2527            audit_events.name,
2528            audit_events.table,
2529            audit_logs_iterator,
2530            read_ts,
2531        );
2532
2533        for system_table in system_tables {
2534            let table_id = system_table.id;
2535            let full_name = self.catalog().resolve_full_name(system_table.name, None);
2536            debug!("coordinator init: resetting system table {full_name} ({table_id})");
2537
2538            // Fetch the current contents of the table for retraction.
2539            let snapshot_fut = self
2540                .controller
2541                .storage_collections
2542                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2543            let batch_fut = self
2544                .controller
2545                .storage_collections
2546                .create_update_builder(system_table.table.global_id_writes());
2547
2548            let task = spawn(|| format!("snapshot-{table_id}"), async move {
2549                // Create a TimestamplessUpdateBuilder.
2550                let mut batch = batch_fut
2551                    .await
2552                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2553                tracing::info!(?table_id, "starting snapshot");
2554                // Get a cursor which will emit a consolidated snapshot.
2555                let mut snapshot_cursor = snapshot_fut
2556                    .await
2557                    .unwrap_or_terminate("cannot fail to snapshot");
2558
2559                // Retract the current contents, spilling into our builder.
2560                while let Some(values) = snapshot_cursor.next().await {
2561                    for ((key, _val), _t, d) in values {
2562                        let key = key.expect("builtin table had errors");
2563                        let d_invert = d.neg();
2564                        batch.add(&key, &(), &d_invert).await;
2565                    }
2566                }
2567                tracing::info!(?table_id, "finished snapshot");
2568
2569                let batch = batch.finish().await;
2570                BuiltinTableUpdate::batch(table_id, batch)
2571            });
2572            retraction_tasks.push(task);
2573        }
2574
2575        let retractions_res = futures::future::join_all(retraction_tasks).await;
2576        for retractions in retractions_res {
2577            builtin_table_updates.push(retractions);
2578        }
2579
2580        let audit_join_start = Instant::now();
2581        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2582        let audit_log_updates = audit_log_task.await;
2583        let audit_log_builtin_table_updates = self
2584            .catalog()
2585            .state()
2586            .generate_builtin_table_updates(audit_log_updates);
2587        builtin_table_updates.extend(audit_log_builtin_table_updates);
2588        info!(
2589            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2590            audit_join_start.elapsed()
2591        );
2592
2593        // Now that the snapshots are complete, the appends must also be complete.
2594        table_fence_rx
2595            .await
2596            .expect("One-shot shouldn't be dropped during bootstrap")
2597            .unwrap_or_terminate("cannot fail to append");
2598
2599        info!("coordinator init: sending builtin table updates");
2600        let (_builtin_updates_fut, write_ts) = self
2601            .builtin_table_update()
2602            .execute(builtin_table_updates)
2603            .await;
2604        info!(?write_ts, "our write ts");
2605        if let Some(write_ts) = write_ts {
2606            self.apply_local_write(write_ts).await;
2607        }
2608    }
2609
2610    /// Prepare updates to the audit log table. The audit log table is append-only and very large, so
2611    /// we only need to find the events present in `audit_logs_iterator` but not in the audit log
2612    /// table.
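    ///
    /// The filtering idea in miniature, with plain integers standing in for
    /// sortable event IDs (the catalog iterator is assumed to yield events in
    /// descending ID order, which is what the `take_while` below relies on):
    ///
    /// ```
    /// let max_table_id = Some(4u64);
    /// let incoming = vec![7u64, 6, 5, 4, 3];
    /// let missing: Vec<_> = incoming
    ///     .into_iter()
    ///     .take_while(|id| match max_table_id {
    ///         Some(max) => *id > max,
    ///         None => true,
    ///     })
    ///     .collect();
    /// assert_eq!(missing, vec![7, 6, 5]);
    /// ```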
2613    #[instrument]
2614    fn bootstrap_audit_log_table<'a>(
2615        &mut self,
2616        table_id: CatalogItemId,
2617        name: &'a QualifiedItemName,
2618        table: &'a Table,
2619        audit_logs_iterator: AuditLogIterator,
2620        read_ts: Timestamp,
2621    ) -> JoinHandle<Vec<StateUpdate>> {
2622        let full_name = self.catalog().resolve_full_name(name, None);
2623        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2624        let current_contents_fut = self
2625            .controller
2626            .storage_collections
2627            .snapshot(table.global_id_writes(), read_ts);
2628        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2629            let current_contents = current_contents_fut
2630                .await
2631                .unwrap_or_terminate("cannot fail to fetch snapshot");
2632            let contents_len = current_contents.len();
2633            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2634
2635            // Fetch the largest audit log event ID that has been written to the table.
2636            let max_table_id = current_contents
2637                .into_iter()
2638                .filter(|(_, diff)| *diff == 1)
2639                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2640                .max();
2643
2644            // Filter audit log catalog updates to those that are not present in the table.
2645            audit_logs_iterator
2646                .take_while(|(audit_log, _)| match max_table_id {
2647                    Some(id) => audit_log.event.sortable_id() > id,
2648                    None => true,
2649                })
2650                .map(|(audit_log, ts)| StateUpdate {
2651                    kind: StateUpdateKind::AuditLog(audit_log),
2652                    ts,
2653                    diff: StateDiff::Addition,
2654                })
2655                .collect::<Vec<_>>()
2656        })
2657    }
2658
2659    /// Initializes all storage collections required by catalog objects in the storage controller.
2660    ///
2661    /// This method takes care of collection creation, as well as migration of existing
2662    /// collections.
2663    ///
2664    /// Creating all storage collections in a single `create_collections` call, rather than on
2665    /// demand, is more efficient as it reduces the number of writes to durable storage. It also
2666    /// allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
2667    /// storage collections, without needing to worry about dependency order.
2668    ///
2669    /// `migrated_storage_collections` is a set of builtin storage collections that have been
2670    /// migrated and should be handled specially.
2671    #[instrument]
2672    async fn bootstrap_storage_collections(
2673        &mut self,
2674        migrated_storage_collections: &BTreeSet<CatalogItemId>,
2675    ) {
2676        let catalog = self.catalog();
2677        let source_status_collection_id = catalog
2678            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2679        let source_status_collection_id = catalog
2680            .get_entry(&source_status_collection_id)
2681            .latest_global_id();
2682
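        // Helper that translates a catalog data source description into the storage
        // controller's `CollectionDescription`, inlining connection details from the
        // catalog and attaching the source status history collection where applicable.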
2683        let source_desc = |object_id: GlobalId,
2684                           data_source: &DataSourceDesc,
2685                           desc: &RelationDesc,
2686                           timeline: &Timeline| {
2687            let (data_source, status_collection_id) = match data_source.clone() {
2688                // Re-announce the source description.
2689                DataSourceDesc::Ingestion { desc, cluster_id } => {
2690                    let desc = desc.into_inline_connection(catalog.state());
2691                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2692
2693                    (
2694                        DataSource::Ingestion(ingestion),
2695                        Some(source_status_collection_id),
2696                    )
2697                }
2698                DataSourceDesc::OldSyntaxIngestion {
2699                    desc,
2700                    progress_subsource,
2701                    data_config,
2702                    details,
2703                    cluster_id,
2704                } => {
2705                    let desc = desc.into_inline_connection(catalog.state());
2706                    let data_config = data_config.into_inline_connection(catalog.state());
2707                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2708                    // this will always be a Source or a Table.
2709                    let progress_subsource =
2710                        catalog.get_entry(&progress_subsource).latest_global_id();
2711                    let mut ingestion =
2712                        IngestionDescription::new(desc, cluster_id, progress_subsource);
2713                    let legacy_export = SourceExport {
2714                        storage_metadata: (),
2715                        data_config,
2716                        details,
2717                    };
2718                    ingestion.source_exports.insert(object_id, legacy_export);
2719
2720                    (
2721                        DataSource::Ingestion(ingestion),
2722                        Some(source_status_collection_id),
2723                    )
2724                }
2725                DataSourceDesc::IngestionExport {
2726                    ingestion_id,
2727                    external_reference: _,
2728                    details,
2729                    data_config,
2730                } => {
2731                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2732                    // this will always be a Source or a Table.
2733                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2734                    (
2735                        DataSource::IngestionExport {
2736                            ingestion_id,
2737                            details,
2738                            data_config: data_config.into_inline_connection(catalog.state()),
2739                        },
2740                        Some(source_status_collection_id),
2741                    )
2742                }
2743                DataSourceDesc::Webhook { .. } => {
2744                    (DataSource::Webhook, Some(source_status_collection_id))
2745                }
2746                DataSourceDesc::Progress => (DataSource::Progress, None),
2747                DataSourceDesc::Introspection(introspection) => {
2748                    (DataSource::Introspection(introspection), None)
2749                }
2750            };
2751            CollectionDescription {
2752                desc: desc.clone(),
2753                data_source,
2754                since: None,
2755                status_collection_id,
2756                timeline: Some(timeline.clone()),
2757                primary: None,
2758            }
2759        };
2760
2761        let mut compute_collections = vec![];
2762        let mut collections = vec![];
2763        let mut new_builtin_continual_tasks = vec![];
2764        for entry in catalog.entries() {
2765            match entry.item() {
2766                CatalogItem::Source(source) => {
2767                    collections.push((
2768                        source.global_id(),
2769                        source_desc(
2770                            source.global_id(),
2771                            &source.data_source,
2772                            &source.desc,
2773                            &source.timeline,
2774                        ),
2775                    ));
2776                }
2777                CatalogItem::Table(table) => {
2778                    match &table.data_source {
2779                        TableDataSource::TableWrites { defaults: _ } => {
2780                            let versions: BTreeMap<_, _> = table
2781                                .collection_descs()
2782                                .map(|(gid, version, desc)| (version, (gid, desc)))
2783                                .collect();
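                            // Chain each version's collection to the collection of the
                            // next version (if any) via `primary`; the newest version has
                            // no `primary` of its own.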
2784                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2785                                let next_version = version.bump();
2786                                let primary_collection =
2787                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2788                                let mut collection_desc =
2789                                    CollectionDescription::for_table(desc.clone());
2790                                collection_desc.primary = primary_collection;
2791
2792                                (*gid, collection_desc)
2793                            });
2794                            collections.extend(collection_descs);
2795                        }
2796                        TableDataSource::DataSource {
2797                            desc: data_source_desc,
2798                            timeline,
2799                        } => {
2800                            // TODO(alter_table): Support versioning tables that read from sources.
2801                            soft_assert_eq_or_log!(table.collections.len(), 1);
2802                            let collection_descs =
2803                                table.collection_descs().map(|(gid, _version, desc)| {
2804                                    (
2805                                        gid,
2806                                        source_desc(
2807                                            entry.latest_global_id(),
2808                                            data_source_desc,
2809                                            &desc,
2810                                            timeline,
2811                                        ),
2812                                    )
2813                                });
2814                            collections.extend(collection_descs);
2815                        }
2816                    };
2817                }
2818                CatalogItem::MaterializedView(mv) => {
2819                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
2820                        let collection_desc =
2821                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
2822                        (gid, collection_desc)
2823                    });
2824
2825                    collections.extend(collection_descs);
2826                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
2827                }
2828                CatalogItem::ContinualTask(ct) => {
2829                    let collection_desc =
2830                        CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
2831                    if ct.global_id().is_system() && collection_desc.since.is_none() {
2832                        // We need a non-0 since to make as_of selection work. Fill it in below with
2833                        // the `bootstrap_builtin_continual_tasks` call, which can only be run after
2834                        // `create_collections_for_bootstrap`.
2835                        new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2836                    } else {
2837                        compute_collections.push((ct.global_id(), ct.desc.clone()));
2838                        collections.push((ct.global_id(), collection_desc));
2839                    }
2840                }
2841                CatalogItem::Sink(sink) => {
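                    // Sinks are registered with a storage-managed progress collection
                    // (`KAFKA_PROGRESS_DESC`); rebuild the export description from the
                    // catalog's sink definition.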
2842                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2843                    let from_desc = storage_sink_from_entry
2844                        .desc(&self.catalog().resolve_full_name(
2845                            storage_sink_from_entry.name(),
2846                            storage_sink_from_entry.conn_id(),
2847                        ))
2848                        .expect("sinks can only be built on items with descs")
2849                        .into_owned();
2850                    let collection_desc = CollectionDescription {
2851                        // TODO(sinks): make generic once we have more than one sink type.
2852                        desc: KAFKA_PROGRESS_DESC.clone(),
2853                        data_source: DataSource::Sink {
2854                            desc: ExportDescription {
2855                                sink: StorageSinkDesc {
2856                                    from: sink.from,
2857                                    from_desc,
2858                                    connection: sink
2859                                        .connection
2860                                        .clone()
2861                                        .into_inline_connection(self.catalog().state()),
2862                                    envelope: sink.envelope,
2863                                    as_of: Antichain::from_elem(Timestamp::minimum()),
2864                                    with_snapshot: sink.with_snapshot,
2865                                    version: sink.version,
2866                                    from_storage_metadata: (),
2867                                    to_storage_metadata: (),
2868                                },
2869                                instance_id: sink.cluster_id,
2870                            },
2871                        },
2872                        since: None,
2873                        status_collection_id: None,
2874                        timeline: None,
2875                        primary: None,
2876                    };
2877                    collections.push((sink.global_id, collection_desc));
2878                }
2879                _ => (),
2880            }
2881        }
2882
2883        let register_ts = if self.controller.read_only() {
2884            self.get_local_read_ts().await
2885        } else {
2886            // Getting a write timestamp bumps the write timestamp in the
2887            // oracle, which we're not allowed to do in read-only mode.
2888            self.get_local_write_ts().await.timestamp
2889        };
2890
2891        let storage_metadata = self.catalog.state().storage_metadata();
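        // Expand the migrated item IDs into the global IDs of all collections they own.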
2892        let migrated_storage_collections = migrated_storage_collections
2893            .into_iter()
2894            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2895            .collect();
2896
2897        // Before possibly creating collections, make sure their schemas are correct.
2898        //
2899        // Across different versions of Materialize the nullability of columns can change based on
2900        // updates to our optimizer.
2901        self.controller
2902            .storage
2903            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2904            .await
2905            .unwrap_or_terminate("cannot fail to evolve collections");
2906
2907        self.controller
2908            .storage
2909            .create_collections_for_bootstrap(
2910                storage_metadata,
2911                Some(register_ts),
2912                collections,
2913                &migrated_storage_collections,
2914            )
2915            .await
2916            .unwrap_or_terminate("cannot fail to create collections");
2917
2918        self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2919            .await;
2920
2921        if !self.controller.read_only() {
2922            self.apply_local_write(register_ts).await;
2923        }
2924    }
2925
2926    /// Make as_of selection happy for builtin CTs. Ideally we'd write the
2927    /// initial as_of down in the durable catalog, but that's hard because of
2928    /// boot ordering. Instead, we set the since of the storage collection to
2929    /// something that's a reasonable lower bound for the as_of. Then, if the
2930    /// upper is 0, the as_of selection code will allow us to jump it forward to
2931    /// this since.
2932    async fn bootstrap_builtin_continual_tasks(
2933        &mut self,
2934        // TODO(alter_table): Switch to CatalogItemId.
2935        mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
2936    ) {
2937        for (id, collection) in &mut collections {
2938            let entry = self.catalog.get_entry_by_global_id(id);
2939            let ct = match &entry.item {
2940                CatalogItem::ContinualTask(ct) => ct.clone(),
2941                _ => unreachable!("only called with continual task builtins"),
2942            };
2943            let debug_name = self
2944                .catalog()
2945                .resolve_full_name(entry.name(), None)
2946                .to_string();
2947            let (_optimized_plan, physical_plan, _metainfo) = self
2948                .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
2949                .expect("builtin CT should optimize successfully");
2950
2951            // Determine an as of for the new continual task.
2952            let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
2953            // Can't acquire a read hold on ourselves because we don't exist yet.
2954            id_bundle.storage_ids.remove(id);
2955            let read_holds = self.acquire_read_holds(&id_bundle);
2956            let as_of = read_holds.least_valid_read();
2957
2958            collection.since = Some(as_of.clone());
2959        }
2960        self.controller
2961            .storage
2962            .create_collections(self.catalog.state().storage_metadata(), None, collections)
2963            .await
2964            .unwrap_or_terminate("cannot fail to create collections");
2965    }
2966
2967    /// Invokes the optimizer on all indexes and materialized views in the catalog and inserts the
2968    /// resulting dataflow plans into the catalog state.
2969    ///
2970    /// `ordered_catalog_entries` must be sorted in dependency order, with dependencies ordered
2971    /// before their dependants.
2972    ///
2973    /// This method does not perform timestamp selection for the dataflows, nor does it create them
2974    /// in the compute controller. Both of these steps happen later during bootstrapping.
2975    ///
2976    /// Returns a map of expressions that were not cached.
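    ///
    /// For each index, materialized view, and continual task, this first consults the global
    /// expression cache and falls back to running the global MIR and LIR optimization pipeline
    /// on a cache miss; either way, the resulting plans are recorded in the catalog state and
    /// registered with the in-memory `ComputeInstanceSnapshot`s so later entries can depend on
    /// them.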
2977    #[instrument]
2978    fn bootstrap_dataflow_plans(
2979        &mut self,
2980        ordered_catalog_entries: &[CatalogEntry],
2981        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2982    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
2983        // The optimizer expects to be able to query its `ComputeInstanceSnapshot` for
2984        // collections the current dataflow can depend on. But since we don't yet install anything
2985        // on compute instances, the snapshot information is incomplete. We fix that by manually
2986        // updating `ComputeInstanceSnapshot` objects to ensure they contain the collections we
2987        // have already optimized.
2988        let mut instance_snapshots = BTreeMap::new();
2989        let mut uncached_expressions = BTreeMap::new();
2990
2991        let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
2992
2993        for entry in ordered_catalog_entries {
2994            match entry.item() {
2995                CatalogItem::Index(idx) => {
2996                    // Collect optimizer parameters.
2997                    let compute_instance =
2998                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
2999                            self.instance_snapshot(idx.cluster_id)
3000                                .expect("compute instance exists")
3001                        });
3002                    let global_id = idx.global_id();
3003
3004                    // The index may already be installed on the compute instance. For example,
3005                    // this is the case for introspection indexes.
3006                    if compute_instance.contains_collection(&global_id) {
3007                        continue;
3008                    }
3009
3010                    let (optimized_plan, physical_plan, metainfo) =
3011                        match cached_global_exprs.remove(&global_id) {
3012                            Some(global_expressions)
3013                                if global_expressions.optimizer_features
3014                                    == optimizer_config.features =>
3015                            {
3016                                debug!("global expression cache hit for {global_id:?}");
3017                                (
3018                                    global_expressions.global_mir,
3019                                    global_expressions.physical_plan,
3020                                    global_expressions.dataflow_metainfos,
3021                                )
3022                            }
3023                            Some(_) | None => {
3024                                let (optimized_plan, global_lir_plan) = {
3025                                    // Build an optimizer for this INDEX.
3026                                    let mut optimizer = optimize::index::Optimizer::new(
3027                                        self.owned_catalog(),
3028                                        compute_instance.clone(),
3029                                        global_id,
3030                                        optimizer_config.clone(),
3031                                        self.optimizer_metrics(),
3032                                    );
3033
3034                                    // MIR ⇒ MIR optimization (global)
3035                                    let index_plan = optimize::index::Index::new(
3036                                        entry.name().clone(),
3037                                        idx.on,
3038                                        idx.keys.to_vec(),
3039                                    );
3040                                    let global_mir_plan = optimizer.optimize(index_plan)?;
3041                                    let optimized_plan = global_mir_plan.df_desc().clone();
3042
3043                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3044                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3045
3046                                    (optimized_plan, global_lir_plan)
3047                                };
3048
3049                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3050                                let metainfo = {
3051                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3052                                    let notice_ids =
3053                                        std::iter::repeat_with(|| self.allocate_transient_id())
3054                                            .map(|(_item_id, gid)| gid)
3055                                            .take(metainfo.optimizer_notices.len())
3056                                            .collect::<Vec<_>>();
3057                                    // Return a metainfo with rendered notices.
3058                                    self.catalog().render_notices(
3059                                        metainfo,
3060                                        notice_ids,
3061                                        Some(idx.global_id()),
3062                                    )
3063                                };
3064                                uncached_expressions.insert(
3065                                    global_id,
3066                                    GlobalExpressions {
3067                                        global_mir: optimized_plan.clone(),
3068                                        physical_plan: physical_plan.clone(),
3069                                        dataflow_metainfos: metainfo.clone(),
3070                                        optimizer_features: OptimizerFeatures::from(
3071                                            self.catalog().system_config(),
3072                                        ),
3073                                    },
3074                                );
3075                                (optimized_plan, physical_plan, metainfo)
3076                            }
3077                        };
3078
3079                    let catalog = self.catalog_mut();
3080                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3081                    catalog.set_physical_plan(idx.global_id(), physical_plan);
3082                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3083
3084                    compute_instance.insert_collection(idx.global_id());
3085                }
3086                CatalogItem::MaterializedView(mv) => {
3087                    // Collect optimizer parameters.
3088                    let compute_instance =
3089                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3090                            self.instance_snapshot(mv.cluster_id)
3091                                .expect("compute instance exists")
3092                        });
3093                    let global_id = mv.global_id_writes();
3094
3095                    let (optimized_plan, physical_plan, metainfo) =
3096                        match cached_global_exprs.remove(&global_id) {
3097                            Some(global_expressions)
3098                                if global_expressions.optimizer_features
3099                                    == optimizer_config.features =>
3100                            {
3101                                debug!("global expression cache hit for {global_id:?}");
3102                                (
3103                                    global_expressions.global_mir,
3104                                    global_expressions.physical_plan,
3105                                    global_expressions.dataflow_metainfos,
3106                                )
3107                            }
3108                            Some(_) | None => {
3109                                let (_, internal_view_id) = self.allocate_transient_id();
3110                                let debug_name = self
3111                                    .catalog()
3112                                    .resolve_full_name(entry.name(), None)
3113                                    .to_string();
3114                                let force_non_monotonic = Default::default();
3115
3116                                let (optimized_plan, global_lir_plan) = {
3117                                    // Build an optimizer for this MATERIALIZED VIEW.
3118                                    let mut optimizer = optimize::materialized_view::Optimizer::new(
3119                                        self.owned_catalog().as_optimizer_catalog(),
3120                                        compute_instance.clone(),
3121                                        global_id,
3122                                        internal_view_id,
3123                                        mv.desc.latest().iter_names().cloned().collect(),
3124                                        mv.non_null_assertions.clone(),
3125                                        mv.refresh_schedule.clone(),
3126                                        debug_name,
3127                                        optimizer_config.clone(),
3128                                        self.optimizer_metrics(),
3129                                        force_non_monotonic,
3130                                    );
3131
3132                                    // MIR ⇒ MIR optimization (global)
3133                                    let global_mir_plan =
3134                                        optimizer.optimize(mv.optimized_expr.as_ref().clone())?;
3135                                    let optimized_plan = global_mir_plan.df_desc().clone();
3136
3137                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3138                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3139
3140                                    (optimized_plan, global_lir_plan)
3141                                };
3142
3143                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3144                                let metainfo = {
3145                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3146                                    let notice_ids =
3147                                        std::iter::repeat_with(|| self.allocate_transient_id())
3148                                            .map(|(_item_id, global_id)| global_id)
3149                                            .take(metainfo.optimizer_notices.len())
3150                                            .collect::<Vec<_>>();
3151                                    // Return a metainfo with rendered notices.
3152                                    self.catalog().render_notices(
3153                                        metainfo,
3154                                        notice_ids,
3155                                        Some(mv.global_id_writes()),
3156                                    )
3157                                };
3158                                uncached_expressions.insert(
3159                                    global_id,
3160                                    GlobalExpressions {
3161                                        global_mir: optimized_plan.clone(),
3162                                        physical_plan: physical_plan.clone(),
3163                                        dataflow_metainfos: metainfo.clone(),
3164                                        optimizer_features: OptimizerFeatures::from(
3165                                            self.catalog().system_config(),
3166                                        ),
3167                                    },
3168                                );
3169                                (optimized_plan, physical_plan, metainfo)
3170                            }
3171                        };
3172
3173                    let catalog = self.catalog_mut();
3174                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
3175                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
3176                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);
3177
3178                    compute_instance.insert_collection(mv.global_id_writes());
3179                }
3180                CatalogItem::ContinualTask(ct) => {
3181                    let compute_instance =
3182                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3183                            self.instance_snapshot(ct.cluster_id)
3184                                .expect("compute instance exists")
3185                        });
3186                    let global_id = ct.global_id();
3187
3188                    let (optimized_plan, physical_plan, metainfo) =
3189                        match cached_global_exprs.remove(&global_id) {
3190                            Some(global_expressions)
3191                                if global_expressions.optimizer_features
3192                                    == optimizer_config.features =>
3193                            {
3194                                debug!("global expression cache hit for {global_id:?}");
3195                                (
3196                                    global_expressions.global_mir,
3197                                    global_expressions.physical_plan,
3198                                    global_expressions.dataflow_metainfos,
3199                                )
3200                            }
3201                            Some(_) | None => {
3202                                let debug_name = self
3203                                    .catalog()
3204                                    .resolve_full_name(entry.name(), None)
3205                                    .to_string();
3206                                let (optimized_plan, physical_plan, metainfo) = self
3207                                    .optimize_create_continual_task(
3208                                        ct,
3209                                        global_id,
3210                                        self.owned_catalog(),
3211                                        debug_name,
3212                                    )?;
3213                                uncached_expressions.insert(
3214                                    global_id,
3215                                    GlobalExpressions {
3216                                        global_mir: optimized_plan.clone(),
3217                                        physical_plan: physical_plan.clone(),
3218                                        dataflow_metainfos: metainfo.clone(),
3219                                        optimizer_features: OptimizerFeatures::from(
3220                                            self.catalog().system_config(),
3221                                        ),
3222                                    },
3223                                );
3224                                (optimized_plan, physical_plan, metainfo)
3225                            }
3226                        };
3227
3228                    let catalog = self.catalog_mut();
3229                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3230                    catalog.set_physical_plan(ct.global_id(), physical_plan);
3231                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3232
3233                    compute_instance.insert_collection(ct.global_id());
3234                }
3235                _ => (),
3236            }
3237        }
3238
3239        Ok(uncached_expressions)
3240    }
3241
3242    /// Selects for each compute dataflow an as-of suitable for bootstrapping it.
3243    ///
3244    /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
3245    /// in place and that must not be dropped before all compute dataflows have been created with
3246    /// the compute controller.
3247    ///
3248    /// This method expects all storage collections and dataflow plans to be available, so it must
3249    /// run after [`Coordinator::bootstrap_storage_collections`] and
3250    /// [`Coordinator::bootstrap_dataflow_plans`].
3251    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3252        let mut catalog_ids = Vec::new();
3253        let mut dataflows = Vec::new();
3254        let mut read_policies = BTreeMap::new();
3255        for entry in self.catalog.entries() {
3256            let gid = match entry.item() {
3257                CatalogItem::Index(idx) => idx.global_id(),
3258                CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3259                CatalogItem::ContinualTask(ct) => ct.global_id(),
3260                CatalogItem::Table(_)
3261                | CatalogItem::Source(_)
3262                | CatalogItem::Log(_)
3263                | CatalogItem::View(_)
3264                | CatalogItem::Sink(_)
3265                | CatalogItem::Type(_)
3266                | CatalogItem::Func(_)
3267                | CatalogItem::Secret(_)
3268                | CatalogItem::Connection(_) => continue,
3269            };
3270            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3271                catalog_ids.push(gid);
3272                dataflows.push(plan.clone());
3273
3274                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3275                    read_policies.insert(gid, compaction_window.into());
3276                }
3277            }
3278        }
3279
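        // Run centralized as-of selection over all collected dataflow plans at once; the
        // returned read holds pin the read frontiers of the involved collections until the
        // dataflows have been created.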
3280        let read_ts = self.get_local_read_ts().await;
3281        let read_holds = as_of_selection::run(
3282            &mut dataflows,
3283            &read_policies,
3284            &*self.controller.storage_collections,
3285            read_ts,
3286            self.controller.read_only(),
3287        );
3288
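        // As-of selection may have adjusted the plans in place, so write them back into
        // the catalog state.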
3289        let catalog = self.catalog_mut();
3290        for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3291            catalog.set_physical_plan(id, plan);
3292        }
3293
3294        read_holds
3295    }
3296
3297    /// Serves the coordinator, receiving commands from users over `cmd_rx`
3298    /// and feedback from dataflow workers over `feedback_rx`.
3299    ///
3300    /// You must call `bootstrap` before calling this method.
3301    ///
3302    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would
3303    /// get stored on the stack, which is bad for runtime performance and blows up our stack
3304    /// usage. Because of that we purposefully move this Future onto the heap (i.e. Box it).
3305    fn serve(
3306        mut self,
3307        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3308        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3309        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3310        group_commit_rx: appends::GroupCommitWaiter,
3311    ) -> LocalBoxFuture<'static, ()> {
3312        async move {
3313            // Watcher that listens for and reports cluster service status changes.
3314            let mut cluster_events = self.controller.events_stream();
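            // Shared record of the most recently handled message, so the watchdog can
            // report what the coordinator was working on if it gets stuck.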
3315            let last_message = Arc::new(Mutex::new(LastMessage {
3316                kind: "none",
3317                stmt: None,
3318            }));
3319
3320            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3321            let idle_metric = self.metrics.queue_busy_seconds.clone();
3322            let last_message_watchdog = Arc::clone(&last_message);
3323
3324            spawn(|| "coord watchdog", async move {
3325                // Every 5 seconds, attempt to measure how long it takes for the
3326                // coord select loop to become empty, since this message is processed
3327                // at the lowest priority. If the loop is idle, the measurement will
3328                // only be a few microseconds.
3329                let mut interval = tokio::time::interval(Duration::from_secs(5));
3330                // If we end up having to wait more than 5 seconds for the coord to respond, then the
3331                // behavior of Delay results in the interval "restarting" from whenever we yield
3332                // instead of trying to catch up.
3333                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3334
3335                // Track if we become stuck to de-dupe error reporting.
3336                let mut coord_stuck = false;
3337
3338                loop {
3339                    interval.tick().await;
3340
3341                    // Wait for space in the channel; if we time out, the coordinator is stuck!
3342                    let duration = tokio::time::Duration::from_secs(30);
3343                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3344                    let Ok(maybe_permit) = timeout else {
3345                        // Only log if we're newly stuck, to prevent logging repeatedly.
3346                        if !coord_stuck {
3347                            let last_message = last_message_watchdog.lock().expect("poisoned");
3348                            tracing::warn!(
3349                                last_message_kind = %last_message.kind,
3350                                last_message_sql = %last_message.stmt_to_string(),
3351                                "coordinator stuck for {duration:?}",
3352                            );
3353                        }
3354                        coord_stuck = true;
3355
3356                        continue;
3357                    };
3358
3359                    // We got a permit, we're not stuck!
3360                    if coord_stuck {
3361                        tracing::info!("Coordinator became unstuck");
3362                    }
3363                    coord_stuck = false;
3364
3365                    // If we failed to acquire a permit it's because we're shutting down.
3366                    let Ok(permit) = maybe_permit else {
3367                        break;
3368                    };
3369
3370                    permit.send(idle_metric.start_timer());
3371                }
3372            });
3373
3374            self.schedule_storage_usage_collection().await;
3375            self.spawn_privatelink_vpc_endpoints_watch_task();
3376            self.spawn_statement_logging_task();
3377            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3378
3379            // Report if the handling of a single message takes longer than this threshold.
3380            let warn_threshold = self
3381                .catalog()
3382                .system_config()
3383                .coord_slow_message_warn_threshold();
3384
3385            // How many messages we'd like to batch up before processing them. Must be > 0.
3386            const MESSAGE_BATCH: usize = 64;
3387            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3388            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3389
3390            let message_batch = self.metrics.message_batch.clone();
3391
3392            loop {
3393                // Before adding a branch to this select loop, please ensure that the branch is
3394                // cancellation safe and add a comment explaining why. You can refer here for more
3395                // info: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
3396                select! {
3397                    // We prioritize internal commands over other commands. However, we work through
3398                    // batches of commands in some branches of this select, which means that even if
3399                    // a command generates internal commands, we will work through the current batch
3400                    // before receiving a new batch of commands.
3401                    biased;
3402
3403                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3404                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3405                    // Receive a batch of commands.
3406                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3407                    // `next()` on any stream is cancel-safe:
3408                    // https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
3409                    // Receive a single command.
3410                    Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
3411                    // See [`mz_controller::Controller::ready`] for notes
3412                    // on why this is cancel-safe.
3413                    // Receive a single command.
3414                    () = self.controller.ready() => {
3415                        // NOTE: We don't get a `Readiness` back from `ready()`
3416                        // because the controller wants to keep it and it's not
3417                        // trivially `Clone` or `Copy`. Hence this accessor.
3418                        let controller = match self.controller.get_readiness() {
3419                            Readiness::Storage => ControllerReadiness::Storage,
3420                            Readiness::Compute => ControllerReadiness::Compute,
3421                            Readiness::Metrics(_) => ControllerReadiness::Metrics,
3422                            Readiness::Internal(_) => ControllerReadiness::Internal,
3423                            Readiness::NotReady => unreachable!("just signaled as ready"),
3424                        };
3425                        messages.push(Message::ControllerReady { controller });
3426                    }
3427                    // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
3428                    // Receive a single command.
3429                    permit = group_commit_rx.ready() => {
3430                        // If we happen to have batched exactly one user write, use
3431                        // that span so the `emit_trace_id_notice` hooks up.
3432                        // Otherwise, the best we can do is invent a new root span
3433                        // and make it follow from all the Spans in the pending
3434                        // writes.
3435                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3436                            PendingWriteTxn::User{span, ..} => Some(span),
3437                            PendingWriteTxn::System{..} => None,
3438                        });
3439                        let span = match user_write_spans.exactly_one() {
3440                            Ok(span) => span.clone(),
3441                            Err(user_write_spans) => {
3442                                let span = info_span!(parent: None, "group_commit_notify");
3443                                for s in user_write_spans {
3444                                    span.follows_from(s);
3445                                }
3446                                span
3447                            }
3448                        };
3449                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3450                    },
3451                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3452                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3453                    // Receive a batch of commands.
3454                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3455                        if count == 0 {
3456                            break;
3457                        } else {
3458                            messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
3459                        }
3460                    },
3461                    // `recv()` on `UnboundedReceiver` is cancellation safe:
3462                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
3463                    // Receive a single command.
3464                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3465                        let mut pending_read_txns = vec![pending_read_txn];
3466                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3467                            pending_read_txns.push(pending_read_txn);
3468                        }
3469                        for (conn_id, pending_read_txn) in pending_read_txns {
3470                            let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
3471                            soft_assert_or_log!(
3472                                prev.is_none(),
3473                                "connections can not have multiple concurrent reads, prev: {prev:?}"
3474                            )
3475                        }
3476                        messages.push(Message::LinearizeReads);
3477                    }
3478                    // `tick()` on `Interval` is cancel-safe:
3479                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3480                    // Receive a single command.
3481                    _ = self.advance_timelines_interval.tick() => {
3482                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
3483                        span.follows_from(Span::current());
3484
3485                        // Group commit sends an `AdvanceTimelines` message when
3486                        // done, which is what downgrades read holds. In
3487                        // read-only mode we send this message directly because
3488                        // we're not doing group commits.
3489                        if self.controller.read_only() {
3490                            messages.push(Message::AdvanceTimelines);
3491                        } else {
3492                            messages.push(Message::GroupCommitInitiate(span, None));
3493                        }
3494                    },
3495                    // `tick()` on `Interval` is cancel-safe:
3496                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3497                    // Receive a single command.
3498                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
3499                        messages.push(Message::CheckSchedulingPolicies);
3500                    },
3501
3502                    // `tick()` on `Interval` is cancel-safe:
3503                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3504                    // Receive a single command.
3505                    _ = self.caught_up_check_interval.tick() => {
3506                        // We do this directly on the main loop instead of
3507                        // firing off a message. We are still in read-only mode,
3508                        // so optimizing for latency by not blocking the main
3509                        // loop is not that important.
3510                        self.maybe_check_caught_up().await;
3511
3512                        continue;
3513                    },
3514
3515                    // Process the idle metric at the lowest priority to sample queue non-idle time.
3516                    // `recv()` on `Receiver` is cancellation safe:
3517                    // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety
3518                    // Receive a single command.
3519                    timer = idle_rx.recv() => {
3520                        timer.expect("does not drop").observe_duration();
3521                        self.metrics
3522                            .message_handling
3523                            .with_label_values(&["watchdog"])
3524                            .observe(0.0);
3525                        continue;
3526                    }
3527                };
3528
3529                // Observe the number of messages we're processing at once.
3530                message_batch.observe(f64::cast_lossy(messages.len()));
3531
3532                for msg in messages.drain(..) {
3533                    // All message processing functions trace. Start a parent span
3534                    // for them to make it easy to find slow messages.
3535                    let msg_kind = msg.kind();
3536                    let span = span!(
3537                        target: "mz_adapter::coord::handle_message_loop",
3538                        Level::INFO,
3539                        "coord::handle_message",
3540                        kind = msg_kind
3541                    );
3542                    let otel_context = span.context().span().span_context().clone();
3543
3544                    // Record the last kind of message in case we get stuck. For
3545                    // execute commands, we additionally stash the user's SQL
3546                    // statement, so we can log it as well.
3547                    *last_message.lock().expect("poisoned") = LastMessage {
3548                        kind: msg_kind,
3549                        stmt: match &msg {
3550                            Message::Command(
3551                                _,
3552                                Command::Execute {
3553                                    portal_name,
3554                                    session,
3555                                    ..
3556                                },
3557                            ) => session
3558                                .get_portal_unverified(portal_name)
3559                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3560                            _ => None,
3561                        },
3562                    };
3563
3564                    let start = Instant::now();
3565                    self.handle_message(msg).instrument(span).await;
3566                    let duration = start.elapsed();
3567
3568                    self.metrics
3569                        .message_handling
3570                        .with_label_values(&[msg_kind])
3571                        .observe(duration.as_secs_f64());
3572
3573                    // If something is _really_ slow, print a trace id for debugging, if OTEL is enabled.
3574                    if duration > warn_threshold {
3575                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3576                        tracing::error!(
3577                            ?msg_kind,
3578                            ?trace_id,
3579                            ?duration,
3580                            "very slow coordinator message"
3581                        );
3582                    }
3583                }
3584            }
3585            // Try to clean up on a best-effort basis. There may be some async tasks out there
3586            // holding a reference that prevents us from cleaning up.
3587            if let Some(catalog) = Arc::into_inner(self.catalog) {
3588                catalog.expire().await;
3589            }
3590        }
3591        .boxed_local()
3592    }
3593
3594    /// Obtain a read-only Catalog reference.
3595    fn catalog(&self) -> &Catalog {
3596        &self.catalog
3597    }
3598
3599    /// Obtain a read-only Catalog snapshot, suitable for giving out to
3600    /// non-Coordinator thread tasks.
3601    fn owned_catalog(&self) -> Arc<Catalog> {
3602        Arc::clone(&self.catalog)
3603    }
3604
3605    /// Obtain a handle to the optimizer metrics, suitable for giving
3606    /// out to non-Coordinator thread tasks.
3607    fn optimizer_metrics(&self) -> OptimizerMetrics {
3608        self.optimizer_metrics.clone()
3609    }
3610
3611    /// Obtain a writeable Catalog reference.
3612    fn catalog_mut(&mut self) -> &mut Catalog {
3613        // make_mut will cause any other Arc references (from owned_catalog) to
3614        // continue to be valid by cloning the catalog, putting it in a new Arc,
3615        // which lives at self.catalog. If there are no other Arc references,
3616        // then no clone is made, and it returns a reference to the existing
3617        // object. This makes this method and owned_catalog both very cheap: at
3618        // most one clone per catalog mutation, but only if there's a read-only
3619        // reference to it.
3620        Arc::make_mut(&mut self.catalog)
3621    }
3622
3623    /// Obtain a reference to the coordinator's connection context.
3624    fn connection_context(&self) -> &ConnectionContext {
3625        self.controller.connection_context()
3626    }
3627
3628    /// Obtain a reference to the coordinator's secret reader, in an `Arc`.
3629    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3630        &self.connection_context().secrets_reader
3631    }
3632
3633    /// Publishes a notice message to all sessions.
3634    ///
3635    /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
3636    /// so we keep it around.
3637    #[allow(dead_code)]
3638    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3639        for meta in self.active_conns.values() {
3640            let _ = meta.notice_tx.send(notice.clone());
3641        }
3642    }
3643
3644    /// Returns a closure that will publish a notice to all sessions that were active at the time
3645    /// this method was called.
3646    pub(crate) fn broadcast_notice_tx(
3647        &self,
3648    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3649        let senders: Vec<_> = self
3650            .active_conns
3651            .values()
3652            .map(|meta| meta.notice_tx.clone())
3653            .collect();
3654        Box::new(move |notice| {
3655            for tx in senders {
3656                let _ = tx.send(notice.clone());
3657            }
3658        })
3659    }
3660
3661    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3662        &self.active_conns
3663    }
3664
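    /// Retires a statement execution, recording the reason it ended in the statement log,
    /// if this execution was registered for statement logging.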
3665    #[instrument(level = "debug")]
3666    pub(crate) fn retire_execution(
3667        &mut self,
3668        reason: StatementEndedExecutionReason,
3669        ctx_extra: ExecuteContextExtra,
3670    ) {
3671        if let Some(uuid) = ctx_extra.retire() {
3672            self.end_statement_execution(uuid, reason);
3673        }
3674    }
3675
3676    /// Creates a new dataflow builder from the catalog and indexes in `self`.
3677    #[instrument(level = "debug")]
3678    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3679        let compute = self
3680            .instance_snapshot(instance)
3681            .expect("compute instance does not exist");
3682        DataflowBuilder::new(self.catalog().state(), compute)
3683    }
3684
3685    /// Return a reference-less snapshot to the indicated compute instance.
3686    pub fn instance_snapshot(
3687        &self,
3688        id: ComputeInstanceId,
3689    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3690        ComputeInstanceSnapshot::new(&self.controller, id)
3691    }
3692
3693    /// Call into the compute controller to install a finalized dataflow, and
3694    /// initialize the read policies for its exported readable objects.
3695    ///
3696    /// # Panics
3697    ///
3698    /// Panics if dataflow creation fails.
3699    pub(crate) async fn ship_dataflow(
3700        &mut self,
3701        dataflow: DataflowDescription<Plan>,
3702        instance: ComputeInstanceId,
3703        subscribe_target_replica: Option<ReplicaId>,
3704    ) {
3705        self.try_ship_dataflow(dataflow, instance, subscribe_target_replica)
3706            .await
3707            .unwrap_or_terminate("dataflow creation cannot fail");
3708    }
3709
3710    /// Call into the compute controller to install a finalized dataflow, and
3711    /// initialize the read policies for its exported readable objects.
3712    pub(crate) async fn try_ship_dataflow(
3713        &mut self,
3714        dataflow: DataflowDescription<Plan>,
3715        instance: ComputeInstanceId,
3716        subscribe_target_replica: Option<ReplicaId>,
3717    ) -> Result<(), DataflowCreationError> {
3718        // We must only install read policies for indexes, not for sinks.
3719        // Sinks are write-only compute collections that don't have read policies.
3720        let export_ids = dataflow.exported_index_ids().collect();
3721
3722        self.controller
3723            .compute
3724            .create_dataflow(instance, dataflow, subscribe_target_replica)?;
3725
3726        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3727            .await;
3728
3729        Ok(())
3730    }
3731
3732    /// Call into the compute controller to allow writes to the specified ID
3733    /// on the specified instance. Calling this function multiple times, or
3734    /// calling it on a read-only instance, has no effect.
3735    pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
3736        self.controller
3737            .compute
3738            .allow_writes(instance, id)
3739            .unwrap_or_terminate("allow_writes cannot fail");
3740    }
3741
3742    /// Like `ship_dataflow`, but also awaits the builtin table updates.
3743    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3744        &mut self,
3745        dataflow: DataflowDescription<Plan>,
3746        instance: ComputeInstanceId,
3747        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3748    ) {
3749        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3750            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3751            let ((), ()) =
3752                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3753        } else {
3754            self.ship_dataflow(dataflow, instance, None).await;
3755        }
3756    }
3757
3758    /// Install a _watch set_ in the controller that is automatically associated with the given
3759    /// connection id. The watchset will be automatically cleared if the connection terminates
3760    /// before the watchset completes.
3761    pub fn install_compute_watch_set(
3762        &mut self,
3763        conn_id: ConnectionId,
3764        objects: BTreeSet<GlobalId>,
3765        t: Timestamp,
3766        state: WatchSetResponse,
3767    ) {
3768        let ws_id = self.controller.install_compute_watch_set(objects, t);
3769        self.connection_watch_sets
3770            .entry(conn_id.clone())
3771            .or_default()
3772            .insert(ws_id);
3773        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3774    }
3775
3776    /// Install a _watch set_ in the controller that is automatically associated with the given
3777    /// connection id. The watchset will be automatically cleared if the connection terminates
3778    /// before the watchset completes.
3779    pub fn install_storage_watch_set(
3780        &mut self,
3781        conn_id: ConnectionId,
3782        objects: BTreeSet<GlobalId>,
3783        t: Timestamp,
3784        state: WatchSetResponse,
3785    ) {
3786        let ws_id = self.controller.install_storage_watch_set(objects, t);
3787        self.connection_watch_sets
3788            .entry(conn_id.clone())
3789            .or_default()
3790            .insert(ws_id);
3791        self.installed_watch_sets.insert(ws_id, (conn_id, state));
3792    }
3793
3794    /// Cancels pending watchsets associated with the provided connection id.
3795    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3796        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3797            for ws_id in ws_ids {
3798                self.installed_watch_sets.remove(&ws_id);
3799            }
3800        }
3801    }
3802
3803    /// Returns the state of the [`Coordinator`] formatted as JSON.
3804    ///
3805    /// The returned value is not guaranteed to be stable and may change at any point in time.
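    ///
    /// The result is roughly shaped as follows (field values are `Debug`-formatted
    /// strings; this sketch is illustrative, not a stable format):
    ///
    /// ```text
    /// {
    ///   "global_timelines": { "EpochMilliseconds": "TimelineState { .. }" },
    ///   "active_conns": { "<connection id>": "ConnMeta { .. }" },
    ///   "txn_read_holds": { .. },
    ///   "pending_peeks": { .. },
    ///   "client_pending_peeks": { .. },
    ///   "pending_linearize_read_txns": { .. },
    ///   "controller": { .. }
    /// }
    /// ```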
3806    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3807        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
3808        // returned object as a tradeoff between usability and stability. `serde_json` will fail
3809        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
3810        // prevents a future unrelated change from silently breaking this method.
3811
3812        let global_timelines: BTreeMap<_, _> = self
3813            .global_timelines
3814            .iter()
3815            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3816            .collect();
3817        let active_conns: BTreeMap<_, _> = self
3818            .active_conns
3819            .iter()
3820            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3821            .collect();
3822        let txn_read_holds: BTreeMap<_, _> = self
3823            .txn_read_holds
3824            .iter()
3825            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3826            .collect();
3827        let pending_peeks: BTreeMap<_, _> = self
3828            .pending_peeks
3829            .iter()
3830            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3831            .collect();
3832        let client_pending_peeks: BTreeMap<_, _> = self
3833            .client_pending_peeks
3834            .iter()
3835            .map(|(id, peek)| {
3836                let peek: BTreeMap<_, _> = peek
3837                    .iter()
3838                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3839                    .collect();
3840                (id.to_string(), peek)
3841            })
3842            .collect();
3843        let pending_linearize_read_txns: BTreeMap<_, _> = self
3844            .pending_linearize_read_txns
3845            .iter()
3846            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3847            .collect();
3848
3849        let map = serde_json::Map::from_iter([
3850            (
3851                "global_timelines".to_string(),
3852                serde_json::to_value(global_timelines)?,
3853            ),
3854            (
3855                "active_conns".to_string(),
3856                serde_json::to_value(active_conns)?,
3857            ),
3858            (
3859                "txn_read_holds".to_string(),
3860                serde_json::to_value(txn_read_holds)?,
3861            ),
3862            (
3863                "pending_peeks".to_string(),
3864                serde_json::to_value(pending_peeks)?,
3865            ),
3866            (
3867                "client_pending_peeks".to_string(),
3868                serde_json::to_value(client_pending_peeks)?,
3869            ),
3870            (
3871                "pending_linearize_read_txns".to_string(),
3872                serde_json::to_value(pending_linearize_read_txns)?,
3873            ),
3874            ("controller".to_string(), self.controller.dump().await?),
3875        ]);
3876        Ok(serde_json::Value::Object(map))
3877    }
3878
3879    /// Prune all storage usage events from the [`MZ_STORAGE_USAGE_BY_SHARD`] table that are older
3880    /// than `retention_period`.
3881    ///
3882    /// This method will read the entire contents of [`MZ_STORAGE_USAGE_BY_SHARD`]
3883    /// into memory, which can be expensive.
3884    ///
3885    /// DO NOT call this method outside of startup. The safety of reading at the current oracle read
3886    /// timestamp and then writing at whatever the current write timestamp is (instead of
3887    /// `read_ts + 1`) relies on the fact that there are no outstanding writes during startup.
3888    ///
3889    /// Group commit, which this method uses to write the retractions, has builtin fencing, and we
3890    /// never commit retractions to [`MZ_STORAGE_USAGE_BY_SHARD`] outside of this method, which is
3891    /// only called once during startup. So we don't have to worry about double/invalid retractions.
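    ///
    /// As a worked example (illustrative numbers only): with a read timestamp of
    /// `1_700_000_000_000` ms since the Unix epoch and a 30-day retention period,
    /// the cutoff is `1_700_000_000_000 - 30 * 24 * 60 * 60 * 1000 =
    /// 1_697_408_000_000`, and any event whose collection timestamp falls below
    /// that cutoff is retracted.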
3892    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
3893        let item_id = self
3894            .catalog()
3895            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3896        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3897        let read_ts = self.get_local_read_ts().await;
3898        let current_contents_fut = self
3899            .controller
3900            .storage_collections
3901            .snapshot(global_id, read_ts);
3902        let internal_cmd_tx = self.internal_cmd_tx.clone();
3903        spawn(|| "storage_usage_prune", async move {
3904            let mut current_contents = current_contents_fut
3905                .await
3906                .unwrap_or_terminate("cannot fail to fetch snapshot");
3907            differential_dataflow::consolidation::consolidate(&mut current_contents);
3908
3909            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3910            let mut expired = Vec::new();
3911            for (row, diff) in current_contents {
3912                assert_eq!(
3913                    diff, 1,
3914                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3915                );
3916                // This logic relies on the definition of `mz_storage_usage_by_shard` not changing.
3917                let collection_timestamp = row
3918                    .unpack()
3919                    .get(3)
3920                    .expect("definition of mz_storage_usage_by_shard changed")
3921                    .unwrap_timestamptz();
3922                let collection_timestamp = collection_timestamp.timestamp_millis();
3923                let collection_timestamp: u128 = collection_timestamp
3924                    .try_into()
3925                    .expect("all collections happen after Jan 1 1970");
3926                if collection_timestamp < cutoff_ts {
3927                    debug!("pruning storage event {row:?}");
3928                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3929                    expired.push(builtin_update);
3930                }
3931            }
3932
3933            // The send fails only if the main thread has shut down, in which case there is nothing left to do.
3934            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3935        });
3936    }
3937
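    /// Returns the combined rate, in credits per hour, at which all managed
    /// user cluster replicas currently consume credits. Unmanaged replicas are
    /// not included.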
3938    fn current_credit_consumption_rate(&self) -> Numeric {
3939        self.catalog()
3940            .user_cluster_replicas()
3941            .filter_map(|replica| match &replica.config.location {
3942                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
3943                ReplicaLocation::Unmanaged(_) => None,
3944            })
3945            .map(|size| {
3946                self.catalog()
3947                    .cluster_replica_sizes()
3948                    .0
3949                    .get(size)
3950                    .expect("location size is validated against the cluster replica sizes")
3951                    .credits_per_hour
3952            })
3953            .sum()
3954    }
3955}
3956
3957#[cfg(test)]
3958impl Coordinator {
3959    #[allow(dead_code)]
3960    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
3961        // `ship_dataflow` is not allowed to have a `Result` return type because it is
3962        // called after `catalog_transact`, after which no errors are allowed. This test exists to
3963        // prevent us from incorrectly teaching those functions how to return errors (which has
3964        // happened twice and is the motivation for this test).
3965
3966        // An arbitrary compute instance ID to satisfy the function calls below. Note that
3967        // this only works because this function will never run.
3968        let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
3969
3970        let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
3971    }
3972}
3973
3974/// Contains information about the last message the [`Coordinator`] processed.
3975struct LastMessage {
3976    kind: &'static str,
3977    stmt: Option<Arc<Statement<Raw>>>,
3978}
3979
3980impl LastMessage {
3981    /// Returns a redacted version of the statement that is safe for logs.
3982    fn stmt_to_string(&self) -> Cow<'static, str> {
3983        self.stmt
3984            .as_ref()
3985            .map(|stmt| stmt.to_ast_string_redacted().into())
3986            .unwrap_or(Cow::Borrowed("<none>"))
3987    }
3988}
3989
3990impl fmt::Debug for LastMessage {
3991    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3992        f.debug_struct("LastMessage")
3993            .field("kind", &self.kind)
3994            .field("stmt", &self.stmt_to_string())
3995            .finish()
3996    }
3997}
3998
3999impl Drop for LastMessage {
4000    fn drop(&mut self) {
4001        // Only print the last message if we're currently panicking, otherwise we'd spam our logs.
4002        if std::thread::panicking() {
4003            // If we're panicking there's no guarantee `tracing` still works, so print to stderr.
4004            eprintln!("Coordinator panicking, dumping last message\n{self:?}");
4005        }
4006    }
4007}
4008
4009/// Serves the coordinator based on the provided configuration.
4010///
4011/// For a high-level description of the coordinator, see the [crate
4012/// documentation](crate).
4013///
4014/// Returns a handle to the coordinator and a client to communicate with the
4015/// coordinator.
4016///
4017/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 42KB. It would
4018/// otherwise be stored on the stack, which is bad for runtime performance and blows up our
4019/// stack usage. Because of that we purposefully move this Future onto the heap (i.e. Box it).
4020pub fn serve(
4021    Config {
4022        controller_config,
4023        controller_envd_epoch,
4024        mut storage,
4025        audit_logs_iterator,
4026        timestamp_oracle_url,
4027        unsafe_mode,
4028        all_features,
4029        build_info,
4030        environment_id,
4031        metrics_registry,
4032        now,
4033        secrets_controller,
4034        cloud_resource_controller,
4035        cluster_replica_sizes,
4036        builtin_system_cluster_config,
4037        builtin_catalog_server_cluster_config,
4038        builtin_probe_cluster_config,
4039        builtin_support_cluster_config,
4040        builtin_analytics_cluster_config,
4041        system_parameter_defaults,
4042        availability_zones,
4043        storage_usage_client,
4044        storage_usage_collection_interval,
4045        storage_usage_retention_period,
4046        segment_client,
4047        egress_addresses,
4048        aws_account_id,
4049        aws_privatelink_availability_zones,
4050        connection_context,
4051        connection_limit_callback,
4052        remote_system_parameters,
4053        webhook_concurrency_limit,
4054        http_host_name,
4055        tracing_handle,
4056        read_only_controllers,
4057        caught_up_trigger: clusters_caught_up_trigger,
4058        helm_chart_version,
4059        license_key,
4060        external_login_password_mz_system,
4061        force_builtin_schema_migration,
4062    }: Config,
4063) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4064    async move {
4065        let coord_start = Instant::now();
4066        info!("startup: coordinator init: beginning");
4067        info!("startup: coordinator init: preamble beginning");
4068
4069        // Initializing the builtins can be an expensive process and consume a lot of memory. We
4070        // forcibly initialize them early, while the stack is relatively empty, to avoid stack
4071        // overflows later.
4072        let _builtins = LazyLock::force(&BUILTINS_STATIC);
4073
4074        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4075        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4076        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4077            mpsc::unbounded_channel();
4078
4079        // Validate and process availability zones.
4080        if !availability_zones.iter().all_unique() {
4081            coord_bail!("availability zones must be unique");
4082        }
4083
4084        let aws_principal_context = match (
4085            aws_account_id,
4086            connection_context.aws_external_id_prefix.clone(),
4087        ) {
4088            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4089                aws_account_id,
4090                aws_external_id_prefix,
4091            }),
4092            _ => None,
4093        };
4094
4095        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4096            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4097
4098        info!(
4099            "startup: coordinator init: preamble complete in {:?}",
4100            coord_start.elapsed()
4101        );
4102        let oracle_init_start = Instant::now();
4103        info!("startup: coordinator init: timestamp oracle init beginning");
4104
4105        let pg_timestamp_oracle_config = timestamp_oracle_url
4106            .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
4107        let mut initial_timestamps =
4108            get_initial_oracle_timestamps(&pg_timestamp_oracle_config).await?;
4109
4110        // Insert an entry for the `EpochMilliseconds` timeline if one doesn't exist,
4111        // which will ensure that the timeline is initialized since it's required
4112        // by the system.
4113        initial_timestamps
4114            .entry(Timeline::EpochMilliseconds)
4115            .or_insert_with(mz_repr::Timestamp::minimum);
4116        let mut timestamp_oracles = BTreeMap::new();
4117        for (timeline, initial_timestamp) in initial_timestamps {
4118            Coordinator::ensure_timeline_state_with_initial_time(
4119                &timeline,
4120                initial_timestamp,
4121                now.clone(),
4122                pg_timestamp_oracle_config.clone(),
4123                &mut timestamp_oracles,
4124                read_only_controllers,
4125            )
4126            .await;
4127        }
4128
4129        // Opening the durable catalog uses one or more timestamps without communicating with
4130        // the timestamp oracle. Here we apply the catalog upper to the timestamp oracle so
4131        // that future operations are linearized with opening the catalog.
4132        let catalog_upper = storage.current_upper().await;
4133        // Choose a time at which to boot. This is used, for example, to prune
4134        // old storage usage data or migrate audit log entries.
4135        //
4136        // This time is usually the current system time, but with protection
4137        // against backwards time jumps, even across restarts.
4138        let epoch_millis_oracle = &timestamp_oracles
4139            .get(&Timeline::EpochMilliseconds)
4140            .expect("inserted above")
4141            .oracle;
4142
4143        let mut boot_ts = if read_only_controllers {
4144            let read_ts = epoch_millis_oracle.read_ts().await;
4145            std::cmp::max(read_ts, catalog_upper)
4146        } else {
4147            // Getting/applying a write timestamp bumps the write timestamp in the
4148            // oracle, which we're not allowed to do in read-only mode.
4149            epoch_millis_oracle.apply_write(catalog_upper).await;
4150            epoch_millis_oracle.write_ts().await.timestamp
4151        };
4152
4153        info!(
4154            "startup: coordinator init: timestamp oracle init complete in {:?}",
4155            oracle_init_start.elapsed()
4156        );
4157
4158        let catalog_open_start = Instant::now();
4159        info!("startup: coordinator init: catalog open beginning");
4160        let persist_client = controller_config
4161            .persist_clients
4162            .open(controller_config.persist_location.clone())
4163            .await
4164            .context("opening persist client")?;
4165        let builtin_item_migration_config = BuiltinItemMigrationConfig {
4166            persist_client: persist_client.clone(),
4167            read_only: read_only_controllers,
4168            force_migration: force_builtin_schema_migration,
4169        };
4172        let OpenCatalogResult {
4173            mut catalog,
4174            migrated_storage_collections_0dt,
4175            new_builtin_collections,
4176            builtin_table_updates,
4177            cached_global_exprs,
4178            uncached_local_exprs,
4179        } = Catalog::open(mz_catalog::config::Config {
4180            storage,
4181            metrics_registry: &metrics_registry,
4182            state: mz_catalog::config::StateConfig {
4183                unsafe_mode,
4184                all_features,
4185                build_info,
4186                deploy_generation: controller_config.deploy_generation,
4187                environment_id: environment_id.clone(),
4188                read_only: read_only_controllers,
4189                now: now.clone(),
4190                boot_ts: boot_ts.clone(),
4191                skip_migrations: false,
4192                cluster_replica_sizes,
4193                builtin_system_cluster_config,
4194                builtin_catalog_server_cluster_config,
4195                builtin_probe_cluster_config,
4196                builtin_support_cluster_config,
4197                builtin_analytics_cluster_config,
4198                system_parameter_defaults,
4199                remote_system_parameters,
4200                availability_zones,
4201                egress_addresses,
4202                aws_principal_context,
4203                aws_privatelink_availability_zones,
4204                connection_context,
4205                http_host_name,
4206                builtin_item_migration_config,
4207                persist_client: persist_client.clone(),
4208                enable_expression_cache_override: None,
4209                helm_chart_version,
4210                external_login_password_mz_system,
4211                license_key: license_key.clone(),
4212            },
4213        })
4214        .await?;
4215
4216        // Opening the catalog uses one or more timestamps, so push the boot timestamp up to the
4217        // current catalog upper.
4218        let catalog_upper = catalog.current_upper().await;
4219        boot_ts = std::cmp::max(boot_ts, catalog_upper);
4220
4221        if !read_only_controllers {
4222            epoch_millis_oracle.apply_write(boot_ts).await;
4223        }
4224
4225        info!(
4226            "startup: coordinator init: catalog open complete in {:?}",
4227            catalog_open_start.elapsed()
4228        );
4229
4230        let coord_thread_start = Instant::now();
4231        info!("startup: coordinator init: coordinator thread start beginning");
4232
4233        let session_id = catalog.config().session_id;
4234        let start_instant = catalog.config().start_instant;
4235
4236        // In order for the coordinator to support Rc and RefCell types, it cannot be
4237        // sent across threads. Spawn it in a thread and have this parent thread wait
4238        // for bootstrap completion before proceeding.
4239        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4240        let handle = TokioHandle::current();
4241
4242        let metrics = Metrics::register_into(&metrics_registry);
4243        let metrics_clone = metrics.clone();
4244        let optimizer_metrics = OptimizerMetrics::register_into(
4245            &metrics_registry,
4246            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4247        );
4248        let segment_client_clone = segment_client.clone();
4249        let coord_now = now.clone();
4250        let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4251        let mut check_scheduling_policies_interval = tokio::time::interval(
4252            catalog
4253                .system_config()
4254                .cluster_check_scheduling_policies_interval(),
4255        );
4256        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4257
4258        let clusters_caught_up_check_interval = if read_only_controllers {
4259            let dyncfgs = catalog.system_config().dyncfgs();
4260            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4261
4262            let mut interval = tokio::time::interval(interval);
4263            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4264            interval
4265        } else {
4266            // When not in read-only mode, we don't do hydration checks. But we
4267            // still have to provide _some_ interval. This is large enough that
4268            // it doesn't matter.
4269            //
4270            // TODO(aljoscha): We cannot use Duration::MAX right now because of
4271            // https://github.com/tokio-rs/tokio/issues/6634. Use that once it's
4272            // fixed for good.
4273            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4274            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4275            interval
4276        };
4277
4278        let clusters_caught_up_check =
4279            clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
4280                trigger,
4281                exclude_collections: new_builtin_collections.into_iter().collect(),
4282            });
4283
4284        if let Some(config) = pg_timestamp_oracle_config.as_ref() {
4285            // Apply settings from system vars as early as possible because some
4286            // of them are locked in right when an oracle is first opened!
4287            let pg_timestamp_oracle_params =
4288                flags::pg_timstamp_oracle_config(catalog.system_config());
4289            pg_timestamp_oracle_params.apply(config);
4290        }
4291
4292        // Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
4293        // system variables change, we update our connection limits.
4294        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4295            Arc::new(move |system_vars: &SystemVars| {
4296                let limit: u64 = system_vars.max_connections().cast_into();
4297                let superuser_reserved: u64 =
4298                    system_vars.superuser_reserved_connections().cast_into();
4299
4300                // If superuser_reserved >= max_connections, prefer max_connections.
4301                //
4302                // In this scenario all normal users would be locked out because all
4303                // connections would be reserved for superusers, so complain if this is the case.
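                // For example (illustrative numbers only): with max_connections = 100
                // and superuser_reserved = 150, we warn and clamp the reserved count
                // to 100, which would leave no connections for normal users.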
4304                let superuser_reserved = if superuser_reserved >= limit {
4305                    tracing::warn!(
4306                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
4307                    );
4308                    limit
4309                } else {
4310                    superuser_reserved
4311                };
4312
4313                (connection_limit_callback)(limit, superuser_reserved);
4314            });
4315        catalog.system_config_mut().register_callback(
4316            &mz_sql::session::vars::MAX_CONNECTIONS,
4317            Arc::clone(&connection_limit_callback),
4318        );
4319        catalog.system_config_mut().register_callback(
4320            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4321            connection_limit_callback,
4322        );
4323
4324        let (group_commit_tx, group_commit_rx) = appends::notifier();
4325
4326        let parent_span = tracing::Span::current();
4327        let thread = thread::Builder::new()
4328            // The Coordinator thread tends to keep a lot of data on its stack. To
4329            // prevent a stack overflow we allocate a stack three times as big as the default
4330            // stack.
4331            .stack_size(3 * stack::STACK_SIZE)
4332            .name("coordinator".to_string())
4333            .spawn(move || {
4334                let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4335
4336                let controller = handle
4337                    .block_on({
4338                        catalog.initialize_controller(
4339                            controller_config,
4340                            controller_envd_epoch,
4341                            read_only_controllers,
4342                        )
4343                    })
4344                    .unwrap_or_terminate("failed to initialize controller");
4345                // Initializing the controller uses one or more timestamps, so push the boot timestamp up to the
4346                // current catalog upper.
4347                let catalog_upper = handle.block_on(catalog.current_upper());
4348                boot_ts = std::cmp::max(boot_ts, catalog_upper);
4349                if !read_only_controllers {
4350                    let epoch_millis_oracle = &timestamp_oracles
4351                        .get(&Timeline::EpochMilliseconds)
4352                        .expect("inserted above")
4353                        .oracle;
4354                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4355                }
4356
4357                let catalog = Arc::new(catalog);
4358
4359                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4360                let mut coord = Coordinator {
4361                    controller,
4362                    catalog,
4363                    internal_cmd_tx,
4364                    group_commit_tx,
4365                    strict_serializable_reads_tx,
4366                    global_timelines: timestamp_oracles,
4367                    transient_id_gen: Arc::new(TransientIdGen::new()),
4368                    active_conns: BTreeMap::new(),
4369                    txn_read_holds: Default::default(),
4370                    pending_peeks: BTreeMap::new(),
4371                    client_pending_peeks: BTreeMap::new(),
4372                    pending_linearize_read_txns: BTreeMap::new(),
4373                    serialized_ddl: LockedVecDeque::new(),
4374                    active_compute_sinks: BTreeMap::new(),
4375                    active_webhooks: BTreeMap::new(),
4376                    active_copies: BTreeMap::new(),
4377                    staged_cancellation: BTreeMap::new(),
4378                    introspection_subscribes: BTreeMap::new(),
4379                    write_locks: BTreeMap::new(),
4380                    deferred_write_ops: BTreeMap::new(),
4381                    pending_writes: Vec::new(),
4382                    advance_timelines_interval,
4383                    secrets_controller,
4384                    caching_secrets_reader,
4385                    cloud_resource_controller,
4386                    storage_usage_client,
4387                    storage_usage_collection_interval,
4388                    segment_client,
4389                    metrics,
4390                    optimizer_metrics,
4391                    tracing_handle,
4392                    statement_logging: StatementLogging::new(coord_now.clone()),
4393                    webhook_concurrency_limit,
4394                    pg_timestamp_oracle_config,
4395                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4396                    cluster_scheduling_decisions: BTreeMap::new(),
4397                    caught_up_check_interval: clusters_caught_up_check_interval,
4398                    caught_up_check: clusters_caught_up_check,
4399                    installed_watch_sets: BTreeMap::new(),
4400                    connection_watch_sets: BTreeMap::new(),
4401                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
4402                    read_only_controllers,
4403                    buffered_builtin_table_updates: Some(Vec::new()),
4404                    license_key,
4405                    persist_client,
4406                };
4407                let bootstrap = handle.block_on(async {
4408                    coord
4409                        .bootstrap(
4410                            boot_ts,
4411                            migrated_storage_collections_0dt,
4412                            builtin_table_updates,
4413                            cached_global_exprs,
4414                            uncached_local_exprs,
4415                            audit_logs_iterator,
4416                        )
4417                        .await?;
4418                    coord
4419                        .controller
4420                        .remove_orphaned_replicas(
4421                            coord.catalog().get_next_user_replica_id().await?,
4422                            coord.catalog().get_next_system_replica_id().await?,
4423                        )
4424                        .await
4425                        .map_err(AdapterError::Orchestrator)?;
4426
4427                    if let Some(retention_period) = storage_usage_retention_period {
4428                        coord
4429                            .prune_storage_usage_events_on_startup(retention_period)
4430                            .await;
4431                    }
4432
4433                    Ok(())
4434                });
4435                let ok = bootstrap.is_ok();
4436                drop(span);
4437                bootstrap_tx
4438                    .send(bootstrap)
4439                    .expect("bootstrap_rx is not dropped until it receives this message");
4440                if ok {
4441                    handle.block_on(coord.serve(
4442                        internal_cmd_rx,
4443                        strict_serializable_reads_rx,
4444                        cmd_rx,
4445                        group_commit_rx,
4446                    ));
4447                }
4448            })
4449            .expect("failed to create coordinator thread");
4450        match bootstrap_rx
4451            .await
4452            .expect("bootstrap_tx always sends a message or panics/halts")
4453        {
4454            Ok(()) => {
4455                info!(
4456                    "startup: coordinator init: coordinator thread start complete in {:?}",
4457                    coord_thread_start.elapsed()
4458                );
4459                info!(
4460                    "startup: coordinator init: complete in {:?}",
4461                    coord_start.elapsed()
4462                );
4463                let handle = Handle {
4464                    session_id,
4465                    start_instant,
4466                    _thread: thread.join_on_drop(),
4467                };
4468                let client = Client::new(
4469                    build_info,
4470                    cmd_tx.clone(),
4471                    metrics_clone,
4472                    now,
4473                    environment_id,
4474                    segment_client_clone,
4475                );
4476                Ok((handle, client))
4477            }
4478            Err(e) => Err(e),
4479        }
4480    }
4481    .boxed()
4482}
4483
4484// Determines and returns the highest timestamp for each timeline, for all known
4485// timestamp oracle implementations.
4486//
4487// Initially, we did this so that we could switch between implementations of
4488// the timestamp oracle, but now we also do it to determine a monotonic boot
4489// timestamp, i.e. a timestamp that does not regress across reboots.
4490//
4491// This mostly works, but there can be linearizability violations, because there
4492// is no central moment at which we do distributed coordination for all oracle
4493// types. Working around this seems prohibitively hard, maybe even impossible,
4494// so we have to live with this window of potential violations during the
4495// upgrade window (which is the only point where we should switch oracle
4496// implementations).
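// As an illustrative example of the merging below: if two oracle implementations
// report timestamps 100 and 120 for the same timeline, we keep 120, the maximum,
// so that the boot timestamp never regresses.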
4497async fn get_initial_oracle_timestamps(
4498    pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
4499) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4500    let mut initial_timestamps = BTreeMap::new();
4501
4502    if let Some(pg_timestamp_oracle_config) = pg_timestamp_oracle_config {
4503        let postgres_oracle_timestamps =
4504            PostgresTimestampOracle::<NowFn>::get_all_timelines(pg_timestamp_oracle_config.clone())
4505                .await?;
4506
4507        let debug_msg = || {
4508            postgres_oracle_timestamps
4509                .iter()
4510                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4511                .join(", ")
4512        };
4513        info!(
4514            "current timestamps from the postgres-backed timestamp oracle: {}",
4515            debug_msg()
4516        );
4517
4518        for (timeline, ts) in postgres_oracle_timestamps {
4519            let entry = initial_timestamps
4520                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4521
4522            entry
4523                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4524                .or_insert(ts);
4525        }
4526    } else {
4527        info!("no postgres url for postgres-backed timestamp oracle configured!");
4528    };
4529
4530    let debug_msg = || {
4531        initial_timestamps
4532            .iter()
4533            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4534            .join(", ")
4535    };
4536    info!("initial oracle timestamps: {}", debug_msg());
4537
4538    Ok(initial_timestamps)
4539}
4540
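/// Loads system parameter values from the configured frontend (if any),
/// returning `None` when no sync config is provided or when a non-initial sync
/// times out.
///
/// The very first sync blocks until it succeeds; once the catalog records that
/// a sync has happened, later boots wait at most `system_parameter_sync_timeout`.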
4541#[instrument]
4542pub async fn load_remote_system_parameters(
4543    storage: &mut Box<dyn OpenableDurableCatalogState>,
4544    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4545    system_parameter_sync_timeout: Duration,
4546) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4547    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4548        tracing::info!("parameter sync on boot: start sync");
4549
4550        // We intentionally block initial startup, potentially forever,
4551        // on initializing LaunchDarkly. This may seem scary, but the
4552        // alternative is even scarier. Over time, we expect that the
4553        // compiled-in default values for the system parameters will
4554        // drift substantially from the defaults configured in
4555        // LaunchDarkly, to the point that starting an environment
4556        // without loading the latest values from LaunchDarkly will
4557        // result in running an untested configuration.
4558        //
4559        // Note this only applies during initial startup. Restarting
4560        // after we've synced once only blocks for a maximum of
4561        // `FRONTEND_SYNC_TIMEOUT` on LaunchDarkly, as it seems
4562        // reasonable to assume that the last-synced configuration was
4563        // valid enough.
4564        //
4565        // This philosophy appears to provide a good balance between not
4566        // running untested configurations in production while also not
4567        // making LaunchDarkly a "tier 1" dependency for existing
4568        // environments.
4569        //
4570        // If this proves to be an issue, we could seek to address the
4571        // configuration drift in a different way--for example, by
4572        // writing a script that runs in CI nightly and checks for
4573        // deviation between the compiled Rust code and LaunchDarkly.
4574        //
4575        // If it is absolutely necessary to bring up a new environment
4576        // while LaunchDarkly is down, the following manual mitigation
4577        // can be performed:
4578        //
4579        //    1. Edit the environmentd startup parameters to omit the
4580        //       LaunchDarkly configuration.
4581        //    2. Boot environmentd.
4582        //    3. Use the catalog-debug tool to run `edit config "{\"key\":\"system_config_synced\"}" "{\"value\": 1}"`.
4583        //    4. Adjust any other parameters as necessary to avoid
4584        //       running a nonstandard configuration in production.
4585        //    5. Edit the environmentd startup parameters to restore the
4586        //       LaunchDarkly configuration, for when LaunchDarkly comes
4587        //       back online.
4588        //    6. Reboot environmentd.
4589        let mut params = SynchronizedParameters::new(SystemVars::default());
4590        let frontend_sync = async {
4591            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4592            frontend.pull(&mut params);
4593            let ops = params
4594                .modified()
4595                .into_iter()
4596                .map(|param| {
4597                    let name = param.name;
4598                    let value = param.value;
4599                    tracing::info!(name, value, initial = true, "sync parameter");
4600                    (name, value)
4601                })
4602                .collect();
4603            tracing::info!("parameter sync on boot: end sync");
4604            Ok(Some(ops))
4605        };
4606        if !storage.has_system_config_synced_once().await? {
4607            frontend_sync.await
4608        } else {
4609            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4610                Ok(ops) => Ok(ops),
4611                Err(TimeoutError::Inner(e)) => Err(e),
4612                Err(TimeoutError::DeadlineElapsed) => {
4613                    tracing::info!("parameter sync on boot: sync has timed out");
4614                    Ok(None)
4615                }
4616            }
4617        }
4618    } else {
4619        Ok(None)
4620    }
4621}
4622
4623#[derive(Debug)]
4624pub enum WatchSetResponse {
4625    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
4626    AlterSinkReady(AlterSinkReadyContext),
4627}
4628
4629#[derive(Debug)]
4630pub struct AlterSinkReadyContext {
4631    ctx: Option<ExecuteContext>,
4632    otel_ctx: OpenTelemetryContext,
4633    plan: AlterSinkPlan,
4634    plan_validity: PlanValidity,
4635    read_hold: ReadHolds<Timestamp>,
4636}
4637
4638impl AlterSinkReadyContext {
4639    fn ctx(&mut self) -> &mut ExecuteContext {
4640        self.ctx.as_mut().expect("only cleared on drop")
4641    }
4642
4643    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4644        self.ctx
4645            .take()
4646            .expect("only cleared on drop")
4647            .retire(result);
4648    }
4649}
4650
4651impl Drop for AlterSinkReadyContext {
4652    fn drop(&mut self) {
4653        if let Some(ctx) = self.ctx.take() {
4654            ctx.retire(Err(AdapterError::Canceled));
4655        }
4656    }
4657}
4658
4659/// A struct that tracks ownership of a lock alongside a `VecDeque` that stores
4660/// to-be-done work for after the lock is freed.
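///
/// A hypothetical usage sketch (`queue: LockedVecDeque<Work>` and `work` are
/// assumed to exist in the caller):
///
/// ```ignore
/// match queue.try_lock_owned() {
///     // Got the lock: do the work now, keeping `_guard` alive while it runs.
///     Ok(_guard) => { /* do the work */ }
///     // Someone else holds the lock: defer the work until the lock is freed.
///     Err(_) => queue.push_back(work),
/// }
/// ```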
4661#[derive(Debug)]
4662struct LockedVecDeque<T> {
4663    items: VecDeque<T>,
4664    lock: Arc<tokio::sync::Mutex<()>>,
4665}
4666
4667impl<T> LockedVecDeque<T> {
4668    pub fn new() -> Self {
4669        Self {
4670            items: VecDeque::new(),
4671            lock: Arc::new(tokio::sync::Mutex::new(())),
4672        }
4673    }
4674
4675    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
4676        Arc::clone(&self.lock).try_lock_owned()
4677    }
4678
4679    pub fn is_empty(&self) -> bool {
4680        self.items.is_empty()
4681    }
4682
4683    pub fn push_back(&mut self, value: T) {
4684        self.items.push_back(value)
4685    }
4686
4687    pub fn pop_front(&mut self) -> Option<T> {
4688        self.items.pop_front()
4689    }
4690
4691    pub fn remove(&mut self, index: usize) -> Option<T> {
4692        self.items.remove(index)
4693    }
4694
4695    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
4696        self.items.iter()
4697    }
4698}
4699
4700#[derive(Debug)]
4701struct DeferredPlanStatement {
4702    ctx: ExecuteContext,
4703    ps: PlanStatement,
4704}
4705
4706#[derive(Debug)]
4707enum PlanStatement {
4708    Statement {
4709        stmt: Arc<Statement<Raw>>,
4710        params: Params,
4711    },
4712    Plan {
4713        plan: mz_sql::plan::Plan,
4714        resolved_ids: ResolvedIds,
4715    },
4716}
4717
4718#[derive(Debug, Error)]
4719pub enum NetworkPolicyError {
4720    #[error("Access denied for address {0}")]
4721    AddressDenied(IpAddr),
4722    #[error("Access denied missing IP address")]
4723    MissingIp,
4724}
4725
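/// Returns `Ok(())` if `ip` is contained in the address block of any rule in
/// `rules`, and `NetworkPolicyError::AddressDenied` otherwise.
///
/// For example (illustrative only): a rule whose address block is `10.0.0.0/8`
/// admits `10.1.2.3` but rejects `192.168.0.1`.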
4726pub(crate) fn validate_ip_with_policy_rules(
4727    ip: &IpAddr,
4728    rules: &Vec<NetworkPolicyRule>,
4729) -> Result<(), NetworkPolicyError> {
4730    // At the moment we're not handling action or direction, as those can only
4731    // be "allow" and "ingress" respectively.
4732    if rules.iter().any(|r| r.address.0.contains(ip)) {
4733        Ok(())
4734    } else {
4735        Err(NetworkPolicyError::AddressDenied(ip.clone()))
4736    }
4737}