// mz_adapter — coord.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Translation of SQL commands into timestamped `Controller` commands.
//!
//! The various SQL commands instruct the system to take actions that are not
//! yet explicitly timestamped. On the other hand, the underlying data continually
//! change as time moves forward. On the third hand, we greatly benefit from the
//! information that some times are no longer of interest, so that we may
//! compact the representation of the continually changing collections.
//!
//! The [`Coordinator`] curates these interactions by observing the progress
//! collections make through time, choosing timestamps for its own commands,
//! and eventually communicating that certain times have irretrievably "passed".
//!
//! ## Frontiers another way
//!
//! If the above description of frontiers left you with questions, this
//! repackaged explanation might help.
//!
//! - `since` is the least recent time (i.e. oldest time) that you can read
//!   from sources and be guaranteed that the returned data is accurate as of
//!   that time.
//!
//!   Reads at times less than `since` may return values that were not actually
//!   seen at the specified time, but arrived later (i.e. the results are
//!   compacted).
//!
//!   For correctness' sake, the coordinator never chooses to read at a time
//!   less than an arrangement's `since`.
//!
//! - `upper` is the first time after the most recent time that you can read
//!   from sources and receive an immediate response. Alternately, it is the
//!   least time at which the data may still change (that is the reason we may
//!   not be able to respond immediately).
//!
//!   Reads at times >= `upper` may not immediately return because the answer
//!   isn't known yet. However, once the `upper` is > the specified read time,
//!   the read can return.
//!
//!   For the sake of returned values' freshness, the coordinator prefers
//!   performing reads at an arrangement's `upper`. However, because we more
//!   strongly prefer correctness, the coordinator will choose timestamps
//!   greater than an object's `upper` if it is also being accessed alongside
//!   objects whose `since` times are >= its `upper`.
//!
//! This illustration attempts to show, with time moving left to right, the
//! relationship between `since` and `upper`.
//!
//! - `#`: possibly inaccurate results
//! - `-`: immediate, correct response
//! - `?`: not yet known
//! - `s`: since
//! - `u`: upper
//! - `|`: eligible for coordinator to select
//!
//! ```nofmt
//! ####s----u?????
//!     |||||||||||
//! ```
//!

69use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::fmt;
72use std::net::IpAddr;
73use std::num::NonZeroI64;
74use std::ops::Neg;
75use std::str::FromStr;
76use std::sync::LazyLock;
77use std::sync::{Arc, Mutex};
78use std::thread;
79use std::time::{Duration, Instant};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::{Either, Itertools};
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::{
95    USER_ID_POOL_BATCH_SIZE, WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL,
96};
97use mz_auth::password::Password;
98use mz_build_info::BuildInfo;
99use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
100use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
101use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
102use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
103use mz_catalog::memory::objects::{
104    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
105    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
106};
107use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
108use mz_compute_client::as_of_selection;
109use mz_compute_client::controller::error::{
110    CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
111};
112use mz_compute_types::ComputeInstanceId;
113use mz_compute_types::dataflows::DataflowDescription;
114use mz_compute_types::plan::Plan;
115use mz_controller::clusters::{
116    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
117};
118use mz_controller::{ControllerConfig, Readiness};
119use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
120use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
121use mz_license_keys::ValidatedLicenseKey;
122use mz_orchestrator::OfflineReason;
123use mz_ore::cast::{CastFrom, CastInto, CastLossy};
124use mz_ore::channel::trigger::Trigger;
125use mz_ore::future::TimeoutError;
126use mz_ore::metrics::MetricsRegistry;
127use mz_ore::now::{EpochMillis, NowFn};
128use mz_ore::task::{JoinHandle, spawn};
129use mz_ore::thread::JoinHandleExt;
130use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
131use mz_ore::url::SensitiveUrl;
132use mz_ore::{assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, stack};
133use mz_persist_client::PersistClient;
134use mz_persist_client::batch::ProtoBatch;
135use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
136use mz_repr::adt::numeric::Numeric;
137use mz_repr::explain::{ExplainConfig, ExplainFormat};
138use mz_repr::global_id::TransientIdGen;
139use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
140use mz_repr::role_id::RoleId;
141use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
142use mz_secrets::cache::CachingSecretsReader;
143use mz_secrets::{SecretsController, SecretsReader};
144use mz_sql::ast::{Raw, Statement};
145use mz_sql::catalog::{CatalogCluster, EnvironmentId};
146use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
147use mz_sql::optimizer_metrics::OptimizerMetrics;
148use mz_sql::plan::{
149    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
150    NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
151};
152use mz_sql::session::user::User;
153use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
154use mz_sql_parser::ast::ExplainStage;
155use mz_sql_parser::ast::display::AstDisplay;
156use mz_storage_client::client::TableData;
157use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
158use mz_storage_types::connections::Connection as StorageConnection;
159use mz_storage_types::connections::ConnectionContext;
160use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
161use mz_storage_types::read_holds::ReadHold;
162use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
163use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
164use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
165use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
166use mz_transform::dataflow::DataflowMetainfo;
167use opentelemetry::trace::TraceContextExt;
168use serde::Serialize;
169use thiserror::Error;
170use timely::progress::{Antichain, Timestamp as _};
171use tokio::runtime::Handle as TokioHandle;
172use tokio::select;
173use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
174use tokio::time::{Interval, MissedTickBehavior};
175use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
176use tracing_opentelemetry::OpenTelemetrySpanExt;
177use uuid::Uuid;
178
179use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
180use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
181use crate::client::{Client, Handle};
182use crate::command::{Command, ExecuteResponse};
183use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
184use crate::coord::appends::{
185    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
186};
187use crate::coord::caught_up::CaughtUpCheckContext;
188use crate::coord::cluster_scheduling::SchedulingDecision;
189use crate::coord::id_bundle::CollectionIdBundle;
190use crate::coord::introspection::IntrospectionSubscribe;
191use crate::coord::peek::PendingPeek;
192use crate::coord::statement_logging::StatementLogging;
193use crate::coord::timeline::{TimelineContext, TimelineState};
194use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
195use crate::coord::validity::PlanValidity;
196use crate::error::AdapterError;
197use crate::explain::insights::PlanInsightsContext;
198use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
199use crate::metrics::Metrics;
200use crate::optimize::dataflows::{
201    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
202};
203use crate::optimize::{self, Optimize, OptimizerConfig};
204use crate::session::{EndTransactionAction, Session};
205use crate::statement_logging::{
206    StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
207};
208use crate::util::{ClientTransmitter, ResultExt, sort_topological};
209use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
210use crate::{AdapterNotice, ReadHolds, flags};
211
212pub(crate) mod appends;
213pub(crate) mod catalog_serving;
214pub(crate) mod cluster_scheduling;
215pub(crate) mod consistency;
216pub(crate) mod id_bundle;
217pub(crate) mod in_memory_oracle;
218pub(crate) mod peek;
219pub(crate) mod read_policy;
220pub(crate) mod sequencer;
221pub(crate) mod statement_logging;
222pub(crate) mod timeline;
223pub(crate) mod timestamp_selection;
224
225pub mod catalog_implications;
226mod caught_up;
227mod command_handler;
228mod ddl;
229mod indexes;
230mod introspection;
231mod message_handler;
232mod privatelink_status;
233mod sql;
234mod validity;
235
/// A pool of pre-allocated user IDs, used to avoid a durable persist write on
/// every DDL statement.
///
/// The half-open range `[next, upper)` holds the IDs that may still be handed
/// out; once `next` reaches `upper` the pool is empty and must be refilled
/// through the catalog.
///
/// # Correctness
///
/// No synchronization is required: the pool lives inside the [`Coordinator`],
/// whose single-threaded event loop serializes all requests. Every access goes
/// through `&mut self` on the coordinator, so concurrent access to the pool is
/// impossible.
///
/// Uniqueness of handed-out IDs is inherited from the refill path: each refill
/// calls [`Catalog::allocate_user_ids`], which performs a durable persist
/// write that atomically reserves the entire batch before any ID from it is
/// handed out. If the process crashes after a refill but before all
/// pre-allocated IDs are consumed, the unused IDs simply remain as harmless
/// gaps in the sequence — user IDs are not required to be contiguous.
///
/// The same argument covers multiple `environmentd` processes running
/// concurrently: each process has its own independent pool, but every refill
/// goes through the shared persist-backed catalog, which serializes batch
/// reservations across all callers. Two processes therefore can never receive
/// overlapping ranges — for the same reason they could not before this pool
/// existed.
#[derive(Debug)]
pub(crate) struct IdPool {
    // The next ID to hand out.
    next: u64,
    // One past the last ID available for allocation.
    upper: u64,
}

impl IdPool {
    /// Creates a pool containing no allocatable IDs.
    pub fn empty() -> Self {
        Self { next: 0, upper: 0 }
    }

    /// Hands out the next ID, or `None` if the pool is exhausted.
    pub fn allocate(&mut self) -> Option<u64> {
        if self.next >= self.upper {
            return None;
        }
        let id = self.next;
        self.next += 1;
        Some(id)
    }

    /// Hands out `n` consecutive IDs, or `None` if fewer than `n` remain.
    ///
    /// Note: because the availability check guarantees `next + n <= upper`,
    /// the addition below cannot overflow.
    pub fn allocate_many(&mut self, n: u64) -> Option<Vec<u64>> {
        if n > self.remaining() {
            return None;
        }
        let start = self.next;
        self.next += n;
        Some((start..self.next).collect())
    }

    /// Reports how many IDs are still available for allocation.
    pub fn remaining(&self) -> u64 {
        self.upper - self.next
    }

    /// Replaces the pool's contents with the half-open range `[next, upper)`.
    ///
    /// # Panics
    ///
    /// Panics if `next > upper`.
    pub fn refill(&mut self, next: u64, upper: u64) {
        assert!(next <= upper, "invalid pool range: {next}..{upper}");
        self.next = next;
        self.upper = upper;
    }
}
308
/// Messages processed by the [`Coordinator`]'s main event loop.
///
/// Each variant is a unit of work that must be serialized through the
/// coordinator's single-threaded message queue. See [`Message::kind`] for the
/// logging label associated with each variant.
#[derive(Debug)]
pub enum Message {
    /// An external command, together with the tracing context it was created
    /// under.
    Command(OpenTelemetryContext, Command),
    /// A controller has work that must be processed on the main loop.
    ControllerReady {
        /// Which controller (or controller-adjacent source of work) is ready.
        controller: ControllerReadiness,
    },
    /// A statement-purification background task has completed.
    PurifiedStatementReady(PurifiedStatementReady),
    /// Background validation for a `CREATE CONNECTION` has completed.
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    /// Background validation for an `ALTER CONNECTION` has completed.
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    /// Retry a previously deferred operation.
    TryDeferred {
        /// The connection that created this op.
        conn_id: ConnectionId,
        /// The write lock that notified us our deferred op might be able to run.
        ///
        /// Note: While we never want to hold a partial set of locks, it can be important to hold
        /// onto the _one_ that notified us our op might be ready. If there are multiple operations
        /// waiting on a single collection, and we don't hold this lock through retrying the op,
        /// then everything waiting on this collection will get retried causing traffic in the
        /// Coordinator's message queue.
        ///
        /// See [`DeferredOp::can_be_optimistically_retried`] for more detail.
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    /// Initiates a group commit.
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    /// A deferred statement is ready to be retried.
    DeferredStatementReady,
    /// Advance the timelines' timestamps (presumably periodic; confirm against
    /// the message handler).
    AdvanceTimelines,
    /// A change in a cluster's status (e.g. a replica process event).
    ClusterEvent(ClusterEvent),
    /// Cancel all pending peeks for the given connection.
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    /// Serve reads that were waiting on linearizability (assumed from the
    /// name — confirm in the message handler).
    LinearizeReads,
    /// Batches staged for a write to a table have been produced.
    StagedBatches {
        /// The connection that initiated the write.
        conn_id: ConnectionId,
        /// The table the batches are destined for.
        table_id: CatalogItemId,
        /// The staged batches, or the errors hit while producing them.
        batches: Vec<Result<ProtoBatch, String>>,
    },
    /// Schedule the next storage-usage collection.
    StorageUsageSchedule,
    /// Fetch the current storage usage.
    StorageUsageFetch,
    /// Record fetched per-shard storage usage.
    StorageUsageUpdate(ShardsUsageReferenced),
    /// Prune old storage-usage rows via the given builtin-table updates.
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    /// Performs any cleanup and logging actions necessary for
    /// finalizing a statement execution.
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    /// Execute a single statement as an implicit transaction.
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    /// The next stage of the peek state machine is ready to run.
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    /// The next stage of the `CREATE INDEX` state machine is ready to run.
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    /// The next stage of the `CREATE VIEW` state machine is ready to run.
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    /// The next stage of the `CREATE MATERIALIZED VIEW` state machine is ready
    /// to run.
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    /// The next stage of the `SUBSCRIBE` state machine is ready to run.
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    /// The next stage of an introspection subscribe is ready to run. Note:
    /// carries no `ExecuteContext` — introspection subscribes are not tied to
    /// a client statement.
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    /// The next stage of the secret state machine is ready to run.
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    /// The next stage of the `ALTER CLUSTER` state machine is ready to run.
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    /// The next stage of the `EXPLAIN TIMESTAMP` state machine is ready to run.
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    /// Flush buffered statement-log data.
    DrainStatementLog,
    /// PrivateLink VPC endpoint status events to record.
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    /// Evaluate the cluster scheduling policies.
    CheckSchedulingPolicies,

    /// Scheduling policy decisions about turning clusters On/Off.
    /// `Vec<(policy name, Vec of decisions by the policy)>`
    /// A cluster will be On if and only if there is at least one On decision for it.
    /// Scheduling decisions for clusters that have `SCHEDULE = MANUAL` are ignored.
    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}
417
impl Message {
    /// Returns a string to identify the kind of [`Message`], useful for logging.
    ///
    /// The labels are stable, human-readable identifiers; they presumably feed
    /// metrics/log keys, so renaming one is an externally visible change —
    /// confirm where these labels surface before editing them.
    pub const fn kind(&self) -> &'static str {
        match self {
            // Commands get a `command-`/verb-style label derived from the
            // inner `Command` variant.
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
                Command::GetOracle { .. } => "get-oracle",
                Command::DetermineRealTimeRecentTimestamp { .. } => {
                    "determine-real-time-recent-timestamp"
                }
                Command::GetTransactionReadHoldsBundle { .. } => {
                    "get-transaction-read-holds-bundle"
                }
                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
                Command::CopyToPreflight { .. } => "copy-to-preflight",
                Command::ExecuteCopyTo { .. } => "execute-copy-to",
                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
                Command::ExplainTimestamp { .. } => "explain-timestamp",
                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
                Command::StartCopyFromStdin { .. } => "start-copy-from-stdin",
            },
            // `ControllerReady` labels encode which controller woke us up.
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            // NOTE: this is a tuple variant matched with the `{ .. }` rest
            // pattern, which is legal and equivalent to `(_)` here.
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}
509
/// The reason for why a controller needs processing on the main loop.
///
/// Carried by [`Message::ControllerReady`] so the coordinator knows which
/// controller (or which controller-adjacent source of work) to service.
#[derive(Debug)]
pub enum ControllerReadiness {
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A batch of metric data is ready.
    Metrics,
    /// An internally-generated message is ready to be returned.
    Internal,
}
522
/// The result of work performed off the coordinator's main loop, bundled with
/// everything needed to resume statement processing back on the main loop.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    /// The execution context of the originating statement (excluded from
    /// `Debug` output via `derivative`).
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The outcome of the background work.
    pub result: Result<T, AdapterError>,
    /// Parameters bound to the original statement.
    pub params: Params,
    /// Tracks whether the plan is still valid when processing resumes.
    pub plan_validity: PlanValidity,
    /// The original, unpurified statement.
    pub original_stmt: Arc<Statement<Raw>>,
    /// Tracing context for continuing the statement's span.
    pub otel_ctx: OpenTelemetryContext,
}

/// A [`BackgroundWorkResult`] carrying a purified statement.
pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
536
/// The result of a background connection-validation task, bundled with the
/// state needed to finish sequencing the statement on the main loop.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    /// The execution context of the originating statement (excluded from
    /// `Debug` output via `derivative`).
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The outcome of the validation.
    pub result: Result<T, AdapterError>,
    /// IDs of objects resolved during name resolution.
    pub resolved_ids: ResolvedIds,
    /// Catalog item ID of the connection being validated.
    pub connection_id: CatalogItemId,
    /// Global ID of the connection being validated.
    pub connection_gid: GlobalId,
    /// Tracks whether the plan is still valid when processing resumes.
    pub plan_validity: PlanValidity,
    /// Tracing context for continuing the statement's span.
    pub otel_ctx: OpenTelemetryContext,
}

/// Validation result for a `CREATE CONNECTION` statement.
pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
/// Validation result for an `ALTER CONNECTION` statement.
pub type AlterConnectionValidationReady = ValidationReady<Connection>;
552
/// Stages of the peek state machine, driven via [`Message::PeekStageReady`].
#[derive(Debug)]
pub enum PeekStage {
    /// Common stages across SELECT, EXPLAIN and COPY TO queries.
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    /// Determine the real-time-recency timestamp, if requested.
    RealTimeRecency(PeekStageRealTimeRecency),
    /// Pick a query timestamp and acquire read holds at it.
    TimestampReadHold(PeekStageTimestampReadHold),
    /// Optimize the query.
    Optimize(PeekStageOptimize),
    /// Final stage for a peek.
    Finish(PeekStageFinish),
    /// Final stage for an explain.
    ExplainPlan(PeekStageExplainPlan),
    /// Final stage for `EXPLAIN FILTER PUSHDOWN`.
    ExplainPushdown(PeekStageExplainPushdown),
    /// Preflight checks for a copy to operation.
    CopyToPreflight(PeekStageCopyTo),
    /// Final stage for a copy to which involves shipping the dataflow.
    CopyToDataflow(PeekStageCopyTo),
}
570
/// Context shared by the stages of a `COPY ... TO` operation that writes query
/// results to an external destination (S3, judging by [`S3SinkFormat`] —
/// confirm against the sink implementation).
#[derive(Debug)]
pub struct CopyToContext {
    /// The `RelationDesc` of the data to be copied.
    pub desc: RelationDesc,
    /// The destination uri of the external service where the data will be copied.
    pub uri: Uri,
    /// Connection information required to connect to the external service to copy the data.
    pub connection: StorageConnection<ReferencedConnection>,
    /// The ID of the CONNECTION object to be used for copying the data.
    pub connection_id: CatalogItemId,
    /// Format params to format the data.
    pub format: S3SinkFormat,
    /// Approximate max file size of each uploaded file.
    pub max_file_size: u64,
    /// Number of batches the output of the COPY TO will be partitioned into
    /// to distribute the load across workers deterministically.
    /// This is only an option since it's not set when CopyToContext is instantiated
    /// but immediately after in the PeekStageValidate stage.
    pub output_batch_count: Option<u64>,
}
591
/// State for [`PeekStage::LinearizeTimestamp`]: linearize the read against the
/// timestamp oracle (inferred from the name — confirm in the sequencer).
#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    /// Ensures the plan's dependencies still exist when the stage resumes.
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    /// Cap on the result size, if configured.
    max_query_result_size: Option<u64>,
    /// IDs of the collections the query reads.
    source_ids: BTreeSet<GlobalId>,
    /// Replica explicitly targeted by the session, if any.
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    /// `Left` for a plain peek, `Right` for a `COPY TO`.
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

/// State for [`PeekStage::RealTimeRecency`]. Extends the linearize stage's
/// state with the timestamp read from the oracle.
#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    /// Timestamp obtained from the timestamp oracle, if one was consulted.
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

/// State for [`PeekStage::TimestampReadHold`]: choose the query timestamp and
/// acquire read holds.
#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    /// Timestamp obtained from the timestamp oracle, if one was consulted.
    oracle_read_ts: Option<Timestamp>,
    /// Real-time-recency timestamp, if real-time recency was requested.
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

/// State for [`PeekStage::Optimize`]: the timestamp has been determined; run
/// the optimizer.
#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    /// All collections involved in the query, grouped by instance.
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    /// The timestamp chosen for the query.
    determination: TimestampDetermination<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

/// State for [`PeekStage::Finish`]: issue the optimized peek.
#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    /// The compute instance the peek runs on.
    cluster_id: ComputeInstanceId,
    /// Row-set finishing (ORDER BY / LIMIT / projection) applied client-side.
    finishing: RowSetFinishing,
    /// When present, an optimizer trace to be used for emitting a plan insights
    /// notice.
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    /// The fully optimized (LIR) plan for the peek.
    global_lir_plan: optimize::peek::GlobalLirPlan,
    /// Wall-clock time at which optimization completed, for statement logging
    /// (assumed — confirm where this is consumed).
    optimization_finished_at: EpochMillis,
}

