Skip to main content

mz_adapter/
coord.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Translation of SQL commands into timestamped `Controller` commands.
11//!
12//! The various SQL commands instruct the system to take actions that are not
13//! yet explicitly timestamped. On the other hand, the underlying data continually
14//! change as time moves forward. On the third hand, we greatly benefit from the
15//! information that some times are no longer of interest, so that we may
16//! compact the representation of the continually changing collections.
17//!
18//! The [`Coordinator`] curates these interactions by observing the progress
19//! collections make through time, choosing timestamps for its own commands,
20//! and eventually communicating that certain times have irretrievably "passed".
21//!
22//! ## Frontiers another way
23//!
24//! If the above description of frontiers left you with questions, this
25//! repackaged explanation might help.
26//!
27//! - `since` is the least recent time (i.e. oldest time) that you can read
28//!   from sources and be guaranteed that the returned data is accurate as of
29//!   that time.
30//!
31//!   Reads at times less than `since` may return values that were not actually
32//!   seen at the specified time, but arrived later (i.e. the results are
33//!   compacted).
34//!
35//!   For correctness' sake, the coordinator never chooses to read at a time
36//!   less than an arrangement's `since`.
37//!
38//! - `upper` is the first time after the most recent time that you can read
39//!   from sources and receive an immediate response. Alternately, it is the
40//!   least time at which the data may still change (that is the reason we may
41//!   not be able to respond immediately).
42//!
43//!   Reads at times >= `upper` may not immediately return because the answer
44//!   isn't known yet. However, once the `upper` is > the specified read time,
45//!   the read can return.
46//!
47//!   For the sake of returned values' freshness, the coordinator prefers
48//!   performing reads at an arrangement's `upper`. However, because we more
49//!   strongly prefer correctness, the coordinator will choose timestamps
50//!   greater than an object's `upper` if it is also being accessed alongside
51//!   objects whose `since` times are >= its `upper`.
52//!
53//! This illustration attempts to show, with time moving left to right, the
54//! relationship between `since` and `upper`.
55//!
56//! - `#`: possibly inaccurate results
57//! - `-`: immediate, correct response
58//! - `?`: not yet known
59//! - `s`: since
60//! - `u`: upper
61//! - `|`: eligible for coordinator to select
62//!
63//! ```nofmt
64//! ####s----u?????
65//!     |||||||||||
66//! ```
67//!
68
69use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::net::IpAddr;
72use std::num::NonZeroI64;
73use std::ops::Neg;
74use std::str::FromStr;
75use std::sync::LazyLock;
76use std::sync::{Arc, Mutex};
77use std::thread;
78use std::time::{Duration, Instant};
79use std::{fmt, mem};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::Itertools;
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::{
95    ENABLE_SCOPED_SYSTEM_PARAMETERS, USER_ID_POOL_BATCH_SIZE,
96    WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL,
97};
98use mz_auth::password::Password;
99use mz_build_info::BuildInfo;
100use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
101use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
102use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
103use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
104use mz_catalog::memory::objects::{
105    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
106    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
107};
108use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
109use mz_compute_client::as_of_selection;
110use mz_compute_client::controller::error::{
111    CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
112};
113use mz_compute_types::ComputeInstanceId;
114use mz_compute_types::dataflows::DataflowDescription;
115use mz_compute_types::plan::Plan;
116use mz_controller::clusters::{
117    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
118};
119use mz_controller::{ControllerConfig, Readiness};
120use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
121use mz_dyncfg::ConfigUpdates;
122use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
123use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
124use mz_orchestrator::OfflineReason;
125use mz_ore::cast::{CastFrom, CastInto, CastLossy};
126use mz_ore::channel::trigger::Trigger;
127use mz_ore::future::TimeoutError;
128use mz_ore::metrics::MetricsRegistry;
129use mz_ore::now::{EpochMillis, NowFn};
130use mz_ore::task::{JoinHandle, spawn};
131use mz_ore::thread::JoinHandleExt;
132use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
133use mz_ore::url::SensitiveUrl;
134use mz_ore::{
135    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
136};
137use mz_persist_client::PersistClient;
138use mz_persist_client::batch::ProtoBatch;
139use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
140use mz_repr::adt::numeric::Numeric;
141use mz_repr::explain::{ExplainConfig, ExplainFormat};
142use mz_repr::global_id::TransientIdGen;
143use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
144use mz_repr::role_id::RoleId;
145use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
146use mz_secrets::cache::CachingSecretsReader;
147use mz_secrets::{SecretsController, SecretsReader};
148use mz_sql::ast::{Raw, Statement};
149use mz_sql::catalog::{CatalogCluster, EnvironmentId};
150use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
151use mz_sql::optimizer_metrics::OptimizerMetrics;
152use mz_sql::plan::{
153    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
154    NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
155};
156use mz_sql::session::user::User;
157use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
158use mz_sql_parser::ast::ExplainStage;
159use mz_sql_parser::ast::display::AstDisplay;
160use mz_storage_client::client::TableData;
161use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
162use mz_storage_types::connections::Connection as StorageConnection;
163use mz_storage_types::connections::ConnectionContext;
164use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
165use mz_storage_types::read_holds::ReadHold;
166use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
167use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
168use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
169use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
170use mz_transform::dataflow::DataflowMetainfo;
171use opentelemetry::trace::TraceContextExt;
172use serde::Serialize;
173use thiserror::Error;
174use timely::progress::{Antichain, Timestamp as _};
175use tokio::runtime::Handle as TokioHandle;
176use tokio::select;
177use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
178use tokio::time::{Interval, MissedTickBehavior};
179use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
180use tracing_opentelemetry::OpenTelemetrySpanExt;
181use uuid::Uuid;
182
183use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
184use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
185use crate::client::{Client, Handle};
186use crate::command::{Command, ExecuteResponse};
187use crate::config::{
188    ScopedParameters, ScopedParametersScope, SynchronizedParameters, SystemParameterFrontend,
189    SystemParameterSyncConfig, evaluate_scoped_parameters,
190};
191use crate::coord::appends::{
192    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
193};
194use crate::coord::caught_up::CaughtUpCheckContext;
195use crate::coord::cluster_scheduling::SchedulingDecision;
196use crate::coord::id_bundle::CollectionIdBundle;
197use crate::coord::introspection::IntrospectionSubscribe;
198use crate::coord::peek::PendingPeek;
199use crate::coord::statement_logging::StatementLogging;
200use crate::coord::timeline::{TimelineContext, TimelineState};
201use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
202use crate::coord::validity::PlanValidity;
203use crate::error::AdapterError;
204use crate::explain::insights::PlanInsightsContext;
205use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
206use crate::metrics::Metrics;
207use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
208use crate::optimize::{self, Optimize, OptimizerConfig};
209use crate::session::{EndTransactionAction, Session};
210use crate::statement_logging::{
211    StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
212};
213use crate::util::{ClientTransmitter, ResultExt, sort_topological};
214use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
215use crate::{AdapterNotice, ReadHolds, flags};
216
217pub(crate) mod appends;
218pub(crate) mod catalog_serving;
219pub(crate) mod cluster_scheduling;
220pub(crate) mod consistency;
221pub(crate) mod id_bundle;
222pub(crate) mod in_memory_oracle;
223pub(crate) mod peek;
224pub(crate) mod read_policy;
225pub(crate) mod read_then_write;
226pub(crate) mod sequencer;
227pub(crate) mod statement_logging;
228pub(crate) mod timeline;
229pub(crate) mod timestamp_selection;
230
231pub mod catalog_implications;
232mod caught_up;
233mod command_handler;
234mod ddl;
235pub(crate) mod group_sync;
236mod indexes;
237mod info_metrics;
238mod introspection;
239mod message_handler;
240mod privatelink_status;
241mod sql;
242mod validity;
243
/// A pool of pre-allocated user IDs that avoids one persist write per DDL.
///
/// The IDs available for allocation are exactly those in `[next, upper)`.
/// Once that range is empty the pool has to be refilled via the catalog.
///
/// # Correctness
///
/// The pool is owned by [`Coordinator`], which processes all requests
/// on a single-threaded event loop. Every access needs `&mut self` on the
/// coordinator, so the pool is never accessed concurrently and needs no
/// additional synchronization.
///
/// Global ID uniqueness is guaranteed because each refill calls
/// [`Catalog::allocate_user_ids`], which performs a durable persist
/// write that atomically reserves the entire batch before any ID from
/// it is handed out. If the process crashes after a refill but before
/// all pre-allocated IDs are consumed, the unused IDs simply become
/// harmless gaps in the sequence; user IDs are not required to be
/// contiguous.
///
/// The guarantee also holds when multiple `environmentd` processes run
/// concurrently. Each process has its own independent pool, but every
/// refill goes through the shared persist-backed catalog, which
/// serializes allocations across all callers. Two processes therefore
/// never receive overlapping ID ranges, for the same reason they could
/// not before this pool existed.
#[derive(Debug)]
pub(crate) struct IdPool {
    /// The next ID to hand out; inclusive lower bound of the free range.
    next: u64,
    /// Exclusive upper bound of the free range.
    upper: u64,
}

impl IdPool {
    /// Builds a pool that holds no IDs (the range `[0, 0)`).
    pub fn empty() -> Self {
        Self { next: 0, upper: 0 }
    }

    /// Hands out one ID, or `None` when the pool has run dry.
    pub fn allocate(&mut self) -> Option<u64> {
        if self.next >= self.upper {
            return None;
        }
        let id = self.next;
        self.next += 1;
        Some(id)
    }

    /// Hands out `n` consecutive IDs, or `None` (leaving the pool untouched)
    /// when fewer than `n` IDs remain.
    pub fn allocate_many(&mut self, n: u64) -> Option<Vec<u64>> {
        if n > self.remaining() {
            return None;
        }
        // `n <= upper - next`, so `first + n` cannot exceed `upper`.
        let first = self.next;
        let past_last = first + n;
        let ids = (first..past_last).collect();
        self.next = past_last;
        Some(ids)
    }

    /// Reports how many IDs are still available.
    pub fn remaining(&self) -> u64 {
        self.upper - self.next
    }

