Skip to main content

mz_adapter/
coord.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Translation of SQL commands into timestamped `Controller` commands.
11//!
12//! The various SQL commands instruct the system to take actions that are not
13//! yet explicitly timestamped. On the other hand, the underlying data continually
14//! change as time moves forward. On the third hand, we greatly benefit from the
15//! information that some times are no longer of interest, so that we may
16//! compact the representation of the continually changing collections.
17//!
18//! The [`Coordinator`] curates these interactions by observing the progress
19//! collections make through time, choosing timestamps for its own commands,
20//! and eventually communicating that certain times have irretrievably "passed".
21//!
22//! ## Frontiers another way
23//!
24//! If the above description of frontiers left you with questions, this
25//! repackaged explanation might help.
26//!
27//! - `since` is the least recent time (i.e. oldest time) that you can read
28//!   from sources and be guaranteed that the returned data is accurate as of
29//!   that time.
30//!
31//!   Reads at times less than `since` may return values that were not actually
32//!   seen at the specified time, but arrived later (i.e. the results are
33//!   compacted).
34//!
35//!   For correctness' sake, the coordinator never chooses to read at a time
36//!   less than an arrangement's `since`.
37//!
38//! - `upper` is the first time after the most recent time that you can read
39//!   from sources and receive an immediate response. Alternately, it is the
40//!   least time at which the data may still change (that is the reason we may
41//!   not be able to respond immediately).
42//!
43//!   Reads at times >= `upper` may not immediately return because the answer
44//!   isn't known yet. However, once the `upper` is > the specified read time,
45//!   the read can return.
46//!
47//!   For the sake of returned values' freshness, the coordinator prefers
48//!   performing reads at an arrangement's `upper`. However, because we more
49//!   strongly prefer correctness, the coordinator will choose timestamps
50//!   greater than an object's `upper` if it is also being accessed alongside
51//!   objects whose `since` times are >= its `upper`.
52//!
53//! This illustration attempts to show, with time moving left to right, the
54//! relationship between `since` and `upper`.
55//!
56//! - `#`: possibly inaccurate results
57//! - `-`: immediate, correct response
58//! - `?`: not yet known
59//! - `s`: since
60//! - `u`: upper
61//! - `|`: eligible for coordinator to select
62//!
63//! ```nofmt
64//! ####s----u?????
65//!     |||||||||||
66//! ```
67//!
68
69use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::net::IpAddr;
72use std::num::NonZeroI64;
73use std::ops::Neg;
74use std::str::FromStr;
75use std::sync::LazyLock;
76use std::sync::{Arc, Mutex};
77use std::thread;
78use std::time::{Duration, Instant};
79use std::{fmt, mem};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::{Either, Itertools};
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::{
95    USER_ID_POOL_BATCH_SIZE, WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL,
96};
97use mz_auth::password::Password;
98use mz_build_info::BuildInfo;
99use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
100use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
101use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
102use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
103use mz_catalog::memory::objects::{
104    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
105    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
106};
107use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
108use mz_compute_client::as_of_selection;
109use mz_compute_client::controller::error::{
110    CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
111};
112use mz_compute_types::ComputeInstanceId;
113use mz_compute_types::dataflows::DataflowDescription;
114use mz_compute_types::plan::Plan;
115use mz_controller::clusters::{
116    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
117};
118use mz_controller::{ControllerConfig, Readiness};
119use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
120use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
121use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
122use mz_orchestrator::OfflineReason;
123use mz_ore::cast::{CastFrom, CastInto, CastLossy};
124use mz_ore::channel::trigger::Trigger;
125use mz_ore::future::TimeoutError;
126use mz_ore::metrics::MetricsRegistry;
127use mz_ore::now::{EpochMillis, NowFn};
128use mz_ore::task::{JoinHandle, spawn};
129use mz_ore::thread::JoinHandleExt;
130use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
131use mz_ore::url::SensitiveUrl;
132use mz_ore::{
133    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
134};
135use mz_persist_client::PersistClient;
136use mz_persist_client::batch::ProtoBatch;
137use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
138use mz_repr::adt::numeric::Numeric;
139use mz_repr::explain::{ExplainConfig, ExplainFormat};
140use mz_repr::global_id::TransientIdGen;
141use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
142use mz_repr::role_id::RoleId;
143use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
144use mz_secrets::cache::CachingSecretsReader;
145use mz_secrets::{SecretsController, SecretsReader};
146use mz_sql::ast::{Raw, Statement};
147use mz_sql::catalog::{CatalogCluster, EnvironmentId};
148use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
149use mz_sql::optimizer_metrics::OptimizerMetrics;
150use mz_sql::plan::{
151    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
152    NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
153};
154use mz_sql::session::user::User;
155use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
156use mz_sql_parser::ast::ExplainStage;
157use mz_sql_parser::ast::display::AstDisplay;
158use mz_storage_client::client::TableData;
159use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
160use mz_storage_types::connections::Connection as StorageConnection;
161use mz_storage_types::connections::ConnectionContext;
162use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
163use mz_storage_types::read_holds::ReadHold;
164use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
165use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
166use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
167use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
168use mz_transform::dataflow::DataflowMetainfo;
169use opentelemetry::trace::TraceContextExt;
170use serde::Serialize;
171use thiserror::Error;
172use timely::progress::{Antichain, Timestamp as _};
173use tokio::runtime::Handle as TokioHandle;
174use tokio::select;
175use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
176use tokio::time::{Interval, MissedTickBehavior};
177use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
178use tracing_opentelemetry::OpenTelemetrySpanExt;
179use uuid::Uuid;
180
181use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
182use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
183use crate::client::{Client, Handle};
184use crate::command::{Command, ExecuteResponse};
185use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
186use crate::coord::appends::{
187    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
188};
189use crate::coord::caught_up::CaughtUpCheckContext;
190use crate::coord::cluster_scheduling::SchedulingDecision;
191use crate::coord::id_bundle::CollectionIdBundle;
192use crate::coord::introspection::IntrospectionSubscribe;
193use crate::coord::peek::PendingPeek;
194use crate::coord::statement_logging::StatementLogging;
195use crate::coord::timeline::{TimelineContext, TimelineState};
196use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
197use crate::coord::validity::PlanValidity;
198use crate::error::AdapterError;
199use crate::explain::insights::PlanInsightsContext;
200use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
201use crate::metrics::Metrics;
202use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
203use crate::optimize::{self, Optimize, OptimizerConfig};
204use crate::session::{EndTransactionAction, Session};
205use crate::statement_logging::{
206    StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
207};
208use crate::util::{ClientTransmitter, ResultExt, sort_topological};
209use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
210use crate::{AdapterNotice, ReadHolds, flags};
211
212pub(crate) mod appends;
213pub(crate) mod catalog_serving;
214pub(crate) mod cluster_scheduling;
215pub(crate) mod consistency;
216pub(crate) mod id_bundle;
217pub(crate) mod in_memory_oracle;
218pub(crate) mod peek;
219pub(crate) mod read_policy;
220pub(crate) mod read_then_write;
221pub(crate) mod sequencer;
222pub(crate) mod statement_logging;
223pub(crate) mod timeline;
224pub(crate) mod timestamp_selection;
225
226pub mod catalog_implications;
227mod caught_up;
228mod command_handler;
229mod ddl;
230pub(crate) mod group_sync;
231mod indexes;
232mod introspection;
233mod message_handler;
234mod privatelink_status;
235mod sql;
236mod validity;
237
/// Pre-allocated user IDs, handed out locally so that individual DDL
/// statements do not each require their own persist write.
///
/// The half-open range `[next, upper)` holds the IDs that can still be
/// allocated. Once that range is empty, the pool has to be refilled via the
/// catalog.
///
/// # Correctness
///
/// [`Coordinator`] owns the pool and processes all requests on a
/// single-threaded event loop. Every access goes through `&mut self` on the
/// coordinator, so the pool is never accessed concurrently and needs no
/// synchronization of its own.
///
/// IDs stay globally unique because each refill calls
/// [`Catalog::allocate_user_ids`], whose durable persist write atomically
/// reserves the whole batch before any ID from it is handed out. A crash
/// after a refill but before all pre-allocated IDs are consumed merely
/// leaves gaps in the sequence, which is harmless: user IDs are not required
/// to be contiguous.
///
/// The same guarantee holds when multiple `environmentd` processes run
/// concurrently. Each process keeps its own independent pool, yet every
/// refill goes through the shared persist-backed catalog, which serializes
/// allocations across all callers. Overlapping ID ranges are therefore ruled
/// out for the same reason they were before this pool existed.
#[derive(Debug)]
pub(crate) struct IdPool {
    /// The next ID to hand out (inclusive lower bound of the pool).
    next: u64,
    /// The exclusive upper bound of the pool.
    upper: u64,
}

impl IdPool {
    /// Returns a pool that holds no IDs.
    pub fn empty() -> Self {
        Self { next: 0, upper: 0 }
    }

    /// Hands out the next ID, or `None` once the pool is exhausted.
    pub fn allocate(&mut self) -> Option<u64> {
        if self.next >= self.upper {
            return None;
        }
        let id = self.next;
        // `id < upper`, so the increment cannot overflow.
        self.next = id + 1;
        Some(id)
    }

    /// Hands out `n` consecutive IDs, or `None` (leaving the pool untouched)
    /// if fewer than `n` IDs remain.
    pub fn allocate_many(&mut self, n: u64) -> Option<Vec<u64>> {
        if n > self.remaining() {
            return None;
        }
        // `n <= upper - next`, so `first + n <= upper` and cannot overflow.
        let first = self.next;
        let end = first + n;
        self.next = end;
        Some((first..end).collect())
    }

    /// Returns how many IDs are still available in the pool.
    pub fn remaining(&self) -> u64 {
        let Self { next, upper } = *self;
        upper - next
    }

    /// Replaces the pool's contents with the range `[next, upper)`.
    ///
    /// # Panics
    ///
    /// Panics if `next > upper`.
    pub fn refill(&mut self, next: u64, upper: u64) {
        assert!(next <= upper, "invalid pool range: {next}..{upper}");
        *self = Self { next, upper };
    }
}
310
/// A message delivered to the Coordinator's message queue.
///
/// [`Message::kind`] maps every variant to a short static label for logging.
#[derive(Debug)]
pub enum Message {
    /// A [`Command`] paired with an [`OpenTelemetryContext`].
    Command(OpenTelemetryContext, Command),
    /// A controller needs processing; `controller` gives the reason (see
    /// [`ControllerReadiness`]).
    ControllerReady {
        controller: ControllerReadiness,
    },
    /// Carries a [`PurifiedStatementReady`], i.e. a [`BackgroundWorkResult`]
    /// whose result is a purified statement or an error.
    PurifiedStatementReady(PurifiedStatementReady),
    /// Carries a [`CreateConnectionValidationReady`], i.e. a
    /// [`ValidationReady`] whose result is a `CreateConnectionPlan` or an error.
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    /// Carries an [`AlterConnectionValidationReady`], i.e. a
    /// [`ValidationReady`] whose result is a `Connection` or an error.
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    /// A deferred op might be able to run and should be retried.
    TryDeferred {
        /// The connection that created this op.
        conn_id: ConnectionId,
        /// The write lock that notified us our deferred op might be able to run.
        ///
        /// Note: While we never want to hold a partial set of locks, it can be important to hold
        /// onto the _one_ that notified us our op might be ready. If there are multiple operations
        /// waiting on a single collection, and we don't hold this lock through retrying the op,
        /// then everything waiting on this collection will get retried causing traffic in the
        /// Coordinator's message queue.
        ///
        /// See [`DeferredOp::can_be_optimistically_retried`] for more detail.
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    /// Initiates a group commit.
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    // NOTE(review): the payload-free variants in this enum are pure signals;
    // what each one triggers is decided by the message handler, which is not
    // part of this definition. The one-line docs on them are derived from the
    // variant names only — confirm against the handler.
    /// Payload-free signal; by its name, a deferred statement is ready.
    DeferredStatementReady,
    /// Payload-free signal; by its name, requests that timelines be advanced.
    AdvanceTimelines,
    /// Carries a [`ClusterEvent`] from `mz_controller::clusters`.
    ClusterEvent(ClusterEvent),
    /// Requests cancellation of pending peeks; `conn_id` is presumably the
    /// connection whose peeks are affected — confirm against the handler.
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    /// Payload-free signal; by its name, requests that reads be linearized.
    LinearizeReads,
    /// Staged batches for a table. Each entry of `batches` is either a
    /// `ProtoBatch` or an error rendered as a `String`.
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    // Storage usage messages. Judging by the names they form a
    // schedule -> fetch -> update -> prune sequence — TODO confirm.
    /// Payload-free signal belonging to the storage usage messages.
    StorageUsageSchedule,
    /// Payload-free signal belonging to the storage usage messages.
    StorageUsageFetch,
    /// Carries a [`ShardsUsageReferenced`] value.
    StorageUsageUpdate(ShardsUsageReferenced),
    /// Carries a list of [`BuiltinTableUpdate`]s.
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    // Arrangement size messages, named analogously to the storage usage ones.
    /// Payload-free signal belonging to the arrangement size messages.
    ArrangementSizesSchedule,
    /// Payload-free signal belonging to the arrangement size messages.
    ArrangementSizesSnapshot,
    /// Carries a list of [`BuiltinTableUpdate`]s.
    ArrangementSizesPrune(Vec<BuiltinTableUpdate>),
    /// Performs any cleanup and logging actions necessary for
    /// finalizing a statement execution.
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        /// Why the statement's execution ended.
        reason: StatementEndedExecutionReason,
    },
    /// Carries a statement (`stmt`) with its parameters (`params`), an
    /// execution context and an OpenTelemetry context; by its name, the
    /// statement is to be executed as a single-statement transaction.
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    // The `*StageReady` variants below each carry a stage value plus a
    // tracing `Span`, and (except for the introspection subscribe variant) an
    // `ExecuteContext`.
    /// Carries a [`PeekStage`].
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    /// Carries a [`CreateIndexStage`].
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    /// Carries a [`CreateViewStage`].
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    /// Carries a `CreateMaterializedViewStage`.
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    /// Carries a `SubscribeStage`.
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    /// Carries an `IntrospectionSubscribeStage`; unlike the other stage
    /// variants there is no `ExecuteContext`.
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    /// Carries a `SecretStage`.
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    /// Carries a `ClusterStage`.
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    /// Carries an [`ExplainTimestampStage`].
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    /// Payload-free signal; by its name, requests draining the statement log.
    DrainStatementLog,
    /// Carries a list of [`VpcEndpointEvent`]s.
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    /// Payload-free signal; by its name, requests a check of the scheduling
    /// policies.
    CheckSchedulingPolicies,

    /// Scheduling policy decisions about turning clusters On/Off.
    /// `Vec<(policy name, Vec of decisions by the policy)>`
    /// A cluster will be On if and only if there is at least one On decision for it.
    /// Scheduling decisions for clusters that have `SCHEDULE = MANUAL` are ignored.
    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}
422
423impl Message {
424    /// Returns a string to identify the kind of [`Message`], useful for logging.
425    pub const fn kind(&self) -> &'static str {
426        match self {
427            Message::Command(_, msg) => match msg {
428                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
429                Command::Startup { .. } => "command-startup",
430                Command::Execute { .. } => "command-execute",
431                Command::Commit { .. } => "command-commit",
432                Command::CancelRequest { .. } => "command-cancel_request",
433                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
434                Command::GetWebhook { .. } => "command-get_webhook",
435                Command::GetSystemVars { .. } => "command-get_system_vars",
436                Command::SetSystemVars { .. } => "command-set_system_vars",
437                Command::Terminate { .. } => "command-terminate",
438                Command::RetireExecute { .. } => "command-retire_execute",
439                Command::CheckConsistency { .. } => "command-check_consistency",
440                Command::Dump { .. } => "command-dump",
441                Command::AuthenticatePassword { .. } => "command-auth_check",
442                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
443                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
444                Command::CheckRoleCanLogin { .. } => "command-check_role_can_login",
445                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
446                Command::GetOracle { .. } => "get-oracle",
447                Command::DetermineRealTimeRecentTimestamp { .. } => {
448                    "determine-real-time-recent-timestamp"
449                }
450                Command::GetTransactionReadHoldsBundle { .. } => {
451                    "get-transaction-read-holds-bundle"
452                }
453                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
454                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
455                Command::ExecuteSubscribe { .. } => "execute-subscribe",
456                Command::CopyToPreflight { .. } => "copy-to-preflight",
457                Command::ExecuteCopyTo { .. } => "execute-copy-to",
458                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
459                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
460                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
461                Command::ExplainTimestamp { .. } => "explain-timestamp",
462                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
463                Command::StartCopyFromStdin { .. } => "start-copy-from-stdin",
464                Command::InjectAuditEvents { .. } => "inject-audit-events",
465            },
466            Message::ControllerReady {
467                controller: ControllerReadiness::Compute,
468            } => "controller_ready(compute)",
469            Message::ControllerReady {
470                controller: ControllerReadiness::Storage,
471            } => "controller_ready(storage)",
472            Message::ControllerReady {
473                controller: ControllerReadiness::Metrics,
474            } => "controller_ready(metrics)",
475            Message::ControllerReady {
476                controller: ControllerReadiness::Internal,
477            } => "controller_ready(internal)",
478            Message::PurifiedStatementReady(_) => "purified_statement_ready",
479            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
480            Message::TryDeferred { .. } => "try_deferred",
481            Message::GroupCommitInitiate(..) => "group_commit_initiate",
482            Message::AdvanceTimelines => "advance_timelines",
483            Message::ClusterEvent(_) => "cluster_event",
484            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
485            Message::LinearizeReads => "linearize_reads",
486            Message::StagedBatches { .. } => "staged_batches",
487            Message::StorageUsageSchedule => "storage_usage_schedule",
488            Message::StorageUsageFetch => "storage_usage_fetch",
489            Message::StorageUsageUpdate(_) => "storage_usage_update",
490            Message::StorageUsagePrune(_) => "storage_usage_prune",
491            Message::ArrangementSizesSchedule => "arrangement_sizes_schedule",
492            Message::ArrangementSizesSnapshot => "arrangement_sizes_snapshot",
493            Message::ArrangementSizesPrune(_) => "arrangement_sizes_prune",
494            Message::RetireExecute { .. } => "retire_execute",
495            Message::ExecuteSingleStatementTransaction { .. } => {
496                "execute_single_statement_transaction"
497            }
498            Message::PeekStageReady { .. } => "peek_stage_ready",
499            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
500            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
501            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
502            Message::CreateMaterializedViewStageReady { .. } => {
503                "create_materialized_view_stage_ready"
504            }
505            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
506            Message::IntrospectionSubscribeStageReady { .. } => {
507                "introspection_subscribe_stage_ready"
508            }
509            Message::SecretStageReady { .. } => "secret_stage_ready",
510            Message::ClusterStageReady { .. } => "cluster_stage_ready",
511            Message::DrainStatementLog => "drain_statement_log",
512            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
513            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
514            Message::CheckSchedulingPolicies => "check_scheduling_policies",
515            Message::SchedulingDecisions { .. } => "scheduling_decision",
516            Message::DeferredStatementReady => "deferred_statement_ready",
517        }
518    }
519}
520
/// The reason for why a controller needs processing on the main loop.
///
/// Carried by [`Message::ControllerReady`]; [`Message::kind`] produces a
/// distinct label for each variant.
#[derive(Debug)]
pub enum ControllerReadiness {
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A batch of metric data is ready.
    Metrics,
    /// An internally-generated message is ready to be returned.
    Internal,
}
533
/// The outcome of a piece of work for a statement, bundled with the state
/// that accompanies it.
///
/// NOTE(review): the name suggests the work ran off the main loop; the
/// producer is not visible in this definition — confirm.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    /// The execution context. Excluded from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The value produced by the work, or the error it failed with.
    pub result: Result<T, AdapterError>,
    /// The statement's parameters.
    pub params: Params,
    /// Validity information for the plan (see [`PlanValidity`]).
    pub plan_validity: PlanValidity,
    /// The original statement, shared via `Arc`.
    pub original_stmt: Arc<Statement<Raw>>,
    /// The OpenTelemetry context that accompanies the work.
    pub otel_ctx: OpenTelemetryContext,
}

/// A [`BackgroundWorkResult`] whose successful result is a purified
/// statement; carried by [`Message::PurifiedStatementReady`].
pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
547
/// The outcome of validating a connection, bundled with the state that
/// accompanies it.
///
/// Instantiated for connection creation and alteration through the
/// [`CreateConnectionValidationReady`] and [`AlterConnectionValidationReady`]
/// aliases.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    /// The execution context. Excluded from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The validated value, or the error validation failed with.
    pub result: Result<T, AdapterError>,
    /// Resolved IDs (see [`ResolvedIds`]); presumably those resolved for the
    /// statement being validated — confirm against the producer.
    pub resolved_ids: ResolvedIds,
    /// The connection's [`CatalogItemId`].
    pub connection_id: CatalogItemId,
    /// The connection's [`GlobalId`].
    pub connection_gid: GlobalId,
    /// Validity information for the plan (see [`PlanValidity`]).
    pub plan_validity: PlanValidity,
    /// The OpenTelemetry context that accompanies the validation.
    pub otel_ctx: OpenTelemetryContext,
}