/// State shared by the `COPY TO` stages ([`PeekStage::CopyToPreflight`] and
/// [`PeekStage::CopyToDataflow`]).
#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    /// The fully optimized (LIR) plan for the copy-to dataflow.
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

/// State for [`PeekStage::ExplainPlan`]: render the EXPLAIN output.
#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    /// Metadata collected by the optimizer, surfaced in the explain output.
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

/// State for [`PeekStage::ExplainPushdown`]: report per-import MFP pushdown.
#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    /// The MFP pushed down to each imported collection.
    imports: BTreeMap<GlobalId, MapFilterProject>,
}
695
/// Stages of the `CREATE INDEX` state machine, driven via
/// [`Message::CreateIndexStageReady`].
#[derive(Debug)]
pub enum CreateIndexStage {
    /// Optimize the index's dataflow.
    Optimize(CreateIndexOptimize),
    /// Final stage for creating the index.
    Finish(CreateIndexFinish),
    /// Final stage for an EXPLAIN of the index.
    Explain(CreateIndexExplain),
}

/// State for [`CreateIndexStage::Optimize`].
#[derive(Debug)]
pub struct CreateIndexOptimize {
    /// Ensures the plan's dependencies still exist when the stage resumes.
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    /// IDs of objects resolved during name resolution.
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

/// State for [`CreateIndexStage::Finish`]: the optimized plans, ready to be
/// installed.
#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    /// ID of this item in the Catalog.
    item_id: CatalogItemId,
    /// ID by which Compute will reference this index.
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    /// IDs of objects resolved during name resolution.
    resolved_ids: ResolvedIds,
    /// The optimized MIR plan.
    global_mir_plan: optimize::index::GlobalMirPlan,
    /// The optimized LIR plan.
    global_lir_plan: optimize::index::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}

/// State for [`CreateIndexStage::Explain`].
#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    /// The global ID the explained index would export.
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    /// Metadata collected by the optimizer, surfaced in the explain output.
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
733
/// Stages of the `CREATE VIEW` state machine, driven via
/// [`Message::CreateViewStageReady`].
#[derive(Debug)]
pub enum CreateViewStage {
    /// Optimize the view's expression.
    Optimize(CreateViewOptimize),
    /// Final stage for creating the view.
    Finish(CreateViewFinish),
    /// Final stage for an EXPLAIN of the view.
    Explain(CreateViewExplain),
}

/// State for [`CreateViewStage::Optimize`].
#[derive(Debug)]
pub struct CreateViewOptimize {
    /// Ensures the plan's dependencies still exist when the stage resumes.
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    /// IDs of objects resolved during name resolution.
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

/// State for [`CreateViewStage::Finish`]: the optimized expression, ready to
/// be recorded in the catalog.
#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    /// ID of this item in the Catalog.
    item_id: CatalogItemId,
    /// ID by which Compute will reference this View.
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    /// IDs of objects resolved during name resolution.
    resolved_ids: ResolvedIds,
    /// The view's optimized MIR expression.
    optimized_expr: OptimizedMirRelationExpr,
}

/// State for [`CreateViewStage::Explain`].
#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    /// The global ID the explained view would receive.
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}
771
/// Stages of the `EXPLAIN TIMESTAMP` sequencing state machine.
#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

/// `EXPLAIN TIMESTAMP` stage: optimize the query whose timestamp is being
/// explained.
#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

/// `EXPLAIN TIMESTAMP` stage: determine a real-time-recency timestamp, if
/// required.
#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

/// `EXPLAIN TIMESTAMP` stage: render the final explanation.
#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    /// IDs of the collections the query reads from.
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    /// The real-time-recency timestamp from the previous stage, if one was
    /// determined.
    real_time_recency_ts: Option<Timestamp>,
}
805
/// Stages of the `ALTER CLUSTER` sequencing state machine.
#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

/// `ALTER CLUSTER` stage: apply the alteration.
#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

/// `ALTER CLUSTER` stage: wait (up to a timeout) for the altered cluster to
/// hydrate before finalizing.
#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    /// Deadline after which `on_timeout` is taken.
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

/// `ALTER CLUSTER` stage: commit the new cluster configuration.
#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}
834
/// Describes whether — and in which mode — a statement's sequencing state
/// machine is running on behalf of an `EXPLAIN`.
#[derive(Debug)]
pub enum ExplainContext {
    /// The ordinary, non-explain variant of the statement.
    None,
    /// The `EXPLAIN <level> PLAN FOR <explainee>` version of the statement.
    Plan(ExplainPlanContext),
    /// Generate a notice containing the `EXPLAIN PLAN INSIGHTS` output
    /// alongside the query's normal output.
    PlanInsightsNotice(OptimizerTrace),
    /// `EXPLAIN FILTER PUSHDOWN`
    Pushdown,
}
847
848impl ExplainContext {
849    /// If available for this context, wrap the [`OptimizerTrace`] into a
850    /// [`tracing::Dispatch`] and set it as default, returning the resulting
851    /// guard in a `Some(guard)` option.
852    pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
853        let optimizer_trace = match self {
854            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
855            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
856            _ => None,
857        };
858        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
859    }
860
861    pub(crate) fn needs_cluster(&self) -> bool {
862        match self {
863            ExplainContext::None => true,
864            ExplainContext::Plan(..) => false,
865            ExplainContext::PlanInsightsNotice(..) => true,
866            ExplainContext::Pushdown => false,
867        }
868    }
869
870    pub(crate) fn needs_plan_insights(&self) -> bool {
871        matches!(
872            self,
873            ExplainContext::Plan(ExplainPlanContext {
874                stage: ExplainStage::PlanInsights,
875                ..
876            }) | ExplainContext::PlanInsightsNotice(_)
877        )
878    }
879}
880
/// Context carried through a sequencing state machine when it runs on
/// behalf of an `EXPLAIN ... PLAN` statement.
#[derive(Debug)]
pub struct ExplainPlanContext {
    /// EXPLAIN BROKEN is internal syntax for showing EXPLAIN output despite an internal error in
    /// the optimizer: we don't immediately bail out from peek sequencing when an internal optimizer
    /// error happens, but go on with trying to show the requested EXPLAIN stage. This can still
    /// succeed if the requested EXPLAIN stage is before the point where the error happened.
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    /// Which optimizer stage's output to show.
    pub stage: ExplainStage,
    // NOTE(review): presumably the ID of an existing catalog item whose
    // definition should be replanned — confirm against callers.
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    /// Collects traces of the optimizer pipeline for rendering.
    pub optimizer_trace: OptimizerTrace,
}
895
/// Stages of the `CREATE MATERIALIZED VIEW` sequencing state machine.
#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

/// `CREATE MATERIALIZED VIEW` stage: optimize the view's dataflow plan.
#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

/// `CREATE MATERIALIZED VIEW` stage: commit the optimized plans to the
/// catalog and controller.
#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    /// The ID of this Materialized View in the Catalog.
    item_id: CatalogItemId,
    /// The ID of the durable pTVC backing this Materialized View.
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}

/// `CREATE MATERIALIZED VIEW` stage: render the `EXPLAIN` output instead of
/// creating the materialized view.
#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
936
/// Stages of the `SUBSCRIBE` sequencing state machine.
#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
    Explain(SubscribeExplain),
}

/// `SUBSCRIBE` stage: produce the global MIR plan.
#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    /// IDs of the collections the subscribe depends on.
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

/// `SUBSCRIBE` stage: determine a timestamp and lower the MIR plan to LIR.
#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    /// The MIR plan, not yet resolved to a timestamp.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

/// `SUBSCRIBE` stage: ship the finished LIR dataflow.
#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

/// `SUBSCRIBE` stage: render the `EXPLAIN` output instead of starting the
/// subscribe.
#[derive(Debug)]
pub struct SubscribeExplain {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    df_meta: DataflowMetainfo,
    cluster_id: ComputeInstanceId,
    explain_ctx: ExplainPlanContext,
}
990
/// Stages of the sequencing state machine for internally-issued
/// introspection subscribes.
#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

/// Introspection subscribe stage: produce the global MIR plan.
#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    /// Unlike user subscribes, introspection subscribes always target a
    /// specific replica.
    replica_id: ReplicaId,
}

/// Introspection subscribe stage: determine a timestamp and lower to LIR.
#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    /// The MIR plan, not yet resolved to a timestamp.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

/// Introspection subscribe stage: ship the finished LIR dataflow.
#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    /// Read holds keeping the subscribe's inputs readable at its timestamp.
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1024
/// Stages of the sequencing state machines for secret-related statements
/// (`CREATE SECRET`, `ALTER SECRET`, and key rotation).
#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

/// `CREATE SECRET` stage: ensure the secret exists in the secrets controller.
#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

/// `CREATE SECRET` stage: record the secret in the catalog.
#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

/// Key-rotation stage: ensure the rotated key material exists.
#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

/// Key-rotation stage: apply the resulting catalog operations.
#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}

/// `ALTER SECRET` stage: apply the alteration.
#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}
1065
/// An enum describing which cluster to run a statement on.
///
/// One example usage would be that if a query depends only on system tables, we might
/// automatically run it on the catalog server cluster to benefit from indexes that exist there.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The catalog server cluster.
    CatalogServer,
    /// The current user's active cluster.
    Active,
    /// The cluster selected at the start of a transaction. Carries that
    /// cluster's ID.
    Transaction(ClusterId),
}
1079
/// Result types for each stage of a sequence.
///
/// Returned by [`Staged::stage`] to tell the sequencing driver how to
/// proceed.
pub(crate) enum StageResult<T> {
    /// A task was spawned that will return the next stage.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task was spawned that will return a response for the client.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage is immediately ready and will execute.
    Immediate(T),
    /// The final stage was executed and is ready to respond to the client.
    Response(ExecuteResponse),
}
1091
/// Common functionality for [Coordinator::sequence_staged].
pub(crate) trait Staged: Send {
    /// The context threaded through the state machine (e.g. an
    /// [`ExecuteContext`], or `()` for internally-driven sequences).
    type Ctx: StagedContext;

    /// Mutable access to the stage's [`PlanValidity`], so the driver can
    /// re-check it between stages.
    fn validity(&mut self) -> &mut PlanValidity;

    /// Returns the next stage or final result.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Prepares a message for the Coordinator.
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether it is safe to SQL cancel this stage.
    fn cancel_enabled(&self) -> bool;
}
1111
/// Context a [`Staged`] state machine runs within.
pub trait StagedContext {
    /// Finishes the execution with `result`, delivering it wherever this
    /// context routes responses (possibly nowhere).
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    /// The session associated with this context, if any.
    fn session(&self) -> Option<&Session>;
}
1116
1117impl StagedContext for ExecuteContext {
1118    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1119        self.retire(result);
1120    }
1121
1122    fn session(&self) -> Option<&Session> {
1123        Some(self.session())
1124    }
1125}
1126
1127impl StagedContext for () {
1128    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}
1129
1130    fn session(&self) -> Option<&Session> {
1131        None
1132    }
1133}
1134
/// Configures a coordinator.
pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    /// Durable catalog storage backing the coordinator's state.
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    /// Source of wall-clock time for the coordinator.
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    // NOTE(review): the two u64 arguments look like (current, limit) —
    // confirm against the call site before relying on the order.
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    /// Whether or not to start controllers in read-only mode. This is only
    /// meant for use during development of read-only clusters and 0dt upgrades
    /// and should go away once we have proper orchestration during upgrades.
    pub read_only_controllers: bool,

    /// A trigger that signals that the current deployment has caught up with a
    /// previous deployment. Only used during 0dt deployment, while in read-only
    /// mode.
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}
1186
/// Metadata about an active connection.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Pgwire specifies that every connection have a 32-bit secret associated
    /// with it, that is known to both the client and the server. Cancellation
    /// requests are required to authenticate with the secret of the connection
    /// that they are targeting.
    secret_key: u32,
    /// The time when the session's connection was initiated.
    connected_at: EpochMillis,
    /// The user that owns the connection.
    user: User,
    /// The application name reported by the client.
    application_name: String,
    /// Universally-unique identifier for the connection.
    uuid: Uuid,
    /// The connection's ID.
    conn_id: ConnectionId,
    /// The client's IP address, if known.
    client_ip: Option<IpAddr>,

    /// Sinks that will need to be dropped when the current transaction, if
    /// any, is cleared.
    drop_sinks: BTreeSet<GlobalId>,

    /// Lock for the Coordinator's deferred statements that is dropped on transaction clear.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Cluster reconfigurations that will need to be
    /// cleaned up when the current transaction is cleared
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Channel on which to send notices to a session.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role that initiated the database context. Fixed for the duration of the connection.
    /// WARNING: This role reference is not updated when the role is dropped.
    /// Consumers should not assume that this role exists.
    authenticated_role: RoleId,
}
1224
1225impl ConnMeta {
1226    pub fn conn_id(&self) -> &ConnectionId {
1227        &self.conn_id
1228    }
1229
1230    pub fn user(&self) -> &User {
1231        &self.user
1232    }
1233
1234    pub fn application_name(&self) -> &str {
1235        &self.application_name
1236    }
1237
1238    pub fn authenticated_role_id(&self) -> &RoleId {
1239        &self.authenticated_role
1240    }
1241
1242    pub fn uuid(&self) -> Uuid {
1243        self.uuid
1244    }
1245
1246    pub fn client_ip(&self) -> Option<IpAddr> {
1247        self.client_ip
1248    }
1249
1250    pub fn connected_at(&self) -> EpochMillis {
1251        self.connected_at
1252    }
1253}
1254
#[derive(Debug)]
/// A pending transaction waiting to be committed.
pub struct PendingTxn {
    /// Context used to send a response back to the client.
    ctx: ExecuteContext,
    /// Client response for transaction.
    response: Result<PendingTxnResponse, AdapterError>,
    /// The action to take at the end of the transaction.
    action: EndTransactionAction,
}

#[derive(Debug)]
/// The response we'll send for a [`PendingTxn`].
pub enum PendingTxnResponse {
    /// The transaction will be committed.
    Committed {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction will be rolled back.
    Rolledback {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
}
1280
1281impl PendingTxnResponse {
1282    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1283        match self {
1284            PendingTxnResponse::Committed { params }
1285            | PendingTxnResponse::Rolledback { params } => params.extend(p),
1286        }
1287    }
1288}
1289
1290impl From<PendingTxnResponse> for ExecuteResponse {
1291    fn from(value: PendingTxnResponse) -> Self {
1292        match value {
1293            PendingTxnResponse::Committed { params } => {
1294                ExecuteResponse::TransactionCommitted { params }
1295            }
1296            PendingTxnResponse::Rolledback { params } => {
1297                ExecuteResponse::TransactionRolledBack { params }
1298            }
1299        }
1300    }
1301}
1302
#[derive(Debug)]
/// A pending read transaction waiting to be linearized along with metadata about it's state
pub struct PendingReadTxn {
    /// The transaction type
    txn: PendingRead,
    /// The timestamp context of the transaction.
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    /// When this pending txn was created; used to measure how long it waits
    /// until the transaction ends. Only used for metrics.
    created: Instant,
    /// Number of times we requeued the processing of this pending read txn.
    /// Requeueing is necessary if the time we executed the query is after the current oracle time;
    /// see [`Coordinator::message_linearize_reads`] for more details.
    num_requeues: u64,
    /// Telemetry context.
    otel_ctx: OpenTelemetryContext,
}
1319
1320impl PendingReadTxn {
1321    /// Return the timestamp context of the pending read transaction.
1322    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
1323        &self.timestamp_context
1324    }
1325
1326    pub(crate) fn take_context(self) -> ExecuteContext {
1327        self.txn.take_context()
1328    }
1329}
1330
#[derive(Debug)]
/// A pending read transaction waiting to be linearized.
enum PendingRead {
    /// A plain read whose response goes straight back to the client.
    Read {
        /// The inner transaction.
        txn: PendingTxn,
    },
    /// The read half of a read-then-write; the waiting write is resumed once
    /// the read is linearized.
    ReadThenWrite {
        /// Context used to send a response back to the client.
        ctx: ExecuteContext,
        /// Channel used to alert the transaction that the read has been linearized and send back
        /// `ctx`.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}
1346
1347impl PendingRead {
1348    /// Alert the client that the read has been linearized.
1349    ///
1350    /// If it is necessary to finalize an execute, return the state necessary to do so
1351    /// (execution context and result)
1352    #[instrument(level = "debug")]
1353    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1354        match self {
1355            PendingRead::Read {
1356                txn:
1357                    PendingTxn {
1358                        mut ctx,
1359                        response,
1360                        action,
1361                    },
1362                ..
1363            } => {
1364                let changed = ctx.session_mut().vars_mut().end_transaction(action);
1365                // Append any parameters that changed to the response.
1366                let response = response.map(|mut r| {
1367                    r.extend_params(changed);
1368                    ExecuteResponse::from(r)
1369                });
1370
1371                Some((ctx, response))
1372            }
1373            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1374                // Ignore errors if the caller has hung up.
1375                let _ = tx.send(Some(ctx));
1376                None
1377            }
1378        }
1379    }
1380
1381    fn label(&self) -> &'static str {
1382        match self {
1383            PendingRead::Read { .. } => "read",
1384            PendingRead::ReadThenWrite { .. } => "read_then_write",
1385        }
1386    }
1387
1388    pub(crate) fn take_context(self) -> ExecuteContext {
1389        match self {
1390            PendingRead::Read { txn, .. } => txn.ctx,
1391            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1392                // Inform the transaction that we've taken their context.
1393                // Ignore errors if the caller has hung up.
1394                let _ = tx.send(None);
1395                ctx
1396            }
1397        }
1398    }
1399}
1400
/// State that the coordinator must process as part of retiring
/// command execution.  `ExecuteContextExtra::Default` is guaranteed
/// to produce a value that will cause the coordinator to do nothing, and
/// is intended for use by code that invokes the execution processing flow
/// (i.e., `sequence_plan`) without actually being a statement execution.
///
/// This is a pure data struct containing only the statement logging ID.
/// For auto-retire-on-drop behavior, use `ExecuteContextGuard` which wraps
/// this struct and owns the channel for sending retirement messages.
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    // `None` means statement logging is not active for this execution, so
    // retiring it requires no work.
    statement_uuid: Option<StatementLoggingId>,
}
1415
1416impl ExecuteContextExtra {
1417    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1418        Self { statement_uuid }
1419    }
1420    pub fn is_trivial(&self) -> bool {
1421        self.statement_uuid.is_none()
1422    }
1423    pub fn contents(&self) -> Option<StatementLoggingId> {
1424        self.statement_uuid
1425    }
1426    /// Consume this extra and return the statement UUID for retirement.
1427    /// This should only be called from code that knows what to do to finish
1428    /// up logging based on the inner value.
1429    #[must_use]
1430    pub(crate) fn retire(self) -> Option<StatementLoggingId> {
1431        self.statement_uuid
1432    }
1433}
1434
/// A guard that wraps `ExecuteContextExtra` and owns a channel for sending
/// retirement messages to the coordinator.
///
/// If this guard is dropped with a `Some` `statement_uuid` in its inner
/// `ExecuteContextExtra`, the `Drop` implementation will automatically send a
/// `Message::RetireExecute` to log the statement ending.
/// This handles cases like connection drops where the context cannot be
/// explicitly retired.
/// See <https://github.com/MaterializeInc/database-issues/issues/7304>
#[derive(Debug)]
#[must_use]
pub struct ExecuteContextGuard {
    // The statement-logging state to retire; emptied by `defuse`.
    extra: ExecuteContextExtra,
    /// Channel for sending messages to the coordinator. Used for auto-retiring on drop.
    /// For `Default` instances, this is a dummy sender (receiver already dropped), so
    /// sends will fail silently - which is the desired behavior since Default instances
    /// should only be used for non-logged statements.
    coordinator_tx: mpsc::UnboundedSender<Message>,
}
1454
1455impl Default for ExecuteContextGuard {
1456    fn default() -> Self {
1457        // Create a dummy sender by immediately dropping the receiver.
1458        // Any send on this channel will fail silently, which is the desired
1459        // behavior for Default instances (non-logged statements).
1460        let (tx, _rx) = mpsc::unbounded_channel();
1461        Self {
1462            extra: ExecuteContextExtra::default(),
1463            coordinator_tx: tx,
1464        }
1465    }
1466}
1467
1468impl ExecuteContextGuard {
1469    pub(crate) fn new(
1470        statement_uuid: Option<StatementLoggingId>,
1471        coordinator_tx: mpsc::UnboundedSender<Message>,
1472    ) -> Self {
1473        Self {
1474            extra: ExecuteContextExtra::new(statement_uuid),
1475            coordinator_tx,
1476        }
1477    }
1478    pub fn is_trivial(&self) -> bool {
1479        self.extra.is_trivial()
1480    }
1481    pub fn contents(&self) -> Option<StatementLoggingId> {
1482        self.extra.contents()
1483    }
1484    /// Take responsibility for the contents.  This should only be
1485    /// called from code that knows what to do to finish up logging
1486    /// based on the inner value.
1487    ///
1488    /// Returns the inner `ExecuteContextExtra`, consuming the guard without
1489    /// triggering the auto-retire behavior.
1490    pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
1491        // Taking statement_uuid prevents the Drop impl from sending a retire message
1492        std::mem::take(&mut self.extra)
1493    }
1494}
1495
1496impl Drop for ExecuteContextGuard {
1497    fn drop(&mut self) {
1498        if let Some(statement_uuid) = self.extra.statement_uuid.take() {
1499            // Auto-retire since the guard was dropped without explicit retirement (likely due
1500            // to connection drop).
1501            let msg = Message::RetireExecute {
1502                data: ExecuteContextExtra {
1503                    statement_uuid: Some(statement_uuid),
1504                },
1505                otel_ctx: OpenTelemetryContext::obtain(),
1506                reason: StatementEndedExecutionReason::Aborted,
1507            };
1508            // Send may fail for Default instances (dummy sender), which is fine since
1509            // Default instances should only be used for non-logged statements.
1510            let _ = self.coordinator_tx.send(msg);
1511        }
1512    }
1513}
1514
/// Bundle of state related to statement execution.
///
/// This struct collects a bundle of state that needs to be threaded
/// through various functions as part of statement execution.
/// It is used to finalize execution, by calling `retire`. Finalizing execution
/// involves sending the session back to the pgwire layer so that it
/// may be used to process further commands. It also involves
/// performing some work on the main coordinator thread
/// (e.g., recording the time at which the statement finished
/// executing). The state necessary to perform this work is bundled in
/// the `ExecuteContextGuard` object.
#[derive(Debug)]
pub struct ExecuteContext {
    // Boxed to keep `ExecuteContext` itself small as it moves between tasks.
    inner: Box<ExecuteContextInner>,
}
1530
1531impl std::ops::Deref for ExecuteContext {
1532    type Target = ExecuteContextInner;
1533    fn deref(&self) -> &Self::Target {
1534        &*self.inner
1535    }
1536}
1537
1538impl std::ops::DerefMut for ExecuteContext {
1539    fn deref_mut(&mut self) -> &mut Self::Target {
1540        &mut *self.inner
1541    }
1542}
1543
/// The state bundled inside an [`ExecuteContext`].
#[derive(Debug)]
pub struct ExecuteContextInner {
    /// Used to send the final response back to the client.
    tx: ClientTransmitter<ExecuteResponse>,
    /// Used to send messages (e.g. `RetireExecute`) to the coordinator.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    session: Session,
    /// Statement-logging guard; see [`ExecuteContextGuard`].
    extra: ExecuteContextGuard,
}
1551
1552impl ExecuteContext {
1553    pub fn session(&self) -> &Session {
1554        &self.session
1555    }
1556
1557    pub fn session_mut(&mut self) -> &mut Session {
1558        &mut self.session
1559    }
1560
1561    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1562        &self.tx
1563    }
1564
1565    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1566        &mut self.tx
1567    }
1568
1569    pub fn from_parts(
1570        tx: ClientTransmitter<ExecuteResponse>,
1571        internal_cmd_tx: mpsc::UnboundedSender<Message>,
1572        session: Session,
1573        extra: ExecuteContextGuard,
1574    ) -> Self {
1575        Self {
1576            inner: ExecuteContextInner {
1577                tx,
1578                session,
1579                extra,
1580                internal_cmd_tx,
1581            }
1582            .into(),
1583        }
1584    }
1585
1586    /// By calling this function, the caller takes responsibility for
1587    /// dealing with the instance of `ExecuteContextGuard`. This is
1588    /// intended to support protocols (like `COPY FROM`) that involve
1589    /// multiple passes of sending the session back and forth between
1590    /// the coordinator and the pgwire layer. As part of any such
1591    /// protocol, we must ensure that the `ExecuteContextGuard`
1592    /// (possibly wrapped in a new `ExecuteContext`) is passed back to the coordinator for
1593    /// eventual retirement.
1594    pub fn into_parts(
1595        self,
1596    ) -> (
1597        ClientTransmitter<ExecuteResponse>,
1598        mpsc::UnboundedSender<Message>,
1599        Session,
1600        ExecuteContextGuard,
1601    ) {
1602        let ExecuteContextInner {
1603            tx,
1604            internal_cmd_tx,
1605            session,
1606            extra,
1607        } = *self.inner;
1608        (tx, internal_cmd_tx, session, extra)
1609    }
1610
1611    /// Retire the execution, by sending a message to the coordinator.
1612    #[instrument(level = "debug")]
1613    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1614        let ExecuteContextInner {
1615            tx,
1616            internal_cmd_tx,
1617            session,
1618            extra,
1619        } = *self.inner;
1620        let reason = if extra.is_trivial() {
1621            None
1622        } else {
1623            Some((&result).into())
1624        };
1625        tx.send(result, session);
1626        if let Some(reason) = reason {
1627            // Retire the guard to get the inner ExecuteContextExtra without triggering auto-retire
1628            let extra = extra.defuse();
1629            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1630                otel_ctx: OpenTelemetryContext::obtain(),
1631                data: extra,
1632                reason,
1633            }) {
1634                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1635            }
1636        }
1637    }
1638
    /// Returns a shared reference to this execution's `ExecuteContextGuard`.
    pub fn extra(&self) -> &ExecuteContextGuard {
        // NOTE(review): the guard lives in `inner`; this direct field access
        // presumably goes through a `Deref` impl on `ExecuteContext` — confirm.
        &self.extra
    }