    /// Replaces the pool's contents with the range `[next, upper)`.
    ///
    /// # Panics
    ///
    /// Panics if `next > upper`.
    pub fn refill(&mut self, next: u64, upper: u64) {
        assert!(next <= upper, "invalid pool range: {next}..{upper}");
        *self = Self { next, upper };
    }
}
316
317#[derive(Debug)]
318pub enum Message {
319    Command(OpenTelemetryContext, Command),
320    ControllerReady {
321        controller: ControllerReadiness,
322    },
323    PurifiedStatementReady(PurifiedStatementReady),
324    CreateConnectionValidationReady(CreateConnectionValidationReady),
325    AlterConnectionValidationReady(AlterConnectionValidationReady),
326    TryDeferred {
327        /// The connection that created this op.
328        conn_id: ConnectionId,
329        /// The write lock that notified us our deferred op might be able to run.
330        ///
331        /// Note: While we never want to hold a partial set of locks, it can be important to hold
332        /// onto the _one_ that notified us our op might be ready. If there are multiple operations
333        /// waiting on a single collection, and we don't hold this lock through retyring the op,
334        /// then everything waiting on this collection will get retried causing traffic in the
335        /// Coordinator's message queue.
336        ///
337        /// See [`DeferredOp::can_be_optimistically_retried`] for more detail.
338        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
339    },
340    /// Initiates a group commit.
341    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
342    DeferredStatementReady,
343    AdvanceTimelines,
344    ClusterEvent(ClusterEvent),
345    CancelPendingPeeks {
346        conn_id: ConnectionId,
347    },
348    LinearizeReads,
349    StagedBatches {
350        conn_id: ConnectionId,
351        table_id: CatalogItemId,
352        batches: Vec<Result<ProtoBatch, String>>,
353    },
354    StorageUsageSchedule,
355    StorageUsageFetch,
356    StorageUsageUpdate(ShardsUsageReferenced),
357    StorageUsagePrune(Vec<BuiltinTableUpdate>),
358    ArrangementSizesSchedule,
359    ArrangementSizesSnapshot,
360    ArrangementSizesPrune(Vec<BuiltinTableUpdate>),
361    /// Performs any cleanup and logging actions necessary for
362    /// finalizing a statement execution.
363    RetireExecute {
364        data: ExecuteContextExtra,
365        otel_ctx: OpenTelemetryContext,
366        reason: StatementEndedExecutionReason,
367    },
368    ExecuteSingleStatementTransaction {
369        ctx: ExecuteContext,
370        otel_ctx: OpenTelemetryContext,
371        stmt: Arc<Statement<Raw>>,
372        params: mz_sql::plan::Params,
373    },
374    PeekStageReady {
375        ctx: ExecuteContext,
376        span: Span,
377        stage: PeekStage,
378    },
379    CreateIndexStageReady {
380        ctx: ExecuteContext,
381        span: Span,
382        stage: CreateIndexStage,
383    },
384    CreateViewStageReady {
385        ctx: ExecuteContext,
386        span: Span,
387        stage: CreateViewStage,
388    },
389    CreateMaterializedViewStageReady {
390        ctx: ExecuteContext,
391        span: Span,
392        stage: CreateMaterializedViewStage,
393    },
394    SubscribeStageReady {
395        ctx: ExecuteContext,
396        span: Span,
397        stage: SubscribeStage,
398    },
399    IntrospectionSubscribeStageReady {
400        span: Span,
401        stage: IntrospectionSubscribeStage,
402    },
403    SecretStageReady {
404        ctx: ExecuteContext,
405        span: Span,
406        stage: SecretStage,
407    },
408    ClusterStageReady {
409        ctx: ExecuteContext,
410        span: Span,
411        stage: ClusterStage,
412    },
413    ExplainTimestampStageReady {
414        ctx: ExecuteContext,
415        span: Span,
416        stage: ExplainTimestampStage,
417    },
418    DrainStatementLog,
419    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
420    CheckSchedulingPolicies,
421
422    /// Scheduling policy decisions about turning clusters On/Off.
423    /// `Vec<(policy name, Vec of decisions by the policy)>`
424    /// A cluster will be On if and only if there is at least one On decision for it.
425    /// Scheduling decisions for clusters that have `SCHEDULE = MANUAL` are ignored.
426    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
427}
428
429impl Message {
430    /// Returns a string to identify the kind of [`Message`], useful for logging.
431    pub const fn kind(&self) -> &'static str {
432        match self {
433            Message::Command(_, msg) => match msg {
434                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
435                Command::Startup { .. } => "command-startup",
436                Command::Execute { .. } => "command-execute",
437                Command::Commit { .. } => "command-commit",
438                Command::CancelRequest { .. } => "command-cancel_request",
439                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
440                Command::GetWebhook { .. } => "command-get_webhook",
441                Command::GetSystemVars { .. } => "command-get_system_vars",
442                Command::SetSystemVars { .. } => "command-set_system_vars",
443                Command::UpdateScopedSystemParameters { .. } => {
444                    "command-update_scoped_system_parameters"
445                }
446                Command::InstallScopedSystemParameterFrontend { .. } => {
447                    "command-install_scoped_system_parameter_frontend"
448                }
449                Command::Terminate { .. } => "command-terminate",
450                Command::RetireExecute { .. } => "command-retire_execute",
451                Command::CheckConsistency { .. } => "command-check_consistency",
452                Command::Dump { .. } => "command-dump",
453                Command::AuthenticatePassword { .. } => "command-auth_check",
454                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
455                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
456                Command::CheckRoleCanLogin { .. } => "command-check_role_can_login",
457                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
458                Command::GetOracle { .. } => "get-oracle",
459                Command::DetermineRealTimeRecentTimestamp { .. } => {
460                    "determine-real-time-recent-timestamp"
461                }
462                Command::GetTransactionReadHoldsBundle { .. } => {
463                    "get-transaction-read-holds-bundle"
464                }
465                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
466                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
467                Command::ExecuteSubscribe { .. } => "execute-subscribe",
468                Command::CopyToPreflight { .. } => "copy-to-preflight",
469                Command::ExecuteCopyTo { .. } => "execute-copy-to",
470                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
471                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
472                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
473                Command::ExplainTimestamp { .. } => "explain-timestamp",
474                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
475                Command::StartCopyFromStdin { .. } => "start-copy-from-stdin",
476                Command::InjectAuditEvents { .. } => "inject-audit-events",
477            },
478            Message::ControllerReady {
479                controller: ControllerReadiness::Compute,
480            } => "controller_ready(compute)",
481            Message::ControllerReady {
482                controller: ControllerReadiness::Storage,
483            } => "controller_ready(storage)",
484            Message::ControllerReady {
485                controller: ControllerReadiness::Metrics,
486            } => "controller_ready(metrics)",
487            Message::ControllerReady {
488                controller: ControllerReadiness::Internal,
489            } => "controller_ready(internal)",
490            Message::PurifiedStatementReady(_) => "purified_statement_ready",
491            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
492            Message::TryDeferred { .. } => "try_deferred",
493            Message::GroupCommitInitiate(..) => "group_commit_initiate",
494            Message::AdvanceTimelines => "advance_timelines",
495            Message::ClusterEvent(_) => "cluster_event",
496            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
497            Message::LinearizeReads => "linearize_reads",
498            Message::StagedBatches { .. } => "staged_batches",
499            Message::StorageUsageSchedule => "storage_usage_schedule",
500            Message::StorageUsageFetch => "storage_usage_fetch",
501            Message::StorageUsageUpdate(_) => "storage_usage_update",
502            Message::StorageUsagePrune(_) => "storage_usage_prune",
503            Message::ArrangementSizesSchedule => "arrangement_sizes_schedule",
504            Message::ArrangementSizesSnapshot => "arrangement_sizes_snapshot",
505            Message::ArrangementSizesPrune(_) => "arrangement_sizes_prune",
506            Message::RetireExecute { .. } => "retire_execute",
507            Message::ExecuteSingleStatementTransaction { .. } => {
508                "execute_single_statement_transaction"
509            }
510            Message::PeekStageReady { .. } => "peek_stage_ready",
511            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
512            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
513            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
514            Message::CreateMaterializedViewStageReady { .. } => {
515                "create_materialized_view_stage_ready"
516            }
517            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
518            Message::IntrospectionSubscribeStageReady { .. } => {
519                "introspection_subscribe_stage_ready"
520            }
521            Message::SecretStageReady { .. } => "secret_stage_ready",
522            Message::ClusterStageReady { .. } => "cluster_stage_ready",
523            Message::DrainStatementLog => "drain_statement_log",
524            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
525            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
526            Message::CheckSchedulingPolicies => "check_scheduling_policies",
527            Message::SchedulingDecisions { .. } => "scheduling_decision",
528            Message::DeferredStatementReady => "deferred_statement_ready",
529        }
530    }
531}
532
/// Why a controller needs to be processed on the main loop.
#[derive(Debug)]
pub enum ControllerReadiness {
    /// Readiness was signalled by the storage controller.
    Storage,
    /// Readiness was signalled by the compute controller.
    Compute,
    /// A batch of metric data has become available.
    Metrics,
    /// There is an internally-generated message waiting to be returned.
    Internal,
}
545
546#[derive(Derivative)]
547#[derivative(Debug)]
548pub struct BackgroundWorkResult<T> {
549    #[derivative(Debug = "ignore")]
550    pub ctx: ExecuteContext,
551    pub result: Result<T, AdapterError>,
552    pub params: Params,
553    pub plan_validity: PlanValidity,
554    pub original_stmt: Arc<Statement<Raw>>,
555    pub otel_ctx: OpenTelemetryContext,
556}
557
558pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
559
560#[derive(Derivative)]
561#[derivative(Debug)]
562pub struct ValidationReady<T> {
563    #[derivative(Debug = "ignore")]
564    pub ctx: ExecuteContext,
565    pub result: Result<T, AdapterError>,
566    pub resolved_ids: ResolvedIds,
567    pub connection_id: CatalogItemId,
568    pub connection_gid: GlobalId,
569    pub plan_validity: PlanValidity,
570    pub otel_ctx: OpenTelemetryContext,
571}
572
573pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
574pub type AlterConnectionValidationReady = ValidationReady<Connection>;
575
576#[derive(Debug)]
577pub enum PeekStage {
578    /// Common stages across SELECT, EXPLAIN and COPY TO queries.
579    LinearizeTimestamp(PeekStageLinearizeTimestamp),
580    RealTimeRecency(PeekStageRealTimeRecency),
581    TimestampReadHold(PeekStageTimestampReadHold),
582    Optimize(PeekStageOptimize),
583    /// Final stage for a peek.
584    Finish(PeekStageFinish),
585    /// Final stage for an explain.
586    ExplainPlan(PeekStageExplainPlan),
587    ExplainPushdown(PeekStageExplainPushdown),
588    /// Preflight checks for a copy to operation.
589    CopyToPreflight(PeekStageCopyTo),
590    /// Final stage for a copy to which involves shipping the dataflow.
591    CopyToDataflow(PeekStageCopyTo),
592}
593
594#[derive(Debug)]
595pub struct CopyToContext {
596    /// The `RelationDesc` of the data to be copied.
597    pub desc: RelationDesc,
598    /// The destination uri of the external service where the data will be copied.
599    pub uri: Uri,
600    /// Connection information required to connect to the external service to copy the data.
601    pub connection: StorageConnection<ReferencedConnection>,
602    /// The ID of the CONNECTION object to be used for copying the data.
603    pub connection_id: CatalogItemId,
604    /// Format params to format the data.
605    pub format: S3SinkFormat,
606    /// Approximate max file size of each uploaded file.
607    pub max_file_size: u64,
608    /// Number of batches the output of the COPY TO will be partitioned into
609    /// to distribute the load across workers deterministically.
610    /// This is only an option since it's not set when CopyToContext is instantiated
611    /// but immediately after in the PeekStageValidate stage.
612    pub output_batch_count: Option<u64>,
613}
614
615#[derive(Debug)]
616pub struct PeekStageLinearizeTimestamp {
617    validity: PlanValidity,
618    plan: mz_sql::plan::SelectPlan,
619    max_query_result_size: Option<u64>,
620    source_ids: BTreeSet<GlobalId>,
621    target_replica: Option<ReplicaId>,
622    timeline_context: TimelineContext,
623    optimizer: optimize::PeekOptimizer,
624    /// An optional context set iff the state machine is initiated from
625    /// sequencing an EXPLAIN for this statement.
626    explain_ctx: ExplainContext,
627}
628
629#[derive(Debug)]
630pub struct PeekStageRealTimeRecency {
631    validity: PlanValidity,
632    plan: mz_sql::plan::SelectPlan,
633    max_query_result_size: Option<u64>,
634    source_ids: BTreeSet<GlobalId>,
635    target_replica: Option<ReplicaId>,
636    timeline_context: TimelineContext,
637    oracle_read_ts: Option<Timestamp>,
638    optimizer: optimize::PeekOptimizer,
639    /// An optional context set iff the state machine is initiated from
640    /// sequencing an EXPLAIN for this statement.
641    explain_ctx: ExplainContext,
642}
643
644#[derive(Debug)]
645pub struct PeekStageTimestampReadHold {
646    validity: PlanValidity,
647    plan: mz_sql::plan::SelectPlan,
648    max_query_result_size: Option<u64>,
649    source_ids: BTreeSet<GlobalId>,
650    target_replica: Option<ReplicaId>,
651    timeline_context: TimelineContext,
652    oracle_read_ts: Option<Timestamp>,
653    real_time_recency_ts: Option<mz_repr::Timestamp>,
654    optimizer: optimize::PeekOptimizer,
655    /// An optional context set iff the state machine is initiated from
656    /// sequencing an EXPLAIN for this statement.
657    explain_ctx: ExplainContext,
658}
659
660#[derive(Debug)]
661pub struct PeekStageOptimize {
662    validity: PlanValidity,
663    plan: mz_sql::plan::SelectPlan,
664    max_query_result_size: Option<u64>,
665    source_ids: BTreeSet<GlobalId>,
666    id_bundle: CollectionIdBundle,
667    target_replica: Option<ReplicaId>,
668    determination: TimestampDetermination,
669    optimizer: optimize::PeekOptimizer,
670    /// An optional context set iff the state machine is initiated from
671    /// sequencing an EXPLAIN for this statement.
672    explain_ctx: ExplainContext,
673}
674
675#[derive(Debug)]
676pub struct PeekStageFinish {
677    validity: PlanValidity,
678    plan: mz_sql::plan::SelectPlan,
679    max_query_result_size: Option<u64>,
680    id_bundle: CollectionIdBundle,
681    target_replica: Option<ReplicaId>,
682    source_ids: BTreeSet<GlobalId>,
683    determination: TimestampDetermination,
684    cluster_id: ComputeInstanceId,
685    finishing: RowSetFinishing,
686    /// When present, an optimizer trace to be used for emitting a plan insights
687    /// notice.
688    plan_insights_optimizer_trace: Option<OptimizerTrace>,
689    insights_ctx: Option<Box<PlanInsightsContext>>,
690    global_lir_plan: optimize::peek::GlobalLirPlan,
691    optimization_finished_at: EpochMillis,
692}
693
694#[derive(Debug)]
695pub struct PeekStageCopyTo {
696    validity: PlanValidity,
697    optimizer: optimize::copy_to::Optimizer,
698    global_lir_plan: optimize::copy_to::GlobalLirPlan,
699    optimization_finished_at: EpochMillis,
700    source_ids: BTreeSet<GlobalId>,
701}
702
703#[derive(Debug)]
704pub struct PeekStageExplainPlan {
705    validity: PlanValidity,
706    optimizer: optimize::peek::Optimizer,
707    df_meta: DataflowMetainfo,
708    explain_ctx: ExplainPlanContext,
709    insights_ctx: Option<Box<PlanInsightsContext>>,
710}
711
712#[derive(Debug)]
713pub struct PeekStageExplainPushdown {
714    validity: PlanValidity,
715    determination: TimestampDetermination,
716    imports: BTreeMap<GlobalId, MapFilterProject>,
717}
718
719#[derive(Debug)]
720pub enum CreateIndexStage {
721    Optimize(CreateIndexOptimize),
722    Finish(CreateIndexFinish),
723    Explain(CreateIndexExplain),
724}
725
726#[derive(Debug)]
727pub struct CreateIndexOptimize {
728    validity: PlanValidity,
729    plan: plan::CreateIndexPlan,
730    resolved_ids: ResolvedIds,
731    /// An optional context set iff the state machine is initiated from
732    /// sequencing an EXPLAIN for this statement.
733    explain_ctx: ExplainContext,
734}
735
736#[derive(Debug)]
737pub struct CreateIndexFinish {
738    validity: PlanValidity,
739    item_id: CatalogItemId,
740    global_id: GlobalId,
741    plan: plan::CreateIndexPlan,
742    resolved_ids: ResolvedIds,
743    global_mir_plan: optimize::index::GlobalMirPlan,
744    global_lir_plan: optimize::index::GlobalLirPlan,
745    optimizer_features: OptimizerFeatures,
746}
747
748#[derive(Debug)]
749pub struct CreateIndexExplain {
750    validity: PlanValidity,
751    exported_index_id: GlobalId,
752    plan: plan::CreateIndexPlan,
753    df_meta: DataflowMetainfo,
754    explain_ctx: ExplainPlanContext,
755}
756
757#[derive(Debug)]
758pub enum CreateViewStage {
759    Optimize(CreateViewOptimize),
760    Finish(CreateViewFinish),
761    Explain(CreateViewExplain),
762}
763
764#[derive(Debug)]
765pub struct CreateViewOptimize {
766    validity: PlanValidity,
767    plan: plan::CreateViewPlan,
768    resolved_ids: ResolvedIds,
769    /// An optional context set iff the state machine is initiated from
770    /// sequencing an EXPLAIN for this statement.
771    explain_ctx: ExplainContext,
772}
773
774#[derive(Debug)]
775pub struct CreateViewFinish {
776    validity: PlanValidity,
777    /// ID of this item in the Catalog.
778    item_id: CatalogItemId,
779    /// ID by with Compute will reference this View.
780    global_id: GlobalId,
781    plan: plan::CreateViewPlan,
782    /// IDs of objects resolved during name resolution.
783    resolved_ids: ResolvedIds,
784    optimized_expr: OptimizedMirRelationExpr,
785}
786
787#[derive(Debug)]
788pub struct CreateViewExplain {
789    validity: PlanValidity,
790    id: GlobalId,
791    plan: plan::CreateViewPlan,
792    explain_ctx: ExplainPlanContext,
793}
794
795#[derive(Debug)]
796pub enum ExplainTimestampStage {
797    Optimize(ExplainTimestampOptimize),
798    RealTimeRecency(ExplainTimestampRealTimeRecency),
799    Finish(ExplainTimestampFinish),
800}
801
802#[derive(Debug)]
803pub struct ExplainTimestampOptimize {
804    validity: PlanValidity,
805    plan: plan::ExplainTimestampPlan,
806    cluster_id: ClusterId,
807}
808
809#[derive(Debug)]
810pub struct ExplainTimestampRealTimeRecency {
811    validity: PlanValidity,
812    format: ExplainFormat,
813    optimized_plan: OptimizedMirRelationExpr,
814    cluster_id: ClusterId,
815    when: QueryWhen,
816}
817
818#[derive(Debug)]
819pub struct ExplainTimestampFinish {
820    validity: PlanValidity,
821    format: ExplainFormat,
822    optimized_plan: OptimizedMirRelationExpr,
823    cluster_id: ClusterId,
824    source_ids: BTreeSet<GlobalId>,
825    when: QueryWhen,
826    real_time_recency_ts: Option<Timestamp>,
827}
828
829#[derive(Debug)]
830pub enum ClusterStage {
831    Alter(AlterCluster),
832    WaitForHydrated(AlterClusterWaitForHydrated),
833    Finalize(AlterClusterFinalize),
834}
835
836#[derive(Debug)]
837pub struct AlterCluster {
838    validity: PlanValidity,
839    plan: plan::AlterClusterPlan,
840}
841
842#[derive(Debug)]
843pub struct AlterClusterWaitForHydrated {
844    validity: PlanValidity,
845    plan: plan::AlterClusterPlan,
846    new_config: ClusterVariantManaged,
847    workload_class: Option<String>,
848    timeout_time: Instant,
849    on_timeout: OnTimeoutAction,
850}
851
852#[derive(Debug)]
853pub struct AlterClusterFinalize {
854    validity: PlanValidity,
855    plan: plan::AlterClusterPlan,
856    new_config: ClusterVariantManaged,
857    workload_class: Option<String>,
858}
859
860#[derive(Debug)]
861pub enum ExplainContext {
862    /// The ordinary, non-explain variant of the statement.
863    None,
864    /// The `EXPLAIN <level> PLAN FOR <explainee>` version of the statement.
865    Plan(ExplainPlanContext),
866    /// Generate a notice containing the `EXPLAIN PLAN INSIGHTS` output
867    /// alongside the query's normal output.
868    PlanInsightsNotice(OptimizerTrace),
869    /// `EXPLAIN FILTER PUSHDOWN`
870    Pushdown,
871}
872
873impl ExplainContext {
874    /// If available for this context, wrap the [`OptimizerTrace`] into a
875    /// [`tracing::Dispatch`] and set it as default, returning the resulting
876    /// guard in a `Some(guard)` option.
877    pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
878        let optimizer_trace = match self {
879            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
880            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
881            _ => None,
882        };
883        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
884    }
885
886    pub(crate) fn needs_cluster(&self) -> bool {
887        match self {
888            ExplainContext::None => true,
889            ExplainContext::Plan(..) => false,
890            ExplainContext::PlanInsightsNotice(..) => true,
891            ExplainContext::Pushdown => false,
892        }
893    }
894
895    pub(crate) fn needs_plan_insights(&self) -> bool {
896        matches!(
897            self,
898            ExplainContext::Plan(ExplainPlanContext {
899                stage: ExplainStage::PlanInsights,
900                ..
901            }) | ExplainContext::PlanInsightsNotice(_)
902        )
903    }
904}
905
906#[derive(Debug)]
907pub struct ExplainPlanContext {
908    /// EXPLAIN BROKEN is internal syntax for showing EXPLAIN output despite an internal error in
909    /// the optimizer: we don't immediately bail out from peek sequencing when an internal optimizer
910    /// error happens, but go on with trying to show the requested EXPLAIN stage. This can still
911    /// succeed if the requested EXPLAIN stage is before the point where the error happened.
912    pub broken: bool,
913    pub config: ExplainConfig,
914    pub format: ExplainFormat,
915    pub stage: ExplainStage,
916    pub replan: Option<GlobalId>,
917    pub desc: Option<RelationDesc>,
918    pub optimizer_trace: OptimizerTrace,
919}
920
921#[derive(Debug)]
922pub enum CreateMaterializedViewStage {
923    Optimize(CreateMaterializedViewOptimize),
924    Finish(CreateMaterializedViewFinish),
925    Explain(CreateMaterializedViewExplain),
926}
927
928#[derive(Debug)]
929pub struct CreateMaterializedViewOptimize {
930    validity: PlanValidity,
931    plan: plan::CreateMaterializedViewPlan,
932    resolved_ids: ResolvedIds,
933    /// An optional context set iff the state machine is initiated from
934    /// sequencing an EXPLAIN for this statement.
935    explain_ctx: ExplainContext,
936}
937
938#[derive(Debug)]
939pub struct CreateMaterializedViewFinish {
940    /// The ID of this Materialized View in the Catalog.
941    item_id: CatalogItemId,
942    /// The ID of the durable pTVC backing this Materialized View.
943    global_id: GlobalId,
944    validity: PlanValidity,
945    plan: plan::CreateMaterializedViewPlan,
946    resolved_ids: ResolvedIds,
947    local_mir_plan: optimize::materialized_view::LocalMirPlan,
948    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
949    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
950    optimizer_features: OptimizerFeatures,
951}
952
953#[derive(Debug)]
954pub struct CreateMaterializedViewExplain {
955    global_id: GlobalId,
956    validity: PlanValidity,
957    plan: plan::CreateMaterializedViewPlan,
958    df_meta: DataflowMetainfo,
959    explain_ctx: ExplainPlanContext,
960}
961
962#[derive(Debug)]
963pub enum SubscribeStage {
964    OptimizeMir(SubscribeOptimizeMir),
965    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
966    Finish(SubscribeFinish),
967    Explain(SubscribeExplain),
968}
969
970#[derive(Debug)]
971pub struct SubscribeOptimizeMir {
972    validity: PlanValidity,
973    plan: plan::SubscribePlan,
974    timeline: TimelineContext,
975    dependency_ids: BTreeSet<GlobalId>,
976    cluster_id: ComputeInstanceId,
977    replica_id: Option<ReplicaId>,
978    /// An optional context set iff the state machine is initiated from
979    /// sequencing an EXPLAIN for this statement.
980    explain_ctx: ExplainContext,
981}
982
983#[derive(Debug)]
984pub struct SubscribeTimestampOptimizeLir {
985    validity: PlanValidity,
986    plan: plan::SubscribePlan,
987    timeline: TimelineContext,
988    optimizer: optimize::subscribe::Optimizer,
989    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
990    dependency_ids: BTreeSet<GlobalId>,
991    replica_id: Option<ReplicaId>,
992    /// An optional context set iff the state machine is initiated from
993    /// sequencing an EXPLAIN for this statement.
994    explain_ctx: ExplainContext,
995}
996
997#[derive(Debug)]
998pub struct SubscribeFinish {
999    validity: PlanValidity,
1000    cluster_id: ComputeInstanceId,
1001    replica_id: Option<ReplicaId>,
1002    plan: plan::SubscribePlan,
1003    global_lir_plan: optimize::subscribe::GlobalLirPlan,
1004    dependency_ids: BTreeSet<GlobalId>,
1005}
1006
1007#[derive(Debug)]
1008pub struct SubscribeExplain {
1009    validity: PlanValidity,
1010    optimizer: optimize::subscribe::Optimizer,
1011    df_meta: DataflowMetainfo,
1012    cluster_id: ComputeInstanceId,
1013    explain_ctx: ExplainPlanContext,
1014}
1015
1016#[derive(Debug)]
1017pub enum IntrospectionSubscribeStage {
1018    OptimizeMir(IntrospectionSubscribeOptimizeMir),
1019    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
1020    Finish(IntrospectionSubscribeFinish),
1021}
1022
1023#[derive(Debug)]
1024pub struct IntrospectionSubscribeOptimizeMir {
1025    validity: PlanValidity,
1026    plan: plan::SubscribePlan,
1027    subscribe_id: GlobalId,
1028    cluster_id: ComputeInstanceId,
1029    replica_id: ReplicaId,
1030}
1031
1032#[derive(Debug)]
1033pub struct IntrospectionSubscribeTimestampOptimizeLir {
1034    validity: PlanValidity,
1035    optimizer: optimize::subscribe::Optimizer,
1036    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
1037    cluster_id: ComputeInstanceId,
1038    replica_id: ReplicaId,
1039}
1040
1041#[derive(Debug)]
1042pub struct IntrospectionSubscribeFinish {
1043    validity: PlanValidity,
1044    global_lir_plan: optimize::subscribe::GlobalLirPlan,
1045    read_holds: ReadHolds,
1046    cluster_id: ComputeInstanceId,
1047    replica_id: ReplicaId,
1048}
1049
1050#[derive(Debug)]
1051pub enum SecretStage {
1052    CreateEnsure(CreateSecretEnsure),
1053    CreateFinish(CreateSecretFinish),
1054    RotateKeysEnsure(RotateKeysSecretEnsure),
1055    RotateKeysFinish(RotateKeysSecretFinish),
1056    Alter(AlterSecret),
1057}
1058
1059#[derive(Debug)]
1060pub struct CreateSecretEnsure {
1061    validity: PlanValidity,
1062    plan: plan::CreateSecretPlan,
1063}
1064
1065#[derive(Debug)]
1066pub struct CreateSecretFinish {
1067    validity: PlanValidity,
1068    item_id: CatalogItemId,
1069    global_id: GlobalId,
1070    plan: plan::CreateSecretPlan,
1071}
1072
1073#[derive(Debug)]
1074pub struct RotateKeysSecretEnsure {
1075    validity: PlanValidity,
1076    id: CatalogItemId,
1077}
1078
1079#[derive(Debug)]
1080pub struct RotateKeysSecretFinish {
1081    validity: PlanValidity,
1082    ops: Vec<crate::catalog::Op>,
1083}
1084
1085#[derive(Debug)]
1086pub struct AlterSecret {
1087    validity: PlanValidity,
1088    plan: plan::AlterSecretPlan,
1089}
1090
1091/// An enum describing which cluster to run a statement on.
1092///
1093/// One example usage would be that if a query depends only on system tables, we might
1094/// automatically run it on the catalog server cluster to benefit from indexes that exist there.
1095#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1096pub enum TargetCluster {
1097    /// The catalog server cluster.
1098    CatalogServer,
1099    /// The current user's active cluster.
1100    Active,
1101    /// The cluster selected at the start of a transaction.
1102    Transaction(ClusterId),
1103}
1104
1105/// Result types for each stage of a sequence.
1106pub(crate) enum StageResult<T> {
1107    /// A task was spawned that will return the next stage.
1108    Handle(JoinHandle<Result<T, AdapterError>>),
1109    /// A task was spawned that will return a response for the client.
1110    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
1111    /// The next stage is immediately ready and will execute.
1112    Immediate(T),
1113    /// The final stage was executed and is ready to respond to the client.
1114    Response(ExecuteResponse),
1115}
1116
1117/// Common functionality for [Coordinator::sequence_staged].
1118pub(crate) trait Staged: Send {
1119    type Ctx: StagedContext;
1120
1121    fn validity(&mut self) -> &mut PlanValidity;
1122
1123    /// Returns the next stage or final result.
1124    async fn stage(
1125        self,
1126        coord: &mut Coordinator,
1127        ctx: &mut Self::Ctx,
1128    ) -> Result<StageResult<Box<Self>>, AdapterError>;
1129
1130    /// Prepares a message for the Coordinator.
1131    fn message(self, ctx: Self::Ctx, span: Span) -> Message;
1136
1137pub trait StagedContext {
1138    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
1139    fn session(&self) -> Option<&Session>;
1140}
1141
1142impl StagedContext for ExecuteContext {
1143    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1144        self.retire(result);
1145    }
1146
1147    fn session(&self) -> Option<&Session> {
1148        Some(self.session())
1149    }
1150}
1151
1152impl StagedContext for () {
1153    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}
1154
1155    fn session(&self) -> Option<&Session> {
1156        None
1157    }
1158}
1159
1160/// Configures a coordinator.
1161pub struct Config {
1162    pub controller_config: ControllerConfig,
1163    pub controller_envd_epoch: NonZeroI64,
1164    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
1165    pub audit_logs_iterator: AuditLogIterator,
1166    pub timestamp_oracle_url: Option<SensitiveUrl>,
1167    pub unsafe_mode: bool,
1168    pub all_features: bool,
1169    pub build_info: &'static BuildInfo,
1170    pub environment_id: EnvironmentId,
1171    pub metrics_registry: MetricsRegistry,
1172    pub now: NowFn,
1173    pub secrets_controller: Arc<dyn SecretsController>,
1174    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1175    pub availability_zones: Vec<String>,
1176    pub cluster_replica_sizes: ClusterReplicaSizeMap,
1177    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
1178    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
1179    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
1180    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
1181    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
1182    pub system_parameter_defaults: BTreeMap<String, String>,
1183    pub storage_usage_client: StorageUsageClient,
1184    pub storage_usage_collection_interval: Duration,
1185    pub storage_usage_retention_period: Option<Duration>,
1186    pub segment_client: Option<mz_segment::Client>,
1187    pub egress_addresses: Vec<IpNet>,
1188    pub remote_system_parameters: Option<BTreeMap<String, String>>,
1189    pub aws_account_id: Option<String>,
1190    pub aws_privatelink_availability_zones: Option<Vec<String>>,
1191    pub connection_context: ConnectionContext,
1192    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
1193    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
1194    pub http_host_name: Option<String>,
1195    pub tracing_handle: TracingHandle,
1196    /// Whether or not to start controllers in read-only mode. This is only
1197    /// meant for use during development of read-only clusters and 0dt upgrades
1198    /// and should go away once we have proper orchestration during upgrades.
1199    pub read_only_controllers: bool,
1200
1201    /// A trigger that signals that the current deployment has caught up with a
1202    /// previous deployment. Only used during 0dt deployment, while in read-only
1203    /// mode.
1204    pub caught_up_trigger: Option<Trigger>,
1205
1206    pub helm_chart_version: Option<String>,
1207    pub license_key: ValidatedLicenseKey,
1208    pub external_login_password_mz_system: Option<Password>,
1209    pub force_builtin_schema_migration: Option<String>,
1210}
1211
1212/// Metadata about an active connection.
1213#[derive(Debug, Serialize)]
1214pub struct ConnMeta {
1215    /// Pgwire specifies that every connection have a 32-bit secret associated
1216    /// with it, that is known to both the client and the server. Cancellation
1217    /// requests are required to authenticate with the secret of the connection
1218    /// that they are targeting.
1219    secret_key: u32,
1220    /// The time when the session's connection was initiated.
1221    connected_at: EpochMillis,
1222    user: User,
1223    application_name: String,
1224    uuid: Uuid,
1225    conn_id: ConnectionId,
1226    client_ip: Option<IpAddr>,
1227
1228    /// Sinks that will need to be dropped when the current transaction, if
1229    /// any, is cleared.
1230    drop_sinks: BTreeSet<GlobalId>,
1231
1232    /// Lock for the Coordinator's deferred statements that is dropped on transaction clear.
1233    #[serde(skip)]
1234    deferred_lock: Option<OwnedMutexGuard<()>>,
1235
1236    /// Cluster reconfigurations that will need to be
1237    /// cleaned up when the current transaction is cleared
1238    pending_cluster_alters: BTreeSet<ClusterId>,
1239
1240    /// Channel on which to send notices to a session.
1241    #[serde(skip)]
1242    notice_tx: mpsc::UnboundedSender<AdapterNotice>,
1243
1244    /// The role that initiated the database context. Fixed for the duration of the connection.
1245    /// WARNING: This role reference is not updated when the role is dropped.
1246    /// Consumers should not assume that this role exist.
1247    authenticated_role: RoleId,
1248}
1249
1250impl ConnMeta {
1251    pub fn conn_id(&self) -> &ConnectionId {
1252        &self.conn_id
1253    }
1254
1255    pub fn user(&self) -> &User {
1256        &self.user
1257    }
1258
1259    pub fn application_name(&self) -> &str {
1260        &self.application_name
1261    }
1262
1263    pub fn authenticated_role_id(&self) -> &RoleId {
1264        &self.authenticated_role
1265    }
1266
1267    pub fn uuid(&self) -> Uuid {
1268        self.uuid
1269    }
1270
1271    pub fn client_ip(&self) -> Option<IpAddr> {
1272        self.client_ip
1273    }
1274
1275    pub fn connected_at(&self) -> EpochMillis {
1276        self.connected_at
1277    }
1278}
1279
1280#[derive(Debug)]
1281/// A pending transaction waiting to be committed.
1282pub struct PendingTxn {
1283    /// Context used to send a response back to the client.
1284    ctx: ExecuteContext,
1285    /// Client response for transaction.
1286    response: Result<PendingTxnResponse, AdapterError>,
1287    /// The action to take at the end of the transaction.
1288    action: EndTransactionAction,
1289}
1290
/// The response that will be sent for a [`PendingTxn`].
#[derive(Debug)]
pub enum PendingTxnResponse {
    /// The transaction is going to be committed.
    Committed {
        /// The parameters that will change once this transaction is complete,
        /// along with their new values.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction is going to be rolled back.
    Rolledback {
        /// The parameters that will change once this transaction is complete,
        /// along with their new values.
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    /// Adds every `(name, value)` pair yielded by `p` to the response's
    /// `params`, regardless of variant. A pair whose name is already present
    /// replaces the existing value.
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        // Both variants carry the same `params` map, so one irrefutable
        // or-pattern covers them.
        let (PendingTxnResponse::Committed { params }
        | PendingTxnResponse::Rolledback { params }) = self;
        params.extend(p);
    }
}
1314
1315impl From<PendingTxnResponse> for ExecuteResponse {
1316    fn from(value: PendingTxnResponse) -> Self {
1317        match value {
1318            PendingTxnResponse::Committed { params } => {
1319                ExecuteResponse::TransactionCommitted { params }
1320            }
1321            PendingTxnResponse::Rolledback { params } => {
1322                ExecuteResponse::TransactionRolledBack { params }
1323            }
1324        }
1325    }
1326}
1327
1328#[derive(Debug)]
1329/// A pending read transaction waiting to be linearized along with metadata about it's state
1330pub struct PendingReadTxn {
1331    /// The transaction type
1332    txn: PendingRead,
1333    /// The timestamp context of the transaction.
1334    timestamp_context: TimestampContext,
1335    /// When we created this pending txn, when the transaction ends. Only used for metrics.
1336    created: Instant,
1337    /// Number of times we requeued the processing of this pending read txn.
1338    /// Requeueing is necessary if the time we executed the query is after the current oracle time;
1339    /// see [`Coordinator::message_linearize_reads`] for more details.
1340    num_requeues: u64,
1341    /// Telemetry context.
1342    otel_ctx: OpenTelemetryContext,
1343}
1344
1345impl PendingReadTxn {
1346    /// Return the timestamp context of the pending read transaction.
1347    pub fn timestamp_context(&self) -> &TimestampContext {
1348        &self.timestamp_context
1349    }
1350
1351    pub(crate) fn take_context(self) -> ExecuteContext {
1352        self.txn.take_context()
1353    }
1354}
1355
1356#[derive(Debug)]
1357/// A pending read transaction waiting to be linearized.
1358enum PendingRead {
1359    Read {
1360        /// The inner transaction.
1361        txn: PendingTxn,
1362    },
1363    ReadThenWrite {
1364        /// Context used to send a response back to the client.
1365        ctx: ExecuteContext,
1366        /// Channel used to alert the transaction that the read has been linearized and send back
1367        /// `ctx`.
1368        tx: oneshot::Sender<Option<ExecuteContext>>,
1369    },
1370}
1371
1372impl PendingRead {
1373    /// Alert the client that the read has been linearized.
1374    ///
1375    /// If it is necessary to finalize an execute, return the state necessary to do so
1376    /// (execution context and result)
1377    #[instrument(level = "debug")]
1378    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1379        match self {
1380            PendingRead::Read {
1381                txn:
1382                    PendingTxn {
1383                        mut ctx,
1384                        response,
1385                        action,
1386                    },
1387                ..
1388            } => {
1389                let changed = ctx.session_mut().vars_mut().end_transaction(action);
1390                // Append any parameters that changed to the response.
1391                let response = response.map(|mut r| {
1392                    r.extend_params(changed);
1393                    ExecuteResponse::from(r)
1394                });
1395
1396                Some((ctx, response))
1397            }
1398            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1399                // Ignore errors if the caller has hung up.
1400                let _ = tx.send(Some(ctx));
1401                None
1402            }
1403        }
1404    }
1405
1406    fn label(&self) -> &'static str {
1407        match self {
1408            PendingRead::Read { .. } => "read",
1409            PendingRead::ReadThenWrite { .. } => "read_then_write",
1410        }
1411    }
1412
1413    pub(crate) fn take_context(self) -> ExecuteContext {
1414        match self {
1415            PendingRead::Read { txn, .. } => txn.ctx,
1416            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1417                // Inform the transaction that we've taken their context.
1418                // Ignore errors if the caller has hung up.
1419                let _ = tx.send(None);
1420                ctx
1421            }
1422        }
1423    }
1424}
1425
1426/// State that the coordinator must process as part of retiring
1427/// command execution.  `ExecuteContextExtra::Default` is guaranteed
1428/// to produce a value that will cause the coordinator to do nothing, and
1429/// is intended for use by code that invokes the execution processing flow
1430/// (i.e., `sequence_plan`) without actually being a statement execution.
1431///
1432/// This is a pure data struct containing only the statement logging ID.
1433/// For auto-retire-on-drop behavior, use `ExecuteContextGuard` which wraps
1434/// this struct and owns the channel for sending retirement messages.
1435#[derive(Debug, Default)]
1436#[must_use]
1437pub struct ExecuteContextExtra {
1438    statement_uuid: Option<StatementLoggingId>,
1439}
1440
1441impl ExecuteContextExtra {
1442    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1443        Self { statement_uuid }
1444    }
1445    pub fn is_trivial(&self) -> bool {
1446        self.statement_uuid.is_none()
1447    }
1448    pub fn contents(&self) -> Option<StatementLoggingId> {
1449        self.statement_uuid
1450    }
1451    /// Consume this extra and return the statement UUID for retirement.
1452    /// This should only be called from code that knows what to do to finish
1453    /// up logging based on the inner value.
1454    #[must_use]
1455    pub(crate) fn retire(self) -> Option<StatementLoggingId> {
1456        self.statement_uuid
1457    }
1458}
1459
1460/// A guard that wraps `ExecuteContextExtra` and owns a channel for sending
1461/// retirement messages to the coordinator.
1462///
1463/// If this guard is dropped with a `Some` `statement_uuid` in its inner
1464/// `ExecuteContextExtra`, the `Drop` implementation will automatically send a
1465/// `Message::RetireExecute` to log the statement ending.
1466/// This handles cases like connection drops where the context cannot be
1467/// explicitly retired.
1468/// See <https://github.com/MaterializeInc/database-issues/issues/7304>
1469#[derive(Debug)]
1470#[must_use]
1471pub struct ExecuteContextGuard {
1472    extra: ExecuteContextExtra,
1473    /// Channel for sending messages to the coordinator. Used for auto-retiring on drop.
1474    /// For `Default` instances, this is a dummy sender (receiver already dropped), so
1475    /// sends will fail silently - which is the desired behavior since Default instances
1476    /// should only be used for non-logged statements.
1477    coordinator_tx: mpsc::UnboundedSender<Message>,
1478}
1479
1480impl Default for ExecuteContextGuard {
1481    fn default() -> Self {
1482        // Create a dummy sender by immediately dropping the receiver.
1483        // Any send on this channel will fail silently, which is the desired
1484        // behavior for Default instances (non-logged statements).
1485        let (tx, _rx) = mpsc::unbounded_channel();
1486        Self {
1487            extra: ExecuteContextExtra::default(),
1488            coordinator_tx: tx,
1489        }
1490    }
1491}
1492
1493impl ExecuteContextGuard {
1494    pub(crate) fn new(
1495        statement_uuid: Option<StatementLoggingId>,
1496        coordinator_tx: mpsc::UnboundedSender<Message>,
1497    ) -> Self {
1498        Self {
1499            extra: ExecuteContextExtra::new(statement_uuid),
1500            coordinator_tx,
1501        }
1502    }
1503    pub fn is_trivial(&self) -> bool {
1504        self.extra.is_trivial()
1505    }
1506    pub fn contents(&self) -> Option<StatementLoggingId> {
1507        self.extra.contents()
1508    }
1509    /// Take responsibility for the contents.  This should only be
1510    /// called from code that knows what to do to finish up logging
1511    /// based on the inner value.
1512    ///
1513    /// Returns the inner `ExecuteContextExtra`, consuming the guard without
1514    /// triggering the auto-retire behavior.
1515    pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
1516        // Taking statement_uuid prevents the Drop impl from sending a retire message
1517        std::mem::take(&mut self.extra)
1518    }
1519}
1520
1521impl Drop for ExecuteContextGuard {
1522    fn drop(&mut self) {
1523        if let Some(statement_uuid) = self.extra.statement_uuid.take() {
1524            // Auto-retire since the guard was dropped without explicit retirement (likely due
1525            // to connection drop).
1526            let msg = Message::RetireExecute {
1527                data: ExecuteContextExtra {
1528                    statement_uuid: Some(statement_uuid),
1529                },
1530                otel_ctx: OpenTelemetryContext::obtain(),
1531                reason: StatementEndedExecutionReason::Aborted,
1532            };
1533            // Send may fail for Default instances (dummy sender), which is fine since
1534            // Default instances should only be used for non-logged statements.
1535            let _ = self.coordinator_tx.send(msg);
1536        }
1537    }
1538}
1539
1540/// Bundle of state related to statement execution.
1541///
1542/// This struct collects a bundle of state that needs to be threaded
1543/// through various functions as part of statement execution.
1544/// It is used to finalize execution, by calling `retire`. Finalizing execution
1545/// involves sending the session back to the pgwire layer so that it
1546/// may be used to process further commands. It also involves
1547/// performing some work on the main coordinator thread
1548/// (e.g., recording the time at which the statement finished
1549/// executing). The state necessary to perform this work is bundled in
1550/// the `ExecuteContextGuard` object.
1551#[derive(Debug)]
1552pub struct ExecuteContext {
1553    inner: Box<ExecuteContextInner>,
1554}
1555
1556impl std::ops::Deref for ExecuteContext {
1557    type Target = ExecuteContextInner;
1558    fn deref(&self) -> &Self::Target {
1559        &*self.inner
1560    }
1561}
1562
1563impl std::ops::DerefMut for ExecuteContext {
1564    fn deref_mut(&mut self) -> &mut Self::Target {
1565        &mut *self.inner
1566    }
1567}
1568
1569#[derive(Debug)]
1570pub struct ExecuteContextInner {
1571    tx: ClientTransmitter<ExecuteResponse>,
1572    internal_cmd_tx: mpsc::UnboundedSender<Message>,
1573    session: Session,
1574    extra: ExecuteContextGuard,
1575}
1576
1577impl ExecuteContext {
1578    pub fn session(&self) -> &Session {
1579        &self.session
1580    }
1581
1582    pub fn session_mut(&mut self) -> &mut Session {
1583        &mut self.session
1584    }
1585
1586    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1587        &self.tx
1588    }
1589
1590    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1591        &mut self.tx
1592    }
1593
1594    pub fn from_parts(
1595        tx: ClientTransmitter<ExecuteResponse>,
1596        internal_cmd_tx: mpsc::UnboundedSender<Message>,
1597        session: Session,
1598        extra: ExecuteContextGuard,
1599    ) -> Self {
1600        Self {
1601            inner: ExecuteContextInner {
1602                tx,
1603                session,
1604                extra,
1605                internal_cmd_tx,
1606            }
1607            .into(),
1608        }
1609    }
1610
1611    /// By calling this function, the caller takes responsibility for
1612    /// dealing with the instance of `ExecuteContextGuard`. This is
1613    /// intended to support protocols (like `COPY FROM`) that involve
1614    /// multiple passes of sending the session back and forth between
1615    /// the coordinator and the pgwire layer. As part of any such
1616    /// protocol, we must ensure that the `ExecuteContextGuard`
1617    /// (possibly wrapped in a new `ExecuteContext`) is passed back to the coordinator for
1618    /// eventual retirement.
1619    pub fn into_parts(
1620        self,
1621    ) -> (
1622        ClientTransmitter<ExecuteResponse>,
1623        mpsc::UnboundedSender<Message>,
1624        Session,
1625        ExecuteContextGuard,
1626    ) {
1627        let ExecuteContextInner {
1628            tx,
1629            internal_cmd_tx,
1630            session,
1631            extra,
1632        } = *self.inner;
1633        (tx, internal_cmd_tx, session, extra)
1634    }
1635
1636    /// Retire the execution, by sending a message to the coordinator.
1637    #[instrument(level = "debug")]
1638    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1639        let ExecuteContextInner {
1640            tx,
1641            internal_cmd_tx,
1642            session,
1643            extra,
1644        } = *self.inner;
1645        let reason = if extra.is_trivial() {
1646            None
1647        } else {
1648            Some((&result).into())
1649        };
1650        tx.send(result, session);
1651        if let Some(reason) = reason {
1652            // Retire the guard to get the inner ExecuteContextExtra without triggering auto-retire
1653            let extra = extra.defuse();
1654            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1655                otel_ctx: OpenTelemetryContext::obtain(),
1656                data: extra,
1657                reason,
1658            }) {
1659                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1660            }
1661        }
1662    }
1663
1664    pub fn extra(&self) -> &ExecuteContextGuard {
1665        &self.extra
1666    }
1667
1668    pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
1669        &mut self.extra
1670    }
1671}
1672
/// Per-process statuses of cluster replicas, keyed first by cluster, then by
/// replica, then by process.
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);
1677
1678impl ClusterReplicaStatuses {
1679    pub(crate) fn new() -> ClusterReplicaStatuses {
1680        ClusterReplicaStatuses(BTreeMap::new())
1681    }
1682
1683    /// Initializes the statuses of the specified cluster.
1684    ///
1685    /// Panics if the cluster statuses are already initialized.
1686    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1687        let prev = self.0.insert(cluster_id, BTreeMap::new());
1688        assert_eq!(
1689            prev, None,
1690            "cluster {cluster_id} statuses already initialized"
1691        );
1692    }
1693
1694    /// Initializes the statuses of the specified cluster replica.
1695    ///
1696    /// Panics if the cluster replica statuses are already initialized.
1697    pub(crate) fn initialize_cluster_replica_statuses(
1698        &mut self,
1699        cluster_id: ClusterId,
1700        replica_id: ReplicaId,
1701        num_processes: usize,
1702        time: DateTime<Utc>,
1703    ) {
1704        tracing::info!(
1705            ?cluster_id,
1706            ?replica_id,
1707            ?time,
1708            "initializing cluster replica status"
1709        );
1710        let replica_statuses = self.0.entry(cluster_id).or_default();
1711        let process_statuses = (0..num_processes)
1712            .map(|process_id| {
1713                let status = ClusterReplicaProcessStatus {
1714                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1715                    time: time.clone(),
1716                };
1717                (u64::cast_from(process_id), status)
1718            })
1719            .collect();
1720        let prev = replica_statuses.insert(replica_id, process_statuses);
1721        assert_none!(
1722            prev,
1723            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1724        );
1725    }
1726
1727    /// Removes the statuses of the specified cluster.
1728    ///
1729    /// Panics if the cluster does not exist.
1730    pub(crate) fn remove_cluster_statuses(
1731        &mut self,
1732        cluster_id: &ClusterId,
1733    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1734        let prev = self.0.remove(cluster_id);
1735        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1736    }
1737
1738    /// Removes the statuses of the specified cluster replica.
1739    ///
1740    /// Panics if the cluster or replica does not exist.
1741    pub(crate) fn remove_cluster_replica_statuses(
1742        &mut self,
1743        cluster_id: &ClusterId,
1744        replica_id: &ReplicaId,
1745    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1746        let replica_statuses = self
1747            .0
1748            .get_mut(cluster_id)
1749            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1750        let prev = replica_statuses.remove(replica_id);
1751        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1752    }
1753
1754    /// Inserts or updates the status of the specified cluster replica process.
1755    ///
1756    /// Panics if the cluster or replica does not exist.
1757    pub(crate) fn ensure_cluster_status(
1758        &mut self,
1759        cluster_id: ClusterId,
1760        replica_id: ReplicaId,
1761        process_id: ProcessId,
1762        status: ClusterReplicaProcessStatus,
1763    ) {
1764        let replica_statuses = self
1765            .0
1766            .get_mut(&cluster_id)
1767            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1768            .get_mut(&replica_id)
1769            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1770        replica_statuses.insert(process_id, status);
1771    }
1772
1773    /// Computes the status of the cluster replica as a whole.
1774    ///
1775    /// Panics if `cluster_id` or `replica_id` don't exist.
1776    pub fn get_cluster_replica_status(
1777        &self,
1778        cluster_id: ClusterId,
1779        replica_id: ReplicaId,
1780    ) -> ClusterStatus {
1781        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1782        Self::cluster_replica_status(process_status)
1783    }
1784
1785    /// Computes the status of the cluster replica as a whole.
1786    pub fn cluster_replica_status(
1787        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1788    ) -> ClusterStatus {
1789        process_status
1790            .values()
1791            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1792                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1793                (x, y) => {
1794                    let reason_x = match x {
1795                        ClusterStatus::Offline(reason) => reason,
1796                        ClusterStatus::Online => None,
1797                    };
1798                    let reason_y = match y {
1799                        ClusterStatus::Offline(reason) => reason,
1800                        ClusterStatus::Online => None,
1801                    };
1802                    // Arbitrarily pick the first known not-ready reason.
1803                    ClusterStatus::Offline(reason_x.or(reason_y))
1804                }
1805            })
1806    }
1807
1808    /// Gets the statuses of the given cluster replica.
1809    ///
1810    /// Panics if the cluster or replica does not exist
1811    pub(crate) fn get_cluster_replica_statuses(
1812        &self,
1813        cluster_id: ClusterId,
1814        replica_id: ReplicaId,
1815    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1816        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1817            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1818    }
1819
1820    /// Gets the statuses of the given cluster replica.
1821    pub(crate) fn try_get_cluster_replica_statuses(
1822        &self,
1823        cluster_id: ClusterId,
1824        replica_id: ReplicaId,
1825    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1826        self.try_get_cluster_statuses(cluster_id)
1827            .and_then(|statuses| statuses.get(&replica_id))
1828    }
1829
1830    /// Gets the statuses of the given cluster.
1831    pub(crate) fn try_get_cluster_statuses(
1832        &self,
1833        cluster_id: ClusterId,
1834    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1835        self.0.get(&cluster_id)
1836    }
1837}
1838
/// Glues the external world to the Timely workers.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    /// The controller for the storage and compute layers.
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    /// The catalog in an Arc suitable for readonly references. The Arc allows
    /// us to hand out cheap copies of the catalog to functions that can use it
    /// off of the main coordinator thread. If the coordinator needs to mutate
    /// the catalog, call [`Self::catalog_mut`], which will clone this struct member,
    /// allowing it to be mutated here while the other off-thread references can
    /// read their catalog as long as needed. In the future we would like this
    /// to be a pTVC, but for now this is sufficient.
    catalog: Arc<Catalog>,

    /// A client for persist. Initially, this is only used for reading stashed
    /// peek responses out of batches.
    persist_client: PersistClient,

    /// Channel to manage internal commands from the coordinator to itself.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// Notification that triggers a group commit.
    group_commit_tx: appends::GroupCommitNotifier,

    /// Channel for strict serializable reads ready to commit.
    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    /// Mechanism for totally ordering write and read timestamps, so that all reads
    /// reflect exactly the set of writes that precede them, and no writes that follow.
    global_timelines: BTreeMap<Timeline, TimelineState>,

    /// A generator for transient [`GlobalId`]s, shareable with other threads.
    transient_id_gen: Arc<TransientIdGen>,
    /// A map from connection ID to metadata about that connection for all
    /// active connections.
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    /// For each transaction, the read holds taken to support any performed reads.
    ///
    /// Upon completing a transaction, these read holds should be dropped.
    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds>,

    /// Access to the peek fields should be restricted to methods in the [`peek`] API.
    /// A map from pending peek ids to the queue into which responses are sent, and
    /// the connection id of the client that initiated the peek.
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// A map from client connection ids to a set of all pending peeks for that client.
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    /// A map from client connection ids to pending linearize read transaction.
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    /// A map from the compute sink ID to its state description.
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    /// A map from active webhooks to their invalidation handle.
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
    /// to stage Batches in Persist that we will then link into the shard.
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    /// Connection-scoped cancellation watches.
    ///
    /// Each entry is a watch channel whose value is `false` until cancellation
    /// is requested for that connection, at which point it is set to `true`.
    ///
    /// Consumers install/remove these watches while they have cancellable work
    /// in flight.
    connection_cancel_watches: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    /// Active introspection subscribes.
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    /// Locks that grant access to a specific object, populated lazily as objects are written to.
    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    /// Plans that are currently deferred and waiting on a write lock.
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    /// Pending writes waiting for a group commit.
    pending_writes: Vec<PendingWriteTxn>,

    /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
    /// table's timestamps, but there are cases where timestamps are not bumped but
    /// we expect the closed timestamps to advance (`AS OF X`, SUBSCRIBing views over
    /// RT sources and tables). To address these, spawn a task that forces table
    /// timestamps to close on a regular interval. This roughly tracks the behavior
    /// of realtime sources that close off timestamps on an interval.
    ///
    /// For non-realtime timelines, nothing pushes the timestamps forward, so we must do
    /// it manually.
    advance_timelines_interval: Interval,

    /// Serialized DDL. DDL must be serialized because:
    /// - Many of them do off-thread work and need to verify the catalog is in a valid state, but
    ///   [`PlanValidity`] does not currently support tracking all changes. Doing that correctly
    ///   seems to be more difficult than it's worth, so we would instead re-plan and re-sequence
    ///   the statements.
    /// - Re-planning a statement is hard because Coordinator and Session state is mutated at
    ///   various points, and we would need to correctly reset those changes before re-planning and
    ///   re-sequencing.
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    /// Handle to secret manager that can create and delete secrets from
    /// an arbitrary secret storage engine.
    secrets_controller: Arc<dyn SecretsController>,
    /// A secrets reader that maintains an in-memory cache, where values have a set TTL.
    caching_secrets_reader: CachingSecretsReader,

    /// Handle to a manager that can create and delete kubernetes resources
    /// (ie: VpcEndpoint objects)
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Persist client for fetching storage metadata such as size metrics.
    storage_usage_client: StorageUsageClient,
    /// The interval at which to collect storage usage information.
    storage_usage_collection_interval: Duration,

    /// Segment analytics client.
    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    /// Coordinator metrics.
    metrics: Metrics,
    /// Optimizer metrics.
    optimizer_metrics: OptimizerMetrics,

    /// Tracing handle.
    tracing_handle: TracingHandle,

    /// Data used by the statement logging feature.
    statement_logging: StatementLogging,

    /// Limit for how many concurrent webhook requests we allow.
    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    /// Optional config for the timestamp oracle. This is _required_ when
    /// a timestamp oracle backend is configured.
    timestamp_oracle_config: Option<TimestampOracleConfig>,

    /// Periodically asks cluster scheduling policies to make their decisions.
    check_cluster_scheduling_policies_interval: Interval,

    /// This keeps the last On/Off decision for each cluster and each scheduling policy.
    /// (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are
    /// periodically cleaned up from this Map.)
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    /// When doing 0dt upgrades/in read-only mode, periodically ask all known
    /// clusters/collections whether they are caught up.
    caught_up_check_interval: Interval,

    /// Context needed to check whether all clusters/collections have caught up.
    /// Only used during 0dt deployment, while in read-only mode.
    caught_up_check: Option<CaughtUpCheckContext>,

    /// The metrics registry, handed to the catalog info-metrics background task
    /// so it can register and own its `*_info` series.
    catalog_info_metrics_registry: MetricsRegistry,

    /// The shared system-parameter frontend, installed by the sync loop once it
    /// initializes (and re-installed on reconnect). `None` until then, for
    /// example before LaunchDarkly connects, where a newly-created object
    /// resolves to the environment-wide value (the cold-cache fallback). Used to
    /// resolve a new cluster's or replica's scoped overrides synchronously at
    /// create time, so its first plan or first controller configuration is
    /// correct rather than waiting for the next sync tick. See the scoped
    /// feature flags design.
    scoped_frontend: Option<Arc<SystemParameterFrontend>>,

    /// Tracks the state associated with the currently installed watchsets.
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    /// Tracks the currently installed watchsets for each connection.
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    /// Tracks the statuses of all cluster replicas.
    cluster_replica_statuses: ClusterReplicaStatuses,

    /// Whether or not to start controllers in read-only mode. This is only
    /// meant for use during development of read-only clusters and 0dt upgrades
    /// and should go away once we have proper orchestration during upgrades.
    read_only_controllers: bool,

    /// Updates to builtin tables that are being buffered while we are in
    /// read-only mode. We apply these all at once when coming out of read-only
    /// mode.
    ///
    /// This is a `Some` while in read-only mode and will be replaced by a
    /// `None` when we transition out of read-only mode and write out any
    /// buffered updates.
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    /// The validated license key. `bootstrap` consults its
    /// `expiration_behavior` and `max_credit_consumption_rate()` when deciding
    /// whether and how to enforce the credit consumption limit.
    license_key: ValidatedLicenseKey,

    /// Pre-allocated pool of user IDs to amortize persist writes across DDL operations.
    user_id_pool: IdPool,
}
2035
2036impl Coordinator {
2037    /// Persists the scoped system-parameter working copy and reconciles it into
2038    /// the per-scope resolution boundaries.
2039    ///
2040    /// The system-parameter sync loop and the create-time resolve
2041    /// (`resolve_scoped_for_new_objects`) are the only writers, both serialized
2042    /// on the coordinator loop. The diff is persisted to the
2043    /// durable cache (so values survive an `environmentd` restart and an LD
2044    /// outage) via `Op::UpdateScopedSystemParameters`, which also updates the
2045    /// in-memory working copy in [`CatalogState`] and the
2046    /// `mz_cluster_system_parameters` / `mz_replica_system_parameters`
2047    /// introspection relations. The `replica`-scoped overrides reach the compute
2048    /// controller's per-replica dyncfg layer through the catalog implication for
2049    /// the persisted change. The `cluster`-scoped layer is resolved at plan time
2050    /// via [`CatalogState::cluster_scoped_optimizer_overrides`].
2051    ///
2052    /// [`CatalogState`]: crate::catalog::CatalogState
2053    /// [`CatalogState::cluster_scoped_optimizer_overrides`]: crate::catalog::CatalogState::cluster_scoped_optimizer_overrides
2054    pub(crate) async fn reconcile_scoped_system_parameters(
2055        &mut self,
2056        scoped: ScopedParameters,
2057        prune_scope: Option<ScopedParametersScope>,
2058    ) {
2059        // Nothing changed: skip the durable write. This is the common case on
2060        // most sync ticks.
2061        if self.catalog().state().scoped_system_parameters() == &scoped {
2062            return;
2063        }
2064
2065        // Persist the diff and update the in-memory working copy + introspection
2066        // through the catalog transaction, serialized on the coordinator loop
2067        // with the create-time resolve. The replica-scoped
2068        // controller push is derived from this transaction's diff by the catalog
2069        // implication. `prune_scope` bounds removals to the evaluated objects, so
2070        // a concurrently-created object's override is not wiped. Best-effort: a
2071        // failure here is logged and retried on the next sync tick.
2072        if let Err(e) = self
2073            .catalog_transact(
2074                None,
2075                vec![crate::catalog::Op::UpdateScopedSystemParameters {
2076                    scoped,
2077                    prune_scope,
2078                }],
2079            )
2080            .await
2081        {
2082            tracing::warn!("failed to persist scoped system parameters: {e}");
2083        }
2084    }
2085
2086    /// Synchronously resolves the scoped overrides for the given freshly-created
2087    /// clusters and replicas and folds them into the working copy, so a new
2088    /// object observes its overrides immediately, before its first plan
2089    /// (cluster-coherent) or first controller configuration (replica-local),
2090    /// rather than after the next sync tick. Render-frozen flags (optimizer
2091    /// features baked into an immutable dataflow) make the tick-latency window a
2092    /// correctness problem, not just a delay, which is why this exists.
2093    ///
2094    /// No-op when the feature is gated off or the shared frontend is not yet
2095    /// installed (e.g. before LaunchDarkly connects), in which case the object
2096    /// resolves to the environment-wide value, the cold-cache fallback. The
2097    /// periodic sync loop remains the authoritative full-state reconciler.
2098    pub(crate) async fn resolve_scoped_for_new_objects(
2099        &mut self,
2100        clusters: &BTreeSet<ClusterId>,
2101        replicas: &BTreeSet<ReplicaId>,
2102    ) {
2103        if clusters.is_empty() && replicas.is_empty() {
2104            return;
2105        }
2106        let Some(frontend) = self.scoped_frontend.clone() else {
2107            return;
2108        };
2109        let catalog = self.catalog();
2110        if !ENABLE_SCOPED_SYSTEM_PARAMETERS.get(catalog.system_config().dyncfgs()) {
2111            return;
2112        }
2113
2114        // Evaluate only the new objects, then merge onto the current working copy
2115        // so reconcile keeps the overrides of objects it did not re-evaluate.
2116        let params = SynchronizedParameters::new(catalog.system_config().clone());
2117        let evaluated =
2118            evaluate_scoped_parameters(&frontend, &params, catalog, Some(clusters), Some(replicas));
2119        let merged = catalog.state().scoped_system_parameters().merge(&evaluated);
2120        // This runs synchronously after the create transaction, so the catalog
2121        // already reflects the new objects. The merged state is authoritative
2122        // for all live objects, so prune within them.
2123        let prune_scope = ScopedParametersScope {
2124            clusters: catalog.clusters().map(|cluster| cluster.id).collect(),
2125            replicas: catalog
2126                .clusters()
2127                .flat_map(|cluster| cluster.replicas().map(|replica| replica.replica_id))
2128                .collect(),
2129        };
2130        self.reconcile_scoped_system_parameters(merged, Some(prune_scope))
2131            .await;
2132    }
2133
2134    /// Resolves the replica-local scoped overrides from the catalog working copy
2135    /// into the compute controller's per-replica dyncfg layer, then re-pushes
2136    /// the environment-wide compute configuration so replicas observe the new
2137    /// values. Driven by the catalog implication for replica-scoped
2138    /// configuration changes, and called once on bootstrap.
2139    pub(crate) fn push_replica_dyncfg_overrides(&mut self) {
2140        // Clone the (sparse) replica overrides so we don't hold a catalog borrow
2141        // across the mutable controller calls below.
2142        let replica_overrides = self
2143            .catalog()
2144            .state()
2145            .scoped_system_parameters()
2146            .replica
2147            .clone();
2148
2149        let dyncfgs = self.catalog().system_config().dyncfgs();
2150        let mut instance_overrides: BTreeMap<
2151            ComputeInstanceId,
2152            BTreeMap<ReplicaId, ConfigUpdates>,
2153        > = BTreeMap::new();
2154        for cluster in self.catalog().clusters() {
2155            for replica in cluster.replicas() {
2156                let Some(values) = replica_overrides.get(&replica.replica_id) else {
2157                    continue;
2158                };
2159                let mut updates = ConfigUpdates::default();
2160                for (name, value) in values {
2161                    let Some(entry) = dyncfgs.entry(name) else {
2162                        // A replica-local parameter that is not a dyncfg has no
2163                        // per-replica realization, so skip it.
2164                        continue;
2165                    };
2166                    match entry.parse_val(value) {
2167                        Ok(val) => updates.add_dynamic(name, val),
2168                        Err(e) => {
2169                            tracing::warn!(%name, %value, "cannot parse scoped override: {e}")
2170                        }
2171                    }
2172                }
2173                if !updates.updates.is_empty() {
2174                    instance_overrides
2175                        .entry(cluster.id)
2176                        .or_default()
2177                        .insert(replica.replica_id, updates);
2178                }
2179            }
2180        }
2181
2182        // Only the compute controller's per-replica dyncfg layer is pushed, but on
2183        // `clusterd` that also reaches storage. Compute and storage share one
2184        // process, and the compute worker's `handle_update_configuration` applies
2185        // the pushed dyncfg updates both to compute's own worker `ConfigSet` and to
2186        // the shared persist client `ConfigSet` (`persist_clients.cfg()`) that the
2187        // co-located storage server reads from the same `Arc`. So persist-backed and
2188        // process-global replica-local configs such as the persist pager, LZ4,
2189        // persist client tuning, and `lgalloc` take effect on storage too. The only
2190        // gap would be a future `Replica`-scoped config realized solely in the
2191        // storage worker's own `ConfigSet`, of which none exists today.
2192        self.controller
2193            .compute
2194            .update_replica_dyncfg_overrides(instance_overrides);
2195        // Re-push the env-wide compute config so existing replicas pick up their
2196        // (possibly changed) overrides. This also reverts a removed override: the
2197        // per-replica layer no longer carries the key, so the replica falls back
2198        // to the env-wide value, which `compute_config` always includes because it
2199        // renders the full dyncfg set.
2200        let compute_config = crate::flags::compute_config(self.catalog().system_config());
2201        self.controller.compute.update_configuration(compute_config);
2202    }
2203
2204    /// Returns the cluster-coherent scoped optimizer-feature overrides for
2205    /// `cluster_id`. See
2206    /// [`CatalogState::cluster_scoped_optimizer_overrides`](crate::catalog::CatalogState::cluster_scoped_optimizer_overrides).
2207    pub(crate) fn cluster_scoped_optimizer_overrides(
2208        &self,
2209        cluster_id: ClusterId,
2210    ) -> OptimizerFeatureOverrides {
2211        self.catalog()
2212            .state()
2213            .cluster_scoped_optimizer_overrides(cluster_id)
2214    }
2215
2216    /// Initializes coordinator state based on the contained catalog. Must be
2217    /// called after creating the coordinator and before calling the
2218    /// `Coordinator::serve` method.
2219    #[instrument(name = "coord::bootstrap")]
2220    pub(crate) async fn bootstrap(
2221        &mut self,
2222        boot_ts: Timestamp,
2223        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
2224        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2225        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2226        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
2227        audit_logs_iterator: AuditLogIterator,
2228    ) -> Result<(), AdapterError> {
2229        let bootstrap_start = Instant::now();
2230        info!("startup: coordinator init: bootstrap beginning");
2231        info!("startup: coordinator init: bootstrap: preamble beginning");
2232
2233        // Initialize cluster replica statuses.
2234        // Gross iterator is to avoid partial borrow issues.
2235        let cluster_statuses: Vec<(_, Vec<_>)> = self
2236            .catalog()
2237            .clusters()
2238            .map(|cluster| {
2239                (
2240                    cluster.id(),
2241                    cluster
2242                        .replicas()
2243                        .map(|replica| {
2244                            (replica.replica_id, replica.config.location.num_processes())
2245                        })
2246                        .collect(),
2247                )
2248            })
2249            .collect();
2250        let now = self.now_datetime();
2251        for (cluster_id, replica_statuses) in cluster_statuses {
2252            self.cluster_replica_statuses
2253                .initialize_cluster_statuses(cluster_id);
2254            for (replica_id, num_processes) in replica_statuses {
2255                self.cluster_replica_statuses
2256                    .initialize_cluster_replica_statuses(
2257                        cluster_id,
2258                        replica_id,
2259                        num_processes,
2260                        now,
2261                    );
2262            }
2263        }
2264
2265        let system_config = self.catalog().system_config();
2266
2267        // Inform metrics about the initial system configuration.
2268        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
2269
2270        // Inform the controllers about their initial configuration.
2271        let compute_config = flags::compute_config(system_config);
2272        let storage_config = flags::storage_config(system_config);
2273        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
2274        let dyncfg_updates = system_config.dyncfg_updates();
2275        self.controller.compute.update_configuration(compute_config);
2276        self.controller.storage.update_parameters(storage_config);
2277        self.controller
2278            .update_orchestrator_scheduling_config(scheduling_config);
2279        self.controller.update_configuration(dyncfg_updates);
2280
2281        // Skip the credit consumption check at bootstrap under DisableClusterCreation behavior:
2282        // this codepath validates existing replicas at startup, not cluster creation, so it
2283        // must not block startup. New cluster creation is still gated by the DDL-time check.
2284        // The Disable case is already handled by a bail! in main.rs before we reach here.
2285        let enforce_credit_limit_at_bootstrap = !matches!(
2286            self.license_key.expiration_behavior,
2287            ExpirationBehavior::DisableClusterCreation,
2288        );
2289        if enforce_credit_limit_at_bootstrap {
2290            self.validate_resource_limit_numeric(
2291                Numeric::zero(),
2292                self.current_credit_consumption_rate(),
2293                |system_vars| {
2294                    self.license_key
2295                        .max_credit_consumption_rate()
2296                        .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
2297                },
2298                "cluster replica",
2299                MAX_CREDIT_CONSUMPTION_RATE.name(),
2300            )?;
2301        }
2302
2303        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
2304            Default::default();
2305
2306        let enable_worker_core_affinity =
2307            self.catalog().system_config().enable_worker_core_affinity();
2308        let enable_storage_introspection_logs = self
2309            .catalog()
2310            .system_config()
2311            .enable_storage_introspection_logs();
2312        for instance in self.catalog.clusters() {
2313            self.controller.create_cluster(
2314                instance.id,
2315                ClusterConfig {
2316                    arranged_logs: instance.log_indexes.clone(),
2317                    workload_class: instance.config.workload_class.clone(),
2318                },
2319            )?;
2320            for replica in instance.replicas() {
2321                let role = instance.role();
2322                self.controller.create_replica(
2323                    instance.id,
2324                    replica.replica_id,
2325                    instance.name.clone(),
2326                    replica.name.clone(),
2327                    role,
2328                    replica.config.clone(),
2329                    enable_worker_core_affinity,
2330                    enable_storage_introspection_logs,
2331                )?;
2332            }
2333        }
2334
2335        // Now that the compute instances and their replicas exist, push the
2336        // replica-local scoped overrides into the compute controller so existing
2337        // replicas observe them at startup. The scoped (per-cluster and
2338        // per-replica) working copy was restored from the durable cache into
2339        // `CatalogState` while opening the catalog, so the last-known values are
2340        // in effect before the first parameter sync and through a sync outage.
2341        // This must run after the creation loop above: the push iterates the
2342        // controller's instances, so before they exist it is a no-op. It also
2343        // runs before dataflows are rendered later in bootstrap, so render-frozen
2344        // replica flags take effect. The cluster-coherent layer is read at plan
2345        // time.
2346        self.push_replica_dyncfg_overrides();
2347
2348        info!(
2349            "startup: coordinator init: bootstrap: preamble complete in {:?}",
2350            bootstrap_start.elapsed()
2351        );
2352
2353        let init_storage_collections_start = Instant::now();
2354        info!("startup: coordinator init: bootstrap: storage collections init beginning");
2355        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2356            .await;
2357        info!(
2358            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2359            init_storage_collections_start.elapsed()
2360        );
2361
2362        // The storage controller knows about the introspection collections now, so we can start
2363        // sinking introspection updates in the compute controller. It makes sense to do that as
2364        // soon as possible, to avoid updates piling up in the compute controller's internal
2365        // buffers.
2366        self.controller.start_compute_introspection_sink();
2367
2368        let sorting_start = Instant::now();
2369        info!("startup: coordinator init: bootstrap: sorting catalog entries");
2370        let entries = self.bootstrap_sort_catalog_entries();
2371        info!(
2372            "startup: coordinator init: bootstrap: sorting catalog entries complete in {:?}",
2373            sorting_start.elapsed()
2374        );
2375
2376        let optimize_dataflows_start = Instant::now();
2377        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2378        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2379        info!(
2380            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2381            optimize_dataflows_start.elapsed()
2382        );
2383
2384        // We don't need to wait for the cache to update.
2385        let _fut = self.catalog().update_expression_cache(
2386            uncached_local_exprs.into_iter().collect(),
2387            uncached_global_exps.into_iter().collect(),
2388            Default::default(),
2389        );
2390
2391        // Select dataflow as-ofs. This step relies on the storage collections created by
2392        // `bootstrap_storage_collections` and the dataflow plans created by
2393        // `bootstrap_dataflow_plans`.
2394        let bootstrap_as_ofs_start = Instant::now();
2395        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2396        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2397        info!(
2398            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2399            bootstrap_as_ofs_start.elapsed()
2400        );
2401
2402        let postamble_start = Instant::now();
2403        info!("startup: coordinator init: bootstrap: postamble beginning");
2404
2405        let logs: BTreeSet<_> = BUILTINS::logs()
2406            .map(|log| self.catalog().resolve_builtin_log(log))
2407            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2408            .collect();
2409
2410        let mut privatelink_connections = BTreeMap::new();
2411
2412        for entry in &entries {
2413            debug!(
2414                "coordinator init: installing {} {}",
2415                entry.item().typ(),
2416                entry.id()
2417            );
2418            let mut policy = entry.item().initial_logical_compaction_window();
2419            match entry.item() {
2420                // Currently catalog item rebuild assumes that sinks and
2421                // indexes are always built individually and does not store information
2422                // about how it was built. If we start building multiple sinks and/or indexes
2423                // using a single dataflow, we have to make sure the rebuild process re-runs
2424                // the same multiple-build dataflow.
2425                CatalogItem::Source(source) => {
2426                    // Propagate source compaction windows to subsources if needed.
2427                    if source.custom_logical_compaction_window.is_none() {
2428                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2429                            source.data_source
2430                        {
2431                            policy = Some(
2432                                self.catalog()
2433                                    .get_entry(&ingestion_id)
2434                                    .source()
2435                                    .expect("must be source")
2436                                    .custom_logical_compaction_window
2437                                    .unwrap_or_default(),
2438                            );
2439                        }
2440                    }
2441                    policies_to_set
2442                        .entry(policy.expect("sources have a compaction window"))
2443                        .or_insert_with(Default::default)
2444                        .storage_ids
2445                        .insert(source.global_id());
2446                }
2447                CatalogItem::Table(table) => {
2448                    policies_to_set
2449                        .entry(policy.expect("tables have a compaction window"))
2450                        .or_insert_with(Default::default)
2451                        .storage_ids
2452                        .extend(table.global_ids());
2453                }
2454                CatalogItem::Index(idx) => {
2455                    let policy_entry = policies_to_set
2456                        .entry(policy.expect("indexes have a compaction window"))
2457                        .or_insert_with(Default::default);
2458
2459                    if logs.contains(&idx.on) {
2460                        policy_entry
2461                            .compute_ids
2462                            .entry(idx.cluster_id)
2463                            .or_insert_with(BTreeSet::new)
2464                            .insert(idx.global_id());
2465                    } else {
2466                        let df_desc = self
2467                            .catalog()
2468                            .try_get_physical_plan(&idx.global_id())
2469                            .expect("added in `bootstrap_dataflow_plans`")
2470                            .clone();
2471
2472                        let df_meta = self
2473                            .catalog()
2474                            .try_get_dataflow_metainfo(&idx.global_id())
2475                            .expect("added in `bootstrap_dataflow_plans`");
2476
2477                        if self.catalog().state().system_config().enable_mz_notices() {
2478                            // Collect optimization hint updates.
2479                            self.catalog().state().pack_optimizer_notices(
2480                                &mut builtin_table_updates,
2481                                df_meta.optimizer_notices.iter(),
2482                                Diff::ONE,
2483                            );
2484                        }
2485
2486                        // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
2487                        // but we cannot call that as it will also downgrade the read hold on the index.
2488                        policy_entry
2489                            .compute_ids
2490                            .entry(idx.cluster_id)
2491                            .or_insert_with(Default::default)
2492                            .extend(df_desc.export_ids());
2493
2494                        self.controller
2495                            .compute
2496                            .create_dataflow(idx.cluster_id, df_desc, None)
2497                            .unwrap_or_terminate("cannot fail to create dataflows");
2498                    }
2499                }
2500                CatalogItem::View(_) => (),
2501                CatalogItem::MaterializedView(mview) => {
2502                    policies_to_set
2503                        .entry(policy.expect("materialized views have a compaction window"))
2504                        .or_insert_with(Default::default)
2505                        .storage_ids
2506                        .insert(mview.global_id_writes());
2507
2508                    let mut df_desc = self
2509                        .catalog()
2510                        .try_get_physical_plan(&mview.global_id_writes())
2511                        .expect("added in `bootstrap_dataflow_plans`")
2512                        .clone();
2513
2514                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
2515                        df_desc.set_initial_as_of(initial_as_of);
2516                    }
2517
2518                    // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
2519                    let until = mview
2520                        .refresh_schedule
2521                        .as_ref()
2522                        .and_then(|s| s.last_refresh())
2523                        .and_then(|r| r.try_step_forward());
2524                    if let Some(until) = until {
2525                        df_desc.until.meet_assign(&Antichain::from_elem(until));
2526                    }
2527
2528                    let df_meta = self
2529                        .catalog()
2530                        .try_get_dataflow_metainfo(&mview.global_id_writes())
2531                        .expect("added in `bootstrap_dataflow_plans`");
2532
2533                    if self.catalog().state().system_config().enable_mz_notices() {
2534                        // Collect optimization hint updates.
2535                        self.catalog().state().pack_optimizer_notices(
2536                            &mut builtin_table_updates,
2537                            df_meta.optimizer_notices.iter(),
2538                            Diff::ONE,
2539                        );
2540                    }
2541
2542                    self.ship_dataflow(df_desc, mview.cluster_id, mview.target_replica)
2543                        .await;
2544
2545                    // If this is a replacement MV, it must remain read-only until the replacement
2546                    // gets applied.
2547                    if mview.replacement_target.is_none() {
2548                        self.allow_writes(mview.cluster_id, mview.global_id_writes());
2549                    }
2550                }
2551                CatalogItem::Sink(sink) => {
2552                    policies_to_set
2553                        .entry(CompactionWindow::Default)
2554                        .or_insert_with(Default::default)
2555                        .storage_ids
2556                        .insert(sink.global_id());
2557                }
2558                CatalogItem::Connection(catalog_connection) => {
2559                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2560                        privatelink_connections.insert(
2561                            entry.id(),
2562                            VpcEndpointConfig {
2563                                aws_service_name: conn.service_name.clone(),
2564                                availability_zone_ids: conn.availability_zones.clone(),
2565                            },
2566                        );
2567                    }
2568                }
2569                // Nothing to do for these cases
2570                CatalogItem::Log(_)
2571                | CatalogItem::Type(_)
2572                | CatalogItem::Func(_)
2573                | CatalogItem::Secret(_) => {}
2574            }
2575        }
2576
2577        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2578            // Clean up any extraneous VpcEndpoints that shouldn't exist.
2579            let existing_vpc_endpoints = cloud_resource_controller
2580                .list_vpc_endpoints()
2581                .await
2582                .context("list vpc endpoints")?;
2583            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2584            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2585            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2586            for id in vpc_endpoints_to_remove {
2587                cloud_resource_controller
2588                    .delete_vpc_endpoint(*id)
2589                    .await
2590                    .context("deleting extraneous vpc endpoint")?;
2591            }
2592
2593            // Ensure desired VpcEndpoints are up to date.
2594            for (id, spec) in privatelink_connections {
2595                cloud_resource_controller
2596                    .ensure_vpc_endpoint(id, spec)
2597                    .await
2598                    .context("ensuring vpc endpoint")?;
2599            }
2600        }
2601
2602        // Having installed all entries and created all constraints, we can now drop read holds
2603        // and relax read policies.
2604        drop(dataflow_read_holds);
2605        // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
2606        for (cw, policies) in policies_to_set {
2607            self.initialize_read_policies(&policies, cw).await;
2608        }
2609
2610        // Expose mapping from T-shirt sizes to actual sizes
2611        builtin_table_updates.extend(
2612            self.catalog().state().resolve_builtin_table_updates(
2613                self.catalog().state().pack_all_replica_size_updates(),
2614            ),
2615        );
2616
2617        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2618        // When 0dt is enabled, we create new shards for any migrated builtin storage collections.
2619        // In read-only mode, the migrated builtin tables (which are a subset of migrated builtin
2620        // storage collections) need to be back-filled so that any dependent dataflow can be
2621        // hydrated. Additionally, these shards are not registered with the txn-shard, and cannot
2622        // be registered while in read-only, so they are written to directly.
2623        let migrated_updates_fut = if self.controller.read_only() {
2624            let min_timestamp = Timestamp::minimum();
2625            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2626                .extract_if(.., |update| {
2627                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2628                    migrated_storage_collections_0dt.contains(&update.id)
2629                        && self
2630                            .controller
2631                            .storage_collections
2632                            .collection_frontiers(gid)
2633                            .expect("all tables are registered")
2634                            .write_frontier
2635                            .elements()
2636                            == &[min_timestamp]
2637                })
2638                .collect();
2639            if migrated_builtin_table_updates.is_empty() {
2640                futures::future::ready(()).boxed()
2641            } else {
2642                // Group all updates per-table.
2643                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2644                for update in migrated_builtin_table_updates {
2645                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2646                    grouped_appends.entry(gid).or_default().push(update.data);
2647                }
2648                info!(
2649                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2650                    grouped_appends.keys().collect::<Vec<_>>()
2651                );
2652
2653                // Consolidate Row data; staged batches must already be consolidated.
2654                let mut all_appends = Vec::with_capacity(grouped_appends.len());
2655                for (item_id, table_data) in grouped_appends.into_iter() {
2656                    let mut all_rows = Vec::new();
2657                    let mut all_data = Vec::new();
2658                    for data in table_data {
2659                        match data {
2660                            TableData::Rows(rows) => all_rows.extend(rows),
2661                            TableData::Batches(_) => all_data.push(data),
2662                        }
2663                    }
2664                    differential_dataflow::consolidation::consolidate(&mut all_rows);
2665                    all_data.push(TableData::Rows(all_rows));
2666
2667                    // TODO(parkmycar): Use SmallVec throughout.
2668                    all_appends.push((item_id, all_data));
2669                }
2670
2671                let fut = self
2672                    .controller
2673                    .storage
2674                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2675                    .expect("cannot fail to append");
2676                async {
2677                    fut.await
2678                        .expect("One-shot shouldn't be dropped during bootstrap")
2679                        .unwrap_or_terminate("cannot fail to append")
2680                }
2681                .boxed()
2682            }
2683        } else {
2684            futures::future::ready(()).boxed()
2685        };
2686
2687        info!(
2688            "startup: coordinator init: bootstrap: postamble complete in {:?}",
2689            postamble_start.elapsed()
2690        );
2691
2692        let builtin_update_start = Instant::now();
2693        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2694
2695        if self.controller.read_only() {
2696            info!(
2697                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2698            );
2699
2700            // TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
2701            let audit_join_start = Instant::now();
2702            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2703            let audit_log_updates: Vec<_> = audit_logs_iterator
2704                .map(|(audit_log, ts)| StateUpdate {
2705                    kind: StateUpdateKind::AuditLog(audit_log),
2706                    ts,
2707                    diff: StateDiff::Addition,
2708                })
2709                .collect();
2710            let audit_log_builtin_table_updates = self
2711                .catalog()
2712                .state()
2713                .generate_builtin_table_updates(audit_log_updates);
2714            builtin_table_updates.extend(audit_log_builtin_table_updates);
2715            info!(
2716                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2717                audit_join_start.elapsed()
2718            );
2719            self.buffered_builtin_table_updates
2720                .as_mut()
2721                .expect("in read-only mode")
2722                .append(&mut builtin_table_updates);
2723        } else {
2724            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2725                .await;
2726        };
2727        info!(
2728            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2729            builtin_update_start.elapsed()
2730        );
2731
2732        let cleanup_secrets_start = Instant::now();
2733        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2734        // Cleanup orphaned secrets. Errors during list() or delete() do not
2735        // need to prevent bootstrap from succeeding; we will retry next
2736        // startup.
2737        {
2738            // Destructure Self so we can selectively move fields into the async
2739            // task.
2740            let Self {
2741                secrets_controller,
2742                catalog,
2743                ..
2744            } = self;
2745
2746            let next_user_item_id = catalog.get_next_user_item_id().await?;
2747            let next_system_item_id = catalog.get_next_system_item_id().await?;
2748            let read_only = self.controller.read_only();
2749            // Fetch all IDs from the catalog to future-proof against other
2750            // things using secrets. Today, SECRET and CONNECTION objects use
2751            // secrets_controller.ensure, but more things could in the future
2752            // that would be easy to miss adding here.
2753            let catalog_ids: BTreeSet<CatalogItemId> =
2754                catalog.entries().map(|entry| entry.id()).collect();
2755            let secrets_controller = Arc::clone(secrets_controller);
2756
2757            spawn(|| "cleanup-orphaned-secrets", async move {
2758                if read_only {
2759                    info!(
2760                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2761                    );
2762                    return;
2763                }
2764                info!("coordinator init: cleaning up orphaned secrets");
2765
2766                match secrets_controller.list().await {
2767                    Ok(controller_secrets) => {
2768                        let controller_secrets: BTreeSet<CatalogItemId> =
2769                            controller_secrets.into_iter().collect();
2770                        let orphaned = controller_secrets.difference(&catalog_ids);
2771                        for id in orphaned {
2772                            let id_too_large = match id {
2773                                CatalogItemId::System(id) => *id >= next_system_item_id,
2774                                CatalogItemId::User(id) => *id >= next_user_item_id,
2775                                CatalogItemId::IntrospectionSourceIndex(_)
2776                                | CatalogItemId::Transient(_) => false,
2777                            };
2778                            if id_too_large {
2779                                info!(
2780                                    %next_user_item_id, %next_system_item_id,
2781                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2782                                );
2783                            } else {
2784                                info!("coordinator init: deleting orphaned secret {id}");
2785                                fail_point!("orphan_secrets");
2786                                if let Err(e) = secrets_controller.delete(*id).await {
2787                                    warn!(
2788                                        "Dropping orphaned secret has encountered an error: {}",
2789                                        e
2790                                    );
2791                                }
2792                            }
2793                        }
2794                    }
2795                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2796                }
2797            });
2798        }
2799        info!(
2800            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2801            cleanup_secrets_start.elapsed()
2802        );
2803
2804        // Run all of our final steps concurrently.
2805        let final_steps_start = Instant::now();
2806        info!(
2807            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2808        );
2809        migrated_updates_fut
2810            .instrument(info_span!("coord::bootstrap::final"))
2811            .await;
2812
2813        debug!(
2814            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2815        );
2816        // Announce the completion of initialization.
2817        self.controller.initialization_complete();
2818
2819        // Initialize unified introspection.
2820        self.bootstrap_introspection_subscribes().await;
2821
2822        info!(
2823            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2824            final_steps_start.elapsed()
2825        );
2826
2827        info!(
2828            "startup: coordinator init: bootstrap complete in {:?}",
2829            bootstrap_start.elapsed()
2830        );
2831        Ok(())
2832    }
2833
2834    /// Prepares tables for writing by resetting them to a known state and
2835    /// appending the given builtin table updates. The timestamp oracle
2836    /// will be advanced to the write timestamp of the append when this
2837    /// method returns.
2838    #[allow(clippy::async_yields_async)]
2839    #[instrument]
2840    async fn bootstrap_tables(
2841        &mut self,
2842        entries: &[CatalogEntry],
2843        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2844        audit_logs_iterator: AuditLogIterator,
2845    ) {
2846        /// Small helper struct bundling a table entry's ID, name, and definition for bootstrapping.
2847        struct TableMetadata<'a> {
2848            id: CatalogItemId,
2849            name: &'a QualifiedItemName,
2850            table: &'a Table,
2851        }
2852
2853        // Filter our entries down to just tables.
2854        let table_metas: Vec<_> = entries
2855            .into_iter()
2856            .filter_map(|entry| {
2857                entry.table().map(|table| TableMetadata {
2858                    id: entry.id(),
2859                    name: entry.name(),
2860                    table,
2861                })
2862            })
2863            .collect();
2864
2865        // Append empty batches to advance the timestamp of all tables.
2866        debug!("coordinator init: advancing all tables to current timestamp");
2867        let WriteTimestamp {
2868            timestamp: write_ts,
2869            advance_to,
2870        } = self.get_local_write_ts().await;
2871        let appends = table_metas
2872            .iter()
2873            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2874            .collect();
2875        // Append the tables in the background. We apply the write timestamp before getting a read
2876        // timestamp and reading a snapshot of each table, so the snapshots will block on their own
2877        // until the appends are complete.
2878        let table_fence_rx = self
2879            .controller
2880            .storage
2881            .append_table(write_ts.clone(), advance_to, appends)
2882            .expect("invalid updates");
2883
2884        self.apply_local_write(write_ts).await;
2885
2886        // Add builtin table updates that clear the contents of all system tables.
2887        debug!("coordinator init: resetting system tables");
2888        let read_ts = self.get_local_read_ts().await;
2889
2890        // Filter out the 'mz_storage_usage_by_shard' table since we need to retain that info for
2891        // billing purposes.
2892        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2893            .catalog()
2894            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2895            .into();
2896        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2897            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2898                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2899        };
2900
2901        let mut retraction_tasks = Vec::new();
2902        let mut system_tables: Vec<_> = table_metas
2903            .iter()
2904            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2905            .collect();
2906
2907        // Special case audit events because it's append only.
2908        let (audit_events_idx, _) = system_tables
2909            .iter()
2910            .find_position(|table| {
2911                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2912            })
2913            .expect("mz_audit_events must exist");
2914        let audit_events = system_tables.remove(audit_events_idx);
2915        let audit_log_task = self.bootstrap_audit_log_table(
2916            audit_events.id,
2917            audit_events.name,
2918            audit_events.table,
2919            audit_logs_iterator,
2920            read_ts,
2921        );
2922
2923        for system_table in system_tables {
2924            let table_id = system_table.id;
2925            let full_name = self.catalog().resolve_full_name(system_table.name, None);
2926            debug!("coordinator init: resetting system table {full_name} ({table_id})");
2927
2928            // Fetch the current contents of the table for retraction.
2929            let snapshot_fut = self
2930                .controller
2931                .storage_collections
2932                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2933            let batch_fut = self
2934                .controller
2935                .storage_collections
2936                .create_update_builder(system_table.table.global_id_writes());
2937
2938            let task = spawn(|| format!("snapshot-{table_id}"), async move {
2939                // Create a TimestamplessUpdateBuilder.
2940                let mut batch = batch_fut
2941                    .await
2942                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2943                tracing::info!(?table_id, "starting snapshot");
2944                // Get a cursor which will emit a consolidated snapshot.
2945                let mut snapshot_cursor = snapshot_fut
2946                    .await
2947                    .unwrap_or_terminate("cannot fail to snapshot");
2948
2949                // Retract the current contents, spilling into our builder.
2950                while let Some(values) = snapshot_cursor.next().await {
2951                    for (key, _t, d) in values {
2952                        let d_invert = d.neg();
2953                        batch.add(&key, &(), &d_invert).await;
2954                    }
2955                }
2956                tracing::info!(?table_id, "finished snapshot");
2957
2958                let batch = batch.finish().await;
2959                BuiltinTableUpdate::batch(table_id, batch)
2960            });
2961            retraction_tasks.push(task);
2962        }
2963
2964        let retractions_res = futures::future::join_all(retraction_tasks).await;
2965        for retractions in retractions_res {
2966            builtin_table_updates.push(retractions);
2967        }
2968
2969        let audit_join_start = Instant::now();
2970        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2971        let audit_log_updates = audit_log_task.await;
2972        let audit_log_builtin_table_updates = self
2973            .catalog()
2974            .state()
2975            .generate_builtin_table_updates(audit_log_updates);
2976        builtin_table_updates.extend(audit_log_builtin_table_updates);
2977        info!(
2978            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2979            audit_join_start.elapsed()
2980        );
2981
2982        // Now that the snapshots are complete, the appends must also be complete.
2983        table_fence_rx
2984            .await
2985            .expect("One-shot shouldn't be dropped during bootstrap")
2986            .unwrap_or_terminate("cannot fail to append");
2987
2988        info!("coordinator init: sending builtin table updates");
2989        let (_builtin_updates_fut, write_ts) = self
2990            .builtin_table_update()
2991            .execute(builtin_table_updates)
2992            .await;
2993        info!(?write_ts, "our write ts");
2994        if let Some(write_ts) = write_ts {
2995            self.apply_local_write(write_ts).await;
2996        }
2997    }
2998
2999    /// Prepare updates to the audit log table. The audit log table append only and very large, so
3000    /// we only need to find the events present in `audit_logs_iterator` but not in the audit log
3001    /// table.
3002    #[instrument]
3003    fn bootstrap_audit_log_table<'a>(
3004        &self,
3005        table_id: CatalogItemId,
3006        name: &'a QualifiedItemName,
3007        table: &'a Table,
3008        audit_logs_iterator: AuditLogIterator,
3009        read_ts: Timestamp,
3010    ) -> JoinHandle<Vec<StateUpdate>> {
3011        let full_name = self.catalog().resolve_full_name(name, None);
3012        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
3013        let current_contents_fut = self
3014            .controller
3015            .storage_collections
3016            .snapshot(table.global_id_writes(), read_ts);
3017        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
3018            let current_contents = current_contents_fut
3019                .await
3020                .unwrap_or_terminate("cannot fail to fetch snapshot");
3021            let contents_len = current_contents.len();
3022            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
3023
3024            // Fetch the largest audit log event ID that has been written to the table.
3025            let max_table_id = current_contents
3026                .into_iter()
3027                .filter(|(_, diff)| *diff == 1)
3028                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
3029                .sorted()
3030                .rev()
3031                .next();
3032
3033            // Filter audit log catalog updates to those that are not present in the table.
3034            audit_logs_iterator
3035                .take_while(|(audit_log, _)| match max_table_id {
3036                    Some(id) => audit_log.event.sortable_id() > id,
3037                    None => true,
3038                })
3039                .map(|(audit_log, ts)| StateUpdate {
3040                    kind: StateUpdateKind::AuditLog(audit_log),
3041                    ts,
3042                    diff: StateDiff::Addition,
3043                })
3044                .collect::<Vec<_>>()
3045        })
3046    }
3047
    /// Initializes all storage collections required by catalog objects in the storage controller.
    ///
    /// This method takes care of collection creation, as well as migration of existing
    /// collections.
    ///
    /// Creating all storage collections in a single `create_collections` call, rather than on
    /// demand, is more efficient as it reduces the number of writes to durable storage. It also
    /// allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
    /// storage collections, without needing to worry about dependency order.
    ///
    /// Note that registration is performed in dependency layers: `create_collections_for_bootstrap`
    /// is invoked once per layer, where a layer consists of all collections whose storage
    /// dependencies have been registered by earlier layers. This allows deriving a `since` for
    /// builtin collections from the sinces of their inputs.
    ///
    /// `migrated_storage_collections` is a set of builtin storage collections that have been
    /// migrated and should be handled specially.
    #[instrument]
    async fn bootstrap_storage_collections(
        &mut self,
        migrated_storage_collections: &BTreeSet<CatalogItemId>,
    ) {
        let catalog = self.catalog();

        // Builds the `CollectionDescription` for a collection that is backed by a
        // `DataSourceDesc`, translating the catalog-level description into the storage-level
        // `DataSource` (inlining connections and resolving item IDs to `GlobalId`s).
        let source_desc = |object_id: GlobalId,
                           data_source: &DataSourceDesc,
                           desc: &RelationDesc,
                           timeline: &Timeline| {
            let data_source = match data_source.clone() {
                // Re-announce the source description.
                DataSourceDesc::Ingestion { desc, cluster_id } => {
                    let desc = desc.into_inline_connection(catalog.state());
                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
                    DataSource::Ingestion(ingestion)
                }
                DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    progress_subsource,
                    data_config,
                    details,
                    cluster_id,
                } => {
                    let desc = desc.into_inline_connection(catalog.state());
                    let data_config = data_config.into_inline_connection(catalog.state());
                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
                    // this will always be a Source or a Table.
                    let progress_subsource =
                        catalog.get_entry(&progress_subsource).latest_global_id();
                    let mut ingestion =
                        IngestionDescription::new(desc, cluster_id, progress_subsource);
                    // With the old syntax the object itself is also registered as an export of
                    // its own ingestion.
                    let legacy_export = SourceExport {
                        storage_metadata: (),
                        data_config,
                        details,
                    };
                    ingestion.source_exports.insert(object_id, legacy_export);

                    DataSource::Ingestion(ingestion)
                }
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference: _,
                    details,
                    data_config,
                } => {
                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
                    // this will always be a Source or a Table.
                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();

                    DataSource::IngestionExport {
                        ingestion_id,
                        details,
                        data_config: data_config.into_inline_connection(catalog.state()),
                    }
                }
                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
                DataSourceDesc::Progress => DataSource::Progress,
                DataSourceDesc::Introspection(introspection) => {
                    DataSource::Introspection(introspection)
                }
                DataSourceDesc::Catalog => DataSource::Other,
            };
            CollectionDescription {
                desc: desc.clone(),
                data_source,
                since: None,
                timeline: Some(timeline.clone()),
                primary: None,
            }
        };

        // Walk all catalog entries and gather a `(GlobalId, CollectionDescription)` pair for
        // every storage collection. Materialized views are additionally recorded in
        // `compute_collections`, which is handed to the nullability evolution step below.
        let mut compute_collections = vec![];
        let mut collections = vec![];
        for entry in catalog.entries() {
            match entry.item() {
                CatalogItem::Source(source) => {
                    collections.push((
                        source.global_id(),
                        source_desc(
                            source.global_id(),
                            &source.data_source,
                            &source.desc,
                            &source.timeline,
                        ),
                    ));
                }
                CatalogItem::Table(table) => {
                    match &table.data_source {
                        TableDataSource::TableWrites { defaults: _ } => {
                            // Index the table's collections by version, so that each version can
                            // look up its successor (`version.bump()`) and record it as its
                            // `primary` collection. A version without a successor gets
                            // `primary = None`.
                            let versions: BTreeMap<_, _> = table
                                .collection_descs()
                                .map(|(gid, version, desc)| (version, (gid, desc)))
                                .collect();
                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
                                let next_version = version.bump();
                                let primary_collection =
                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
                                let mut collection_desc =
                                    CollectionDescription::for_table(desc.clone());
                                collection_desc.primary = primary_collection;

                                (*gid, collection_desc)
                            });
                            collections.extend(collection_descs);
                        }
                        TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => {
                            // TODO(alter_table): Support versioning tables that read from sources.
                            soft_assert_eq_or_log!(table.collections.len(), 1);
                            let collection_descs =
                                table.collection_descs().map(|(gid, _version, desc)| {
                                    (
                                        gid,
                                        source_desc(
                                            entry.latest_global_id(),
                                            data_source_desc,
                                            &desc,
                                            timeline,
                                        ),
                                    )
                                });
                            collections.extend(collection_descs);
                        }
                    };
                }
                CatalogItem::MaterializedView(mv) => {
                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
                        let collection_desc =
                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
                        (gid, collection_desc)
                    });

                    collections.extend(collection_descs);
                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
                }
                CatalogItem::Sink(sink) => {
                    // The sink's export description needs the relation description of the
                    // collection it reads from.
                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
                    let from_desc = storage_sink_from_entry
                        .relation_desc()
                        .expect("sinks can only be built on items with descs")
                        .into_owned();
                    let collection_desc = CollectionDescription {
                        // TODO(sinks): make generic once we have more than one sink type.
                        desc: KAFKA_PROGRESS_DESC.clone(),
                        data_source: DataSource::Sink {
                            desc: ExportDescription {
                                sink: StorageSinkDesc {
                                    from: sink.from,
                                    from_desc,
                                    connection: sink
                                        .connection
                                        .clone()
                                        .into_inline_connection(self.catalog().state()),
                                    envelope: sink.envelope,
                                    as_of: Antichain::from_elem(Timestamp::minimum()),
                                    with_snapshot: sink.with_snapshot,
                                    version: sink.version,
                                    from_storage_metadata: (),
                                    to_storage_metadata: (),
                                    commit_interval: sink.commit_interval,
                                },
                                instance_id: sink.cluster_id,
                            },
                        },
                        since: None,
                        timeline: None,
                        primary: None,
                    };
                    collections.push((sink.global_id, collection_desc));
                }
                // These item types do not contribute any storage collection here.
                CatalogItem::Log(_)
                | CatalogItem::View(_)
                | CatalogItem::Index(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_)
                | CatalogItem::Connection(_) => (),
            }
        }

        // Pick the timestamp at which collections are registered: the local read timestamp in
        // read-only mode, a freshly allocated local write timestamp otherwise.
        let register_ts = if self.controller.read_only() {
            self.get_local_read_ts().await
        } else {
            // Getting a write timestamp bumps the write timestamp in the
            // oracle, which we're not allowed in read-only mode.
            self.get_local_write_ts().await.timestamp
        };

        let storage_metadata = self.catalog.state().storage_metadata();
        // Expand the migrated catalog items into all of their `GlobalId`s.
        let migrated_storage_collections = migrated_storage_collections
            .into_iter()
            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
            .collect();

        // Before possibly creating collections, make sure their schemas are correct.
        //
        // Across different versions of Materialize the nullability of columns can change based on
        // updates to our optimizer.
        self.controller
            .storage
            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
            .await
            .unwrap_or_terminate("cannot fail to evolve collections");

        // New builtin storage collections are by default created with [0] since/upper frontiers.
        // For collections that have dependencies on other collections (MVs, CTs), this can violate
        // the frontier invariants assumed by as-of selection. For example, as-of selection expects
        // to be able to pick up computing a materialized view from its most recent upper, but if
        // that upper is [0] it's likely that the required times are not available anymore in the
        // MV inputs.
        //
        // To avoid violating frontier invariants, we need to bump their sinces to times greater
        // than all of their upstream storage inputs. To know the since of a storage input, it has
        // to be registered with the storage controller first. Thus we register collections in
        // layers: Each iteration registers the collections whose dependencies are all already
        // registered.
        let mut pending: BTreeMap<_, _> = collections.into_iter().collect();

        // Precompute storage-collection dependencies for each collection.
        let transitive_dep_gids: BTreeMap<_, _> = pending
            .keys()
            .map(|gid| {
                let entry = self.catalog.get_entry_by_global_id(gid);
                let item_id = entry.id();
                let deps = self.catalog.state().transitive_uses(item_id);
                let dep_gids: BTreeSet<_> = deps
                    // Ignore self-dependencies. For example, `transitive_uses` includes the input ID,
                    // and CTs can depend on themselves.
                    .filter(|dep_id| *dep_id != item_id)
                    .map(|dep_id| self.catalog.get_entry(&dep_id).latest_global_id())
                    // Ignore dependencies on objects that are not storage collections.
                    .filter(|dep_gid| pending.contains_key(dep_gid))
                    .collect();
                (*gid, dep_gids)
            })
            .collect();

        while !pending.is_empty() {
            // Drain collections whose dependencies have all been registered already
            // (i.e., are not in `pending`).
            let ready_gids: BTreeSet<_> = pending
                .keys()
                .filter(|gid| {
                    let mut deps = transitive_dep_gids[gid].iter();
                    !deps.any(|dep_gid| pending.contains_key(dep_gid))
                })
                .copied()
                .collect();
            let mut ready: Vec<_> = pending
                .extract_if(.., |gid, _| ready_gids.contains(gid))
                .collect();

            // Bump sinces of builtin collections.
            for (gid, collection) in &mut ready {
                // Don't silently overwrite an explicitly specified `since`.
                if !gid.is_system() || collection.since.is_some() {
                    continue;
                }

                // The derived since is the join of the sinces of all storage dependencies,
                // starting from the minimum timestamp for collections without dependencies.
                let mut derived_since = Antichain::from_elem(Timestamp::MIN);
                for dep_gid in &transitive_dep_gids[gid] {
                    let (since, _) = self
                        .controller
                        .storage
                        .collection_frontiers(*dep_gid)
                        .expect("previously registered");
                    derived_since.join_assign(&since);
                }
                collection.since = Some(derived_since);
            }

            // This check runs after the since-bumping loop above, so collections registered
            // through the fallback below do not get a derived `since`.
            if ready.is_empty() {
                soft_panic_or_log!(
                    "cycle in storage collections: {:?}",
                    pending.keys().collect::<Vec<_>>(),
                );
                // We get here only due to a bug. Rather than crash-looping, we try our best to
                // reach a sane state by attempting to register all the remaining collections at
                // once.
                ready = mem::take(&mut pending).into_iter().collect();
            }

            self.controller
                .storage
                .create_collections_for_bootstrap(
                    storage_metadata,
                    Some(register_ts),
                    ready,
                    &migrated_storage_collections,
                )
                .await
                .unwrap_or_terminate("cannot fail to create collections");
        }

        // Outside of read-only mode `register_ts` was obtained via `get_local_write_ts`, so it
        // is applied as a local write here.
        if !self.controller.read_only() {
            self.apply_local_write(register_ts).await;
        }
    }