/// A [`ValidationReady`] whose successful result is a [`CreateConnectionPlan`];
/// carried by [`Message::CreateConnectionValidationReady`].
pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
/// A [`ValidationReady`] whose successful result is a [`Connection`];
/// carried by [`Message::AlterConnectionValidationReady`].
pub type AlterConnectionValidationReady = ValidationReady<Connection>;
563
/// The stages of a peek; carried by [`Message::PeekStageReady`].
///
/// Each variant wraps the struct holding that stage's state. The same enum
/// serves SELECT, EXPLAIN and COPY TO, which share the early stages and
/// differ in their final ones.
#[derive(Debug)]
pub enum PeekStage {
    /// Common stages across SELECT, EXPLAIN and COPY TO queries.
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    /// State in [`PeekStageRealTimeRecency`].
    RealTimeRecency(PeekStageRealTimeRecency),
    /// State in [`PeekStageTimestampReadHold`].
    TimestampReadHold(PeekStageTimestampReadHold),
    /// State in [`PeekStageOptimize`].
    Optimize(PeekStageOptimize),
    /// Final stage for a peek.
    Finish(PeekStageFinish),
    /// Final stage for an explain.
    ExplainPlan(PeekStageExplainPlan),
    /// State in [`PeekStageExplainPushdown`].
    ExplainPushdown(PeekStageExplainPushdown),
    /// Preflight checks for a copy to operation.
    CopyToPreflight(PeekStageCopyTo),
    /// Final stage for a copy to which involves shipping the dataflow.
    CopyToDataflow(PeekStageCopyTo),
}
581
/// Describes a COPY TO operation: the shape of the data, where it goes, how
/// to connect there, and how the output is formatted and split.
#[derive(Debug)]
pub struct CopyToContext {
    /// The `RelationDesc` of the data to be copied.
    pub desc: RelationDesc,
    /// The destination uri of the external service where the data will be copied.
    pub uri: Uri,
    /// Connection information required to connect to the external service to copy the data.
    pub connection: StorageConnection<ReferencedConnection>,
    /// The ID of the CONNECTION object to be used for copying the data.
    pub connection_id: CatalogItemId,
    /// Format params to format the data.
    pub format: S3SinkFormat,
    /// Approximate max file size of each uploaded file.
    pub max_file_size: u64,
    /// Number of batches the output of the COPY TO will be partitioned into
    /// to distribute the load across workers deterministically.
    /// This is only an option since it's not set when CopyToContext is instantiated
    /// but immediately after in the PeekStageValidate stage.
    // NOTE(review): no `Validate` variant exists in the `PeekStage` enum in
    // this file; confirm which stage actually sets this field.
    pub output_batch_count: Option<u64>,
}
602
/// State for [`PeekStage::LinearizeTimestamp`].
#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    /// Validity information for the plan (see [`PlanValidity`]).
    validity: PlanValidity,
    /// The `SELECT` plan being processed.
    plan: mz_sql::plan::SelectPlan,
    /// Optional limit on the size of the query result (unit not visible
    /// here; presumably bytes — confirm).
    max_query_result_size: Option<u64>,
    /// A set of [`GlobalId`]s; by the name, the sources the query depends
    /// on — confirm against the code that fills it.
    source_ids: BTreeSet<GlobalId>,
    /// The replica to target, if one was specified.
    target_replica: Option<ReplicaId>,
    /// The timeline context of the query (see [`TimelineContext`]).
    timeline_context: TimelineContext,
    /// The optimizer to use: a peek optimizer (`Left`) or a COPY TO
    /// optimizer (`Right`).
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
616
/// State for [`PeekStage::RealTimeRecency`].
///
/// Has the same fields as [`PeekStageLinearizeTimestamp`] plus
/// `oracle_read_ts`.
#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    /// Validity information for the plan (see [`PlanValidity`]).
    validity: PlanValidity,
    /// The `SELECT` plan being processed.
    plan: mz_sql::plan::SelectPlan,
    /// Optional limit on the size of the query result (unit not visible
    /// here; presumably bytes — confirm).
    max_query_result_size: Option<u64>,
    /// A set of [`GlobalId`]s; by the name, the sources the query depends
    /// on — confirm against the code that fills it.
    source_ids: BTreeSet<GlobalId>,
    /// The replica to target, if one was specified.
    target_replica: Option<ReplicaId>,
    /// The timeline context of the query (see [`TimelineContext`]).
    timeline_context: TimelineContext,
    /// An optional read timestamp; by the name, obtained from the timestamp
    /// oracle — confirm against the stage that produces it.
    oracle_read_ts: Option<Timestamp>,
    /// The optimizer to use: a peek optimizer (`Left`) or a COPY TO
    /// optimizer (`Right`).
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
631
/// State for [`PeekStage::TimestampReadHold`].
///
/// Has the same fields as [`PeekStageRealTimeRecency`] plus
/// `real_time_recency_ts`.
#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    /// Validity information for the plan (see [`PlanValidity`]).
    validity: PlanValidity,
    /// The `SELECT` plan being processed.
    plan: mz_sql::plan::SelectPlan,
    /// Optional limit on the size of the query result (unit not visible
    /// here; presumably bytes — confirm).
    max_query_result_size: Option<u64>,
    /// A set of [`GlobalId`]s; by the name, the sources the query depends
    /// on — confirm against the code that fills it.
    source_ids: BTreeSet<GlobalId>,
    /// The replica to target, if one was specified.
    target_replica: Option<ReplicaId>,
    /// The timeline context of the query (see [`TimelineContext`]).
    timeline_context: TimelineContext,
    /// An optional read timestamp; by the name, obtained from the timestamp
    /// oracle — confirm against the stage that produces it.
    oracle_read_ts: Option<Timestamp>,
    /// An optional timestamp; by the name, the result of a real-time recency
    /// determination — confirm against the stage that produces it.
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    /// The optimizer to use: a peek optimizer (`Left`) or a COPY TO
    /// optimizer (`Right`).
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
647
/// State for [`PeekStage::Optimize`].
///
/// Compared with the earlier stage structs, the individual timestamp inputs
/// are replaced by a [`TimestampDetermination`] and a [`CollectionIdBundle`].
#[derive(Debug)]
pub struct PeekStageOptimize {
    /// Validity information for the plan (see [`PlanValidity`]).
    validity: PlanValidity,
    /// The `SELECT` plan being processed.
    plan: mz_sql::plan::SelectPlan,
    /// Optional limit on the size of the query result (unit not visible
    /// here; presumably bytes — confirm).
    max_query_result_size: Option<u64>,
    /// A set of [`GlobalId`]s; by the name, the sources the query depends
    /// on — confirm against the code that fills it.
    source_ids: BTreeSet<GlobalId>,
    /// A bundle of collection IDs (see [`CollectionIdBundle`]).
    id_bundle: CollectionIdBundle,
    /// The replica to target, if one was specified.
    target_replica: Option<ReplicaId>,
    /// The timestamp determination for this query (see
    /// [`TimestampDetermination`]).
    determination: TimestampDetermination,
    /// The optimizer to use: a peek optimizer (`Left`) or a COPY TO
    /// optimizer (`Right`).
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
662
/// State for [`PeekStage::Finish`], the final stage for a peek.
#[derive(Debug)]
pub struct PeekStageFinish {
    /// Validity information for the plan (see [`PlanValidity`]).
    validity: PlanValidity,
    /// The `SELECT` plan being processed.
    plan: mz_sql::plan::SelectPlan,
    /// Optional limit on the size of the query result (unit not visible
    /// here; presumably bytes — confirm).
    max_query_result_size: Option<u64>,
    /// A bundle of collection IDs (see [`CollectionIdBundle`]).
    id_bundle: CollectionIdBundle,
    /// The replica to target, if one was specified.
    target_replica: Option<ReplicaId>,
    /// A set of [`GlobalId`]s; by the name, the sources the query depends
    /// on — confirm against the code that fills it.
    source_ids: BTreeSet<GlobalId>,
    /// The timestamp determination for this query (see
    /// [`TimestampDetermination`]).
    determination: TimestampDetermination,
    /// A compute instance ID; presumably the cluster the peek runs on —
    /// confirm.
    cluster_id: ComputeInstanceId,
    /// The [`RowSetFinishing`] associated with this peek.
    finishing: RowSetFinishing,
    /// When present, an optimizer trace to be used for emitting a plan insights
    /// notice.
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    /// Optional, boxed [`PlanInsightsContext`].
    insights_ctx: Option<Box<PlanInsightsContext>>,
    /// The peek optimizer's `GlobalLirPlan`.
    global_lir_plan: optimize::peek::GlobalLirPlan,
    /// An `EpochMillis` timestamp; by the name, when optimization finished.
    optimization_finished_at: EpochMillis,
}
681
/// State shared by the two COPY TO stages, [`PeekStage::CopyToPreflight`]
/// and [`PeekStage::CopyToDataflow`].
#[derive(Debug)]
pub struct PeekStageCopyTo {
    /// Validity information for the plan (see [`PlanValidity`]).
    validity: PlanValidity,
    /// The COPY TO optimizer.
    optimizer: optimize::copy_to::Optimizer,
    /// The COPY TO optimizer's `GlobalLirPlan`.
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    /// An `EpochMillis` timestamp; by the name, when optimization finished.
    optimization_finished_at: EpochMillis,
    /// A set of [`GlobalId`]s; by the name, the sources the query depends
    /// on — confirm against the code that fills it.
    source_ids: BTreeSet<GlobalId>,
}
690
/// State for [`PeekStage::ExplainPlan`], the final stage for an explain.
#[derive(Debug)]
pub struct PeekStageExplainPlan {
    /// Validity information for the plan (see [`PlanValidity`]).
    validity: PlanValidity,
    /// The peek optimizer.
    optimizer: optimize::peek::Optimizer,
    /// Dataflow meta-information (see [`DataflowMetainfo`]).
    df_meta: DataflowMetainfo,
    /// The EXPLAIN-specific context (see `ExplainPlanContext`).
    explain_ctx: ExplainPlanContext,
    /// Optional, boxed [`PlanInsightsContext`].
    insights_ctx: Option<Box<PlanInsightsContext>>,
}
699
/// State for [`PeekStage::ExplainPushdown`].
#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    /// Validity information for the plan (see [`PlanValidity`]).
    validity: PlanValidity,
    /// The timestamp determination for this query (see
    /// [`TimestampDetermination`]).
    determination: TimestampDetermination,
    /// A [`MapFilterProject`] per [`GlobalId`]; by the name, keyed by the
    /// collections the dataflow imports — confirm.
    imports: BTreeMap<GlobalId, MapFilterProject>,
}
706
/// The stages of sequencing an index creation; carried by
/// [`Message::CreateIndexStageReady`].
///
/// Each variant wraps the struct holding that stage's state.
#[derive(Debug)]
pub enum CreateIndexStage {
    /// State in [`CreateIndexOptimize`].
    Optimize(CreateIndexOptimize),
    /// State in [`CreateIndexFinish`].
    Finish(CreateIndexFinish),
    /// State in [`CreateIndexExplain`].
    Explain(CreateIndexExplain),
}
713
/// State for [`CreateIndexStage::Optimize`].
#[derive(Debug)]
pub struct CreateIndexOptimize {
    /// Validity information for the plan (see [`PlanValidity`]).
    validity: PlanValidity,
    /// The index creation plan.
    plan: plan::CreateIndexPlan,
    /// Resolved IDs (see [`ResolvedIds`]).
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
723
/// State for [`CreateIndexStage::Finish`].
#[derive(Debug)]
pub struct CreateIndexFinish {
    /// Validity information for the plan (see [`PlanValidity`]).
    validity: PlanValidity,
    /// A [`CatalogItemId`]; presumably the new index's ID in the catalog, by
    /// analogy with `CreateViewFinish::item_id` — confirm.
    item_id: CatalogItemId,
    /// A [`GlobalId`]; presumably the new index's — confirm.
    global_id: GlobalId,
    /// The index creation plan.
    plan: plan::CreateIndexPlan,
    /// Resolved IDs (see [`ResolvedIds`]).
    resolved_ids: ResolvedIds,
    /// The index optimizer's `GlobalMirPlan`.
    global_mir_plan: optimize::index::GlobalMirPlan,
    /// The index optimizer's `GlobalLirPlan`.
    global_lir_plan: optimize::index::GlobalLirPlan,
    /// Optimizer feature settings (see [`OptimizerFeatures`]).
    optimizer_features: OptimizerFeatures,
}
735
/// Payload of [`CreateIndexStage::Explain`].
#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    /// The explain request being served.
    explain_ctx: ExplainPlanContext,
}
744
/// The stages of sequencing a `plan::CreateViewPlan`; each variant wraps the
/// state that stage carries.
#[derive(Debug)]
pub enum CreateViewStage {
    /// See [`CreateViewOptimize`].
    Optimize(CreateViewOptimize),
    /// See [`CreateViewFinish`].
    Finish(CreateViewFinish),
    /// See [`CreateViewExplain`].
    Explain(CreateViewExplain),
}
751
/// Payload of [`CreateViewStage::Optimize`].
#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
761
/// Payload of [`CreateViewStage::Finish`]: the plan, its IDs, and the
/// optimized MIR expression for the view.
#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    /// ID of this item in the Catalog.
    item_id: CatalogItemId,
    /// ID by which Compute will reference this View.
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    /// IDs of objects resolved during name resolution.
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}
774
/// Payload of [`CreateViewStage::Explain`].
#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    /// The explain request being served.
    explain_ctx: ExplainPlanContext,
}
782
/// The stages of sequencing a `plan::ExplainTimestampPlan`; each variant wraps
/// the state that stage carries.
#[derive(Debug)]
pub enum ExplainTimestampStage {
    /// See [`ExplainTimestampOptimize`].
    Optimize(ExplainTimestampOptimize),
    /// See [`ExplainTimestampRealTimeRecency`].
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    /// See [`ExplainTimestampFinish`].
    Finish(ExplainTimestampFinish),
}
789
/// Payload of [`ExplainTimestampStage::Optimize`].
#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}
796
/// Payload of [`ExplainTimestampStage::RealTimeRecency`]. Carries the
/// optimized plan, but not yet the `source_ids` and `real_time_recency_ts`
/// that [`ExplainTimestampFinish`] adds.
#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}
805
/// Payload of [`ExplainTimestampStage::Finish`].
#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    /// A real-time-recency timestamp, when one is available.
    real_time_recency_ts: Option<Timestamp>,
}
816
/// The stages of sequencing a `plan::AlterClusterPlan`; each variant wraps the
/// state that stage carries.
#[derive(Debug)]
pub enum ClusterStage {
    /// See [`AlterCluster`].
    Alter(AlterCluster),
    /// See [`AlterClusterWaitForHydrated`].
    WaitForHydrated(AlterClusterWaitForHydrated),
    /// See [`AlterClusterFinalize`].
    Finalize(AlterClusterFinalize),
}
823
/// Payload of [`ClusterStage::Alter`].
#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}
829
/// Payload of [`ClusterStage::WaitForHydrated`].
#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    /// The managed-cluster configuration being moved to.
    new_config: ClusterVariantManaged,
    workload_class: Option<String>,
    /// Deadline for the wait. NOTE(review): presumably `on_timeout` is applied
    /// once this instant passes — confirm in the stage implementation.
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}
839
/// Payload of [`ClusterStage::Finalize`].
#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    /// The managed-cluster configuration being moved to.
    new_config: ClusterVariantManaged,
    workload_class: Option<String>,
}
847
/// Describes whether, and in which way, a statement is being sequenced on
/// behalf of an `EXPLAIN`.
#[derive(Debug)]
pub enum ExplainContext {
    /// The ordinary, non-explain variant of the statement.
    None,
    /// The `EXPLAIN <level> PLAN FOR <explainee>` version of the statement.
    Plan(ExplainPlanContext),
    /// Generate a notice containing the `EXPLAIN PLAN INSIGHTS` output
    /// alongside the query's normal output.
    PlanInsightsNotice(OptimizerTrace),
    /// `EXPLAIN FILTER PUSHDOWN`
    Pushdown,
}
860
861impl ExplainContext {
862    /// If available for this context, wrap the [`OptimizerTrace`] into a
863    /// [`tracing::Dispatch`] and set it as default, returning the resulting
864    /// guard in a `Some(guard)` option.
865    pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
866        let optimizer_trace = match self {
867            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
868            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
869            _ => None,
870        };
871        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
872    }
873
874    pub(crate) fn needs_cluster(&self) -> bool {
875        match self {
876            ExplainContext::None => true,
877            ExplainContext::Plan(..) => false,
878            ExplainContext::PlanInsightsNotice(..) => true,
879            ExplainContext::Pushdown => false,
880        }
881    }
882
883    pub(crate) fn needs_plan_insights(&self) -> bool {
884        matches!(
885            self,
886            ExplainContext::Plan(ExplainPlanContext {
887                stage: ExplainStage::PlanInsights,
888                ..
889            }) | ExplainContext::PlanInsightsNotice(_)
890        )
891    }
892}
893
/// The parameters of an `EXPLAIN ... PLAN` request, plus the optimizer trace
/// used to serve it.
#[derive(Debug)]
pub struct ExplainPlanContext {
    /// EXPLAIN BROKEN is internal syntax for showing EXPLAIN output despite an internal error in
    /// the optimizer: we don't immediately bail out from peek sequencing when an internal optimizer
    /// error happens, but go on with trying to show the requested EXPLAIN stage. This can still
    /// succeed if the requested EXPLAIN stage is before the point where the error happened.
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    /// The optimizer stage whose output was requested.
    pub stage: ExplainStage,
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    /// The trace that `ExplainContext::dispatch_guard` installs as the
    /// default `tracing` dispatch.
    pub optimizer_trace: OptimizerTrace,
}
908
/// The stages of sequencing a `plan::CreateMaterializedViewPlan`; each variant
/// wraps the state that stage carries.
#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    /// See [`CreateMaterializedViewOptimize`].
    Optimize(CreateMaterializedViewOptimize),
    /// See [`CreateMaterializedViewFinish`].
    Finish(CreateMaterializedViewFinish),
    /// See [`CreateMaterializedViewExplain`].
    Explain(CreateMaterializedViewExplain),
}
915
/// Payload of [`CreateMaterializedViewStage::Optimize`].
#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
925
/// Payload of [`CreateMaterializedViewStage::Finish`]: the plan, its IDs, and
/// the local-MIR, global-MIR and global-LIR plans from the
/// `optimize::materialized_view` pipeline.
#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    /// The ID of this Materialized View in the Catalog.
    item_id: CatalogItemId,
    /// The ID of the durable pTVC backing this Materialized View.
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}
940
/// Payload of [`CreateMaterializedViewStage::Explain`].
#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    /// The explain request being served.
    explain_ctx: ExplainPlanContext,
}
949
/// The stages of sequencing a `plan::SubscribePlan`; each variant wraps the
/// state that stage carries.
#[derive(Debug)]
pub enum SubscribeStage {
    /// See [`SubscribeOptimizeMir`].
    OptimizeMir(SubscribeOptimizeMir),
    /// See [`SubscribeTimestampOptimizeLir`].
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    /// See [`SubscribeFinish`].
    Finish(SubscribeFinish),
    /// See [`SubscribeExplain`].
    Explain(SubscribeExplain),
}
957
/// Payload of [`SubscribeStage::OptimizeMir`].
#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    /// A specific replica, when one is targeted.
    replica_id: Option<ReplicaId>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
970
/// Payload of [`SubscribeStage::TimestampOptimizeLir`]. Carries the subscribe
/// optimizer together with a global MIR plan that is still in its
/// `Unresolved` state.
#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
984
/// Payload of [`SubscribeStage::Finish`]: the plan and the LIR plan produced
/// for it, plus where the subscribe runs.
#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}
994
/// Payload of [`SubscribeStage::Explain`].
#[derive(Debug)]
pub struct SubscribeExplain {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    df_meta: DataflowMetainfo,
    cluster_id: ComputeInstanceId,
    /// The explain request being served.
    explain_ctx: ExplainPlanContext,
}
1003
/// The stages of sequencing an introspection subscribe. Mirrors
/// [`SubscribeStage`] but has no `Explain` variant.
#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    /// See [`IntrospectionSubscribeOptimizeMir`].
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    /// See [`IntrospectionSubscribeTimestampOptimizeLir`].
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    /// See [`IntrospectionSubscribeFinish`].
    Finish(IntrospectionSubscribeFinish),
}
1010
/// Payload of [`IntrospectionSubscribeStage::OptimizeMir`]. Unlike the
/// user-facing subscribe stages, the replica is mandatory here.
#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1019
/// Payload of [`IntrospectionSubscribeStage::TimestampOptimizeLir`]. Carries
/// the subscribe optimizer together with a global MIR plan that is still in
/// its `Unresolved` state.
#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1028
/// Payload of [`IntrospectionSubscribeStage::Finish`].
#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    /// Read holds carried into the final stage.
    read_holds: ReadHolds,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1037
/// The stages of sequencing secret-related statements (create, rotate keys,
/// alter); each variant wraps the state that stage carries.
#[derive(Debug)]
pub enum SecretStage {
    /// See [`CreateSecretEnsure`].
    CreateEnsure(CreateSecretEnsure),
    /// See [`CreateSecretFinish`].
    CreateFinish(CreateSecretFinish),
    /// See [`RotateKeysSecretEnsure`].
    RotateKeysEnsure(RotateKeysSecretEnsure),
    /// See [`RotateKeysSecretFinish`].
    RotateKeysFinish(RotateKeysSecretFinish),
    /// See [`AlterSecret`].
    Alter(AlterSecret),
}
1046
/// Payload of [`SecretStage::CreateEnsure`].
#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}
1052
/// Payload of [`SecretStage::CreateFinish`]: the plan plus the IDs assigned to
/// the new secret.
#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}
1060
/// Payload of [`SecretStage::RotateKeysEnsure`].
#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    /// Catalog item the key rotation applies to.
    id: CatalogItemId,
}
1066
/// Payload of [`SecretStage::RotateKeysFinish`].
#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    /// Catalog operations carried into the final stage.
    ops: Vec<crate::catalog::Op>,
}
1072
/// Payload of [`SecretStage::Alter`].
#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}
1078
/// An enum describing which cluster to run a statement on.
///
/// One example usage would be that if a query depends only on system tables, we might
/// automatically run it on the catalog server cluster to benefit from indexes that exist there.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The catalog server cluster.
    CatalogServer,
    /// The current user's active cluster.
    Active,
    /// The cluster selected at the start of a transaction, identified by its
    /// [`ClusterId`].
    Transaction(ClusterId),
}
1092
/// Result types for each stage of a sequence.
///
/// `T` is the type of the next stage; the `Handle*` variants defer the outcome
/// to a spawned task, while the other two carry it directly.
pub(crate) enum StageResult<T> {
    /// A task was spawned that will return the next stage.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task was spawned that will return a response for the client.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage is immediately ready and will execute.
    Immediate(T),
    /// The final stage was executed and is ready to respond to the client.
    Response(ExecuteResponse),
}
1104
/// Common functionality for [Coordinator::sequence_staged].
pub(crate) trait Staged: Send {
    /// The context threaded through the stages; it is what ultimately gets
    /// retired (see [`StagedContext`]).
    type Ctx: StagedContext;

    /// Gives mutable access to the [`PlanValidity`] carried by the current
    /// stage.
    fn validity(&mut self) -> &mut PlanValidity;

    /// Returns the next stage or final result.
    ///
    /// Consumes the current stage. On success the [`StageResult`] says whether
    /// the next stage (boxed) or the final response is available right away or
    /// will be produced by a spawned task.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Prepares a message for the Coordinator.
    ///
    /// Consumes both the stage and its context.
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether it is safe to SQL cancel this stage.
    fn cancel_enabled(&self) -> bool;
}
1124
/// The context type a [`Staged`] sequence runs with.
pub trait StagedContext {
    /// Consumes the context, finishing the sequence with `result`.
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    /// The session attached to this context, if it has one.
    fn session(&self) -> Option<&Session>;
}
1129
1130impl StagedContext for ExecuteContext {
1131    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1132        self.retire(result);
1133    }
1134
1135    fn session(&self) -> Option<&Session> {
1136        Some(self.session())
1137    }
1138}
1139
1140impl StagedContext for () {
1141    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}
1142
1143    fn session(&self) -> Option<&Session> {
1144        None
1145    }
1146}
1147
/// Configures a coordinator.
///
/// A plain bag of dependencies and settings; every field is public and this
/// section attaches no behavior to the struct.
pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    /// Handle to the durable catalog state.
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    /// Source of the current time.
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Default values for system parameters, keyed by parameter name.
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    /// Callback taking two `u64` values.
    /// NOTE(review): the meaning and order of the two arguments is not visible
    /// here — confirm at the call site.
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    /// Whether or not to start controllers in read-only mode. This is only
    /// meant for use during development of read-only clusters and 0dt upgrades
    /// and should go away once we have proper orchestration during upgrades.
    pub read_only_controllers: bool,

    /// A trigger that signals that the current deployment has caught up with a
    /// previous deployment. Only used during 0dt deployment, while in read-only
    /// mode.
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}
1199
/// Metadata about an active connection.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Pgwire specifies that every connection have a 32-bit secret associated
    /// with it, that is known to both the client and the server. Cancellation
    /// requests are required to authenticate with the secret of the connection
    /// that they are targeting.
    secret_key: u32,
    /// The time when the session's connection was initiated.
    connected_at: EpochMillis,
    /// The user associated with the connection.
    user: User,
    /// The application name associated with the connection.
    application_name: String,
    uuid: Uuid,
    /// The connection's ID.
    conn_id: ConnectionId,
    /// The client's IP address, when known.
    client_ip: Option<IpAddr>,

    /// Sinks that will need to be dropped when the current transaction, if
    /// any, is cleared.
    drop_sinks: BTreeSet<GlobalId>,

    /// Lock for the Coordinator's deferred statements that is dropped on transaction clear.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Cluster reconfigurations that will need to be
    /// cleaned up when the current transaction is cleared
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Channel on which to send notices to a session.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role that initiated the database context. Fixed for the duration of the connection.
    /// WARNING: This role reference is not updated when the role is dropped.
    /// Consumers should not assume that this role exists.
    authenticated_role: RoleId,
}
1237
/// Read-only accessors for the connection metadata.
impl ConnMeta {
    /// The connection's ID.
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    /// The user associated with the connection.
    pub fn user(&self) -> &User {
        &self.user
    }