1642
    /// Returns a mutable reference to this execution's `ExecuteContextGuard`.
    pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
        // NOTE(review): the guard lives in `inner`; this direct field access
        // presumably goes through a `DerefMut` impl on `ExecuteContext` — confirm.
        &mut self.extra
    }
1646}
1647
/// The last known status of every process of every replica of every cluster,
/// keyed as cluster → replica → process.
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);
1652
1653impl ClusterReplicaStatuses {
1654    pub(crate) fn new() -> ClusterReplicaStatuses {
1655        ClusterReplicaStatuses(BTreeMap::new())
1656    }
1657
1658    /// Initializes the statuses of the specified cluster.
1659    ///
1660    /// Panics if the cluster statuses are already initialized.
1661    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1662        let prev = self.0.insert(cluster_id, BTreeMap::new());
1663        assert_eq!(
1664            prev, None,
1665            "cluster {cluster_id} statuses already initialized"
1666        );
1667    }
1668
1669    /// Initializes the statuses of the specified cluster replica.
1670    ///
1671    /// Panics if the cluster replica statuses are already initialized.
1672    pub(crate) fn initialize_cluster_replica_statuses(
1673        &mut self,
1674        cluster_id: ClusterId,
1675        replica_id: ReplicaId,
1676        num_processes: usize,
1677        time: DateTime<Utc>,
1678    ) {
1679        tracing::info!(
1680            ?cluster_id,
1681            ?replica_id,
1682            ?time,
1683            "initializing cluster replica status"
1684        );
1685        let replica_statuses = self.0.entry(cluster_id).or_default();
1686        let process_statuses = (0..num_processes)
1687            .map(|process_id| {
1688                let status = ClusterReplicaProcessStatus {
1689                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1690                    time: time.clone(),
1691                };
1692                (u64::cast_from(process_id), status)
1693            })
1694            .collect();
1695        let prev = replica_statuses.insert(replica_id, process_statuses);
1696        assert_none!(
1697            prev,
1698            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1699        );
1700    }
1701
1702    /// Removes the statuses of the specified cluster.
1703    ///
1704    /// Panics if the cluster does not exist.
1705    pub(crate) fn remove_cluster_statuses(
1706        &mut self,
1707        cluster_id: &ClusterId,
1708    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1709        let prev = self.0.remove(cluster_id);
1710        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1711    }
1712
1713    /// Removes the statuses of the specified cluster replica.
1714    ///
1715    /// Panics if the cluster or replica does not exist.
1716    pub(crate) fn remove_cluster_replica_statuses(
1717        &mut self,
1718        cluster_id: &ClusterId,
1719        replica_id: &ReplicaId,
1720    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1721        let replica_statuses = self
1722            .0
1723            .get_mut(cluster_id)
1724            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1725        let prev = replica_statuses.remove(replica_id);
1726        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1727    }
1728
1729    /// Inserts or updates the status of the specified cluster replica process.
1730    ///
1731    /// Panics if the cluster or replica does not exist.
1732    pub(crate) fn ensure_cluster_status(
1733        &mut self,
1734        cluster_id: ClusterId,
1735        replica_id: ReplicaId,
1736        process_id: ProcessId,
1737        status: ClusterReplicaProcessStatus,
1738    ) {
1739        let replica_statuses = self
1740            .0
1741            .get_mut(&cluster_id)
1742            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1743            .get_mut(&replica_id)
1744            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1745        replica_statuses.insert(process_id, status);
1746    }
1747
1748    /// Computes the status of the cluster replica as a whole.
1749    ///
1750    /// Panics if `cluster_id` or `replica_id` don't exist.
1751    pub fn get_cluster_replica_status(
1752        &self,
1753        cluster_id: ClusterId,
1754        replica_id: ReplicaId,
1755    ) -> ClusterStatus {
1756        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1757        Self::cluster_replica_status(process_status)
1758    }
1759
1760    /// Computes the status of the cluster replica as a whole.
1761    pub fn cluster_replica_status(
1762        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1763    ) -> ClusterStatus {
1764        process_status
1765            .values()
1766            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1767                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1768                (x, y) => {
1769                    let reason_x = match x {
1770                        ClusterStatus::Offline(reason) => reason,
1771                        ClusterStatus::Online => None,
1772                    };
1773                    let reason_y = match y {
1774                        ClusterStatus::Offline(reason) => reason,
1775                        ClusterStatus::Online => None,
1776                    };
1777                    // Arbitrarily pick the first known not-ready reason.
1778                    ClusterStatus::Offline(reason_x.or(reason_y))
1779                }
1780            })
1781    }
1782
1783    /// Gets the statuses of the given cluster replica.
1784    ///
1785    /// Panics if the cluster or replica does not exist
1786    pub(crate) fn get_cluster_replica_statuses(
1787        &self,
1788        cluster_id: ClusterId,
1789        replica_id: ReplicaId,
1790    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1791        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1792            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1793    }
1794
1795    /// Gets the statuses of the given cluster replica.
1796    pub(crate) fn try_get_cluster_replica_statuses(
1797        &self,
1798        cluster_id: ClusterId,
1799        replica_id: ReplicaId,
1800    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1801        self.try_get_cluster_statuses(cluster_id)
1802            .and_then(|statuses| statuses.get(&replica_id))
1803    }
1804
1805    /// Gets the statuses of the given cluster.
1806    pub(crate) fn try_get_cluster_statuses(
1807        &self,
1808        cluster_id: ClusterId,
1809    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1810        self.0.get(&cluster_id)
1811    }
1812}
1813
/// Glues the external world to the Timely workers.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    /// The controller for the storage and compute layers.
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    /// The catalog in an Arc suitable for readonly references. The Arc allows
    /// us to hand out cheap copies of the catalog to functions that can use it
    /// off of the main coordinator thread. If the coordinator needs to mutate
    /// the catalog, call [`Self::catalog_mut`], which will clone this struct member,
    /// allowing it to be mutated here while the other off-thread references can
    /// read their catalog as long as needed. In the future we would like this
    /// to be a pTVC, but for now this is sufficient.
    catalog: Arc<Catalog>,

    /// A client for persist. Initially, this is only used for reading stashed
    /// peek responses out of batches.
    persist_client: PersistClient,

    /// Channel to manage internal commands from the coordinator to itself.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// Notification that triggers a group commit.
    group_commit_tx: appends::GroupCommitNotifier,

    /// Channel for strict serializable reads ready to commit.
    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    /// Mechanism for totally ordering write and read timestamps, so that all reads
    /// reflect exactly the set of writes that precede them, and no writes that follow.
    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,

    /// A generator for transient [`GlobalId`]s, shareable with other threads.
    transient_id_gen: Arc<TransientIdGen>,
    /// A map from connection ID to metadata about that connection for all
    /// active connections.
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    /// For each transaction, the read holds taken to support any performed reads.
    ///
    /// Upon completing a transaction, these read holds should be dropped.
    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,

    /// Access to the peek fields should be restricted to methods in the [`peek`] API.
    /// A map from pending peek ids to the queue into which responses are sent, and
    /// the connection id of the client that initiated the peek.
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// A map from client connection ids to a set of all pending peeks for that client.
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    /// A map from client connection ids to pending linearize read transaction.
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    /// A map from the compute sink ID to its state description.
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    /// A map from active webhooks to their invalidation handle.
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
    /// to stage Batches in Persist that we will then link into the shard.
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    /// A map from connection ids to a watch channel that is set to `true` if the connection
    /// received a cancel request.
    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    /// Active introspection subscribes.
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    /// Locks that grant access to a specific object, populated lazily as objects are written to.
    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    /// Plans that are currently deferred and waiting on a write lock.
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    /// Pending writes waiting for a group commit.
    pending_writes: Vec<PendingWriteTxn>,

    /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
    /// table's timestamps, but there are cases where timestamps are not bumped but
    /// we expect the closed timestamps to advance (`AS OF X`, SUBSCRIBing views over
    /// RT sources and tables). To address these, spawn a task that forces table
    /// timestamps to close on a regular interval. This roughly tracks the behavior
    /// of realtime sources that close off timestamps on an interval.
    ///
    /// For non-realtime timelines, nothing pushes the timestamps forward, so we must do
    /// it manually.
    advance_timelines_interval: Interval,

    /// Serialized DDL. DDL must be serialized because:
    /// - Many of them do off-thread work and need to verify the catalog is in a valid state, but
    ///   [`PlanValidity`] does not currently support tracking all changes. Doing that correctly
    ///   seems to be more difficult than it's worth, so we would instead re-plan and re-sequence
    ///   the statements.
    /// - Re-planning a statement is hard because Coordinator and Session state is mutated at
    ///   various points, and we would need to correctly reset those changes before re-planning and
    ///   re-sequencing.
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    /// Handle to secret manager that can create and delete secrets from
    /// an arbitrary secret storage engine.
    secrets_controller: Arc<dyn SecretsController>,
    /// A secrets reader that maintains an in-memory cache, where values have a set TTL.
    caching_secrets_reader: CachingSecretsReader,

    /// Handle to a manager that can create and delete kubernetes resources
    /// (ie: VpcEndpoint objects)
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Persist client for fetching storage metadata such as size metrics.
    storage_usage_client: StorageUsageClient,
    /// The interval at which to collect storage usage information.
    storage_usage_collection_interval: Duration,

    /// Segment analytics client.
    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    /// Coordinator metrics.
    metrics: Metrics,
    /// Optimizer metrics.
    optimizer_metrics: OptimizerMetrics,

    /// Tracing handle.
    tracing_handle: TracingHandle,

    /// Data used by the statement logging feature.
    statement_logging: StatementLogging,

    /// Limit for how many concurrent webhook requests we allow.
    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    /// Optional config for the timestamp oracle. This is _required_ when
    /// a timestamp oracle backend is configured.
    timestamp_oracle_config: Option<TimestampOracleConfig>,

    /// Periodically asks cluster scheduling policies to make their decisions.
    check_cluster_scheduling_policies_interval: Interval,

    /// This keeps the last On/Off decision for each cluster and each scheduling policy.
    /// (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are
    /// periodically cleaned up from this Map.)
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    /// When doing 0dt upgrades/in read-only mode, periodically ask all known
    /// clusters/collections whether they are caught up.
    caught_up_check_interval: Interval,

    /// Context needed to check whether all clusters/collections have caught up.
    /// Only used during 0dt deployment, while in read-only mode.
    caught_up_check: Option<CaughtUpCheckContext>,

    /// Tracks the state associated with the currently installed watchsets.
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    /// Tracks the currently installed watchsets for each connection.
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    /// Tracks the statuses of all cluster replicas.
    cluster_replica_statuses: ClusterReplicaStatuses,

    /// Whether or not to start controllers in read-only mode. This is only
    /// meant for use during development of read-only clusters and 0dt upgrades
    /// and should go away once we have proper orchestration during upgrades.
    read_only_controllers: bool,

    /// Updates to builtin tables that are being buffered while we are in
    /// read-only mode. We apply these all at once when coming out of read-only
    /// mode.
    ///
    /// This is a `Some` while in read-only mode and will be replaced by a
    /// `None` when we transition out of read-only mode and write out any
    /// buffered updates.
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    /// The validated license key; consulted when enforcing resource limits
    /// (e.g. the max credit consumption rate checked during bootstrap).
    license_key: ValidatedLicenseKey,

    /// Pre-allocated pool of user IDs to amortize persist writes across DDL operations.
    user_id_pool: IdPool,
}
1991
1992impl Coordinator {
1993    /// Initializes coordinator state based on the contained catalog. Must be
1994    /// called after creating the coordinator and before calling the
1995    /// `Coordinator::serve` method.
1996    #[instrument(name = "coord::bootstrap")]
1997    pub(crate) async fn bootstrap(
1998        &mut self,
1999        boot_ts: Timestamp,
2000        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
2001        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2002        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2003        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
2004        audit_logs_iterator: AuditLogIterator,
2005    ) -> Result<(), AdapterError> {
2006        let bootstrap_start = Instant::now();
2007        info!("startup: coordinator init: bootstrap beginning");
2008        info!("startup: coordinator init: bootstrap: preamble beginning");
2009
2010        // Initialize cluster replica statuses.
2011        // Gross iterator is to avoid partial borrow issues.
2012        let cluster_statuses: Vec<(_, Vec<_>)> = self
2013            .catalog()
2014            .clusters()
2015            .map(|cluster| {
2016                (
2017                    cluster.id(),
2018                    cluster
2019                        .replicas()
2020                        .map(|replica| {
2021                            (replica.replica_id, replica.config.location.num_processes())
2022                        })
2023                        .collect(),
2024                )
2025            })
2026            .collect();
2027        let now = self.now_datetime();
2028        for (cluster_id, replica_statuses) in cluster_statuses {
2029            self.cluster_replica_statuses
2030                .initialize_cluster_statuses(cluster_id);
2031            for (replica_id, num_processes) in replica_statuses {
2032                self.cluster_replica_statuses
2033                    .initialize_cluster_replica_statuses(
2034                        cluster_id,
2035                        replica_id,
2036                        num_processes,
2037                        now,
2038                    );
2039            }
2040        }
2041
2042        let system_config = self.catalog().system_config();
2043
2044        // Inform metrics about the initial system configuration.
2045        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
2046
2047        // Inform the controllers about their initial configuration.
2048        let compute_config = flags::compute_config(system_config);
2049        let storage_config = flags::storage_config(system_config);
2050        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
2051        let dyncfg_updates = system_config.dyncfg_updates();
2052        self.controller.compute.update_configuration(compute_config);
2053        self.controller.storage.update_parameters(storage_config);
2054        self.controller
2055            .update_orchestrator_scheduling_config(scheduling_config);
2056        self.controller.update_configuration(dyncfg_updates);
2057
2058        self.validate_resource_limit_numeric(
2059            Numeric::zero(),
2060            self.current_credit_consumption_rate(),
2061            |system_vars| {
2062                self.license_key
2063                    .max_credit_consumption_rate()
2064                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
2065            },
2066            "cluster replica",
2067            MAX_CREDIT_CONSUMPTION_RATE.name(),
2068        )?;
2069
2070        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
2071            Default::default();
2072
2073        let enable_worker_core_affinity =
2074            self.catalog().system_config().enable_worker_core_affinity();
2075        for instance in self.catalog.clusters() {
2076            self.controller.create_cluster(
2077                instance.id,
2078                ClusterConfig {
2079                    arranged_logs: instance.log_indexes.clone(),
2080                    workload_class: instance.config.workload_class.clone(),
2081                },
2082            )?;
2083            for replica in instance.replicas() {
2084                let role = instance.role();
2085                self.controller.create_replica(
2086                    instance.id,
2087                    replica.replica_id,
2088                    instance.name.clone(),
2089                    replica.name.clone(),
2090                    role,
2091                    replica.config.clone(),
2092                    enable_worker_core_affinity,
2093                )?;
2094            }
2095        }
2096
2097        info!(
2098            "startup: coordinator init: bootstrap: preamble complete in {:?}",
2099            bootstrap_start.elapsed()
2100        );
2101
2102        let init_storage_collections_start = Instant::now();
2103        info!("startup: coordinator init: bootstrap: storage collections init beginning");
2104        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2105            .await;
2106        info!(
2107            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2108            init_storage_collections_start.elapsed()
2109        );
2110
2111        // The storage controller knows about the introspection collections now, so we can start
2112        // sinking introspection updates in the compute controller. It makes sense to do that as
2113        // soon as possible, to avoid updates piling up in the compute controller's internal
2114        // buffers.
2115        self.controller.start_compute_introspection_sink();
2116
2117        let sorting_start = Instant::now();
2118        info!("startup: coordinator init: bootstrap: sorting catalog entries");
2119        let entries = self.bootstrap_sort_catalog_entries();
2120        info!(
2121            "startup: coordinator init: bootstrap: sorting catalog entries complete in {:?}",
2122            sorting_start.elapsed()
2123        );
2124
2125        let optimize_dataflows_start = Instant::now();
2126        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2127        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2128        info!(
2129            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2130            optimize_dataflows_start.elapsed()
2131        );
2132
2133        // We don't need to wait for the cache to update.
2134        let _fut = self.catalog().update_expression_cache(
2135            uncached_local_exprs.into_iter().collect(),
2136            uncached_global_exps.into_iter().collect(),
2137            Default::default(),
2138        );
2139
2140        // Select dataflow as-ofs. This step relies on the storage collections created by
2141        // `bootstrap_storage_collections` and the dataflow plans created by
2142        // `bootstrap_dataflow_plans`.
2143        let bootstrap_as_ofs_start = Instant::now();
2144        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2145        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2146        info!(
2147            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2148            bootstrap_as_ofs_start.elapsed()
2149        );
2150
2151        let postamble_start = Instant::now();
2152        info!("startup: coordinator init: bootstrap: postamble beginning");
2153
2154        let logs: BTreeSet<_> = BUILTINS::logs()
2155            .map(|log| self.catalog().resolve_builtin_log(log))
2156            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2157            .collect();
2158
2159        let mut privatelink_connections = BTreeMap::new();
2160
2161        for entry in &entries {
2162            debug!(
2163                "coordinator init: installing {} {}",
2164                entry.item().typ(),
2165                entry.id()
2166            );
2167            let mut policy = entry.item().initial_logical_compaction_window();
2168            match entry.item() {
2169                // Currently catalog item rebuild assumes that sinks and
2170                // indexes are always built individually and does not store information
2171                // about how it was built. If we start building multiple sinks and/or indexes
2172                // using a single dataflow, we have to make sure the rebuild process re-runs
2173                // the same multiple-build dataflow.
2174                CatalogItem::Source(source) => {
2175                    // Propagate source compaction windows to subsources if needed.
2176                    if source.custom_logical_compaction_window.is_none() {
2177                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2178                            source.data_source
2179                        {
2180                            policy = Some(
2181                                self.catalog()
2182                                    .get_entry(&ingestion_id)
2183                                    .source()
2184                                    .expect("must be source")
2185                                    .custom_logical_compaction_window
2186                                    .unwrap_or_default(),
2187                            );
2188                        }
2189                    }
2190                    policies_to_set
2191                        .entry(policy.expect("sources have a compaction window"))
2192                        .or_insert_with(Default::default)
2193                        .storage_ids
2194                        .insert(source.global_id());
2195                }
2196                CatalogItem::Table(table) => {
2197                    policies_to_set
2198                        .entry(policy.expect("tables have a compaction window"))
2199                        .or_insert_with(Default::default)
2200                        .storage_ids
2201                        .extend(table.global_ids());
2202                }
2203                CatalogItem::Index(idx) => {
2204                    let policy_entry = policies_to_set
2205                        .entry(policy.expect("indexes have a compaction window"))
2206                        .or_insert_with(Default::default);
2207
2208                    if logs.contains(&idx.on) {
2209                        policy_entry
2210                            .compute_ids
2211                            .entry(idx.cluster_id)
2212                            .or_insert_with(BTreeSet::new)
2213                            .insert(idx.global_id());
2214                    } else {
2215                        let df_desc = self
2216                            .catalog()
2217                            .try_get_physical_plan(&idx.global_id())
2218                            .expect("added in `bootstrap_dataflow_plans`")
2219                            .clone();
2220
2221                        let df_meta = self
2222                            .catalog()
2223                            .try_get_dataflow_metainfo(&idx.global_id())
2224                            .expect("added in `bootstrap_dataflow_plans`");
2225
2226                        if self.catalog().state().system_config().enable_mz_notices() {
2227                            // Collect optimization hint updates.
2228                            self.catalog().state().pack_optimizer_notices(
2229                                &mut builtin_table_updates,
2230                                df_meta.optimizer_notices.iter(),
2231                                Diff::ONE,
2232                            );
2233                        }
2234
2235                        // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
2236                        // but we cannot call that as it will also downgrade the read hold on the index.
2237                        policy_entry
2238                            .compute_ids
2239                            .entry(idx.cluster_id)
2240                            .or_insert_with(Default::default)
2241                            .extend(df_desc.export_ids());
2242
2243                        self.controller
2244                            .compute
2245                            .create_dataflow(idx.cluster_id, df_desc, None)
2246                            .unwrap_or_terminate("cannot fail to create dataflows");
2247                    }
2248                }
2249                CatalogItem::View(_) => (),
2250                CatalogItem::MaterializedView(mview) => {
2251                    policies_to_set
2252                        .entry(policy.expect("materialized views have a compaction window"))
2253                        .or_insert_with(Default::default)
2254                        .storage_ids
2255                        .insert(mview.global_id_writes());
2256
2257                    let mut df_desc = self
2258                        .catalog()
2259                        .try_get_physical_plan(&mview.global_id_writes())
2260                        .expect("added in `bootstrap_dataflow_plans`")
2261                        .clone();
2262
2263                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
2264                        df_desc.set_initial_as_of(initial_as_of);
2265                    }
2266
2267                    // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
2268                    let until = mview
2269                        .refresh_schedule
2270                        .as_ref()
2271                        .and_then(|s| s.last_refresh())
2272                        .and_then(|r| r.try_step_forward());
2273                    if let Some(until) = until {
2274                        df_desc.until.meet_assign(&Antichain::from_elem(until));
2275                    }
2276
2277                    let df_meta = self
2278                        .catalog()
2279                        .try_get_dataflow_metainfo(&mview.global_id_writes())
2280                        .expect("added in `bootstrap_dataflow_plans`");
2281
2282                    if self.catalog().state().system_config().enable_mz_notices() {
2283                        // Collect optimization hint updates.
2284                        self.catalog().state().pack_optimizer_notices(
2285                            &mut builtin_table_updates,
2286                            df_meta.optimizer_notices.iter(),
2287                            Diff::ONE,
2288                        );
2289                    }
2290
2291                    self.ship_dataflow(df_desc, mview.cluster_id, mview.target_replica)
2292                        .await;
2293
2294                    // If this is a replacement MV, it must remain read-only until the replacement
2295                    // gets applied.
2296                    if mview.replacement_target.is_none() {
2297                        self.allow_writes(mview.cluster_id, mview.global_id_writes());
2298                    }
2299                }
2300                CatalogItem::Sink(sink) => {
2301                    policies_to_set
2302                        .entry(CompactionWindow::Default)
2303                        .or_insert_with(Default::default)
2304                        .storage_ids
2305                        .insert(sink.global_id());
2306                }
2307                CatalogItem::Connection(catalog_connection) => {
2308                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2309                        privatelink_connections.insert(
2310                            entry.id(),
2311                            VpcEndpointConfig {
2312                                aws_service_name: conn.service_name.clone(),
2313                                availability_zone_ids: conn.availability_zones.clone(),
2314                            },
2315                        );
2316                    }
2317                }
2318                CatalogItem::ContinualTask(ct) => {
2319                    policies_to_set
2320                        .entry(policy.expect("continual tasks have a compaction window"))
2321                        .or_insert_with(Default::default)
2322                        .storage_ids
2323                        .insert(ct.global_id());
2324
2325                    let mut df_desc = self
2326                        .catalog()
2327                        .try_get_physical_plan(&ct.global_id())
2328                        .expect("added in `bootstrap_dataflow_plans`")
2329                        .clone();
2330
2331                    if let Some(initial_as_of) = ct.initial_as_of.clone() {
2332                        df_desc.set_initial_as_of(initial_as_of);
2333                    }
2334
2335                    let df_meta = self
2336                        .catalog()
2337                        .try_get_dataflow_metainfo(&ct.global_id())
2338                        .expect("added in `bootstrap_dataflow_plans`");
2339
2340                    if self.catalog().state().system_config().enable_mz_notices() {
2341                        // Collect optimization hint updates.
2342                        self.catalog().state().pack_optimizer_notices(
2343                            &mut builtin_table_updates,
2344                            df_meta.optimizer_notices.iter(),
2345                            Diff::ONE,
2346                        );
2347                    }
2348
2349                    self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2350                    self.allow_writes(ct.cluster_id, ct.global_id());
2351                }
2352                // Nothing to do for these cases
2353                CatalogItem::Log(_)
2354                | CatalogItem::Type(_)
2355                | CatalogItem::Func(_)
2356                | CatalogItem::Secret(_) => {}
2357            }
2358        }
2359
2360        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2361            // Clean up any extraneous VpcEndpoints that shouldn't exist.
2362            let existing_vpc_endpoints = cloud_resource_controller
2363                .list_vpc_endpoints()
2364                .await
2365                .context("list vpc endpoints")?;
2366            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2367            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2368            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2369            for id in vpc_endpoints_to_remove {
2370                cloud_resource_controller
2371                    .delete_vpc_endpoint(*id)
2372                    .await
2373                    .context("deleting extraneous vpc endpoint")?;
2374            }
2375
2376            // Ensure desired VpcEndpoints are up to date.
2377            for (id, spec) in privatelink_connections {
2378                cloud_resource_controller
2379                    .ensure_vpc_endpoint(id, spec)
2380                    .await
2381                    .context("ensuring vpc endpoint")?;
2382            }
2383        }
2384
2385        // Having installed all entries, creating all constraints, we can now drop read holds and
2386        // relax read policies.
2387        drop(dataflow_read_holds);
2388        // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
2389        for (cw, policies) in policies_to_set {
2390            self.initialize_read_policies(&policies, cw).await;
2391        }
2392
2393        // Expose mapping from T-shirt sizes to actual sizes
2394        builtin_table_updates.extend(
2395            self.catalog().state().resolve_builtin_table_updates(
2396                self.catalog().state().pack_all_replica_size_updates(),
2397            ),
2398        );
2399
2400        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2401        // When 0dt is enabled, we create new shards for any migrated builtin storage collections.
2402        // In read-only mode, the migrated builtin tables (which are a subset of migrated builtin
2403        // storage collections) need to be back-filled so that any dependent dataflow can be
2404        // hydrated. Additionally, these shards are not registered with the txn-shard, and cannot
2405        // be registered while in read-only, so they are written to directly.
2406        let migrated_updates_fut = if self.controller.read_only() {
2407            let min_timestamp = Timestamp::minimum();
2408            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2409                .extract_if(.., |update| {
2410                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2411                    migrated_storage_collections_0dt.contains(&update.id)
2412                        && self
2413                            .controller
2414                            .storage_collections
2415                            .collection_frontiers(gid)
2416                            .expect("all tables are registered")
2417                            .write_frontier
2418                            .elements()
2419                            == &[min_timestamp]
2420                })
2421                .collect();
2422            if migrated_builtin_table_updates.is_empty() {
2423                futures::future::ready(()).boxed()
2424            } else {
2425                // Group all updates per-table.
2426                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2427                for update in migrated_builtin_table_updates {
2428                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2429                    grouped_appends.entry(gid).or_default().push(update.data);
2430                }
2431                info!(
2432                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2433                    grouped_appends.keys().collect::<Vec<_>>()
2434                );
2435
2436                // Consolidate Row data, staged batches must already be consolidated.
2437                let mut all_appends = Vec::with_capacity(grouped_appends.len());
2438                for (item_id, table_data) in grouped_appends.into_iter() {
2439                    let mut all_rows = Vec::new();
2440                    let mut all_data = Vec::new();
2441                    for data in table_data {
2442                        match data {
2443                            TableData::Rows(rows) => all_rows.extend(rows),
2444                            TableData::Batches(_) => all_data.push(data),
2445                        }
2446                    }
2447                    differential_dataflow::consolidation::consolidate(&mut all_rows);
2448                    all_data.push(TableData::Rows(all_rows));
2449
2450                    // TODO(parkmycar): Use SmallVec throughout.
2451                    all_appends.push((item_id, all_data));
2452                }
2453
2454                let fut = self
2455                    .controller
2456                    .storage
2457                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2458                    .expect("cannot fail to append");
2459                async {
2460                    fut.await
2461                        .expect("One-shot shouldn't be dropped during bootstrap")
2462                        .unwrap_or_terminate("cannot fail to append")
2463                }
2464                .boxed()
2465            }
2466        } else {
2467            futures::future::ready(()).boxed()
2468        };
2469
2470        info!(
2471            "startup: coordinator init: bootstrap: postamble complete in {:?}",
2472            postamble_start.elapsed()
2473        );
2474
2475        let builtin_update_start = Instant::now();
2476        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2477
2478        if self.controller.read_only() {
2479            info!(
2480                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2481            );
2482
2483            // TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
2484            let audit_join_start = Instant::now();
2485            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2486            let audit_log_updates: Vec<_> = audit_logs_iterator
2487                .map(|(audit_log, ts)| StateUpdate {
2488                    kind: StateUpdateKind::AuditLog(audit_log),
2489                    ts,
2490                    diff: StateDiff::Addition,
2491                })
2492                .collect();
2493            let audit_log_builtin_table_updates = self
2494                .catalog()
2495                .state()
2496                .generate_builtin_table_updates(audit_log_updates);
2497            builtin_table_updates.extend(audit_log_builtin_table_updates);
2498            info!(
2499                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2500                audit_join_start.elapsed()
2501            );
2502            self.buffered_builtin_table_updates
2503                .as_mut()
2504                .expect("in read-only mode")
2505                .append(&mut builtin_table_updates);
2506        } else {
2507            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2508                .await;
2509        };
2510        info!(
2511            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2512            builtin_update_start.elapsed()
2513        );
2514
2515        let cleanup_secrets_start = Instant::now();
2516        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2517        // Cleanup orphaned secrets. Errors during list() or delete() do not
2518        // need to prevent bootstrap from succeeding; we will retry next
2519        // startup.
2520        {
2521            // Destructure Self so we can selectively move fields into the async
2522            // task.
2523            let Self {
2524                secrets_controller,
2525                catalog,
2526                ..
2527            } = self;
2528
2529            let next_user_item_id = catalog.get_next_user_item_id().await?;
2530            let next_system_item_id = catalog.get_next_system_item_id().await?;
2531            let read_only = self.controller.read_only();
2532            // Fetch all IDs from the catalog to future-proof against other
2533            // things using secrets. Today, SECRET and CONNECTION objects use
2534            // secrets_controller.ensure, but more things could in the future
2535            // that would be easy to miss adding here.
2536            let catalog_ids: BTreeSet<CatalogItemId> =
2537                catalog.entries().map(|entry| entry.id()).collect();
2538            let secrets_controller = Arc::clone(secrets_controller);
2539
2540            spawn(|| "cleanup-orphaned-secrets", async move {
2541                if read_only {
2542                    info!(
2543                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2544                    );
2545                    return;
2546                }
2547                info!("coordinator init: cleaning up orphaned secrets");
2548
2549                match secrets_controller.list().await {
2550                    Ok(controller_secrets) => {
2551                        let controller_secrets: BTreeSet<CatalogItemId> =
2552                            controller_secrets.into_iter().collect();
2553                        let orphaned = controller_secrets.difference(&catalog_ids);
2554                        for id in orphaned {
2555                            let id_too_large = match id {
2556                                CatalogItemId::System(id) => *id >= next_system_item_id,
2557                                CatalogItemId::User(id) => *id >= next_user_item_id,
2558                                CatalogItemId::IntrospectionSourceIndex(_)
2559                                | CatalogItemId::Transient(_) => false,
2560                            };
2561                            if id_too_large {
2562                                info!(
2563                                    %next_user_item_id, %next_system_item_id,
2564                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2565                                );
2566                            } else {
2567                                info!("coordinator init: deleting orphaned secret {id}");
2568                                fail_point!("orphan_secrets");
2569                                if let Err(e) = secrets_controller.delete(*id).await {
2570                                    warn!(
2571                                        "Dropping orphaned secret has encountered an error: {}",
2572                                        e
2573                                    );
2574                                }
2575                            }
2576                        }
2577                    }
2578                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2579                }
2580            });
2581        }
2582        info!(
2583            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2584            cleanup_secrets_start.elapsed()
2585        );
2586
2587        // Run all of our final steps concurrently.
2588        let final_steps_start = Instant::now();
2589        info!(
2590            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2591        );
2592        migrated_updates_fut
2593            .instrument(info_span!("coord::bootstrap::final"))
2594            .await;
2595
2596        debug!(
2597            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2598        );
2599        // Announce the completion of initialization.
2600        self.controller.initialization_complete();
2601
2602        // Initialize unified introspection.
2603        self.bootstrap_introspection_subscribes().await;
2604
2605        info!(
2606            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2607            final_steps_start.elapsed()
2608        );
2609
2610        info!(
2611            "startup: coordinator init: bootstrap complete in {:?}",
2612            bootstrap_start.elapsed()
2613        );
2614        Ok(())
2615    }
2616
    /// Prepares tables for writing by resetting them to a known state and
    /// appending the given builtin table updates. The timestamp oracle
    /// will be advanced to the write timestamp of the append when this
    /// method returns.
    ///
    /// Concretely, this method:
    /// 1. appends empty batches at a fresh write timestamp to advance every
    ///    table's frontier to the current time,
    /// 2. snapshots and retracts the current contents of each system table —
    ///    except `mz_storage_usage_by_shard` (retained for billing) and
    ///    `mz_audit_events` (append-only; reconciled separately via
    ///    `bootstrap_audit_log_table`),
    /// 3. appends `builtin_table_updates` plus the reconciled audit log
    ///    updates at a new write timestamp.
    #[allow(clippy::async_yields_async)]
    #[instrument]
    async fn bootstrap_tables(
        &mut self,
        entries: &[CatalogEntry],
        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
        audit_logs_iterator: AuditLogIterator,
    ) {
        /// Smaller helper struct of metadata for bootstrapping tables.
        struct TableMetadata<'a> {
            // Catalog item ID of the table.
            id: CatalogItemId,
            // Qualified name, used to resolve the full name for logging and
            // to identify special-cased builtin tables.
            name: &'a QualifiedItemName,
            // The catalog `Table` itself, used for its global write ID.
            table: &'a Table,
        }

        // Filter our entries down to just tables.
        let table_metas: Vec<_> = entries
            .into_iter()
            .filter_map(|entry| {
                entry.table().map(|table| TableMetadata {
                    id: entry.id(),
                    name: entry.name(),
                    table,
                })
            })
            .collect();

        // Append empty batches to advance the timestamp of all tables.
        debug!("coordinator init: advancing all tables to current timestamp");
        let WriteTimestamp {
            timestamp: write_ts,
            advance_to,
        } = self.get_local_write_ts().await;
        let appends = table_metas
            .iter()
            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
            .collect();
        // Append the tables in the background. We apply the write timestamp before getting a read
        // timestamp and reading a snapshot of each table, so the snapshots will block on their own
        // until the appends are complete.
        let table_fence_rx = self
            .controller
            .storage
            .append_table(write_ts.clone(), advance_to, appends)
            .expect("invalid updates");

        // Advance the local timestamp oracle past the write we just issued, so
        // the read timestamp obtained below is >= the append's timestamp.
        self.apply_local_write(write_ts).await;

        // Add builtin table updates that clear the contents of all system tables
        debug!("coordinator init: resetting system tables");
        let read_ts = self.get_local_read_ts().await;

        // Filter out the 'mz_storage_usage_by_shard' table since we need to retain that info for
        // billing purposes.
        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
            .catalog()
            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
            .into();
        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
        };

        let mut retraction_tasks = Vec::new();
        let mut system_tables: Vec<_> = table_metas
            .iter()
            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
            .collect();

        // Special case audit events because it's append only.
        let (audit_events_idx, _) = system_tables
            .iter()
            .find_position(|table| {
                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
            })
            .expect("mz_audit_events must exist");
        // Remove it from the reset set and kick off its reconciliation task;
        // the task is joined further below, after the retraction tasks.
        let audit_events = system_tables.remove(audit_events_idx);
        let audit_log_task = self.bootstrap_audit_log_table(
            audit_events.id,
            audit_events.name,
            audit_events.table,
            audit_logs_iterator,
            read_ts,
        );

        // Spawn one task per remaining system table that snapshots its current
        // contents at `read_ts` and builds a batch of retractions.
        for system_table in system_tables {
            let table_id = system_table.id;
            let full_name = self.catalog().resolve_full_name(system_table.name, None);
            debug!("coordinator init: resetting system table {full_name} ({table_id})");

            // Fetch the current contents of the table for retraction.
            let snapshot_fut = self
                .controller
                .storage_collections
                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
            let batch_fut = self
                .controller
                .storage_collections
                .create_update_builder(system_table.table.global_id_writes());

            let task = spawn(|| format!("snapshot-{table_id}"), async move {
                // Create a TimestamplessUpdateBuilder.
                let mut batch = batch_fut
                    .await
                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
                tracing::info!(?table_id, "starting snapshot");
                // Get a cursor which will emit a consolidated snapshot.
                let mut snapshot_cursor = snapshot_fut
                    .await
                    .unwrap_or_terminate("cannot fail to snapshot");

                // Retract the current contents, spilling into our builder.
                while let Some(values) = snapshot_cursor.next().await {
                    for (key, _t, d) in values {
                        // Negate the diff so the existing row is retracted.
                        let d_invert = d.neg();
                        batch.add(&key, &(), &d_invert).await;
                    }
                }
                tracing::info!(?table_id, "finished snapshot");

                let batch = batch.finish().await;
                BuiltinTableUpdate::batch(table_id, batch)
            });
            retraction_tasks.push(task);
        }

        // Join all retraction tasks and fold their batches into the pending
        // builtin table updates.
        let retractions_res = futures::future::join_all(retraction_tasks).await;
        for retractions in retractions_res {
            builtin_table_updates.push(retractions);
        }

        let audit_join_start = Instant::now();
        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
        let audit_log_updates = audit_log_task.await;
        let audit_log_builtin_table_updates = self
            .catalog()
            .state()
            .generate_builtin_table_updates(audit_log_updates);
        builtin_table_updates.extend(audit_log_builtin_table_updates);
        info!(
            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
            audit_join_start.elapsed()
        );

        // Now that the snapshots are complete, the appends must also be complete.
        table_fence_rx
            .await
            .expect("One-shot shouldn't be dropped during bootstrap")
            .unwrap_or_terminate("cannot fail to append");

        // Write all collected updates (resets, reconciled audit log entries,
        // and the caller-provided updates) at a new write timestamp.
        info!("coordinator init: sending builtin table updates");
        let (_builtin_updates_fut, write_ts) = self
            .builtin_table_update()
            .execute(builtin_table_updates)
            .await;
        info!(?write_ts, "our write ts");
        if let Some(write_ts) = write_ts {
            self.apply_local_write(write_ts).await;
        }
    }