3363
3364    /// Returns the current list of catalog entries, sorted into an appropriate order for
3365    /// bootstrapping.
3366    ///
3367    /// The returned entries are in dependency order. Indexes are sorted immediately after the
3368    /// objects they index, to ensure that all dependants of these indexed objects can make use of
3369    /// the respective indexes.
3370    fn bootstrap_sort_catalog_entries(&self) -> Vec<CatalogEntry> {
3371        let mut indexes_on = BTreeMap::<_, Vec<_>>::new();
3372        let mut non_indexes = Vec::new();
3373        for entry in self.catalog().entries().cloned() {
3374            if let Some(index) = entry.index() {
3375                let on = self.catalog().get_entry_by_global_id(&index.on);
3376                indexes_on.entry(on.id()).or_default().push(entry);
3377            } else {
3378                non_indexes.push(entry);
3379            }
3380        }
3381
3382        let key_fn = |entry: &CatalogEntry| entry.id;
3383        let dependencies_fn = |entry: &CatalogEntry| entry.uses();
3384        sort_topological(&mut non_indexes, key_fn, dependencies_fn);
3385
3386        let mut result = Vec::new();
3387        for entry in non_indexes {
3388            let id = entry.id();
3389            result.push(entry);
3390            if let Some(mut indexes) = indexes_on.remove(&id) {
3391                result.append(&mut indexes);
3392            }
3393        }
3394
3395        soft_assert_or_log!(
3396            indexes_on.is_empty(),
3397            "indexes with missing dependencies: {indexes_on:?}",
3398        );
3399
3400        result
3401    }
3402
3403    /// Invokes the optimizer on all indexes and materialized views in the catalog and inserts the
3404    /// resulting dataflow plans into the catalog state.
3405    ///
3406    /// `ordered_catalog_entries` must be sorted in dependency order, with dependencies ordered
3407    /// before their dependants.
3408    ///
3409    /// This method does not perform timestamp selection for the dataflows, nor does it create them
3410    /// in the compute controller. Both of these steps happen later during bootstrapping.
3411    ///
3412    /// Returns a map of expressions that were not cached.
3413    #[instrument]
3414    fn bootstrap_dataflow_plans(
3415        &mut self,
3416        ordered_catalog_entries: &[CatalogEntry],
3417        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
3418    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
3419        // The optimizer expects to be able to query its `ComputeInstanceSnapshot` for
3420        // collections the current dataflow can depend on. But since we don't yet install anything
3421        // on compute instances, the snapshot information is incomplete. We fix that by manually
3422        // updating `ComputeInstanceSnapshot` objects to ensure they contain collections previously
3423        // optimized.
3424        let mut instance_snapshots = BTreeMap::new();
3425        let mut uncached_expressions = BTreeMap::new();
3426
3427        let optimizer_config = |catalog: &Catalog, cluster_id| {
3428            let system_config = catalog.system_config();
3429            let overrides = catalog.get_cluster(cluster_id).config.features();
3430            OptimizerConfig::from(system_config)
3431                .override_from(&overrides)
3432                // A cluster-scoped LaunchDarkly rule beats a manual `FEATURES`
3433                // pin.
3434                .override_from(
3435                    &catalog
3436                        .state()
3437                        .cluster_scoped_optimizer_overrides(cluster_id),
3438                )
3439        };
3440
3441        for entry in ordered_catalog_entries {
3442            match entry.item() {
3443                CatalogItem::Index(idx) => {
3444                    // Collect optimizer parameters.
3445                    let compute_instance =
3446                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
3447                            self.instance_snapshot(idx.cluster_id)
3448                                .expect("compute instance exists")
3449                        });
3450                    let global_id = idx.global_id();
3451
3452                    // The index may already be installed on the compute instance. For example,
3453                    // this is the case for introspection indexes.
3454                    if compute_instance.contains_collection(&global_id) {
3455                        continue;
3456                    }
3457
3458                    let optimizer_config = optimizer_config(&self.catalog, idx.cluster_id);
3459
3460                    let (optimized_plan, physical_plan, metainfo) =
3461                        match cached_global_exprs.remove(&global_id) {
3462                            Some(global_expressions)
3463                                if global_expressions.optimizer_features
3464                                    == optimizer_config.features =>
3465                            {
3466                                debug!("global expression cache hit for {global_id:?}");
3467                                (
3468                                    global_expressions.global_mir,
3469                                    global_expressions.physical_plan,
3470                                    global_expressions.dataflow_metainfos,
3471                                )
3472                            }
3473                            Some(_) | None => {
3474                                let (optimized_plan, global_lir_plan) = {
3475                                    // Build an optimizer for this INDEX.
3476                                    let mut optimizer = optimize::index::Optimizer::new(
3477                                        self.owned_catalog(),
3478                                        compute_instance.clone(),
3479                                        global_id,
3480                                        optimizer_config.clone(),
3481                                        self.optimizer_metrics(),
3482                                    );
3483
3484                                    // MIR ⇒ MIR optimization (global)
3485                                    let index_plan = optimize::index::Index::new(
3486                                        entry.name().clone(),
3487                                        idx.on,
3488                                        idx.keys.to_vec(),
3489                                    );
3490                                    let global_mir_plan = optimizer.optimize(index_plan)?;
3491                                    let optimized_plan = global_mir_plan.df_desc().clone();
3492
3493                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3494                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3495
3496                                    (optimized_plan, global_lir_plan)
3497                                };
3498
3499                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3500                                let metainfo = {
3501                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3502                                    let notice_ids =
3503                                        std::iter::repeat_with(|| self.allocate_transient_id())
3504                                            .map(|(_item_id, gid)| gid)
3505                                            .take(metainfo.optimizer_notices.len())
3506                                            .collect::<Vec<_>>();
3507                                    // Return a metainfo with rendered notices.
3508                                    self.catalog().render_notices(
3509                                        metainfo,
3510                                        notice_ids,
3511                                        Some(idx.global_id()),
3512                                    )
3513                                };
3514                                uncached_expressions.insert(
3515                                    global_id,
3516                                    GlobalExpressions {
3517                                        global_mir: optimized_plan.clone(),
3518                                        physical_plan: physical_plan.clone(),
3519                                        dataflow_metainfos: metainfo.clone(),
3520                                        optimizer_features: optimizer_config.features.clone(),
3521                                    },
3522                                );
3523                                (optimized_plan, physical_plan, metainfo)
3524                            }
3525                        };
3526
3527                    let catalog = self.catalog_mut();
3528                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3529                    catalog.set_physical_plan(idx.global_id(), physical_plan);
3530                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3531
3532                    compute_instance.insert_collection(idx.global_id());
3533                }
3534                CatalogItem::MaterializedView(mv) => {
3535                    // Collect optimizer parameters.
3536                    let compute_instance =
3537                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3538                            self.instance_snapshot(mv.cluster_id)
3539                                .expect("compute instance exists")
3540                        });
3541                    let global_id = mv.global_id_writes();
3542
3543                    let optimizer_config = optimizer_config(&self.catalog, mv.cluster_id);
3544
3545                    let (optimized_plan, physical_plan, metainfo) = match cached_global_exprs
3546                        .remove(&global_id)
3547                    {
3548                        Some(global_expressions)
3549                            if global_expressions.optimizer_features
3550                                == optimizer_config.features =>
3551                        {
3552                            debug!("global expression cache hit for {global_id:?}");
3553                            (
3554                                global_expressions.global_mir,
3555                                global_expressions.physical_plan,
3556                                global_expressions.dataflow_metainfos,
3557                            )
3558                        }
3559                        Some(_) | None => {
3560                            let (_, internal_view_id) = self.allocate_transient_id();
3561                            let debug_name = self
3562                                .catalog()
3563                                .resolve_full_name(entry.name(), None)
3564                                .to_string();
3565
3566                            let (optimized_plan, global_lir_plan) = {
3567                                // Build an optimizer for this MATERIALIZED VIEW.
3568                                let mut optimizer = optimize::materialized_view::Optimizer::new(
3569                                    self.owned_catalog().as_optimizer_catalog(),
3570                                    compute_instance.clone(),
3571                                    global_id,
3572                                    internal_view_id,
3573                                    mv.desc.latest().iter_names().cloned().collect(),
3574                                    mv.non_null_assertions.clone(),
3575                                    mv.refresh_schedule.clone(),
3576                                    debug_name,
3577                                    optimizer_config.clone(),
3578                                    self.optimizer_metrics(),
3579                                );
3580
3581                                // MIR ⇒ MIR optimization (global)
3582                                // We make sure to use the HIR SQL type (since MIR SQL types may not be coherent).
3583                                let typ = infer_sql_type_for_catalog(
3584                                    &mv.raw_expr,
3585                                    &mv.locally_optimized_expr.as_ref().clone(),
3586                                );
3587                                let global_mir_plan = optimizer
3588                                    .optimize((mv.locally_optimized_expr.as_ref().clone(), typ))?;
3589                                let optimized_plan = global_mir_plan.df_desc().clone();
3590
3591                                // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3592                                let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3593
3594                                (optimized_plan, global_lir_plan)
3595                            };
3596
3597                            let (physical_plan, metainfo) = global_lir_plan.unapply();
3598                            let metainfo = {
3599                                // Pre-allocate a vector of transient GlobalIds for each notice.
3600                                let notice_ids =
3601                                    std::iter::repeat_with(|| self.allocate_transient_id())
3602                                        .map(|(_item_id, global_id)| global_id)
3603                                        .take(metainfo.optimizer_notices.len())
3604                                        .collect::<Vec<_>>();
3605                                // Return a metainfo with rendered notices.
3606                                self.catalog().render_notices(
3607                                    metainfo,
3608                                    notice_ids,
3609                                    Some(mv.global_id_writes()),
3610                                )
3611                            };
3612                            uncached_expressions.insert(
3613                                global_id,
3614                                GlobalExpressions {
3615                                    global_mir: optimized_plan.clone(),
3616                                    physical_plan: physical_plan.clone(),
3617                                    dataflow_metainfos: metainfo.clone(),
3618                                    optimizer_features: optimizer_config.features.clone(),
3619                                },
3620                            );
3621                            (optimized_plan, physical_plan, metainfo)
3622                        }
3623                    };
3624
3625                    let catalog = self.catalog_mut();
3626                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
3627                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
3628                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);
3629
3630                    compute_instance.insert_collection(mv.global_id_writes());
3631                }
3632                CatalogItem::Table(_)
3633                | CatalogItem::Source(_)
3634                | CatalogItem::Log(_)
3635                | CatalogItem::View(_)
3636                | CatalogItem::Sink(_)
3637                | CatalogItem::Type(_)
3638                | CatalogItem::Func(_)
3639                | CatalogItem::Secret(_)
3640                | CatalogItem::Connection(_) => (),
3641            }
3642        }
3643
3644        Ok(uncached_expressions)
3645    }
3646
3647    /// Selects for each compute dataflow an as-of suitable for bootstrapping it.
3648    ///
3649    /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
3650    /// in place and that must not be dropped before all compute dataflows have been created with
3651    /// the compute controller.
3652    ///
3653    /// This method expects all storage collections and dataflow plans to be available, so it must
3654    /// run after [`Coordinator::bootstrap_storage_collections`] and
3655    /// [`Coordinator::bootstrap_dataflow_plans`].
3656    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold> {
3657        let mut catalog_ids = Vec::new();
3658        let mut dataflows = Vec::new();
3659        let mut read_policies = BTreeMap::new();
3660        for entry in self.catalog.entries() {
3661            let gid = match entry.item() {
3662                CatalogItem::Index(idx) => idx.global_id(),
3663                CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3664                CatalogItem::Table(_)
3665                | CatalogItem::Source(_)
3666                | CatalogItem::Log(_)
3667                | CatalogItem::View(_)
3668                | CatalogItem::Sink(_)
3669                | CatalogItem::Type(_)
3670                | CatalogItem::Func(_)
3671                | CatalogItem::Secret(_)
3672                | CatalogItem::Connection(_) => continue,
3673            };
3674            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3675                catalog_ids.push(gid);
3676                dataflows.push(plan.clone());
3677
3678                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3679                    read_policies.insert(gid, compaction_window.into());
3680                }
3681            }
3682        }
3683
3684        let read_ts = self.get_local_read_ts().await;
3685        let read_holds = as_of_selection::run(
3686            &mut dataflows,
3687            &read_policies,
3688            &*self.controller.storage_collections,
3689            read_ts,
3690            self.controller.read_only(),
3691        );
3692
3693        let catalog = self.catalog_mut();
3694        for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3695            catalog.set_physical_plan(id, plan);
3696        }
3697
3698        read_holds
3699    }
3700
/// Serves the coordinator, receiving commands from users over `cmd_rx`,
/// internal messages over `internal_cmd_rx`, pending strict serializable
/// reads over `strict_serializable_reads_rx`, and group commit readiness
/// via `group_commit_rx`.
///
/// The returned future runs the main message loop until `cmd_rx` reports
/// that no more commands will arrive, and then makes a best-effort attempt
/// to expire the catalog.
///
/// You must call `bootstrap` before calling this method.
///
/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would
/// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
fn serve(
    mut self,
    mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
    mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
    mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
    group_commit_rx: appends::GroupCommitWaiter,
) -> LocalBoxFuture<'static, ()> {
    async move {
        // Watcher that listens for and reports cluster service status changes.
        let mut cluster_events = self.controller.events_stream();
        // Shared with the watchdog task so it can report what the main loop
        // was last working on if the loop appears stuck.
        let last_message = Arc::new(Mutex::new(LastMessage {
            kind: "none",
            stmt: None,
        }));

        // Capacity-1 channel over which the watchdog task hands idle timers
        // to the main loop.
        let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
        let idle_metric = self.metrics.queue_busy_seconds.clone();
        let last_message_watchdog = Arc::clone(&last_message);

        spawn(|| "coord watchdog", async move {
            // Every 5 seconds, attempt to measure how long it takes for the
            // coord select loop to be empty, because this message is the last
            // processed. If it is idle, this will result in some microseconds
            // of measurement.
            let mut interval = tokio::time::interval(Duration::from_secs(5));
            // If we end up having to wait more than 5 seconds for the coord to respond, then the
            // behavior of Delay results in the interval "restarting" from whenever we yield
            // instead of trying to catch up.
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            // Track if we become stuck to de-dupe error reporting.
            let mut coord_stuck = false;

            loop {
                interval.tick().await;

                // Wait for space in the channel, if we timeout then the coordinator is stuck!
                let duration = tokio::time::Duration::from_secs(30);
                let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
                let Ok(maybe_permit) = timeout else {
                    // Only log if we're newly stuck, to prevent logging repeatedly.
                    if !coord_stuck {
                        let last_message = last_message_watchdog.lock().expect("poisoned");
                        tracing::warn!(
                            last_message_kind = %last_message.kind,
                            last_message_sql = %last_message.stmt_to_string(),
                            "coordinator stuck for {duration:?}",
                        );
                    }
                    coord_stuck = true;

                    continue;
                };

                // We got a permit, we're not stuck!
                if coord_stuck {
                    tracing::info!("Coordinator became unstuck");
                }
                coord_stuck = false;

                // If we failed to acquire a permit it's because we're shutting down.
                let Ok(permit) = maybe_permit else {
                    break;
                };

                // The timer starts now; the main loop observes its duration
                // once it gets around to receiving it.
                permit.send(idle_metric.start_timer());
            }
        });

        self.schedule_storage_usage_collection().await;
        self.schedule_arrangement_sizes_collection().await;
        self.spawn_privatelink_vpc_endpoints_watch_task();
        self.spawn_statement_logging_task();
        self.spawn_catalog_info_metrics_task();
        flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);

        // Report if the handling of a single message takes longer than this threshold.
        let warn_threshold = self
            .catalog()
            .system_config()
            .coord_slow_message_warn_threshold();

        // How many messages we'd like to batch up before processing them. Must be > 0.
        const MESSAGE_BATCH: usize = 64;
        let mut messages = Vec::with_capacity(MESSAGE_BATCH);
        let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);

        let message_batch = self.metrics.message_batch.clone();

        loop {
            // Before adding a branch to this select loop, please ensure that the branch is
            // cancellation safe and add a comment explaining why. You can refer here for more
            // info: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
            select! {
                // We prioritize internal commands over other commands. However, we work through
                // batches of commands in some branches of this select, which means that even if
                // a command generates internal commands, we will work through the current batch
                // before receiving a new batch of commands.
                biased;

                // `recv_many()` on `UnboundedReceiver` is cancellation safe:
                // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
                // Receive a batch of commands.
                _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
                // `next()` on any stream is cancel-safe:
                // https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
                // Receive a single command.
                Some(event) = cluster_events.next() => {
                    messages.push(Message::ClusterEvent(event))
                },
                // See [`mz_controller::Controller::Controller::ready`] for notes
                // on why this is cancel-safe.
                // Receive a single command.
                () = self.controller.ready() => {
                    // NOTE: We don't get a `Readiness` back from `ready()`
                    // because the controller wants to keep it and it's not
                    // trivially `Clone` or `Copy`. Hence this accessor.
                    let controller = match self.controller.get_readiness() {
                        Readiness::Storage => ControllerReadiness::Storage,
                        Readiness::Compute => ControllerReadiness::Compute,
                        Readiness::Metrics(_) => ControllerReadiness::Metrics,
                        Readiness::Internal(_) => ControllerReadiness::Internal,
                        Readiness::NotReady => unreachable!("just signaled as ready"),
                    };
                    messages.push(Message::ControllerReady { controller });
                }
                // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
                // Receive a single command.
                permit = group_commit_rx.ready() => {
                    // If we happen to have batched exactly one user write, use
                    // that span so the `emit_trace_id_notice` hooks up.
                    // Otherwise, the best we can do is invent a new root span
                    // and make it follow from all the Spans in the pending
                    // writes.
                    let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
                        PendingWriteTxn::User{span, ..} => Some(span),
                        PendingWriteTxn::System{..} => None,
                    });
                    let span = match user_write_spans.exactly_one() {
                        Ok(span) => span.clone(),
                        Err(user_write_spans) => {
                            let span = info_span!(parent: None, "group_commit_notify");
                            for s in user_write_spans {
                                span.follows_from(s);
                            }
                            span
                        }
                    };
                    messages.push(Message::GroupCommitInitiate(span, Some(permit)));
                },
                // `recv_many()` on `UnboundedReceiver` is cancellation safe:
                // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
                // Receive a batch of commands.
                count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
                    // With a non-zero limit, `recv_many` returns 0 only once the
                    // channel is closed and drained, so this ends the serve loop.
                    if count == 0 {
                        break;
                    } else {
                        messages.extend(cmd_messages.drain(..).map(
                            |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
                        ));
                    }
                },
                // `recv()` on `UnboundedReceiver` is cancellation safe:
                // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
                // Receive a single command.
                Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
                    // Opportunistically drain everything else that is already
                    // queued so a single `LinearizeReads` message covers it all.
                    let mut pending_read_txns = vec![pending_read_txn];
                    while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
                        pending_read_txns.push(pending_read_txn);
                    }
                    for (conn_id, pending_read_txn) in pending_read_txns {
                        let prev = self
                            .pending_linearize_read_txns
                            .insert(conn_id, pending_read_txn);
                        soft_assert_or_log!(
                            prev.is_none(),
                            "connections can not have multiple concurrent reads, prev: {prev:?}"
                        )
                    }
                    messages.push(Message::LinearizeReads);
                }
                // `tick()` on `Interval` is cancel-safe:
                // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
                // Receive a single command.
                _ = self.advance_timelines_interval.tick() => {
                    let span = info_span!(parent: None, "coord::advance_timelines_interval");
                    span.follows_from(Span::current());

                    // Group commit sends an `AdvanceTimelines` message when
                    // done, which is what downgrades read holds. In
                    // read-only mode we send this message directly because
                    // we're not doing group commits.
                    if self.controller.read_only() {
                        messages.push(Message::AdvanceTimelines);
                    } else {
                        messages.push(Message::GroupCommitInitiate(span, None));
                    }
                },
                // `tick()` on `Interval` is cancel-safe:
                // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
                // Receive a single command.
                _ = self.check_cluster_scheduling_policies_interval.tick() => {
                    messages.push(Message::CheckSchedulingPolicies);
                },

                // `tick()` on `Interval` is cancel-safe:
                // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
                // Receive a single command.
                _ = self.caught_up_check_interval.tick() => {
                    // We do this directly on the main loop instead of
                    // firing off a message. We are still in read-only mode,
                    // so optimizing for latency, not blocking the main loop
                    // is not that important.
                    self.maybe_check_caught_up().await;

                    continue;
                },

                // Process the idle metric at the lowest priority to sample queue non-idle time.
                // `recv()` on `Receiver` is cancellation safe:
                // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety
                // Receive a single command.
                timer = idle_rx.recv() => {
                    timer.expect("does not drop").observe_duration();
                    self.metrics
                        .message_handling
                        .with_label_values(&["watchdog"])
                        .observe(0.0);
                    continue;
                }
            };

            // Observe the number of messages we're processing at once.
            message_batch.observe(f64::cast_lossy(messages.len()));

            for msg in messages.drain(..) {
                // All message processing functions trace. Start a parent span
                // for them to make it easy to find slow messages.
                let msg_kind = msg.kind();
                let span = span!(
                    target: "mz_adapter::coord::handle_message_loop",
                    Level::INFO,
                    "coord::handle_message",
                    kind = msg_kind
                );
                let otel_context = span.context().span().span_context().clone();

                // Record the last kind of message in case we get stuck. For
                // execute commands, we additionally stash the user's SQL
                // statement so we can log it in case we get stuck.
                *last_message.lock().expect("poisoned") = LastMessage {
                    kind: msg_kind,
                    stmt: match &msg {
                        Message::Command(
                            _,
                            Command::Execute {
                                portal_name,
                                session,
                                ..
                            },
                        ) => session
                            .get_portal_unverified(portal_name)
                            .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
                        _ => None,
                    },
                };

                let start = Instant::now();
                self.handle_message(msg).instrument(span).await;
                let duration = start.elapsed();

                self.metrics
                    .message_handling
                    .with_label_values(&[msg_kind])
                    .observe(duration.as_secs_f64());

                // If something is _really_ slow, print a trace id for debugging, if OTEL is enabled.
                if duration > warn_threshold {
                    let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
                    tracing::error!(
                        ?msg_kind,
                        ?trace_id,
                        ?duration,
                        "very slow coordinator message"
                    );
                }
            }
        }
        // Try and cleanup as a best effort. There may be some async tasks out there holding a
        // reference that prevents us from cleaning up.
        if let Some(catalog) = Arc::into_inner(self.catalog) {
            catalog.expire().await;
        }
    }
    .boxed_local()
}
4005
4006    /// Obtain a read-only Catalog reference.
4007    fn catalog(&self) -> &Catalog {
4008        &self.catalog
4009    }
4010
4011    /// Obtain a read-only Catalog snapshot, suitable for giving out to
4012    /// non-Coordinator thread tasks.
4013    fn owned_catalog(&self) -> Arc<Catalog> {
4014        Arc::clone(&self.catalog)
4015    }
4016
4017    /// Obtain a handle to the optimizer metrics, suitable for giving
4018    /// out to non-Coordinator thread tasks.
4019    fn optimizer_metrics(&self) -> OptimizerMetrics {
4020        self.optimizer_metrics.clone()
4021    }
4022
4023    /// Obtain a writeable Catalog reference.
4024    fn catalog_mut(&mut self) -> &mut Catalog {
4025        // make_mut will cause any other Arc references (from owned_catalog) to
4026        // continue to be valid by cloning the catalog, putting it in a new Arc,
4027        // which lives at self._catalog. If there are no other Arc references,
4028        // then no clone is made, and it returns a reference to the existing
4029        // object. This makes this method and owned_catalog both very cheap: at
4030        // most one clone per catalog mutation, but only if there's a read-only
4031        // reference to it.
4032        Arc::make_mut(&mut self.catalog)
4033    }
4034
4035    /// Refills the user ID pool by allocating IDs from the catalog.
4036    ///
4037    /// Requests `max(min_count, batch_size)` IDs so the pool is never
4038    /// under-filled relative to the configured batch size.
4039    async fn refill_user_id_pool(&mut self, min_count: u64) -> Result<(), AdapterError> {
4040        let batch_size = USER_ID_POOL_BATCH_SIZE.get(self.catalog().system_config().dyncfgs());
4041        let to_allocate = min_count.max(u64::from(batch_size));
4042        let id_ts = self.get_catalog_write_ts().await;
4043        let ids = self.catalog().allocate_user_ids(to_allocate, id_ts).await?;
4044        if let (Some((first_id, _)), Some((last_id, _))) = (ids.first(), ids.last()) {
4045            let start = match first_id {
4046                CatalogItemId::User(id) => *id,
4047                other => {
4048                    return Err(AdapterError::Internal(format!(
4049                        "expected User CatalogItemId, got {other:?}"
4050                    )));
4051                }
4052            };
4053            let end = match last_id {
4054                CatalogItemId::User(id) => *id + 1, // exclusive upper bound
4055                other => {
4056                    return Err(AdapterError::Internal(format!(
4057                        "expected User CatalogItemId, got {other:?}"
4058                    )));
4059                }
4060            };
4061            self.user_id_pool.refill(start, end);
4062        } else {
4063            return Err(AdapterError::Internal(
4064                "catalog returned no user IDs".into(),
4065            ));
4066        }
4067        Ok(())
4068    }
4069
4070    /// Allocates a single user ID, refilling the pool from the catalog if needed.
4071    async fn allocate_user_id(&mut self) -> Result<(CatalogItemId, GlobalId), AdapterError> {
4072        if let Some(id) = self.user_id_pool.allocate() {
4073            return Ok((CatalogItemId::User(id), GlobalId::User(id)));
4074        }
4075        self.refill_user_id_pool(1).await?;
4076        let id = self.user_id_pool.allocate().expect("ID pool just refilled");
4077        Ok((CatalogItemId::User(id), GlobalId::User(id)))
4078    }
4079
4080    /// Allocates `count` user IDs, refilling the pool from the catalog if needed.
4081    async fn allocate_user_ids(
4082        &mut self,
4083        count: u64,
4084    ) -> Result<Vec<(CatalogItemId, GlobalId)>, AdapterError> {
4085        if self.user_id_pool.remaining() < count {
4086            self.refill_user_id_pool(count).await?;
4087        }
4088        let raw_ids = self
4089            .user_id_pool
4090            .allocate_many(count)
4091            .expect("pool has enough IDs after refill");
4092        Ok(raw_ids
4093            .into_iter()
4094            .map(|id| (CatalogItemId::User(id), GlobalId::User(id)))
4095            .collect())
4096    }
4097
4098    /// Obtain a reference to the coordinator's connection context.
4099    fn connection_context(&self) -> &ConnectionContext {
4100        self.controller.connection_context()
4101    }
4102
4103    /// Obtain a reference to the coordinator's secret reader, in an `Arc`.
4104    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
4105        &self.connection_context().secrets_reader
4106    }
4107
4108    /// Publishes a notice message to all sessions.
4109    ///
4110    /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
4111    /// so we keep it around.
4112    #[allow(dead_code)]
4113    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
4114        for meta in self.active_conns.values() {
4115            let _ = meta.notice_tx.send(notice.clone());
4116        }
4117    }
4118
4119    /// Returns a closure that will publish a notice to all sessions that were active at the time
4120    /// this method was called.
4121    pub(crate) fn broadcast_notice_tx(
4122        &self,
4123    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
4124        let senders: Vec<_> = self
4125            .active_conns
4126            .values()
4127            .map(|meta| meta.notice_tx.clone())
4128            .collect();
4129        Box::new(move |notice| {
4130            for tx in senders {
4131                let _ = tx.send(notice.clone());
4132            }
4133        })
4134    }
4135
4136    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
4137        &self.active_conns
4138    }
4139
4140    #[instrument(level = "debug")]
4141    pub(crate) fn retire_execution(
4142        &mut self,
4143        reason: StatementEndedExecutionReason,
4144        ctx_extra: ExecuteContextExtra,
4145    ) {
4146        if let Some(uuid) = ctx_extra.retire() {
4147            self.end_statement_execution(uuid, reason);
4148        }
4149    }
4150
4151    /// Creates a new dataflow builder from the catalog and indexes in `self`.
4152    #[instrument(level = "debug")]
4153    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
4154        let compute = self
4155            .instance_snapshot(instance)
4156            .expect("compute instance does not exist");
4157        DataflowBuilder::new(self.catalog().state(), compute)
4158    }
4159
4160    /// Return a reference-less snapshot to the indicated compute instance.
4161    pub fn instance_snapshot(
4162        &self,
4163        id: ComputeInstanceId,
4164    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
4165        ComputeInstanceSnapshot::new(&self.controller, id)
4166    }
4167
4168    /// Call into the compute controller to install a finalized dataflow, and
4169    /// initialize the read policies for its exported readable objects.
4170    ///
4171    /// # Panics
4172    ///
4173    /// Panics if dataflow creation fails.
4174    pub(crate) async fn ship_dataflow(
4175        &mut self,
4176        dataflow: DataflowDescription<Plan>,
4177        instance: ComputeInstanceId,
4178        target_replica: Option<ReplicaId>,
4179    ) {
4180        self.try_ship_dataflow(dataflow, instance, target_replica)
4181            .await
4182            .unwrap_or_terminate("dataflow creation cannot fail");
4183    }
4184
4185    /// Call into the compute controller to install a finalized dataflow, and
4186    /// initialize the read policies for its exported readable objects.
4187    pub(crate) async fn try_ship_dataflow(
4188        &mut self,
4189        dataflow: DataflowDescription<Plan>,
4190        instance: ComputeInstanceId,
4191        target_replica: Option<ReplicaId>,
4192    ) -> Result<(), DataflowCreationError> {
4193        // We must only install read policies for indexes, not for sinks.
4194        // Sinks are write-only compute collections that don't have read policies.
4195        let export_ids = dataflow.exported_index_ids().collect();
4196
4197        self.controller
4198            .compute
4199            .create_dataflow(instance, dataflow, target_replica)?;
4200
4201        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
4202            .await;
4203
4204        Ok(())
4205    }
4206
4207    /// Call into the compute controller to allow writes to the specified IDs
4208    /// from the specified instance. Calling this function multiple times and
4209    /// calling it on a read-only instance has no effect.
4210    pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
4211        self.controller
4212            .compute
4213            .allow_writes(instance, id)
4214            .unwrap_or_terminate("allow_writes cannot fail");
4215    }
4216
4217    /// Like `ship_dataflow`, but also await on builtin table updates.
4218    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
4219        &mut self,
4220        dataflow: DataflowDescription<Plan>,
4221        instance: ComputeInstanceId,
4222        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
4223        target_replica: Option<ReplicaId>,
4224    ) {
4225        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
4226            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, target_replica);
4227            let ((), ()) =
4228                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
4229        } else {
4230            self.ship_dataflow(dataflow, instance, target_replica).await;
4231        }
4232    }
4233
4234    /// Install a _watch set_ in the controller that is automatically associated with the given
4235    /// connection id. The watchset will be automatically cleared if the connection terminates
4236    /// before the watchset completes.
4237    pub fn install_compute_watch_set(
4238        &mut self,
4239        conn_id: ConnectionId,
4240        objects: BTreeSet<GlobalId>,
4241        t: Timestamp,
4242        state: WatchSetResponse,
4243    ) -> Result<(), CollectionLookupError> {
4244        let ws_id = self.controller.install_compute_watch_set(objects, t)?;
4245        self.connection_watch_sets
4246            .entry(conn_id.clone())
4247            .or_default()
4248            .insert(ws_id);
4249        self.installed_watch_sets.insert(ws_id, (conn_id, state));
4250        Ok(())
4251    }
4252
4253    /// Install a _watch set_ in the controller that is automatically associated with the given
4254    /// connection id. The watchset will be automatically cleared if the connection terminates
4255    /// before the watchset completes.
4256    pub fn install_storage_watch_set(
4257        &mut self,
4258        conn_id: ConnectionId,
4259        objects: BTreeSet<GlobalId>,
4260        t: Timestamp,
4261        state: WatchSetResponse,
4262    ) -> Result<(), CollectionMissing> {
4263        let ws_id = self.controller.install_storage_watch_set(objects, t)?;
4264        self.connection_watch_sets
4265            .entry(conn_id.clone())
4266            .or_default()
4267            .insert(ws_id);
4268        self.installed_watch_sets.insert(ws_id, (conn_id, state));
4269        Ok(())
4270    }
4271
4272    /// Cancels pending watchsets associated with the provided connection id.
4273    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
4274        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
4275            for ws_id in ws_ids {
4276                self.installed_watch_sets.remove(&ws_id);
4277            }
4278        }
4279    }
4280
4281    /// Returns the state of the [`Coordinator`] formatted as JSON.
4282    ///
4283    /// The returned value is not guaranteed to be stable and may change at any point in time.
4284    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
4285        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
4286        // returned object as a tradeoff between usability and stability. `serde_json` will fail
4287        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
4288        // prevents a future unrelated change from silently breaking this method.
4289
4290        let global_timelines: BTreeMap<_, _> = self
4291            .global_timelines
4292            .iter()
4293            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
4294            .collect();
4295        let active_conns: BTreeMap<_, _> = self
4296            .active_conns
4297            .iter()
4298            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
4299            .collect();
4300        let txn_read_holds: BTreeMap<_, _> = self
4301            .txn_read_holds
4302            .iter()
4303            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
4304            .collect();
4305        let pending_peeks: BTreeMap<_, _> = self
4306            .pending_peeks
4307            .iter()
4308            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
4309            .collect();
4310        let client_pending_peeks: BTreeMap<_, _> = self
4311            .client_pending_peeks
4312            .iter()
4313            .map(|(id, peek)| {
4314                let peek: BTreeMap<_, _> = peek
4315                    .iter()
4316                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
4317                    .collect();
4318                (id.to_string(), peek)
4319            })
4320            .collect();
4321        let pending_linearize_read_txns: BTreeMap<_, _> = self
4322            .pending_linearize_read_txns
4323            .iter()
4324            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
4325            .collect();
4326
4327        Ok(serde_json::json!({
4328            "global_timelines": global_timelines,
4329            "active_conns": active_conns,
4330            "txn_read_holds": txn_read_holds,
4331            "pending_peeks": pending_peeks,
4332            "client_pending_peeks": client_pending_peeks,
4333            "pending_linearize_read_txns": pending_linearize_read_txns,
4334            "controller": self.controller.dump().await?,
4335        }))
4336    }
4337
4338    /// Prune all storage usage events from the [`MZ_STORAGE_USAGE_BY_SHARD`] table that are older
4339    /// than `retention_period`.
4340    ///
4341    /// This method will read the entire contents of [`MZ_STORAGE_USAGE_BY_SHARD`] into memory
4342    /// which can be expensive.
4343    ///
4344    /// DO NOT call this method outside of startup. The safety of reading at the current oracle read
4345    /// timestamp and then writing at whatever the current write timestamp is (instead of
4346    /// `read_ts + 1`) relies on the fact that there are no outstanding writes during startup.
4347    ///
4348    /// Group commit, which this method uses to write the retractions, has builtin fencing, and we
4349    /// never commit retractions to [`MZ_STORAGE_USAGE_BY_SHARD`] outside of this method, which is
4350    /// only called once during startup. So we don't have to worry about double/invalid retractions.
4351    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
4352        let item_id = self
4353            .catalog()
4354            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
4355        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
4356        let read_ts = self.get_local_read_ts().await;
4357        let current_contents_fut = self
4358            .controller
4359            .storage_collections
4360            .snapshot(global_id, read_ts);
4361        let internal_cmd_tx = self.internal_cmd_tx.clone();
4362        spawn(|| "storage_usage_prune", async move {
4363            let mut current_contents = current_contents_fut
4364                .await
4365                .unwrap_or_terminate("cannot fail to fetch snapshot");
4366            differential_dataflow::consolidation::consolidate(&mut current_contents);
4367
4368            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4369            let mut expired = Vec::new();
4370            for (row, diff) in current_contents {
4371                assert_eq!(
4372                    diff, 1,
4373                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4374                );
4375                // This logic relies on the definition of `mz_storage_usage_by_shard` not changing.
4376                let collection_timestamp = row
4377                    .unpack()
4378                    .get(3)
4379                    .expect("definition of mz_storage_by_shard changed")
4380                    .unwrap_timestamptz();
4381                let collection_timestamp = collection_timestamp.timestamp_millis();
4382                let collection_timestamp: u128 = collection_timestamp
4383                    .try_into()
4384                    .expect("all collections happen after Jan 1 1970");
4385                if collection_timestamp < cutoff_ts {
4386                    debug!("pruning storage event {row:?}");
4387                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
4388                    expired.push(builtin_update);
4389                }
4390            }
4391
4392            // main thread has shut down.
4393            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
4394        });
4395    }
4396
4397    /// Retracts `mz_object_arrangement_size_history` rows older than the
4398    /// `arrangement_size_history_retention_period` dyncfg.
4399    ///
4400    /// Must only run at startup: it reads at the oracle read timestamp and
4401    /// writes retractions at the current write timestamp, which is only safe
4402    /// when no other writes are in flight. See [the equivalent storage-usage
4403    /// pruner](Self::prune_storage_usage_events_on_startup) for the same
4404    /// reasoning.
4405    async fn prune_arrangement_sizes_history_on_startup(&self) {
4406        // The catalog server is not writable in read-only mode.
4407        if self.controller.read_only() {
4408            return;
4409        }
4410
4411        let retention_period = mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_HISTORY_RETENTION_PERIOD
4412            .get(self.catalog().system_config().dyncfgs());
4413        let item_id = self
4414            .catalog()
4415            .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);
4416        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
4417        let read_ts = self.get_local_read_ts().await;
4418        let current_contents_fut = self
4419            .controller
4420            .storage_collections
4421            .snapshot(global_id, read_ts);
4422        let internal_cmd_tx = self.internal_cmd_tx.clone();
4423        spawn(|| "arrangement_sizes_history_prune", async move {
4424            let mut current_contents = current_contents_fut
4425                .await
4426                .unwrap_or_terminate("cannot fail to fetch snapshot");
4427            differential_dataflow::consolidation::consolidate(&mut current_contents);
4428
4429            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4430            let expired =
4431                arrangement_sizes_expired_retractions(current_contents, cutoff_ts, item_id);
4432
4433            // TODO(arrangement-sizes): when the writeable-catalog-server
4434            // plumbing in https://github.com/MaterializeInc/materialize/pull/35436
4435            // lands, retract directly on `mz_catalog_server`.
4436            let _ = internal_cmd_tx.send(Message::ArrangementSizesPrune(expired));
4437        });
4438    }
4439
4440    fn current_credit_consumption_rate(&self) -> Numeric {
4441        self.catalog()
4442            .user_cluster_replicas()
4443            .filter_map(|replica| match &replica.config.location {
4444                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
4445                ReplicaLocation::Unmanaged(_) => None,
4446            })
4447            .map(|size| {
4448                self.catalog()
4449                    .cluster_replica_sizes()
4450                    .0
4451                    .get(size)
4452                    .expect("location size is validated against the cluster replica sizes")
4453                    .credits_per_hour
4454            })
4455            .sum()
4456    }
4457}
4458
4459/// Returns retraction updates for rows in a consolidated
4460/// `mz_object_arrangement_size_history` snapshot whose `collection_timestamp`
4461/// (column 3) is strictly before `cutoff_ts`.
4462///
4463/// Panics if any input row has `diff != 1`: the caller must consolidate first,
4464/// and a consolidated history table should never contain retractions because
4465/// the only source of retractions is this function itself.
4466fn arrangement_sizes_expired_retractions(
4467    rows: impl IntoIterator<Item = (mz_repr::Row, i64)>,
4468    cutoff_ts: u128,
4469    item_id: CatalogItemId,
4470) -> Vec<BuiltinTableUpdate> {
4471    let mut expired = Vec::new();
4472    for (row, diff) in rows {
4473        assert_eq!(
4474            diff, 1,
4475            "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4476        );
4477        let collection_timestamp = row
4478            .unpack()
4479            .get(3)
4480            .expect("definition of mz_object_arrangement_size_history changed")
4481            .unwrap_timestamptz()
4482            .timestamp_millis();
4483        let collection_timestamp: u128 = collection_timestamp
4484            .try_into()
4485            .expect("all collections happen after Jan 1 1970");
4486        if collection_timestamp < cutoff_ts {
4487            expired.push(BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE));
4488        }
4489    }
4490    expired
4491}
4492
#[cfg(test)]
impl Coordinator {
    /// Compile-time guard that `ship_dataflow` resolves to `()` rather than a `Result`.
    ///
    /// This function is never executed; it only needs to type-check.
    #[allow(dead_code)]
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        // `ship_dataflow` must not return a `Result`: it is called after `catalog_transact`,
        // after which no errors are allowed. This check exists to stop us from incorrectly
        // teaching it how to return errors (which has happened twice and is the motivation for
        // this test).