    /// The application name associated with the connection.
    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    /// The role that initiated the database context. The role may have been
    /// dropped since; do not assume it still exists.
    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    /// The UUID stored for this connection.
    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    /// The client's IP address, when known.
    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    /// The time when the session's connection was initiated.
    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}
1267
#[derive(Debug)]
/// A pending transaction waiting to be committed.
///
/// Bundles everything needed to answer the client once the transaction is
/// ended: the context to reply on, the reply itself, and how to end it.
pub struct PendingTxn {
    /// Context used to send a response back to the client.
    ctx: ExecuteContext,
    /// Client response for transaction.
    response: Result<PendingTxnResponse, AdapterError>,
    /// The action to take at the end of the transaction.
    action: EndTransactionAction,
}
1278
#[derive(Debug)]
/// The reply that will eventually be sent for a [`PendingTxn`].
pub enum PendingTxnResponse {
    /// The transaction is going to be committed.
    Committed {
        /// Parameters whose values change once this transaction completes,
        /// with their new values.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction is going to be rolled back.
    Rolledback {
        /// Parameters whose values change once this transaction completes,
        /// with their new values.
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    /// Merges `p` into the parameter map of whichever variant this is. An
    /// entry in `p` replaces an existing entry with the same key.
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        let target = match self {
            PendingTxnResponse::Committed { params } => params,
            PendingTxnResponse::Rolledback { params } => params,
        };
        target.extend(p);
    }
}
1302
1303impl From<PendingTxnResponse> for ExecuteResponse {
1304    fn from(value: PendingTxnResponse) -> Self {
1305        match value {
1306            PendingTxnResponse::Committed { params } => {
1307                ExecuteResponse::TransactionCommitted { params }
1308            }
1309            PendingTxnResponse::Rolledback { params } => {
1310                ExecuteResponse::TransactionRolledBack { params }
1311            }
1312        }
1313    }
1314}
1315
#[derive(Debug)]
/// A pending read transaction waiting to be linearized along with metadata about its state
pub struct PendingReadTxn {
    /// The transaction type
    txn: PendingRead,
    /// The timestamp context of the transaction.
    timestamp_context: TimestampContext,
    /// The instant at which this pending txn was created, which is when the
    /// transaction ends. Only used for metrics.
    created: Instant,
    /// Number of times we requeued the processing of this pending read txn.
    /// Requeueing is necessary if the time we executed the query is after the current oracle time;
    /// see [`Coordinator::message_linearize_reads`] for more details.
    num_requeues: u64,
    /// Telemetry context.
    otel_ctx: OpenTelemetryContext,
}
1332
impl PendingReadTxn {
    /// Return the timestamp context of the pending read transaction.
    pub fn timestamp_context(&self) -> &TimestampContext {
        &self.timestamp_context
    }