2781
2782    /// Prepare updates to the audit log table. The audit log table append only and very large, so
2783    /// we only need to find the events present in `audit_logs_iterator` but not in the audit log
2784    /// table.
2785    #[instrument]
2786    fn bootstrap_audit_log_table<'a>(
2787        &self,
2788        table_id: CatalogItemId,
2789        name: &'a QualifiedItemName,
2790        table: &'a Table,
2791        audit_logs_iterator: AuditLogIterator,
2792        read_ts: Timestamp,
2793    ) -> JoinHandle<Vec<StateUpdate>> {
2794        let full_name = self.catalog().resolve_full_name(name, None);
2795        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2796        let current_contents_fut = self
2797            .controller
2798            .storage_collections
2799            .snapshot(table.global_id_writes(), read_ts);
2800        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2801            let current_contents = current_contents_fut
2802                .await
2803                .unwrap_or_terminate("cannot fail to fetch snapshot");
2804            let contents_len = current_contents.len();
2805            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2806
2807            // Fetch the largest audit log event ID that has been written to the table.
2808            let max_table_id = current_contents
2809                .into_iter()
2810                .filter(|(_, diff)| *diff == 1)
2811                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2812                .sorted()
2813                .rev()
2814                .next();
2815
2816            // Filter audit log catalog updates to those that are not present in the table.
2817            audit_logs_iterator
2818                .take_while(|(audit_log, _)| match max_table_id {
2819                    Some(id) => audit_log.event.sortable_id() > id,
2820                    None => true,
2821                })
2822                .map(|(audit_log, ts)| StateUpdate {
2823                    kind: StateUpdateKind::AuditLog(audit_log),
2824                    ts,
2825                    diff: StateDiff::Addition,
2826                })
2827                .collect::<Vec<_>>()
2828        })
2829    }
2830
2831    /// Initializes all storage collections required by catalog objects in the storage controller.
2832    ///
2833    /// This method takes care of collection creation, as well as migration of existing
2834    /// collections.
2835    ///
2836    /// Creating all storage collections in a single `create_collections` call, rather than on
2837    /// demand, is more efficient as it reduces the number of writes to durable storage. It also
2838    /// allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
2839    /// storage collections, without needing to worry about dependency order.
2840    ///
2841    /// `migrated_storage_collections` is a set of builtin storage collections that have been
2842    /// migrated and should be handled specially.
2843    #[instrument]
2844    async fn bootstrap_storage_collections(
2845        &mut self,
2846        migrated_storage_collections: &BTreeSet<CatalogItemId>,
2847    ) {
2848        let catalog = self.catalog();
2849
2850        let source_desc = |object_id: GlobalId,
2851                           data_source: &DataSourceDesc,
2852                           desc: &RelationDesc,
2853                           timeline: &Timeline| {
2854            let data_source = match data_source.clone() {
2855                // Re-announce the source description.
2856                DataSourceDesc::Ingestion { desc, cluster_id } => {
2857                    let desc = desc.into_inline_connection(catalog.state());
2858                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2859                    DataSource::Ingestion(ingestion)
2860                }
2861                DataSourceDesc::OldSyntaxIngestion {
2862                    desc,
2863                    progress_subsource,
2864                    data_config,
2865                    details,
2866                    cluster_id,
2867                } => {
2868                    let desc = desc.into_inline_connection(catalog.state());
2869                    let data_config = data_config.into_inline_connection(catalog.state());
2870                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2871                    // this will always be a Source or a Table.
2872                    let progress_subsource =
2873                        catalog.get_entry(&progress_subsource).latest_global_id();
2874                    let mut ingestion =
2875                        IngestionDescription::new(desc, cluster_id, progress_subsource);
2876                    let legacy_export = SourceExport {
2877                        storage_metadata: (),
2878                        data_config,
2879                        details,
2880                    };
2881                    ingestion.source_exports.insert(object_id, legacy_export);
2882
2883                    DataSource::Ingestion(ingestion)
2884                }
2885                DataSourceDesc::IngestionExport {
2886                    ingestion_id,
2887                    external_reference: _,
2888                    details,
2889                    data_config,
2890                } => {
2891                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2892                    // this will always be a Source or a Table.
2893                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2894
2895                    DataSource::IngestionExport {
2896                        ingestion_id,
2897                        details,
2898                        data_config: data_config.into_inline_connection(catalog.state()),
2899                    }
2900                }
2901                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
2902                DataSourceDesc::Progress => DataSource::Progress,
2903                DataSourceDesc::Introspection(introspection) => {
2904                    DataSource::Introspection(introspection)
2905                }
2906                DataSourceDesc::Catalog => DataSource::Other,
2907            };
2908            CollectionDescription {
2909                desc: desc.clone(),
2910                data_source,
2911                since: None,
2912                timeline: Some(timeline.clone()),
2913                primary: None,
2914            }
2915        };
2916
2917        let mut compute_collections = vec![];
2918        let mut collections = vec![];
2919        let mut new_builtin_continual_tasks = vec![];
2920        for entry in catalog.entries() {
2921            match entry.item() {
2922                CatalogItem::Source(source) => {
2923                    collections.push((
2924                        source.global_id(),
2925                        source_desc(
2926                            source.global_id(),
2927                            &source.data_source,
2928                            &source.desc,
2929                            &source.timeline,
2930                        ),
2931                    ));
2932                }
2933                CatalogItem::Table(table) => {
2934                    match &table.data_source {
2935                        TableDataSource::TableWrites { defaults: _ } => {
2936                            let versions: BTreeMap<_, _> = table
2937                                .collection_descs()
2938                                .map(|(gid, version, desc)| (version, (gid, desc)))
2939                                .collect();
2940                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2941                                let next_version = version.bump();
2942                                let primary_collection =
2943                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2944                                let mut collection_desc =
2945                                    CollectionDescription::for_table(desc.clone());
2946                                collection_desc.primary = primary_collection;
2947
2948                                (*gid, collection_desc)
2949                            });
2950                            collections.extend(collection_descs);
2951                        }
2952                        TableDataSource::DataSource {
2953                            desc: data_source_desc,
2954                            timeline,
2955                        } => {
2956                            // TODO(alter_table): Support versioning tables that read from sources.
2957                            soft_assert_eq_or_log!(table.collections.len(), 1);
2958                            let collection_descs =
2959                                table.collection_descs().map(|(gid, _version, desc)| {
2960                                    (
2961                                        gid,
2962                                        source_desc(
2963                                            entry.latest_global_id(),
2964                                            data_source_desc,
2965                                            &desc,
2966                                            timeline,
2967                                        ),
2968                                    )
2969                                });
2970                            collections.extend(collection_descs);
2971                        }
2972                    };
2973                }
2974                CatalogItem::MaterializedView(mv) => {
2975                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
2976                        let collection_desc =
2977                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
2978                        (gid, collection_desc)
2979                    });
2980
2981                    collections.extend(collection_descs);
2982                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
2983                }
2984                CatalogItem::ContinualTask(ct) => {
2985                    let collection_desc =
2986                        CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
2987                    if ct.global_id().is_system() && collection_desc.since.is_none() {
2988                        // We need a non-0 since to make as_of selection work. Fill it in below with
2989                        // the `bootstrap_builtin_continual_tasks` call, which can only be run after
2990                        // `create_collections_for_bootstrap`.
2991                        new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2992                    } else {
2993                        compute_collections.push((ct.global_id(), ct.desc.clone()));
2994                        collections.push((ct.global_id(), collection_desc));
2995                    }
2996                }
2997                CatalogItem::Sink(sink) => {
2998                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2999                    let from_desc = storage_sink_from_entry
3000                        .relation_desc()
3001                        .expect("sinks can only be built on items with descs")
3002                        .into_owned();
3003                    let collection_desc = CollectionDescription {
3004                        // TODO(sinks): make generic once we have more than one sink type.
3005                        desc: KAFKA_PROGRESS_DESC.clone(),
3006                        data_source: DataSource::Sink {
3007                            desc: ExportDescription {
3008                                sink: StorageSinkDesc {
3009                                    from: sink.from,
3010                                    from_desc,
3011                                    connection: sink
3012                                        .connection
3013                                        .clone()
3014                                        .into_inline_connection(self.catalog().state()),
3015                                    envelope: sink.envelope,
3016                                    as_of: Antichain::from_elem(Timestamp::minimum()),
3017                                    with_snapshot: sink.with_snapshot,
3018                                    version: sink.version,
3019                                    from_storage_metadata: (),
3020                                    to_storage_metadata: (),
3021                                    commit_interval: sink.commit_interval,
3022                                },
3023                                instance_id: sink.cluster_id,
3024                            },
3025                        },
3026                        since: None,
3027                        timeline: None,
3028                        primary: None,
3029                    };
3030                    collections.push((sink.global_id, collection_desc));
3031                }
3032                _ => (),
3033            }
3034        }
3035
3036        let register_ts = if self.controller.read_only() {
3037            self.get_local_read_ts().await
3038        } else {
3039            // Getting a write timestamp bumps the write timestamp in the
3040            // oracle, which we're not allowed in read-only mode.
3041            self.get_local_write_ts().await.timestamp
3042        };
3043
3044        let storage_metadata = self.catalog.state().storage_metadata();
3045        let migrated_storage_collections = migrated_storage_collections
3046            .into_iter()
3047            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
3048            .collect();
3049
3050        // Before possibly creating collections, make sure their schemas are correct.
3051        //
3052        // Across different versions of Materialize the nullability of columns can change based on
3053        // updates to our optimizer.
3054        self.controller
3055            .storage
3056            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
3057            .await
3058            .unwrap_or_terminate("cannot fail to evolve collections");
3059
3060        self.controller
3061            .storage
3062            .create_collections_for_bootstrap(
3063                storage_metadata,
3064                Some(register_ts),
3065                collections,
3066                &migrated_storage_collections,
3067            )
3068            .await
3069            .unwrap_or_terminate("cannot fail to create collections");
3070
3071        self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
3072            .await;
3073
3074        if !self.controller.read_only() {
3075            self.apply_local_write(register_ts).await;
3076        }
3077    }
3078
    /// Make as_of selection happy for builtin CTs. Ideally we'd write the
    /// initial as_of down in the durable catalog, but that's hard because of
    /// boot ordering. Instead, we set the since of the storage collection to
    /// something that's a reasonable lower bound for the as_of. Then, if the
    /// upper is 0, the as_of selection code will allow us to jump it forward to
    /// this since.
    ///
    /// Expects `collections` to contain only builtin continual tasks; any
    /// other item kind is a caller bug and panics.
    async fn bootstrap_builtin_continual_tasks(
        &mut self,
        // TODO(alter_table): Switch to CatalogItemId.
        mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
    ) {
        for (id, collection) in &mut collections {
            let entry = self.catalog.get_entry_by_global_id(id);
            let ct = match &entry.item {
                CatalogItem::ContinualTask(ct) => ct.clone(),
                _ => unreachable!("only called with continual task builtins"),
            };
            let debug_name = self
                .catalog()
                .resolve_full_name(entry.name(), None)
                .to_string();
            // Optimize the CT to obtain its physical plan, from which we can
            // derive the set of collections the dataflow imports.
            let (_optimized_plan, physical_plan, _metainfo, _optimizer_features) = self
                .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
                .expect("builtin CT should optimize successfully");

            // Determine an as of for the new continual task.
            let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
            // Can't acquire a read hold on ourselves because we don't exist yet.
            id_bundle.storage_ids.remove(id);
            let read_holds = self.acquire_read_holds(&id_bundle);
            let as_of = read_holds.least_valid_read();

            // Use the computed as_of as the collection's initial since; see
            // the method docs for why this is sufficient.
            collection.since = Some(as_of.clone());
        }
        // Register the collections, now carrying non-trivial sinces, with the
        // storage controller.
        self.controller
            .storage
            .create_collections(self.catalog.state().storage_metadata(), None, collections)
            .await
            .unwrap_or_terminate("cannot fail to create collections");
    }