        // Any compute instance ID will do to satisfy the call below. Note that this only works
        // because this function will never run.
        let instance_id = ComputeInstanceId::user(1).expect("1 is a valid ID");

        // The unit pattern fails to compile if the awaited value is anything other than `()`.
        let () = self.ship_dataflow(dataflow, instance_id, None).await;
    }
}
4509
/// Contains information about the last message the [`Coordinator`] processed.
///
/// The `Debug` output of this type redacts the statement, and its `Drop` implementation prints
/// that output to stderr when the thread is panicking.
struct LastMessage {
    /// Static label for the message; printed as the `kind` field of the `Debug` output.
    kind: &'static str,
    /// Statement associated with the message, if any. It is only ever rendered in redacted
    /// form (or as `<none>` when absent) via `stmt_to_string`.
    stmt: Option<Arc<Statement<Raw>>>,
}
4515
4516impl LastMessage {
4517    /// Returns a redacted version of the statement that is safe for logs.
4518    fn stmt_to_string(&self) -> Cow<'static, str> {
4519        self.stmt
4520            .as_ref()
4521            .map(|stmt| stmt.to_ast_string_redacted().into())
4522            .unwrap_or(Cow::Borrowed("<none>"))
4523    }
4524}
4525
4526impl fmt::Debug for LastMessage {
4527    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4528        f.debug_struct("LastMessage")
4529            .field("kind", &self.kind)
4530            .field("stmt", &self.stmt_to_string())
4531            .finish()
4532    }
4533}
4534
4535impl Drop for LastMessage {
4536    fn drop(&mut self) {
4537        // Only print the last message if we're currently panicking, otherwise we'd spam our logs.
4538        if std::thread::panicking() {
4539            // If we're panicking theres no guarantee `tracing` still works, so print to stderr.
4540            eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4541        }
4542    }
4543}
4544
4545/// Serves the coordinator based on the provided configuration.
4546///
4547/// For a high-level description of the coordinator, see the [crate
4548/// documentation](crate).
4549///
4550/// Returns a handle to the coordinator and a client to communicate with the
4551/// coordinator.
4552///
4553/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 42KB. This would
4554/// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
4555/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
4556pub fn serve(
4557    Config {
4558        controller_config,
4559        controller_envd_epoch,
4560        mut storage,
4561        audit_logs_iterator,
4562        timestamp_oracle_url,
4563        unsafe_mode,
4564        all_features,
4565        build_info,
4566        environment_id,
4567        metrics_registry,
4568        now,
4569        secrets_controller,
4570        cloud_resource_controller,
4571        cluster_replica_sizes,
4572        builtin_system_cluster_config,
4573        builtin_catalog_server_cluster_config,
4574        builtin_probe_cluster_config,
4575        builtin_support_cluster_config,
4576        builtin_analytics_cluster_config,
4577        system_parameter_defaults,
4578        availability_zones,
4579        storage_usage_client,
4580        storage_usage_collection_interval,
4581        storage_usage_retention_period,
4582        segment_client,
4583        egress_addresses,
4584        aws_account_id,
4585        aws_privatelink_availability_zones,
4586        connection_context,
4587        connection_limit_callback,
4588        remote_system_parameters,
4589        webhook_concurrency_limit,
4590        http_host_name,
4591        tracing_handle,
4592        read_only_controllers,
4593        caught_up_trigger: clusters_caught_up_trigger,
4594        helm_chart_version,
4595        license_key,
4596        external_login_password_mz_system,
4597        force_builtin_schema_migration,
4598    }: Config,
4599) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4600    async move {
4601        let coord_start = Instant::now();
4602        info!("startup: coordinator init: beginning");
4603        info!("startup: coordinator init: preamble beginning");
4604
4605        // Initializing the builtins can be an expensive process and consume a lot of memory. We
4606        // forcibly initialize it early while the stack is relatively empty to avoid stack
4607        // overflows later.
4608        let _builtins = LazyLock::force(&BUILTINS_STATIC);
4609
4610        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4611        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4612        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4613            mpsc::unbounded_channel();
4614
4615        // Validate and process availability zones.
4616        if !availability_zones.iter().all_unique() {
4617            coord_bail!("availability zones must be unique");
4618        }
4619
4620        let aws_principal_context = match (
4621            aws_account_id,
4622            connection_context.aws_external_id_prefix.clone(),
4623        ) {
4624            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4625                aws_account_id,
4626                aws_external_id_prefix,
4627            }),
4628            _ => None,
4629        };
4630
4631        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4632            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4633
4634        info!(
4635            "startup: coordinator init: preamble complete in {:?}",
4636            coord_start.elapsed()
4637        );
4638        let oracle_init_start = Instant::now();
4639        info!("startup: coordinator init: timestamp oracle init beginning");
4640
4641        let timestamp_oracle_config = timestamp_oracle_url
4642            .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
4643            .transpose()?;
4644        let mut initial_timestamps =
4645            get_initial_oracle_timestamps(&timestamp_oracle_config).await?;
4646
4647        // Insert an entry for the `EpochMilliseconds` timeline if one doesn't exist,
4648        // which will ensure that the timeline is initialized since it's required
4649        // by the system.
4650        initial_timestamps
4651            .entry(Timeline::EpochMilliseconds)
4652            .or_insert_with(mz_repr::Timestamp::minimum);
4653        let mut timestamp_oracles = BTreeMap::new();
4654        for (timeline, initial_timestamp) in initial_timestamps {
4655            Coordinator::ensure_timeline_state_with_initial_time(
4656                &timeline,
4657                initial_timestamp,
4658                now.clone(),
4659                timestamp_oracle_config.clone(),
4660                &mut timestamp_oracles,
4661                read_only_controllers,
4662            )
4663            .await;
4664        }
4665
4666        // Opening the durable catalog uses one or more timestamps without communicating with
4667        // the timestamp oracle. Here we make sure to apply the catalog upper with the timestamp
4668        // oracle to linearize future operations with opening the catalog.
4669        let catalog_upper = storage.current_upper().await;
4670        // Choose a time at which to boot. This is used, for example, to prune
4671        // old storage usage data or migrate audit log entries.
4672        //
4673        // This time is usually the current system time, but with protection
4674        // against backwards time jumps, even across restarts.
4675        let epoch_millis_oracle = &timestamp_oracles
4676            .get(&Timeline::EpochMilliseconds)
4677            .expect("inserted above")
4678            .oracle;
4679
4680        let mut boot_ts = if read_only_controllers {
4681            let read_ts = epoch_millis_oracle.read_ts().await;
4682            std::cmp::max(read_ts, catalog_upper)
4683        } else {
4684            // Getting/applying a write timestamp bumps the write timestamp in the
4685            // oracle, which we're not allowed in read-only mode.
4686            epoch_millis_oracle.apply_write(catalog_upper).await;
4687            epoch_millis_oracle.write_ts().await.timestamp
4688        };
4689
4690        info!(
4691            "startup: coordinator init: timestamp oracle init complete in {:?}",
4692            oracle_init_start.elapsed()
4693        );
4694
4695        let catalog_open_start = Instant::now();
4696        info!("startup: coordinator init: catalog open beginning");
4697        let persist_client = controller_config
4698            .persist_clients
4699            .open(controller_config.persist_location.clone())
4700            .await
4701            .context("opening persist client")?;
4702        let builtin_item_migration_config =
4703            BuiltinItemMigrationConfig {
4704                persist_client: persist_client.clone(),
4705                read_only: read_only_controllers,
4706                force_migration: force_builtin_schema_migration,
4707            }
4708        ;
4709        let OpenCatalogResult {
4710            mut catalog,
4711            migrated_storage_collections_0dt,
4712            new_builtin_collections,
4713            builtin_table_updates,
4714            cached_global_exprs,
4715            uncached_local_exprs,
4716        } = Catalog::open(mz_catalog::config::Config {
4717            storage,
4718            metrics_registry: &metrics_registry,
4719            state: mz_catalog::config::StateConfig {
4720                unsafe_mode,
4721                all_features,
4722                build_info,
4723                environment_id: environment_id.clone(),
4724                read_only: read_only_controllers,
4725                now: now.clone(),
4726                boot_ts: boot_ts.clone(),
4727                skip_migrations: false,
4728                cluster_replica_sizes,
4729                builtin_system_cluster_config,
4730                builtin_catalog_server_cluster_config,
4731                builtin_probe_cluster_config,
4732                builtin_support_cluster_config,
4733                builtin_analytics_cluster_config,
4734                system_parameter_defaults,
4735                remote_system_parameters,
4736                availability_zones,
4737                egress_addresses,
4738                aws_principal_context,
4739                aws_privatelink_availability_zones,
4740                connection_context,
4741                http_host_name,
4742                builtin_item_migration_config,
4743                persist_client: persist_client.clone(),
4744                enable_expression_cache_override: None,
4745                helm_chart_version,
4746                external_login_password_mz_system,
4747                license_key: license_key.clone(),
4748            },
4749        })
4750        .await?;
4751
4752        // Opening the catalog uses one or more timestamps, so push the boot timestamp up to the
4753        // current catalog upper.
4754        let catalog_upper = catalog.current_upper().await;
4755        boot_ts = std::cmp::max(boot_ts, catalog_upper);
4756
4757        if !read_only_controllers {
4758            epoch_millis_oracle.apply_write(boot_ts).await;
4759        }
4760
4761        info!(
4762            "startup: coordinator init: catalog open complete in {:?}",
4763            catalog_open_start.elapsed()
4764        );
4765
4766        let coord_thread_start = Instant::now();
4767        info!("startup: coordinator init: coordinator thread start beginning");
4768
4769        let session_id = catalog.config().session_id;
4770        let start_instant = catalog.config().start_instant;
4771
4772        // In order for the coordinator to support Rc and Refcell types, it cannot be
4773        // sent across threads. Spawn it in a thread and have this parent thread wait
4774        // for bootstrap completion before proceeding.
4775        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4776        let handle = TokioHandle::current();
4777
4778        let metrics = Metrics::register_into(&metrics_registry);
4779        let metrics_clone = metrics.clone();
4780        let optimizer_metrics = OptimizerMetrics::register_into(
4781            &metrics_registry,
4782            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4783        );
4784        let segment_client_clone = segment_client.clone();
4785        let coord_now = now.clone();
4786        let advance_timelines_interval =
4787            tokio::time::interval(catalog.system_config().default_timestamp_interval());
4788        let mut check_scheduling_policies_interval = tokio::time::interval(
4789            catalog
4790                .system_config()
4791                .cluster_check_scheduling_policies_interval(),
4792        );
4793        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4794
4795        let clusters_caught_up_check_interval = if read_only_controllers {
4796            let dyncfgs = catalog.system_config().dyncfgs();
4797            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4798
4799            let mut interval = tokio::time::interval(interval);
4800            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4801            interval
4802        } else {
4803            // When not in read-only mode, we don't do hydration checks. But we
4804            // still have to provide _some_ interval. This is large enough that
4805            // it doesn't matter.
4806            //
4807            // TODO(aljoscha): We cannot use Duration::MAX right now because of
4808            // https://github.com/tokio-rs/tokio/issues/6634. Use that once it's
4809            // fixed for good.
4810            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4811            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4812            interval
4813        };
4814
4815        let clusters_caught_up_check =
4816            clusters_caught_up_trigger.map(|trigger| {
4817                let mut exclude_collections: BTreeSet<GlobalId> =
4818                    new_builtin_collections.into_iter().collect();
4819
4820                // Migrated MVs can't make progress in read-only mode. Exclude them and all their
4821                // transitive dependents.
4822                //
4823                // TODO: Consider sending `allow_writes` for the dataflows of migrated MVs, which
4824                //       would allow them to make progress even in read-only mode. This doesn't
4825                //       work for MVs based on `mz_catalog_raw`, if the leader's version is less
4826                //       than v26.17, since before that version the catalog shard's frontier wasn't
4827                //       kept up-to-date with the current time. So this workaround has to remain in
4828                //       place upgrades from a version less than v26.17 are no longer supported.
4829                let mut todo: Vec<_> = migrated_storage_collections_0dt
4830                    .iter()
4831                    .filter(|id| {
4832                        catalog.state().get_entry(id).is_materialized_view()
4833                    })
4834                    .copied()
4835                    .collect();
4836                while let Some(item_id) = todo.pop() {
4837                    let entry = catalog.state().get_entry(&item_id);
4838                    exclude_collections.extend(entry.global_ids());
4839                    todo.extend_from_slice(entry.used_by());
4840                }
4841
4842                CaughtUpCheckContext {
4843                    trigger,
4844                    exclude_collections,
4845                }
4846            });
4847
4848        if let Some(TimestampOracleConfig::Postgres(pg_config)) =
4849            timestamp_oracle_config.as_ref()
4850        {
4851            // Apply settings from system vars as early as possible because some
4852            // of them are locked in right when an oracle is first opened!
4853            let pg_timestamp_oracle_params =
4854                flags::timestamp_oracle_config(catalog.system_config());
4855            pg_timestamp_oracle_params.apply(pg_config);
4856        }
4857
4858        // Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
4859        // system variables change, we update our connection limits.
4860        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4861            Arc::new(move |system_vars: &SystemVars| {
4862                let limit: u64 = system_vars.max_connections().cast_into();
4863                let superuser_reserved: u64 =
4864                    system_vars.superuser_reserved_connections().cast_into();
4865
4866                // If superuser_reserved > max_connections, prefer max_connections.
4867                //
4868                // In this scenario all normal users would be locked out because all connections
4869                // would be reserved for superusers so complain if this is the case.
4870                let superuser_reserved = if superuser_reserved >= limit {
4871                    tracing::warn!(
4872                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
4873                    );
4874                    limit
4875                } else {
4876                    superuser_reserved
4877                };
4878
4879                (connection_limit_callback)(limit, superuser_reserved);
4880            });
4881        catalog.system_config_mut().register_callback(
4882            &mz_sql::session::vars::MAX_CONNECTIONS,
4883            Arc::clone(&connection_limit_callback),
4884        );
4885        catalog.system_config_mut().register_callback(
4886            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4887            connection_limit_callback,
4888        );
4889
4890        let (group_commit_tx, group_commit_rx) = appends::notifier();
4891
4892        let parent_span = tracing::Span::current();
4893        let thread = thread::Builder::new()
4894            // The Coordinator thread tends to keep a lot of data on its stack. To
4895            // prevent a stack overflow we allocate a stack three times as big as the default
4896            // stack.
4897            .stack_size(3 * stack::STACK_SIZE)
4898            .name("coordinator".to_string())
4899            .spawn(move || {
4900                let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4901
4902                let controller = handle
4903                    .block_on({
4904                        catalog.initialize_controller(
4905                            controller_config,
4906                            controller_envd_epoch,
4907                            read_only_controllers,
4908                        )
4909                    })
4910                    .unwrap_or_terminate("failed to initialize storage_controller");
4911                // Initializing the controller uses one or more timestamps, so push the boot timestamp up to the
4912                // current catalog upper.
4913                let catalog_upper = handle.block_on(catalog.current_upper());
4914                boot_ts = std::cmp::max(boot_ts, catalog_upper);
4915                if !read_only_controllers {
4916                    let epoch_millis_oracle = &timestamp_oracles
4917                        .get(&Timeline::EpochMilliseconds)
4918                        .expect("inserted above")
4919                        .oracle;
4920                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4921                }
4922
4923                let catalog = Arc::new(catalog);
4924
4925                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4926                let mut coord = Coordinator {
4927                    controller,
4928                    catalog,
4929                    internal_cmd_tx,
4930                    group_commit_tx,
4931                    strict_serializable_reads_tx,
4932                    global_timelines: timestamp_oracles,
4933                    transient_id_gen: Arc::new(TransientIdGen::new()),
4934                    active_conns: BTreeMap::new(),
4935                    txn_read_holds: Default::default(),
4936                    pending_peeks: BTreeMap::new(),
4937                    client_pending_peeks: BTreeMap::new(),
4938                    pending_linearize_read_txns: BTreeMap::new(),
4939                    serialized_ddl: LockedVecDeque::new(),
4940                    active_compute_sinks: BTreeMap::new(),
4941                    active_webhooks: BTreeMap::new(),
4942                    active_copies: BTreeMap::new(),
4943                    connection_cancel_watches: BTreeMap::new(),
4944                    introspection_subscribes: BTreeMap::new(),
4945                    write_locks: BTreeMap::new(),
4946                    deferred_write_ops: BTreeMap::new(),
4947                    pending_writes: Vec::new(),
4948                    advance_timelines_interval,
4949                    secrets_controller,
4950                    caching_secrets_reader,
4951                    cloud_resource_controller,
4952                    storage_usage_client,
4953                    storage_usage_collection_interval,
4954                    segment_client,
4955                    metrics,
4956                    catalog_info_metrics_registry: metrics_registry.clone(),
4957                    scoped_frontend: None,
4958                    optimizer_metrics,
4959                    tracing_handle,
4960                    statement_logging: StatementLogging::new(coord_now.clone()),
4961                    webhook_concurrency_limit,
4962                    timestamp_oracle_config,
4963                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4964                    cluster_scheduling_decisions: BTreeMap::new(),
4965                    caught_up_check_interval: clusters_caught_up_check_interval,
4966                    caught_up_check: clusters_caught_up_check,
4967                    installed_watch_sets: BTreeMap::new(),
4968                    connection_watch_sets: BTreeMap::new(),
4969                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
4970                    read_only_controllers,
4971                    buffered_builtin_table_updates: Some(Vec::new()),
4972                    license_key,
4973                    user_id_pool: IdPool::empty(),
4974                    persist_client,
4975                };
4976                let bootstrap = handle.block_on(async {
4977                    coord
4978                        .bootstrap(
4979                            boot_ts,
4980                            migrated_storage_collections_0dt,
4981                            builtin_table_updates,
4982                            cached_global_exprs,
4983                            uncached_local_exprs,
4984                            audit_logs_iterator,
4985                        )
4986                        .await?;
4987                    coord
4988                        .controller
4989                        .remove_orphaned_replicas(
4990                            coord.catalog().get_next_user_replica_id().await?,
4991                            coord.catalog().get_next_system_replica_id().await?,
4992                        )
4993                        .await
4994                        .map_err(AdapterError::Orchestrator)?;
4995
4996                    if let Some(retention_period) = storage_usage_retention_period {
4997                        coord
4998                            .prune_storage_usage_events_on_startup(retention_period)
4999                            .await;
5000                    }
5001
5002                    coord.prune_arrangement_sizes_history_on_startup().await;
5003
5004                    Ok(())
5005                });
5006                let ok = bootstrap.is_ok();
5007                drop(span);
5008                bootstrap_tx
5009                    .send(bootstrap)
5010                    .expect("bootstrap_rx is not dropped until it receives this message");
5011                if ok {
5012                    handle.block_on(coord.serve(
5013                        internal_cmd_rx,
5014                        strict_serializable_reads_rx,
5015                        cmd_rx,
5016                        group_commit_rx,
5017                    ));
5018                }
5019            })
5020            .expect("failed to create coordinator thread");
5021        match bootstrap_rx
5022            .await
5023            .expect("bootstrap_tx always sends a message or panics/halts")
5024        {
5025            Ok(()) => {
5026                info!(
5027                    "startup: coordinator init: coordinator thread start complete in {:?}",
5028                    coord_thread_start.elapsed()
5029                );
5030                info!(
5031                    "startup: coordinator init: complete in {:?}",
5032                    coord_start.elapsed()
5033                );
5034                let handle = Handle {
5035                    session_id,
5036                    start_instant,
5037                    _thread: thread.join_on_drop(),
5038                };
5039                let client = Client::new(
5040                    build_info,
5041                    cmd_tx,
5042                    metrics_clone,
5043                    now,
5044                    environment_id,
5045                    segment_client_clone,
5046                );
5047                Ok((handle, client))
5048            }
5049            Err(e) => Err(e),
5050        }
5051    }
5052    .boxed()
5053}
5054
5055// Determines and returns the highest timestamp for each timeline, for all known
5056// timestamp oracle implementations.
5057//
5058// Initially, we did this so that we can switch between implementations of
5059// timestamp oracle, but now we also do this to determine a monotonic boot
5060// timestamp, a timestamp that does not regress across reboots.
5061//
5062// This mostly works, but there can be linearizability violations, because there
5063// is no central moment where we do distributed coordination for all oracle
5064// types. Working around this seems prohibitively hard, maybe even impossible so
5065// we have to live with this window of potential violations during the upgrade
5066// window (which is the only point where we should switch oracle
5067// implementations).
5068async fn get_initial_oracle_timestamps(
5069    timestamp_oracle_config: &Option<TimestampOracleConfig>,
5070) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
5071    let mut initial_timestamps = BTreeMap::new();
5072
5073    if let Some(config) = timestamp_oracle_config {
5074        let oracle_timestamps = config.get_all_timelines().await?;
5075
5076        let debug_msg = || {
5077            oracle_timestamps
5078                .iter()
5079                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
5080                .join(", ")
5081        };
5082        info!(
5083            "current timestamps from the timestamp oracle: {}",
5084            debug_msg()
5085        );
5086
5087        for (timeline, ts) in oracle_timestamps {
5088            let entry = initial_timestamps
5089                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
5090
5091            entry
5092                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
5093                .or_insert(ts);
5094        }
5095    } else {
5096        info!("no timestamp oracle configured!");
5097    };
5098
5099    let debug_msg = || {
5100        initial_timestamps
5101            .iter()
5102            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
5103            .join(", ")
5104    };
5105    info!("initial oracle timestamps: {}", debug_msg());
5106
5107    Ok(initial_timestamps)
5108}
5109
5110#[instrument]
5111pub async fn load_remote_system_parameters(
5112    storage: &mut Box<dyn OpenableDurableCatalogState>,
5113    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
5114    system_parameter_sync_timeout: Duration,
5115) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
5116    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
5117        tracing::info!("parameter sync on boot: start sync");
5118
5119        // We intentionally block initial startup, potentially forever,
5120        // on initializing LaunchDarkly. This may seem scary, but the
5121        // alternative is even scarier. Over time, we expect that the
5122        // compiled-in default values for the system parameters will
5123        // drift substantially from the defaults configured in
5124        // LaunchDarkly, to the point that starting an environment
5125        // without loading the latest values from LaunchDarkly will
5126        // result in running an untested configuration.
5127        //
5128        // Note this only applies during initial startup. Restarting
5129        // after we've synced once only blocks for a maximum of
5130        // `FRONTEND_SYNC_TIMEOUT` on LaunchDarkly, as it seems
5131        // reasonable to assume that the last-synced configuration was
5132        // valid enough.
5133        //
5134        // This philosophy appears to provide a good balance between not
5135        // running untested configurations in production while also not
5136        // making LaunchDarkly a "tier 1" dependency for existing
5137        // environments.
5138        //
5139        // If this proves to be an issue, we could seek to address the
5140        // configuration drift in a different way--for example, by
5141        // writing a script that runs in CI nightly and checks for
5142        // deviation between the compiled Rust code and LaunchDarkly.
5143        //
5144        // If it is absolutely necessary to bring up a new environment
5145        // while LaunchDarkly is down, the following manual mitigation
5146        // can be performed:
5147        //
5148        //    1. Edit the environmentd startup parameters to omit the
5149        //       LaunchDarkly configuration.
5150        //    2. Boot environmentd.
5151        //    3. Use the catalog-debug tool to run `edit config "{\"key\":\"system_config_synced\"}" "{\"value\": 1}"`.
5152        //    4. Adjust any other parameters as necessary to avoid
5153        //       running a nonstandard configuration in production.
5154        //    5. Edit the environmentd startup parameters to restore the
5155        //       LaunchDarkly configuration, for when LaunchDarkly comes
5156        //       back online.
5157        //    6. Reboot environmentd.
5158        let mut params = SynchronizedParameters::new(SystemVars::default());
5159        let frontend_sync = async {
5160            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
5161            frontend.pull(&mut params);
5162            let ops = params
5163                .modified()
5164                .into_iter()
5165                .map(|param| {
5166                    let name = param.name;
5167                    let value = param.value;
5168                    tracing::info!(name, value, initial = true, "sync parameter");
5169                    (name, value)
5170                })
5171                .collect();
5172            tracing::info!("parameter sync on boot: end sync");
5173            Ok(Some(ops))
5174        };
5175        if !storage.has_system_config_synced_once().await? {
5176            frontend_sync.await
5177        } else {
5178            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
5179                Ok(ops) => Ok(ops),
5180                Err(TimeoutError::Inner(e)) => Err(e),
5181                Err(TimeoutError::DeadlineElapsed) => {
5182                    tracing::info!("parameter sync on boot: sync has timed out");
5183                    Ok(None)
5184                }
5185            }
5186        }
5187    } else {
5188        Ok(None)
5189    }
5190}
5191
/// Payloads that can be attached to a watch set.
///
/// NOTE(review): the per-variant descriptions are read off the variant names
/// and payload types; confirm against the code that installs and handles
/// watch sets.
#[derive(Debug)]
pub enum WatchSetResponse {
    /// Carries a statement logging id and the lifecycle event associated with it.
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    /// Carries the saved context of an `ALTER SINK` (see [`AlterSinkReadyContext`]).
    AlterSinkReady(AlterSinkReadyContext),
    /// Carries the saved context of an `ALTER MATERIALIZED VIEW`
    /// (see [`AlterMaterializedViewReadyContext`]).
    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
}
5198
5199#[derive(Debug)]
5200pub struct AlterSinkReadyContext {
5201    ctx: Option<ExecuteContext>,
5202    otel_ctx: OpenTelemetryContext,
5203    plan: AlterSinkPlan,
5204    plan_validity: PlanValidity,
5205    read_hold: ReadHolds,
5206}
5207
5208impl AlterSinkReadyContext {
5209    fn ctx(&mut self) -> &mut ExecuteContext {
5210        self.ctx.as_mut().expect("only cleared on drop")
5211    }
5212
5213    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
5214        self.ctx
5215            .take()
5216            .expect("only cleared on drop")
5217            .retire(result);
5218    }
5219}
5220
5221impl Drop for AlterSinkReadyContext {
5222    fn drop(&mut self) {
5223        if let Some(ctx) = self.ctx.take() {
5224            ctx.retire(Err(AdapterError::Canceled));
5225        }
5226    }
5227}
5228
5229#[derive(Debug)]
5230pub struct AlterMaterializedViewReadyContext {
5231    ctx: Option<ExecuteContext>,
5232    otel_ctx: OpenTelemetryContext,
5233    plan: plan::AlterMaterializedViewApplyReplacementPlan,
5234    plan_validity: PlanValidity,
5235}
5236
5237impl AlterMaterializedViewReadyContext {
5238    fn ctx(&mut self) -> &mut ExecuteContext {
5239        self.ctx.as_mut().expect("only cleared on drop")
5240    }
5241
5242    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
5243        self.ctx
5244            .take()
5245            .expect("only cleared on drop")
5246            .retire(result);
5247    }
5248}
5249
5250impl Drop for AlterMaterializedViewReadyContext {
5251    fn drop(&mut self) {
5252        if let Some(ctx) = self.ctx.take() {
5253            ctx.retire(Err(AdapterError::Canceled));
5254        }
5255    }
5256}
5257
5258/// A struct for tracking the ownership of a lock and a VecDeque to store to-be-done work after the
5259/// lock is freed.
5260#[derive(Debug)]
5261struct LockedVecDeque<T> {
5262    items: VecDeque<T>,
5263    lock: Arc<tokio::sync::Mutex<()>>,
5264}
5265
5266impl<T> LockedVecDeque<T> {
5267    pub fn new() -> Self {
5268        Self {
5269            items: VecDeque::new(),
5270            lock: Arc::new(tokio::sync::Mutex::new(())),
5271        }
5272    }
5273
5274    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
5275        Arc::clone(&self.lock).try_lock_owned()
5276    }
5277
5278    pub fn is_empty(&self) -> bool {
5279        self.items.is_empty()
5280    }
5281
5282    pub fn push_back(&mut self, value: T) {
5283        self.items.push_back(value)
5284    }
5285
5286    pub fn pop_front(&mut self) -> Option<T> {
5287        self.items.pop_front()
5288    }
5289
5290    pub fn remove(&mut self, index: usize) -> Option<T> {
5291        self.items.remove(index)
5292    }
5293
5294    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
5295        self.items.iter()
5296    }
5297}
5298
/// An execution context bundled with the statement or plan it belongs to.
///
/// NOTE(review): the name suggests this is work parked for later execution;
/// confirm against the code that queues and drains these.
#[derive(Debug)]
struct DeferredPlanStatement {
    /// The execution context for the statement.
    ctx: ExecuteContext,
    /// The statement itself, either still unplanned or already planned.
    ps: PlanStatement,
}
5304
/// A statement in one of two forms: a parsed statement with its parameters, or
/// a finished plan with the ids it resolved.
#[derive(Debug)]
enum PlanStatement {
    /// A raw (not yet name-resolved) parsed statement plus its parameters.
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    /// A plan together with two sets of resolved ids.
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
        // NOTE(review): presumably ids reached only through SQL-implemented
        // items; confirm against the planner.
        sql_impl_resolved_ids: ResolvedIds,
    },
}
5317
/// Errors reported when a connection is rejected by network policy checks.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// The given address is not contained in any rule of the policy.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    /// No IP address was available to check.
    // NOTE(review): inferred from the variant name and message; the code that
    // produces this variant is not in this part of the file.
    #[error("Access denied missing IP address")]
    MissingIp,
}
5325
5326pub(crate) fn validate_ip_with_policy_rules(
5327    ip: &IpAddr,
5328    rules: &Vec<NetworkPolicyRule>,
5329) -> Result<(), NetworkPolicyError> {
5330    // At the moment we're not handling action or direction
5331    // as those are only able to be "allow" and "ingress" respectively
5332    if rules.iter().any(|r| r.address.0.contains(ip)) {
5333        Ok(())
5334    } else {
5335        Err(NetworkPolicyError::AddressDenied(ip.clone()))
5336    }
5337}
5338
5339pub(crate) fn infer_sql_type_for_catalog(
5340    hir_expr: &HirRelationExpr,
5341    mir_expr: &MirRelationExpr,
5342) -> SqlRelationType {
5343    let mut typ = hir_expr.top_level_typ();
5344    typ.backport_nullability_and_keys(&mir_expr.typ());
5345    typ
5346}
5347
#[cfg(test)]
mod id_pool_tests {
    use super::IdPool;