    /// Consumes the pending read and returns its [`ExecuteContext`], by
    /// delegating to `PendingRead::take_context`.
    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}
1343
#[derive(Debug)]
/// A pending read transaction waiting to be linearized.
enum PendingRead {
    /// A plain read: once linearized, the wrapped transaction is ended and
    /// its response is handed back for delivery to the client.
    Read {
        /// The inner transaction.
        txn: PendingTxn,
    },
    /// A read whose context must be handed back to a waiting transaction
    /// over `tx` rather than being answered directly.
    ReadThenWrite {
        /// Context used to send a response back to the client.
        ctx: ExecuteContext,
        /// Channel used to alert the transaction that the read has been linearized and send back
        /// `ctx`.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}
1359
1360impl PendingRead {
1361    /// Alert the client that the read has been linearized.
1362    ///
1363    /// If it is necessary to finalize an execute, return the state necessary to do so
1364    /// (execution context and result)
1365    #[instrument(level = "debug")]
1366    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1367        match self {
1368            PendingRead::Read {
1369                txn:
1370                    PendingTxn {
1371                        mut ctx,
1372                        response,
1373                        action,
1374                    },
1375                ..
1376            } => {
1377                let changed = ctx.session_mut().vars_mut().end_transaction(action);
1378                // Append any parameters that changed to the response.
1379                let response = response.map(|mut r| {
1380                    r.extend_params(changed);
1381                    ExecuteResponse::from(r)
1382                });
1383
1384                Some((ctx, response))
1385            }
1386            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1387                // Ignore errors if the caller has hung up.
1388                let _ = tx.send(Some(ctx));
1389                None
1390            }
1391        }
1392    }
1393
1394    fn label(&self) -> &'static str {
1395        match self {
1396            PendingRead::Read { .. } => "read",
1397            PendingRead::ReadThenWrite { .. } => "read_then_write",
1398        }
1399    }
1400
1401    pub(crate) fn take_context(self) -> ExecuteContext {
1402        match self {
1403            PendingRead::Read { txn, .. } => txn.ctx,
1404            PendingRead::ReadThenWrite { ctx, tx, .. } => {
1405                // Inform the transaction that we've taken their context.
1406                // Ignore errors if the caller has hung up.
1407                let _ = tx.send(None);
1408                ctx
1409            }
1410        }
1411    }
1412}
1413
/// State that the coordinator must process as part of retiring
/// command execution.  `ExecuteContextExtra::Default` is guaranteed
/// to produce a value that will cause the coordinator to do nothing, and
/// is intended for use by code that invokes the execution processing flow
/// (i.e., `sequence_plan`) without actually being a statement execution.
///
/// This is a pure data struct containing only the statement logging ID.
/// For auto-retire-on-drop behavior, use `ExecuteContextGuard` which wraps
/// this struct and owns the channel for sending retirement messages.
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    /// The statement logging ID; `None` (the default) marks the value as
    /// trivial.
    statement_uuid: Option<StatementLoggingId>,
}
1428
impl ExecuteContextExtra {
    /// Wraps an optional statement logging ID.
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }
    /// Returns `true` when no statement logging ID is held.
    pub fn is_trivial(&self) -> bool {
        self.statement_uuid.is_none()
    }
    /// Returns a copy of the held statement logging ID, if any.
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
    /// Consume this extra and return the statement UUID for retirement.
    /// This should only be called from code that knows what to do to finish
    /// up logging based on the inner value.
    #[must_use]
    pub(crate) fn retire(self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
}
1447
/// A guard that wraps `ExecuteContextExtra` and owns a channel for sending
/// retirement messages to the coordinator.
///
/// If this guard is dropped with a `Some` `statement_uuid` in its inner
/// `ExecuteContextExtra`, the `Drop` implementation will automatically send a
/// `Message::RetireExecute` to log the statement ending.
/// This handles cases like connection drops where the context cannot be
/// explicitly retired.
/// See <https://github.com/MaterializeInc/database-issues/issues/7304>
#[derive(Debug)]
#[must_use]
pub struct ExecuteContextGuard {
    /// The guarded data. Emptied by `defuse` so that dropping the guard
    /// afterwards sends nothing.
    extra: ExecuteContextExtra,
    /// Channel for sending messages to the coordinator. Used for auto-retiring on drop.
    /// For `Default` instances, this is a dummy sender (receiver already dropped), so
    /// sends will fail silently - which is the desired behavior since Default instances
    /// should only be used for non-logged statements.
    coordinator_tx: mpsc::UnboundedSender<Message>,
}
1467
1468impl Default for ExecuteContextGuard {
1469    fn default() -> Self {
1470        // Create a dummy sender by immediately dropping the receiver.
1471        // Any send on this channel will fail silently, which is the desired
1472        // behavior for Default instances (non-logged statements).
1473        let (tx, _rx) = mpsc::unbounded_channel();
1474        Self {
1475            extra: ExecuteContextExtra::default(),
1476            coordinator_tx: tx,
1477        }
1478    }
1479}
1480
impl ExecuteContextGuard {
    /// Creates a guard around `statement_uuid` that reports to the coordinator
    /// over `coordinator_tx` if it is dropped without being defused.
    pub(crate) fn new(
        statement_uuid: Option<StatementLoggingId>,
        coordinator_tx: mpsc::UnboundedSender<Message>,
    ) -> Self {
        Self {
            extra: ExecuteContextExtra::new(statement_uuid),
            coordinator_tx,
        }
    }
    /// Returns `true` when the guarded extra holds no statement logging ID.
    pub fn is_trivial(&self) -> bool {
        self.extra.is_trivial()
    }
    /// Returns a copy of the guarded statement logging ID, if any.
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.extra.contents()
    }
    /// Take responsibility for the contents.  This should only be
    /// called from code that knows what to do to finish up logging
    /// based on the inner value.
    ///
    /// Returns the inner `ExecuteContextExtra`, consuming the guard without
    /// triggering the auto-retire behavior.
    pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
        // Taking statement_uuid prevents the Drop impl from sending a retire message
        // (`mem::take` leaves a default, i.e. trivial, extra behind for `Drop` to see).
        std::mem::take(&mut self.extra)
    }
}
1508
1509impl Drop for ExecuteContextGuard {
1510    fn drop(&mut self) {
1511        if let Some(statement_uuid) = self.extra.statement_uuid.take() {
1512            // Auto-retire since the guard was dropped without explicit retirement (likely due
1513            // to connection drop).
1514            let msg = Message::RetireExecute {
1515                data: ExecuteContextExtra {
1516                    statement_uuid: Some(statement_uuid),
1517                },
1518                otel_ctx: OpenTelemetryContext::obtain(),
1519                reason: StatementEndedExecutionReason::Aborted,
1520            };
1521            // Send may fail for Default instances (dummy sender), which is fine since
1522            // Default instances should only be used for non-logged statements.
1523            let _ = self.coordinator_tx.send(msg);
1524        }
1525    }
1526}
1527
/// Bundle of state related to statement execution.
///
/// This struct collects a bundle of state that needs to be threaded
/// through various functions as part of statement execution.
/// It is used to finalize execution, by calling `retire`. Finalizing execution
/// involves sending the session back to the pgwire layer so that it
/// may be used to process further commands. It also involves
/// performing some work on the main coordinator thread
/// (e.g., recording the time at which the statement finished
/// executing). The state necessary to perform this work is bundled in
/// the `ExecuteContextGuard` object.
#[derive(Debug)]
pub struct ExecuteContext {
    /// The actual state, kept behind a `Box` so that this struct is a single
    /// pointer wide.
    inner: Box<ExecuteContextInner>,
}
1543
1544impl std::ops::Deref for ExecuteContext {
1545    type Target = ExecuteContextInner;
1546    fn deref(&self) -> &Self::Target {
1547        &*self.inner
1548    }
1549}
1550
1551impl std::ops::DerefMut for ExecuteContext {
1552    fn deref_mut(&mut self) -> &mut Self::Target {
1553        &mut *self.inner
1554    }
1555}
1556
/// The boxed contents of an [`ExecuteContext`].
#[derive(Debug)]
pub struct ExecuteContextInner {
    /// Transmitter used to send the execute response (and the session) back.
    tx: ClientTransmitter<ExecuteResponse>,
    /// Sender for messages to the coordinator; `ExecuteContext::retire` uses
    /// it to send `Message::RetireExecute`.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    session: Session,
    /// Guard around the statement-logging state that must be retired.
    extra: ExecuteContextGuard,
}
1564
1565impl ExecuteContext {
1566    pub fn session(&self) -> &Session {
1567        &self.session
1568    }
1569
1570    pub fn session_mut(&mut self) -> &mut Session {
1571        &mut self.session
1572    }
1573
1574    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1575        &self.tx
1576    }
1577
1578    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1579        &mut self.tx
1580    }
1581
1582    pub fn from_parts(
1583        tx: ClientTransmitter<ExecuteResponse>,
1584        internal_cmd_tx: mpsc::UnboundedSender<Message>,
1585        session: Session,
1586        extra: ExecuteContextGuard,
1587    ) -> Self {
1588        Self {
1589            inner: ExecuteContextInner {
1590                tx,
1591                session,
1592                extra,
1593                internal_cmd_tx,
1594            }
1595            .into(),
1596        }
1597    }
1598
1599    /// By calling this function, the caller takes responsibility for
1600    /// dealing with the instance of `ExecuteContextGuard`. This is
1601    /// intended to support protocols (like `COPY FROM`) that involve
1602    /// multiple passes of sending the session back and forth between
1603    /// the coordinator and the pgwire layer. As part of any such
1604    /// protocol, we must ensure that the `ExecuteContextGuard`
1605    /// (possibly wrapped in a new `ExecuteContext`) is passed back to the coordinator for
1606    /// eventual retirement.
1607    pub fn into_parts(
1608        self,
1609    ) -> (
1610        ClientTransmitter<ExecuteResponse>,
1611        mpsc::UnboundedSender<Message>,
1612        Session,
1613        ExecuteContextGuard,
1614    ) {
1615        let ExecuteContextInner {
1616            tx,
1617            internal_cmd_tx,
1618            session,
1619            extra,
1620        } = *self.inner;
1621        (tx, internal_cmd_tx, session, extra)
1622    }
1623
1624    /// Retire the execution, by sending a message to the coordinator.
1625    #[instrument(level = "debug")]
1626    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1627        let ExecuteContextInner {
1628            tx,
1629            internal_cmd_tx,
1630            session,
1631            extra,
1632        } = *self.inner;
1633        let reason = if extra.is_trivial() {
1634            None
1635        } else {
1636            Some((&result).into())
1637        };
1638        tx.send(result, session);
1639        if let Some(reason) = reason {
1640            // Retire the guard to get the inner ExecuteContextExtra without triggering auto-retire
1641            let extra = extra.defuse();
1642            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1643                otel_ctx: OpenTelemetryContext::obtain(),
1644                data: extra,
1645                reason,
1646            }) {
1647                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1648            }
1649        }
1650    }
1651
1652    pub fn extra(&self) -> &ExecuteContextGuard {
1653        &self.extra
1654    }
1655
1656    pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
1657        &mut self.extra
1658    }
1659}
1660
1661#[derive(Debug)]
1662struct ClusterReplicaStatuses(
1663    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
1664);
1665
1666impl ClusterReplicaStatuses {
1667    pub(crate) fn new() -> ClusterReplicaStatuses {
1668        ClusterReplicaStatuses(BTreeMap::new())
1669    }
1670
1671    /// Initializes the statuses of the specified cluster.
1672    ///
1673    /// Panics if the cluster statuses are already initialized.
1674    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1675        let prev = self.0.insert(cluster_id, BTreeMap::new());
1676        assert_eq!(
1677            prev, None,
1678            "cluster {cluster_id} statuses already initialized"
1679        );
1680    }
1681
1682    /// Initializes the statuses of the specified cluster replica.
1683    ///
1684    /// Panics if the cluster replica statuses are already initialized.
1685    pub(crate) fn initialize_cluster_replica_statuses(
1686        &mut self,
1687        cluster_id: ClusterId,
1688        replica_id: ReplicaId,
1689        num_processes: usize,
1690        time: DateTime<Utc>,
1691    ) {
1692        tracing::info!(
1693            ?cluster_id,
1694            ?replica_id,
1695            ?time,
1696            "initializing cluster replica status"
1697        );
1698        let replica_statuses = self.0.entry(cluster_id).or_default();
1699        let process_statuses = (0..num_processes)
1700            .map(|process_id| {
1701                let status = ClusterReplicaProcessStatus {
1702                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1703                    time: time.clone(),
1704                };
1705                (u64::cast_from(process_id), status)
1706            })
1707            .collect();
1708        let prev = replica_statuses.insert(replica_id, process_statuses);
1709        assert_none!(
1710            prev,
1711            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1712        );
1713    }
1714
1715    /// Removes the statuses of the specified cluster.
1716    ///
1717    /// Panics if the cluster does not exist.
1718    pub(crate) fn remove_cluster_statuses(
1719        &mut self,
1720        cluster_id: &ClusterId,
1721    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1722        let prev = self.0.remove(cluster_id);
1723        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1724    }
1725
1726    /// Removes the statuses of the specified cluster replica.
1727    ///
1728    /// Panics if the cluster or replica does not exist.
1729    pub(crate) fn remove_cluster_replica_statuses(
1730        &mut self,
1731        cluster_id: &ClusterId,
1732        replica_id: &ReplicaId,
1733    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1734        let replica_statuses = self
1735            .0
1736            .get_mut(cluster_id)
1737            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1738        let prev = replica_statuses.remove(replica_id);
1739        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1740    }
1741
1742    /// Inserts or updates the status of the specified cluster replica process.
1743    ///
1744    /// Panics if the cluster or replica does not exist.
1745    pub(crate) fn ensure_cluster_status(
1746        &mut self,
1747        cluster_id: ClusterId,
1748        replica_id: ReplicaId,
1749        process_id: ProcessId,
1750        status: ClusterReplicaProcessStatus,
1751    ) {
1752        let replica_statuses = self
1753            .0
1754            .get_mut(&cluster_id)
1755            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1756            .get_mut(&replica_id)
1757            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1758        replica_statuses.insert(process_id, status);
1759    }
1760
1761    /// Computes the status of the cluster replica as a whole.
1762    ///
1763    /// Panics if `cluster_id` or `replica_id` don't exist.
1764    pub fn get_cluster_replica_status(
1765        &self,
1766        cluster_id: ClusterId,
1767        replica_id: ReplicaId,
1768    ) -> ClusterStatus {
1769        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1770        Self::cluster_replica_status(process_status)
1771    }
1772
1773    /// Computes the status of the cluster replica as a whole.
1774    pub fn cluster_replica_status(
1775        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1776    ) -> ClusterStatus {
1777        process_status
1778            .values()
1779            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1780                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1781                (x, y) => {
1782                    let reason_x = match x {
1783                        ClusterStatus::Offline(reason) => reason,
1784                        ClusterStatus::Online => None,
1785                    };
1786                    let reason_y = match y {
1787                        ClusterStatus::Offline(reason) => reason,
1788                        ClusterStatus::Online => None,
1789                    };
1790                    // Arbitrarily pick the first known not-ready reason.
1791                    ClusterStatus::Offline(reason_x.or(reason_y))
1792                }
1793            })
1794    }
1795
1796    /// Gets the statuses of the given cluster replica.
1797    ///
1798    /// Panics if the cluster or replica does not exist
1799    pub(crate) fn get_cluster_replica_statuses(
1800        &self,
1801        cluster_id: ClusterId,
1802        replica_id: ReplicaId,
1803    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1804        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1805            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1806    }
1807
1808    /// Gets the statuses of the given cluster replica.
1809    pub(crate) fn try_get_cluster_replica_statuses(
1810        &self,
1811        cluster_id: ClusterId,
1812        replica_id: ReplicaId,
1813    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1814        self.try_get_cluster_statuses(cluster_id)
1815            .and_then(|statuses| statuses.get(&replica_id))
1816    }
1817
1818    /// Gets the statuses of the given cluster.
1819    pub(crate) fn try_get_cluster_statuses(
1820        &self,
1821        cluster_id: ClusterId,
1822    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1823        self.0.get(&cluster_id)
1824    }
1825}
1826
/// Glues the external world to the Timely workers.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    /// The controller for the storage and compute layers.
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    /// The catalog in an Arc suitable for readonly references. The Arc allows
    /// us to hand out cheap copies of the catalog to functions that can use it
    /// off of the main coordinator thread. If the coordinator needs to mutate
    /// the catalog, call [`Self::catalog_mut`], which will clone this struct member,
    /// allowing it to be mutated here while the other off-thread references can
    /// read their catalog as long as needed. In the future we would like this
    /// to be a pTVC, but for now this is sufficient.
    catalog: Arc<Catalog>,

    /// A client for persist. Initially, this is only used for reading stashed
    /// peek responses out of batches.
    persist_client: PersistClient,

    /// Channel to manage internal commands from the coordinator to itself.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// Notification that triggers a group commit.
    group_commit_tx: appends::GroupCommitNotifier,

    /// Channel for strict serializable reads ready to commit.
    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    /// Mechanism for totally ordering write and read timestamps, so that all reads
    /// reflect exactly the set of writes that precede them, and no writes that follow.
    global_timelines: BTreeMap<Timeline, TimelineState>,

    /// A generator for transient [`GlobalId`]s, shareable with other threads.
    transient_id_gen: Arc<TransientIdGen>,
    /// A map from connection ID to metadata about that connection for all
    /// active connections.
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    /// For each transaction, the read holds taken to support any performed reads.
    ///
    /// Upon completing a transaction, these read holds should be dropped.
    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds>,

    /// Access to the peek fields should be restricted to methods in the [`peek`] API.
    /// A map from pending peek ids to the queue into which responses are sent, and
    /// the connection id of the client that initiated the peek.
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// A map from client connection ids to a set of all pending peeks for that client.
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    /// A map from client connection ids to pending linearize read transaction.
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    /// A map from the compute sink ID to its state description.
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    /// A map from active webhooks to their invalidation handle.
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
    /// to stage Batches in Persist that we will then link into the shard.
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    /// Connection-scoped cancellation watches.
    ///
    /// Each entry is a watch channel whose value is `false` until cancellation
    /// is requested for that connection, at which point it is set to `true`.
    ///
    /// Consumers install/remove these watches while they have cancellable work
    /// in flight.
    connection_cancel_watches: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    /// Active introspection subscribes.
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    /// Locks that grant access to a specific object, populated lazily as objects are written to.
    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    /// Plans that are currently deferred and waiting on a write lock.
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    /// Pending writes waiting for a group commit.
    pending_writes: Vec<PendingWriteTxn>,

    /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
    /// table's timestamps, but there are cases where timestamps are not bumped but
    /// we expect the closed timestamps to advance (`AS OF X`, SUBSCRIBing views over
    /// RT sources and tables). To address these, spawn a task that forces table
    /// timestamps to close on a regular interval. This roughly tracks the behavior
    /// of realtime sources that close off timestamps on an interval.
    ///
    /// For non-realtime timelines, nothing pushes the timestamps forward, so we must do
    /// it manually.
    advance_timelines_interval: Interval,

    /// Serialized DDL. DDL must be serialized because:
    /// - Many of them do off-thread work and need to verify the catalog is in a valid state, but
    ///   [`PlanValidity`] does not currently support tracking all changes. Doing that correctly
    ///   seems to be more difficult than it's worth, so we would instead re-plan and re-sequence
    ///   the statements.
    /// - Re-planning a statement is hard because Coordinator and Session state is mutated at
    ///   various points, and we would need to correctly reset those changes before re-planning and
    ///   re-sequencing.
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    /// Handle to secret manager that can create and delete secrets from
    /// an arbitrary secret storage engine.
    secrets_controller: Arc<dyn SecretsController>,
    /// A secrets reader that maintains an in-memory cache, where values have a set TTL.
    caching_secrets_reader: CachingSecretsReader,

    /// Handle to a manager that can create and delete kubernetes resources
    /// (i.e., `VpcEndpoint` objects).
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Persist client for fetching storage metadata such as size metrics.
    storage_usage_client: StorageUsageClient,
    /// The interval at which to collect storage usage information.
    storage_usage_collection_interval: Duration,

    /// Segment analytics client.
    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    /// Coordinator metrics.
    metrics: Metrics,
    /// Optimizer metrics.
    optimizer_metrics: OptimizerMetrics,

    /// Tracing handle.
    tracing_handle: TracingHandle,

    /// Data used by the statement logging feature.
    statement_logging: StatementLogging,

    /// Limit for how many concurrent webhook requests we allow.
    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    /// Optional config for the timestamp oracle. This is _required_ when
    /// a timestamp oracle backend is configured.
    timestamp_oracle_config: Option<TimestampOracleConfig>,

    /// Periodically asks cluster scheduling policies to make their decisions.
    check_cluster_scheduling_policies_interval: Interval,

    /// This keeps the last On/Off decision for each cluster and each scheduling policy.
    /// (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are
    /// periodically cleaned up from this Map.)
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    /// When doing 0dt upgrades/in read-only mode, periodically ask all known
    /// clusters/collections whether they are caught up.
    caught_up_check_interval: Interval,

    /// Context needed to check whether all clusters/collections have caught up.
    /// Only used during 0dt deployment, while in read-only mode.
    caught_up_check: Option<CaughtUpCheckContext>,

    /// Tracks the state associated with the currently installed watchsets.
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    /// Tracks the currently installed watchsets for each connection.
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    /// Tracks the statuses of all cluster replicas.
    cluster_replica_statuses: ClusterReplicaStatuses,

    /// Whether or not to start controllers in read-only mode. This is only
    /// meant for use during development of read-only clusters and 0dt upgrades
    /// and should go away once we have proper orchestration during upgrades.
    read_only_controllers: bool,

    /// Updates to builtin tables that are being buffered while we are in
    /// read-only mode. We apply these all at once when coming out of read-only
    /// mode.
    ///
    /// This is a `Some` while in read-only mode and will be replaced by a
    /// `None` when we transition out of read-only mode and write out any
    /// buffered updates.
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    /// The validated license key. During bootstrap its expiration behavior
    /// and maximum credit consumption rate are consulted when deciding
    /// whether, and against which limit, to validate credit consumption.
    license_key: ValidatedLicenseKey,

    /// Pre-allocated pool of user IDs to amortize persist writes across DDL operations.
    user_id_pool: IdPool,
}
2009
2010impl Coordinator {
2011    /// Initializes coordinator state based on the contained catalog. Must be
2012    /// called after creating the coordinator and before calling the
2013    /// `Coordinator::serve` method.
2014    #[instrument(name = "coord::bootstrap")]
2015    pub(crate) async fn bootstrap(
2016        &mut self,
2017        boot_ts: Timestamp,
2018        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
2019        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2020        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2021        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
2022        audit_logs_iterator: AuditLogIterator,
2023    ) -> Result<(), AdapterError> {
2024        let bootstrap_start = Instant::now();
2025        info!("startup: coordinator init: bootstrap beginning");
2026        info!("startup: coordinator init: bootstrap: preamble beginning");
2027
2028        // Initialize cluster replica statuses.
2029        // Gross iterator is to avoid partial borrow issues.
2030        let cluster_statuses: Vec<(_, Vec<_>)> = self
2031            .catalog()
2032            .clusters()
2033            .map(|cluster| {
2034                (
2035                    cluster.id(),
2036                    cluster
2037                        .replicas()
2038                        .map(|replica| {
2039                            (replica.replica_id, replica.config.location.num_processes())
2040                        })
2041                        .collect(),
2042                )
2043            })
2044            .collect();
2045        let now = self.now_datetime();
2046        for (cluster_id, replica_statuses) in cluster_statuses {
2047            self.cluster_replica_statuses
2048                .initialize_cluster_statuses(cluster_id);
2049            for (replica_id, num_processes) in replica_statuses {
2050                self.cluster_replica_statuses
2051                    .initialize_cluster_replica_statuses(
2052                        cluster_id,
2053                        replica_id,
2054                        num_processes,
2055                        now,
2056                    );
2057            }
2058        }
2059
2060        let system_config = self.catalog().system_config();
2061
2062        // Inform metrics about the initial system configuration.
2063        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
2064
2065        // Inform the controllers about their initial configuration.
2066        let compute_config = flags::compute_config(system_config);
2067        let storage_config = flags::storage_config(system_config);
2068        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
2069        let dyncfg_updates = system_config.dyncfg_updates();
2070        self.controller.compute.update_configuration(compute_config);
2071        self.controller.storage.update_parameters(storage_config);
2072        self.controller
2073            .update_orchestrator_scheduling_config(scheduling_config);
2074        self.controller.update_configuration(dyncfg_updates);
2075
2076        // Skip the credit consumption check at bootstrap under DisableClusterCreation behavior:
2077        // this codepath validates existing replicas at startup, not cluster creation, so it
2078        // must not block startup. New cluster creation is still gated by the DDL-time check.
2079        // The Disable case is already handled by a bail! in main.rs before we reach here.
2080        let enforce_credit_limit_at_bootstrap = !matches!(
2081            self.license_key.expiration_behavior,
2082            ExpirationBehavior::DisableClusterCreation,
2083        );
2084        if enforce_credit_limit_at_bootstrap {
2085            self.validate_resource_limit_numeric(
2086                Numeric::zero(),
2087                self.current_credit_consumption_rate(),
2088                |system_vars| {
2089                    self.license_key
2090                        .max_credit_consumption_rate()
2091                        .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
2092                },
2093                "cluster replica",
2094                MAX_CREDIT_CONSUMPTION_RATE.name(),
2095            )?;
2096        }
2097
2098        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
2099            Default::default();
2100
2101        let enable_worker_core_affinity =
2102            self.catalog().system_config().enable_worker_core_affinity();
2103        let enable_storage_introspection_logs = self
2104            .catalog()
2105            .system_config()
2106            .enable_storage_introspection_logs();
2107        for instance in self.catalog.clusters() {
2108            self.controller.create_cluster(
2109                instance.id,
2110                ClusterConfig {
2111                    arranged_logs: instance.log_indexes.clone(),
2112                    workload_class: instance.config.workload_class.clone(),
2113                },
2114            )?;
2115            for replica in instance.replicas() {
2116                let role = instance.role();
2117                self.controller.create_replica(
2118                    instance.id,
2119                    replica.replica_id,
2120                    instance.name.clone(),
2121                    replica.name.clone(),
2122                    role,
2123                    replica.config.clone(),
2124                    enable_worker_core_affinity,
2125                    enable_storage_introspection_logs,
2126                )?;
2127            }
2128        }
2129
2130        info!(
2131            "startup: coordinator init: bootstrap: preamble complete in {:?}",
2132            bootstrap_start.elapsed()
2133        );
2134
2135        let init_storage_collections_start = Instant::now();
2136        info!("startup: coordinator init: bootstrap: storage collections init beginning");
2137        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2138            .await;
2139        info!(
2140            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2141            init_storage_collections_start.elapsed()
2142        );
2143
2144        // The storage controller knows about the introspection collections now, so we can start
2145        // sinking introspection updates in the compute controller. It makes sense to do that as
2146        // soon as possible, to avoid updates piling up in the compute controller's internal
2147        // buffers.
2148        self.controller.start_compute_introspection_sink();
2149
2150        let sorting_start = Instant::now();
2151        info!("startup: coordinator init: bootstrap: sorting catalog entries");
2152        let entries = self.bootstrap_sort_catalog_entries();
2153        info!(
2154            "startup: coordinator init: bootstrap: sorting catalog entries complete in {:?}",
2155            sorting_start.elapsed()
2156        );
2157
2158        let optimize_dataflows_start = Instant::now();
2159        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2160        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2161        info!(
2162            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2163            optimize_dataflows_start.elapsed()
2164        );
2165
2166        // We don't need to wait for the cache to update.
2167        let _fut = self.catalog().update_expression_cache(
2168            uncached_local_exprs.into_iter().collect(),
2169            uncached_global_exps.into_iter().collect(),
2170            Default::default(),
2171        );
2172
2173        // Select dataflow as-ofs. This step relies on the storage collections created by
2174        // `bootstrap_storage_collections` and the dataflow plans created by
2175        // `bootstrap_dataflow_plans`.
2176        let bootstrap_as_ofs_start = Instant::now();
2177        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2178        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2179        info!(
2180            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2181            bootstrap_as_ofs_start.elapsed()
2182        );
2183
2184        let postamble_start = Instant::now();
2185        info!("startup: coordinator init: bootstrap: postamble beginning");
2186
2187        let logs: BTreeSet<_> = BUILTINS::logs()
2188            .map(|log| self.catalog().resolve_builtin_log(log))
2189            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2190            .collect();
2191
2192        let mut privatelink_connections = BTreeMap::new();
2193
2194        for entry in &entries {
2195            debug!(
2196                "coordinator init: installing {} {}",
2197                entry.item().typ(),
2198                entry.id()
2199            );
2200            let mut policy = entry.item().initial_logical_compaction_window();
2201            match entry.item() {
2202                // Currently catalog item rebuild assumes that sinks and
2203                // indexes are always built individually and does not store information
2204                // about how it was built. If we start building multiple sinks and/or indexes
2205                // using a single dataflow, we have to make sure the rebuild process re-runs
2206                // the same multiple-build dataflow.
2207                CatalogItem::Source(source) => {
2208                    // Propagate source compaction windows to subsources if needed.
2209                    if source.custom_logical_compaction_window.is_none() {
2210                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2211                            source.data_source
2212                        {
2213                            policy = Some(
2214                                self.catalog()
2215                                    .get_entry(&ingestion_id)
2216                                    .source()
2217                                    .expect("must be source")
2218                                    .custom_logical_compaction_window
2219                                    .unwrap_or_default(),
2220                            );
2221                        }
2222                    }
2223                    policies_to_set
2224                        .entry(policy.expect("sources have a compaction window"))
2225                        .or_insert_with(Default::default)
2226                        .storage_ids
2227                        .insert(source.global_id());
2228                }
2229                CatalogItem::Table(table) => {
2230                    policies_to_set
2231                        .entry(policy.expect("tables have a compaction window"))
2232                        .or_insert_with(Default::default)
2233                        .storage_ids
2234                        .extend(table.global_ids());
2235                }
2236                CatalogItem::Index(idx) => {
2237                    let policy_entry = policies_to_set
2238                        .entry(policy.expect("indexes have a compaction window"))
2239                        .or_insert_with(Default::default);
2240
2241                    if logs.contains(&idx.on) {
2242                        policy_entry
2243                            .compute_ids
2244                            .entry(idx.cluster_id)
2245                            .or_insert_with(BTreeSet::new)
2246                            .insert(idx.global_id());
2247                    } else {
2248                        let df_desc = self
2249                            .catalog()
2250                            .try_get_physical_plan(&idx.global_id())
2251                            .expect("added in `bootstrap_dataflow_plans`")
2252                            .clone();
2253
2254                        let df_meta = self
2255                            .catalog()
2256                            .try_get_dataflow_metainfo(&idx.global_id())
2257                            .expect("added in `bootstrap_dataflow_plans`");
2258
2259                        if self.catalog().state().system_config().enable_mz_notices() {
2260                            // Collect optimization hint updates.
2261                            self.catalog().state().pack_optimizer_notices(
2262                                &mut builtin_table_updates,
2263                                df_meta.optimizer_notices.iter(),
2264                                Diff::ONE,
2265                            );
2266                        }
2267
2268                        // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
2269                        // but we cannot call that as it will also downgrade the read hold on the index.
2270                        policy_entry
2271                            .compute_ids
2272                            .entry(idx.cluster_id)
2273                            .or_insert_with(Default::default)
2274                            .extend(df_desc.export_ids());
2275
2276                        self.controller
2277                            .compute
2278                            .create_dataflow(idx.cluster_id, df_desc, None)
2279                            .unwrap_or_terminate("cannot fail to create dataflows");
2280                    }
2281                }
2282                CatalogItem::View(_) => (),
2283                CatalogItem::MaterializedView(mview) => {
2284                    policies_to_set
2285                        .entry(policy.expect("materialized views have a compaction window"))
2286                        .or_insert_with(Default::default)
2287                        .storage_ids
2288                        .insert(mview.global_id_writes());
2289
2290                    let mut df_desc = self
2291                        .catalog()
2292                        .try_get_physical_plan(&mview.global_id_writes())
2293                        .expect("added in `bootstrap_dataflow_plans`")
2294                        .clone();
2295
2296                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
2297                        df_desc.set_initial_as_of(initial_as_of);
2298                    }
2299
2300                    // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
2301                    let until = mview
2302                        .refresh_schedule
2303                        .as_ref()
2304                        .and_then(|s| s.last_refresh())
2305                        .and_then(|r| r.try_step_forward());
2306                    if let Some(until) = until {
2307                        df_desc.until.meet_assign(&Antichain::from_elem(until));
2308                    }
2309
2310                    let df_meta = self
2311                        .catalog()
2312                        .try_get_dataflow_metainfo(&mview.global_id_writes())
2313                        .expect("added in `bootstrap_dataflow_plans`");
2314
2315                    if self.catalog().state().system_config().enable_mz_notices() {
2316                        // Collect optimization hint updates.
2317                        self.catalog().state().pack_optimizer_notices(
2318                            &mut builtin_table_updates,
2319                            df_meta.optimizer_notices.iter(),
2320                            Diff::ONE,
2321                        );
2322                    }
2323
2324                    self.ship_dataflow(df_desc, mview.cluster_id, mview.target_replica)
2325                        .await;
2326
2327                    // If this is a replacement MV, it must remain read-only until the replacement
2328                    // gets applied.
2329                    if mview.replacement_target.is_none() {
2330                        self.allow_writes(mview.cluster_id, mview.global_id_writes());
2331                    }
2332                }
2333                CatalogItem::Sink(sink) => {
2334                    policies_to_set
2335                        .entry(CompactionWindow::Default)
2336                        .or_insert_with(Default::default)
2337                        .storage_ids
2338                        .insert(sink.global_id());
2339                }
2340                CatalogItem::Connection(catalog_connection) => {
2341                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2342                        privatelink_connections.insert(
2343                            entry.id(),
2344                            VpcEndpointConfig {
2345                                aws_service_name: conn.service_name.clone(),
2346                                availability_zone_ids: conn.availability_zones.clone(),
2347                            },
2348                        );
2349                    }
2350                }
2351                // Nothing to do for these cases
2352                CatalogItem::Log(_)
2353                | CatalogItem::Type(_)
2354                | CatalogItem::Func(_)
2355                | CatalogItem::Secret(_) => {}
2356            }
2357        }
2358
2359        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2360            // Clean up any extraneous VpcEndpoints that shouldn't exist.
2361            let existing_vpc_endpoints = cloud_resource_controller
2362                .list_vpc_endpoints()
2363                .await
2364                .context("list vpc endpoints")?;
2365            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2366            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2367            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2368            for id in vpc_endpoints_to_remove {
2369                cloud_resource_controller
2370                    .delete_vpc_endpoint(*id)
2371                    .await
2372                    .context("deleting extraneous vpc endpoint")?;
2373            }
2374
2375            // Ensure desired VpcEndpoints are up to date.
2376            for (id, spec) in privatelink_connections {
2377                cloud_resource_controller
2378                    .ensure_vpc_endpoint(id, spec)
2379                    .await
2380                    .context("ensuring vpc endpoint")?;
2381            }
2382        }
2383
2384        // Having installed all entries, creating all constraints, we can now drop read holds and
2385        // relax read policies.
2386        drop(dataflow_read_holds);
2387        // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
2388        for (cw, policies) in policies_to_set {
2389            self.initialize_read_policies(&policies, cw).await;
2390        }
2391
2392        // Expose mapping from T-shirt sizes to actual sizes
2393        builtin_table_updates.extend(
2394            self.catalog().state().resolve_builtin_table_updates(
2395                self.catalog().state().pack_all_replica_size_updates(),
2396            ),
2397        );
2398
2399        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2400        // When 0dt is enabled, we create new shards for any migrated builtin storage collections.
2401        // In read-only mode, the migrated builtin tables (which are a subset of migrated builtin
2402        // storage collections) need to be back-filled so that any dependent dataflow can be
2403        // hydrated. Additionally, these shards are not registered with the txn-shard, and cannot
2404        // be registered while in read-only, so they are written to directly.
2405        let migrated_updates_fut = if self.controller.read_only() {
2406            let min_timestamp = Timestamp::minimum();
2407            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2408                .extract_if(.., |update| {
2409                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2410                    migrated_storage_collections_0dt.contains(&update.id)
2411                        && self
2412                            .controller
2413                            .storage_collections
2414                            .collection_frontiers(gid)
2415                            .expect("all tables are registered")
2416                            .write_frontier
2417                            .elements()
2418                            == &[min_timestamp]
2419                })
2420                .collect();
2421            if migrated_builtin_table_updates.is_empty() {
2422                futures::future::ready(()).boxed()
2423            } else {
2424                // Group all updates per-table.
2425                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2426                for update in migrated_builtin_table_updates {
2427                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2428                    grouped_appends.entry(gid).or_default().push(update.data);
2429                }
2430                info!(
2431                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2432                    grouped_appends.keys().collect::<Vec<_>>()
2433                );
2434
2435                // Consolidate Row data, staged batches must already be consolidated.
2436                let mut all_appends = Vec::with_capacity(grouped_appends.len());
2437                for (item_id, table_data) in grouped_appends.into_iter() {
2438                    let mut all_rows = Vec::new();
2439                    let mut all_data = Vec::new();
2440                    for data in table_data {
2441                        match data {
2442                            TableData::Rows(rows) => all_rows.extend(rows),
2443                            TableData::Batches(_) => all_data.push(data),
2444                        }
2445                    }
2446                    differential_dataflow::consolidation::consolidate(&mut all_rows);
2447                    all_data.push(TableData::Rows(all_rows));
2448
2449                    // TODO(parkmycar): Use SmallVec throughout.
2450                    all_appends.push((item_id, all_data));
2451                }
2452
2453                let fut = self
2454                    .controller
2455                    .storage
2456                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2457                    .expect("cannot fail to append");
2458                async {
2459                    fut.await
2460                        .expect("One-shot shouldn't be dropped during bootstrap")
2461                        .unwrap_or_terminate("cannot fail to append")
2462                }
2463                .boxed()
2464            }
2465        } else {
2466            futures::future::ready(()).boxed()
2467        };
2468
2469        info!(
2470            "startup: coordinator init: bootstrap: postamble complete in {:?}",
2471            postamble_start.elapsed()
2472        );
2473
2474        let builtin_update_start = Instant::now();
2475        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2476
2477        if self.controller.read_only() {
2478            info!(
2479                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2480            );
2481
2482            // TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
2483            let audit_join_start = Instant::now();
2484            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2485            let audit_log_updates: Vec<_> = audit_logs_iterator
2486                .map(|(audit_log, ts)| StateUpdate {
2487                    kind: StateUpdateKind::AuditLog(audit_log),
2488                    ts,
2489                    diff: StateDiff::Addition,
2490                })
2491                .collect();
2492            let audit_log_builtin_table_updates = self
2493                .catalog()
2494                .state()
2495                .generate_builtin_table_updates(audit_log_updates);
2496            builtin_table_updates.extend(audit_log_builtin_table_updates);
2497            info!(
2498                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2499                audit_join_start.elapsed()
2500            );
2501            self.buffered_builtin_table_updates
2502                .as_mut()
2503                .expect("in read-only mode")
2504                .append(&mut builtin_table_updates);
2505        } else {
2506            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2507                .await;
2508        };
2509        info!(
2510            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2511            builtin_update_start.elapsed()
2512        );
2513
2514        let cleanup_secrets_start = Instant::now();
2515        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2516        // Cleanup orphaned secrets. Errors during list() or delete() do not
2517        // need to prevent bootstrap from succeeding; we will retry next
2518        // startup.
2519        {
2520            // Destructure Self so we can selectively move fields into the async
2521            // task.
2522            let Self {
2523                secrets_controller,
2524                catalog,
2525                ..
2526            } = self;
2527
2528            let next_user_item_id = catalog.get_next_user_item_id().await?;
2529            let next_system_item_id = catalog.get_next_system_item_id().await?;
2530            let read_only = self.controller.read_only();
2531            // Fetch all IDs from the catalog to future-proof against other
2532            // things using secrets. Today, SECRET and CONNECTION objects use
2533            // secrets_controller.ensure, but more things could in the future
2534            // that would be easy to miss adding here.
2535            let catalog_ids: BTreeSet<CatalogItemId> =
2536                catalog.entries().map(|entry| entry.id()).collect();
2537            let secrets_controller = Arc::clone(secrets_controller);
2538
2539            spawn(|| "cleanup-orphaned-secrets", async move {
2540                if read_only {
2541                    info!(
2542                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2543                    );
2544                    return;
2545                }
2546                info!("coordinator init: cleaning up orphaned secrets");
2547
2548                match secrets_controller.list().await {
2549                    Ok(controller_secrets) => {
2550                        let controller_secrets: BTreeSet<CatalogItemId> =
2551                            controller_secrets.into_iter().collect();
2552                        let orphaned = controller_secrets.difference(&catalog_ids);
2553                        for id in orphaned {
2554                            let id_too_large = match id {
2555                                CatalogItemId::System(id) => *id >= next_system_item_id,
2556                                CatalogItemId::User(id) => *id >= next_user_item_id,
2557                                CatalogItemId::IntrospectionSourceIndex(_)
2558                                | CatalogItemId::Transient(_) => false,
2559                            };
2560                            if id_too_large {
2561                                info!(
2562                                    %next_user_item_id, %next_system_item_id,
2563                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2564                                );
2565                            } else {
2566                                info!("coordinator init: deleting orphaned secret {id}");
2567                                fail_point!("orphan_secrets");
2568                                if let Err(e) = secrets_controller.delete(*id).await {
2569                                    warn!(
2570                                        "Dropping orphaned secret has encountered an error: {}",
2571                                        e
2572                                    );
2573                                }
2574                            }
2575                        }
2576                    }
2577                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2578                }
2579            });
2580        }
2581        info!(
2582            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2583            cleanup_secrets_start.elapsed()
2584        );
2585
2586        // Run all of our final steps concurrently.
2587        let final_steps_start = Instant::now();
2588        info!(
2589            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2590        );
2591        migrated_updates_fut
2592            .instrument(info_span!("coord::bootstrap::final"))
2593            .await;
2594
2595        debug!(
2596            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2597        );
2598        // Announce the completion of initialization.
2599        self.controller.initialization_complete();
2600
2601        // Initialize unified introspection.
2602        self.bootstrap_introspection_subscribes().await;
2603
2604        info!(
2605            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2606            final_steps_start.elapsed()
2607        );
2608
2609        info!(
2610            "startup: coordinator init: bootstrap complete in {:?}",
2611            bootstrap_start.elapsed()
2612        );
2613        Ok(())
2614    }
2615
2616    /// Prepares tables for writing by resetting them to a known state and
2617    /// appending the given builtin table updates. The timestamp oracle
2618    /// will be advanced to the write timestamp of the append when this
2619    /// method returns.
2620    #[allow(clippy::async_yields_async)]
2621    #[instrument]
2622    async fn bootstrap_tables(
2623        &mut self,
2624        entries: &[CatalogEntry],
2625        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2626        audit_logs_iterator: AuditLogIterator,
2627    ) {
2628        /// Smaller helper struct of metadata for bootstrapping tables.
2629        struct TableMetadata<'a> {
2630            id: CatalogItemId,
2631            name: &'a QualifiedItemName,
2632            table: &'a Table,
2633        }
2634
2635        // Filter our entries down to just tables.
2636        let table_metas: Vec<_> = entries
2637            .into_iter()
2638            .filter_map(|entry| {
2639                entry.table().map(|table| TableMetadata {
2640                    id: entry.id(),
2641                    name: entry.name(),
2642                    table,
2643                })
2644            })
2645            .collect();
2646
2647        // Append empty batches to advance the timestamp of all tables.
2648        debug!("coordinator init: advancing all tables to current timestamp");
2649        let WriteTimestamp {
2650            timestamp: write_ts,
2651            advance_to,
2652        } = self.get_local_write_ts().await;
2653        let appends = table_metas
2654            .iter()
2655            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2656            .collect();
2657        // Append the tables in the background. We apply the write timestamp before getting a read
2658        // timestamp and reading a snapshot of each table, so the snapshots will block on their own
2659        // until the appends are complete.
2660        let table_fence_rx = self
2661            .controller
2662            .storage
2663            .append_table(write_ts.clone(), advance_to, appends)
2664            .expect("invalid updates");
2665
2666        self.apply_local_write(write_ts).await;
2667
2668        // Add builtin table updates the clear the contents of all system tables
2669        debug!("coordinator init: resetting system tables");
2670        let read_ts = self.get_local_read_ts().await;
2671
2672        // Filter out the 'mz_storage_usage_by_shard' table since we need to retain that info for
2673        // billing purposes.
2674        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2675            .catalog()
2676            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2677            .into();
2678        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2679            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2680                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2681        };
2682
2683        let mut retraction_tasks = Vec::new();
2684        let mut system_tables: Vec<_> = table_metas
2685            .iter()
2686            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2687            .collect();
2688
2689        // Special case audit events because it's append only.
2690        let (audit_events_idx, _) = system_tables
2691            .iter()
2692            .find_position(|table| {
2693                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2694            })
2695            .expect("mz_audit_events must exist");
2696        let audit_events = system_tables.remove(audit_events_idx);
2697        let audit_log_task = self.bootstrap_audit_log_table(
2698            audit_events.id,
2699            audit_events.name,
2700            audit_events.table,
2701            audit_logs_iterator,
2702            read_ts,
2703        );
2704
2705        for system_table in system_tables {
2706            let table_id = system_table.id;
2707            let full_name = self.catalog().resolve_full_name(system_table.name, None);
2708            debug!("coordinator init: resetting system table {full_name} ({table_id})");
2709
2710            // Fetch the current contents of the table for retraction.
2711            let snapshot_fut = self
2712                .controller
2713                .storage_collections
2714                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2715            let batch_fut = self
2716                .controller
2717                .storage_collections
2718                .create_update_builder(system_table.table.global_id_writes());
2719
2720            let task = spawn(|| format!("snapshot-{table_id}"), async move {
2721                // Create a TimestamplessUpdateBuilder.
2722                let mut batch = batch_fut
2723                    .await
2724                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2725                tracing::info!(?table_id, "starting snapshot");
2726                // Get a cursor which will emit a consolidated snapshot.
2727                let mut snapshot_cursor = snapshot_fut
2728                    .await
2729                    .unwrap_or_terminate("cannot fail to snapshot");
2730
2731                // Retract the current contents, spilling into our builder.
2732                while let Some(values) = snapshot_cursor.next().await {
2733                    for (key, _t, d) in values {
2734                        let d_invert = d.neg();
2735                        batch.add(&key, &(), &d_invert).await;
2736                    }
2737                }
2738                tracing::info!(?table_id, "finished snapshot");
2739
2740                let batch = batch.finish().await;
2741                BuiltinTableUpdate::batch(table_id, batch)
2742            });
2743            retraction_tasks.push(task);
2744        }
2745
2746        let retractions_res = futures::future::join_all(retraction_tasks).await;
2747        for retractions in retractions_res {
2748            builtin_table_updates.push(retractions);
2749        }
2750
2751        let audit_join_start = Instant::now();
2752        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2753        let audit_log_updates = audit_log_task.await;
2754        let audit_log_builtin_table_updates = self
2755            .catalog()
2756            .state()
2757            .generate_builtin_table_updates(audit_log_updates);
2758        builtin_table_updates.extend(audit_log_builtin_table_updates);
2759        info!(
2760            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2761            audit_join_start.elapsed()
2762        );
2763
2764        // Now that the snapshots are complete, the appends must also be complete.
2765        table_fence_rx
2766            .await
2767            .expect("One-shot shouldn't be dropped during bootstrap")
2768            .unwrap_or_terminate("cannot fail to append");
2769
2770        info!("coordinator init: sending builtin table updates");
2771        let (_builtin_updates_fut, write_ts) = self
2772            .builtin_table_update()
2773            .execute(builtin_table_updates)
2774            .await;
2775        info!(?write_ts, "our write ts");
2776        if let Some(write_ts) = write_ts {
2777            self.apply_local_write(write_ts).await;
2778        }
2779    }
2780
2781    /// Prepare updates to the audit log table. The audit log table append only and very large, so
2782    /// we only need to find the events present in `audit_logs_iterator` but not in the audit log
2783    /// table.
2784    #[instrument]
2785    fn bootstrap_audit_log_table<'a>(
2786        &self,
2787        table_id: CatalogItemId,
2788        name: &'a QualifiedItemName,
2789        table: &'a Table,
2790        audit_logs_iterator: AuditLogIterator,
2791        read_ts: Timestamp,
2792    ) -> JoinHandle<Vec<StateUpdate>> {
2793        let full_name = self.catalog().resolve_full_name(name, None);
2794        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2795        let current_contents_fut = self
2796            .controller
2797            .storage_collections
2798            .snapshot(table.global_id_writes(), read_ts);
2799        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2800            let current_contents = current_contents_fut
2801                .await
2802                .unwrap_or_terminate("cannot fail to fetch snapshot");
2803            let contents_len = current_contents.len();
2804            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2805
2806            // Fetch the largest audit log event ID that has been written to the table.
2807            let max_table_id = current_contents
2808                .into_iter()
2809                .filter(|(_, diff)| *diff == 1)
2810                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2811                .sorted()
2812                .rev()
2813                .next();
2814
2815            // Filter audit log catalog updates to those that are not present in the table.
2816            audit_logs_iterator
2817                .take_while(|(audit_log, _)| match max_table_id {
2818                    Some(id) => audit_log.event.sortable_id() > id,
2819                    None => true,
2820                })
2821                .map(|(audit_log, ts)| StateUpdate {
2822                    kind: StateUpdateKind::AuditLog(audit_log),
2823                    ts,
2824                    diff: StateDiff::Addition,
2825                })
2826                .collect::<Vec<_>>()
2827        })
2828    }
2829
2830    /// Initializes all storage collections required by catalog objects in the storage controller.
2831    ///
2832    /// This method takes care of collection creation, as well as migration of existing
2833    /// collections.
2834    ///
2835    /// Creating all storage collections in a single `create_collections` call, rather than on
2836    /// demand, is more efficient as it reduces the number of writes to durable storage. It also
2837    /// allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
2838    /// storage collections, without needing to worry about dependency order.
2839    ///
2840    /// `migrated_storage_collections` is a set of builtin storage collections that have been
2841    /// migrated and should be handled specially.
2842    #[instrument]
2843    async fn bootstrap_storage_collections(
2844        &mut self,
2845        migrated_storage_collections: &BTreeSet<CatalogItemId>,
2846    ) {
2847        let catalog = self.catalog();
2848
2849        let source_desc = |object_id: GlobalId,
2850                           data_source: &DataSourceDesc,
2851                           desc: &RelationDesc,
2852                           timeline: &Timeline| {
2853            let data_source = match data_source.clone() {
2854                // Re-announce the source description.
2855                DataSourceDesc::Ingestion { desc, cluster_id } => {
2856                    let desc = desc.into_inline_connection(catalog.state());
2857                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2858                    DataSource::Ingestion(ingestion)
2859                }
2860                DataSourceDesc::OldSyntaxIngestion {
2861                    desc,
2862                    progress_subsource,
2863                    data_config,
2864                    details,
2865                    cluster_id,
2866                } => {
2867                    let desc = desc.into_inline_connection(catalog.state());
2868                    let data_config = data_config.into_inline_connection(catalog.state());
2869                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2870                    // this will always be a Source or a Table.
2871                    let progress_subsource =
2872                        catalog.get_entry(&progress_subsource).latest_global_id();
2873                    let mut ingestion =
2874                        IngestionDescription::new(desc, cluster_id, progress_subsource);
2875                    let legacy_export = SourceExport {
2876                        storage_metadata: (),
2877                        data_config,
2878                        details,
2879                    };
2880                    ingestion.source_exports.insert(object_id, legacy_export);
2881
2882                    DataSource::Ingestion(ingestion)
2883                }
2884                DataSourceDesc::IngestionExport {
2885                    ingestion_id,
2886                    external_reference: _,
2887                    details,
2888                    data_config,
2889                } => {
2890                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2891                    // this will always be a Source or a Table.
2892                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2893
2894                    DataSource::IngestionExport {
2895                        ingestion_id,
2896                        details,
2897                        data_config: data_config.into_inline_connection(catalog.state()),
2898                    }
2899                }
2900                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
2901                DataSourceDesc::Progress => DataSource::Progress,
2902                DataSourceDesc::Introspection(introspection) => {
2903                    DataSource::Introspection(introspection)
2904                }
2905                DataSourceDesc::Catalog => DataSource::Other,
2906            };
2907            CollectionDescription {
2908                desc: desc.clone(),
2909                data_source,
2910                since: None,
2911                timeline: Some(timeline.clone()),
2912                primary: None,
2913            }
2914        };
2915
2916        let mut compute_collections = vec![];
2917        let mut collections = vec![];
2918        for entry in catalog.entries() {
2919            match entry.item() {
2920                CatalogItem::Source(source) => {
2921                    collections.push((
2922                        source.global_id(),
2923                        source_desc(
2924                            source.global_id(),
2925                            &source.data_source,
2926                            &source.desc,
2927                            &source.timeline,
2928                        ),
2929                    ));
2930                }
2931                CatalogItem::Table(table) => {
2932                    match &table.data_source {
2933                        TableDataSource::TableWrites { defaults: _ } => {
2934                            let versions: BTreeMap<_, _> = table
2935                                .collection_descs()
2936                                .map(|(gid, version, desc)| (version, (gid, desc)))
2937                                .collect();
2938                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2939                                let next_version = version.bump();
2940                                let primary_collection =
2941                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2942                                let mut collection_desc =
2943                                    CollectionDescription::for_table(desc.clone());
2944                                collection_desc.primary = primary_collection;
2945
2946                                (*gid, collection_desc)
2947                            });
2948                            collections.extend(collection_descs);
2949                        }
2950                        TableDataSource::DataSource {
2951                            desc: data_source_desc,
2952                            timeline,
2953                        } => {
2954                            // TODO(alter_table): Support versioning tables that read from sources.
2955                            soft_assert_eq_or_log!(table.collections.len(), 1);
2956                            let collection_descs =
2957                                table.collection_descs().map(|(gid, _version, desc)| {
2958                                    (
2959                                        gid,
2960                                        source_desc(
2961                                            entry.latest_global_id(),
2962                                            data_source_desc,
2963                                            &desc,
2964                                            timeline,
2965                                        ),
2966                                    )
2967                                });
2968                            collections.extend(collection_descs);
2969                        }
2970                    };
2971                }
2972                CatalogItem::MaterializedView(mv) => {
2973                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
2974                        let collection_desc =
2975                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
2976                        (gid, collection_desc)
2977                    });
2978
2979                    collections.extend(collection_descs);
2980                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
2981                }
2982                CatalogItem::Sink(sink) => {
2983                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2984                    let from_desc = storage_sink_from_entry
2985                        .relation_desc()
2986                        .expect("sinks can only be built on items with descs")
2987                        .into_owned();
2988                    let collection_desc = CollectionDescription {
2989                        // TODO(sinks): make generic once we have more than one sink type.
2990                        desc: KAFKA_PROGRESS_DESC.clone(),
2991                        data_source: DataSource::Sink {
2992                            desc: ExportDescription {
2993                                sink: StorageSinkDesc {
2994                                    from: sink.from,
2995                                    from_desc,
2996                                    connection: sink
2997                                        .connection
2998                                        .clone()
2999                                        .into_inline_connection(self.catalog().state()),
3000                                    envelope: sink.envelope,
3001                                    as_of: Antichain::from_elem(Timestamp::minimum()),
3002                                    with_snapshot: sink.with_snapshot,
3003                                    version: sink.version,
3004                                    from_storage_metadata: (),
3005                                    to_storage_metadata: (),
3006                                    commit_interval: sink.commit_interval,
3007                                },
3008                                instance_id: sink.cluster_id,
3009                            },
3010                        },
3011                        since: None,
3012                        timeline: None,
3013                        primary: None,
3014                    };
3015                    collections.push((sink.global_id, collection_desc));
3016                }
3017                CatalogItem::Log(_)
3018                | CatalogItem::View(_)
3019                | CatalogItem::Index(_)
3020                | CatalogItem::Type(_)
3021                | CatalogItem::Func(_)
3022                | CatalogItem::Secret(_)
3023                | CatalogItem::Connection(_) => (),
3024            }
3025        }
3026
3027        let register_ts = if self.controller.read_only() {
3028            self.get_local_read_ts().await
3029        } else {
3030            // Getting a write timestamp bumps the write timestamp in the
3031            // oracle, which we're not allowed in read-only mode.
3032            self.get_local_write_ts().await.timestamp
3033        };
3034
3035        let storage_metadata = self.catalog.state().storage_metadata();
3036        let migrated_storage_collections = migrated_storage_collections
3037            .into_iter()
3038            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
3039            .collect();
3040
3041        // Before possibly creating collections, make sure their schemas are correct.
3042        //
3043        // Across different versions of Materialize the nullability of columns can change based on
3044        // updates to our optimizer.
3045        self.controller
3046            .storage
3047            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
3048            .await
3049            .unwrap_or_terminate("cannot fail to evolve collections");
3050
3051        // New builtin storage collections are by default created with [0] since/upper frontiers.
3052        // For collections that have dependencies on other collections (MVs, CTs), this can violate
3053        // the frontier invariants assumed by as-of selection. For example, as-of selection expects
3054        // to be able to pick up computing a materialized view from its most recent upper, but if
3055        // that upper is [0] it's likely that the required times are not available anymore in the
3056        // MV inputs.
3057        //
3058        // To avoid violating frontier invariants, we need to bump their sinces to times greater
3059        // than all of their upstream storage inputs. To know the since of a storage input, it has
3060        // to be registered with the storage controller first. Thus we register collections in
3061        // layers: Each iteration registers the collections whose dependencies are all already
3062        // registered.
3063        let mut pending: BTreeMap<_, _> = collections.into_iter().collect();
3064
3065        // Precompute storage-collection dependencies for each collection.
3066        let transitive_dep_gids: BTreeMap<_, _> = pending
3067            .keys()
3068            .map(|gid| {
3069                let entry = self.catalog.get_entry_by_global_id(gid);
3070                let item_id = entry.id();
3071                let deps = self.catalog.state().transitive_uses(item_id);
3072                let dep_gids: BTreeSet<_> = deps
3073                    // Ignore self-dependencies. For example, `transitive_uses` includes the input ID,
3074                    // and CTs can depend on themselves.
3075                    .filter(|dep_id| *dep_id != item_id)
3076                    .map(|dep_id| self.catalog.get_entry(&dep_id).latest_global_id())
3077                    // Ignore dependencies on objects that are not storage collections.
3078                    .filter(|dep_gid| pending.contains_key(dep_gid))
3079                    .collect();
3080                (*gid, dep_gids)
3081            })
3082            .collect();
3083
3084        while !pending.is_empty() {
3085            // Drain collections whose dependencies have all been registered already
3086            // (i.e., are not in `pending`).
3087            let ready_gids: BTreeSet<_> = pending
3088                .keys()
3089                .filter(|gid| {
3090                    let mut deps = transitive_dep_gids[gid].iter();
3091                    !deps.any(|dep_gid| pending.contains_key(dep_gid))
3092                })
3093                .copied()
3094                .collect();
3095            let mut ready: Vec<_> = pending
3096                .extract_if(.., |gid, _| ready_gids.contains(gid))
3097                .collect();
3098
3099            // Bump sinces of builtin collections.
3100            for (gid, collection) in &mut ready {
3101                // Don't silently overwrite an explicitly specified `since`.
3102                if !gid.is_system() || collection.since.is_some() {
3103                    continue;
3104                }
3105
3106                let mut derived_since = Antichain::from_elem(Timestamp::MIN);
3107                for dep_gid in &transitive_dep_gids[gid] {
3108                    let (since, _) = self
3109                        .controller
3110                        .storage
3111                        .collection_frontiers(*dep_gid)
3112                        .expect("previously registered");
3113                    derived_since.join_assign(&since);
3114                }
3115                collection.since = Some(derived_since);
3116            }
3117
3118            if ready.is_empty() {
3119                soft_panic_or_log!(
3120                    "cycle in storage collections: {:?}",
3121                    pending.keys().collect::<Vec<_>>(),
3122                );
3123                // We get here only due to a bug. Rather than crash-looping, we try our best to
3124                // reach a sane state by attempting to register all the remaining collections at
3125                // once.
3126                ready = mem::take(&mut pending).into_iter().collect();
3127            }
3128
3129            self.controller
3130                .storage
3131                .create_collections_for_bootstrap(
3132                    storage_metadata,
3133                    Some(register_ts),
3134                    ready,
3135                    &migrated_storage_collections,
3136                )
3137                .await
3138                .unwrap_or_terminate("cannot fail to create collections");
3139        }
3140
3141        if !self.controller.read_only() {
3142            self.apply_local_write(register_ts).await;
3143        }
3144    }
3145
3146    /// Returns the current list of catalog entries, sorted into an appropriate order for
3147    /// bootstrapping.
3148    ///
3149    /// The returned entries are in dependency order. Indexes are sorted immediately after the
3150    /// objects they index, to ensure that all dependants of these indexed objects can make use of
3151    /// the respective indexes.
3152    fn bootstrap_sort_catalog_entries(&self) -> Vec<CatalogEntry> {
3153        let mut indexes_on = BTreeMap::<_, Vec<_>>::new();
3154        let mut non_indexes = Vec::new();
3155        for entry in self.catalog().entries().cloned() {
3156            if let Some(index) = entry.index() {
3157                let on = self.catalog().get_entry_by_global_id(&index.on);
3158                indexes_on.entry(on.id()).or_default().push(entry);
3159            } else {
3160                non_indexes.push(entry);
3161            }
3162        }
3163
3164        let key_fn = |entry: &CatalogEntry| entry.id;
3165        let dependencies_fn = |entry: &CatalogEntry| entry.uses();
3166        sort_topological(&mut non_indexes, key_fn, dependencies_fn);
3167
3168        let mut result = Vec::new();
3169        for entry in non_indexes {
3170            let id = entry.id();
3171            result.push(entry);
3172            if let Some(mut indexes) = indexes_on.remove(&id) {
3173                result.append(&mut indexes);
3174            }
3175        }
3176
3177        soft_assert_or_log!(
3178            indexes_on.is_empty(),
3179            "indexes with missing dependencies: {indexes_on:?}",
3180        );
3181
3182        result
3183    }
3184
3185    /// Invokes the optimizer on all indexes and materialized views in the catalog and inserts the
3186    /// resulting dataflow plans into the catalog state.
3187    ///
3188    /// `ordered_catalog_entries` must be sorted in dependency order, with dependencies ordered
3189    /// before their dependants.
3190    ///
3191    /// This method does not perform timestamp selection for the dataflows, nor does it create them
3192    /// in the compute controller. Both of these steps happen later during bootstrapping.
3193    ///
3194    /// Returns a map of expressions that were not cached.
3195    #[instrument]
3196    fn bootstrap_dataflow_plans(
3197        &mut self,
3198        ordered_catalog_entries: &[CatalogEntry],
3199        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
3200    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
3201        // The optimizer expects to be able to query its `ComputeInstanceSnapshot` for
3202        // collections the current dataflow can depend on. But since we don't yet install anything
3203        // on compute instances, the snapshot information is incomplete. We fix that by manually
3204        // updating `ComputeInstanceSnapshot` objects to ensure they contain collections previously
3205        // optimized.
3206        let mut instance_snapshots = BTreeMap::new();
3207        let mut uncached_expressions = BTreeMap::new();
3208
3209        let optimizer_config = |catalog: &Catalog, cluster_id| {
3210            let system_config = catalog.system_config();
3211            let overrides = catalog.get_cluster(cluster_id).config.features();
3212            OptimizerConfig::from(system_config).override_from(&overrides)
3213        };
3214
3215        for entry in ordered_catalog_entries {
3216            match entry.item() {
3217                CatalogItem::Index(idx) => {
3218                    // Collect optimizer parameters.
3219                    let compute_instance =
3220                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
3221                            self.instance_snapshot(idx.cluster_id)
3222                                .expect("compute instance exists")
3223                        });
3224                    let global_id = idx.global_id();
3225
3226                    // The index may already be installed on the compute instance. For example,
3227                    // this is the case for introspection indexes.
3228                    if compute_instance.contains_collection(&global_id) {
3229                        continue;
3230                    }
3231
3232                    let optimizer_config = optimizer_config(&self.catalog, idx.cluster_id);
3233
3234                    let (optimized_plan, physical_plan, metainfo) =
3235                        match cached_global_exprs.remove(&global_id) {
3236                            Some(global_expressions)
3237                                if global_expressions.optimizer_features
3238                                    == optimizer_config.features =>
3239                            {
3240                                debug!("global expression cache hit for {global_id:?}");
3241                                (
3242                                    global_expressions.global_mir,
3243                                    global_expressions.physical_plan,
3244                                    global_expressions.dataflow_metainfos,
3245                                )
3246                            }
3247                            Some(_) | None => {
3248                                let (optimized_plan, global_lir_plan) = {
3249                                    // Build an optimizer for this INDEX.
3250                                    let mut optimizer = optimize::index::Optimizer::new(
3251                                        self.owned_catalog(),
3252                                        compute_instance.clone(),
3253                                        global_id,
3254                                        optimizer_config.clone(),
3255                                        self.optimizer_metrics(),
3256                                    );
3257
3258                                    // MIR ⇒ MIR optimization (global)
3259                                    let index_plan = optimize::index::Index::new(
3260                                        entry.name().clone(),
3261                                        idx.on,
3262                                        idx.keys.to_vec(),
3263                                    );
3264                                    let global_mir_plan = optimizer.optimize(index_plan)?;
3265                                    let optimized_plan = global_mir_plan.df_desc().clone();
3266
3267                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3268                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3269
3270                                    (optimized_plan, global_lir_plan)
3271                                };
3272
3273                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3274                                let metainfo = {
3275                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3276                                    let notice_ids =
3277                                        std::iter::repeat_with(|| self.allocate_transient_id())
3278                                            .map(|(_item_id, gid)| gid)
3279                                            .take(metainfo.optimizer_notices.len())
3280                                            .collect::<Vec<_>>();
3281                                    // Return a metainfo with rendered notices.
3282                                    self.catalog().render_notices(
3283                                        metainfo,
3284                                        notice_ids,
3285                                        Some(idx.global_id()),
3286                                    )
3287                                };
3288                                uncached_expressions.insert(
3289                                    global_id,
3290                                    GlobalExpressions {
3291                                        global_mir: optimized_plan.clone(),
3292                                        physical_plan: physical_plan.clone(),
3293                                        dataflow_metainfos: metainfo.clone(),
3294                                        optimizer_features: optimizer_config.features.clone(),
3295                                    },
3296                                );
3297                                (optimized_plan, physical_plan, metainfo)
3298                            }
3299                        };
3300
3301                    let catalog = self.catalog_mut();
3302                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3303                    catalog.set_physical_plan(idx.global_id(), physical_plan);
3304                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3305
3306                    compute_instance.insert_collection(idx.global_id());
3307                }
3308                CatalogItem::MaterializedView(mv) => {
3309                    // Collect optimizer parameters.
3310                    let compute_instance =
3311                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3312                            self.instance_snapshot(mv.cluster_id)
3313                                .expect("compute instance exists")
3314                        });
3315                    let global_id = mv.global_id_writes();
3316
3317                    let optimizer_config = optimizer_config(&self.catalog, mv.cluster_id);
3318
3319                    let (optimized_plan, physical_plan, metainfo) = match cached_global_exprs
3320                        .remove(&global_id)
3321                    {
3322                        Some(global_expressions)
3323                            if global_expressions.optimizer_features
3324                                == optimizer_config.features =>
3325                        {
3326                            debug!("global expression cache hit for {global_id:?}");
3327                            (
3328                                global_expressions.global_mir,
3329                                global_expressions.physical_plan,
3330                                global_expressions.dataflow_metainfos,
3331                            )
3332                        }
3333                        Some(_) | None => {
3334                            let (_, internal_view_id) = self.allocate_transient_id();
3335                            let debug_name = self
3336                                .catalog()
3337                                .resolve_full_name(entry.name(), None)
3338                                .to_string();
3339
3340                            let (optimized_plan, global_lir_plan) = {
3341                                // Build an optimizer for this MATERIALIZED VIEW.
3342                                let mut optimizer = optimize::materialized_view::Optimizer::new(
3343                                    self.owned_catalog().as_optimizer_catalog(),
3344                                    compute_instance.clone(),
3345                                    global_id,
3346                                    internal_view_id,
3347                                    mv.desc.latest().iter_names().cloned().collect(),
3348                                    mv.non_null_assertions.clone(),
3349                                    mv.refresh_schedule.clone(),
3350                                    debug_name,
3351                                    optimizer_config.clone(),
3352                                    self.optimizer_metrics(),
3353                                );
3354
3355                                // MIR ⇒ MIR optimization (global)
3356                                // We make sure to use the HIR SQL type (since MIR SQL types may not be coherent).
3357                                let typ = infer_sql_type_for_catalog(
3358                                    &mv.raw_expr,
3359                                    &mv.locally_optimized_expr.as_ref().clone(),
3360                                );
3361                                let global_mir_plan = optimizer
3362                                    .optimize((mv.locally_optimized_expr.as_ref().clone(), typ))?;
3363                                let optimized_plan = global_mir_plan.df_desc().clone();
3364
3365                                // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3366                                let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3367
3368                                (optimized_plan, global_lir_plan)
3369                            };
3370
3371                            let (physical_plan, metainfo) = global_lir_plan.unapply();
3372                            let metainfo = {
3373                                // Pre-allocate a vector of transient GlobalIds for each notice.
3374                                let notice_ids =
3375                                    std::iter::repeat_with(|| self.allocate_transient_id())
3376                                        .map(|(_item_id, global_id)| global_id)
3377                                        .take(metainfo.optimizer_notices.len())
3378                                        .collect::<Vec<_>>();
3379                                // Return a metainfo with rendered notices.
3380                                self.catalog().render_notices(
3381                                    metainfo,
3382                                    notice_ids,
3383                                    Some(mv.global_id_writes()),
3384                                )
3385                            };
3386                            uncached_expressions.insert(
3387                                global_id,
3388                                GlobalExpressions {
3389                                    global_mir: optimized_plan.clone(),
3390                                    physical_plan: physical_plan.clone(),
3391                                    dataflow_metainfos: metainfo.clone(),
3392                                    optimizer_features: optimizer_config.features.clone(),
3393                                },
3394                            );
3395                            (optimized_plan, physical_plan, metainfo)
3396                        }
3397                    };
3398
3399                    let catalog = self.catalog_mut();
3400                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
3401                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
3402                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);
3403
3404                    compute_instance.insert_collection(mv.global_id_writes());
3405                }
3406                CatalogItem::Table(_)
3407                | CatalogItem::Source(_)
3408                | CatalogItem::Log(_)
3409                | CatalogItem::View(_)
3410                | CatalogItem::Sink(_)
3411                | CatalogItem::Type(_)
3412                | CatalogItem::Func(_)
3413                | CatalogItem::Secret(_)
3414                | CatalogItem::Connection(_) => (),
3415            }
3416        }
3417
3418        Ok(uncached_expressions)
3419    }
3420
3421    /// Selects for each compute dataflow an as-of suitable for bootstrapping it.
3422    ///
3423    /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
3424    /// in place and that must not be dropped before all compute dataflows have been created with
3425    /// the compute controller.
3426    ///
3427    /// This method expects all storage collections and dataflow plans to be available, so it must
3428    /// run after [`Coordinator::bootstrap_storage_collections`] and
3429    /// [`Coordinator::bootstrap_dataflow_plans`].
3430    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold> {
3431        let mut catalog_ids = Vec::new();
3432        let mut dataflows = Vec::new();
3433        let mut read_policies = BTreeMap::new();
3434        for entry in self.catalog.entries() {
3435            let gid = match entry.item() {
3436                CatalogItem::Index(idx) => idx.global_id(),
3437                CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3438                CatalogItem::Table(_)
3439                | CatalogItem::Source(_)
3440                | CatalogItem::Log(_)
3441                | CatalogItem::View(_)
3442                | CatalogItem::Sink(_)
3443                | CatalogItem::Type(_)
3444                | CatalogItem::Func(_)
3445                | CatalogItem::Secret(_)
3446                | CatalogItem::Connection(_) => continue,
3447            };
3448            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3449                catalog_ids.push(gid);
3450                dataflows.push(plan.clone());
3451
3452                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3453                    read_policies.insert(gid, compaction_window.into());
3454                }
3455            }
3456        }
3457
3458        let read_ts = self.get_local_read_ts().await;
3459        let read_holds = as_of_selection::run(
3460            &mut dataflows,
3461            &read_policies,
3462            &*self.controller.storage_collections,
3463            read_ts,
3464            self.controller.read_only(),
3465        );
3466
3467        let catalog = self.catalog_mut();
3468        for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3469            catalog.set_physical_plan(id, plan);
3470        }
3471
3472        read_holds
3473    }
3474
3475    /// Serves the coordinator, receiving commands from users over `cmd_rx`
3476    /// and feedback from dataflow workers over `feedback_rx`.
3477    ///
3478    /// You must call `bootstrap` before calling this method.
3479    ///
3480    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would
3481    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
3482    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
3483    fn serve(
3484        mut self,
3485        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3486        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3487        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3488        group_commit_rx: appends::GroupCommitWaiter,
3489    ) -> LocalBoxFuture<'static, ()> {
3490        async move {
3491            // Watcher that listens for and reports cluster service status changes.
3492            let mut cluster_events = self.controller.events_stream();
3493            let last_message = Arc::new(Mutex::new(LastMessage {
3494                kind: "none",
3495                stmt: None,
3496            }));
3497
3498            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3499            let idle_metric = self.metrics.queue_busy_seconds.clone();
3500            let last_message_watchdog = Arc::clone(&last_message);
3501
3502            spawn(|| "coord watchdog", async move {
3503                // Every 5 seconds, attempt to measure how long it takes for the
3504                // coord select loop to be empty, because this message is the last
3505                // processed. If it is idle, this will result in some microseconds
3506                // of measurement.
3507                let mut interval = tokio::time::interval(Duration::from_secs(5));
3508                // If we end up having to wait more than 5 seconds for the coord to respond, then the
3509                // behavior of Delay results in the interval "restarting" from whenever we yield
3510                // instead of trying to catch up.
3511                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3512
3513                // Track if we become stuck to de-dupe error reporting.
3514                let mut coord_stuck = false;
3515
3516                loop {
3517                    interval.tick().await;
3518
3519                    // Wait for space in the channel, if we timeout then the coordinator is stuck!
3520                    let duration = tokio::time::Duration::from_secs(30);
3521                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3522                    let Ok(maybe_permit) = timeout else {
3523                        // Only log if we're newly stuck, to prevent logging repeatedly.
3524                        if !coord_stuck {
3525                            let last_message = last_message_watchdog.lock().expect("poisoned");
3526                            tracing::warn!(
3527                                last_message_kind = %last_message.kind,
3528                                last_message_sql = %last_message.stmt_to_string(),
3529                                "coordinator stuck for {duration:?}",
3530                            );
3531                        }
3532                        coord_stuck = true;
3533
3534                        continue;
3535                    };
3536
3537                    // We got a permit, we're not stuck!
3538                    if coord_stuck {
3539                        tracing::info!("Coordinator became unstuck");
3540                    }
3541                    coord_stuck = false;
3542
3543                    // If we failed to acquire a permit it's because we're shutting down.
3544                    let Ok(permit) = maybe_permit else {
3545                        break;
3546                    };
3547
3548                    permit.send(idle_metric.start_timer());
3549                }
3550            });
3551
3552            self.schedule_storage_usage_collection().await;
3553            self.schedule_arrangement_sizes_collection().await;
3554            self.spawn_privatelink_vpc_endpoints_watch_task();
3555            self.spawn_statement_logging_task();
3556            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3557
3558            // Report if the handling of a single message takes longer than this threshold.
3559            let warn_threshold = self
3560                .catalog()
3561                .system_config()
3562                .coord_slow_message_warn_threshold();
3563
3564            // How many messages we'd like to batch up before processing them. Must be > 0.
3565            const MESSAGE_BATCH: usize = 64;
3566            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3567            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3568
3569            let message_batch = self.metrics.message_batch.clone();
3570
3571            loop {
3572                // Before adding a branch to this select loop, please ensure that the branch is
3573                // cancellation safe and add a comment explaining why. You can refer here for more
3574                // info: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
3575                select! {
3576                    // We prioritize internal commands over other commands. However, we work through
3577                    // batches of commands in some branches of this select, which means that even if
3578                    // a command generates internal commands, we will work through the current batch
3579                    // before receiving a new batch of commands.
3580                    biased;
3581
3582                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3583                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3584                    // Receive a batch of commands.
3585                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3586                    // `next()` on any stream is cancel-safe:
3587                    // https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
3588                    // Receive a single command.
3589                    Some(event) = cluster_events.next() => {
3590                        messages.push(Message::ClusterEvent(event))
3591                    },
3592                    // See [`mz_controller::Controller::Controller::ready`] for notes
3593                    // on why this is cancel-safe.
3594                    // Receive a single command.
3595                    () = self.controller.ready() => {
3596                        // NOTE: We don't get a `Readiness` back from `ready()`
3597                        // because the controller wants to keep it and it's not
3598                        // trivially `Clone` or `Copy`. Hence this accessor.
3599                        let controller = match self.controller.get_readiness() {
3600                            Readiness::Storage => ControllerReadiness::Storage,
3601                            Readiness::Compute => ControllerReadiness::Compute,
3602                            Readiness::Metrics(_) => ControllerReadiness::Metrics,
3603                            Readiness::Internal(_) => ControllerReadiness::Internal,
3604                            Readiness::NotReady => unreachable!("just signaled as ready"),
3605                        };
3606                        messages.push(Message::ControllerReady { controller });
3607                    }
3608                    // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
3609                    // Receive a single command.
3610                    permit = group_commit_rx.ready() => {
3611                        // If we happen to have batched exactly one user write, use
3612                        // that span so the `emit_trace_id_notice` hooks up.
3613                        // Otherwise, the best we can do is invent a new root span
3614                        // and make it follow from all the Spans in the pending
3615                        // writes.
3616                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3617                            PendingWriteTxn::User{span, ..} => Some(span),
3618                            PendingWriteTxn::System{..} => None,
3619                        });
3620                        let span = match user_write_spans.exactly_one() {
3621                            Ok(span) => span.clone(),
3622                            Err(user_write_spans) => {
3623                                let span = info_span!(parent: None, "group_commit_notify");
3624                                for s in user_write_spans {
3625                                    span.follows_from(s);
3626                                }
3627                                span
3628                            }
3629                        };
3630                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3631                    },
3632                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3633                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3634                    // Receive a batch of commands.
3635                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3636                        if count == 0 {
3637                            break;
3638                        } else {
3639                            messages.extend(cmd_messages.drain(..).map(
3640                                |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
3641                            ));
3642                        }
3643                    },
3644                    // `recv()` on `UnboundedReceiver` is cancellation safe:
3645                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
3646                    // Receive a single command.
3647                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3648                        let mut pending_read_txns = vec![pending_read_txn];
3649                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3650                            pending_read_txns.push(pending_read_txn);
3651                        }
3652                        for (conn_id, pending_read_txn) in pending_read_txns {
3653                            let prev = self
3654                                .pending_linearize_read_txns
3655                                .insert(conn_id, pending_read_txn);
3656                            soft_assert_or_log!(
3657                                prev.is_none(),
3658                                "connections can not have multiple concurrent reads, prev: {prev:?}"
3659                            )
3660                        }
3661                        messages.push(Message::LinearizeReads);
3662                    }
3663                    // `tick()` on `Interval` is cancel-safe:
3664                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3665                    // Receive a single command.
3666                    _ = self.advance_timelines_interval.tick() => {
3667                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
3668                        span.follows_from(Span::current());
3669
3670                        // Group commit sends an `AdvanceTimelines` message when
3671                        // done, which is what downgrades read holds. In
3672                        // read-only mode we send this message directly because
3673                        // we're not doing group commits.
3674                        if self.controller.read_only() {
3675                            messages.push(Message::AdvanceTimelines);
3676                        } else {
3677                            messages.push(Message::GroupCommitInitiate(span, None));
3678                        }
3679                    },
3680                    // `tick()` on `Interval` is cancel-safe:
3681                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3682                    // Receive a single command.
3683                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
3684                        messages.push(Message::CheckSchedulingPolicies);
3685                    },
3686
3687                    // `tick()` on `Interval` is cancel-safe:
3688                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3689                    // Receive a single command.
3690                    _ = self.caught_up_check_interval.tick() => {
3691                        // We do this directly on the main loop instead of
3692                        // firing off a message. We are still in read-only mode,
3693                        // so optimizing for latency, not blocking the main loop
3694                        // is not that important.
3695                        self.maybe_check_caught_up().await;
3696
3697                        continue;
3698                    },
3699
3700                    // Process the idle metric at the lowest priority to sample queue non-idle time.
3701                    // `recv()` on `Receiver` is cancellation safe:
3702                    // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety
3703                    // Receive a single command.
3704                    timer = idle_rx.recv() => {
3705                        timer.expect("does not drop").observe_duration();
3706                        self.metrics
3707                            .message_handling
3708                            .with_label_values(&["watchdog"])
3709                            .observe(0.0);
3710                        continue;
3711                    }
3712                };
3713
3714                // Observe the number of messages we're processing at once.
3715                message_batch.observe(f64::cast_lossy(messages.len()));
3716
3717                for msg in messages.drain(..) {
3718                    // All message processing functions trace. Start a parent span
3719                    // for them to make it easy to find slow messages.
3720                    let msg_kind = msg.kind();
3721                    let span = span!(
3722                        target: "mz_adapter::coord::handle_message_loop",
3723                        Level::INFO,
3724                        "coord::handle_message",
3725                        kind = msg_kind
3726                    );
3727                    let otel_context = span.context().span().span_context().clone();
3728
3729                    // Record the last kind of message in case we get stuck. For
3730                    // execute commands, we additionally stash the user's SQL,
3731                    // statement, so we can log it in case we get stuck.
3732                    *last_message.lock().expect("poisoned") = LastMessage {
3733                        kind: msg_kind,
3734                        stmt: match &msg {
3735                            Message::Command(
3736                                _,
3737                                Command::Execute {
3738                                    portal_name,
3739                                    session,
3740                                    ..
3741                                },
3742                            ) => session
3743                                .get_portal_unverified(portal_name)
3744                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3745                            _ => None,
3746                        },
3747                    };
3748
3749                    let start = Instant::now();
3750                    self.handle_message(msg).instrument(span).await;
3751                    let duration = start.elapsed();
3752
3753                    self.metrics
3754                        .message_handling
3755                        .with_label_values(&[msg_kind])
3756                        .observe(duration.as_secs_f64());
3757
3758                    // If something is _really_ slow, print a trace id for debugging, if OTEL is enabled.
3759                    if duration > warn_threshold {
3760                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3761                        tracing::error!(
3762                            ?msg_kind,
3763                            ?trace_id,
3764                            ?duration,
3765                            "very slow coordinator message"
3766                        );
3767                    }
3768                }
3769            }
3770            // Try and cleanup as a best effort. There may be some async tasks out there holding a
3771            // reference that prevents us from cleaning up.
3772            if let Some(catalog) = Arc::into_inner(self.catalog) {
3773                catalog.expire().await;
3774            }
3775        }
3776        .boxed_local()
3777    }
3778
3779    /// Obtain a read-only Catalog reference.
3780    fn catalog(&self) -> &Catalog {
3781        &self.catalog
3782    }
3783
3784    /// Obtain a read-only Catalog snapshot, suitable for giving out to
3785    /// non-Coordinator thread tasks.
3786    fn owned_catalog(&self) -> Arc<Catalog> {
3787        Arc::clone(&self.catalog)
3788    }
3789
3790    /// Obtain a handle to the optimizer metrics, suitable for giving
3791    /// out to non-Coordinator thread tasks.
3792    fn optimizer_metrics(&self) -> OptimizerMetrics {
3793        self.optimizer_metrics.clone()
3794    }
3795
3796    /// Obtain a writeable Catalog reference.
3797    fn catalog_mut(&mut self) -> &mut Catalog {
3798        // make_mut will cause any other Arc references (from owned_catalog) to
3799        // continue to be valid by cloning the catalog, putting it in a new Arc,
3800        // which lives at self._catalog. If there are no other Arc references,
3801        // then no clone is made, and it returns a reference to the existing
3802        // object. This makes this method and owned_catalog both very cheap: at
3803        // most one clone per catalog mutation, but only if there's a read-only
3804        // reference to it.
3805        Arc::make_mut(&mut self.catalog)
3806    }
3807
3808    /// Refills the user ID pool by allocating IDs from the catalog.
3809    ///
3810    /// Requests `max(min_count, batch_size)` IDs so the pool is never
3811    /// under-filled relative to the configured batch size.
3812    async fn refill_user_id_pool(&mut self, min_count: u64) -> Result<(), AdapterError> {
3813        let batch_size = USER_ID_POOL_BATCH_SIZE.get(self.catalog().system_config().dyncfgs());
3814        let to_allocate = min_count.max(u64::from(batch_size));
3815        let id_ts = self.get_catalog_write_ts().await;
3816        let ids = self.catalog().allocate_user_ids(to_allocate, id_ts).await?;
3817        if let (Some((first_id, _)), Some((last_id, _))) = (ids.first(), ids.last()) {
3818            let start = match first_id {
3819                CatalogItemId::User(id) => *id,
3820                other => {
3821                    return Err(AdapterError::Internal(format!(
3822                        "expected User CatalogItemId, got {other:?}"
3823                    )));
3824                }
3825            };
3826            let end = match last_id {
3827                CatalogItemId::User(id) => *id + 1, // exclusive upper bound
3828                other => {
3829                    return Err(AdapterError::Internal(format!(
3830                        "expected User CatalogItemId, got {other:?}"
3831                    )));
3832                }
3833            };
3834            self.user_id_pool.refill(start, end);
3835        } else {
3836            return Err(AdapterError::Internal(
3837                "catalog returned no user IDs".into(),
3838            ));
3839        }
3840        Ok(())
3841    }
3842
3843    /// Allocates a single user ID, refilling the pool from the catalog if needed.
3844    async fn allocate_user_id(&mut self) -> Result<(CatalogItemId, GlobalId), AdapterError> {
3845        if let Some(id) = self.user_id_pool.allocate() {
3846            return Ok((CatalogItemId::User(id), GlobalId::User(id)));
3847        }
3848        self.refill_user_id_pool(1).await?;
3849        let id = self.user_id_pool.allocate().expect("ID pool just refilled");
3850        Ok((CatalogItemId::User(id), GlobalId::User(id)))
3851    }
3852
3853    /// Allocates `count` user IDs, refilling the pool from the catalog if needed.
3854    async fn allocate_user_ids(
3855        &mut self,
3856        count: u64,
3857    ) -> Result<Vec<(CatalogItemId, GlobalId)>, AdapterError> {
3858        if self.user_id_pool.remaining() < count {
3859            self.refill_user_id_pool(count).await?;
3860        }
3861        let raw_ids = self
3862            .user_id_pool
3863            .allocate_many(count)
3864            .expect("pool has enough IDs after refill");
3865        Ok(raw_ids
3866            .into_iter()
3867            .map(|id| (CatalogItemId::User(id), GlobalId::User(id)))
3868            .collect())
3869    }
3870
3871    /// Obtain a reference to the coordinator's connection context.
3872    fn connection_context(&self) -> &ConnectionContext {
3873        self.controller.connection_context()
3874    }
3875
3876    /// Obtain a reference to the coordinator's secret reader, in an `Arc`.
3877    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3878        &self.connection_context().secrets_reader
3879    }
3880
3881    /// Publishes a notice message to all sessions.
3882    ///
3883    /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
3884    /// so we keep it around.
3885    #[allow(dead_code)]
3886    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3887        for meta in self.active_conns.values() {
3888            let _ = meta.notice_tx.send(notice.clone());
3889        }
3890    }
3891
3892    /// Returns a closure that will publish a notice to all sessions that were active at the time
3893    /// this method was called.
3894    pub(crate) fn broadcast_notice_tx(
3895        &self,
3896    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3897        let senders: Vec<_> = self
3898            .active_conns
3899            .values()
3900            .map(|meta| meta.notice_tx.clone())
3901            .collect();
3902        Box::new(move |notice| {
3903            for tx in senders {
3904                let _ = tx.send(notice.clone());
3905            }
3906        })
3907    }
3908
3909    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3910        &self.active_conns
3911    }
3912
3913    #[instrument(level = "debug")]
3914    pub(crate) fn retire_execution(
3915        &mut self,
3916        reason: StatementEndedExecutionReason,
3917        ctx_extra: ExecuteContextExtra,
3918    ) {
3919        if let Some(uuid) = ctx_extra.retire() {
3920            self.end_statement_execution(uuid, reason);
3921        }
3922    }
3923
3924    /// Creates a new dataflow builder from the catalog and indexes in `self`.
3925    #[instrument(level = "debug")]
3926    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3927        let compute = self
3928            .instance_snapshot(instance)
3929            .expect("compute instance does not exist");
3930        DataflowBuilder::new(self.catalog().state(), compute)
3931    }
3932
3933    /// Return a reference-less snapshot to the indicated compute instance.
3934    pub fn instance_snapshot(
3935        &self,
3936        id: ComputeInstanceId,
3937    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3938        ComputeInstanceSnapshot::new(&self.controller, id)
3939    }
3940
3941    /// Call into the compute controller to install a finalized dataflow, and
3942    /// initialize the read policies for its exported readable objects.
3943    ///
3944    /// # Panics
3945    ///
3946    /// Panics if dataflow creation fails.
3947    pub(crate) async fn ship_dataflow(
3948        &mut self,
3949        dataflow: DataflowDescription<Plan>,
3950        instance: ComputeInstanceId,
3951        target_replica: Option<ReplicaId>,
3952    ) {
3953        self.try_ship_dataflow(dataflow, instance, target_replica)
3954            .await
3955            .unwrap_or_terminate("dataflow creation cannot fail");
3956    }
3957
3958    /// Call into the compute controller to install a finalized dataflow, and
3959    /// initialize the read policies for its exported readable objects.
3960    pub(crate) async fn try_ship_dataflow(
3961        &mut self,
3962        dataflow: DataflowDescription<Plan>,
3963        instance: ComputeInstanceId,
3964        target_replica: Option<ReplicaId>,
3965    ) -> Result<(), DataflowCreationError> {
3966        // We must only install read policies for indexes, not for sinks.
3967        // Sinks are write-only compute collections that don't have read policies.
3968        let export_ids = dataflow.exported_index_ids().collect();
3969
3970        self.controller
3971            .compute
3972            .create_dataflow(instance, dataflow, target_replica)?;
3973
3974        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3975            .await;
3976
3977        Ok(())
3978    }
3979
3980    /// Call into the compute controller to allow writes to the specified IDs
3981    /// from the specified instance. Calling this function multiple times and
3982    /// calling it on a read-only instance has no effect.
3983    pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
3984        self.controller
3985            .compute
3986            .allow_writes(instance, id)
3987            .unwrap_or_terminate("allow_writes cannot fail");
3988    }
3989
3990    /// Like `ship_dataflow`, but also await on builtin table updates.
3991    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3992        &mut self,
3993        dataflow: DataflowDescription<Plan>,
3994        instance: ComputeInstanceId,
3995        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3996        target_replica: Option<ReplicaId>,
3997    ) {
3998        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3999            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, target_replica);
4000            let ((), ()) =
4001                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
4002        } else {
4003            self.ship_dataflow(dataflow, instance, target_replica).await;
4004        }
4005    }
4006
4007    /// Install a _watch set_ in the controller that is automatically associated with the given
4008    /// connection id. The watchset will be automatically cleared if the connection terminates
4009    /// before the watchset completes.
4010    pub fn install_compute_watch_set(
4011        &mut self,
4012        conn_id: ConnectionId,
4013        objects: BTreeSet<GlobalId>,
4014        t: Timestamp,
4015        state: WatchSetResponse,
4016    ) -> Result<(), CollectionLookupError> {
4017        let ws_id = self.controller.install_compute_watch_set(objects, t)?;
4018        self.connection_watch_sets
4019            .entry(conn_id.clone())
4020            .or_default()
4021            .insert(ws_id);
4022        self.installed_watch_sets.insert(ws_id, (conn_id, state));
4023        Ok(())
4024    }
4025
4026    /// Install a _watch set_ in the controller that is automatically associated with the given
4027    /// connection id. The watchset will be automatically cleared if the connection terminates
4028    /// before the watchset completes.
4029    pub fn install_storage_watch_set(
4030        &mut self,
4031        conn_id: ConnectionId,
4032        objects: BTreeSet<GlobalId>,
4033        t: Timestamp,
4034        state: WatchSetResponse,
4035    ) -> Result<(), CollectionMissing> {
4036        let ws_id = self.controller.install_storage_watch_set(objects, t)?;
4037        self.connection_watch_sets
4038            .entry(conn_id.clone())
4039            .or_default()
4040            .insert(ws_id);
4041        self.installed_watch_sets.insert(ws_id, (conn_id, state));
4042        Ok(())
4043    }
4044
4045    /// Cancels pending watchsets associated with the provided connection id.
4046    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
4047        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
4048            for ws_id in ws_ids {
4049                self.installed_watch_sets.remove(&ws_id);
4050            }
4051        }
4052    }
4053
4054    /// Returns the state of the [`Coordinator`] formatted as JSON.
4055    ///
4056    /// The returned value is not guaranteed to be stable and may change at any point in time.
4057    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
4058        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
4059        // returned object as a tradeoff between usability and stability. `serde_json` will fail
4060        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
4061        // prevents a future unrelated change from silently breaking this method.
4062
4063        let global_timelines: BTreeMap<_, _> = self
4064            .global_timelines
4065            .iter()
4066            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
4067            .collect();
4068        let active_conns: BTreeMap<_, _> = self
4069            .active_conns
4070            .iter()
4071            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
4072            .collect();
4073        let txn_read_holds: BTreeMap<_, _> = self
4074            .txn_read_holds
4075            .iter()
4076            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
4077            .collect();
4078        let pending_peeks: BTreeMap<_, _> = self
4079            .pending_peeks
4080            .iter()
4081            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
4082            .collect();
4083        let client_pending_peeks: BTreeMap<_, _> = self
4084            .client_pending_peeks
4085            .iter()
4086            .map(|(id, peek)| {
4087                let peek: BTreeMap<_, _> = peek
4088                    .iter()
4089                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
4090                    .collect();
4091                (id.to_string(), peek)
4092            })
4093            .collect();
4094        let pending_linearize_read_txns: BTreeMap<_, _> = self
4095            .pending_linearize_read_txns
4096            .iter()
4097            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
4098            .collect();
4099
4100        Ok(serde_json::json!({
4101            "global_timelines": global_timelines,
4102            "active_conns": active_conns,
4103            "txn_read_holds": txn_read_holds,
4104            "pending_peeks": pending_peeks,
4105            "client_pending_peeks": client_pending_peeks,
4106            "pending_linearize_read_txns": pending_linearize_read_txns,
4107            "controller": self.controller.dump().await?,
4108        }))
4109    }
4110
4111    /// Prune all storage usage events from the [`MZ_STORAGE_USAGE_BY_SHARD`] table that are older
4112    /// than `retention_period`.
4113    ///
4114    /// This method will read the entire contents of [`MZ_STORAGE_USAGE_BY_SHARD`] into memory
4115    /// which can be expensive.
4116    ///
4117    /// DO NOT call this method outside of startup. The safety of reading at the current oracle read
4118    /// timestamp and then writing at whatever the current write timestamp is (instead of
4119    /// `read_ts + 1`) relies on the fact that there are no outstanding writes during startup.
4120    ///
4121    /// Group commit, which this method uses to write the retractions, has builtin fencing, and we
4122    /// never commit retractions to [`MZ_STORAGE_USAGE_BY_SHARD`] outside of this method, which is
4123    /// only called once during startup. So we don't have to worry about double/invalid retractions.
4124    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
4125        let item_id = self
4126            .catalog()
4127            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
4128        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
4129        let read_ts = self.get_local_read_ts().await;
4130        let current_contents_fut = self
4131            .controller
4132            .storage_collections
4133            .snapshot(global_id, read_ts);
4134        let internal_cmd_tx = self.internal_cmd_tx.clone();
4135        spawn(|| "storage_usage_prune", async move {
4136            let mut current_contents = current_contents_fut
4137                .await
4138                .unwrap_or_terminate("cannot fail to fetch snapshot");
4139            differential_dataflow::consolidation::consolidate(&mut current_contents);
4140
4141            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4142            let mut expired = Vec::new();
4143            for (row, diff) in current_contents {
4144                assert_eq!(
4145                    diff, 1,
4146                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4147                );
4148                // This logic relies on the definition of `mz_storage_usage_by_shard` not changing.
4149                let collection_timestamp = row
4150                    .unpack()
4151                    .get(3)
4152                    .expect("definition of mz_storage_by_shard changed")
4153                    .unwrap_timestamptz();
4154                let collection_timestamp = collection_timestamp.timestamp_millis();
4155                let collection_timestamp: u128 = collection_timestamp
4156                    .try_into()
4157                    .expect("all collections happen after Jan 1 1970");
4158                if collection_timestamp < cutoff_ts {
4159                    debug!("pruning storage event {row:?}");
4160                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
4161                    expired.push(builtin_update);
4162                }
4163            }
4164
4165            // main thread has shut down.
4166            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
4167        });
4168    }
4169
4170    /// Retracts `mz_object_arrangement_size_history` rows older than the
4171    /// `arrangement_size_history_retention_period` dyncfg.
4172    ///
4173    /// Must only run at startup: it reads at the oracle read timestamp and
4174    /// writes retractions at the current write timestamp, which is only safe
4175    /// when no other writes are in flight. See [the equivalent storage-usage
4176    /// pruner](Self::prune_storage_usage_events_on_startup) for the same
4177    /// reasoning.
4178    async fn prune_arrangement_sizes_history_on_startup(&self) {
4179        // The catalog server is not writable in read-only mode.
4180        if self.controller.read_only() {
4181            return;
4182        }
4183
4184        let retention_period = mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_HISTORY_RETENTION_PERIOD
4185            .get(self.catalog().system_config().dyncfgs());
4186        let item_id = self
4187            .catalog()
4188            .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);
4189        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
4190        let read_ts = self.get_local_read_ts().await;
4191        let current_contents_fut = self
4192            .controller
4193            .storage_collections
4194            .snapshot(global_id, read_ts);
4195        let internal_cmd_tx = self.internal_cmd_tx.clone();
4196        spawn(|| "arrangement_sizes_history_prune", async move {
4197            let mut current_contents = current_contents_fut
4198                .await
4199                .unwrap_or_terminate("cannot fail to fetch snapshot");
4200            differential_dataflow::consolidation::consolidate(&mut current_contents);
4201
4202            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4203            let expired =
4204                arrangement_sizes_expired_retractions(current_contents, cutoff_ts, item_id);
4205
4206            // TODO(arrangement-sizes): when the writeable-catalog-server
4207            // plumbing in https://github.com/MaterializeInc/materialize/pull/35436
4208            // lands, retract directly on `mz_catalog_server`.
4209            let _ = internal_cmd_tx.send(Message::ArrangementSizesPrune(expired));
4210        });
4211    }
4212
4213    fn current_credit_consumption_rate(&self) -> Numeric {
4214        self.catalog()
4215            .user_cluster_replicas()
4216            .filter_map(|replica| match &replica.config.location {
4217                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
4218                ReplicaLocation::Unmanaged(_) => None,
4219            })
4220            .map(|size| {
4221                self.catalog()
4222                    .cluster_replica_sizes()
4223                    .0
4224                    .get(size)
4225                    .expect("location size is validated against the cluster replica sizes")
4226                    .credits_per_hour
4227            })
4228            .sum()
4229    }
4230}
4231
4232/// Returns retraction updates for rows in a consolidated
4233/// `mz_object_arrangement_size_history` snapshot whose `collection_timestamp`
4234/// (column 3) is strictly before `cutoff_ts`.
4235///
4236/// Panics if any input row has `diff != 1`: the caller must consolidate first,
4237/// and a consolidated history table should never contain retractions because
4238/// the only source of retractions is this function itself.
4239fn arrangement_sizes_expired_retractions(
4240    rows: impl IntoIterator<Item = (mz_repr::Row, i64)>,
4241    cutoff_ts: u128,
4242    item_id: CatalogItemId,
4243) -> Vec<BuiltinTableUpdate> {
4244    let mut expired = Vec::new();
4245    for (row, diff) in rows {
4246        assert_eq!(
4247            diff, 1,
4248            "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4249        );
4250        let collection_timestamp = row
4251            .unpack()
4252            .get(3)
4253            .expect("definition of mz_object_arrangement_size_history changed")
4254            .unwrap_timestamptz()
4255            .timestamp_millis();
4256        let collection_timestamp: u128 = collection_timestamp
4257            .try_into()
4258            .expect("all collections happen after Jan 1 1970");
4259        if collection_timestamp < cutoff_ts {
4260            expired.push(BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE));
4261        }
4262    }
4263    expired
4264}
4265
#[cfg(test)]
impl Coordinator {
    /// Compile-time guard: fails to build if `ship_dataflow` ever stops
    /// resolving to `()`.
    ///
    /// `ship_dataflow` must not return a `Result`, because it is called after
    /// `catalog_transact`, after which no errors are allowed. This guard
    /// exists to prevent us from incorrectly teaching it how to return errors
    /// (which has happened twice and is the motivation for this test).
    #[allow(dead_code)]
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        // Any compute instance ID will do to satisfy the call below; this only
        // works because this function will never run.
        let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");

