// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Translation of SQL commands into timestamped `Controller` commands.
//!
//! The various SQL commands instruct the system to take actions that are not
//! yet explicitly timestamped. On the other hand, the underlying data continually
//! change as time moves forward. On the third hand, we greatly benefit from the
//! information that some times are no longer of interest, so that we may
//! compact the representation of the continually changing collections.
//!
//! The [`Coordinator`] curates these interactions by observing the progress
//! collections make through time, choosing timestamps for its own commands,
//! and eventually communicating that certain times have irretrievably "passed".
//!
//! ## Frontiers another way
//!
//! If the above description of frontiers left you with questions, this
//! repackaged explanation might help.
//!
//! - `since` is the least recent time (i.e. oldest time) that you can read
//!   from sources and be guaranteed that the returned data is accurate as of
//!   that time.
//!
//!   Reads at times less than `since` may return values that were not actually
//!   seen at the specified time, but arrived later (i.e. the results are
//!   compacted).
//!
//!   For correctness' sake, the coordinator never chooses to read at a time
//!   less than an arrangement's `since`.
//!
//! - `upper` is the first time after the most recent time that you can read
//!   from sources and receive an immediate response. Alternately, it is the
//!   least time at which the data may still change (that is the reason we may
//!   not be able to respond immediately).
//!
//!   Reads at times >= `upper` may not immediately return because the answer
//!   isn't known yet. However, once the `upper` is > the specified read time,
//!   the read can return.
//!
//!   For the sake of returned values' freshness, the coordinator prefers
//!   performing reads at an arrangement's `upper`. However, because we more
//!   strongly prefer correctness, the coordinator will choose timestamps
//!   greater than an object's `upper` if it is also being accessed alongside
//!   objects whose `since` times are >= its `upper`.
//!
//! This illustration attempts to show, with time moving left to right, the
//! relationship between `since` and `upper`.
//!
//! - `#`: possibly inaccurate results
//! - `-`: immediate, correct response
//! - `?`: not yet known
//! - `s`: since
//! - `u`: upper
//! - `|`: eligible for coordinator to select
//!
//! ```nofmt
//! ####s----u?????
//!     |||||||||||
//! ```
//!
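//! For example, a requested read time `t` can be classified against an
//! object's `since` and `upper` as follows (a minimal sketch with plain
//! integer times, not the Coordinator's actual timestamp machinery):
//!
//! ```ignore
//! fn classify(t: u64, since: u64, upper: u64) -> &'static str {
//!     if t < since {
//!         // Data before `since` may have been compacted away.
//!         "possibly inaccurate results"
//!     } else if t < upper {
//!         // Within [since, upper): correct and immediately answerable.
//!         "immediate, correct response"
//!     } else {
//!         // At or beyond `upper`: must wait for `upper` to advance past `t`.
//!         "not yet known"
//!     }
//! }
//! ```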

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::net::IpAddr;
use std::num::NonZeroI64;
use std::ops::Neg;
use std::str::FromStr;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use std::{fmt, mem};

use anyhow::Context;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
use http::Uri;
use ipnet::IpNet;
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{
    USER_ID_POOL_BATCH_SIZE, WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL,
};
use mz_auth::password::Password;
use mz_build_info::BuildInfo;
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::{
    CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_controller::clusters::{
    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
};
use mz_controller::{ControllerConfig, Readiness};
use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::OfflineReason;
use mz_ore::cast::{CastFrom, CastInto, CastLossy};
use mz_ore::channel::trigger::Trigger;
use mz_ore::future::TimeoutError;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::{JoinHandle, spawn};
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_ore::url::SensitiveUrl;
use mz_ore::{
    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
};
use mz_persist_client::PersistClient;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
use mz_repr::adt::numeric::Numeric;
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
use mz_secrets::cache::CachingSecretsReader;
use mz_secrets::{SecretsController, SecretsReader};
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::{CatalogCluster, EnvironmentId};
use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::{
    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
    NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
};
use mz_sql::session::user::User;
use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
use mz_sql_parser::ast::ExplainStage;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use serde::Serialize;
use thiserror::Error;
use timely::progress::{Antichain, Timestamp as _};
use tokio::runtime::Handle as TokioHandle;
use tokio::select;
use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
use crate::client::{Client, Handle};
use crate::command::{Command, ExecuteResponse};
use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
use crate::coord::appends::{
    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
};
use crate::coord::caught_up::CaughtUpCheckContext;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::introspection::IntrospectionSubscribe;
use crate::coord::peek::PendingPeek;
use crate::coord::statement_logging::StatementLogging;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::validity::PlanValidity;
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
use crate::metrics::Metrics;
use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session};
use crate::statement_logging::{
    StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
};
use crate::util::{ClientTransmitter, ResultExt, sort_topological};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{AdapterNotice, ReadHolds, flags};

pub(crate) mod appends;
pub(crate) mod catalog_serving;
pub(crate) mod cluster_scheduling;
pub(crate) mod consistency;
pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod read_policy;
pub(crate) mod sequencer;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;

pub mod catalog_implications;
mod caught_up;
mod command_handler;
mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
mod sql;
mod validity;

/// A pool of pre-allocated user IDs to avoid per-DDL persist writes.
///
/// IDs in the range `[next, upper)` are available for allocation.
/// When exhausted, the pool must be refilled via the catalog.
///
/// # Correctness
///
/// The pool is owned by [`Coordinator`], which processes all requests
/// on a single-threaded event loop. Because every access requires
/// `&mut self` on the coordinator, there is no concurrent access to the
/// pool — no additional synchronization is needed.
///
/// Global ID uniqueness is guaranteed because each refill calls
/// [`Catalog::allocate_user_ids`], which performs a durable persist
/// write that atomically reserves the entire batch before any IDs from
/// it are handed out. If the process crashes after a refill but before
/// all pre-allocated IDs are consumed, the unused IDs form harmless
/// gaps in the sequence — user IDs are not required to be contiguous.
///
/// This guarantee holds even if multiple `environmentd` processes run
/// concurrently. Each process has its own independent pool,
/// but every refill goes through the shared persist-backed catalog,
/// which serializes allocations across all callers. Two processes
/// will therefore never receive overlapping ID ranges,
/// for the same reason they could not before this pool existed.
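///
/// A minimal usage sketch (hypothetical ID values; real refills take their
/// range from [`Catalog::allocate_user_ids`]):
///
/// ```ignore
/// let mut pool = IdPool::empty();
/// assert_eq!(pool.allocate(), None); // an empty pool is exhausted
/// pool.refill(100, 110); // reserve the range [100, 110)
/// assert_eq!(pool.allocate(), Some(100));
/// assert_eq!(pool.allocate_many(3), Some(vec![101, 102, 103]));
/// assert_eq!(pool.remaining(), 6);
/// ```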
#[derive(Debug)]
pub(crate) struct IdPool {
    next: u64,
    upper: u64,
}

impl IdPool {
    /// Creates an empty pool.
    pub fn empty() -> Self {
        IdPool { next: 0, upper: 0 }
    }

    /// Allocates a single ID from the pool, returning `None` if exhausted.
    pub fn allocate(&mut self) -> Option<u64> {
        if self.next < self.upper {
            let id = self.next;
            self.next += 1;
            Some(id)
        } else {
            None
        }
    }

    /// Allocates `n` consecutive IDs from the pool, returning `None` if
    /// insufficient IDs remain.
    pub fn allocate_many(&mut self, n: u64) -> Option<Vec<u64>> {
        if self.remaining() >= n {
            let ids = (self.next..self.next + n).collect();
            self.next += n;
            Some(ids)
        } else {
            None
        }
    }

    /// Returns the number of IDs remaining in the pool.
    pub fn remaining(&self) -> u64 {
        self.upper - self.next
    }

    /// Refills the pool with the given range `[next, upper)`.
    pub fn refill(&mut self, next: u64, upper: u64) {
        assert!(next <= upper, "invalid pool range: {next}..{upper}");
        self.next = next;
        self.upper = upper;
    }
}

#[derive(Debug)]
pub enum Message {
    Command(OpenTelemetryContext, Command),
    ControllerReady {
        controller: ControllerReadiness,
    },
    PurifiedStatementReady(PurifiedStatementReady),
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    TryDeferred {
        /// The connection that created this op.
        conn_id: ConnectionId,
        /// The write lock that notified us our deferred op might be able to run.
        ///
        /// Note: While we never want to hold a partial set of locks, it can be important to hold
        /// onto the _one_ that notified us our op might be ready. If there are multiple operations
        /// waiting on a single collection, and we don't hold this lock through retrying the op,
        /// then everything waiting on this collection will get retried, causing extra traffic in the
        /// Coordinator's message queue.
        ///
        /// See [`DeferredOp::can_be_optimistically_retried`] for more detail.
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    /// Initiates a group commit.
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    AdvanceTimelines,
    ClusterEvent(ClusterEvent),
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    LinearizeReads,
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    /// Performs any cleanup and logging actions necessary for
    /// finalizing a statement execution.
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    DrainStatementLog,
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    /// Scheduling policy decisions about turning clusters On/Off.
    /// `Vec<(policy name, Vec of decisions by the policy)>`
    /// A cluster will be On if and only if there is at least one On decision for it.
    /// Scheduling decisions for clusters that have `SCHEDULE = MANUAL` are ignored.
    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}

impl Message {
    /// Returns a string to identify the kind of [`Message`], useful for logging.
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
                Command::CheckRoleCanLogin { .. } => "command-check_role_can_login",
                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
                Command::GetOracle { .. } => "get-oracle",
                Command::DetermineRealTimeRecentTimestamp { .. } => {
                    "determine-real-time-recent-timestamp"
                }
                Command::GetTransactionReadHoldsBundle { .. } => {
                    "get-transaction-read-holds-bundle"
                }
                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
                Command::ExecuteSubscribe { .. } => "execute-subscribe",
                Command::CopyToPreflight { .. } => "copy-to-preflight",
                Command::ExecuteCopyTo { .. } => "execute-copy-to",
                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
                Command::ExplainTimestamp { .. } => "explain-timestamp",
                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
                Command::StartCopyFromStdin { .. } => "start-copy-from-stdin",
                Command::InjectAuditEvents { .. } => "inject-audit-events",
            },
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}

/// The reason why a controller needs processing on the main loop.
#[derive(Debug)]
pub enum ControllerReadiness {
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A batch of metric data is ready.
    Metrics,
    /// An internally-generated message is ready to be returned.
    Internal,
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub params: Params,
    pub plan_validity: PlanValidity,
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}

pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;

#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    pub connection_id: CatalogItemId,
    pub connection_gid: GlobalId,
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}

pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
pub type AlterConnectionValidationReady = ValidationReady<Connection>;

#[derive(Debug)]
pub enum PeekStage {
    /// Common stages across SELECT, EXPLAIN and COPY TO queries.
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    /// Final stage for a peek.
    Finish(PeekStageFinish),
    /// Final stage for an explain.
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    /// Preflight checks for a copy to operation.
    CopyToPreflight(PeekStageCopyTo),
    /// Final stage for a copy to which involves shipping the dataflow.
    CopyToDataflow(PeekStageCopyTo),
}

#[derive(Debug)]
pub struct CopyToContext {
    /// The `RelationDesc` of the data to be copied.
    pub desc: RelationDesc,
    /// The destination uri of the external service where the data will be copied.
    pub uri: Uri,
    /// Connection information required to connect to the external service to copy the data.
    pub connection: StorageConnection<ReferencedConnection>,
    /// The ID of the CONNECTION object to be used for copying the data.
    pub connection_id: CatalogItemId,
    /// Format params to format the data.
    pub format: S3SinkFormat,
    /// Approximate max file size of each uploaded file.
    pub max_file_size: u64,
    /// Number of batches the output of the COPY TO will be partitioned into
    /// to distribute the load across workers deterministically.
    /// This is an `Option` only because it is not set when `CopyToContext` is
    /// instantiated, but immediately afterward, in the `PeekStageValidate` stage.
    pub output_batch_count: Option<u64>,
}

#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    /// When present, an optimizer trace to be used for emitting a plan insights
    /// notice.
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
}

#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination,
    imports: BTreeMap<GlobalId, MapFilterProject>,
}

#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}

#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}

#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}

#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    /// ID of this item in the Catalog.
    item_id: CatalogItemId,
    /// ID by which Compute will reference this View.
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    /// IDs of objects resolved during name resolution.
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}

#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    real_time_recency_ts: Option<Timestamp>,
}

#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}

#[derive(Debug)]
pub enum ExplainContext {
    /// The ordinary, non-explain variant of the statement.
    None,
    /// The `EXPLAIN <level> PLAN FOR <explainee>` version of the statement.
    Plan(ExplainPlanContext),
    /// Generate a notice containing the `EXPLAIN PLAN INSIGHTS` output
    /// alongside the query's normal output.
    PlanInsightsNotice(OptimizerTrace),
    /// `EXPLAIN FILTER PUSHDOWN`
    Pushdown,
}

impl ExplainContext {
    /// If available for this context, wrap the [`OptimizerTrace`] into a
    /// [`tracing::Dispatch`] and set it as default, returning the resulting
    /// guard in a `Some(guard)` option.
    pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
        let optimizer_trace = match self {
            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
            _ => None,
        };
        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
    }

    pub(crate) fn needs_cluster(&self) -> bool {
        match self {
            ExplainContext::None => true,
            ExplainContext::Plan(..) => false,
            ExplainContext::PlanInsightsNotice(..) => true,
            ExplainContext::Pushdown => false,
        }
    }

    pub(crate) fn needs_plan_insights(&self) -> bool {
        matches!(
            self,
            ExplainContext::Plan(ExplainPlanContext {
                stage: ExplainStage::PlanInsights,
                ..
            }) | ExplainContext::PlanInsightsNotice(_)
        )
    }
}

#[derive(Debug)]
pub struct ExplainPlanContext {
    /// EXPLAIN BROKEN is internal syntax for showing EXPLAIN output despite an internal error in
    /// the optimizer: we don't immediately bail out from peek sequencing when an internal optimizer
    /// error happens, but go on with trying to show the requested EXPLAIN stage. This can still
    /// succeed if the requested EXPLAIN stage is before the point where the error happened.
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    pub stage: ExplainStage,
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    pub optimizer_trace: OptimizerTrace,
}

#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    /// The ID of this Materialized View in the Catalog.
    item_id: CatalogItemId,
    /// The ID of the durable pTVC backing this Materialized View.
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}

#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
    Explain(SubscribeExplain),
}

#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub struct SubscribeExplain {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    df_meta: DataflowMetainfo,
    cluster_id: ComputeInstanceId,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    read_holds: ReadHolds,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}

#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}

/// An enum describing which cluster to run a statement on.
///
/// One example usage would be that if a query depends only on system tables, we might
/// automatically run it on the catalog server cluster to benefit from indexes that exist there.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The catalog server cluster.
    CatalogServer,
    /// The current user's active cluster.
    Active,
    /// The cluster selected at the start of a transaction.
    Transaction(ClusterId),
}

/// Result types for each stage of a sequence.
pub(crate) enum StageResult<T> {
    /// A task was spawned that will return the next stage.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task was spawned that will return a response for the client.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage is immediately ready and will execute.
    Immediate(T),
    /// The final stage was executed and is ready to respond to the client.
    Response(ExecuteResponse),
}

/// Common functionality for [Coordinator::sequence_staged].
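///
/// A driver for these state machines conceptually works as follows (a
/// simplified sketch of the flow, not the actual `sequence_staged` code):
///
/// ```ignore
/// loop {
///     match stage.stage(coord, &mut ctx).await? {
///         // The next stage is already known; keep looping.
///         StageResult::Immediate(next) => stage = *next,
///         // Work was spawned off-thread; when it completes, the next
///         // stage is re-enqueued on the Coordinator's main loop via the
///         // `Message` built by `Staged::message` (e.g. `PeekStageReady`).
///         StageResult::Handle(_handle) => return,
///         StageResult::HandleRetire(_handle) => return,
///         // The final result is ready; respond to the client.
///         StageResult::Response(resp) => return ctx.retire(Ok(resp)),
///     }
/// }
/// ```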
pub(crate) trait Staged: Send {
    type Ctx: StagedContext;

    fn validity(&mut self) -> &mut PlanValidity;

    /// Returns the next stage or final result.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Prepares a message for the Coordinator.
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether it is safe to SQL cancel this stage.
    fn cancel_enabled(&self) -> bool;
}

pub trait StagedContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    fn session(&self) -> Option<&Session>;
}

impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}

impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}

/// Configures a coordinator.
pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    /// Whether or not to start controllers in read-only mode. This is only
    /// meant for use during development of read-only clusters and 0dt upgrades
    /// and should go away once we have proper orchestration during upgrades.
    pub read_only_controllers: bool,

    /// A trigger that signals that the current deployment has caught up with a
    /// previous deployment. Only used during 0dt deployment, while in read-only
    /// mode.
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}

/// Metadata about an active connection.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Pgwire specifies that every connection have a 32-bit secret associated
    /// with it, that is known to both the client and the server. Cancellation
    /// requests are required to authenticate with the secret of the connection
    /// that they are targeting.
    secret_key: u32,
    /// The time when the session's connection was initiated.
    connected_at: EpochMillis,
    user: User,
    application_name: String,
    uuid: Uuid,
    conn_id: ConnectionId,
    client_ip: Option<IpAddr>,

    /// Sinks that will need to be dropped when the current transaction, if
    /// any, is cleared.
    drop_sinks: BTreeSet<GlobalId>,

    /// Lock for the Coordinator's deferred statements that is dropped on transaction clear.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Cluster reconfigurations that will need to be
    /// cleaned up when the current transaction is cleared
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Channel on which to send notices to a session.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role that initiated the database context. Fixed for the duration of the connection.
    /// WARNING: This role reference is not updated when the role is dropped.
    /// Consumers should not assume that this role exists.
    authenticated_role: RoleId,
}

impl ConnMeta {
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    pub fn user(&self) -> &User {
        &self.user
    }

    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}

#[derive(Debug)]
/// A pending transaction waiting to be committed.
pub struct PendingTxn {
    /// Context used to send a response back to the client.
    ctx: ExecuteContext,
    /// Client response for transaction.
    response: Result<PendingTxnResponse, AdapterError>,
    /// The action to take at the end of the transaction.
    action: EndTransactionAction,
}

#[derive(Debug)]
/// The response we'll send for a [`PendingTxn`].
pub enum PendingTxnResponse {
    /// The transaction will be committed.
    Committed {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction will be rolled back.
    Rolledback {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        match self {
            PendingTxnResponse::Committed { params }
            | PendingTxnResponse::Rolledback { params } => params.extend(p),
        }
    }
}

impl From<PendingTxnResponse> for ExecuteResponse {
    fn from(value: PendingTxnResponse) -> Self {
        match value {
            PendingTxnResponse::Committed { params } => {
                ExecuteResponse::TransactionCommitted { params }
            }
            PendingTxnResponse::Rolledback { params } => {
                ExecuteResponse::TransactionRolledBack { params }
            }
        }
    }
}

#[derive(Debug)]
/// A pending read transaction waiting to be linearized, along with metadata about its state.
pub struct PendingReadTxn {
    /// The transaction type
    txn: PendingRead,
    /// The timestamp context of the transaction.
    timestamp_context: TimestampContext,
    /// When we created this pending txn, i.e., when the transaction ended. Only used for metrics.
    created: Instant,
    /// Number of times we requeued the processing of this pending read txn.
    /// Requeueing is necessary if the time we executed the query is after the current oracle time;
    /// see [`Coordinator::message_linearize_reads`] for more details.
    num_requeues: u64,
    /// Telemetry context.
    otel_ctx: OpenTelemetryContext,
}

impl PendingReadTxn {
    /// Return the timestamp context of the pending read transaction.
    pub fn timestamp_context(&self) -> &TimestampContext {
        &self.timestamp_context
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}

#[derive(Debug)]
/// A pending read transaction waiting to be linearized.
enum PendingRead {
    Read {
        /// The inner transaction.
        txn: PendingTxn,
    },
    ReadThenWrite {
        /// Context used to send a response back to the client.
        ctx: ExecuteContext,
        /// Channel used to alert the transaction that the read has been linearized and send back
        /// `ctx`.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}

impl PendingRead {
    /// Alert the client that the read has been linearized.
    ///
    /// If it is necessary to finalize an execute, return the state necessary to do so
    /// (execution context and result)
    #[instrument(level = "debug")]
    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
        match self {
            PendingRead::Read {
                txn:
                    PendingTxn {
                        mut ctx,
                        response,
                        action,
                    },
                ..
            } => {
                let changed = ctx.session_mut().vars_mut().end_transaction(action);
                // Append any parameters that changed to the response.
                let response = response.map(|mut r| {
                    r.extend_params(changed);
                    ExecuteResponse::from(r)
                });

                Some((ctx, response))
            }
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // Ignore errors if the caller has hung up.
                let _ = tx.send(Some(ctx));
                None
            }
        }
    }

    fn label(&self) -> &'static str {
        match self {
            PendingRead::Read { .. } => "read",
            PendingRead::ReadThenWrite { .. } => "read_then_write",
        }
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        match self {
            PendingRead::Read { txn, .. } => txn.ctx,
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // Inform the transaction that we've taken their context.
                // Ignore errors if the caller has hung up.
                let _ = tx.send(None);
                ctx
            }
        }
    }
}

/// State that the coordinator must process as part of retiring
/// command execution. `ExecuteContextExtra::default()` is guaranteed
1406/// to produce a value that will cause the coordinator to do nothing, and
1407/// is intended for use by code that invokes the execution processing flow
1408/// (i.e., `sequence_plan`) without actually being a statement execution.
1409///
1410/// This is a pure data struct containing only the statement logging ID.
1411/// For auto-retire-on-drop behavior, use `ExecuteContextGuard` which wraps
1412/// this struct and owns the channel for sending retirement messages.
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    statement_uuid: Option<StatementLoggingId>,
}

impl ExecuteContextExtra {
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }
    pub fn is_trivial(&self) -> bool {
        self.statement_uuid.is_none()
    }
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
    /// Consume this extra and return the statement UUID for retirement.
    /// This should only be called from code that knows what to do to finish
    /// up logging based on the inner value.
    #[must_use]
    pub(crate) fn retire(self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
}

/// A guard that wraps `ExecuteContextExtra` and owns a channel for sending
/// retirement messages to the coordinator.
///
/// If this guard is dropped with a `Some` `statement_uuid` in its inner
/// `ExecuteContextExtra`, the `Drop` implementation will automatically send a
/// `Message::RetireExecute` to log the statement ending.
/// This handles cases like connection drops where the context cannot be
/// explicitly retired.
/// See <https://github.com/MaterializeInc/database-issues/issues/7304>
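///
/// A hedged sketch of the two ways a guard's life can end (the variable
/// names are illustrative, not from real call sites):
///
/// ```ignore
/// // Explicit path: the caller takes over responsibility for logging;
/// // `Drop` will then do nothing.
/// let extra = guard.defuse();
///
/// // Implicit path: dropping a non-trivial guard sends
/// // `Message::RetireExecute { reason: Aborted, .. }` to the coordinator.
/// drop(other_guard);
/// ```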
#[derive(Debug)]
#[must_use]
pub struct ExecuteContextGuard {
    extra: ExecuteContextExtra,
    /// Channel for sending messages to the coordinator. Used for auto-retiring on drop.
    /// For `Default` instances, this is a dummy sender (receiver already dropped), so
    /// sends will fail silently, which is the desired behavior since `Default` instances
    /// should only be used for non-logged statements.
    coordinator_tx: mpsc::UnboundedSender<Message>,
}

impl Default for ExecuteContextGuard {
    fn default() -> Self {
        // Create a dummy sender by immediately dropping the receiver.
        // Any send on this channel will fail silently, which is the desired
        // behavior for Default instances (non-logged statements).
        let (tx, _rx) = mpsc::unbounded_channel();
        Self {
            extra: ExecuteContextExtra::default(),
            coordinator_tx: tx,
        }
    }
}

impl ExecuteContextGuard {
    pub(crate) fn new(
        statement_uuid: Option<StatementLoggingId>,
        coordinator_tx: mpsc::UnboundedSender<Message>,
    ) -> Self {
        Self {
            extra: ExecuteContextExtra::new(statement_uuid),
            coordinator_tx,
        }
    }
    pub fn is_trivial(&self) -> bool {
        self.extra.is_trivial()
    }
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.extra.contents()
    }
    /// Take responsibility for the contents. This should only be
    /// called from code that knows what to do to finish up logging
    /// based on the inner value.
    ///
    /// Returns the inner `ExecuteContextExtra`, consuming the guard without
    /// triggering the auto-retire behavior.
    pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
        // Replacing `extra` with its default (which has no statement UUID)
        // prevents the `Drop` impl from sending a retire message.
        std::mem::take(&mut self.extra)
    }
}

impl Drop for ExecuteContextGuard {
    fn drop(&mut self) {
        if let Some(statement_uuid) = self.extra.statement_uuid.take() {
            // Auto-retire since the guard was dropped without explicit retirement (likely due
            // to a connection drop).
            let msg = Message::RetireExecute {
                data: ExecuteContextExtra {
                    statement_uuid: Some(statement_uuid),
                },
                otel_ctx: OpenTelemetryContext::obtain(),
                reason: StatementEndedExecutionReason::Aborted,
            };
            // The send may fail for Default instances (dummy sender), which is fine since
            // Default instances should only be used for non-logged statements.
            let _ = self.coordinator_tx.send(msg);
        }
    }
}

/// Bundle of state related to statement execution.
///
/// This struct collects a bundle of state that needs to be threaded
/// through various functions as part of statement execution.
/// It is used to finalize execution by calling `retire`. Finalizing execution
/// involves sending the session back to the pgwire layer so that it
/// may be used to process further commands. It also involves
/// performing some work on the main coordinator thread
/// (e.g., recording the time at which the statement finished
/// executing). The state necessary to perform this work is bundled in
/// the `ExecuteContextGuard` object.
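///
/// A hedged sketch of the typical lifecycle (names are illustrative):
///
/// ```ignore
/// let ctx = ExecuteContext::from_parts(tx, internal_cmd_tx, session, guard);
/// // ... sequence the plan ...
/// ctx.retire(result); // sends the response and, if needed, logs statement end
/// ```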
#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}
1533
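// `ExecuteContextInner` is boxed, plausibly to keep `ExecuteContext` itself
// cheap to move around; the `Deref`/`DerefMut` impls below make the
// indirection transparent to callers.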
impl std::ops::Deref for ExecuteContext {
    type Target = ExecuteContextInner;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}

impl std::ops::DerefMut for ExecuteContext {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}

#[derive(Debug)]
pub struct ExecuteContextInner {
    tx: ClientTransmitter<ExecuteResponse>,
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    session: Session,
    extra: ExecuteContextGuard,
}

impl ExecuteContext {
    pub fn session(&self) -> &Session {
        &self.session
    }

    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.session
    }

    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
        &self.tx
    }

    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
        &mut self.tx
    }

    pub fn from_parts(
        tx: ClientTransmitter<ExecuteResponse>,
        internal_cmd_tx: mpsc::UnboundedSender<Message>,
        session: Session,
        extra: ExecuteContextGuard,
    ) -> Self {
        Self {
            inner: ExecuteContextInner {
                tx,
                session,
                extra,
                internal_cmd_tx,
            }
            .into(),
        }
    }

    /// By calling this function, the caller takes responsibility for
    /// dealing with the instance of `ExecuteContextGuard`. This is
    /// intended to support protocols (like `COPY FROM`) that involve
    /// multiple passes of sending the session back and forth between
    /// the coordinator and the pgwire layer. As part of any such
    /// protocol, we must ensure that the `ExecuteContextGuard`
    /// (possibly wrapped in a new `ExecuteContext`) is passed back to the coordinator for
    /// eventual retirement.
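    ///
    /// A hedged sketch of such a round trip (illustrative, not a real call
    /// site):
    ///
    /// ```ignore
    /// let (tx, internal_cmd_tx, session, guard) = ctx.into_parts();
    /// // ... send the session to the pgwire layer and receive it back ...
    /// let ctx = ExecuteContext::from_parts(tx, internal_cmd_tx, session, guard);
    /// ```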
    pub fn into_parts(
        self,
    ) -> (
        ClientTransmitter<ExecuteResponse>,
        mpsc::UnboundedSender<Message>,
        Session,
        ExecuteContextGuard,
    ) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        (tx, internal_cmd_tx, session, extra)
    }

    /// Retire the execution by sending a message to the coordinator.
    #[instrument(level = "debug")]
    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        let reason = if extra.is_trivial() {
            None
        } else {
            Some((&result).into())
        };
        tx.send(result, session);
        if let Some(reason) = reason {
            // Defuse the guard to get the inner `ExecuteContextExtra` without
            // triggering the auto-retire behavior.
            let extra = extra.defuse();
            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
                otel_ctx: OpenTelemetryContext::obtain(),
                data: extra,
                reason,
            }) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        }
    }

    pub fn extra(&self) -> &ExecuteContextGuard {
        &self.extra
    }

    pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
        &mut self.extra
    }
}
1650
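/// The last known status of each process, keyed by cluster, then replica,
/// then process.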
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);

impl ClusterReplicaStatuses {
    pub(crate) fn new() -> ClusterReplicaStatuses {
        ClusterReplicaStatuses(BTreeMap::new())
    }

    /// Initializes the statuses of the specified cluster.
    ///
    /// Panics if the cluster statuses are already initialized.
    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
        let prev = self.0.insert(cluster_id, BTreeMap::new());
        assert_eq!(
            prev, None,
            "cluster {cluster_id} statuses already initialized"
        );
    }

    /// Initializes the statuses of the specified cluster replica.
    ///
    /// Panics if the cluster replica statuses are already initialized.
    pub(crate) fn initialize_cluster_replica_statuses(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        num_processes: usize,
        time: DateTime<Utc>,
    ) {
        tracing::info!(
            ?cluster_id,
            ?replica_id,
            ?time,
            "initializing cluster replica status"
        );
        let replica_statuses = self.0.entry(cluster_id).or_default();
        let process_statuses = (0..num_processes)
            .map(|process_id| {
                let status = ClusterReplicaProcessStatus {
                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
                    time: time.clone(),
                };
                (u64::cast_from(process_id), status)
            })
            .collect();
        let prev = replica_statuses.insert(replica_id, process_statuses);
        assert_none!(
            prev,
            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
        );
    }

    /// Removes the statuses of the specified cluster.
    ///
    /// Panics if the cluster does not exist.
    pub(crate) fn remove_cluster_statuses(
        &mut self,
        cluster_id: &ClusterId,
    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        let prev = self.0.remove(cluster_id);
        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
    }

    /// Removes the statuses of the specified cluster replica.
    ///
    /// Panics if the cluster or replica does not exist.
    pub(crate) fn remove_cluster_replica_statuses(
        &mut self,
        cluster_id: &ClusterId,
        replica_id: &ReplicaId,
    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        let replica_statuses = self
            .0
            .get_mut(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
        let prev = replica_statuses.remove(replica_id);
        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    /// Inserts or updates the status of the specified cluster replica process.
    ///
    /// Panics if the cluster or replica does not exist.
    pub(crate) fn ensure_cluster_status(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        process_id: ProcessId,
        status: ClusterReplicaProcessStatus,
    ) {
        let replica_statuses = self
            .0
            .get_mut(&cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
            .get_mut(&replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
        replica_statuses.insert(process_id, status);
    }

    /// Computes the status of the cluster replica as a whole.
    ///
    /// Panics if `cluster_id` or `replica_id` don't exist.
    pub fn get_cluster_replica_status(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> ClusterStatus {
        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
        Self::cluster_replica_status(process_status)
    }

    /// Computes the status of the cluster replica as a whole.
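    ///
    /// A replica is `Online` only if every process is `Online`; otherwise the
    /// replica is `Offline`, carrying the first known not-ready reason, if
    /// any. A hedged sketch (`process_statuses` is illustrative):
    ///
    /// ```ignore
    /// // With one `Online` process and one `Offline(Some(reason))` process:
    /// let status = ClusterReplicaStatuses::cluster_replica_status(&process_statuses);
    /// assert!(matches!(status, ClusterStatus::Offline(Some(_))));
    /// ```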
    pub fn cluster_replica_status(
        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
    ) -> ClusterStatus {
        process_status
            .values()
            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
                (x, y) => {
                    let reason_x = match x {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    let reason_y = match y {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    // Arbitrarily pick the first known not-ready reason.
                    ClusterStatus::Offline(reason_x.or(reason_y))
                }
            })
    }

    /// Gets the statuses of the given cluster replica.
    ///
    /// Panics if the cluster or replica does not exist.
    pub(crate) fn get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    /// Gets the statuses of the given cluster replica.
    pub(crate) fn try_get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        self.try_get_cluster_statuses(cluster_id)
            .and_then(|statuses| statuses.get(&replica_id))
    }

    /// Gets the statuses of the given cluster.
    pub(crate) fn try_get_cluster_statuses(
        &self,
        cluster_id: ClusterId,
    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
        self.0.get(&cluster_id)
    }
}

/// Glues the external world to the Timely workers.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    /// The controller for the storage and compute layers.
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    /// The catalog in an Arc suitable for readonly references. The Arc allows
    /// us to hand out cheap copies of the catalog to functions that can use it
    /// off of the main coordinator thread. If the coordinator needs to mutate
    /// the catalog, call [`Self::catalog_mut`], which will clone this struct member,
    /// allowing it to be mutated here while the other off-thread references can
    /// read their catalog as long as needed. In the future we would like this
    /// to be a pTVC, but for now this is sufficient.
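    ///
    /// A hedged sketch of handing out a cheap read-only copy:
    ///
    /// ```ignore
    /// let catalog = Arc::clone(&self.catalog);
    /// // `catalog` can now be moved into an off-thread task for reads.
    /// ```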
    catalog: Arc<Catalog>,

    /// A client for persist. Initially, this is only used for reading stashed
    /// peek responses out of batches.
    persist_client: PersistClient,

    /// Channel to manage internal commands from the coordinator to itself.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// Notification that triggers a group commit.
    group_commit_tx: appends::GroupCommitNotifier,

    /// Channel for strict serializable reads ready to commit.
    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    /// Mechanism for totally ordering write and read timestamps, so that all reads
    /// reflect exactly the set of writes that precede them, and no writes that follow.
    global_timelines: BTreeMap<Timeline, TimelineState>,

    /// A generator for transient [`GlobalId`]s, shareable with other threads.
    transient_id_gen: Arc<TransientIdGen>,
    /// A map from connection ID to metadata about that connection for all
    /// active connections.
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    /// For each transaction, the read holds taken to support any performed reads.
    ///
    /// Upon completing a transaction, these read holds should be dropped.
    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds>,

    /// Access to the peek fields should be restricted to methods in the [`peek`] API.
    /// A map from pending peek ids to the queue into which responses are sent, and
    /// the connection id of the client that initiated the peek.
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// A map from client connection ids to a set of all pending peeks for that client.
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    /// A map from client connection ids to their pending linearize read transaction.
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    /// A map from the compute sink ID to its state description.
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    /// A map from active webhooks to their invalidation handle.
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
    /// to stage Batches in Persist that we will then link into the shard.
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    /// A map from connection ids to a watch channel that is set to `true` if the connection
    /// received a cancel request.
    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    /// Active introspection subscribes.
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    /// Locks that grant access to a specific object, populated lazily as objects are written to.
    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    /// Plans that are currently deferred and waiting on a write lock.
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    /// Pending writes waiting for a group commit.
    pending_writes: Vec<PendingWriteTxn>,

    /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
    /// table's timestamps, but there are cases where timestamps are not bumped but
    /// we expect the closed timestamps to advance (`AS OF X`, `SUBSCRIBE`s to views over
    /// RT sources and tables). To address these, spawn a task that forces table
    /// timestamps to close on a regular interval. This roughly tracks the behavior
    /// of realtime sources that close off timestamps on an interval.
    ///
    /// For non-realtime timelines, nothing pushes the timestamps forward, so we must do
    /// it manually.
    advance_timelines_interval: Interval,

    /// Serialized DDL. DDL must be serialized because:
    /// - Many of them do off-thread work and need to verify the catalog is in a valid state, but
    ///   [`PlanValidity`] does not currently support tracking all changes. Doing that correctly
    ///   seems to be more difficult than it's worth, so we would instead re-plan and re-sequence
    ///   the statements.
    /// - Re-planning a statement is hard because Coordinator and Session state is mutated at
    ///   various points, and we would need to correctly reset those changes before re-planning and
    ///   re-sequencing.
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
1912
    /// Handle to secret manager that can create and delete secrets from
    /// an arbitrary secret storage engine.
    secrets_controller: Arc<dyn SecretsController>,
    /// A secrets reader that maintains an in-memory cache, where values have a set TTL.
    caching_secrets_reader: CachingSecretsReader,

    /// Handle to a manager that can create and delete kubernetes resources
    /// (i.e., VpcEndpoint objects).
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Persist client for fetching storage metadata such as size metrics.
    storage_usage_client: StorageUsageClient,
    /// The interval at which to collect storage usage information.
    storage_usage_collection_interval: Duration,

    /// Segment analytics client.
    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    /// Coordinator metrics.
    metrics: Metrics,
    /// Optimizer metrics.
    optimizer_metrics: OptimizerMetrics,

    /// Tracing handle.
    tracing_handle: TracingHandle,

    /// Data used by the statement logging feature.
    statement_logging: StatementLogging,

    /// Limit for how many concurrent webhook requests we allow.
    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    /// Optional config for the timestamp oracle. This is _required_ when
    /// a timestamp oracle backend is configured.
    timestamp_oracle_config: Option<TimestampOracleConfig>,

    /// Periodically asks cluster scheduling policies to make their decisions.
    check_cluster_scheduling_policies_interval: Interval,

    /// This keeps the last On/Off decision for each cluster and each scheduling policy.
    /// (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are
    /// periodically cleaned up from this Map.)
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    /// When doing 0dt upgrades/in read-only mode, periodically ask all known
    /// clusters/collections whether they are caught up.
    caught_up_check_interval: Interval,

    /// Context needed to check whether all clusters/collections have caught up.
    /// Only used during 0dt deployment, while in read-only mode.
    caught_up_check: Option<CaughtUpCheckContext>,

    /// Tracks the state associated with the currently installed watchsets.
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    /// Tracks the currently installed watchsets for each connection.
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    /// Tracks the statuses of all cluster replicas.
    cluster_replica_statuses: ClusterReplicaStatuses,

    /// Whether or not to start controllers in read-only mode. This is only
    /// meant for use during development of read-only clusters and 0dt upgrades
    /// and should go away once we have proper orchestration during upgrades.
    read_only_controllers: bool,

    /// Updates to builtin tables that are being buffered while we are in
    /// read-only mode. We apply these all at once when coming out of read-only
    /// mode.
    ///
    /// This is a `Some` while in read-only mode and will be replaced by a
    /// `None` when we transition out of read-only mode and write out any
    /// buffered updates.
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    license_key: ValidatedLicenseKey,

    /// Pre-allocated pool of user IDs to amortize persist writes across DDL operations.
    user_id_pool: IdPool,
}
1994
impl Coordinator {
    /// Initializes coordinator state based on the contained catalog. Must be
    /// called after creating the coordinator and before calling the
    /// `Coordinator::serve` method.
    #[instrument(name = "coord::bootstrap")]
    pub(crate) async fn bootstrap(
        &mut self,
        boot_ts: Timestamp,
        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
        audit_logs_iterator: AuditLogIterator,
    ) -> Result<(), AdapterError> {
        let bootstrap_start = Instant::now();
        info!("startup: coordinator init: bootstrap beginning");
        info!("startup: coordinator init: bootstrap: preamble beginning");

        // Initialize cluster replica statuses.
        // Gross iterator is to avoid partial borrow issues.
        let cluster_statuses: Vec<(_, Vec<_>)> = self
            .catalog()
            .clusters()
            .map(|cluster| {
                (
                    cluster.id(),
                    cluster
                        .replicas()
                        .map(|replica| {
                            (replica.replica_id, replica.config.location.num_processes())
                        })
                        .collect(),
                )
            })
            .collect();
        let now = self.now_datetime();
        for (cluster_id, replica_statuses) in cluster_statuses {
            self.cluster_replica_statuses
                .initialize_cluster_statuses(cluster_id);
            for (replica_id, num_processes) in replica_statuses {
                self.cluster_replica_statuses
                    .initialize_cluster_replica_statuses(
                        cluster_id,
                        replica_id,
                        num_processes,
                        now,
                    );
            }
        }

        let system_config = self.catalog().system_config();

        // Inform metrics about the initial system configuration.
        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());

        // Inform the controllers about their initial configuration.
        let compute_config = flags::compute_config(system_config);
        let storage_config = flags::storage_config(system_config);
        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
        let dyncfg_updates = system_config.dyncfg_updates();
        self.controller.compute.update_configuration(compute_config);
        self.controller.storage.update_parameters(storage_config);
        self.controller
            .update_orchestrator_scheduling_config(scheduling_config);
        self.controller.update_configuration(dyncfg_updates);

        self.validate_resource_limit_numeric(
            Numeric::zero(),
            self.current_credit_consumption_rate(),
            |system_vars| {
                self.license_key
                    .max_credit_consumption_rate()
                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
            },
            "cluster replica",
            MAX_CREDIT_CONSUMPTION_RATE.name(),
        )?;
2072
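        // Collections are grouped by compaction window so that, once all
        // dataflows are installed below, read policies can be initialized in
        // bulk (one `initialize_read_policies` call per window).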
        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
            Default::default();

        let enable_worker_core_affinity =
            self.catalog().system_config().enable_worker_core_affinity();
        let enable_storage_introspection_logs = self
            .catalog()
            .system_config()
            .enable_storage_introspection_logs();
        for instance in self.catalog.clusters() {
            self.controller.create_cluster(
                instance.id,
                ClusterConfig {
                    arranged_logs: instance.log_indexes.clone(),
                    workload_class: instance.config.workload_class.clone(),
                },
            )?;
            for replica in instance.replicas() {
                let role = instance.role();
                self.controller.create_replica(
                    instance.id,
                    replica.replica_id,
                    instance.name.clone(),
                    replica.name.clone(),
                    role,
                    replica.config.clone(),
                    enable_worker_core_affinity,
                    enable_storage_introspection_logs,
                )?;
            }
        }
2104
        info!(
            "startup: coordinator init: bootstrap: preamble complete in {:?}",
            bootstrap_start.elapsed()
        );

        let init_storage_collections_start = Instant::now();
        info!("startup: coordinator init: bootstrap: storage collections init beginning");
        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
            .await;
        info!(
            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
            init_storage_collections_start.elapsed()
        );

        // The storage controller knows about the introspection collections now, so we can start
        // sinking introspection updates in the compute controller. It makes sense to do that as
        // soon as possible, to avoid updates piling up in the compute controller's internal
        // buffers.
        self.controller.start_compute_introspection_sink();

        let sorting_start = Instant::now();
        info!("startup: coordinator init: bootstrap: sorting catalog entries");
        let entries = self.bootstrap_sort_catalog_entries();
        info!(
            "startup: coordinator init: bootstrap: sorting catalog entries complete in {:?}",
            sorting_start.elapsed()
        );

        let optimize_dataflows_start = Instant::now();
        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
        info!(
            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
            optimize_dataflows_start.elapsed()
        );

        // We don't need to wait for the cache to update.
        let _fut = self.catalog().update_expression_cache(
            uncached_local_exprs.into_iter().collect(),
            uncached_global_exps.into_iter().collect(),
            Default::default(),
        );

        // Select dataflow as-ofs. This step relies on the storage collections created by
        // `bootstrap_storage_collections` and the dataflow plans created by
        // `bootstrap_dataflow_plans`.
        let bootstrap_as_ofs_start = Instant::now();
        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
        info!(
            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
            bootstrap_as_ofs_start.elapsed()
        );

        let postamble_start = Instant::now();
        info!("startup: coordinator init: bootstrap: postamble beginning");

        let logs: BTreeSet<_> = BUILTINS::logs()
            .map(|log| self.catalog().resolve_builtin_log(log))
            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
            .collect();
2166
        let mut privatelink_connections = BTreeMap::new();

        for entry in &entries {
            debug!(
                "coordinator init: installing {} {}",
                entry.item().typ(),
                entry.id()
            );
            let mut policy = entry.item().initial_logical_compaction_window();
            match entry.item() {
                // Currently, catalog item rebuild assumes that sinks and
                // indexes are always built individually and does not store information
                // about how they were built. If we start building multiple sinks and/or indexes
                // using a single dataflow, we have to make sure the rebuild process re-runs
                // the same multiple-build dataflow.
                CatalogItem::Source(source) => {
                    // Propagate source compaction windows to subsources if needed.
                    if source.custom_logical_compaction_window.is_none() {
                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
                            source.data_source
                        {
                            policy = Some(
                                self.catalog()
                                    .get_entry(&ingestion_id)
                                    .source()
                                    .expect("must be source")
                                    .custom_logical_compaction_window
                                    .unwrap_or_default(),
                            );
                        }
                    }
                    policies_to_set
                        .entry(policy.expect("sources have a compaction window"))
                        .or_insert_with(Default::default)
                        .storage_ids
                        .insert(source.global_id());
                }
                CatalogItem::Table(table) => {
                    policies_to_set
                        .entry(policy.expect("tables have a compaction window"))
                        .or_insert_with(Default::default)
                        .storage_ids
                        .extend(table.global_ids());
                }
                CatalogItem::Index(idx) => {
                    let policy_entry = policies_to_set
                        .entry(policy.expect("indexes have a compaction window"))
                        .or_insert_with(Default::default);

                    if logs.contains(&idx.on) {
                        policy_entry
                            .compute_ids
                            .entry(idx.cluster_id)
                            .or_insert_with(BTreeSet::new)
                            .insert(idx.global_id());
                    } else {
                        let df_desc = self
                            .catalog()
                            .try_get_physical_plan(&idx.global_id())
                            .expect("added in `bootstrap_dataflow_plans`")
                            .clone();

                        let df_meta = self
                            .catalog()
                            .try_get_dataflow_metainfo(&idx.global_id())
                            .expect("added in `bootstrap_dataflow_plans`");

                        if self.catalog().state().system_config().enable_mz_notices() {
                            // Collect optimization hint updates.
                            self.catalog().state().pack_optimizer_notices(
                                &mut builtin_table_updates,
                                df_meta.optimizer_notices.iter(),
                                Diff::ONE,
                            );
                        }

                        // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
                        // but we cannot call that as it will also downgrade the read hold on the index.
                        policy_entry
                            .compute_ids
                            .entry(idx.cluster_id)
                            .or_insert_with(Default::default)
                            .extend(df_desc.export_ids());

                        self.controller
                            .compute
                            .create_dataflow(idx.cluster_id, df_desc, None)
                            .unwrap_or_terminate("cannot fail to create dataflows");
                    }
                }
                CatalogItem::View(_) => (),
                CatalogItem::MaterializedView(mview) => {
                    policies_to_set
                        .entry(policy.expect("materialized views have a compaction window"))
                        .or_insert_with(Default::default)
                        .storage_ids
                        .insert(mview.global_id_writes());

                    let mut df_desc = self
                        .catalog()
                        .try_get_physical_plan(&mview.global_id_writes())
                        .expect("added in `bootstrap_dataflow_plans`")
                        .clone();

                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
                        df_desc.set_initial_as_of(initial_as_of);
                    }

                    // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
                    let until = mview
                        .refresh_schedule
                        .as_ref()
                        .and_then(|s| s.last_refresh())
                        .and_then(|r| r.try_step_forward());
                    if let Some(until) = until {
                        df_desc.until.meet_assign(&Antichain::from_elem(until));
                    }

                    let df_meta = self
                        .catalog()
                        .try_get_dataflow_metainfo(&mview.global_id_writes())
                        .expect("added in `bootstrap_dataflow_plans`");

                    if self.catalog().state().system_config().enable_mz_notices() {
                        // Collect optimization hint updates.
                        self.catalog().state().pack_optimizer_notices(
                            &mut builtin_table_updates,
                            df_meta.optimizer_notices.iter(),
                            Diff::ONE,
                        );
                    }

                    self.ship_dataflow(df_desc, mview.cluster_id, mview.target_replica)
                        .await;

                    // If this is a replacement MV, it must remain read-only until the replacement
                    // gets applied.
                    if mview.replacement_target.is_none() {
                        self.allow_writes(mview.cluster_id, mview.global_id_writes());
                    }
                }
                CatalogItem::Sink(sink) => {
                    policies_to_set
                        .entry(CompactionWindow::Default)
                        .or_insert_with(Default::default)
                        .storage_ids
                        .insert(sink.global_id());
                }
                CatalogItem::Connection(catalog_connection) => {
                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
                        privatelink_connections.insert(
                            entry.id(),
                            VpcEndpointConfig {
                                aws_service_name: conn.service_name.clone(),
                                availability_zone_ids: conn.availability_zones.clone(),
                            },
                        );
                    }
                }
                CatalogItem::ContinualTask(ct) => {
                    policies_to_set
                        .entry(policy.expect("continual tasks have a compaction window"))
                        .or_insert_with(Default::default)
                        .storage_ids
                        .insert(ct.global_id());

                    let mut df_desc = self
                        .catalog()
                        .try_get_physical_plan(&ct.global_id())
                        .expect("added in `bootstrap_dataflow_plans`")
                        .clone();

                    if let Some(initial_as_of) = ct.initial_as_of.clone() {
                        df_desc.set_initial_as_of(initial_as_of);
                    }

                    let df_meta = self
                        .catalog()
                        .try_get_dataflow_metainfo(&ct.global_id())
                        .expect("added in `bootstrap_dataflow_plans`");

                    if self.catalog().state().system_config().enable_mz_notices() {
                        // Collect optimization hint updates.
                        self.catalog().state().pack_optimizer_notices(
                            &mut builtin_table_updates,
                            df_meta.optimizer_notices.iter(),
                            Diff::ONE,
                        );
                    }

                    self.ship_dataflow(df_desc, ct.cluster_id, None).await;
                    self.allow_writes(ct.cluster_id, ct.global_id());
                }
                // Nothing to do for these cases.
                CatalogItem::Log(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_) => {}
            }
        }
2367
        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
            // Clean up any extraneous VpcEndpoints that shouldn't exist.
            let existing_vpc_endpoints = cloud_resource_controller
                .list_vpc_endpoints()
                .await
                .context("list vpc endpoints")?;
            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
            for id in vpc_endpoints_to_remove {
                cloud_resource_controller
                    .delete_vpc_endpoint(*id)
                    .await
                    .context("deleting extraneous vpc endpoint")?;
            }

            // Ensure desired VpcEndpoints are up to date.
            for (id, spec) in privatelink_connections {
                cloud_resource_controller
                    .ensure_vpc_endpoint(id, spec)
                    .await
                    .context("ensuring vpc endpoint")?;
            }
        }

        // Having installed all entries and created all constraints, we can now drop read holds
        // and relax read policies.
        drop(dataflow_read_holds);
        // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
        for (cw, policies) in policies_to_set {
            self.initialize_read_policies(&policies, cw).await;
        }

        // Expose the mapping from T-shirt sizes to actual sizes.
        builtin_table_updates.extend(
            self.catalog().state().resolve_builtin_table_updates(
                self.catalog().state().pack_all_replica_size_updates(),
            ),
        );

        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
        // When 0dt is enabled, we create new shards for any migrated builtin storage collections.
        // In read-only mode, the migrated builtin tables (which are a subset of migrated builtin
        // storage collections) need to be back-filled so that any dependent dataflow can be
        // hydrated. Additionally, these shards are not registered with the txn-shard, and cannot
        // be registered while in read-only mode, so they are written to directly.
        let migrated_updates_fut = if self.controller.read_only() {
            let min_timestamp = Timestamp::minimum();
            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
                .extract_if(.., |update| {
                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
                    migrated_storage_collections_0dt.contains(&update.id)
                        && self
                            .controller
                            .storage_collections
                            .collection_frontiers(gid)
                            .expect("all tables are registered")
                            .write_frontier
                            .elements()
                            == &[min_timestamp]
                })
                .collect();
            if migrated_builtin_table_updates.is_empty() {
                futures::future::ready(()).boxed()
            } else {
                // Group all updates per-table.
                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
                for update in migrated_builtin_table_updates {
                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
                    grouped_appends.entry(gid).or_default().push(update.data);
                }
                info!(
                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
                    grouped_appends.keys().collect::<Vec<_>>()
                );

                // Consolidate Row data; staged batches must already be consolidated.
                let mut all_appends = Vec::with_capacity(grouped_appends.len());
                for (item_id, table_data) in grouped_appends.into_iter() {
                    let mut all_rows = Vec::new();
                    let mut all_data = Vec::new();
                    for data in table_data {
                        match data {
                            TableData::Rows(rows) => all_rows.extend(rows),
                            TableData::Batches(_) => all_data.push(data),
                        }
                    }
                    differential_dataflow::consolidation::consolidate(&mut all_rows);
                    all_data.push(TableData::Rows(all_rows));

                    // TODO(parkmycar): Use SmallVec throughout.
                    all_appends.push((item_id, all_data));
                }

                let fut = self
                    .controller
                    .storage
                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
                    .expect("cannot fail to append");
                async {
                    fut.await
                        .expect("One-shot shouldn't be dropped during bootstrap")
                        .unwrap_or_terminate("cannot fail to append")
                }
                .boxed()
            }
        } else {
            futures::future::ready(()).boxed()
        };
2477
        info!(
            "startup: coordinator init: bootstrap: postamble complete in {:?}",
            postamble_start.elapsed()
        );

        let builtin_update_start = Instant::now();
        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");

        if self.controller.read_only() {
            info!(
                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
            );

            // TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
            let audit_join_start = Instant::now();
            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
            let audit_log_updates: Vec<_> = audit_logs_iterator
                .map(|(audit_log, ts)| StateUpdate {
                    kind: StateUpdateKind::AuditLog(audit_log),
                    ts,
                    diff: StateDiff::Addition,
                })
                .collect();
            let audit_log_builtin_table_updates = self
                .catalog()
                .state()
                .generate_builtin_table_updates(audit_log_updates);
            builtin_table_updates.extend(audit_log_builtin_table_updates);
            info!(
                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
                audit_join_start.elapsed()
            );
            self.buffered_builtin_table_updates
                .as_mut()
                .expect("in read-only mode")
                .append(&mut builtin_table_updates);
        } else {
            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
                .await;
        };
        info!(
            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
            builtin_update_start.elapsed()
        );

        let cleanup_secrets_start = Instant::now();
        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
        // Clean up orphaned secrets. Errors during list() or delete() do not
        // need to prevent bootstrap from succeeding; we will retry on the next
        // startup.
        {
            // Destructure Self so we can selectively move fields into the async
            // task.
            let Self {
                secrets_controller,
                catalog,
                ..
            } = self;

            let next_user_item_id = catalog.get_next_user_item_id().await?;
            let next_system_item_id = catalog.get_next_system_item_id().await?;
            let read_only = self.controller.read_only();
            // Fetch all IDs from the catalog to future-proof against other
            // things using secrets. Today, SECRET and CONNECTION objects use
            // secrets_controller.ensure, but more things could in the future
            // that would be easy to miss adding here.
            let catalog_ids: BTreeSet<CatalogItemId> =
                catalog.entries().map(|entry| entry.id()).collect();
            let secrets_controller = Arc::clone(secrets_controller);

            spawn(|| "cleanup-orphaned-secrets", async move {
                if read_only {
                    info!(
                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
                    );
                    return;
                }
                info!("coordinator init: cleaning up orphaned secrets");

                match secrets_controller.list().await {
                    Ok(controller_secrets) => {
                        let controller_secrets: BTreeSet<CatalogItemId> =
                            controller_secrets.into_iter().collect();
                        let orphaned = controller_secrets.difference(&catalog_ids);
                        for id in orphaned {
                            let id_too_large = match id {
                                CatalogItemId::System(id) => *id >= next_system_item_id,
                                CatalogItemId::User(id) => *id >= next_user_item_id,
                                CatalogItemId::IntrospectionSourceIndex(_)
                                | CatalogItemId::Transient(_) => false,
                            };
                            if id_too_large {
                                info!(
                                    %next_user_item_id, %next_system_item_id,
                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
                                );
                            } else {
                                info!("coordinator init: deleting orphaned secret {id}");
                                fail_point!("orphan_secrets");
                                if let Err(e) = secrets_controller.delete(*id).await {
                                    warn!(
                                        "Dropping orphaned secret has encountered an error: {}",
                                        e
                                    );
                                }
                            }
                        }
                    }
                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
                }
            });
        }
        info!(
            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
            cleanup_secrets_start.elapsed()
        );

        // Run all of our final steps concurrently.
        let final_steps_start = Instant::now();
        info!(
            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
        );
        migrated_updates_fut
            .instrument(info_span!("coord::bootstrap::final"))
            .await;

        debug!(
            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
        );
        // Announce the completion of initialization.
        self.controller.initialization_complete();

        // Initialize unified introspection.
        self.bootstrap_introspection_subscribes().await;

        info!(
            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
            final_steps_start.elapsed()
        );

        info!(
            "startup: coordinator init: bootstrap complete in {:?}",
            bootstrap_start.elapsed()
        );
        Ok(())
    }
2624
2625    /// Prepares tables for writing by resetting them to a known state and
2626    /// appending the given builtin table updates. The timestamp oracle
2627    /// will be advanced to the write timestamp of the append when this
2628    /// method returns.
2629    #[allow(clippy::async_yields_async)]
2630    #[instrument]
2631    async fn bootstrap_tables(
2632        &mut self,
2633        entries: &[CatalogEntry],
2634        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2635        audit_logs_iterator: AuditLogIterator,
2636    ) {
2637        /// Smaller helper struct of metadata for bootstrapping tables.
2638        struct TableMetadata<'a> {
2639            id: CatalogItemId,
2640            name: &'a QualifiedItemName,
2641            table: &'a Table,
2642        }
2643
2644        // Filter our entries down to just tables.
2645        let table_metas: Vec<_> = entries
2646            .into_iter()
2647            .filter_map(|entry| {
2648                entry.table().map(|table| TableMetadata {
2649                    id: entry.id(),
2650                    name: entry.name(),
2651                    table,
2652                })
2653            })
2654            .collect();
2655
2656        // Append empty batches to advance the timestamp of all tables.
2657        debug!("coordinator init: advancing all tables to current timestamp");
2658        let WriteTimestamp {
2659            timestamp: write_ts,
2660            advance_to,
2661        } = self.get_local_write_ts().await;
2662        let appends = table_metas
2663            .iter()
2664            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2665            .collect();
2666        // Append the tables in the background. We apply the write timestamp before getting a read
2667        // timestamp and reading a snapshot of each table, so the snapshots will block on their own
2668        // until the appends are complete.
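            // For example, if write_ts is 100 and advance_to is 101, then the
            // read_ts acquired below is at least 100, so a snapshot taken at
            // read_ts cannot complete until the empty appends have advanced each
            // table's upper past 100.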
2669        let table_fence_rx = self
2670            .controller
2671            .storage
2672            .append_table(write_ts.clone(), advance_to, appends)
2673            .expect("invalid updates");
2674
2675        self.apply_local_write(write_ts).await;
2676
2677        // Add builtin table updates that clear the contents of all system tables
2678        debug!("coordinator init: resetting system tables");
2679        let read_ts = self.get_local_read_ts().await;
2680
2681        // Filter out the 'mz_storage_usage_by_shard' table since we need to retain that info for
2682        // billing purposes.
2683        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2684            .catalog()
2685            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2686            .into();
2687        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2688            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2689                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2690        };
2691
2692        let mut retraction_tasks = Vec::new();
2693        let mut system_tables: Vec<_> = table_metas
2694            .iter()
2695            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2696            .collect();
2697
2698        // Special-case audit events because the table is append-only.
2699        let (audit_events_idx, _) = system_tables
2700            .iter()
2701            .find_position(|table| {
2702                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2703            })
2704            .expect("mz_audit_events must exist");
2705        let audit_events = system_tables.remove(audit_events_idx);
2706        let audit_log_task = self.bootstrap_audit_log_table(
2707            audit_events.id,
2708            audit_events.name,
2709            audit_events.table,
2710            audit_logs_iterator,
2711            read_ts,
2712        );
2713
2714        for system_table in system_tables {
2715            let table_id = system_table.id;
2716            let full_name = self.catalog().resolve_full_name(system_table.name, None);
2717            debug!("coordinator init: resetting system table {full_name} ({table_id})");
2718
2719            // Fetch the current contents of the table for retraction.
2720            let snapshot_fut = self
2721                .controller
2722                .storage_collections
2723                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2724            let batch_fut = self
2725                .controller
2726                .storage_collections
2727                .create_update_builder(system_table.table.global_id_writes());
2728
2729            let task = spawn(|| format!("snapshot-{table_id}"), async move {
2730                // Create a TimestamplessUpdateBuilder.
2731                let mut batch = batch_fut
2732                    .await
2733                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2734                tracing::info!(?table_id, "starting snapshot");
2735                // Get a cursor which will emit a consolidated snapshot.
2736                let mut snapshot_cursor = snapshot_fut
2737                    .await
2738                    .unwrap_or_terminate("cannot fail to snapshot");
2739
2740                // Retract the current contents, spilling into our builder.
2741                while let Some(values) = snapshot_cursor.next().await {
2742                    for (key, _t, d) in values {
2743                        let d_invert = d.neg();
2744                        batch.add(&key, &(), &d_invert).await;
2745                    }
2746                }
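                    // E.g., a row present in the snapshot with diff +1 is re-added
                    // with diff -1, so the finished batch nets the table's current
                    // contents to zero before the new builtin updates are applied.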
2747                tracing::info!(?table_id, "finished snapshot");
2748
2749                let batch = batch.finish().await;
2750                BuiltinTableUpdate::batch(table_id, batch)
2751            });
2752            retraction_tasks.push(task);
2753        }
2754
2755        let retractions_res = futures::future::join_all(retraction_tasks).await;
2756        for retractions in retractions_res {
2757            builtin_table_updates.push(retractions);
2758        }
2759
2760        let audit_join_start = Instant::now();
2761        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2762        let audit_log_updates = audit_log_task.await;
2763        let audit_log_builtin_table_updates = self
2764            .catalog()
2765            .state()
2766            .generate_builtin_table_updates(audit_log_updates);
2767        builtin_table_updates.extend(audit_log_builtin_table_updates);
2768        info!(
2769            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2770            audit_join_start.elapsed()
2771        );
2772
2773        // Now that the snapshots are complete, the appends must also be complete.
2774        table_fence_rx
2775            .await
2776            .expect("One-shot shouldn't be dropped during bootstrap")
2777            .unwrap_or_terminate("cannot fail to append");
2778
2779        info!("coordinator init: sending builtin table updates");
2780        let (_builtin_updates_fut, write_ts) = self
2781            .builtin_table_update()
2782            .execute(builtin_table_updates)
2783            .await;
2784        info!(?write_ts, "coordinator init: write ts for builtin table updates");
2785        if let Some(write_ts) = write_ts {
2786            self.apply_local_write(write_ts).await;
2787        }
2788    }
2789
2790    /// Prepares updates to the audit log table. The audit log table is append-only and very
2791    /// large, so we only need to find the events present in `audit_logs_iterator` but not in
2792    /// the audit log table.
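        ///
        /// A small worked example, assuming `audit_logs_iterator` yields events in
        /// descending `sortable_id` order (which the `take_while` below relies on):
        ///
        /// ```text
        /// max ID in table:              42
        /// iterator IDs (descending):    45, 44, 43, 42, 41, ...
        /// take_while(id > 42) yields:   45, 44, 43
        /// ```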
2793    #[instrument]
2794    fn bootstrap_audit_log_table<'a>(
2795        &self,
2796        table_id: CatalogItemId,
2797        name: &'a QualifiedItemName,
2798        table: &'a Table,
2799        audit_logs_iterator: AuditLogIterator,
2800        read_ts: Timestamp,
2801    ) -> JoinHandle<Vec<StateUpdate>> {
2802        let full_name = self.catalog().resolve_full_name(name, None);
2803        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2804        let current_contents_fut = self
2805            .controller
2806            .storage_collections
2807            .snapshot(table.global_id_writes(), read_ts);
2808        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2809            let current_contents = current_contents_fut
2810                .await
2811                .unwrap_or_terminate("cannot fail to fetch snapshot");
2812            let contents_len = current_contents.len();
2813            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2814
2815            // Fetch the largest audit log event ID that has been written to the table.
2816            let max_table_id = current_contents
2817                .into_iter()
2818                .filter(|(_, diff)| *diff == 1)
2819                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2820                .sorted()
2821                .rev()
2822                .next();
2823
2824            // Filter audit log catalog updates to those that are not present in the table.
2825            audit_logs_iterator
2826                .take_while(|(audit_log, _)| match max_table_id {
2827                    Some(id) => audit_log.event.sortable_id() > id,
2828                    None => true,
2829                })
2830                .map(|(audit_log, ts)| StateUpdate {
2831                    kind: StateUpdateKind::AuditLog(audit_log),
2832                    ts,
2833                    diff: StateDiff::Addition,
2834                })
2835                .collect::<Vec<_>>()
2836        })
2837    }
2838
2839    /// Initializes all storage collections required by catalog objects in the storage controller.
2840    ///
2841    /// This method takes care of collection creation, as well as migration of existing
2842    /// collections.
2843    ///
2844    /// Creating all storage collections in a single `create_collections` call, rather than on
2845    /// demand, is more efficient as it reduces the number of writes to durable storage. It also
2846    /// allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
2847    /// storage collections, without needing to worry about dependency order.
2848    ///
2849    /// `migrated_storage_collections` is a set of builtin storage collections that have been
2850    /// migrated and should be handled specially.
2851    #[instrument]
2852    async fn bootstrap_storage_collections(
2853        &mut self,
2854        migrated_storage_collections: &BTreeSet<CatalogItemId>,
2855    ) {
2856        let catalog = self.catalog();
2857
2858        let source_desc = |object_id: GlobalId,
2859                           data_source: &DataSourceDesc,
2860                           desc: &RelationDesc,
2861                           timeline: &Timeline| {
2862            let data_source = match data_source.clone() {
2863                // Re-announce the source description.
2864                DataSourceDesc::Ingestion { desc, cluster_id } => {
2865                    let desc = desc.into_inline_connection(catalog.state());
2866                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2867                    DataSource::Ingestion(ingestion)
2868                }
2869                DataSourceDesc::OldSyntaxIngestion {
2870                    desc,
2871                    progress_subsource,
2872                    data_config,
2873                    details,
2874                    cluster_id,
2875                } => {
2876                    let desc = desc.into_inline_connection(catalog.state());
2877                    let data_config = data_config.into_inline_connection(catalog.state());
2878                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2879                    // this will always be a Source or a Table.
2880                    let progress_subsource =
2881                        catalog.get_entry(&progress_subsource).latest_global_id();
2882                    let mut ingestion =
2883                        IngestionDescription::new(desc, cluster_id, progress_subsource);
2884                    let legacy_export = SourceExport {
2885                        storage_metadata: (),
2886                        data_config,
2887                        details,
2888                    };
2889                    ingestion.source_exports.insert(object_id, legacy_export);
2890
2891                    DataSource::Ingestion(ingestion)
2892                }
2893                DataSourceDesc::IngestionExport {
2894                    ingestion_id,
2895                    external_reference: _,
2896                    details,
2897                    data_config,
2898                } => {
2899                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2900                    // this will always be a Source or a Table.
2901                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2902
2903                    DataSource::IngestionExport {
2904                        ingestion_id,
2905                        details,
2906                        data_config: data_config.into_inline_connection(catalog.state()),
2907                    }
2908                }
2909                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
2910                DataSourceDesc::Progress => DataSource::Progress,
2911                DataSourceDesc::Introspection(introspection) => {
2912                    DataSource::Introspection(introspection)
2913                }
2914                DataSourceDesc::Catalog => DataSource::Other,
2915            };
2916            CollectionDescription {
2917                desc: desc.clone(),
2918                data_source,
2919                since: None,
2920                timeline: Some(timeline.clone()),
2921                primary: None,
2922            }
2923        };
2924
2925        let mut compute_collections = vec![];
2926        let mut collections = vec![];
2927        for entry in catalog.entries() {
2928            match entry.item() {
2929                CatalogItem::Source(source) => {
2930                    collections.push((
2931                        source.global_id(),
2932                        source_desc(
2933                            source.global_id(),
2934                            &source.data_source,
2935                            &source.desc,
2936                            &source.timeline,
2937                        ),
2938                    ));
2939                }
2940                CatalogItem::Table(table) => {
2941                    match &table.data_source {
2942                        TableDataSource::TableWrites { defaults: _ } => {
2943                            let versions: BTreeMap<_, _> = table
2944                                .collection_descs()
2945                                .map(|(gid, version, desc)| (version, (gid, desc)))
2946                                .collect();
2947                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2948                                let next_version = version.bump();
2949                                let primary_collection =
2950                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2951                                let mut collection_desc =
2952                                    CollectionDescription::for_table(desc.clone());
2953                                collection_desc.primary = primary_collection;
2954
2955                                (*gid, collection_desc)
2956                            });
2957                            collections.extend(collection_descs);
2958                        }
2959                        TableDataSource::DataSource {
2960                            desc: data_source_desc,
2961                            timeline,
2962                        } => {
2963                            // TODO(alter_table): Support versioning tables that read from sources.
2964                            soft_assert_eq_or_log!(table.collections.len(), 1);
2965                            let collection_descs =
2966                                table.collection_descs().map(|(gid, _version, desc)| {
2967                                    (
2968                                        gid,
2969                                        source_desc(
2970                                            entry.latest_global_id(),
2971                                            data_source_desc,
2972                                            &desc,
2973                                            timeline,
2974                                        ),
2975                                    )
2976                                });
2977                            collections.extend(collection_descs);
2978                        }
2979                    };
2980                }
2981                CatalogItem::MaterializedView(mv) => {
2982                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
2983                        let collection_desc =
2984                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
2985                        (gid, collection_desc)
2986                    });
2987
2988                    collections.extend(collection_descs);
2989                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
2990                }
2991                CatalogItem::ContinualTask(ct) => {
2992                    let collection_desc =
2993                        CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
2994                    compute_collections.push((ct.global_id(), ct.desc.clone()));
2995                    collections.push((ct.global_id(), collection_desc));
2996                }
2997                CatalogItem::Sink(sink) => {
2998                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2999                    let from_desc = storage_sink_from_entry
3000                        .relation_desc()
3001                        .expect("sinks can only be built on items with descs")
3002                        .into_owned();
3003                    let collection_desc = CollectionDescription {
3004                        // TODO(sinks): make generic once we have more than one sink type.
3005                        desc: KAFKA_PROGRESS_DESC.clone(),
3006                        data_source: DataSource::Sink {
3007                            desc: ExportDescription {
3008                                sink: StorageSinkDesc {
3009                                    from: sink.from,
3010                                    from_desc,
3011                                    connection: sink
3012                                        .connection
3013                                        .clone()
3014                                        .into_inline_connection(self.catalog().state()),
3015                                    envelope: sink.envelope,
3016                                    as_of: Antichain::from_elem(Timestamp::minimum()),
3017                                    with_snapshot: sink.with_snapshot,
3018                                    version: sink.version,
3019                                    from_storage_metadata: (),
3020                                    to_storage_metadata: (),
3021                                    commit_interval: sink.commit_interval,
3022                                },
3023                                instance_id: sink.cluster_id,
3024                            },
3025                        },
3026                        since: None,
3027                        timeline: None,
3028                        primary: None,
3029                    };
3030                    collections.push((sink.global_id, collection_desc));
3031                }
3032                CatalogItem::Log(_)
3033                | CatalogItem::View(_)
3034                | CatalogItem::Index(_)
3035                | CatalogItem::Type(_)
3036                | CatalogItem::Func(_)
3037                | CatalogItem::Secret(_)
3038                | CatalogItem::Connection(_) => (),
3039            }
3040        }
3041
3042        let register_ts = if self.controller.read_only() {
3043            self.get_local_read_ts().await
3044        } else {
3045            // Getting a write timestamp bumps the write timestamp in the
3046            // oracle, which we're not allowed to do in read-only mode.
3047            self.get_local_write_ts().await.timestamp
3048        };
3049
3050        let storage_metadata = self.catalog.state().storage_metadata();
3051        let migrated_storage_collections = migrated_storage_collections
3052            .into_iter()
3053            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
3054            .collect();
3055
3056        // Before possibly creating collections, make sure their schemas are correct.
3057        //
3058        // Across different versions of Materialize the nullability of columns can change based on
3059        // updates to our optimizer.
3060        self.controller
3061            .storage
3062            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
3063            .await
3064            .unwrap_or_terminate("cannot fail to evolve collections");
3065
3066        // New builtin storage collections are by default created with [0] since/upper frontiers.
3067        // For collections that have dependencies on other collections (MVs, CTs), this can violate
3068        // the frontier invariants assumed by as-of selection. For example, as-of selection expects
3069        // to be able to pick up computing a materialized view from its most recent upper, but if
3070        // that upper is [0] it's likely that the required times are not available anymore in the
3071        // MV inputs.
3072        //
3073        // To avoid violating frontier invariants, we need to bump their sinces to times greater
3074        // than all of their upstream storage inputs. To know the since of a storage input, it has
3075        // to be registered with the storage controller first. Thus we register collections in
3076        // layers: Each iteration registers the collections whose dependencies are all already
3077        // registered.
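            // For example, with a source S, a materialized view MV1 on S, and a
            // materialized view MV2 on MV1: the first iteration registers S, the
            // second registers MV1, and the third registers MV2; any builtin
            // collection among them gets its since bumped to the join of its
            // inputs' sinces (see below).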
3078        let mut pending: BTreeMap<_, _> = collections.into_iter().collect();
3079
3080        // Precompute storage-collection dependencies for each collection.
3081        let transitive_dep_gids: BTreeMap<_, _> = pending
3082            .keys()
3083            .map(|gid| {
3084                let entry = self.catalog.get_entry_by_global_id(gid);
3085                let item_id = entry.id();
3086                let deps = self.catalog.state().transitive_uses(item_id);
3087                let dep_gids: BTreeSet<_> = deps
3088                    // Ignore self-dependencies. For example, `transitive_uses` includes the input ID,
3089                    // and CTs can depend on themselves.
3090                    .filter(|dep_id| *dep_id != item_id)
3091                    .map(|dep_id| self.catalog.get_entry(&dep_id).latest_global_id())
3092                    // Ignore dependencies on objects that are not storage collections.
3093                    .filter(|dep_gid| pending.contains_key(dep_gid))
3094                    .collect();
3095                (*gid, dep_gids)
3096            })
3097            .collect();
3098
3099        while !pending.is_empty() {
3100            // Drain collections whose dependencies have all been registered already
3101            // (i.e., are not in `pending`).
3102            let ready_gids: BTreeSet<_> = pending
3103                .keys()
3104                .filter(|gid| {
3105                    let mut deps = transitive_dep_gids[gid].iter();
3106                    !deps.any(|dep_gid| pending.contains_key(dep_gid))
3107                })
3108                .copied()
3109                .collect();
3110            let mut ready: Vec<_> = pending
3111                .extract_if(.., |gid, _| ready_gids.contains(gid))
3112                .collect();
3113
3114            // Bump sinces of builtin collections.
3115            for (gid, collection) in &mut ready {
3116                // Don't silently overwrite an explicitly specified `since`.
3117                if !gid.is_system() || collection.since.is_some() {
3118                    continue;
3119                }
3120
3121                let mut derived_since = Antichain::from_elem(Timestamp::MIN);
3122                for dep_gid in &transitive_dep_gids[gid] {
3123                    let (since, _) = self
3124                        .controller
3125                        .storage
3126                        .collection_frontiers(*dep_gid)
3127                        .expect("previously registered");
3128                    derived_since.join_assign(&since);
3129                }
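                    // `join_assign` computes the lattice join, which for totally
                    // ordered timestamps is the pointwise maximum: e.g., joining
                    // sinces [5] and [9] yields a derived since of [9].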
3130                collection.since = Some(derived_since);
3131            }
3132
3133            if ready.is_empty() {
3134                soft_panic_or_log!(
3135                    "cycle in storage collections: {:?}",
3136                    pending.keys().collect::<Vec<_>>(),
3137                );
3138                // We get here only due to a bug. Rather than crash-looping, we try our best to
3139                // reach a sane state by attempting to register all the remaining collections at
3140                // once.
3141                ready = mem::take(&mut pending).into_iter().collect();
3142            }
3143
3144            self.controller
3145                .storage
3146                .create_collections_for_bootstrap(
3147                    storage_metadata,
3148                    Some(register_ts),
3149                    ready,
3150                    &migrated_storage_collections,
3151                )
3152                .await
3153                .unwrap_or_terminate("cannot fail to create collections");
3154        }
3155
3156        if !self.controller.read_only() {
3157            self.apply_local_write(register_ts).await;
3158        }
3159    }
3160
3161    /// Returns the current list of catalog entries, sorted into an appropriate order for
3162    /// bootstrapping.
3163    ///
3164    /// The returned entries are in dependency order. Indexes are sorted immediately after the
3165    /// objects they index, to ensure that all dependants of these indexed objects can make use of
3166    /// the respective indexes.
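        ///
        /// A small example of the resulting order, for a table `t` with an index
        /// `i` on it and a view `v` reading from `t`:
        ///
        /// ```text
        /// input:  [v, i, t]
        /// output: [t, i, v]    // t first, its index right after, then v
        /// ```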
3167    fn bootstrap_sort_catalog_entries(&self) -> Vec<CatalogEntry> {
3168        let mut indexes_on = BTreeMap::<_, Vec<_>>::new();
3169        let mut non_indexes = Vec::new();
3170        for entry in self.catalog().entries().cloned() {
3171            if let Some(index) = entry.index() {
3172                let on = self.catalog().get_entry_by_global_id(&index.on);
3173                indexes_on.entry(on.id()).or_default().push(entry);
3174            } else {
3175                non_indexes.push(entry);
3176            }
3177        }
3178
3179        let key_fn = |entry: &CatalogEntry| entry.id();
3180        let dependencies_fn = |entry: &CatalogEntry| entry.uses();
3181        sort_topological(&mut non_indexes, key_fn, dependencies_fn);
3182
3183        let mut result = Vec::new();
3184        for entry in non_indexes {
3185            let id = entry.id();
3186            result.push(entry);
3187            if let Some(mut indexes) = indexes_on.remove(&id) {
3188                result.append(&mut indexes);
3189            }
3190        }
3191
3192        soft_assert_or_log!(
3193            indexes_on.is_empty(),
3194            "indexes with missing dependencies: {indexes_on:?}",
3195        );
3196
3197        result
3198    }
3199
3200    /// Invokes the optimizer on all indexes and materialized views in the catalog and inserts the
3201    /// resulting dataflow plans into the catalog state.
3202    ///
3203    /// `ordered_catalog_entries` must be sorted in dependency order, with dependencies ordered
3204    /// before their dependants.
3205    ///
3206    /// This method does not perform timestamp selection for the dataflows, nor does it create them
3207    /// in the compute controller. Both of these steps happen later during bootstrapping.
3208    ///
3209    /// Returns a map of expressions that were not cached.
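        ///
        /// Roughly, for each index, materialized view, and continual task:
        ///
        /// ```text
        /// if cached plans exist && cached optimizer features == current features:
        ///     reuse the cached plans                      (cache hit)
        /// else:
        ///     re-optimize and record in the returned map  (cache miss)
        /// ```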
3210    #[instrument]
3211    fn bootstrap_dataflow_plans(
3212        &mut self,
3213        ordered_catalog_entries: &[CatalogEntry],
3214        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
3215    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
3216        // The optimizer expects to be able to query its `ComputeInstanceSnapshot` for
3217        // collections the current dataflow can depend on. But since we don't yet install anything
3218        // on compute instances, the snapshot information is incomplete. We fix that by manually
3219        // updating `ComputeInstanceSnapshot` objects to ensure they contain collections previously
3220        // optimized.
3221        let mut instance_snapshots = BTreeMap::new();
3222        let mut uncached_expressions = BTreeMap::new();
3223
3224        let optimizer_config = |catalog: &Catalog, cluster_id| {
3225            let system_config = catalog.system_config();
3226            let overrides = catalog.get_cluster(cluster_id).config.features();
3227            OptimizerConfig::from(system_config).override_from(&overrides)
3228        };
3229
3230        for entry in ordered_catalog_entries {
3231            match entry.item() {
3232                CatalogItem::Index(idx) => {
3233                    // Collect optimizer parameters.
3234                    let compute_instance =
3235                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
3236                            self.instance_snapshot(idx.cluster_id)
3237                                .expect("compute instance exists")
3238                        });
3239                    let global_id = idx.global_id();
3240
3241                    // The index may already be installed on the compute instance. For example,
3242                    // this is the case for introspection indexes.
3243                    if compute_instance.contains_collection(&global_id) {
3244                        continue;
3245                    }
3246
3247                    let optimizer_config = optimizer_config(&self.catalog, idx.cluster_id);
3248
3249                    let (optimized_plan, physical_plan, metainfo) =
3250                        match cached_global_exprs.remove(&global_id) {
3251                            Some(global_expressions)
3252                                if global_expressions.optimizer_features
3253                                    == optimizer_config.features =>
3254                            {
3255                                debug!("global expression cache hit for {global_id:?}");
3256                                (
3257                                    global_expressions.global_mir,
3258                                    global_expressions.physical_plan,
3259                                    global_expressions.dataflow_metainfos,
3260                                )
3261                            }
3262                            Some(_) | None => {
3263                                let (optimized_plan, global_lir_plan) = {
3264                                    // Build an optimizer for this INDEX.
3265                                    let mut optimizer = optimize::index::Optimizer::new(
3266                                        self.owned_catalog(),
3267                                        compute_instance.clone(),
3268                                        global_id,
3269                                        optimizer_config.clone(),
3270                                        self.optimizer_metrics(),
3271                                    );
3272
3273                                    // MIR ⇒ MIR optimization (global)
3274                                    let index_plan = optimize::index::Index::new(
3275                                        entry.name().clone(),
3276                                        idx.on,
3277                                        idx.keys.to_vec(),
3278                                    );
3279                                    let global_mir_plan = optimizer.optimize(index_plan)?;
3280                                    let optimized_plan = global_mir_plan.df_desc().clone();
3281
3282                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3283                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3284
3285                                    (optimized_plan, global_lir_plan)
3286                                };
3287
3288                                let (physical_plan, metainfo) = global_lir_plan.unapply();
3289                                let metainfo = {
3290                                    // Pre-allocate a vector of transient GlobalIds for each notice.
3291                                    let notice_ids =
3292                                        std::iter::repeat_with(|| self.allocate_transient_id())
3293                                            .map(|(_item_id, gid)| gid)
3294                                            .take(metainfo.optimizer_notices.len())
3295                                            .collect::<Vec<_>>();
3296                                    // Return a metainfo with rendered notices.
3297                                    self.catalog().render_notices(
3298                                        metainfo,
3299                                        notice_ids,
3300                                        Some(idx.global_id()),
3301                                    )
3302                                };
3303                                uncached_expressions.insert(
3304                                    global_id,
3305                                    GlobalExpressions {
3306                                        global_mir: optimized_plan.clone(),
3307                                        physical_plan: physical_plan.clone(),
3308                                        dataflow_metainfos: metainfo.clone(),
3309                                        optimizer_features: optimizer_config.features.clone(),
3310                                    },
3311                                );
3312                                (optimized_plan, physical_plan, metainfo)
3313                            }
3314                        };
3315
3316                    let catalog = self.catalog_mut();
3317                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3318                    catalog.set_physical_plan(idx.global_id(), physical_plan);
3319                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3320
3321                    compute_instance.insert_collection(idx.global_id());
3322                }
3323                CatalogItem::MaterializedView(mv) => {
3324                    // Collect optimizer parameters.
3325                    let compute_instance =
3326                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3327                            self.instance_snapshot(mv.cluster_id)
3328                                .expect("compute instance exists")
3329                        });
3330                    let global_id = mv.global_id_writes();
3331
3332                    let optimizer_config = optimizer_config(&self.catalog, mv.cluster_id);
3333
3334                    let (optimized_plan, physical_plan, metainfo) = match cached_global_exprs
3335                        .remove(&global_id)
3336                    {
3337                        Some(global_expressions)
3338                            if global_expressions.optimizer_features
3339                                == optimizer_config.features =>
3340                        {
3341                            debug!("global expression cache hit for {global_id:?}");
3342                            (
3343                                global_expressions.global_mir,
3344                                global_expressions.physical_plan,
3345                                global_expressions.dataflow_metainfos,
3346                            )
3347                        }
3348                        Some(_) | None => {
3349                            let (_, internal_view_id) = self.allocate_transient_id();
3350                            let debug_name = self
3351                                .catalog()
3352                                .resolve_full_name(entry.name(), None)
3353                                .to_string();
3354                            let force_non_monotonic = Default::default();
3355
3356                            let (optimized_plan, global_lir_plan) = {
3357                                // Build an optimizer for this MATERIALIZED VIEW.
3358                                let mut optimizer = optimize::materialized_view::Optimizer::new(
3359                                    self.owned_catalog().as_optimizer_catalog(),
3360                                    compute_instance.clone(),
3361                                    global_id,
3362                                    internal_view_id,
3363                                    mv.desc.latest().iter_names().cloned().collect(),
3364                                    mv.non_null_assertions.clone(),
3365                                    mv.refresh_schedule.clone(),
3366                                    debug_name,
3367                                    optimizer_config.clone(),
3368                                    self.optimizer_metrics(),
3369                                    force_non_monotonic,
3370                                );
3371
3372                                // MIR ⇒ MIR optimization (global)
3373                                // We make sure to use the HIR SQL type (since MIR SQL types may not be coherent).
3374                                let typ = infer_sql_type_for_catalog(
3375                                    &mv.raw_expr,
3376                                    &mv.locally_optimized_expr.as_ref().clone(),
3377                                );
3378                                let global_mir_plan = optimizer
3379                                    .optimize((mv.locally_optimized_expr.as_ref().clone(), typ))?;
3380                                let optimized_plan = global_mir_plan.df_desc().clone();
3381
3382                                // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
3383                                let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3384
3385                                (optimized_plan, global_lir_plan)
3386                            };
3387
3388                            let (physical_plan, metainfo) = global_lir_plan.unapply();
3389                            let metainfo = {
3390                                // Pre-allocate a vector of transient GlobalIds for each notice.
3391                                let notice_ids =
3392                                    std::iter::repeat_with(|| self.allocate_transient_id())
3393                                        .map(|(_item_id, global_id)| global_id)
3394                                        .take(metainfo.optimizer_notices.len())
3395                                        .collect::<Vec<_>>();
3396                                // Return a metainfo with rendered notices.
3397                                self.catalog().render_notices(
3398                                    metainfo,
3399                                    notice_ids,
3400                                    Some(mv.global_id_writes()),
3401                                )
3402                            };
3403                            uncached_expressions.insert(
3404                                global_id,
3405                                GlobalExpressions {
3406                                    global_mir: optimized_plan.clone(),
3407                                    physical_plan: physical_plan.clone(),
3408                                    dataflow_metainfos: metainfo.clone(),
3409                                    optimizer_features: optimizer_config.features.clone(),
3410                                },
3411                            );
3412                            (optimized_plan, physical_plan, metainfo)
3413                        }
3414                    };
3415
3416                    let catalog = self.catalog_mut();
3417                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
3418                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
3419                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);
3420
3421                    compute_instance.insert_collection(mv.global_id_writes());
3422                }
3423                CatalogItem::ContinualTask(ct) => {
3424                    let compute_instance =
3425                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3426                            self.instance_snapshot(ct.cluster_id)
3427                                .expect("compute instance exists")
3428                        });
3429                    let global_id = ct.global_id();
3430
3431                    let optimizer_config = optimizer_config(&self.catalog, ct.cluster_id);
3432
3433                    let (optimized_plan, physical_plan, metainfo) =
3434                        match cached_global_exprs.remove(&global_id) {
3435                            Some(global_expressions)
3436                                if global_expressions.optimizer_features
3437                                    == optimizer_config.features =>
3438                            {
3439                                debug!("global expression cache hit for {global_id:?}");
3440                                (
3441                                    global_expressions.global_mir,
3442                                    global_expressions.physical_plan,
3443                                    global_expressions.dataflow_metainfos,
3444                                )
3445                            }
3446                            Some(_) | None => {
3447                                let debug_name = self
3448                                    .catalog()
3449                                    .resolve_full_name(entry.name(), None)
3450                                    .to_string();
3451                                let (optimized_plan, physical_plan, metainfo, optimizer_features) =
3452                                    self.optimize_create_continual_task(
3453                                        ct,
3454                                        global_id,
3455                                        self.owned_catalog(),
3456                                        debug_name,
3457                                    )?;
3458                                uncached_expressions.insert(
3459                                    global_id,
3460                                    GlobalExpressions {
3461                                        global_mir: optimized_plan.clone(),
3462                                        physical_plan: physical_plan.clone(),
3463                                        dataflow_metainfos: metainfo.clone(),
3464                                        optimizer_features,
3465                                    },
3466                                );
3467                                (optimized_plan, physical_plan, metainfo)
3468                            }
3469                        };
3470
3471                    let catalog = self.catalog_mut();
3472                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3473                    catalog.set_physical_plan(ct.global_id(), physical_plan);
3474                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3475
3476                    compute_instance.insert_collection(ct.global_id());
3477                }
3478                CatalogItem::Table(_)
3479                | CatalogItem::Source(_)
3480                | CatalogItem::Log(_)
3481                | CatalogItem::View(_)
3482                | CatalogItem::Sink(_)
3483                | CatalogItem::Type(_)
3484                | CatalogItem::Func(_)
3485                | CatalogItem::Secret(_)
3486                | CatalogItem::Connection(_) => (),
3487            }
3488        }
3489
3490        Ok(uncached_expressions)
3491    }
3492
3493    /// Selects for each compute dataflow an as-of suitable for bootstrapping it.
3494    ///
3495    /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
3496    /// in place and that must not be dropped before all compute dataflows have been created with
3497    /// the compute controller.
3498    ///
3499    /// This method expects all storage collections and dataflow plans to be available, so it must
3500    /// run after [`Coordinator::bootstrap_storage_collections`] and
3501    /// [`Coordinator::bootstrap_dataflow_plans`].
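        ///
        /// As a rough illustration: a dataflow whose storage inputs have sinces
        /// `[10]` must be assigned an as-of of at least `[10]`; the returned read
        /// holds pin those sinces in place until the dataflow is created with the
        /// compute controller, so the chosen as-of remains valid.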
3502    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold> {
3503        let mut catalog_ids = Vec::new();
3504        let mut dataflows = Vec::new();
3505        let mut read_policies = BTreeMap::new();
3506        for entry in self.catalog.entries() {
3507            let gid = match entry.item() {
3508                CatalogItem::Index(idx) => idx.global_id(),
3509                CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3510                CatalogItem::ContinualTask(ct) => ct.global_id(),
3511                CatalogItem::Table(_)
3512                | CatalogItem::Source(_)
3513                | CatalogItem::Log(_)
3514                | CatalogItem::View(_)
3515                | CatalogItem::Sink(_)
3516                | CatalogItem::Type(_)
3517                | CatalogItem::Func(_)
3518                | CatalogItem::Secret(_)
3519                | CatalogItem::Connection(_) => continue,
3520            };
3521            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3522                catalog_ids.push(gid);
3523                dataflows.push(plan.clone());
3524
3525                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3526                    read_policies.insert(gid, compaction_window.into());
3527                }
3528            }
3529        }
3530
3531        let read_ts = self.get_local_read_ts().await;
3532        let read_holds = as_of_selection::run(
3533            &mut dataflows,
3534            &read_policies,
3535            &*self.controller.storage_collections,
3536            read_ts,
3537            self.controller.read_only(),
3538        );
3539
3540        let catalog = self.catalog_mut();
3541        for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3542            catalog.set_physical_plan(id, plan);
3543        }
3544
3545        read_holds
3546    }
3547
3548    /// Serves the coordinator, receiving commands from users over `cmd_rx`
3549    /// and feedback from dataflow workers over `feedback_rx`.
3550    ///
3551    /// You must call `bootstrap` before calling this method.
3552    ///
3553    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would
3554    /// get stored on the stack, which is bad for runtime performance and blows up our stack
3555    /// usage. Because of that we purposefully move this Future onto the heap (i.e., Box it).
3556    fn serve(
3557        mut self,
3558        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3559        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3560        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3561        group_commit_rx: appends::GroupCommitWaiter,
3562    ) -> LocalBoxFuture<'static, ()> {
3563        async move {
3564            // Watcher that listens for and reports cluster service status changes.
3565            let mut cluster_events = self.controller.events_stream();
3566            let last_message = Arc::new(Mutex::new(LastMessage {
3567                kind: "none",
3568                stmt: None,
3569            }));
3570
3571            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3572            let idle_metric = self.metrics.queue_busy_seconds.clone();
3573            let last_message_watchdog = Arc::clone(&last_message);
3574
3575            spawn(|| "coord watchdog", async move {
3576                // Every 5 seconds, attempt to measure how long it takes for the
3577                // coord select loop to drain, since this message is processed at
3578                // the lowest priority. If the loop is idle, the measurement will
3579                // be only a few microseconds.
3580                let mut interval = tokio::time::interval(Duration::from_secs(5));
3581                // If we end up having to wait more than 5 seconds for the coord to respond, then the
3582                // behavior of Delay results in the interval "restarting" from whenever we yield
3583                // instead of trying to catch up.
3584                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3585
3586                // Track if we become stuck to de-dupe error reporting.
3587                let mut coord_stuck = false;
3588
3589                loop {
3590                    interval.tick().await;
3591
3592                    // Wait for space in the channel; if we time out, the coordinator is stuck!
3593                    let duration = tokio::time::Duration::from_secs(30);
3594                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3595                    let Ok(maybe_permit) = timeout else {
3596                        // Only log if we're newly stuck, to prevent logging repeatedly.
3597                        if !coord_stuck {
3598                            let last_message = last_message_watchdog.lock().expect("poisoned");
3599                            tracing::warn!(
3600                                last_message_kind = %last_message.kind,
3601                                last_message_sql = %last_message.stmt_to_string(),
3602                                "coordinator stuck for {duration:?}",
3603                            );
3604                        }
3605                        coord_stuck = true;
3606
3607                        continue;
3608                    };
3609
3610                    // We got a permit, we're not stuck!
3611                    if coord_stuck {
3612                        tracing::info!("Coordinator became unstuck");
3613                    }
3614                    coord_stuck = false;
3615
3616                    // If we failed to acquire a permit it's because we're shutting down.
3617                    let Ok(permit) = maybe_permit else {
3618                        break;
3619                    };
3620
3621                    permit.send(idle_metric.start_timer());
3622                }
3623            });
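                // In effect: if the main loop is wedged for more than 30 seconds,
                // the watchdog logs the kind and SQL of the last message it saw
                // (once, not repeatedly), and logs again when the loop recovers.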
3624
3625            self.schedule_storage_usage_collection().await;
3626            self.spawn_privatelink_vpc_endpoints_watch_task();
3627            self.spawn_statement_logging_task();
3628            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3629
3630            // Report if the handling of a single message takes longer than this threshold.
3631            let warn_threshold = self
3632                .catalog()
3633                .system_config()
3634                .coord_slow_message_warn_threshold();
3635
3636            // How many messages we'd like to batch up before processing them. Must be > 0.
3637            const MESSAGE_BATCH: usize = 64;
3638            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3639            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3640
3641            let message_batch = self.metrics.message_batch.clone();
3642
3643            loop {
3644                // Before adding a branch to this select loop, please ensure that the branch is
3645                // cancellation safe and add a comment explaining why. You can refer here for more
3646                // info: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
3647                select! {
3648                    // We prioritize internal commands over other commands. However, we work through
3649                    // batches of commands in some branches of this select, which means that even if
3650                    // a command generates internal commands, we will work through the current batch
3651                    // before receiving a new batch of commands.
3652                    biased;
3653
3654                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3655                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3656                    // Receive a batch of commands.
3657                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3658                    // `next()` on any stream is cancel-safe:
3659                    // https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
3660                    // Receive a single command.
3661                    Some(event) = cluster_events.next() => {
3662                        messages.push(Message::ClusterEvent(event))
3663                    },
3664                    // See [`mz_controller::Controller::ready`] for notes
3665                    // on why this is cancel-safe.
3666                    // Receive a single command.
3667                    () = self.controller.ready() => {
3668                        // NOTE: We don't get a `Readiness` back from `ready()`
3669                        // because the controller wants to keep it and it's not
3670                        // trivially `Clone` or `Copy`. Hence this accessor.
3671                        let controller = match self.controller.get_readiness() {
3672                            Readiness::Storage => ControllerReadiness::Storage,
3673                            Readiness::Compute => ControllerReadiness::Compute,
3674                            Readiness::Metrics(_) => ControllerReadiness::Metrics,
3675                            Readiness::Internal(_) => ControllerReadiness::Internal,
3676                            Readiness::NotReady => unreachable!("just signaled as ready"),
3677                        };
3678                        messages.push(Message::ControllerReady { controller });
3679                    }
3680                    // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
3681                    // Receive a single command.
3682                    permit = group_commit_rx.ready() => {
3683                        // If we happen to have batched exactly one user write, use
3684                        // that span so the `emit_trace_id_notice` hooks up.
3685                        // Otherwise, the best we can do is invent a new root span
3686                        // and make it follow from all the Spans in the pending
3687                        // writes.
3688                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3689                            PendingWriteTxn::User{span, ..} => Some(span),
3690                            PendingWriteTxn::System{..} => None,
3691                        });
3692                        let span = match user_write_spans.exactly_one() {
3693                            Ok(span) => span.clone(),
3694                            Err(user_write_spans) => {
3695                                let span = info_span!(parent: None, "group_commit_notify");
3696                                for s in user_write_spans {
3697                                    span.follows_from(s);
3698                                }
3699                                span
3700                            }
3701                        };
3702                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3703                    },
3704                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
3705                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
3706                    // Receive a batch of commands.
3707                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3708                        if count == 0 {
3709                            break;
3710                        } else {
3711                            messages.extend(cmd_messages.drain(..).map(
3712                                |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
3713                            ));
3714                        }
3715                    },
3716                    // `recv()` on `UnboundedReceiver` is cancellation safe:
3717                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
3718                    // Receive a single command.
3719                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3720                        let mut pending_read_txns = vec![pending_read_txn];
3721                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3722                            pending_read_txns.push(pending_read_txn);
3723                        }
3724                        for (conn_id, pending_read_txn) in pending_read_txns {
3725                            let prev = self
3726                                .pending_linearize_read_txns
3727                                .insert(conn_id, pending_read_txn);
3728                            soft_assert_or_log!(
3729                                prev.is_none(),
3730                                "connections cannot have multiple concurrent reads, prev: {prev:?}"
3731                            )
3732                        }
3733                        messages.push(Message::LinearizeReads);
3734                    }
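                    // The drain-until-empty batching above, in miniature. A sketch
                    // assuming an async context and a tokio unbounded channel:
                    //
                    // ```
                    // let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
                    // tx.send(1).unwrap();
                    // tx.send(2).unwrap();
                    // // Block for the first element, then opportunistically drain
                    // // whatever else is already queued.
                    // let mut batch = vec![rx.recv().await.unwrap()];
                    // while let Ok(x) = rx.try_recv() {
                    //     batch.push(x);
                    // }
                    // assert_eq!(batch, vec![1, 2]);
                    // ```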
3735                    // `tick()` on `Interval` is cancel-safe:
3736                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3737                    // Receive a single command.
3738                    _ = self.advance_timelines_interval.tick() => {
3739                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
3740                        span.follows_from(Span::current());
3741
3742                        // Group commit sends an `AdvanceTimelines` message when
3743                        // done, which is what downgrades read holds. In
3744                        // read-only mode we send this message directly because
3745                        // we're not doing group commits.
3746                        if self.controller.read_only() {
3747                            messages.push(Message::AdvanceTimelines);
3748                        } else {
3749                            messages.push(Message::GroupCommitInitiate(span, None));
3750                        }
3751                    },
3752                    // `tick()` on `Interval` is cancel-safe:
3753                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3754                    // Receive a single command.
3755                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
3756                        messages.push(Message::CheckSchedulingPolicies);
3757                    },
3758
3759                    // `tick()` on `Interval` is cancel-safe:
3760                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
3761                    // Receive a single command.
3762                    _ = self.caught_up_check_interval.tick() => {
3763                        // We do this directly on the main loop instead of
3764                        // firing off a message. We are still in read-only
3765                        // mode, so optimizing for latency by not blocking
3766                        // the main loop is not that important.
3767                        self.maybe_check_caught_up().await;
3768
3769                        continue;
3770                    },
3771
3772                    // Process the idle metric at the lowest priority to sample queue non-idle time.
3773                    // `recv()` on `Receiver` is cancellation safe:
3774                    // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety
3775                    // Receive a single command.
3776                    timer = idle_rx.recv() => {
3777                        timer.expect("does not drop").observe_duration();
3778                        self.metrics
3779                            .message_handling
3780                            .with_label_values(&["watchdog"])
3781                            .observe(0.0);
3782                        continue;
3783                    }
3784                };
3785
3786                // Observe the number of messages we're processing at once.
3787                message_batch.observe(f64::cast_lossy(messages.len()));
3788
3789                for msg in messages.drain(..) {
3790                    // All message processing functions trace. Start a parent span
3791                    // for them to make it easy to find slow messages.
3792                    let msg_kind = msg.kind();
3793                    let span = span!(
3794                        target: "mz_adapter::coord::handle_message_loop",
3795                        Level::INFO,
3796                        "coord::handle_message",
3797                        kind = msg_kind
3798                    );
3799                    let otel_context = span.context().span().span_context().clone();
3800
3801                    // Record the last kind of message in case we get stuck. For
3802                    // execute commands, we additionally stash the user's SQL
3803                    // statement so we can log it as well.
3804                    *last_message.lock().expect("poisoned") = LastMessage {
3805                        kind: msg_kind,
3806                        stmt: match &msg {
3807                            Message::Command(
3808                                _,
3809                                Command::Execute {
3810                                    portal_name,
3811                                    session,
3812                                    ..
3813                                },
3814                            ) => session
3815                                .get_portal_unverified(portal_name)
3816                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3817                            _ => None,
3818                        },
3819                    };
3820
3821                    let start = Instant::now();
3822                    self.handle_message(msg).instrument(span).await;
3823                    let duration = start.elapsed();
3824
3825                    self.metrics
3826                        .message_handling
3827                        .with_label_values(&[msg_kind])
3828                        .observe(duration.as_secs_f64());
3829
3830                    // If something is _really_ slow, print a trace id for debugging, if OTEL is enabled.
3831                    if duration > warn_threshold {
3832                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3833                        tracing::error!(
3834                            ?msg_kind,
3835                            ?trace_id,
3836                            ?duration,
3837                            "very slow coordinator message"
3838                        );
3839                    }
3840                }
3841            }
3842            // Try and cleanup as a best effort. There may be some async tasks out there holding a
3843            // reference that prevents us from cleaning up.
3844            if let Some(catalog) = Arc::into_inner(self.catalog) {
3845                catalog.expire().await;
3846            }
3847        }
3848        .boxed_local()
3849    }
3850
3851    /// Obtain a read-only Catalog reference.
3852    fn catalog(&self) -> &Catalog {
3853        &self.catalog
3854    }
3855
3856    /// Obtain a read-only Catalog snapshot, suitable for giving out to
3857    /// non-Coordinator thread tasks.
3858    fn owned_catalog(&self) -> Arc<Catalog> {
3859        Arc::clone(&self.catalog)
3860    }
3861
3862    /// Obtain a handle to the optimizer metrics, suitable for giving
3863    /// out to non-Coordinator thread tasks.
3864    fn optimizer_metrics(&self) -> OptimizerMetrics {
3865        self.optimizer_metrics.clone()
3866    }
3867
3868    /// Obtain a writeable Catalog reference.
3869    fn catalog_mut(&mut self) -> &mut Catalog {
3870        // make_mut keeps any other Arc references (from owned_catalog) valid
3871        // by cloning the catalog and putting the clone in a new Arc, which
3872        // lives at self.catalog. If there are no other Arc references, then no
3873        // clone is made, and it returns a reference to the existing object.
3874        // This makes this method and owned_catalog both very cheap: at most
3875        // one clone per catalog mutation, but only if there's a read-only
3876        // reference to it.
3877        Arc::make_mut(&mut self.catalog)
3878    }
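    // A standalone sketch (toy type, not the real `Catalog`) of the
    // `Arc::make_mut` clone-on-write behavior that `catalog_mut` relies on:
    //
    // ```
    // use std::sync::Arc;
    //
    // let mut catalog = Arc::new(vec![1, 2, 3]);
    // let snapshot = Arc::clone(&catalog); // read-only clone, as in `owned_catalog`
    // // Because `snapshot` still exists, `make_mut` clones the inner value...
    // Arc::make_mut(&mut catalog).push(4);
    // // ...so the snapshot keeps observing the old state.
    // assert_eq!(*snapshot, vec![1, 2, 3]);
    // assert_eq!(*catalog, vec![1, 2, 3, 4]);
    // ```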
3879
3880    /// Refills the user ID pool by allocating IDs from the catalog.
3881    ///
3882    /// Requests `max(min_count, batch_size)` IDs so the pool is never
3883    /// under-filled relative to the configured batch size.
3884    async fn refill_user_id_pool(&mut self, min_count: u64) -> Result<(), AdapterError> {
3885        let batch_size = USER_ID_POOL_BATCH_SIZE.get(self.catalog().system_config().dyncfgs());
3886        let to_allocate = min_count.max(u64::from(batch_size));
3887        let id_ts = self.get_catalog_write_ts().await;
3888        let ids = self.catalog().allocate_user_ids(to_allocate, id_ts).await?;
3889        if let (Some((first_id, _)), Some((last_id, _))) = (ids.first(), ids.last()) {
3890            let start = match first_id {
3891                CatalogItemId::User(id) => *id,
3892                other => {
3893                    return Err(AdapterError::Internal(format!(
3894                        "expected User CatalogItemId, got {other:?}"
3895                    )));
3896                }
3897            };
3898            let end = match last_id {
3899                CatalogItemId::User(id) => *id + 1, // exclusive upper bound
3900                other => {
3901                    return Err(AdapterError::Internal(format!(
3902                        "expected User CatalogItemId, got {other:?}"
3903                    )));
3904                }
3905            };
3906            self.user_id_pool.refill(start, end);
3907        } else {
3908            return Err(AdapterError::Internal(
3909                "catalog returned no user IDs".into(),
3910            ));
3911        }
3912        Ok(())
3913    }
3914
3915    /// Allocates a single user ID, refilling the pool from the catalog if needed.
3916    async fn allocate_user_id(&mut self) -> Result<(CatalogItemId, GlobalId), AdapterError> {
3917        if let Some(id) = self.user_id_pool.allocate() {
3918            return Ok((CatalogItemId::User(id), GlobalId::User(id)));
3919        }
3920        self.refill_user_id_pool(1).await?;
3921        let id = self.user_id_pool.allocate().expect("ID pool just refilled");
3922        Ok((CatalogItemId::User(id), GlobalId::User(id)))
3923    }
3924
3925    /// Allocates `count` user IDs, refilling the pool from the catalog if needed.
3926    async fn allocate_user_ids(
3927        &mut self,
3928        count: u64,
3929    ) -> Result<Vec<(CatalogItemId, GlobalId)>, AdapterError> {
3930        if self.user_id_pool.remaining() < count {
3931            self.refill_user_id_pool(count).await?;
3932        }
3933        let raw_ids = self
3934            .user_id_pool
3935            .allocate_many(count)
3936            .expect("pool has enough IDs after refill");
3937        Ok(raw_ids
3938            .into_iter()
3939            .map(|id| (CatalogItemId::User(id), GlobalId::User(id)))
3940            .collect())
3941    }
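    // A toy model (assumed semantics; not the real `IdPool`) of the half-open
    // `[start, end)` refill contract used by the two allocators above:
    //
    // ```
    // struct IdPool {
    //     next: u64,
    //     end: u64,
    // }
    //
    // impl IdPool {
    //     fn empty() -> Self {
    //         IdPool { next: 0, end: 0 }
    //     }
    //     fn remaining(&self) -> u64 {
    //         self.end - self.next
    //     }
    //     fn refill(&mut self, start: u64, end: u64) {
    //         self.next = start;
    //         self.end = end; // exclusive upper bound
    //     }
    //     fn allocate(&mut self) -> Option<u64> {
    //         (self.next < self.end).then(|| {
    //             let id = self.next;
    //             self.next += 1;
    //             id
    //         })
    //     }
    // }
    // ```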
3942
3943    /// Obtain a reference to the coordinator's connection context.
3944    fn connection_context(&self) -> &ConnectionContext {
3945        self.controller.connection_context()
3946    }
3947
3948    /// Obtain a reference to the coordinator's secret reader, in an `Arc`.
3949    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3950        &self.connection_context().secrets_reader
3951    }
3952
3953    /// Publishes a notice message to all sessions.
3954    ///
3955    /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
3956    /// so we keep it around.
3957    #[allow(dead_code)]
3958    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3959        for meta in self.active_conns.values() {
3960            let _ = meta.notice_tx.send(notice.clone());
3961        }
3962    }
3963
3964    /// Returns a closure that will publish a notice to all sessions that were active at the time
3965    /// this method was called.
3966    pub(crate) fn broadcast_notice_tx(
3967        &self,
3968    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3969        let senders: Vec<_> = self
3970            .active_conns
3971            .values()
3972            .map(|meta| meta.notice_tx.clone())
3973            .collect();
3974        Box::new(move |notice| {
3975            for tx in senders {
3976                let _ = tx.send(notice.clone());
3977            }
3978        })
3979    }
3980
3981    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3982        &self.active_conns
3983    }
3984
3985    #[instrument(level = "debug")]
3986    pub(crate) fn retire_execution(
3987        &mut self,
3988        reason: StatementEndedExecutionReason,
3989        ctx_extra: ExecuteContextExtra,
3990    ) {
3991        if let Some(uuid) = ctx_extra.retire() {
3992            self.end_statement_execution(uuid, reason);
3993        }
3994    }
3995
3996    /// Creates a new dataflow builder from the catalog and indexes in `self`.
3997    #[instrument(level = "debug")]
3998    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3999        let compute = self
4000            .instance_snapshot(instance)
4001            .expect("compute instance does not exist");
4002        DataflowBuilder::new(self.catalog().state(), compute)
4003    }
4004
4005    /// Return a reference-less snapshot of the indicated compute instance.
4006    pub fn instance_snapshot(
4007        &self,
4008        id: ComputeInstanceId,
4009    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
4010        ComputeInstanceSnapshot::new(&self.controller, id)
4011    }
4012
4013    /// Call into the compute controller to install a finalized dataflow, and
4014    /// initialize the read policies for its exported readable objects.
4015    ///
4016    /// # Panics
4017    ///
4018    /// Panics if dataflow creation fails.
4019    pub(crate) async fn ship_dataflow(
4020        &mut self,
4021        dataflow: DataflowDescription<Plan>,
4022        instance: ComputeInstanceId,
4023        target_replica: Option<ReplicaId>,
4024    ) {
4025        self.try_ship_dataflow(dataflow, instance, target_replica)
4026            .await
4027            .unwrap_or_terminate("dataflow creation cannot fail");
4028    }
4029
4030    /// Call into the compute controller to install a finalized dataflow, and
4031    /// initialize the read policies for its exported readable objects.
4032    pub(crate) async fn try_ship_dataflow(
4033        &mut self,
4034        dataflow: DataflowDescription<Plan>,
4035        instance: ComputeInstanceId,
4036        target_replica: Option<ReplicaId>,
4037    ) -> Result<(), DataflowCreationError> {
4038        // We must only install read policies for indexes, not for sinks.
4039        // Sinks are write-only compute collections that don't have read policies.
4040        let export_ids = dataflow.exported_index_ids().collect();
4041
4042        self.controller
4043            .compute
4044            .create_dataflow(instance, dataflow, target_replica)?;
4045
4046        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
4047            .await;
4048
4049        Ok(())
4050    }
4051
4052    /// Call into the compute controller to allow writes to the specified IDs
4053    /// from the specified instance. Calling this function multiple times is
4054    /// idempotent, and calling it on a read-only instance has no effect.
4055    pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
4056        self.controller
4057            .compute
4058            .allow_writes(instance, id)
4059            .unwrap_or_terminate("allow_writes cannot fail");
4060    }
4061
4062    /// Like `ship_dataflow`, but also await on builtin table updates.
4063    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
4064        &mut self,
4065        dataflow: DataflowDescription<Plan>,
4066        instance: ComputeInstanceId,
4067        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
4068        target_replica: Option<ReplicaId>,
4069    ) {
4070        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
4071            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, target_replica);
4072            let ((), ()) =
4073                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
4074        } else {
4075            self.ship_dataflow(dataflow, instance, target_replica).await;
4076        }
4077    }
4078
4079    /// Install a _watch set_ in the controller that is automatically associated with the given
4080    /// connection id. The watchset will be automatically cleared if the connection terminates
4081    /// before the watchset completes.
4082    pub fn install_compute_watch_set(
4083        &mut self,
4084        conn_id: ConnectionId,
4085        objects: BTreeSet<GlobalId>,
4086        t: Timestamp,
4087        state: WatchSetResponse,
4088    ) -> Result<(), CollectionLookupError> {
4089        let ws_id = self.controller.install_compute_watch_set(objects, t)?;
4090        self.connection_watch_sets
4091            .entry(conn_id.clone())
4092            .or_default()
4093            .insert(ws_id);
4094        self.installed_watch_sets.insert(ws_id, (conn_id, state));
4095        Ok(())
4096    }
4097
4098    /// Install a _watch set_ in the controller that is automatically associated with the given
4099    /// connection id. The watchset will be automatically cleared if the connection terminates
4100    /// before the watchset completes.
4101    pub fn install_storage_watch_set(
4102        &mut self,
4103        conn_id: ConnectionId,
4104        objects: BTreeSet<GlobalId>,
4105        t: Timestamp,
4106        state: WatchSetResponse,
4107    ) -> Result<(), CollectionMissing> {
4108        let ws_id = self.controller.install_storage_watch_set(objects, t)?;
4109        self.connection_watch_sets
4110            .entry(conn_id.clone())
4111            .or_default()
4112            .insert(ws_id);
4113        self.installed_watch_sets.insert(ws_id, (conn_id, state));
4114        Ok(())
4115    }
4116
4117    /// Cancels pending watchsets associated with the provided connection id.
4118    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
4119        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
4120            for ws_id in ws_ids {
4121                self.installed_watch_sets.remove(&ws_id);
4122            }
4123        }
4124    }
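    // The two-map bookkeeping shared by the install/cancel methods above, in
    // miniature (simplified key types; the real maps hold richer state):
    //
    // ```
    // use std::collections::{BTreeMap, BTreeSet};
    //
    // let mut connection_watch_sets: BTreeMap<u32, BTreeSet<u64>> = BTreeMap::new();
    // let mut installed_watch_sets: BTreeMap<u64, u32> = BTreeMap::new();
    //
    // // Install: watch set 7 belongs to connection 1.
    // connection_watch_sets.entry(1).or_default().insert(7);
    // installed_watch_sets.insert(7, 1);
    //
    // // Cancel: drop everything connection 1 was waiting on.
    // if let Some(ws_ids) = connection_watch_sets.remove(&1) {
    //     for ws_id in ws_ids {
    //         installed_watch_sets.remove(&ws_id);
    //     }
    // }
    // assert!(installed_watch_sets.is_empty());
    // ```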
4125
4126    /// Returns the state of the [`Coordinator`] formatted as JSON.
4127    ///
4128    /// The returned value is not guaranteed to be stable and may change at any point in time.
4129    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
4130        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
4131        // returned object as a tradeoff between usability and stability. `serde_json` will fail
4132        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
4133        // prevents a future unrelated change from silently breaking this method.
4134
4135        let global_timelines: BTreeMap<_, _> = self
4136            .global_timelines
4137            .iter()
4138            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
4139            .collect();
4140        let active_conns: BTreeMap<_, _> = self
4141            .active_conns
4142            .iter()
4143            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
4144            .collect();
4145        let txn_read_holds: BTreeMap<_, _> = self
4146            .txn_read_holds
4147            .iter()
4148            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
4149            .collect();
4150        let pending_peeks: BTreeMap<_, _> = self
4151            .pending_peeks
4152            .iter()
4153            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
4154            .collect();
4155        let client_pending_peeks: BTreeMap<_, _> = self
4156            .client_pending_peeks
4157            .iter()
4158            .map(|(id, peek)| {
4159                let peek: BTreeMap<_, _> = peek
4160                    .iter()
4161                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
4162                    .collect();
4163                (id.to_string(), peek)
4164            })
4165            .collect();
4166        let pending_linearize_read_txns: BTreeMap<_, _> = self
4167            .pending_linearize_read_txns
4168            .iter()
4169            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
4170            .collect();
4171
4172        Ok(serde_json::json!({
4173            "global_timelines": global_timelines,
4174            "active_conns": active_conns,
4175            "txn_read_holds": txn_read_holds,
4176            "pending_peeks": pending_peeks,
4177            "client_pending_peeks": client_pending_peeks,
4178            "pending_linearize_read_txns": pending_linearize_read_txns,
4179            "controller": self.controller.dump().await?,
4180        }))
4181    }
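    // A sketch of the `Debug`-formatting tradeoff described in `dump`:
    // `serde_json` requires string keys, so keys and values are rendered via
    // `format!` first (hypothetical data below, not real coordinator state):
    //
    // ```
    // use std::collections::BTreeMap;
    //
    // let mut active_conns: BTreeMap<String, String> = BTreeMap::new();
    // active_conns.insert("conn-1".into(), format!("{:?}", ("peek", 42)));
    // let dump = serde_json::json!({ "active_conns": active_conns });
    // assert_eq!(dump["active_conns"]["conn-1"], "(\"peek\", 42)");
    // ```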
4182
4183    /// Prune all storage usage events from the [`MZ_STORAGE_USAGE_BY_SHARD`] table that are older
4184    /// than `retention_period`.
4185    ///
4186    /// This method will read the entire contents of [`MZ_STORAGE_USAGE_BY_SHARD`] into memory
4187    /// which can be expensive.
4188    ///
4189    /// DO NOT call this method outside of startup. The safety of reading at the current oracle read
4190    /// timestamp and then writing at whatever the current write timestamp is (instead of
4191    /// `read_ts + 1`) relies on the fact that there are no outstanding writes during startup.
4192    ///
4193    /// Group commit, which this method uses to write the retractions, has builtin fencing, and we
4194    /// never commit retractions to [`MZ_STORAGE_USAGE_BY_SHARD`] outside of this method, which is
4195    /// only called once during startup. So we don't have to worry about double/invalid retractions.
4196    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
4197        let item_id = self
4198            .catalog()
4199            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
4200        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
4201        let read_ts = self.get_local_read_ts().await;
4202        let current_contents_fut = self
4203            .controller
4204            .storage_collections
4205            .snapshot(global_id, read_ts);
4206        let internal_cmd_tx = self.internal_cmd_tx.clone();
4207        spawn(|| "storage_usage_prune", async move {
4208            let mut current_contents = current_contents_fut
4209                .await
4210                .unwrap_or_terminate("cannot fail to fetch snapshot");
4211            differential_dataflow::consolidation::consolidate(&mut current_contents);
4212
4213            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4214            let mut expired = Vec::new();
4215            for (row, diff) in current_contents {
4216                assert_eq!(
4217                    diff, 1,
4218                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4219                );
4220                // This logic relies on the definition of `mz_storage_usage_by_shard` not changing.
4221                let collection_timestamp = row
4222                    .unpack()
4223                    .get(3)
4224                    .expect("definition of mz_storage_usage_by_shard changed")
4225                    .unwrap_timestamptz();
4226                let collection_timestamp = collection_timestamp.timestamp_millis();
4227                let collection_timestamp: u128 = collection_timestamp
4228                    .try_into()
4229                    .expect("all collections happen after Jan 1 1970");
4230                if collection_timestamp < cutoff_ts {
4231                    debug!("pruning storage event {row:?}");
4232                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
4233                    expired.push(builtin_update);
4234                }
4235            }
4236
4237            // Ignore send errors: if the send fails, the main thread has shut down.
4238            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
4239        });
4240    }
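    // Worked example of the cutoff arithmetic above, with illustrative values:
    //
    // ```
    // use std::time::Duration;
    //
    // let read_ts: u128 = 1_700_000_000_000; // ms since the Unix epoch
    // let retention_period = Duration::from_secs(30 * 24 * 60 * 60); // 30 days
    // let cutoff_ts = read_ts.saturating_sub(retention_period.as_millis());
    // assert_eq!(cutoff_ts, 1_697_408_000_000);
    // // Events with collection_timestamp < cutoff_ts are retracted.
    // ```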
4241
4242    fn current_credit_consumption_rate(&self) -> Numeric {
4243        self.catalog()
4244            .user_cluster_replicas()
4245            .filter_map(|replica| match &replica.config.location {
4246                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
4247                ReplicaLocation::Unmanaged(_) => None,
4248            })
4249            .map(|size| {
4250                self.catalog()
4251                    .cluster_replica_sizes()
4252                    .0
4253                    .get(size)
4254                    .expect("location size is validated against the cluster replica sizes")
4255                    .credits_per_hour
4256            })
4257            .sum()
4258    }
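    // Illustrative arithmetic for the rate above (hypothetical sizes): two
    // managed replicas of a size billed at 0.5 credits/hour plus one billed at
    // 1.0 credits/hour sum to 0.5 + 0.5 + 1.0 = 2.0 credits/hour; unmanaged
    // replicas are filtered out and contribute nothing.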
4259}
4260
4261#[cfg(test)]
4262impl Coordinator {
4263    #[allow(dead_code)]
4264    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
4265        // `ship_dataflow_new` is not allowed to have a `Result` return because this function is
4266        // called after `catalog_transact`, after which no errors are allowed. This test exists to
4267        // prevent us from incorrectly teaching those functions how to return errors (which has
4268        // happened twice and is the motivation for this test).
4269
4270        // An arbitrary compute instance ID to satisfy the function calls below. Note that
4271        // this only works because this function will never run.
4272        let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
4273
4274        let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
4275    }
4276}
4277
4278/// Contains information about the last message the [`Coordinator`] processed.
4279struct LastMessage {
4280    kind: &'static str,
4281    stmt: Option<Arc<Statement<Raw>>>,
4282}
4283
4284impl LastMessage {
4285    /// Returns a redacted version of the statement that is safe for logs.
4286    fn stmt_to_string(&self) -> Cow<'static, str> {
4287        self.stmt
4288            .as_ref()
4289            .map(|stmt| stmt.to_ast_string_redacted().into())
4290            .unwrap_or(Cow::Borrowed("<none>"))
4291    }
4292}
4293
4294impl fmt::Debug for LastMessage {
4295    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4296        f.debug_struct("LastMessage")
4297            .field("kind", &self.kind)
4298            .field("stmt", &self.stmt_to_string())
4299            .finish()
4300    }
4301}
4302
4303impl Drop for LastMessage {
4304    fn drop(&mut self) {
4305        // Only print the last message if we're currently panicking, otherwise we'd spam our logs.
4306        if std::thread::panicking() {
4307            // If we're panicking there's no guarantee `tracing` still works, so print to stderr.
4308            eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4309        }
4310    }
4311}
4312
4313/// Serves the coordinator based on the provided configuration.
4314///
4315/// For a high-level description of the coordinator, see the [crate
4316/// documentation](crate).
4317///
4318/// Returns a handle to the coordinator and a client to communicate with the
4319/// coordinator.
4320///
4321/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 42KB. It would
4322/// be stored on the stack, which is bad for runtime performance and blows up our stack
4323/// usage. Because of that we purposefully move this Future onto the heap (i.e., Box it).
4324pub fn serve(
4325    Config {
4326        controller_config,
4327        controller_envd_epoch,
4328        mut storage,
4329        audit_logs_iterator,
4330        timestamp_oracle_url,
4331        unsafe_mode,
4332        all_features,
4333        build_info,
4334        environment_id,
4335        metrics_registry,
4336        now,
4337        secrets_controller,
4338        cloud_resource_controller,
4339        cluster_replica_sizes,
4340        builtin_system_cluster_config,
4341        builtin_catalog_server_cluster_config,
4342        builtin_probe_cluster_config,
4343        builtin_support_cluster_config,
4344        builtin_analytics_cluster_config,
4345        system_parameter_defaults,
4346        availability_zones,
4347        storage_usage_client,
4348        storage_usage_collection_interval,
4349        storage_usage_retention_period,
4350        segment_client,
4351        egress_addresses,
4352        aws_account_id,
4353        aws_privatelink_availability_zones,
4354        connection_context,
4355        connection_limit_callback,
4356        remote_system_parameters,
4357        webhook_concurrency_limit,
4358        http_host_name,
4359        tracing_handle,
4360        read_only_controllers,
4361        caught_up_trigger: clusters_caught_up_trigger,
4362        helm_chart_version,
4363        license_key,
4364        external_login_password_mz_system,
4365        force_builtin_schema_migration,
4366    }: Config,
4367) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4368    async move {
4369        let coord_start = Instant::now();
4370        info!("startup: coordinator init: beginning");
4371        info!("startup: coordinator init: preamble beginning");
4372
4373        // Initializing the builtins can be an expensive process and consume a lot of memory. We
4374        // forcibly initialize it early while the stack is relatively empty to avoid stack
4375        // overflows later.
4376        let _builtins = LazyLock::force(&BUILTINS_STATIC);
4377
4378        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4379        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4380        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4381            mpsc::unbounded_channel();
4382
4383        // Validate and process availability zones.
4384        if !availability_zones.iter().all_unique() {
4385            coord_bail!("availability zones must be unique");
4386        }
4387
4388        let aws_principal_context = match (
4389            aws_account_id,
4390            connection_context.aws_external_id_prefix.clone(),
4391        ) {
4392            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4393                aws_account_id,
4394                aws_external_id_prefix,
4395            }),
4396            _ => None,
4397        };
4398
4399        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4400            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4401
4402        info!(
4403            "startup: coordinator init: preamble complete in {:?}",
4404            coord_start.elapsed()
4405        );
4406        let oracle_init_start = Instant::now();
4407        info!("startup: coordinator init: timestamp oracle init beginning");
4408
4409        let timestamp_oracle_config = timestamp_oracle_url
4410            .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
4411            .transpose()?;
4412        let mut initial_timestamps =
4413            get_initial_oracle_timestamps(&timestamp_oracle_config).await?;
4414
4415        // Insert an entry for the `EpochMilliseconds` timeline if one doesn't exist,
4416        // which will ensure that the timeline is initialized since it's required
4417        // by the system.
4418        initial_timestamps
4419            .entry(Timeline::EpochMilliseconds)
4420            .or_insert_with(mz_repr::Timestamp::minimum);
4421        let mut timestamp_oracles = BTreeMap::new();
4422        for (timeline, initial_timestamp) in initial_timestamps {
4423            Coordinator::ensure_timeline_state_with_initial_time(
4424                &timeline,
4425                initial_timestamp,
4426                now.clone(),
4427                timestamp_oracle_config.clone(),
4428                &mut timestamp_oracles,
4429                read_only_controllers,
4430            )
4431            .await;
4432        }
4433
4434        // Opening the durable catalog uses one or more timestamps without communicating with
4435        // the timestamp oracle. Here we make sure to apply the catalog upper with the timestamp
4436        // oracle to linearize future operations with opening the catalog.
4437        let catalog_upper = storage.current_upper().await;
4438        // Choose a time at which to boot. This is used, for example, to prune
4439        // old storage usage data or migrate audit log entries.
4440        //
4441        // This time is usually the current system time, but with protection
4442        // against backwards time jumps, even across restarts.
4443        let epoch_millis_oracle = &timestamp_oracles
4444            .get(&Timeline::EpochMilliseconds)
4445            .expect("inserted above")
4446            .oracle;
4447
4448        let mut boot_ts = if read_only_controllers {
4449            let read_ts = epoch_millis_oracle.read_ts().await;
4450            std::cmp::max(read_ts, catalog_upper)
4451        } else {
4452            // Getting/applying a write timestamp bumps the write timestamp in the
4453            // oracle, which we're not allowed to do in read-only mode.
4454            epoch_millis_oracle.apply_write(catalog_upper).await;
4455            epoch_millis_oracle.write_ts().await.timestamp
4456        };
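        // Illustrative choice of boot timestamp: with oracle read_ts = 105 and
        // catalog_upper = 110, a read-only boot picks max(105, 110) = 110; a
        // read-write boot instead applies 110 as a write to the oracle and then
        // takes a fresh write timestamp, which is guaranteed to be >= 110.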
4457
4458        info!(
4459            "startup: coordinator init: timestamp oracle init complete in {:?}",
4460            oracle_init_start.elapsed()
4461        );
4462
4463        let catalog_open_start = Instant::now();
4464        info!("startup: coordinator init: catalog open beginning");
4465        let persist_client = controller_config
4466            .persist_clients
4467            .open(controller_config.persist_location.clone())
4468            .await
4469            .context("opening persist client")?;
4470        let builtin_item_migration_config =
4471            BuiltinItemMigrationConfig {
4472                persist_client: persist_client.clone(),
4473                read_only: read_only_controllers,
4474                force_migration: force_builtin_schema_migration,
4475            };
4476
4477        let OpenCatalogResult {
4478            mut catalog,
4479            migrated_storage_collections_0dt,
4480            new_builtin_collections,
4481            builtin_table_updates,
4482            cached_global_exprs,
4483            uncached_local_exprs,
4484        } = Catalog::open(mz_catalog::config::Config {
4485            storage,
4486            metrics_registry: &metrics_registry,
4487            state: mz_catalog::config::StateConfig {
4488                unsafe_mode,
4489                all_features,
4490                build_info,
4491                environment_id: environment_id.clone(),
4492                read_only: read_only_controllers,
4493                now: now.clone(),
4494                boot_ts: boot_ts.clone(),
4495                skip_migrations: false,
4496                cluster_replica_sizes,
4497                builtin_system_cluster_config,
4498                builtin_catalog_server_cluster_config,
4499                builtin_probe_cluster_config,
4500                builtin_support_cluster_config,
4501                builtin_analytics_cluster_config,
4502                system_parameter_defaults,
4503                remote_system_parameters,
4504                availability_zones,
4505                egress_addresses,
4506                aws_principal_context,
4507                aws_privatelink_availability_zones,
4508                connection_context,
4509                http_host_name,
4510                builtin_item_migration_config,
4511                persist_client: persist_client.clone(),
4512                enable_expression_cache_override: None,
4513                helm_chart_version,
4514                external_login_password_mz_system,
4515                license_key: license_key.clone(),
4516            },
4517        })
4518        .await?;
4519
4520        // Opening the catalog uses one or more timestamps, so push the boot timestamp up to the
4521        // current catalog upper.
4522        let catalog_upper = catalog.current_upper().await;
4523        boot_ts = std::cmp::max(boot_ts, catalog_upper);
4524
4525        if !read_only_controllers {
4526            epoch_millis_oracle.apply_write(boot_ts).await;
4527        }
4528
4529        info!(
4530            "startup: coordinator init: catalog open complete in {:?}",
4531            catalog_open_start.elapsed()
4532        );
4533
4534        let coord_thread_start = Instant::now();
4535        info!("startup: coordinator init: coordinator thread start beginning");
4536
4537        let session_id = catalog.config().session_id;
4538        let start_instant = catalog.config().start_instant;
4539
4540        // In order for the coordinator to support Rc and Refcell types, it cannot be
4541        // sent across threads. Spawn it in a thread and have this parent thread wait
4542        // for bootstrap completion before proceeding.
4543        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4544        let handle = TokioHandle::current();
4545
4546        let metrics = Metrics::register_into(&metrics_registry);
4547        let metrics_clone = metrics.clone();
4548        let optimizer_metrics = OptimizerMetrics::register_into(
4549            &metrics_registry,
4550            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4551        );
4552        let segment_client_clone = segment_client.clone();
4553        let coord_now = now.clone();
4554        let advance_timelines_interval =
4555            tokio::time::interval(catalog.system_config().default_timestamp_interval());
4556        let mut check_scheduling_policies_interval = tokio::time::interval(
4557            catalog
4558                .system_config()
4559                .cluster_check_scheduling_policies_interval(),
4560        );
4561        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4562
4563        let clusters_caught_up_check_interval = if read_only_controllers {
4564            let dyncfgs = catalog.system_config().dyncfgs();
4565            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4566
4567            let mut interval = tokio::time::interval(interval);
4568            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4569            interval
4570        } else {
4571            // When not in read-only mode, we don't do hydration checks. But we
4572            // still have to provide _some_ interval. This is large enough that
4573            // it doesn't matter.
4574            //
4575            // TODO(aljoscha): We cannot use Duration::MAX right now because of
4576            // https://github.com/tokio-rs/tokio/issues/6634. Use that once it's
4577            // fixed for good.
4578            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4579            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4580            interval
4581        };
4582
4583        let clusters_caught_up_check =
4584            clusters_caught_up_trigger.map(|trigger| {
4585                let mut exclude_collections: BTreeSet<GlobalId> =
4586                    new_builtin_collections.into_iter().collect();
4587
4588                // Migrated MVs can't make progress in read-only mode. Exclude them and all their
4589                // transitive dependents.
4590                //
4591                // TODO: Consider sending `allow_writes` for the dataflows of migrated MVs, which
4592                //       would allow them to make progress even in read-only mode. This doesn't
4593                //       work for MVs based on `mz_catalog_raw`, if the leader's version is less
4594                //       than v26.17, since before that version the catalog shard's frontier wasn't
4595                //       kept up-to-date with the current time. So this workaround has to remain in
4596                //       place until upgrades from a version less than v26.17 are no longer supported.
4597                let mut todo: Vec<_> = migrated_storage_collections_0dt
4598                    .iter()
4599                    .filter(|id| {
4600                        catalog.state().get_entry(id).is_materialized_view()
4601                    })
4602                    .copied()
4603                    .collect();
4604                while let Some(item_id) = todo.pop() {
4605                    let entry = catalog.state().get_entry(&item_id);
4606                    exclude_collections.extend(entry.global_ids());
4607                    todo.extend_from_slice(entry.used_by());
4608                }
4609
4610                CaughtUpCheckContext {
4611                    trigger,
4612                    exclude_collections,
4613                }
4614            });
4615
4616        if let Some(TimestampOracleConfig::Postgres(pg_config)) =
4617            timestamp_oracle_config.as_ref()
4618        {
4619            // Apply settings from system vars as early as possible because some
4620            // of them are locked in right when an oracle is first opened!
4621            let pg_timestamp_oracle_params =
4622                flags::timestamp_oracle_config(catalog.system_config());
4623            pg_timestamp_oracle_params.apply(pg_config);
4624        }
4625
4626        // Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
4627        // system variables change, we update our connection limits.
4628        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4629            Arc::new(move |system_vars: &SystemVars| {
4630                let limit: u64 = system_vars.max_connections().cast_into();
4631                let superuser_reserved: u64 =
4632                    system_vars.superuser_reserved_connections().cast_into();
4633
4634                // If superuser_reserved > max_connections, prefer max_connections.
4635                //
4636                // In this scenario all normal users would be locked out because all
4637                // connections would be reserved for superusers, so we log a warning.
4638                let superuser_reserved = if superuser_reserved >= limit {
4639                    tracing::warn!(
4640                        "superuser_reserved ({superuser_reserved}) meets or exceeds max connections ({limit})!"
4641                    );
4642                    limit
4643                } else {
4644                    superuser_reserved
4645                };
4646
4647                (connection_limit_callback)(limit, superuser_reserved);
4648            });
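        // Worked example of the clamp above: with max_connections = 100 and
        // superuser_reserved_connections = 150, the callback warns and reserves
        // 100 connections (locking out normal users); with the more sensible
        // value 10, it reserves 10 and leaves 90 for normal users.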
4649        catalog.system_config_mut().register_callback(
4650            &mz_sql::session::vars::MAX_CONNECTIONS,
4651            Arc::clone(&connection_limit_callback),
4652        );
4653        catalog.system_config_mut().register_callback(
4654            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4655            connection_limit_callback,
4656        );
4657
4658        let (group_commit_tx, group_commit_rx) = appends::notifier();
4659
4660        let parent_span = tracing::Span::current();
4661        let thread = thread::Builder::new()
4662            // The Coordinator thread tends to keep a lot of data on its stack. To
4663            // prevent a stack overflow we allocate a stack three times as big as the default
4664            // stack.
4665            .stack_size(3 * stack::STACK_SIZE)
4666            .name("coordinator".to_string())
4667            .spawn(move || {
4668                let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4669
4670                let controller = handle
4671                    .block_on({
4672                        catalog.initialize_controller(
4673                            controller_config,
4674                            controller_envd_epoch,
4675                            read_only_controllers,
4676                        )
4677                    })
4678                    .unwrap_or_terminate("failed to initialize controller");
4679                // Initializing the controller uses one or more timestamps, so push the boot timestamp up to the
4680                // current catalog upper.
4681                let catalog_upper = handle.block_on(catalog.current_upper());
4682                boot_ts = std::cmp::max(boot_ts, catalog_upper);
4683                if !read_only_controllers {
4684                    let epoch_millis_oracle = &timestamp_oracles
4685                        .get(&Timeline::EpochMilliseconds)
4686                        .expect("inserted above")
4687                        .oracle;
4688                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4689                }
4690
4691                let catalog = Arc::new(catalog);
4692
4693                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4694                let mut coord = Coordinator {
4695                    controller,
4696                    catalog,
4697                    internal_cmd_tx,
4698                    group_commit_tx,
4699                    strict_serializable_reads_tx,
4700                    global_timelines: timestamp_oracles,
4701                    transient_id_gen: Arc::new(TransientIdGen::new()),
4702                    active_conns: BTreeMap::new(),
4703                    txn_read_holds: Default::default(),
4704                    pending_peeks: BTreeMap::new(),
4705                    client_pending_peeks: BTreeMap::new(),
4706                    pending_linearize_read_txns: BTreeMap::new(),
4707                    serialized_ddl: LockedVecDeque::new(),
4708                    active_compute_sinks: BTreeMap::new(),
4709                    active_webhooks: BTreeMap::new(),
4710                    active_copies: BTreeMap::new(),
4711                    staged_cancellation: BTreeMap::new(),
4712                    introspection_subscribes: BTreeMap::new(),
4713                    write_locks: BTreeMap::new(),
4714                    deferred_write_ops: BTreeMap::new(),
4715                    pending_writes: Vec::new(),
4716                    advance_timelines_interval,
4717                    secrets_controller,
4718                    caching_secrets_reader,
4719                    cloud_resource_controller,
4720                    storage_usage_client,
4721                    storage_usage_collection_interval,
4722                    segment_client,
4723                    metrics,
4724                    optimizer_metrics,
4725                    tracing_handle,
4726                    statement_logging: StatementLogging::new(coord_now.clone()),
4727                    webhook_concurrency_limit,
4728                    timestamp_oracle_config,
4729                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4730                    cluster_scheduling_decisions: BTreeMap::new(),
4731                    caught_up_check_interval: clusters_caught_up_check_interval,
4732                    caught_up_check: clusters_caught_up_check,
4733                    installed_watch_sets: BTreeMap::new(),
4734                    connection_watch_sets: BTreeMap::new(),
4735                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
4736                    read_only_controllers,
4737                    buffered_builtin_table_updates: Some(Vec::new()),
4738                    license_key,
4739                    user_id_pool: IdPool::empty(),
4740                    persist_client,
4741                };
4742                let bootstrap = handle.block_on(async {
4743                    coord
4744                        .bootstrap(
4745                            boot_ts,
4746                            migrated_storage_collections_0dt,
4747                            builtin_table_updates,
4748                            cached_global_exprs,
4749                            uncached_local_exprs,
4750                            audit_logs_iterator,
4751                        )
4752                        .await?;
4753                    coord
4754                        .controller
4755                        .remove_orphaned_replicas(
4756                            coord.catalog().get_next_user_replica_id().await?,
4757                            coord.catalog().get_next_system_replica_id().await?,
4758                        )
4759                        .await
4760                        .map_err(AdapterError::Orchestrator)?;
4761
4762                    if let Some(retention_period) = storage_usage_retention_period {
4763                        coord
4764                            .prune_storage_usage_events_on_startup(retention_period)
4765                            .await;
4766                    }
4767
4768                    Ok(())
4769                });
4770                let ok = bootstrap.is_ok();
4771                drop(span);
4772                bootstrap_tx
4773                    .send(bootstrap)
4774                    .expect("bootstrap_rx is not dropped until it receives this message");
4775                if ok {
4776                    handle.block_on(coord.serve(
4777                        internal_cmd_rx,
4778                        strict_serializable_reads_rx,
4779                        cmd_rx,
4780                        group_commit_rx,
4781                    ));
4782                }
4783            })
4784            .expect("failed to create coordinator thread");
4785        match bootstrap_rx
4786            .await
4787            .expect("bootstrap_tx always sends a message or panics/halts")
4788        {
4789            Ok(()) => {
4790                info!(
4791                    "startup: coordinator init: coordinator thread start complete in {:?}",
4792                    coord_thread_start.elapsed()
4793                );
4794                info!(
4795                    "startup: coordinator init: complete in {:?}",
4796                    coord_start.elapsed()
4797                );
4798                let handle = Handle {
4799                    session_id,
4800                    start_instant,
4801                    _thread: thread.join_on_drop(),
4802                };
4803                let client = Client::new(
4804                    build_info,
4805                    cmd_tx,
4806                    metrics_clone,
4807                    now,
4808                    environment_id,
4809                    segment_client_clone,
4810                );
4811                Ok((handle, client))
4812            }
4813            Err(e) => Err(e),
4814        }
4815    }
4816    .boxed()
4817}
4818
4819// Determines and returns the highest timestamp for each timeline, for all known
4820// timestamp oracle implementations.
4821//
4822// Initially, we did this so that we can switch between implementations of
4823// timestamp oracle, but now we also do this to determine a monotonic boot
4824// timestamp, a timestamp that does not regress across reboots.
4825//
4826// This mostly works, but there can be linearizability violations, because there
4827// is no central moment where we do distributed coordination for all oracle
4828// types. Working around this seems prohibitively hard, maybe even impossible so
4829// types. Working around this seems prohibitively hard, maybe even impossible, so
4830// window (which is the only point where we should switch oracle
4831// implementations).
4832async fn get_initial_oracle_timestamps(
4833    timestamp_oracle_config: &Option<TimestampOracleConfig>,
4834) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4835    let mut initial_timestamps = BTreeMap::new();
4836
4837    if let Some(config) = timestamp_oracle_config {
4838        let oracle_timestamps = config.get_all_timelines().await?;
4839
4840        let debug_msg = || {
4841            oracle_timestamps
4842                .iter()
4843                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4844                .join(", ")
4845        };
4846        info!(
4847            "current timestamps from the timestamp oracle: {}",
4848            debug_msg()
4849        );
4850
4851        for (timeline, ts) in oracle_timestamps {
4852            let entry = initial_timestamps
4853                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4854
4855            entry
4856                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4857                .or_insert(ts);
4858        }
4859    } else {
4860        info!("no timestamp oracle configured!");
4861    };
4862
4863    let debug_msg = || {
4864        initial_timestamps
4865            .iter()
4866            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4867            .join(", ")
4868    };
4869    info!("initial oracle timestamps: {}", debug_msg());
4870
4871    Ok(initial_timestamps)
4872}
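// The per-timeline max-merge performed above, in miniature (illustrative
// timeline names and timestamps):
//
// ```
// use std::collections::BTreeMap;
//
// let mut initial: BTreeMap<&str, u64> = BTreeMap::new();
// for (timeline, ts) in [("M", 90u64), ("M", 100), ("R", 5)] {
//     initial
//         .entry(timeline)
//         .and_modify(|current| *current = (*current).max(ts))
//         .or_insert(ts);
// }
// assert_eq!(initial["M"], 100); // never regresses
// assert_eq!(initial["R"], 5);
// ```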
4873
#[instrument]
pub async fn load_remote_system_parameters(
    storage: &mut Box<dyn OpenableDurableCatalogState>,
    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
    system_parameter_sync_timeout: Duration,
) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
        tracing::info!("parameter sync on boot: start sync");

        // We intentionally block initial startup, potentially forever,
        // on initializing LaunchDarkly. This may seem scary, but the
        // alternative is even scarier. Over time, we expect that the
        // compiled-in default values for the system parameters will
        // drift substantially from the defaults configured in
        // LaunchDarkly, to the point that starting an environment
        // without loading the latest values from LaunchDarkly will
        // result in running an untested configuration.
        //
        // Note this only applies during initial startup. Restarting
        // after we've synced once only blocks on LaunchDarkly for a
        // maximum of `system_parameter_sync_timeout`, as it seems
        // reasonable to assume that the last-synced configuration was
        // valid enough.
        //
        // This philosophy appears to provide a good balance between
        // not running untested configurations in production and not
        // making LaunchDarkly a "tier 1" dependency for existing
        // environments.
        //
        // If this proves to be an issue, we could seek to address the
        // configuration drift in a different way--for example, by
        // writing a script that runs in CI nightly and checks for
        // deviation between the compiled Rust code and LaunchDarkly.
        //
        // If it is absolutely necessary to bring up a new environment
        // while LaunchDarkly is down, the following manual mitigation
        // can be performed:
        //
        //    1. Edit the environmentd startup parameters to omit the
        //       LaunchDarkly configuration.
        //    2. Boot environmentd.
        //    3. Use the catalog-debug tool to run `edit config "{\"key\":\"system_config_synced\"}" "{\"value\": 1}"`.
        //    4. Adjust any other parameters as necessary to avoid
        //       running a nonstandard configuration in production.
        //    5. Edit the environmentd startup parameters to restore the
        //       LaunchDarkly configuration, for when LaunchDarkly comes
        //       back online.
        //    6. Reboot environmentd.
        let mut params = SynchronizedParameters::new(SystemVars::default());
        let frontend_sync = async {
            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
            frontend.pull(&mut params);
            let ops = params
                .modified()
                .into_iter()
                .map(|param| {
                    let name = param.name;
                    let value = param.value;
                    tracing::info!(name, value, initial = true, "sync parameter");
                    (name, value)
                })
                .collect();
            tracing::info!("parameter sync on boot: end sync");
            Ok(Some(ops))
        };
        if !storage.has_system_config_synced_once().await? {
            frontend_sync.await
        } else {
            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
                Ok(ops) => Ok(ops),
                Err(TimeoutError::Inner(e)) => Err(e),
                Err(TimeoutError::DeadlineElapsed) => {
                    tracing::info!("parameter sync on boot: sync has timed out");
                    Ok(None)
                }
            }
        }
    } else {
        Ok(None)
    }
}

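/// The payload delivered to the coordinator when a watch set it registered
/// completes: either the dependencies of a logged statement became ready, or a
/// pending `ALTER SINK` / `ALTER MATERIALIZED VIEW` is ready to proceed.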
#[derive(Debug)]
pub enum WatchSetResponse {
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    AlterSinkReady(AlterSinkReadyContext),
    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
}

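/// State carried while an `ALTER SINK` waits for its dependencies to become
/// ready. If the context is dropped before being retired, the underlying
/// `ExecuteContext` is retired with `AdapterError::Canceled`.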
#[derive(Debug)]
pub struct AlterSinkReadyContext {
    ctx: Option<ExecuteContext>,
    otel_ctx: OpenTelemetryContext,
    plan: AlterSinkPlan,
    plan_validity: PlanValidity,
    read_hold: ReadHolds,
}

impl AlterSinkReadyContext {
    fn ctx(&mut self) -> &mut ExecuteContext {
        self.ctx.as_mut().expect("only cleared on drop")
    }

    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
        self.ctx
            .take()
            .expect("only cleared on drop")
            .retire(result);
    }
}

impl Drop for AlterSinkReadyContext {
    fn drop(&mut self) {
        if let Some(ctx) = self.ctx.take() {
            ctx.retire(Err(AdapterError::Canceled));
        }
    }
}

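/// State carried while an `ALTER MATERIALIZED VIEW` replacement waits to be
/// applied. Like `AlterSinkReadyContext`, dropping it without retiring it
/// retires the underlying `ExecuteContext` with `AdapterError::Canceled`.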
#[derive(Debug)]
pub struct AlterMaterializedViewReadyContext {
    ctx: Option<ExecuteContext>,
    otel_ctx: OpenTelemetryContext,
    plan: plan::AlterMaterializedViewApplyReplacementPlan,
    plan_validity: PlanValidity,
}

impl AlterMaterializedViewReadyContext {
    fn ctx(&mut self) -> &mut ExecuteContext {
        self.ctx.as_mut().expect("only cleared on drop")
    }

    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
        self.ctx
            .take()
            .expect("only cleared on drop")
            .retire(result);
    }
}

impl Drop for AlterMaterializedViewReadyContext {
    fn drop(&mut self) {
        if let Some(ctx) = self.ctx.take() {
            ctx.retire(Err(AdapterError::Canceled));
        }
    }
}

/// Pairs a lock with a `VecDeque` of work that was deferred because the lock
/// was held, to be performed once the lock is freed.
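///
/// A minimal usage sketch (the `Work` type, `work` value, and `perform` helper
/// are hypothetical, assumed here only for illustration): a caller that fails
/// to take the lock defers its work instead, leaving it for whoever releases
/// the lock to drain.
///
/// ```ignore
/// let mut queue: LockedVecDeque<Work> = LockedVecDeque::new();
/// match queue.try_lock_owned() {
///     // Lock acquired: do the work now, while holding the guard.
///     Ok(_guard) => perform(work),
///     // Lock busy: park the work until the lock is freed.
///     Err(_) => queue.push_back(work),
/// }
/// ```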
#[derive(Debug)]
struct LockedVecDeque<T> {
    items: VecDeque<T>,
    lock: Arc<tokio::sync::Mutex<()>>,
}

impl<T> LockedVecDeque<T> {
    pub fn new() -> Self {
        Self {
            items: VecDeque::new(),
            lock: Arc::new(tokio::sync::Mutex::new(())),
        }
    }

    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
        Arc::clone(&self.lock).try_lock_owned()
    }

    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    pub fn push_back(&mut self, value: T) {
        self.items.push_back(value)
    }

    pub fn pop_front(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    pub fn remove(&mut self, index: usize) -> Option<T> {
        self.items.remove(index)
    }

    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
        self.items.iter()
    }
}

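/// A statement whose execution was deferred (e.g. while waiting on a lock),
/// together with the `ExecuteContext` needed to resume it.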
#[derive(Debug)]
struct DeferredPlanStatement {
    ctx: ExecuteContext,
    ps: PlanStatement,
}

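/// The two stages at which execution can be deferred: a raw statement that has
/// not yet been planned, or a fully planned statement together with its
/// resolved dependency ids.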
#[derive(Debug)]
enum PlanStatement {
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
    },
}

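/// Errors produced when validating a client connection against a network
/// policy.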
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    #[error("Access denied: missing IP address")]
    MissingIp,
}

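/// Validates `ip` against a set of network policy rules, returning an error
/// unless at least one rule's address range contains the IP.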
pub(crate) fn validate_ip_with_policy_rules(
    ip: &IpAddr,
    rules: &Vec<NetworkPolicyRule>,
) -> Result<(), NetworkPolicyError> {
    // At the moment we don't inspect a rule's action or direction, as those
    // can only be "allow" and "ingress" respectively.
    if rules.iter().any(|r| r.address.0.contains(ip)) {
        Ok(())
    } else {
        Err(NetworkPolicyError::AddressDenied(*ip))
    }
}

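/// Derives the SQL-level relation type to record in the catalog: the HIR
/// expression's top-level type, with nullability and key information
/// backported from the optimized MIR expression's type.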
pub(crate) fn infer_sql_type_for_catalog(
    hir_expr: &HirRelationExpr,
    mir_expr: &MirRelationExpr,
) -> SqlRelationType {
    let mut typ = hir_expr.top_level_typ();
    typ.backport_nullability_and_keys(&mir_expr.typ());
    typ
}

#[cfg(test)]
mod id_pool_tests {
    use super::IdPool;

    #[mz_ore::test]
    fn test_empty_pool() {
        let mut pool = IdPool::empty();
        assert_eq!(pool.remaining(), 0);
        assert_eq!(pool.allocate(), None);
        assert_eq!(pool.allocate_many(1), None);
    }

    #[mz_ore::test]
    fn test_allocate_single() {
        let mut pool = IdPool::empty();
        pool.refill(10, 13);
        assert_eq!(pool.remaining(), 3);
        assert_eq!(pool.allocate(), Some(10));
        assert_eq!(pool.allocate(), Some(11));
        assert_eq!(pool.allocate(), Some(12));
        assert_eq!(pool.remaining(), 0);
        assert_eq!(pool.allocate(), None);
    }

    #[mz_ore::test]
    fn test_allocate_many() {
        let mut pool = IdPool::empty();
        pool.refill(100, 105);
        assert_eq!(pool.allocate_many(3), Some(vec![100, 101, 102]));
        assert_eq!(pool.remaining(), 2);
        // Not enough remaining for 3 more.
        assert_eq!(pool.allocate_many(3), None);
        // But 2 works.
        assert_eq!(pool.allocate_many(2), Some(vec![103, 104]));
        assert_eq!(pool.remaining(), 0);
    }

    #[mz_ore::test]
    fn test_allocate_many_zero() {
        let mut pool = IdPool::empty();
        pool.refill(1, 5);
        assert_eq!(pool.allocate_many(0), Some(vec![]));
        assert_eq!(pool.remaining(), 4);
    }

    #[mz_ore::test]
    fn test_refill_resets_pool() {
        let mut pool = IdPool::empty();
        pool.refill(0, 2);
        assert_eq!(pool.allocate(), Some(0));
        // Refill before exhaustion replaces the range.
        pool.refill(50, 52);
        assert_eq!(pool.allocate(), Some(50));
        assert_eq!(pool.allocate(), Some(51));
        assert_eq!(pool.allocate(), None);
    }

    #[mz_ore::test]
    fn test_mixed_allocate_and_allocate_many() {
        let mut pool = IdPool::empty();
        pool.refill(0, 10);
        assert_eq!(pool.allocate(), Some(0));
        assert_eq!(pool.allocate_many(3), Some(vec![1, 2, 3]));
        assert_eq!(pool.allocate(), Some(4));
        assert_eq!(pool.remaining(), 5);
    }

    #[mz_ore::test]
    #[should_panic(expected = "invalid pool range")]
    fn test_refill_invalid_range_panics() {
        let mut pool = IdPool::empty();
        pool.refill(10, 5);
    }
}