3119
3120    /// Returns the current list of catalog entries, sorted into an appropriate order for
3121    /// bootstrapping.
3122    ///
3123    /// The returned entries are in dependency order. Indexes are sorted immediately after the
3124    /// objects they index, to ensure that all dependants of these indexed objects can make use of
3125    /// the respective indexes.
3126    fn bootstrap_sort_catalog_entries(&self) -> Vec<CatalogEntry> {
3127        let mut indexes_on = BTreeMap::<_, Vec<_>>::new();
3128        let mut non_indexes = Vec::new();
3129        for entry in self.catalog().entries().cloned() {
3130            if let Some(index) = entry.index() {
3131                let on = self.catalog().get_entry_by_global_id(&index.on);
3132                indexes_on.entry(on.id()).or_default().push(entry);
3133            } else {
3134                non_indexes.push(entry);
3135            }
3136        }
3137
3138        let key_fn = |entry: &CatalogEntry| entry.id;
3139        let dependencies_fn = |entry: &CatalogEntry| entry.uses();
3140        sort_topological(&mut non_indexes, key_fn, dependencies_fn);
3141
3142        let mut result = Vec::new();
3143        for entry in non_indexes {
3144            let id = entry.id();
3145            result.push(entry);
3146            if let Some(mut indexes) = indexes_on.remove(&id) {
3147                result.append(&mut indexes);
3148            }
3149        }
3150
3151        soft_assert_or_log!(
3152            indexes_on.is_empty(),
3153            "indexes with missing dependencies: {indexes_on:?}",
3154        );
3155
3156        result
3157    }
3158
    /// Invokes the optimizer on all indexes, materialized views, and continual tasks in the
    /// catalog and inserts the resulting dataflow plans into the catalog state.
    ///
    /// `ordered_catalog_entries` must be sorted in dependency order, with dependencies ordered
    /// before their dependants.
    ///
    /// This method does not perform timestamp selection for the dataflows, nor does it create them
    /// in the compute controller. Both of these steps happen later during bootstrapping.
    ///
    /// Returns a map of expressions that were not cached.
    #[instrument]
    fn bootstrap_dataflow_plans(
        &mut self,
        ordered_catalog_entries: &[CatalogEntry],
        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
        // The optimizer expects to be able to query its `ComputeInstanceSnapshot` for
        // collections the current dataflow can depend on. But since we don't yet install anything
        // on compute instances, the snapshot information is incomplete. We fix that by manually
        // updating `ComputeInstanceSnapshot` objects to ensure they contain collections previously
        // optimized.
        let mut instance_snapshots = BTreeMap::new();
        let mut uncached_expressions = BTreeMap::new();

        // Per-cluster optimizer config: system defaults overridden by cluster feature flags.
        let optimizer_config = |catalog: &Catalog, cluster_id| {
            let system_config = catalog.system_config();
            let overrides = catalog.get_cluster(cluster_id).config.features();
            OptimizerConfig::from(system_config).override_from(&overrides)
        };

        for entry in ordered_catalog_entries {
            match entry.item() {
                CatalogItem::Index(idx) => {
                    // Collect optimizer parameters.
                    let compute_instance =
                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(idx.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = idx.global_id();

                    // The index may already be installed on the compute instance. For example,
                    // this is the case for introspection indexes.
                    if compute_instance.contains_collection(&global_id) {
                        continue;
                    }

                    let optimizer_config = optimizer_config(&self.catalog, idx.cluster_id);

                    // Reuse cached expressions only if the optimizer features they were
                    // produced under still match; otherwise run the full pipeline.
                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let (optimized_plan, global_lir_plan) = {
                                    // Build an optimizer for this INDEX.
                                    let mut optimizer = optimize::index::Optimizer::new(
                                        self.owned_catalog(),
                                        compute_instance.clone(),
                                        global_id,
                                        optimizer_config.clone(),
                                        self.optimizer_metrics(),
                                    );

                                    // MIR ⇒ MIR optimization (global)
                                    let index_plan = optimize::index::Index::new(
                                        entry.name().clone(),
                                        idx.on,
                                        idx.keys.to_vec(),
                                    );
                                    let global_mir_plan = optimizer.optimize(index_plan)?;
                                    let optimized_plan = global_mir_plan.df_desc().clone();

                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                    (optimized_plan, global_lir_plan)
                                };

                                let (physical_plan, metainfo) = global_lir_plan.unapply();
                                let metainfo = {
                                    // Pre-allocate a vector of transient GlobalIds for each notice.
                                    let notice_ids =
                                        std::iter::repeat_with(|| self.allocate_transient_id())
                                            .map(|(_item_id, gid)| gid)
                                            .take(metainfo.optimizer_notices.len())
                                            .collect::<Vec<_>>();
                                    // Return a metainfo with rendered notices.
                                    self.catalog().render_notices(
                                        metainfo,
                                        notice_ids,
                                        Some(idx.global_id()),
                                    )
                                };
                                // Record the freshly optimized expressions so the caller can
                                // add them to the expression cache.
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features: optimizer_config.features.clone(),
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    // Stash the plans in the catalog state for later bootstrap phases.
                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
                    catalog.set_physical_plan(idx.global_id(), physical_plan);
                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);

                    // Make the new collection visible to dataflows optimized later on the
                    // same instance snapshot.
                    compute_instance.insert_collection(idx.global_id());
                }
                CatalogItem::MaterializedView(mv) => {
                    // Collect optimizer parameters.
                    let compute_instance =
                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(mv.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = mv.global_id_writes();

                    let optimizer_config = optimizer_config(&self.catalog, mv.cluster_id);

                    // Reuse cached expressions only if the optimizer features they were
                    // produced under still match; otherwise run the full pipeline.
                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let (_, internal_view_id) = self.allocate_transient_id();
                                let debug_name = self
                                    .catalog()
                                    .resolve_full_name(entry.name(), None)
                                    .to_string();
                                let force_non_monotonic = Default::default();

                                let (optimized_plan, global_lir_plan) = {
                                    // Build an optimizer for this MATERIALIZED VIEW.
                                    let mut optimizer = optimize::materialized_view::Optimizer::new(
                                        self.owned_catalog().as_optimizer_catalog(),
                                        compute_instance.clone(),
                                        global_id,
                                        internal_view_id,
                                        mv.desc.latest().iter_names().cloned().collect(),
                                        mv.non_null_assertions.clone(),
                                        mv.refresh_schedule.clone(),
                                        debug_name,
                                        optimizer_config.clone(),
                                        self.optimizer_metrics(),
                                        force_non_monotonic,
                                    );

                                    // MIR ⇒ MIR optimization (global)
                                    // We make sure to use the HIR SQL type (since MIR SQL types may not be coherent).
                                    let typ = infer_sql_type_for_catalog(
                                        &mv.raw_expr,
                                        &mv.optimized_expr.as_ref().clone(),
                                    );
                                    let global_mir_plan = optimizer
                                        .optimize((mv.optimized_expr.as_ref().clone(), typ))?;
                                    let optimized_plan = global_mir_plan.df_desc().clone();

                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                    (optimized_plan, global_lir_plan)
                                };

                                let (physical_plan, metainfo) = global_lir_plan.unapply();
                                let metainfo = {
                                    // Pre-allocate a vector of transient GlobalIds for each notice.
                                    let notice_ids =
                                        std::iter::repeat_with(|| self.allocate_transient_id())
                                            .map(|(_item_id, global_id)| global_id)
                                            .take(metainfo.optimizer_notices.len())
                                            .collect::<Vec<_>>();
                                    // Return a metainfo with rendered notices.
                                    self.catalog().render_notices(
                                        metainfo,
                                        notice_ids,
                                        Some(mv.global_id_writes()),
                                    )
                                };
                                // Record the freshly optimized expressions so the caller can
                                // add them to the expression cache.
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features: optimizer_config.features.clone(),
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    // Stash the plans in the catalog state for later bootstrap phases.
                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);

                    // Make the new collection visible to dataflows optimized later on the
                    // same instance snapshot.
                    compute_instance.insert_collection(mv.global_id_writes());
                }
                CatalogItem::ContinualTask(ct) => {
                    let compute_instance =
                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(ct.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = ct.global_id();

                    let optimizer_config = optimizer_config(&self.catalog, ct.cluster_id);

                    // Reuse cached expressions only if the optimizer features they were
                    // produced under still match; otherwise run the full pipeline.
                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let debug_name = self
                                    .catalog()
                                    .resolve_full_name(entry.name(), None)
                                    .to_string();
                                let (optimized_plan, physical_plan, metainfo, optimizer_features) =
                                    self.optimize_create_continual_task(
                                        ct,
                                        global_id,
                                        self.owned_catalog(),
                                        debug_name,
                                    )?;
                                // Record the freshly optimized expressions so the caller can
                                // add them to the expression cache.
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features,
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    // Stash the plans in the catalog state for later bootstrap phases.
                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
                    catalog.set_physical_plan(ct.global_id(), physical_plan);
                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);

                    // Make the new collection visible to dataflows optimized later on the
                    // same instance snapshot.
                    compute_instance.insert_collection(ct.global_id());
                }
                _ => (),
            }
        }

        Ok(uncached_expressions)
    }