        // The unit pattern only type-checks while the awaited output is `()`.
        let () = self.ship_dataflow(dataflow, compute_instance, None).await;
    }
}
4282
4283/// Contains information about the last message the [`Coordinator`] processed.
4284struct LastMessage {
4285    kind: &'static str,
4286    stmt: Option<Arc<Statement<Raw>>>,
4287}
4288
4289impl LastMessage {
4290    /// Returns a redacted version of the statement that is safe for logs.
4291    fn stmt_to_string(&self) -> Cow<'static, str> {
4292        self.stmt
4293            .as_ref()
4294            .map(|stmt| stmt.to_ast_string_redacted().into())
4295            .unwrap_or(Cow::Borrowed("<none>"))
4296    }
4297}
4298
4299impl fmt::Debug for LastMessage {
4300    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4301        f.debug_struct("LastMessage")
4302            .field("kind", &self.kind)
4303            .field("stmt", &self.stmt_to_string())
4304            .finish()
4305    }
4306}
4307
4308impl Drop for LastMessage {
4309    fn drop(&mut self) {
4310        // Only print the last message if we're currently panicking, otherwise we'd spam our logs.
4311        if std::thread::panicking() {
4312            // If we're panicking theres no guarantee `tracing` still works, so print to stderr.
4313            eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4314        }
4315    }
4316}
4317
4318/// Serves the coordinator based on the provided configuration.
4319///
4320/// For a high-level description of the coordinator, see the [crate
4321/// documentation](crate).
4322///
4323/// Returns a handle to the coordinator and a client to communicate with the
4324/// coordinator.
4325///
4326/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 42KB. This would
4327/// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
4328/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
4329pub fn serve(
4330    Config {
4331        controller_config,
4332        controller_envd_epoch,
4333        mut storage,
4334        audit_logs_iterator,
4335        timestamp_oracle_url,
4336        unsafe_mode,
4337        all_features,
4338        build_info,
4339        environment_id,
4340        metrics_registry,
4341        now,
4342        secrets_controller,
4343        cloud_resource_controller,
4344        cluster_replica_sizes,
4345        builtin_system_cluster_config,
4346        builtin_catalog_server_cluster_config,
4347        builtin_probe_cluster_config,
4348        builtin_support_cluster_config,
4349        builtin_analytics_cluster_config,
4350        system_parameter_defaults,
4351        availability_zones,
4352        storage_usage_client,
4353        storage_usage_collection_interval,
4354        storage_usage_retention_period,
4355        segment_client,
4356        egress_addresses,
4357        aws_account_id,
4358        aws_privatelink_availability_zones,
4359        connection_context,
4360        connection_limit_callback,
4361        remote_system_parameters,
4362        webhook_concurrency_limit,
4363        http_host_name,
4364        tracing_handle,
4365        read_only_controllers,
4366        caught_up_trigger: clusters_caught_up_trigger,
4367        helm_chart_version,
4368        license_key,
4369        external_login_password_mz_system,
4370        force_builtin_schema_migration,
4371    }: Config,
4372) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4373    async move {
4374        let coord_start = Instant::now();
4375        info!("startup: coordinator init: beginning");
4376        info!("startup: coordinator init: preamble beginning");
4377
4378        // Initializing the builtins can be an expensive process and consume a lot of memory. We
4379        // forcibly initialize it early while the stack is relatively empty to avoid stack
4380        // overflows later.
4381        let _builtins = LazyLock::force(&BUILTINS_STATIC);
4382
4383        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4384        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4385        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4386            mpsc::unbounded_channel();
4387
4388        // Validate and process availability zones.
4389        if !availability_zones.iter().all_unique() {
4390            coord_bail!("availability zones must be unique");
4391        }
4392
4393        let aws_principal_context = match (
4394            aws_account_id,
4395            connection_context.aws_external_id_prefix.clone(),
4396        ) {
4397            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4398                aws_account_id,
4399                aws_external_id_prefix,
4400            }),
4401            _ => None,
4402        };
4403
4404        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4405            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4406
4407        info!(
4408            "startup: coordinator init: preamble complete in {:?}",
4409            coord_start.elapsed()
4410        );
4411        let oracle_init_start = Instant::now();
4412        info!("startup: coordinator init: timestamp oracle init beginning");
4413
4414        let timestamp_oracle_config = timestamp_oracle_url
4415            .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
4416            .transpose()?;
4417        let mut initial_timestamps =
4418            get_initial_oracle_timestamps(&timestamp_oracle_config).await?;
4419
4420        // Insert an entry for the `EpochMilliseconds` timeline if one doesn't exist,
4421        // which will ensure that the timeline is initialized since it's required
4422        // by the system.
4423        initial_timestamps
4424            .entry(Timeline::EpochMilliseconds)
4425            .or_insert_with(mz_repr::Timestamp::minimum);
4426        let mut timestamp_oracles = BTreeMap::new();
4427        for (timeline, initial_timestamp) in initial_timestamps {
4428            Coordinator::ensure_timeline_state_with_initial_time(
4429                &timeline,
4430                initial_timestamp,
4431                now.clone(),
4432                timestamp_oracle_config.clone(),
4433                &mut timestamp_oracles,
4434                read_only_controllers,
4435            )
4436            .await;
4437        }
4438
4439        // Opening the durable catalog uses one or more timestamps without communicating with
4440        // the timestamp oracle. Here we make sure to apply the catalog upper with the timestamp
4441        // oracle to linearize future operations with opening the catalog.
4442        let catalog_upper = storage.current_upper().await;
4443        // Choose a time at which to boot. This is used, for example, to prune
4444        // old storage usage data or migrate audit log entries.
4445        //
4446        // This time is usually the current system time, but with protection
4447        // against backwards time jumps, even across restarts.
4448        let epoch_millis_oracle = &timestamp_oracles
4449            .get(&Timeline::EpochMilliseconds)
4450            .expect("inserted above")
4451            .oracle;
4452
4453        let mut boot_ts = if read_only_controllers {
4454            let read_ts = epoch_millis_oracle.read_ts().await;
4455            std::cmp::max(read_ts, catalog_upper)
4456        } else {
4457            // Getting/applying a write timestamp bumps the write timestamp in the
4458            // oracle, which we're not allowed in read-only mode.
4459            epoch_millis_oracle.apply_write(catalog_upper).await;
4460            epoch_millis_oracle.write_ts().await.timestamp
4461        };
4462
4463        info!(
4464            "startup: coordinator init: timestamp oracle init complete in {:?}",
4465            oracle_init_start.elapsed()
4466        );
4467
4468        let catalog_open_start = Instant::now();
4469        info!("startup: coordinator init: catalog open beginning");
4470        let persist_client = controller_config
4471            .persist_clients
4472            .open(controller_config.persist_location.clone())
4473            .await
4474            .context("opening persist client")?;
4475        let builtin_item_migration_config =
4476            BuiltinItemMigrationConfig {
4477                persist_client: persist_client.clone(),
4478                read_only: read_only_controllers,
4479                force_migration: force_builtin_schema_migration,
4480            }
4481        ;
4482        let OpenCatalogResult {
4483            mut catalog,
4484            migrated_storage_collections_0dt,
4485            new_builtin_collections,
4486            builtin_table_updates,
4487            cached_global_exprs,
4488            uncached_local_exprs,
4489        } = Catalog::open(mz_catalog::config::Config {
4490            storage,
4491            metrics_registry: &metrics_registry,
4492            state: mz_catalog::config::StateConfig {
4493                unsafe_mode,
4494                all_features,
4495                build_info,
4496                environment_id: environment_id.clone(),
4497                read_only: read_only_controllers,
4498                now: now.clone(),
4499                boot_ts: boot_ts.clone(),
4500                skip_migrations: false,
4501                cluster_replica_sizes,
4502                builtin_system_cluster_config,
4503                builtin_catalog_server_cluster_config,
4504                builtin_probe_cluster_config,
4505                builtin_support_cluster_config,
4506                builtin_analytics_cluster_config,
4507                system_parameter_defaults,
4508                remote_system_parameters,
4509                availability_zones,
4510                egress_addresses,
4511                aws_principal_context,
4512                aws_privatelink_availability_zones,
4513                connection_context,
4514                http_host_name,
4515                builtin_item_migration_config,
4516                persist_client: persist_client.clone(),
4517                enable_expression_cache_override: None,
4518                helm_chart_version,
4519                external_login_password_mz_system,
4520                license_key: license_key.clone(),
4521            },
4522        })
4523        .await?;
4524
4525        // Opening the catalog uses one or more timestamps, so push the boot timestamp up to the
4526        // current catalog upper.
4527        let catalog_upper = catalog.current_upper().await;
4528        boot_ts = std::cmp::max(boot_ts, catalog_upper);
4529
4530        if !read_only_controllers {
4531            epoch_millis_oracle.apply_write(boot_ts).await;
4532        }
4533
4534        info!(
4535            "startup: coordinator init: catalog open complete in {:?}",
4536            catalog_open_start.elapsed()
4537        );
4538
4539        let coord_thread_start = Instant::now();
4540        info!("startup: coordinator init: coordinator thread start beginning");
4541
4542        let session_id = catalog.config().session_id;
4543        let start_instant = catalog.config().start_instant;
4544
4545        // In order for the coordinator to support Rc and Refcell types, it cannot be
4546        // sent across threads. Spawn it in a thread and have this parent thread wait
4547        // for bootstrap completion before proceeding.
4548        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4549        let handle = TokioHandle::current();
4550
4551        let metrics = Metrics::register_into(&metrics_registry);
4552        let metrics_clone = metrics.clone();
4553        let optimizer_metrics = OptimizerMetrics::register_into(
4554            &metrics_registry,
4555            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4556        );
4557        let segment_client_clone = segment_client.clone();
4558        let coord_now = now.clone();
4559        let advance_timelines_interval =
4560            tokio::time::interval(catalog.system_config().default_timestamp_interval());
4561        let mut check_scheduling_policies_interval = tokio::time::interval(
4562            catalog
4563                .system_config()
4564                .cluster_check_scheduling_policies_interval(),
4565        );
4566        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4567
4568        let clusters_caught_up_check_interval = if read_only_controllers {
4569            let dyncfgs = catalog.system_config().dyncfgs();
4570            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4571
4572            let mut interval = tokio::time::interval(interval);
4573            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4574            interval
4575        } else {
4576            // When not in read-only mode, we don't do hydration checks. But we
4577            // still have to provide _some_ interval. This is large enough that
4578            // it doesn't matter.
4579            //
4580            // TODO(aljoscha): We cannot use Duration::MAX right now because of
4581            // https://github.com/tokio-rs/tokio/issues/6634. Use that once it's
4582            // fixed for good.
4583            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4584            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4585            interval
4586        };
4587
4588        let clusters_caught_up_check =
4589            clusters_caught_up_trigger.map(|trigger| {
4590                let mut exclude_collections: BTreeSet<GlobalId> =
4591                    new_builtin_collections.into_iter().collect();
4592
4593                // Migrated MVs can't make progress in read-only mode. Exclude them and all their
4594                // transitive dependents.
4595                //
4596                // TODO: Consider sending `allow_writes` for the dataflows of migrated MVs, which
4597                //       would allow them to make progress even in read-only mode. This doesn't
4598                //       work for MVs based on `mz_catalog_raw`, if the leader's version is less
4599                //       than v26.17, since before that version the catalog shard's frontier wasn't
4600                //       kept up-to-date with the current time. So this workaround has to remain in
4601                //       place upgrades from a version less than v26.17 are no longer supported.
4602                let mut todo: Vec<_> = migrated_storage_collections_0dt
4603                    .iter()
4604                    .filter(|id| {
4605                        catalog.state().get_entry(id).is_materialized_view()
4606                    })
4607                    .copied()
4608                    .collect();
4609                while let Some(item_id) = todo.pop() {
4610                    let entry = catalog.state().get_entry(&item_id);
4611                    exclude_collections.extend(entry.global_ids());
4612                    todo.extend_from_slice(entry.used_by());
4613                }
4614
4615                CaughtUpCheckContext {
4616                    trigger,
4617                    exclude_collections,
4618                }
4619            });
4620
4621        if let Some(TimestampOracleConfig::Postgres(pg_config)) =
4622            timestamp_oracle_config.as_ref()
4623        {
4624            // Apply settings from system vars as early as possible because some
4625            // of them are locked in right when an oracle is first opened!
4626            let pg_timestamp_oracle_params =
4627                flags::timestamp_oracle_config(catalog.system_config());
4628            pg_timestamp_oracle_params.apply(pg_config);
4629        }
4630
4631        // Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
4632        // system variables change, we update our connection limits.
4633        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4634            Arc::new(move |system_vars: &SystemVars| {
4635                let limit: u64 = system_vars.max_connections().cast_into();
4636                let superuser_reserved: u64 =
4637                    system_vars.superuser_reserved_connections().cast_into();
4638
4639                // If superuser_reserved > max_connections, prefer max_connections.
4640                //
4641                // In this scenario all normal users would be locked out because all connections
4642                // would be reserved for superusers so complain if this is the case.
4643                let superuser_reserved = if superuser_reserved >= limit {
4644                    tracing::warn!(
4645                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
4646                    );
4647                    limit
4648                } else {
4649                    superuser_reserved
4650                };
4651
4652                (connection_limit_callback)(limit, superuser_reserved);
4653            });
4654        catalog.system_config_mut().register_callback(
4655            &mz_sql::session::vars::MAX_CONNECTIONS,
4656            Arc::clone(&connection_limit_callback),
4657        );
4658        catalog.system_config_mut().register_callback(
4659            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4660            connection_limit_callback,
4661        );
4662
4663        let (group_commit_tx, group_commit_rx) = appends::notifier();
4664
4665        let parent_span = tracing::Span::current();
4666        let thread = thread::Builder::new()
4667            // The Coordinator thread tends to keep a lot of data on its stack. To
4668            // prevent a stack overflow we allocate a stack three times as big as the default
4669            // stack.
4670            .stack_size(3 * stack::STACK_SIZE)
4671            .name("coordinator".to_string())
4672            .spawn(move || {
4673                let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4674
4675                let controller = handle
4676                    .block_on({
4677                        catalog.initialize_controller(
4678                            controller_config,
4679                            controller_envd_epoch,
4680                            read_only_controllers,
4681                        )
4682                    })
4683                    .unwrap_or_terminate("failed to initialize storage_controller");
4684                // Initializing the controller uses one or more timestamps, so push the boot timestamp up to the
4685                // current catalog upper.
4686                let catalog_upper = handle.block_on(catalog.current_upper());
4687                boot_ts = std::cmp::max(boot_ts, catalog_upper);
4688                if !read_only_controllers {
4689                    let epoch_millis_oracle = &timestamp_oracles
4690                        .get(&Timeline::EpochMilliseconds)
4691                        .expect("inserted above")
4692                        .oracle;
4693                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4694                }
4695
4696                let catalog = Arc::new(catalog);
4697
4698                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4699                let mut coord = Coordinator {
4700                    controller,
4701                    catalog,
4702                    internal_cmd_tx,
4703                    group_commit_tx,
4704                    strict_serializable_reads_tx,
4705                    global_timelines: timestamp_oracles,
4706                    transient_id_gen: Arc::new(TransientIdGen::new()),
4707                    active_conns: BTreeMap::new(),
4708                    txn_read_holds: Default::default(),
4709                    pending_peeks: BTreeMap::new(),
4710                    client_pending_peeks: BTreeMap::new(),
4711                    pending_linearize_read_txns: BTreeMap::new(),
4712                    serialized_ddl: LockedVecDeque::new(),
4713                    active_compute_sinks: BTreeMap::new(),
4714                    active_webhooks: BTreeMap::new(),
4715                    active_copies: BTreeMap::new(),
4716                    connection_cancel_watches: BTreeMap::new(),
4717                    introspection_subscribes: BTreeMap::new(),
4718                    write_locks: BTreeMap::new(),
4719                    deferred_write_ops: BTreeMap::new(),
4720                    pending_writes: Vec::new(),
4721                    advance_timelines_interval,
4722                    secrets_controller,
4723                    caching_secrets_reader,
4724                    cloud_resource_controller,
4725                    storage_usage_client,
4726                    storage_usage_collection_interval,
4727                    segment_client,
4728                    metrics,
4729                    optimizer_metrics,
4730                    tracing_handle,
4731                    statement_logging: StatementLogging::new(coord_now.clone()),
4732                    webhook_concurrency_limit,
4733                    timestamp_oracle_config,
4734                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4735                    cluster_scheduling_decisions: BTreeMap::new(),
4736                    caught_up_check_interval: clusters_caught_up_check_interval,
4737                    caught_up_check: clusters_caught_up_check,
4738                    installed_watch_sets: BTreeMap::new(),
4739                    connection_watch_sets: BTreeMap::new(),
4740                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
4741                    read_only_controllers,
4742                    buffered_builtin_table_updates: Some(Vec::new()),
4743                    license_key,
4744                    user_id_pool: IdPool::empty(),
4745                    persist_client,
4746                };
4747                let bootstrap = handle.block_on(async {
4748                    coord
4749                        .bootstrap(
4750                            boot_ts,
4751                            migrated_storage_collections_0dt,
4752                            builtin_table_updates,
4753                            cached_global_exprs,
4754                            uncached_local_exprs,
4755                            audit_logs_iterator,
4756                        )
4757                        .await?;
4758                    coord
4759                        .controller
4760                        .remove_orphaned_replicas(
4761                            coord.catalog().get_next_user_replica_id().await?,
4762                            coord.catalog().get_next_system_replica_id().await?,
4763                        )
4764                        .await
4765                        .map_err(AdapterError::Orchestrator)?;
4766
4767                    if let Some(retention_period) = storage_usage_retention_period {
4768                        coord
4769                            .prune_storage_usage_events_on_startup(retention_period)
4770                            .await;
4771                    }
4772
4773                    coord.prune_arrangement_sizes_history_on_startup().await;
4774
4775                    Ok(())
4776                });
4777                let ok = bootstrap.is_ok();
4778                drop(span);
4779                bootstrap_tx
4780                    .send(bootstrap)
4781                    .expect("bootstrap_rx is not dropped until it receives this message");
4782                if ok {
4783                    handle.block_on(coord.serve(
4784                        internal_cmd_rx,
4785                        strict_serializable_reads_rx,
4786                        cmd_rx,
4787                        group_commit_rx,
4788                    ));
4789                }
4790            })
4791            .expect("failed to create coordinator thread");
4792        match bootstrap_rx
4793            .await
4794            .expect("bootstrap_tx always sends a message or panics/halts")
4795        {
4796            Ok(()) => {
4797                info!(
4798                    "startup: coordinator init: coordinator thread start complete in {:?}",
4799                    coord_thread_start.elapsed()
4800                );
4801                info!(
4802                    "startup: coordinator init: complete in {:?}",
4803                    coord_start.elapsed()
4804                );
4805                let handle = Handle {
4806                    session_id,
4807                    start_instant,
4808                    _thread: thread.join_on_drop(),
4809                };
4810                let client = Client::new(
4811                    build_info,
4812                    cmd_tx,
4813                    metrics_clone,
4814                    now,
4815                    environment_id,
4816                    segment_client_clone,
4817                );
4818                Ok((handle, client))
4819            }
4820            Err(e) => Err(e),
4821        }
4822    }
4823    .boxed()
4824}
4825
4826// Determines and returns the highest timestamp for each timeline, for all known
4827// timestamp oracle implementations.
4828//
4829// Initially, we did this so that we can switch between implementations of
4830// timestamp oracle, but now we also do this to determine a monotonic boot
4831// timestamp, a timestamp that does not regress across reboots.
4832//
4833// This mostly works, but there can be linearizability violations, because there
4834// is no central moment where we do distributed coordination for all oracle
4835// types. Working around this seems prohibitively hard, maybe even impossible so
4836// we have to live with this window of potential violations during the upgrade
4837// window (which is the only point where we should switch oracle
4838// implementations).
4839async fn get_initial_oracle_timestamps(
4840    timestamp_oracle_config: &Option<TimestampOracleConfig>,
4841) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4842    let mut initial_timestamps = BTreeMap::new();
4843
4844    if let Some(config) = timestamp_oracle_config {
4845        let oracle_timestamps = config.get_all_timelines().await?;
4846
4847        let debug_msg = || {
4848            oracle_timestamps
4849                .iter()
4850                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4851                .join(", ")
4852        };
4853        info!(
4854            "current timestamps from the timestamp oracle: {}",
4855            debug_msg()
4856        );
4857
4858        for (timeline, ts) in oracle_timestamps {
4859            let entry = initial_timestamps
4860                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4861
4862            entry
4863                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4864                .or_insert(ts);
4865        }
4866    } else {
4867        info!("no timestamp oracle configured!");
4868    };
4869
4870    let debug_msg = || {
4871        initial_timestamps
4872            .iter()
4873            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4874            .join(", ")
4875    };
4876    info!("initial oracle timestamps: {}", debug_msg());
4877
4878    Ok(initial_timestamps)
4879}
4880
4881#[instrument]
4882pub async fn load_remote_system_parameters(
4883    storage: &mut Box<dyn OpenableDurableCatalogState>,
4884    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4885    system_parameter_sync_timeout: Duration,
4886) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4887    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4888        tracing::info!("parameter sync on boot: start sync");
4889
4890        // We intentionally block initial startup, potentially forever,
4891        // on initializing LaunchDarkly. This may seem scary, but the
4892        // alternative is even scarier. Over time, we expect that the
4893        // compiled-in default values for the system parameters will
4894        // drift substantially from the defaults configured in
4895        // LaunchDarkly, to the point that starting an environment
4896        // without loading the latest values from LaunchDarkly will
4897        // result in running an untested configuration.
4898        //
4899        // Note this only applies during initial startup. Restarting
4900        // after we've synced once only blocks for a maximum of
4901        // `FRONTEND_SYNC_TIMEOUT` on LaunchDarkly, as it seems
4902        // reasonable to assume that the last-synced configuration was
4903        // valid enough.
4904        //
4905        // This philosophy appears to provide a good balance between not
4906        // running untested configurations in production while also not
4907        // making LaunchDarkly a "tier 1" dependency for existing
4908        // environments.
4909        //
4910        // If this proves to be an issue, we could seek to address the
4911        // configuration drift in a different way--for example, by
4912        // writing a script that runs in CI nightly and checks for
4913        // deviation between the compiled Rust code and LaunchDarkly.
4914        //
4915        // If it is absolutely necessary to bring up a new environment
4916        // while LaunchDarkly is down, the following manual mitigation
4917        // can be performed:
4918        //
4919        //    1. Edit the environmentd startup parameters to omit the
4920        //       LaunchDarkly configuration.
4921        //    2. Boot environmentd.
4922        //    3. Use the catalog-debug tool to run `edit config "{\"key\":\"system_config_synced\"}" "{\"value\": 1}"`.
4923        //    4. Adjust any other parameters as necessary to avoid
4924        //       running a nonstandard configuration in production.
4925        //    5. Edit the environmentd startup parameters to restore the
4926        //       LaunchDarkly configuration, for when LaunchDarkly comes
4927        //       back online.
4928        //    6. Reboot environmentd.
4929        let mut params = SynchronizedParameters::new(SystemVars::default());
4930        let frontend_sync = async {
4931            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4932            frontend.pull(&mut params);
4933            let ops = params
4934                .modified()
4935                .into_iter()
4936                .map(|param| {
4937                    let name = param.name;
4938                    let value = param.value;
4939                    tracing::info!(name, value, initial = true, "sync parameter");
4940                    (name, value)
4941                })
4942                .collect();
4943            tracing::info!("parameter sync on boot: end sync");
4944            Ok(Some(ops))
4945        };
4946        if !storage.has_system_config_synced_once().await? {
4947            frontend_sync.await
4948        } else {
4949            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4950                Ok(ops) => Ok(ops),
4951                Err(TimeoutError::Inner(e)) => Err(e),
4952                Err(TimeoutError::DeadlineElapsed) => {
4953                    tracing::info!("parameter sync on boot: sync has timed out");
4954                    Ok(None)
4955                }
4956            }
4957        }
4958    } else {
4959        Ok(None)
4960    }
4961}
4962
/// The kinds of response that can be associated with a watch set.
///
/// NOTE(review): presumably delivered once the watched objects have advanced far enough;
/// confirm against the code that installs and processes watch sets.
#[derive(Debug)]
pub enum WatchSetResponse {
    /// Carries a statement logging ID together with a statement lifecycle event.
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    /// Carries the retained state for an `ALTER SINK`; see [`AlterSinkReadyContext`].
    AlterSinkReady(AlterSinkReadyContext),
    /// Carries the retained state for applying a materialized view replacement; see
    /// [`AlterMaterializedViewReadyContext`].
    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
}
4969
4970#[derive(Debug)]
4971pub struct AlterSinkReadyContext {
4972    ctx: Option<ExecuteContext>,
4973    otel_ctx: OpenTelemetryContext,
4974    plan: AlterSinkPlan,
4975    plan_validity: PlanValidity,
4976    read_hold: ReadHolds,
4977}
4978
4979impl AlterSinkReadyContext {
4980    fn ctx(&mut self) -> &mut ExecuteContext {
4981        self.ctx.as_mut().expect("only cleared on drop")
4982    }
4983
4984    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4985        self.ctx
4986            .take()
4987            .expect("only cleared on drop")
4988            .retire(result);
4989    }
4990}
4991
4992impl Drop for AlterSinkReadyContext {
4993    fn drop(&mut self) {
4994        if let Some(ctx) = self.ctx.take() {
4995            ctx.retire(Err(AdapterError::Canceled));
4996        }
4997    }
4998}
4999
5000#[derive(Debug)]
5001pub struct AlterMaterializedViewReadyContext {
5002    ctx: Option<ExecuteContext>,
5003    otel_ctx: OpenTelemetryContext,
5004    plan: plan::AlterMaterializedViewApplyReplacementPlan,
5005    plan_validity: PlanValidity,
5006}
5007
5008impl AlterMaterializedViewReadyContext {
5009    fn ctx(&mut self) -> &mut ExecuteContext {
5010        self.ctx.as_mut().expect("only cleared on drop")
5011    }
5012
5013    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
5014        self.ctx
5015            .take()
5016            .expect("only cleared on drop")
5017            .retire(result);
5018    }
5019}
5020
5021impl Drop for AlterMaterializedViewReadyContext {
5022    fn drop(&mut self) {
5023        if let Some(ctx) = self.ctx.take() {
5024            ctx.retire(Err(AdapterError::Canceled));
5025        }
5026    }
5027}
5028
5029/// A struct for tracking the ownership of a lock and a VecDeque to store to-be-done work after the
5030/// lock is freed.
5031#[derive(Debug)]
5032struct LockedVecDeque<T> {
5033    items: VecDeque<T>,
5034    lock: Arc<tokio::sync::Mutex<()>>,
5035}
5036
5037impl<T> LockedVecDeque<T> {
5038    pub fn new() -> Self {
5039        Self {
5040            items: VecDeque::new(),
5041            lock: Arc::new(tokio::sync::Mutex::new(())),
5042        }
5043    }
5044
5045    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
5046        Arc::clone(&self.lock).try_lock_owned()
5047    }
5048
5049    pub fn is_empty(&self) -> bool {
5050        self.items.is_empty()
5051    }
5052
5053    pub fn push_back(&mut self, value: T) {
5054        self.items.push_back(value)
5055    }
5056
5057    pub fn pop_front(&mut self) -> Option<T> {
5058        self.items.pop_front()
5059    }
5060
5061    pub fn remove(&mut self, index: usize) -> Option<T> {
5062        self.items.remove(index)
5063    }
5064
5065    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
5066        self.items.iter()
5067    }
5068}
5069
/// Pairs an [`ExecuteContext`] with the [`PlanStatement`] it belongs to.
///
/// NOTE(review): presumably queued while the statement cannot be run yet; confirm against
/// the call sites that construct this type.
#[derive(Debug)]
struct DeferredPlanStatement {
    /// The execute context associated with the statement.
    ctx: ExecuteContext,
    /// The statement or plan itself.
    ps: PlanStatement,
}
5075
/// Work expressed either as a raw SQL statement or as an `mz_sql` plan.
#[derive(Debug)]
enum PlanStatement {
    /// A raw statement AST together with its parameters.
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    /// A plan together with two sets of resolved IDs.
    Plan {
        plan: mz_sql::plan::Plan,
        // NOTE(review): presumably the IDs resolved while planning `plan` — confirm.
        resolved_ids: ResolvedIds,
        // NOTE(review): presumably IDs resolved through SQL-implemented items — confirm.
        sql_impl_resolved_ids: ResolvedIds,
    },
}
5088
/// Errors reported when network policy denies access.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// The carried address was denied.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    /// Access was denied because an IP address was missing.
    #[error("Access denied missing IP address")]
    MissingIp,
}
5096
5097pub(crate) fn validate_ip_with_policy_rules(
5098    ip: &IpAddr,
5099    rules: &Vec<NetworkPolicyRule>,
5100) -> Result<(), NetworkPolicyError> {
5101    // At the moment we're not handling action or direction
5102    // as those are only able to be "allow" and "ingress" respectively
5103    if rules.iter().any(|r| r.address.0.contains(ip)) {
5104        Ok(())
5105    } else {
5106        Err(NetworkPolicyError::AddressDenied(ip.clone()))
5107    }
5108}
5109
5110pub(crate) fn infer_sql_type_for_catalog(
5111    hir_expr: &HirRelationExpr,
5112    mir_expr: &MirRelationExpr,
5113) -> SqlRelationType {
5114    let mut typ = hir_expr.top_level_typ();
5115    typ.backport_nullability_and_keys(&mir_expr.typ());
5116    typ
5117}
5118
#[cfg(test)]
mod id_pool_tests {
    use super::IdPool;