    // An empty pool has nothing left and refuses every allocation.
    #[mz_ore::test]
    fn test_empty_pool() {
        let mut ids = IdPool::empty();
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
        assert_eq!(ids.allocate_many(1), None);
    }

    // Single allocations walk the refilled range in order, end exclusive.
    #[mz_ore::test]
    fn test_allocate_single() {
        let mut ids = IdPool::empty();
        ids.refill(10, 13);
        assert_eq!(ids.remaining(), 3);
        for expected in 10..13 {
            assert_eq!(ids.allocate(), Some(expected));
        }
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
    }

    // Batch allocation is all-or-nothing.
    #[mz_ore::test]
    fn test_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(100, 105);

        let first_batch = ids.allocate_many(3);
        assert_eq!(first_batch, Some(vec![100, 101, 102]));
        assert_eq!(ids.remaining(), 2);

        // Not enough remaining for 3 more.
        let too_large = ids.allocate_many(3);
        assert_eq!(too_large, None);

        // But 2 works.
        let second_batch = ids.allocate_many(2);
        assert_eq!(second_batch, Some(vec![103, 104]));
        assert_eq!(ids.remaining(), 0);
    }

    // Asking for zero ids succeeds with an empty batch and consumes nothing.
    #[mz_ore::test]
    fn test_allocate_many_zero() {
        let mut ids = IdPool::empty();
        ids.refill(1, 5);
        let nothing = ids.allocate_many(0);
        assert_eq!(nothing, Some(vec![]));
        assert_eq!(ids.remaining(), 4);
    }

    #[mz_ore::test]
    fn test_refill_resets_pool() {
        let mut ids = IdPool::empty();
        ids.refill(0, 2);
        assert_eq!(ids.allocate(), Some(0));

        // Refill before exhaustion replaces the range.
        ids.refill(50, 52);
        for expected in 50..52 {
            assert_eq!(ids.allocate(), Some(expected));
        }
        assert_eq!(ids.allocate(), None);
    }

    // Single and batch allocations draw from the same sequence.
    #[mz_ore::test]
    fn test_mixed_allocate_and_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(0, 10);
        assert_eq!(ids.allocate(), Some(0));
        let batch = ids.allocate_many(3);
        assert_eq!(batch, Some(vec![1, 2, 3]));
        assert_eq!(ids.allocate(), Some(4));
        assert_eq!(ids.remaining(), 5);
    }

    // A range whose start lies past its end is rejected with a panic.
    #[mz_ore::test]
    #[should_panic(expected = "invalid pool range")]
    fn test_refill_invalid_range_panics() {
        let mut ids = IdPool::empty();
        ids.refill(10, 5);
    }
}
5422
#[cfg(test)]
mod arrangement_sizes_pruner_tests {
    use mz_repr::catalog_item_id::CatalogItemId;
    use mz_repr::{Datum, Row};