3442
3443    /// Selects for each compute dataflow an as-of suitable for bootstrapping it.
3444    ///
3445    /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
3446    /// in place and that must not be dropped before all compute dataflows have been created with
3447    /// the compute controller.
3448    ///
3449    /// This method expects all storage collections and dataflow plans to be available, so it must
3450    /// run after [`Coordinator::bootstrap_storage_collections`] and
3451    /// [`Coordinator::bootstrap_dataflow_plans`].
3452    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3453        let mut catalog_ids = Vec::new();
3454        let mut dataflows = Vec::new();
3455        let mut read_policies = BTreeMap::new();
3456        for entry in self.catalog.entries() {
3457            let gid = match entry.item() {
3458                CatalogItem::Index(idx) => idx.global_id(),
3459                CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3460                CatalogItem::ContinualTask(ct) => ct.global_id(),
3461                CatalogItem::Table(_)
3462                | CatalogItem::Source(_)
3463                | CatalogItem::Log(_)
3464                | CatalogItem::View(_)
3465                | CatalogItem::Sink(_)
3466                | CatalogItem::Type(_)
3467                | CatalogItem::Func(_)
3468                | CatalogItem::Secret(_)
3469                | CatalogItem::Connection(_) => continue,
3470            };
3471            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3472                catalog_ids.push(gid);
3473                dataflows.push(plan.clone());
3474
3475                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3476                    read_policies.insert(gid, compaction_window.into());
3477                }
3478            }
3479        }
3480
3481        let read_ts = self.get_local_read_ts().await;
3482        let read_holds = as_of_selection::run(
3483            &mut dataflows,
3484            &read_policies,
3485            &*self.controller.storage_collections,
3486            read_ts,
3487            self.controller.read_only(),
3488        );
3489
3490        let catalog = self.catalog_mut();
3491        for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3492            catalog.set_physical_plan(id, plan);
3493        }
3494
3495        read_holds
3496    }
3497
    /// Serves the coordinator, receiving commands from users over `cmd_rx`
    /// and feedback from dataflow workers over `feedback_rx`.
    ///
    /// You must call `bootstrap` before calling this method.
    ///
    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would
    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
    fn serve(
        mut self,
        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
        group_commit_rx: appends::GroupCommitWaiter,
    ) -> LocalBoxFuture<'static, ()> {
        async move {
            // Watcher that listens for and reports cluster service status changes.
            let mut cluster_events = self.controller.events_stream();
            // The most recently handled message kind (and, for Execute commands,
            // the SQL statement), shared with the watchdog task below so that a
            // stuck coordinator can report what it was last doing.
            let last_message = Arc::new(Mutex::new(LastMessage {
                kind: "none",
                stmt: None,
            }));

            // Size-1 channel: the watchdog reserves a slot and sends a running
            // timer; the select loop below receives it at the lowest priority,
            // so the elapsed time measures how busy the loop is.
            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
            let idle_metric = self.metrics.queue_busy_seconds.clone();
            let last_message_watchdog = Arc::clone(&last_message);

            spawn(|| "coord watchdog", async move {
                // Every 5 seconds, attempt to measure how long it takes for the
                // coord select loop to be empty, because this message is the last
                // processed. If it is idle, this will result in some microseconds
                // of measurement.
                let mut interval = tokio::time::interval(Duration::from_secs(5));
                // If we end up having to wait more than 5 seconds for the coord to respond, then the
                // behavior of Delay results in the interval "restarting" from whenever we yield
                // instead of trying to catch up.
                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

                // Track if we become stuck to de-dupe error reporting.
                let mut coord_stuck = false;

                loop {
                    interval.tick().await;

                    // Wait for space in the channel, if we timeout then the coordinator is stuck!
                    let duration = tokio::time::Duration::from_secs(30);
                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
                    let Ok(maybe_permit) = timeout else {
                        // Only log if we're newly stuck, to prevent logging repeatedly.
                        if !coord_stuck {
                            let last_message = last_message_watchdog.lock().expect("poisoned");
                            tracing::warn!(
                                last_message_kind = %last_message.kind,
                                last_message_sql = %last_message.stmt_to_string(),
                                "coordinator stuck for {duration:?}",
                            );
                        }
                        coord_stuck = true;

                        continue;
                    };

                    // We got a permit, we're not stuck!
                    if coord_stuck {
                        tracing::info!("Coordinator became unstuck");
                    }
                    coord_stuck = false;

                    // If we failed to acquire a permit it's because we're shutting down.
                    let Ok(permit) = maybe_permit else {
                        break;
                    };

                    permit.send(idle_metric.start_timer());
                }
            });

            self.schedule_storage_usage_collection().await;
            self.spawn_privatelink_vpc_endpoints_watch_task();
            self.spawn_statement_logging_task();
            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);

            // Report if the handling of a single message takes longer than this threshold.
            let warn_threshold = self
                .catalog()
                .system_config()
                .coord_slow_message_warn_threshold();

            // How many messages we'd like to batch up before processing them. Must be > 0.
            const MESSAGE_BATCH: usize = 64;
            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);

            let message_batch = self.metrics.message_batch.clone();

            loop {
                // Before adding a branch to this select loop, please ensure that the branch is
                // cancellation safe and add a comment explaining why. You can refer here for more
                // info: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
                select! {
                    // We prioritize internal commands over other commands. However, we work through
                    // batches of commands in some branches of this select, which means that even if
                    // a command generates internal commands, we will work through the current batch
                    // before receiving a new batch of commands.
                    biased;

                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
                    // Receive a batch of commands.
                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
                    // `next()` on any stream is cancel-safe:
                    // https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
                    // Receive a single command.
                    Some(event) = cluster_events.next() => {
                        messages.push(Message::ClusterEvent(event))
                    },
                    // See [`mz_controller::Controller::Controller::ready`] for notes
                    // on why this is cancel-safe.
                    // Receive a single command.
                    () = self.controller.ready() => {
                        // NOTE: We don't get a `Readiness` back from `ready()`
                        // because the controller wants to keep it and it's not
                        // trivially `Clone` or `Copy`. Hence this accessor.
                        let controller = match self.controller.get_readiness() {
                            Readiness::Storage => ControllerReadiness::Storage,
                            Readiness::Compute => ControllerReadiness::Compute,
                            Readiness::Metrics(_) => ControllerReadiness::Metrics,
                            Readiness::Internal(_) => ControllerReadiness::Internal,
                            Readiness::NotReady => unreachable!("just signaled as ready"),
                        };
                        messages.push(Message::ControllerReady { controller });
                    }
                    // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
                    // Receive a single command.
                    permit = group_commit_rx.ready() => {
                        // If we happen to have batched exactly one user write, use
                        // that span so the `emit_trace_id_notice` hooks up.
                        // Otherwise, the best we can do is invent a new root span
                        // and make it follow from all the Spans in the pending
                        // writes.
                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
                            PendingWriteTxn::User{span, ..} => Some(span),
                            PendingWriteTxn::System{..} => None,
                        });
                        let span = match user_write_spans.exactly_one() {
                            Ok(span) => span.clone(),
                            Err(user_write_spans) => {
                                let span = info_span!(parent: None, "group_commit_notify");
                                for s in user_write_spans {
                                    span.follows_from(s);
                                }
                                span
                            }
                        };
                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
                    },
                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
                    // Receive a batch of commands.
                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
                        // A count of zero means the sender side is closed: shut down.
                        if count == 0 {
                            break;
                        } else {
                            messages.extend(cmd_messages.drain(..).map(
                                |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
                            ));
                        }
                    },
                    // `recv()` on `UnboundedReceiver` is cancellation safe:
                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
                    // Receive a single command.
                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
                        // Drain any additional already-queued reads so they are
                        // linearized in a single `LinearizeReads` pass.
                        let mut pending_read_txns = vec![pending_read_txn];
                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
                            pending_read_txns.push(pending_read_txn);
                        }
                        for (conn_id, pending_read_txn) in pending_read_txns {
                            let prev = self
                                .pending_linearize_read_txns
                                .insert(conn_id, pending_read_txn);
                            soft_assert_or_log!(
                                prev.is_none(),
                                "connections can not have multiple concurrent reads, prev: {prev:?}"
                            )
                        }
                        messages.push(Message::LinearizeReads);
                    }
                    // `tick()` on `Interval` is cancel-safe:
                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
                    // Receive a single command.
                    _ = self.advance_timelines_interval.tick() => {
                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
                        span.follows_from(Span::current());

                        // Group commit sends an `AdvanceTimelines` message when
                        // done, which is what downgrades read holds. In
                        // read-only mode we send this message directly because
                        // we're not doing group commits.
                        if self.controller.read_only() {
                            messages.push(Message::AdvanceTimelines);
                        } else {
                            messages.push(Message::GroupCommitInitiate(span, None));
                        }
                    },
                    // `tick()` on `Interval` is cancel-safe:
                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
                    // Receive a single command.
                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
                        messages.push(Message::CheckSchedulingPolicies);
                    },

                    // `tick()` on `Interval` is cancel-safe:
                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
                    // Receive a single command.
                    _ = self.caught_up_check_interval.tick() => {
                        // We do this directly on the main loop instead of
                        // firing off a message. We are still in read-only mode,
                        // so optimizing for latency, not blocking the main loop
                        // is not that important.
                        self.maybe_check_caught_up().await;

                        continue;
                    },

                    // Process the idle metric at the lowest priority to sample queue non-idle time.
                    // `recv()` on `Receiver` is cancellation safe:
                    // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety
                    // Receive a single command.
                    timer = idle_rx.recv() => {
                        timer.expect("does not drop").observe_duration();
                        self.metrics
                            .message_handling
                            .with_label_values(&["watchdog"])
                            .observe(0.0);
                        continue;
                    }
                };

                // Observe the number of messages we're processing at once.
                message_batch.observe(f64::cast_lossy(messages.len()));

                for msg in messages.drain(..) {
                    // All message processing functions trace. Start a parent span
                    // for them to make it easy to find slow messages.
                    let msg_kind = msg.kind();
                    let span = span!(
                        target: "mz_adapter::coord::handle_message_loop",
                        Level::INFO,
                        "coord::handle_message",
                        kind = msg_kind
                    );
                    let otel_context = span.context().span().span_context().clone();

                    // Record the last kind of message in case we get stuck. For
                    // execute commands, we additionally stash the user's SQL,
                    // statement, so we can log it in case we get stuck.
                    *last_message.lock().expect("poisoned") = LastMessage {
                        kind: msg_kind,
                        stmt: match &msg {
                            Message::Command(
                                _,
                                Command::Execute {
                                    portal_name,
                                    session,
                                    ..
                                },
                            ) => session
                                .get_portal_unverified(portal_name)
                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
                            _ => None,
                        },
                    };

                    let start = Instant::now();
                    self.handle_message(msg).instrument(span).await;
                    let duration = start.elapsed();

                    self.metrics
                        .message_handling
                        .with_label_values(&[msg_kind])
                        .observe(duration.as_secs_f64());

                    // If something is _really_ slow, print a trace id for debugging, if OTEL is enabled.
                    if duration > warn_threshold {
                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
                        tracing::error!(
                            ?msg_kind,
                            ?trace_id,
                            ?duration,
                            "very slow coordinator message"
                        );
                    }
                }
            }
            // Try and cleanup as a best effort. There may be some async tasks out there holding a
            // reference that prevents us from cleaning up.
            if let Some(catalog) = Arc::into_inner(self.catalog) {
                catalog.expire().await;
            }
        }
        .boxed_local()
    }
3800
3801    /// Obtain a read-only Catalog reference.
3802    fn catalog(&self) -> &Catalog {
3803        &self.catalog
3804    }
3805
3806    /// Obtain a read-only Catalog snapshot, suitable for giving out to
3807    /// non-Coordinator thread tasks.
3808    fn owned_catalog(&self) -> Arc<Catalog> {
3809        Arc::clone(&self.catalog)
3810    }
3811
3812    /// Obtain a handle to the optimizer metrics, suitable for giving
3813    /// out to non-Coordinator thread tasks.
3814    fn optimizer_metrics(&self) -> OptimizerMetrics {
3815        self.optimizer_metrics.clone()
3816    }
3817
    /// Obtain a writeable Catalog reference.
    ///
    /// Cheap unless a read-only `Arc` handed out by `owned_catalog` is still
    /// alive, in which case the catalog is cloned once (copy-on-write).
    fn catalog_mut(&mut self) -> &mut Catalog {
        // make_mut will cause any other Arc references (from owned_catalog) to
        // continue to be valid by cloning the catalog, putting it in a new Arc,
        // which lives at self.catalog. If there are no other Arc references,
        // then no clone is made, and it returns a reference to the existing
        // object. This makes this method and owned_catalog both very cheap: at
        // most one clone per catalog mutation, but only if there's a read-only
        // reference to it.
        Arc::make_mut(&mut self.catalog)
    }
3829
3830    /// Refills the user ID pool by allocating IDs from the catalog.
3831    ///
3832    /// Requests `max(min_count, batch_size)` IDs so the pool is never
3833    /// under-filled relative to the configured batch size.
3834    async fn refill_user_id_pool(&mut self, min_count: u64) -> Result<(), AdapterError> {
3835        let batch_size = USER_ID_POOL_BATCH_SIZE.get(self.catalog().system_config().dyncfgs());
3836        let to_allocate = min_count.max(u64::from(batch_size));
3837        let id_ts = self.get_catalog_write_ts().await;
3838        let ids = self.catalog().allocate_user_ids(to_allocate, id_ts).await?;
3839        if let (Some((first_id, _)), Some((last_id, _))) = (ids.first(), ids.last()) {
3840            let start = match first_id {
3841                CatalogItemId::User(id) => *id,
3842                other => {
3843                    return Err(AdapterError::Internal(format!(
3844                        "expected User CatalogItemId, got {other:?}"
3845                    )));
3846                }
3847            };
3848            let end = match last_id {
3849                CatalogItemId::User(id) => *id + 1, // exclusive upper bound
3850                other => {
3851                    return Err(AdapterError::Internal(format!(
3852                        "expected User CatalogItemId, got {other:?}"
3853                    )));
3854                }
3855            };
3856            self.user_id_pool.refill(start, end);
3857        } else {
3858            return Err(AdapterError::Internal(
3859                "catalog returned no user IDs".into(),
3860            ));
3861        }
3862        Ok(())
3863    }
3864
3865    /// Allocates a single user ID, refilling the pool from the catalog if needed.
3866    async fn allocate_user_id(&mut self) -> Result<(CatalogItemId, GlobalId), AdapterError> {
3867        if let Some(id) = self.user_id_pool.allocate() {
3868            return Ok((CatalogItemId::User(id), GlobalId::User(id)));
3869        }
3870        self.refill_user_id_pool(1).await?;
3871        let id = self.user_id_pool.allocate().expect("ID pool just refilled");
3872        Ok((CatalogItemId::User(id), GlobalId::User(id)))
3873    }
3874
3875    /// Allocates `count` user IDs, refilling the pool from the catalog if needed.
3876    async fn allocate_user_ids(
3877        &mut self,
3878        count: u64,
3879    ) -> Result<Vec<(CatalogItemId, GlobalId)>, AdapterError> {
3880        if self.user_id_pool.remaining() < count {
3881            self.refill_user_id_pool(count).await?;
3882        }
3883        let raw_ids = self
3884            .user_id_pool
3885            .allocate_many(count)
3886            .expect("pool has enough IDs after refill");
3887        Ok(raw_ids
3888            .into_iter()
3889            .map(|id| (CatalogItemId::User(id), GlobalId::User(id)))
3890            .collect())
3891    }
3892
    /// Obtain a reference to the coordinator's connection context.
    ///
    /// The context is owned by the controller; this is a convenience accessor.
    fn connection_context(&self) -> &ConnectionContext {
        self.controller.connection_context()
    }
3897
3898    /// Obtain a reference to the coordinator's secret reader, in an `Arc`.
3899    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3900        &self.connection_context().secrets_reader
3901    }
3902
3903    /// Publishes a notice message to all sessions.
3904    ///
3905    /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
3906    /// so we keep it around.
3907    #[allow(dead_code)]
3908    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3909        for meta in self.active_conns.values() {
3910            let _ = meta.notice_tx.send(notice.clone());
3911        }
3912    }
3913
3914    /// Returns a closure that will publish a notice to all sessions that were active at the time
3915    /// this method was called.
3916    pub(crate) fn broadcast_notice_tx(
3917        &self,
3918    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3919        let senders: Vec<_> = self
3920            .active_conns
3921            .values()
3922            .map(|meta| meta.notice_tx.clone())
3923            .collect();
3924        Box::new(move |notice| {
3925            for tx in senders {
3926                let _ = tx.send(notice.clone());
3927            }
3928        })
3929    }
3930
    /// Returns the metadata for all currently active connections, keyed by
    /// connection ID.
    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
        &self.active_conns
    }