    // An empty pool reports nothing remaining and refuses every allocation.
    #[mz_ore::test]
    fn test_empty_pool() {
        let mut ids = IdPool::empty();
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
        assert_eq!(ids.allocate_many(1), None);
    }

    // Single allocations walk the refilled range in order and then run dry.
    #[mz_ore::test]
    fn test_allocate_single() {
        let mut ids = IdPool::empty();
        ids.refill(10, 13);
        assert_eq!(ids.remaining(), 3);
        for expected in 10..13 {
            assert_eq!(ids.allocate(), Some(expected));
        }
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
    }

    // Batch allocations either hand out the full batch or nothing at all.
    #[mz_ore::test]
    fn test_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(100, 105);

        let first_batch = ids.allocate_many(3);
        assert_eq!(first_batch, Some(vec![100, 101, 102]));
        assert_eq!(ids.remaining(), 2);

        // Not enough remaining for 3 more.
        let too_many = ids.allocate_many(3);
        assert_eq!(too_many, None);

        // But 2 works.
        let second_batch = ids.allocate_many(2);
        assert_eq!(second_batch, Some(vec![103, 104]));
        assert_eq!(ids.remaining(), 0);
    }

    // Asking for zero IDs succeeds with an empty batch and consumes nothing.
    #[mz_ore::test]
    fn test_allocate_many_zero() {
        let mut ids = IdPool::empty();
        ids.refill(1, 5);
        let batch = ids.allocate_many(0);
        assert_eq!(batch, Some(vec![]));
        assert_eq!(ids.remaining(), 4);
    }

    // A refill discards whatever was left of the previous range.
    #[mz_ore::test]
    fn test_refill_resets_pool() {
        let mut ids = IdPool::empty();
        ids.refill(0, 2);
        assert_eq!(ids.allocate(), Some(0));

        // Refill before exhaustion replaces the range.
        ids.refill(50, 52);
        for expected in 50..52 {
            assert_eq!(ids.allocate(), Some(expected));
        }
        assert_eq!(ids.allocate(), None);
    }

    // Single and batch allocations draw from the same cursor.
    #[mz_ore::test]
    fn test_mixed_allocate_and_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(0, 10);
        assert_eq!(ids.allocate(), Some(0));
        let batch = ids.allocate_many(3);
        assert_eq!(batch, Some(vec![1, 2, 3]));
        assert_eq!(ids.allocate(), Some(4));
        assert_eq!(ids.remaining(), 5);
    }

    // Refilling with the first bound above the second must panic.
    #[mz_ore::test]
    #[should_panic(expected = "invalid pool range")]
    fn test_refill_invalid_range_panics() {
        let mut ids = IdPool::empty();
        ids.refill(10, 5);
    }
}
5193
#[cfg(test)]
mod arrangement_sizes_pruner_tests {
    use mz_repr::catalog_item_id::CatalogItemId;
    use mz_repr::{Datum, Row};