    use super::arrangement_sizes_expired_retractions;

    // Pack a row shaped like `mz_object_arrangement_size_history`: the pruner
    // only cares about column 3 (`collection_timestamp`), but we stuff the
    // other three columns with realistic values so shape changes would fail.
    fn history_row(ts_ms: i64) -> Row {
        let millis = ts_ms.try_into().expect("non-negative");
        let collected_at = mz_ore::now::to_datetime(millis);
        let columns = [
            Datum::String("r1"),
            Datum::String("u1"),
            Datum::Int64(123),
            Datum::TimestampTz(collected_at.try_into().expect("fits in TimestampTz")),
        ];
        Row::pack_slice(&columns)
    }

    fn item_id() -> CatalogItemId {
        // Any CatalogItemId will do; tests don't dispatch on it.
        CatalogItemId::User(42)
    }

    #[mz_ore::test]
    fn empty_input_produces_no_retractions() {
        let retractions = arrangement_sizes_expired_retractions(Vec::new(), 1_000, item_id());
        assert!(retractions.is_empty());
    }

    #[mz_ore::test]
    fn retracts_only_rows_strictly_before_cutoff() {
        // Mixes both sides of the filter and includes a row at exactly
        // the cutoff timestamp to pin down the strict-less-than boundary.
        let history = vec![
            (history_row(100), 1),
            (history_row(500), 1),
            (history_row(1_000), 1), // at cutoff: kept (strict <)
            (history_row(5_000), 1),
        ];
        let retractions = arrangement_sizes_expired_retractions(history, 1_000, item_id());
        // Only the rows at 100 and 500 fall strictly before the cutoff.
        assert_eq!(retractions.len(), 2);
    }

    #[mz_ore::test]
    #[should_panic(expected = "consolidated contents should not contain retractions")]
    fn retraction_in_input_panics() {
        // A negative diff in the input is what should trigger the panic.
        let history = vec![(history_row(100), -1)];
        let _ = arrangement_sizes_expired_retractions(history, 1_000, item_id());
    }
}