3934
3935    #[instrument(level = "debug")]
3936    pub(crate) fn retire_execution(
3937        &mut self,
3938        reason: StatementEndedExecutionReason,
3939        ctx_extra: ExecuteContextExtra,
3940    ) {
3941        if let Some(uuid) = ctx_extra.retire() {
3942            self.end_statement_execution(uuid, reason);
3943        }
3944    }
3945
3946    /// Creates a new dataflow builder from the catalog and indexes in `self`.
3947    #[instrument(level = "debug")]
3948    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3949        let compute = self
3950            .instance_snapshot(instance)
3951            .expect("compute instance does not exist");
3952        DataflowBuilder::new(self.catalog().state(), compute)
3953    }
3954
    /// Return a reference-less snapshot to the indicated compute instance.
    ///
    /// Fails with [`InstanceMissing`] if no compute instance with `id` exists.
    pub fn instance_snapshot(
        &self,
        id: ComputeInstanceId,
    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
        ComputeInstanceSnapshot::new(&self.controller, id)
    }
3962
3963    /// Call into the compute controller to install a finalized dataflow, and
3964    /// initialize the read policies for its exported readable objects.
3965    ///
3966    /// # Panics
3967    ///
3968    /// Panics if dataflow creation fails.
3969    pub(crate) async fn ship_dataflow(
3970        &mut self,
3971        dataflow: DataflowDescription<Plan>,
3972        instance: ComputeInstanceId,
3973        target_replica: Option<ReplicaId>,
3974    ) {
3975        self.try_ship_dataflow(dataflow, instance, target_replica)
3976            .await
3977            .unwrap_or_terminate("dataflow creation cannot fail");
3978    }
3979
3980    /// Call into the compute controller to install a finalized dataflow, and
3981    /// initialize the read policies for its exported readable objects.
3982    pub(crate) async fn try_ship_dataflow(
3983        &mut self,
3984        dataflow: DataflowDescription<Plan>,
3985        instance: ComputeInstanceId,
3986        target_replica: Option<ReplicaId>,
3987    ) -> Result<(), DataflowCreationError> {
3988        // We must only install read policies for indexes, not for sinks.
3989        // Sinks are write-only compute collections that don't have read policies.
3990        let export_ids = dataflow.exported_index_ids().collect();
3991
3992        self.controller
3993            .compute
3994            .create_dataflow(instance, dataflow, target_replica)?;
3995
3996        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3997            .await;
3998
3999        Ok(())
4000    }
4001
4002    /// Call into the compute controller to allow writes to the specified IDs
4003    /// from the specified instance. Calling this function multiple times and
4004    /// calling it on a read-only instance has no effect.
4005    pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
4006        self.controller
4007            .compute
4008            .allow_writes(instance, id)
4009            .unwrap_or_terminate("allow_writes cannot fail");
4010    }
4011
4012    /// Like `ship_dataflow`, but also await on builtin table updates.
4013    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
4014        &mut self,
4015        dataflow: DataflowDescription<Plan>,
4016        instance: ComputeInstanceId,
4017        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
4018        target_replica: Option<ReplicaId>,
4019    ) {
4020        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
4021            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, target_replica);
4022            let ((), ()) =
4023                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
4024        } else {
4025            self.ship_dataflow(dataflow, instance, target_replica).await;
4026        }
4027    }
4028
4029    /// Install a _watch set_ in the controller that is automatically associated with the given
4030    /// connection id. The watchset will be automatically cleared if the connection terminates
4031    /// before the watchset completes.
4032    pub fn install_compute_watch_set(
4033        &mut self,
4034        conn_id: ConnectionId,
4035        objects: BTreeSet<GlobalId>,
4036        t: Timestamp,
4037        state: WatchSetResponse,
4038    ) -> Result<(), CollectionLookupError> {
4039        let ws_id = self.controller.install_compute_watch_set(objects, t)?;
4040        self.connection_watch_sets
4041            .entry(conn_id.clone())
4042            .or_default()
4043            .insert(ws_id);
4044        self.installed_watch_sets.insert(ws_id, (conn_id, state));
4045        Ok(())
4046    }
4047
4048    /// Install a _watch set_ in the controller that is automatically associated with the given
4049    /// connection id. The watchset will be automatically cleared if the connection terminates
4050    /// before the watchset completes.
4051    pub fn install_storage_watch_set(
4052        &mut self,
4053        conn_id: ConnectionId,
4054        objects: BTreeSet<GlobalId>,
4055        t: Timestamp,
4056        state: WatchSetResponse,
4057    ) -> Result<(), CollectionMissing> {
4058        let ws_id = self.controller.install_storage_watch_set(objects, t)?;
4059        self.connection_watch_sets
4060            .entry(conn_id.clone())
4061            .or_default()
4062            .insert(ws_id);
4063        self.installed_watch_sets.insert(ws_id, (conn_id, state));
4064        Ok(())
4065    }
4066
4067    /// Cancels pending watchsets associated with the provided connection id.
4068    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
4069        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
4070            for ws_id in ws_ids {
4071                self.installed_watch_sets.remove(&ws_id);
4072            }
4073        }
4074    }
4075
4076    /// Returns the state of the [`Coordinator`] formatted as JSON.
4077    ///
4078    /// The returned value is not guaranteed to be stable and may change at any point in time.
4079    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
4080        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
4081        // returned object as a tradeoff between usability and stability. `serde_json` will fail
4082        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
4083        // prevents a future unrelated change from silently breaking this method.
4084
4085        let global_timelines: BTreeMap<_, _> = self
4086            .global_timelines
4087            .iter()
4088            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
4089            .collect();
4090        let active_conns: BTreeMap<_, _> = self
4091            .active_conns
4092            .iter()
4093            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
4094            .collect();
4095        let txn_read_holds: BTreeMap<_, _> = self
4096            .txn_read_holds
4097            .iter()
4098            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
4099            .collect();
4100        let pending_peeks: BTreeMap<_, _> = self
4101            .pending_peeks
4102            .iter()
4103            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
4104            .collect();
4105        let client_pending_peeks: BTreeMap<_, _> = self
4106            .client_pending_peeks
4107            .iter()
4108            .map(|(id, peek)| {
4109                let peek: BTreeMap<_, _> = peek
4110                    .iter()
4111                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
4112                    .collect();
4113                (id.to_string(), peek)
4114            })
4115            .collect();
4116        let pending_linearize_read_txns: BTreeMap<_, _> = self
4117            .pending_linearize_read_txns
4118            .iter()
4119            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
4120            .collect();
4121
4122        Ok(serde_json::json!({
4123            "global_timelines": global_timelines,
4124            "active_conns": active_conns,
4125            "txn_read_holds": txn_read_holds,
4126            "pending_peeks": pending_peeks,
4127            "client_pending_peeks": client_pending_peeks,
4128            "pending_linearize_read_txns": pending_linearize_read_txns,
4129            "controller": self.controller.dump().await?,
4130        }))
4131    }
4132
    /// Prune all storage usage events from the [`MZ_STORAGE_USAGE_BY_SHARD`] table that are older
    /// than `retention_period`.
    ///
    /// This method will read the entire contents of [`MZ_STORAGE_USAGE_BY_SHARD`] into memory
    /// which can be expensive.
    ///
    /// DO NOT call this method outside of startup. The safety of reading at the current oracle read
    /// timestamp and then writing at whatever the current write timestamp is (instead of
    /// `read_ts + 1`) relies on the fact that there are no outstanding writes during startup.
    ///
    /// Group commit, which this method uses to write the retractions, has builtin fencing, and we
    /// never commit retractions to [`MZ_STORAGE_USAGE_BY_SHARD`] outside of this method, which is
    /// only called once during startup. So we don't have to worry about double/invalid retractions.
    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
        let item_id = self
            .catalog()
            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
        // Read at the current oracle read timestamp; see the doc comment for
        // why this is only safe during startup.
        let read_ts = self.get_local_read_ts().await;
        let current_contents_fut = self
            .controller
            .storage_collections
            .snapshot(global_id, read_ts);
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        // Do the (potentially large) snapshot read and filtering off the main
        // coordinator task; only the resulting retractions are sent back.
        spawn(|| "storage_usage_prune", async move {
            let mut current_contents = current_contents_fut
                .await
                .unwrap_or_terminate("cannot fail to fetch snapshot");
            differential_dataflow::consolidation::consolidate(&mut current_contents);

            // Events with a collection timestamp strictly below this cutoff
            // (in millis) are retracted. `u128` avoids overflow in the
            // timestamp-to-millis arithmetic.
            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
            let mut expired = Vec::new();
            for (row, diff) in current_contents {
                assert_eq!(
                    diff, 1,
                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
                );
                // This logic relies on the definition of `mz_storage_usage_by_shard` not changing.
                // Column index 3 is taken to be the collection timestamp (timestamptz).
                let collection_timestamp = row
                    .unpack()
                    .get(3)
                    .expect("definition of mz_storage_by_shard changed")
                    .unwrap_timestamptz();
                let collection_timestamp = collection_timestamp.timestamp_millis();
                let collection_timestamp: u128 = collection_timestamp
                    .try_into()
                    .expect("all collections happen after Jan 1 1970");
                if collection_timestamp < cutoff_ts {
                    debug!("pruning storage event {row:?}");
                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
                    expired.push(builtin_update);
                }
            }

            // Ignore errors sending the retractions: a send failure means the
            // main thread has shut down.
            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
        });
    }
4191
4192    fn current_credit_consumption_rate(&self) -> Numeric {
4193        self.catalog()
4194            .user_cluster_replicas()
4195            .filter_map(|replica| match &replica.config.location {
4196                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
4197                ReplicaLocation::Unmanaged(_) => None,
4198            })
4199            .map(|size| {
4200                self.catalog()
4201                    .cluster_replica_sizes()
4202                    .0
4203                    .get(size)
4204                    .expect("location size is validated against the cluster replica sizes")
4205                    .credits_per_hour
4206            })
4207            .sum()
4208    }
4209}
4210
#[cfg(test)]
impl Coordinator {
    /// Compile-time guard ensuring `ship_dataflow` stays infallible.
    ///
    /// `ship_dataflow_new` is not allowed to have a `Result` return because this function is
    /// called after `catalog_transact`, after which no errors are allowed. This test exists to
    /// prevent us from incorrectly teaching those functions how to return errors (which has
    /// happened twice and is the motivation for this test).
    #[allow(dead_code)]
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        // Any compute instance ID satisfies the call below; this function is never actually
        // run, it only needs to type-check.
        let instance_id = ComputeInstanceId::user(1).expect("1 is a valid ID");