    use super::arrangement_sizes_expired_retractions;

    // Builds a row shaped like `mz_object_arrangement_size_history`. The pruner
    // only looks at column 3 (`collection_timestamp`); the other three columns
    // get realistic values so that a change in row shape would make tests fail.
    fn history_row(ts_ms: i64) -> Row {
        let collected_at = mz_ore::now::to_datetime(ts_ms.try_into().expect("non-negative"));
        let columns = [
            Datum::String("r1"),
            Datum::String("u1"),
            Datum::Int64(123),
            Datum::TimestampTz(collected_at.try_into().expect("fits in TimestampTz")),
        ];
        Row::pack_slice(&columns)
    }

    // Any CatalogItemId will do; tests don't dispatch on it.
    fn item_id() -> CatalogItemId {
        CatalogItemId::User(42)
    }

    #[mz_ore::test]
    fn empty_input_produces_no_retractions() {
        let retractions = arrangement_sizes_expired_retractions(Vec::new(), 1_000, item_id());
        assert!(retractions.is_empty());
    }

    #[mz_ore::test]
    fn retracts_only_rows_strictly_before_cutoff() {
        // Covers both sides of the filter. The row at 1_000 sits exactly on the
        // cutoff and must be kept, which pins down the strict-less-than boundary.
        let rows = [100, 500, 1_000, 5_000]
            .into_iter()
            .map(|ts_ms| (history_row(ts_ms), 1))
            .collect::<Vec<_>>();
        let retractions = arrangement_sizes_expired_retractions(rows, 1_000, item_id());
        assert_eq!(retractions.len(), 2);
    }

    #[mz_ore::test]
    #[should_panic(expected = "consolidated contents should not contain retractions")]
    fn retraction_in_input_panics() {
        // A negative diff in the input is expected to trip the pruner's sanity check.
        let rows = vec![(history_row(100), -1)];
        let _ = arrangement_sizes_expired_retractions(rows, 1_000, item_id());
    }
}