        // The `let _: () =` binding is the real assertion: it fails to compile if the
        // return type of `ship_dataflow` ever becomes a `Result`.
        let _: () = self.ship_dataflow(dataflow, instance_id, None).await;
    }
}
4227
/// Contains information about the last message the [`Coordinator`] processed.
struct LastMessage {
    /// Static label naming the kind of message (surfaced via the `Debug` impl).
    kind: &'static str,
    /// The SQL statement associated with the message, if there was one.
    stmt: Option<Arc<Statement<Raw>>>,
}
4233
4234impl LastMessage {
4235    /// Returns a redacted version of the statement that is safe for logs.
4236    fn stmt_to_string(&self) -> Cow<'static, str> {
4237        self.stmt
4238            .as_ref()
4239            .map(|stmt| stmt.to_ast_string_redacted().into())
4240            .unwrap_or(Cow::Borrowed("<none>"))
4241    }
4242}
4243
4244impl fmt::Debug for LastMessage {
4245    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4246        f.debug_struct("LastMessage")
4247            .field("kind", &self.kind)
4248            .field("stmt", &self.stmt_to_string())
4249            .finish()
4250    }
4251}
4252
4253impl Drop for LastMessage {
4254    fn drop(&mut self) {
4255        // Only print the last message if we're currently panicking, otherwise we'd spam our logs.
4256        if std::thread::panicking() {
4257            // If we're panicking theres no guarantee `tracing` still works, so print to stderr.
4258            eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4259        }
4260    }
4261}
4262
/// Serves the coordinator based on the provided configuration.
///
/// For a high-level description of the coordinator, see the [crate
/// documentation](crate).
///
/// Returns a handle to the coordinator and a client to communicate with the
/// coordinator.
///
/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 42KB. This would
/// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
pub fn serve(
    Config {
        controller_config,
        controller_envd_epoch,
        mut storage,
        audit_logs_iterator,
        timestamp_oracle_url,
        unsafe_mode,
        all_features,
        build_info,
        environment_id,
        metrics_registry,
        now,
        secrets_controller,
        cloud_resource_controller,
        cluster_replica_sizes,
        builtin_system_cluster_config,
        builtin_catalog_server_cluster_config,
        builtin_probe_cluster_config,
        builtin_support_cluster_config,
        builtin_analytics_cluster_config,
        system_parameter_defaults,
        availability_zones,
        storage_usage_client,
        storage_usage_collection_interval,
        storage_usage_retention_period,
        segment_client,
        egress_addresses,
        aws_account_id,
        aws_privatelink_availability_zones,
        connection_context,
        connection_limit_callback,
        remote_system_parameters,
        webhook_concurrency_limit,
        http_host_name,
        tracing_handle,
        read_only_controllers,
        caught_up_trigger: clusters_caught_up_trigger,
        helm_chart_version,
        license_key,
        external_login_password_mz_system,
        force_builtin_schema_migration,
    }: Config,
) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
    async move {
        let coord_start = Instant::now();
        info!("startup: coordinator init: beginning");
        info!("startup: coordinator init: preamble beginning");

        // Initializing the builtins can be an expensive process and consume a lot of memory. We
        // forcibly initialize it early while the stack is relatively empty to avoid stack
        // overflows later.
        let _builtins = LazyLock::force(&BUILTINS_STATIC);

        // Channels for external client commands, coordinator-internal messages, and
        // strict-serializable read notifications; the receiving ends are handed to
        // `coord.serve` on the coordinator thread below.
        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
            mpsc::unbounded_channel();

        // Validate and process availability zones.
        if !availability_zones.iter().all_unique() {
            coord_bail!("availability zones must be unique");
        }

        // An AWS principal context requires both pieces of information; with only one of
        // the two we proceed without a principal context.
        let aws_principal_context = match (
            aws_account_id,
            connection_context.aws_external_id_prefix.clone(),
        ) {
            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
                aws_account_id,
                aws_external_id_prefix,
            }),
            _ => None,
        };

        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));

        info!(
            "startup: coordinator init: preamble complete in {:?}",
            coord_start.elapsed()
        );
        let oracle_init_start = Instant::now();
        info!("startup: coordinator init: timestamp oracle init beginning");

        // Absent a URL we run without an external timestamp oracle config.
        let timestamp_oracle_config = timestamp_oracle_url
            .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
            .transpose()?;
        let mut initial_timestamps =
            get_initial_oracle_timestamps(&timestamp_oracle_config).await?;

        // Insert an entry for the `EpochMilliseconds` timeline if one doesn't exist,
        // which will ensure that the timeline is initialized since it's required
        // by the system.
        initial_timestamps
            .entry(Timeline::EpochMilliseconds)
            .or_insert_with(mz_repr::Timestamp::minimum);
        let mut timestamp_oracles = BTreeMap::new();
        for (timeline, initial_timestamp) in initial_timestamps {
            Coordinator::ensure_timeline_state_with_initial_time(
                &timeline,
                initial_timestamp,
                now.clone(),
                timestamp_oracle_config.clone(),
                &mut timestamp_oracles,
                read_only_controllers,
            )
            .await;
        }

        // Opening the durable catalog uses one or more timestamps without communicating with
        // the timestamp oracle. Here we make sure to apply the catalog upper with the timestamp
        // oracle to linearize future operations with opening the catalog.
        let catalog_upper = storage.current_upper().await;
        // Choose a time at which to boot. This is used, for example, to prune
        // old storage usage data or migrate audit log entries.
        //
        // This time is usually the current system time, but with protection
        // against backwards time jumps, even across restarts.
        let epoch_millis_oracle = &timestamp_oracles
            .get(&Timeline::EpochMilliseconds)
            .expect("inserted above")
            .oracle;

        let mut boot_ts = if read_only_controllers {
            let read_ts = epoch_millis_oracle.read_ts().await;
            std::cmp::max(read_ts, catalog_upper)
        } else {
            // Getting/applying a write timestamp bumps the write timestamp in the
            // oracle, which we're not allowed in read-only mode.
            epoch_millis_oracle.apply_write(catalog_upper).await;
            epoch_millis_oracle.write_ts().await.timestamp
        };

        info!(
            "startup: coordinator init: timestamp oracle init complete in {:?}",
            oracle_init_start.elapsed()
        );

        let catalog_open_start = Instant::now();
        info!("startup: coordinator init: catalog open beginning");
        let persist_client = controller_config
            .persist_clients
            .open(controller_config.persist_location.clone())
            .await
            .context("opening persist client")?;
        let builtin_item_migration_config =
            BuiltinItemMigrationConfig {
                persist_client: persist_client.clone(),
                read_only: read_only_controllers,
                force_migration: force_builtin_schema_migration,
            }
        ;
        // Open the durable catalog. This consumes `storage` and may run migrations;
        // it returns the catalog plus bookkeeping about what changed during open.
        let OpenCatalogResult {
            mut catalog,
            migrated_storage_collections_0dt,
            new_builtin_collections,
            builtin_table_updates,
            cached_global_exprs,
            uncached_local_exprs,
        } = Catalog::open(mz_catalog::config::Config {
            storage,
            metrics_registry: &metrics_registry,
            state: mz_catalog::config::StateConfig {
                unsafe_mode,
                all_features,
                build_info,
                deploy_generation: controller_config.deploy_generation,
                environment_id: environment_id.clone(),
                read_only: read_only_controllers,
                now: now.clone(),
                boot_ts: boot_ts.clone(),
                skip_migrations: false,
                cluster_replica_sizes,
                builtin_system_cluster_config,
                builtin_catalog_server_cluster_config,
                builtin_probe_cluster_config,
                builtin_support_cluster_config,
                builtin_analytics_cluster_config,
                system_parameter_defaults,
                remote_system_parameters,
                availability_zones,
                egress_addresses,
                aws_principal_context,
                aws_privatelink_availability_zones,
                connection_context,
                http_host_name,
                builtin_item_migration_config,
                persist_client: persist_client.clone(),
                enable_expression_cache_override: None,
                helm_chart_version,
                external_login_password_mz_system,
                license_key: license_key.clone(),
            },
        })
        .await?;

        // Opening the catalog uses one or more timestamps, so push the boot timestamp up to the
        // current catalog upper.
        let catalog_upper = catalog.current_upper().await;
        boot_ts = std::cmp::max(boot_ts, catalog_upper);

        if !read_only_controllers {
            epoch_millis_oracle.apply_write(boot_ts).await;
        }

        info!(
            "startup: coordinator init: catalog open complete in {:?}",
            catalog_open_start.elapsed()
        );

        let coord_thread_start = Instant::now();
        info!("startup: coordinator init: coordinator thread start beginning");

        let session_id = catalog.config().session_id;
        let start_instant = catalog.config().start_instant;

        // In order for the coordinator to support Rc and Refcell types, it cannot be
        // sent across threads. Spawn it in a thread and have this parent thread wait
        // for bootstrap completion before proceeding.
        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
        let handle = TokioHandle::current();

        let metrics = Metrics::register_into(&metrics_registry);
        let metrics_clone = metrics.clone();
        let optimizer_metrics = OptimizerMetrics::register_into(
            &metrics_registry,
            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
        );
        let segment_client_clone = segment_client.clone();
        let coord_now = now.clone();
        let advance_timelines_interval =
            tokio::time::interval(catalog.system_config().default_timestamp_interval());
        let mut check_scheduling_policies_interval = tokio::time::interval(
            catalog
                .system_config()
                .cluster_check_scheduling_policies_interval(),
        );
        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let clusters_caught_up_check_interval = if read_only_controllers {
            let dyncfgs = catalog.system_config().dyncfgs();
            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);

            let mut interval = tokio::time::interval(interval);
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        } else {
            // When not in read-only mode, we don't do hydration checks. But we
            // still have to provide _some_ interval. This is large enough that
            // it doesn't matter.
            //
            // TODO(aljoscha): We cannot use Duration::MAX right now because of
            // https://github.com/tokio-rs/tokio/issues/6634. Use that once it's
            // fixed for good.
            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        };

        let clusters_caught_up_check =
            clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
                trigger,
                exclude_collections: new_builtin_collections.into_iter().collect(),
            });

        if let Some(TimestampOracleConfig::Postgres(pg_config)) =
            timestamp_oracle_config.as_ref()
        {
            // Apply settings from system vars as early as possible because some
            // of them are locked in right when an oracle is first opened!
            let pg_timestamp_oracle_params =
                flags::timestamp_oracle_config(catalog.system_config());
            pg_timestamp_oracle_params.apply(pg_config);
        }

        // Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
        // system variables change, we update our connection limits.
        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
            Arc::new(move |system_vars: &SystemVars| {
                let limit: u64 = system_vars.max_connections().cast_into();
                let superuser_reserved: u64 =
                    system_vars.superuser_reserved_connections().cast_into();

                // If superuser_reserved > max_connections, prefer max_connections.
                //
                // In this scenario all normal users would be locked out because all connections
                // would be reserved for superusers so complain if this is the case.
                //
                // NOTE(review): the condition below is `>=` while the warning text says
                // "greater than"; at equality all non-superusers are already locked out, so
                // clamping at `==` appears intended — confirm the message wording.
                let superuser_reserved = if superuser_reserved >= limit {
                    tracing::warn!(
                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
                    );
                    limit
                } else {
                    superuser_reserved
                };

                (connection_limit_callback)(limit, superuser_reserved);
            });
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::MAX_CONNECTIONS,
            Arc::clone(&connection_limit_callback),
        );
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
            connection_limit_callback,
        );

        let (group_commit_tx, group_commit_rx) = appends::notifier();

        let parent_span = tracing::Span::current();
        let thread = thread::Builder::new()
            // The Coordinator thread tends to keep a lot of data on its stack. To
            // prevent a stack overflow we allocate a stack three times as big as the default
            // stack.
            .stack_size(3 * stack::STACK_SIZE)
            .name("coordinator".to_string())
            .spawn(move || {
                let span = info_span!(parent: parent_span, "coord::coordinator").entered();

                let controller = handle
                    .block_on({
                        catalog.initialize_controller(
                            controller_config,
                            controller_envd_epoch,
                            read_only_controllers,
                        )
                    })
                    .unwrap_or_terminate("failed to initialize storage_controller");
                // Initializing the controller uses one or more timestamps, so push the boot timestamp up to the
                // current catalog upper.
                let catalog_upper = handle.block_on(catalog.current_upper());
                boot_ts = std::cmp::max(boot_ts, catalog_upper);
                if !read_only_controllers {
                    let epoch_millis_oracle = &timestamp_oracles
                        .get(&Timeline::EpochMilliseconds)
                        .expect("inserted above")
                        .oracle;
                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
                }

                let catalog = Arc::new(catalog);

                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
                let mut coord = Coordinator {
                    controller,
                    catalog,
                    internal_cmd_tx,
                    group_commit_tx,
                    strict_serializable_reads_tx,
                    global_timelines: timestamp_oracles,
                    transient_id_gen: Arc::new(TransientIdGen::new()),
                    active_conns: BTreeMap::new(),
                    txn_read_holds: Default::default(),
                    pending_peeks: BTreeMap::new(),
                    client_pending_peeks: BTreeMap::new(),
                    pending_linearize_read_txns: BTreeMap::new(),
                    serialized_ddl: LockedVecDeque::new(),
                    active_compute_sinks: BTreeMap::new(),
                    active_webhooks: BTreeMap::new(),
                    active_copies: BTreeMap::new(),
                    staged_cancellation: BTreeMap::new(),
                    introspection_subscribes: BTreeMap::new(),
                    write_locks: BTreeMap::new(),
                    deferred_write_ops: BTreeMap::new(),
                    pending_writes: Vec::new(),
                    advance_timelines_interval,
                    secrets_controller,
                    caching_secrets_reader,
                    cloud_resource_controller,
                    storage_usage_client,
                    storage_usage_collection_interval,
                    segment_client,
                    metrics,
                    optimizer_metrics,
                    tracing_handle,
                    statement_logging: StatementLogging::new(coord_now.clone()),
                    webhook_concurrency_limit,
                    timestamp_oracle_config,
                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
                    cluster_scheduling_decisions: BTreeMap::new(),
                    caught_up_check_interval: clusters_caught_up_check_interval,
                    caught_up_check: clusters_caught_up_check,
                    installed_watch_sets: BTreeMap::new(),
                    connection_watch_sets: BTreeMap::new(),
                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
                    read_only_controllers,
                    buffered_builtin_table_updates: Some(Vec::new()),
                    license_key,
                    user_id_pool: IdPool::empty(),
                    persist_client,
                };
                // Run bootstrap to completion on this thread; its result is reported back
                // to the awaiting parent task through `bootstrap_tx` below.
                let bootstrap = handle.block_on(async {
                    coord
                        .bootstrap(
                            boot_ts,
                            migrated_storage_collections_0dt,
                            builtin_table_updates,
                            cached_global_exprs,
                            uncached_local_exprs,
                            audit_logs_iterator,
                        )
                        .await?;
                    coord
                        .controller
                        .remove_orphaned_replicas(
                            coord.catalog().get_next_user_replica_id().await?,
                            coord.catalog().get_next_system_replica_id().await?,
                        )
                        .await
                        .map_err(AdapterError::Orchestrator)?;

                    if let Some(retention_period) = storage_usage_retention_period {
                        coord
                            .prune_storage_usage_events_on_startup(retention_period)
                            .await;
                    }

                    Ok(())
                });
                let ok = bootstrap.is_ok();
                drop(span);
                bootstrap_tx
                    .send(bootstrap)
                    .expect("bootstrap_rx is not dropped until it receives this message");
                // Only enter the serve loop if bootstrap succeeded; on failure the error was
                // already delivered to the caller above and this thread simply exits.
                if ok {
                    handle.block_on(coord.serve(
                        internal_cmd_rx,
                        strict_serializable_reads_rx,
                        cmd_rx,
                        group_commit_rx,
                    ));
                }
            })
            .expect("failed to create coordinator thread");
        match bootstrap_rx
            .await
            .expect("bootstrap_tx always sends a message or panics/halts")
        {
            Ok(()) => {
                info!(
                    "startup: coordinator init: coordinator thread start complete in {:?}",
                    coord_thread_start.elapsed()
                );
                info!(
                    "startup: coordinator init: complete in {:?}",
                    coord_start.elapsed()
                );
                let handle = Handle {
                    session_id,
                    start_instant,
                    _thread: thread.join_on_drop(),
                };
                let client = Client::new(
                    build_info,
                    cmd_tx,
                    metrics_clone,
                    now,
                    environment_id,
                    segment_client_clone,
                );
                Ok((handle, client))
            }
            Err(e) => Err(e),
        }
    }
    .boxed()
}
4742
4743// Determines and returns the highest timestamp for each timeline, for all known
4744// timestamp oracle implementations.
4745//
4746// Initially, we did this so that we can switch between implementations of
4747// timestamp oracle, but now we also do this to determine a monotonic boot
4748// timestamp, a timestamp that does not regress across reboots.
4749//
4750// This mostly works, but there can be linearizability violations, because there
4751// is no central moment where we do distributed coordination for all oracle
4752// types. Working around this seems prohibitively hard, maybe even impossible so
4753// we have to live with this window of potential violations during the upgrade
4754// window (which is the only point where we should switch oracle
4755// implementations).
4756async fn get_initial_oracle_timestamps(
4757    timestamp_oracle_config: &Option<TimestampOracleConfig>,
4758) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4759    let mut initial_timestamps = BTreeMap::new();
4760
4761    if let Some(config) = timestamp_oracle_config {
4762        let oracle_timestamps = config.get_all_timelines().await?;
4763
4764        let debug_msg = || {
4765            oracle_timestamps
4766                .iter()
4767                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4768                .join(", ")
4769        };
4770        info!(
4771            "current timestamps from the timestamp oracle: {}",
4772            debug_msg()
4773        );
4774
4775        for (timeline, ts) in oracle_timestamps {
4776            let entry = initial_timestamps
4777                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4778
4779            entry
4780                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4781                .or_insert(ts);
4782        }
4783    } else {
4784        info!("no timestamp oracle configured!");
4785    };
4786
4787    let debug_msg = || {
4788        initial_timestamps
4789            .iter()
4790            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4791            .join(", ")
4792    };
4793    info!("initial oracle timestamps: {}", debug_msg());
4794
4795    Ok(initial_timestamps)
4796}
4797
4798#[instrument]
4799pub async fn load_remote_system_parameters(
4800    storage: &mut Box<dyn OpenableDurableCatalogState>,
4801    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4802    system_parameter_sync_timeout: Duration,
4803) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4804    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4805        tracing::info!("parameter sync on boot: start sync");
4806
4807        // We intentionally block initial startup, potentially forever,
4808        // on initializing LaunchDarkly. This may seem scary, but the
4809        // alternative is even scarier. Over time, we expect that the
4810        // compiled-in default values for the system parameters will
4811        // drift substantially from the defaults configured in
4812        // LaunchDarkly, to the point that starting an environment
4813        // without loading the latest values from LaunchDarkly will
4814        // result in running an untested configuration.
4815        //
4816        // Note this only applies during initial startup. Restarting
4817        // after we've synced once only blocks for a maximum of
4818        // `FRONTEND_SYNC_TIMEOUT` on LaunchDarkly, as it seems
4819        // reasonable to assume that the last-synced configuration was
4820        // valid enough.
4821        //
4822        // This philosophy appears to provide a good balance between not
4823        // running untested configurations in production while also not
4824        // making LaunchDarkly a "tier 1" dependency for existing
4825        // environments.
4826        //
4827        // If this proves to be an issue, we could seek to address the
4828        // configuration drift in a different way--for example, by
4829        // writing a script that runs in CI nightly and checks for
4830        // deviation between the compiled Rust code and LaunchDarkly.
4831        //
4832        // If it is absolutely necessary to bring up a new environment
4833        // while LaunchDarkly is down, the following manual mitigation
4834        // can be performed:
4835        //
4836        //    1. Edit the environmentd startup parameters to omit the
4837        //       LaunchDarkly configuration.
4838        //    2. Boot environmentd.
4839        //    3. Use the catalog-debug tool to run `edit config "{\"key\":\"system_config_synced\"}" "{\"value\": 1}"`.
4840        //    4. Adjust any other parameters as necessary to avoid
4841        //       running a nonstandard configuration in production.
4842        //    5. Edit the environmentd startup parameters to restore the
4843        //       LaunchDarkly configuration, for when LaunchDarkly comes
4844        //       back online.
4845        //    6. Reboot environmentd.
4846        let mut params = SynchronizedParameters::new(SystemVars::default());
4847        let frontend_sync = async {
4848            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4849            frontend.pull(&mut params);
4850            let ops = params
4851                .modified()
4852                .into_iter()
4853                .map(|param| {
4854                    let name = param.name;
4855                    let value = param.value;
4856                    tracing::info!(name, value, initial = true, "sync parameter");
4857                    (name, value)
4858                })
4859                .collect();
4860            tracing::info!("parameter sync on boot: end sync");
4861            Ok(Some(ops))
4862        };
4863        if !storage.has_system_config_synced_once().await? {
4864            frontend_sync.await
4865        } else {
4866            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4867                Ok(ops) => Ok(ops),
4868                Err(TimeoutError::Inner(e)) => Err(e),
4869                Err(TimeoutError::DeadlineElapsed) => {
4870                    tracing::info!("parameter sync on boot: sync has timed out");
4871                    Ok(None)
4872                }
4873            }
4874        }
4875    } else {
4876        Ok(None)
4877    }
4878}
4879
/// Responses delivered to the coordinator when a watch set it installed
/// reports readiness.
#[derive(Debug)]
pub enum WatchSetResponse {
    /// The dependencies of a logged statement are ready; carries the IDs
    /// needed to record the corresponding statement lifecycle event.
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    /// An in-flight `ALTER SINK` is ready to proceed; carries its context.
    AlterSinkReady(AlterSinkReadyContext),
    /// An in-flight `ALTER MATERIALIZED VIEW` replacement is ready to
    /// proceed; carries its context.
    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
}
4886
/// State carried across the asynchronous wait portion of an `ALTER SINK`
/// operation.
#[derive(Debug)]
pub struct AlterSinkReadyContext {
    /// The client's execute context. `Some` until the operation is retired
    /// (explicitly via `retire`, or as canceled in `Drop`).
    ctx: Option<ExecuteContext>,
    /// OpenTelemetry tracing context for the in-flight operation.
    otel_ctx: OpenTelemetryContext,
    /// The `ALTER SINK` plan to apply once the wait completes.
    plan: AlterSinkPlan,
    /// Validity constraints that must still hold when the plan is applied.
    plan_validity: PlanValidity,
    /// Read holds taken while waiting — presumably to keep the inputs'
    /// `since` frontiers from advancing (see module docs on `since`);
    /// confirm against the installing code path.
    read_hold: ReadHolds<Timestamp>,
}
4895
4896impl AlterSinkReadyContext {
4897    fn ctx(&mut self) -> &mut ExecuteContext {
4898        self.ctx.as_mut().expect("only cleared on drop")
4899    }
4900
4901    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4902        self.ctx
4903            .take()
4904            .expect("only cleared on drop")
4905            .retire(result);
4906    }
4907}
4908
4909impl Drop for AlterSinkReadyContext {
4910    fn drop(&mut self) {
4911        if let Some(ctx) = self.ctx.take() {
4912            ctx.retire(Err(AdapterError::Canceled));
4913        }
4914    }
4915}
4916
/// State carried across the asynchronous wait portion of an
/// `ALTER MATERIALIZED VIEW` replacement operation.
#[derive(Debug)]
pub struct AlterMaterializedViewReadyContext {
    /// The client's execute context. `Some` until the operation is retired
    /// (explicitly via `retire`, or as canceled in `Drop`).
    ctx: Option<ExecuteContext>,
    /// OpenTelemetry tracing context for the in-flight operation.
    otel_ctx: OpenTelemetryContext,
    /// The replacement plan to apply once the wait completes.
    plan: plan::AlterMaterializedViewApplyReplacementPlan,
    /// Validity constraints that must still hold when the plan is applied.
    plan_validity: PlanValidity,
}
4924
4925impl AlterMaterializedViewReadyContext {
4926    fn ctx(&mut self) -> &mut ExecuteContext {
4927        self.ctx.as_mut().expect("only cleared on drop")
4928    }
4929
4930    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4931        self.ctx
4932            .take()
4933            .expect("only cleared on drop")
4934            .retire(result);
4935    }
4936}
4937
4938impl Drop for AlterMaterializedViewReadyContext {
4939    fn drop(&mut self) {
4940        if let Some(ctx) = self.ctx.take() {
4941            ctx.retire(Err(AdapterError::Canceled));
4942        }
4943    }
4944}
4945
/// A struct for tracking the ownership of a lock and a VecDeque to store to-be-done work after the
/// lock is freed.
#[derive(Debug)]
struct LockedVecDeque<T> {
    /// The queue of deferred work items, processed front to back.
    items: VecDeque<T>,
    /// The lock whose holder may process `items`; wrapped in an `Arc` so an
    /// owned guard can be handed out via `try_lock_owned`.
    lock: Arc<tokio::sync::Mutex<()>>,
}
4953
4954impl<T> LockedVecDeque<T> {
4955    pub fn new() -> Self {
4956        Self {
4957            items: VecDeque::new(),
4958            lock: Arc::new(tokio::sync::Mutex::new(())),
4959        }
4960    }
4961
4962    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
4963        Arc::clone(&self.lock).try_lock_owned()
4964    }
4965
4966    pub fn is_empty(&self) -> bool {
4967        self.items.is_empty()
4968    }
4969
4970    pub fn push_back(&mut self, value: T) {
4971        self.items.push_back(value)
4972    }
4973
4974    pub fn pop_front(&mut self) -> Option<T> {
4975        self.items.pop_front()
4976    }
4977
4978    pub fn remove(&mut self, index: usize) -> Option<T> {
4979        self.items.remove(index)
4980    }
4981
4982    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
4983        self.items.iter()
4984    }
4985}
4986
/// A unit of deferred work: the client's execute context paired with the
/// statement or plan to run on its behalf once the work is resumed.
#[derive(Debug)]
struct DeferredPlanStatement {
    /// The client context awaiting the result.
    ctx: ExecuteContext,
    /// The statement or already-produced plan to execute.
    ps: PlanStatement,
}
4992
/// Deferred work in one of two stages: a raw statement that still needs
/// planning, or a plan that is ready to execute.
#[derive(Debug)]
enum PlanStatement {
    /// An unplanned SQL statement and the parameters bound to it.
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    /// An already-planned statement and the catalog IDs it resolved against.
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
    },
}
5004
/// Errors from validating a client connection against a network policy.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// No rule in the policy allowed the given address.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    /// The connection supplied no IP address to validate.
    #[error("Access denied missing IP address")]
    MissingIp,
}
5012
5013pub(crate) fn validate_ip_with_policy_rules(
5014    ip: &IpAddr,
5015    rules: &Vec<NetworkPolicyRule>,
5016) -> Result<(), NetworkPolicyError> {
5017    // At the moment we're not handling action or direction
5018    // as those are only able to be "allow" and "ingress" respectively
5019    if rules.iter().any(|r| r.address.0.contains(ip)) {
5020        Ok(())
5021    } else {
5022        Err(NetworkPolicyError::AddressDenied(ip.clone()))
5023    }
5024}
5025
5026pub(crate) fn infer_sql_type_for_catalog(
5027    hir_expr: &HirRelationExpr,
5028    mir_expr: &MirRelationExpr,
5029) -> SqlRelationType {
5030    let mut typ = hir_expr.top_level_typ();
5031    typ.backport_nullability_and_keys(&mir_expr.typ());
5032    typ
5033}
5034
#[cfg(test)]
mod id_pool_tests {
    use super::IdPool;

    #[mz_ore::test]
    fn test_empty_pool() {
        // An empty pool has nothing to hand out, singly or in bulk.
        let mut ids = IdPool::empty();
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
        assert_eq!(ids.allocate_many(1), None);
    }

    #[mz_ore::test]
    fn test_allocate_single() {
        let mut ids = IdPool::empty();
        ids.refill(10, 13);
        assert_eq!(ids.remaining(), 3);
        // The half-open range [10, 13) yields exactly 10, 11, 12, in order.
        assert_eq!(ids.allocate(), Some(10));
        assert_eq!(ids.allocate(), Some(11));
        assert_eq!(ids.allocate(), Some(12));
        // Exhausted: nothing left.
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
    }

    #[mz_ore::test]
    fn test_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(100, 105);
        assert_eq!(ids.allocate_many(3), Some(vec![100, 101, 102]));
        assert_eq!(ids.remaining(), 2);
        // Asking for more than remains fails as a unit...
        assert_eq!(ids.allocate_many(3), None);
        // ...and leaves the pool intact, so exactly 2 still succeeds.
        assert_eq!(ids.allocate_many(2), Some(vec![103, 104]));
        assert_eq!(ids.remaining(), 0);
    }

    #[mz_ore::test]
    fn test_allocate_many_zero() {
        // A zero-sized request succeeds with an empty vec and consumes nothing.
        let mut ids = IdPool::empty();
        ids.refill(1, 5);
        assert_eq!(ids.allocate_many(0), Some(vec![]));
        assert_eq!(ids.remaining(), 4);
    }

    #[mz_ore::test]
    fn test_refill_resets_pool() {
        let mut ids = IdPool::empty();
        ids.refill(0, 2);
        assert_eq!(ids.allocate(), Some(0));
        // Refilling before exhaustion discards the old range entirely.
        ids.refill(50, 52);
        assert_eq!(ids.allocate(), Some(50));
        assert_eq!(ids.allocate(), Some(51));
        assert_eq!(ids.allocate(), None);
    }

    #[mz_ore::test]
    fn test_mixed_allocate_and_allocate_many() {
        // Single and bulk allocations draw from the same cursor.
        let mut ids = IdPool::empty();
        ids.refill(0, 10);
        assert_eq!(ids.allocate(), Some(0));
        assert_eq!(ids.allocate_many(3), Some(vec![1, 2, 3]));
        assert_eq!(ids.allocate(), Some(4));
        assert_eq!(ids.remaining(), 5);
    }

    #[mz_ore::test]
    #[should_panic(expected = "invalid pool range")]
    fn test_refill_invalid_range_panics() {
        // A range whose start exceeds its end is rejected loudly.
        let mut ids = IdPool::empty();
        ids.refill(10, 5);
    }
}