// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Translation of SQL commands into timestamped `Controller` commands.
//!
//! The various SQL commands instruct the system to take actions that are not
//! yet explicitly timestamped. On the other hand, the underlying data continually
//! change as time moves forward. On the third hand, we greatly benefit from the
//! information that some times are no longer of interest, so that we may
//! compact the representation of the continually changing collections.
//!
//! The [`Coordinator`] curates these interactions by observing the progress
//! collections make through time, choosing timestamps for its own commands,
//! and eventually communicating that certain times have irretrievably "passed".
//!
//! ## Frontiers another way
//!
//! If the above description of frontiers left you with questions, this
//! repackaged explanation might help.
//!
//! - `since` is the least recent time (i.e. oldest time) that you can read
//!   from sources and be guaranteed that the returned data is accurate as of
//!   that time.
//!
//!   Reads at times less than `since` may return values that were not actually
//!   seen at the specified time, but arrived later (i.e. the results are
//!   compacted).
//!
//!   For correctness' sake, the coordinator never chooses to read at a time
//!   less than an arrangement's `since`.
//!
//! - `upper` is the first time after the most recent time that you can read
//!   from sources and receive an immediate response. Alternately, it is the
//!   least time at which the data may still change (that is the reason we may
//!   not be able to respond immediately).
//!
//!   Reads at times >= `upper` may not immediately return because the answer
//!   isn't known yet. However, once the `upper` is > the specified read time,
//!   the read can return.
//!
//!   For the sake of returned values' freshness, the coordinator prefers
//!   performing reads at an arrangement's `upper`. However, because we more
//!   strongly prefer correctness, the coordinator will choose timestamps
//!   greater than an object's `upper` if it is also being accessed alongside
//!   objects whose `since` times are >= its `upper`.
//!
//! This illustration attempts to show, with time moving left to right, the
//! relationship between `since` and `upper`.
//!
//! - `#`: possibly inaccurate results
//! - `-`: immediate, correct response
//! - `?`: not yet known
//! - `s`: since
//! - `u`: upper
//! - `|`: eligible for coordinator to select
//!
//! ```nofmt
//! ####s----u?????
//!     |||||||||||
//! ```
//!

69use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::fmt;
72use std::net::IpAddr;
73use std::num::NonZeroI64;
74use std::ops::Neg;
75use std::str::FromStr;
76use std::sync::LazyLock;
77use std::sync::{Arc, Mutex};
78use std::thread;
79use std::time::{Duration, Instant};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::{Either, Itertools};
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::{
95    USER_ID_POOL_BATCH_SIZE, WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL,
96};
97use mz_auth::password::Password;
98use mz_build_info::BuildInfo;
99use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
100use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
101use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
102use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
103use mz_catalog::memory::objects::{
104    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
105    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
106};
107use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
108use mz_compute_client::as_of_selection;
109use mz_compute_client::controller::error::{
110    CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
111};
112use mz_compute_types::ComputeInstanceId;
113use mz_compute_types::dataflows::DataflowDescription;
114use mz_compute_types::plan::Plan;
115use mz_controller::clusters::{
116    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
117};
118use mz_controller::{ControllerConfig, Readiness};
119use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
120use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
121use mz_license_keys::ValidatedLicenseKey;
122use mz_orchestrator::OfflineReason;
123use mz_ore::cast::{CastFrom, CastInto, CastLossy};
124use mz_ore::channel::trigger::Trigger;
125use mz_ore::future::TimeoutError;
126use mz_ore::metrics::MetricsRegistry;
127use mz_ore::now::{EpochMillis, NowFn};
128use mz_ore::task::{JoinHandle, spawn};
129use mz_ore::thread::JoinHandleExt;
130use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
131use mz_ore::url::SensitiveUrl;
132use mz_ore::{assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, stack};
133use mz_persist_client::PersistClient;
134use mz_persist_client::batch::ProtoBatch;
135use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
136use mz_repr::adt::numeric::Numeric;
137use mz_repr::explain::{ExplainConfig, ExplainFormat};
138use mz_repr::global_id::TransientIdGen;
139use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
140use mz_repr::role_id::RoleId;
141use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
142use mz_secrets::cache::CachingSecretsReader;
143use mz_secrets::{SecretsController, SecretsReader};
144use mz_sql::ast::{Raw, Statement};
145use mz_sql::catalog::{CatalogCluster, EnvironmentId};
146use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
147use mz_sql::optimizer_metrics::OptimizerMetrics;
148use mz_sql::plan::{
149    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
150    NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
151};
152use mz_sql::session::user::User;
153use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
154use mz_sql_parser::ast::ExplainStage;
155use mz_sql_parser::ast::display::AstDisplay;
156use mz_storage_client::client::TableData;
157use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
158use mz_storage_types::connections::Connection as StorageConnection;
159use mz_storage_types::connections::ConnectionContext;
160use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
161use mz_storage_types::read_holds::ReadHold;
162use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
163use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
164use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
165use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
166use mz_transform::dataflow::DataflowMetainfo;
167use opentelemetry::trace::TraceContextExt;
168use serde::Serialize;
169use thiserror::Error;
170use timely::progress::{Antichain, Timestamp as _};
171use tokio::runtime::Handle as TokioHandle;
172use tokio::select;
173use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
174use tokio::time::{Interval, MissedTickBehavior};
175use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
176use tracing_opentelemetry::OpenTelemetrySpanExt;
177use uuid::Uuid;
178
179use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
180use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
181use crate::client::{Client, Handle};
182use crate::command::{Command, ExecuteResponse};
183use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
184use crate::coord::appends::{
185    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
186};
187use crate::coord::caught_up::CaughtUpCheckContext;
188use crate::coord::cluster_scheduling::SchedulingDecision;
189use crate::coord::id_bundle::CollectionIdBundle;
190use crate::coord::introspection::IntrospectionSubscribe;
191use crate::coord::peek::PendingPeek;
192use crate::coord::statement_logging::StatementLogging;
193use crate::coord::timeline::{TimelineContext, TimelineState};
194use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
195use crate::coord::validity::PlanValidity;
196use crate::error::AdapterError;
197use crate::explain::insights::PlanInsightsContext;
198use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
199use crate::metrics::Metrics;
200use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
201use crate::optimize::{self, Optimize, OptimizerConfig};
202use crate::session::{EndTransactionAction, Session};
203use crate::statement_logging::{
204    StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
205};
206use crate::util::{ClientTransmitter, ResultExt, sort_topological};
207use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
208use crate::{AdapterNotice, ReadHolds, flags};
209
210pub(crate) mod appends;
211pub(crate) mod catalog_serving;
212pub(crate) mod cluster_scheduling;
213pub(crate) mod consistency;
214pub(crate) mod id_bundle;
215pub(crate) mod in_memory_oracle;
216pub(crate) mod peek;
217pub(crate) mod read_policy;
218pub(crate) mod sequencer;
219pub(crate) mod statement_logging;
220pub(crate) mod timeline;
221pub(crate) mod timestamp_selection;
222
223pub mod catalog_implications;
224mod caught_up;
225mod command_handler;
226mod ddl;
227mod indexes;
228mod introspection;
229mod message_handler;
230mod privatelink_status;
231mod sql;
232mod validity;
233
234/// A pool of pre-allocated user IDs to avoid per-DDL persist writes.
235///
236/// IDs in the range `[next, upper)` are available for allocation.
237/// When exhausted, the pool must be refilled via the catalog.
238///
239/// # Correctness
240///
241/// The pool is owned by [`Coordinator`], which processes all requests
242/// on a single-threaded event loop. Because every access requires
243/// `&mut self` on the coordinator, there is no concurrent access to the
244/// pool — no additional synchronization is needed.
245///
246/// Global ID uniqueness is guaranteed because each refill calls
247/// [`Catalog::allocate_user_ids`], which performs a durable persist
248/// write that atomically reserves the entire batch before any IDs from
249/// it are handed out. If the process crashes after a refill but before
250/// all pre-allocated IDs are consumed, the unused IDs form harmless
251/// gaps in the sequence — user IDs are not required to be contiguous.
252///
253/// This guarantee holds even if multiple `environmentd` processes run
254/// concurrently. Each process has its own independent pool,
255/// but every refill goes through the shared persist-backed catalog,
256/// which serializes allocations across all callers. Two processes
257/// will therefore never receive overlapping ID ranges,
258/// for the same reason they could not before this pool existed.
259#[derive(Debug)]
260pub(crate) struct IdPool {
261    next: u64,
262    upper: u64,
263}
264
265impl IdPool {
266    /// Creates an empty pool.
267    pub fn empty() -> Self {
268        IdPool { next: 0, upper: 0 }
269    }
270
271    /// Allocates a single ID from the pool, returning `None` if exhausted.
272    pub fn allocate(&mut self) -> Option<u64> {
273        if self.next < self.upper {
274            let id = self.next;
275            self.next += 1;
276            Some(id)
277        } else {
278            None
279        }
280    }
281
282    /// Allocates `n` consecutive IDs from the pool, returning `None` if
283    /// insufficient IDs remain.
284    pub fn allocate_many(&mut self, n: u64) -> Option<Vec<u64>> {
285        if self.remaining() >= n {
286            let ids = (self.next..self.next + n).collect();
287            self.next += n;
288            Some(ids)
289        } else {
290            None
291        }
292    }
293
294    /// Returns the number of IDs remaining in the pool.
295    pub fn remaining(&self) -> u64 {
296        self.upper - self.next
297    }
298
299    /// Refills the pool with the given range `[next, upper)`.
300    pub fn refill(&mut self, next: u64, upper: u64) {
301        assert!(next <= upper, "invalid pool range: {next}..{upper}");
302        self.next = next;
303        self.upper = upper;
304    }
305}
306
307#[derive(Debug)]
308pub enum Message {
309    Command(OpenTelemetryContext, Command),
310    ControllerReady {
311        controller: ControllerReadiness,
312    },
313    PurifiedStatementReady(PurifiedStatementReady),
314    CreateConnectionValidationReady(CreateConnectionValidationReady),
315    AlterConnectionValidationReady(AlterConnectionValidationReady),
316    TryDeferred {
317        /// The connection that created this op.
318        conn_id: ConnectionId,
319        /// The write lock that notified us our deferred op might be able to run.
320        ///
321        /// Note: While we never want to hold a partial set of locks, it can be important to hold
322        /// onto the _one_ that notified us our op might be ready. If there are multiple operations
323        /// waiting on a single collection, and we don't hold this lock through retyring the op,
324        /// then everything waiting on this collection will get retried causing traffic in the
325        /// Coordinator's message queue.
326        ///
327        /// See [`DeferredOp::can_be_optimistically_retried`] for more detail.
328        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
329    },
330    /// Initiates a group commit.
331    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
332    DeferredStatementReady,
333    AdvanceTimelines,
334    ClusterEvent(ClusterEvent),
335    CancelPendingPeeks {
336        conn_id: ConnectionId,
337    },
338    LinearizeReads,
339    StagedBatches {
340        conn_id: ConnectionId,
341        table_id: CatalogItemId,
342        batches: Vec<Result<ProtoBatch, String>>,
343    },
344    StorageUsageSchedule,
345    StorageUsageFetch,
346    StorageUsageUpdate(ShardsUsageReferenced),
347    StorageUsagePrune(Vec<BuiltinTableUpdate>),
348    /// Performs any cleanup and logging actions necessary for
349    /// finalizing a statement execution.
350    RetireExecute {
351        data: ExecuteContextExtra,
352        otel_ctx: OpenTelemetryContext,
353        reason: StatementEndedExecutionReason,
354    },
355    ExecuteSingleStatementTransaction {
356        ctx: ExecuteContext,
357        otel_ctx: OpenTelemetryContext,
358        stmt: Arc<Statement<Raw>>,
359        params: mz_sql::plan::Params,
360    },
361    PeekStageReady {
362        ctx: ExecuteContext,
363        span: Span,
364        stage: PeekStage,
365    },
366    CreateIndexStageReady {
367        ctx: ExecuteContext,
368        span: Span,
369        stage: CreateIndexStage,
370    },
371    CreateViewStageReady {
372        ctx: ExecuteContext,
373        span: Span,
374        stage: CreateViewStage,
375    },
376    CreateMaterializedViewStageReady {
377        ctx: ExecuteContext,
378        span: Span,
379        stage: CreateMaterializedViewStage,
380    },
381    SubscribeStageReady {
382        ctx: ExecuteContext,
383        span: Span,
384        stage: SubscribeStage,
385    },
386    IntrospectionSubscribeStageReady {
387        span: Span,
388        stage: IntrospectionSubscribeStage,
389    },
390    SecretStageReady {
391        ctx: ExecuteContext,
392        span: Span,
393        stage: SecretStage,
394    },
395    ClusterStageReady {
396        ctx: ExecuteContext,
397        span: Span,
398        stage: ClusterStage,
399    },
400    ExplainTimestampStageReady {
401        ctx: ExecuteContext,
402        span: Span,
403        stage: ExplainTimestampStage,
404    },
405    DrainStatementLog,
406    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
407    CheckSchedulingPolicies,
408
409    /// Scheduling policy decisions about turning clusters On/Off.
410    /// `Vec<(policy name, Vec of decisions by the policy)>`
411    /// A cluster will be On if and only if there is at least one On decision for it.
412    /// Scheduling decisions for clusters that have `SCHEDULE = MANUAL` are ignored.
413    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
414}
415
416impl Message {
417    /// Returns a string to identify the kind of [`Message`], useful for logging.
418    pub const fn kind(&self) -> &'static str {
419        match self {
420            Message::Command(_, msg) => match msg {
421                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
422                Command::Startup { .. } => "command-startup",
423                Command::Execute { .. } => "command-execute",
424                Command::Commit { .. } => "command-commit",
425                Command::CancelRequest { .. } => "command-cancel_request",
426                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
427                Command::GetWebhook { .. } => "command-get_webhook",
428                Command::GetSystemVars { .. } => "command-get_system_vars",
429                Command::SetSystemVars { .. } => "command-set_system_vars",
430                Command::Terminate { .. } => "command-terminate",
431                Command::RetireExecute { .. } => "command-retire_execute",
432                Command::CheckConsistency { .. } => "command-check_consistency",
433                Command::Dump { .. } => "command-dump",
434                Command::AuthenticatePassword { .. } => "command-auth_check",
435                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
436                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
437                Command::CheckRoleCanLogin { .. } => "command-check_role_can_login",
438                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
439                Command::GetOracle { .. } => "get-oracle",
440                Command::DetermineRealTimeRecentTimestamp { .. } => {
441                    "determine-real-time-recent-timestamp"
442                }
443                Command::GetTransactionReadHoldsBundle { .. } => {
444                    "get-transaction-read-holds-bundle"
445                }
446                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
447                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
448                Command::ExecuteSubscribe { .. } => "execute-subscribe",
449                Command::CopyToPreflight { .. } => "copy-to-preflight",
450                Command::ExecuteCopyTo { .. } => "execute-copy-to",
451                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
452                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
453                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
454                Command::ExplainTimestamp { .. } => "explain-timestamp",
455                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
456                Command::StartCopyFromStdin { .. } => "start-copy-from-stdin",
457                Command::InjectAuditEvents { .. } => "inject-audit-events",
458            },
459            Message::ControllerReady {
460                controller: ControllerReadiness::Compute,
461            } => "controller_ready(compute)",
462            Message::ControllerReady {
463                controller: ControllerReadiness::Storage,
464            } => "controller_ready(storage)",
465            Message::ControllerReady {
466                controller: ControllerReadiness::Metrics,
467            } => "controller_ready(metrics)",
468            Message::ControllerReady {
469                controller: ControllerReadiness::Internal,
470            } => "controller_ready(internal)",
471            Message::PurifiedStatementReady(_) => "purified_statement_ready",
472            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
473            Message::TryDeferred { .. } => "try_deferred",
474            Message::GroupCommitInitiate(..) => "group_commit_initiate",
475            Message::AdvanceTimelines => "advance_timelines",
476            Message::ClusterEvent(_) => "cluster_event",
477            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
478            Message::LinearizeReads => "linearize_reads",
479            Message::StagedBatches { .. } => "staged_batches",
480            Message::StorageUsageSchedule => "storage_usage_schedule",
481            Message::StorageUsageFetch => "storage_usage_fetch",
482            Message::StorageUsageUpdate(_) => "storage_usage_update",
483            Message::StorageUsagePrune(_) => "storage_usage_prune",
484            Message::RetireExecute { .. } => "retire_execute",
485            Message::ExecuteSingleStatementTransaction { .. } => {
486                "execute_single_statement_transaction"
487            }
488            Message::PeekStageReady { .. } => "peek_stage_ready",
489            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
490            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
491            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
492            Message::CreateMaterializedViewStageReady { .. } => {
493                "create_materialized_view_stage_ready"
494            }
495            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
496            Message::IntrospectionSubscribeStageReady { .. } => {
497                "introspection_subscribe_stage_ready"
498            }
499            Message::SecretStageReady { .. } => "secret_stage_ready",
500            Message::ClusterStageReady { .. } => "cluster_stage_ready",
501            Message::DrainStatementLog => "drain_statement_log",
502            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
503            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
504            Message::CheckSchedulingPolicies => "check_scheduling_policies",
505            Message::SchedulingDecisions { .. } => "scheduling_decision",
506            Message::DeferredStatementReady => "deferred_statement_ready",
507        }
508    }
509}
510
511/// The reason for why a controller needs processing on the main loop.
512#[derive(Debug)]
513pub enum ControllerReadiness {
514    /// The storage controller is ready.
515    Storage,
516    /// The compute controller is ready.
517    Compute,
518    /// A batch of metric data is ready.
519    Metrics,
520    /// An internally-generated message is ready to be returned.
521    Internal,
522}
523
524#[derive(Derivative)]
525#[derivative(Debug)]
526pub struct BackgroundWorkResult<T> {
527    #[derivative(Debug = "ignore")]
528    pub ctx: ExecuteContext,
529    pub result: Result<T, AdapterError>,
530    pub params: Params,
531    pub plan_validity: PlanValidity,
532    pub original_stmt: Arc<Statement<Raw>>,
533    pub otel_ctx: OpenTelemetryContext,
534}
535
536pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
537
538#[derive(Derivative)]
539#[derivative(Debug)]
540pub struct ValidationReady<T> {
541    #[derivative(Debug = "ignore")]
542    pub ctx: ExecuteContext,
543    pub result: Result<T, AdapterError>,
544    pub resolved_ids: ResolvedIds,
545    pub connection_id: CatalogItemId,
546    pub connection_gid: GlobalId,
547    pub plan_validity: PlanValidity,
548    pub otel_ctx: OpenTelemetryContext,
549}
550
551pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
552pub type AlterConnectionValidationReady = ValidationReady<Connection>;
553
554#[derive(Debug)]
555pub enum PeekStage {
556    /// Common stages across SELECT, EXPLAIN and COPY TO queries.
557    LinearizeTimestamp(PeekStageLinearizeTimestamp),
558    RealTimeRecency(PeekStageRealTimeRecency),
559    TimestampReadHold(PeekStageTimestampReadHold),
560    Optimize(PeekStageOptimize),
561    /// Final stage for a peek.
562    Finish(PeekStageFinish),
563    /// Final stage for an explain.
564    ExplainPlan(PeekStageExplainPlan),
565    ExplainPushdown(PeekStageExplainPushdown),
566    /// Preflight checks for a copy to operation.
567    CopyToPreflight(PeekStageCopyTo),
568    /// Final stage for a copy to which involves shipping the dataflow.
569    CopyToDataflow(PeekStageCopyTo),
570}
571
572#[derive(Debug)]
573pub struct CopyToContext {
574    /// The `RelationDesc` of the data to be copied.
575    pub desc: RelationDesc,
576    /// The destination uri of the external service where the data will be copied.
577    pub uri: Uri,
578    /// Connection information required to connect to the external service to copy the data.
579    pub connection: StorageConnection<ReferencedConnection>,
580    /// The ID of the CONNECTION object to be used for copying the data.
581    pub connection_id: CatalogItemId,
582    /// Format params to format the data.
583    pub format: S3SinkFormat,
584    /// Approximate max file size of each uploaded file.
585    pub max_file_size: u64,
586    /// Number of batches the output of the COPY TO will be partitioned into
587    /// to distribute the load across workers deterministically.
588    /// This is only an option since it's not set when CopyToContext is instantiated
589    /// but immediately after in the PeekStageValidate stage.
590    pub output_batch_count: Option<u64>,
591}
592
593#[derive(Debug)]
594pub struct PeekStageLinearizeTimestamp {
595    validity: PlanValidity,
596    plan: mz_sql::plan::SelectPlan,
597    max_query_result_size: Option<u64>,
598    source_ids: BTreeSet<GlobalId>,
599    target_replica: Option<ReplicaId>,
600    timeline_context: TimelineContext,
601    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
602    /// An optional context set iff the state machine is initiated from
603    /// sequencing an EXPLAIN for this statement.
604    explain_ctx: ExplainContext,
605}
606
607#[derive(Debug)]
608pub struct PeekStageRealTimeRecency {
609    validity: PlanValidity,
610    plan: mz_sql::plan::SelectPlan,
611    max_query_result_size: Option<u64>,
612    source_ids: BTreeSet<GlobalId>,
613    target_replica: Option<ReplicaId>,
614    timeline_context: TimelineContext,
615    oracle_read_ts: Option<Timestamp>,
616    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
617    /// An optional context set iff the state machine is initiated from
618    /// sequencing an EXPLAIN for this statement.
619    explain_ctx: ExplainContext,
620}
621
622#[derive(Debug)]
623pub struct PeekStageTimestampReadHold {
624    validity: PlanValidity,
625    plan: mz_sql::plan::SelectPlan,
626    max_query_result_size: Option<u64>,
627    source_ids: BTreeSet<GlobalId>,
628    target_replica: Option<ReplicaId>,
629    timeline_context: TimelineContext,
630    oracle_read_ts: Option<Timestamp>,
631    real_time_recency_ts: Option<mz_repr::Timestamp>,
632    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
633    /// An optional context set iff the state machine is initiated from
634    /// sequencing an EXPLAIN for this statement.
635    explain_ctx: ExplainContext,
636}
637
638#[derive(Debug)]
639pub struct PeekStageOptimize {
640    validity: PlanValidity,
641    plan: mz_sql::plan::SelectPlan,
642    max_query_result_size: Option<u64>,
643    source_ids: BTreeSet<GlobalId>,
644    id_bundle: CollectionIdBundle,
645    target_replica: Option<ReplicaId>,
646    determination: TimestampDetermination<mz_repr::Timestamp>,
647    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
648    /// An optional context set iff the state machine is initiated from
649    /// sequencing an EXPLAIN for this statement.
650    explain_ctx: ExplainContext,
651}
652
653#[derive(Debug)]
654pub struct PeekStageFinish {
655    validity: PlanValidity,
656    plan: mz_sql::plan::SelectPlan,
657    max_query_result_size: Option<u64>,
658    id_bundle: CollectionIdBundle,
659    target_replica: Option<ReplicaId>,
660    source_ids: BTreeSet<GlobalId>,
661    determination: TimestampDetermination<mz_repr::Timestamp>,
662    cluster_id: ComputeInstanceId,
663    finishing: RowSetFinishing,
664    /// When present, an optimizer trace to be used for emitting a plan insights
665    /// notice.
666    plan_insights_optimizer_trace: Option<OptimizerTrace>,
667    insights_ctx: Option<Box<PlanInsightsContext>>,
668    global_lir_plan: optimize::peek::GlobalLirPlan,
669    optimization_finished_at: EpochMillis,
670}
671
672#[derive(Debug)]
673pub struct PeekStageCopyTo {
674    validity: PlanValidity,
675    optimizer: optimize::copy_to::Optimizer,
676    global_lir_plan: optimize::copy_to::GlobalLirPlan,
677    optimization_finished_at: EpochMillis,
678    source_ids: BTreeSet<GlobalId>,
679}
680
681#[derive(Debug)]
682pub struct PeekStageExplainPlan {
683    validity: PlanValidity,
684    optimizer: optimize::peek::Optimizer,
685    df_meta: DataflowMetainfo,
686    explain_ctx: ExplainPlanContext,
687    insights_ctx: Option<Box<PlanInsightsContext>>,
688}
689
690#[derive(Debug)]
691pub struct PeekStageExplainPushdown {
692    validity: PlanValidity,
693    determination: TimestampDetermination<mz_repr::Timestamp>,
694    imports: BTreeMap<GlobalId, MapFilterProject>,
695}
696
697#[derive(Debug)]
698pub enum CreateIndexStage {
699    Optimize(CreateIndexOptimize),
700    Finish(CreateIndexFinish),
701    Explain(CreateIndexExplain),
702}
703
704#[derive(Debug)]
705pub struct CreateIndexOptimize {
706    validity: PlanValidity,
707    plan: plan::CreateIndexPlan,
708    resolved_ids: ResolvedIds,
709    /// An optional context set iff the state machine is initiated from
710    /// sequencing an EXPLAIN for this statement.
711    explain_ctx: ExplainContext,
712}
713
714#[derive(Debug)]
715pub struct CreateIndexFinish {
716    validity: PlanValidity,
717    item_id: CatalogItemId,
718    global_id: GlobalId,
719    plan: plan::CreateIndexPlan,
720    resolved_ids: ResolvedIds,
721    global_mir_plan: optimize::index::GlobalMirPlan,
722    global_lir_plan: optimize::index::GlobalLirPlan,
723    optimizer_features: OptimizerFeatures,
724}
725
726#[derive(Debug)]
727pub struct CreateIndexExplain {
728    validity: PlanValidity,
729    exported_index_id: GlobalId,
730    plan: plan::CreateIndexPlan,
731    df_meta: DataflowMetainfo,
732    explain_ctx: ExplainPlanContext,
733}
734
/// Stages of the `CREATE VIEW` sequencing state machine.
#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}
741
/// State carried by [`CreateViewStage::Optimize`].
#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
751
/// State carried by [`CreateViewStage::Finish`].
#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    /// ID of this item in the Catalog.
    item_id: CatalogItemId,
    /// ID by which Compute will reference this View.
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    /// IDs of objects resolved during name resolution.
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}
764
/// State carried by [`CreateViewStage::Explain`].
#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}
772
/// Stages of the `EXPLAIN TIMESTAMP` sequencing state machine.
#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}
779
/// State carried by [`ExplainTimestampStage::Optimize`].
#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}
786
/// State carried by [`ExplainTimestampStage::RealTimeRecency`].
#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}
795
/// State carried by [`ExplainTimestampStage::Finish`].
#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    real_time_recency_ts: Option<Timestamp>,
}
806
/// Stages of the `ALTER CLUSTER` sequencing state machine.
#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}
813
/// State carried by [`ClusterStage::Alter`].
#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}
819
/// State carried by [`ClusterStage::WaitForHydrated`].
#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    /// Deadline after which `on_timeout` applies.
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}
828
/// State carried by [`ClusterStage::Finalize`].
#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}
835
/// Distinguishes whether a statement is being sequenced normally or on behalf
/// of some flavor of `EXPLAIN`.
#[derive(Debug)]
pub enum ExplainContext {
    /// The ordinary, non-explain variant of the statement.
    None,
    /// The `EXPLAIN <level> PLAN FOR <explainee>` version of the statement.
    Plan(ExplainPlanContext),
    /// Generate a notice containing the `EXPLAIN PLAN INSIGHTS` output
    /// alongside the query's normal output.
    PlanInsightsNotice(OptimizerTrace),
    /// `EXPLAIN FILTER PUSHDOWN`
    Pushdown,
}
848
849impl ExplainContext {
850    /// If available for this context, wrap the [`OptimizerTrace`] into a
851    /// [`tracing::Dispatch`] and set it as default, returning the resulting
852    /// guard in a `Some(guard)` option.
853    pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
854        let optimizer_trace = match self {
855            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
856            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
857            _ => None,
858        };
859        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
860    }
861
862    pub(crate) fn needs_cluster(&self) -> bool {
863        match self {
864            ExplainContext::None => true,
865            ExplainContext::Plan(..) => false,
866            ExplainContext::PlanInsightsNotice(..) => true,
867            ExplainContext::Pushdown => false,
868        }
869    }
870
871    pub(crate) fn needs_plan_insights(&self) -> bool {
872        matches!(
873            self,
874            ExplainContext::Plan(ExplainPlanContext {
875                stage: ExplainStage::PlanInsights,
876                ..
877            }) | ExplainContext::PlanInsightsNotice(_)
878        )
879    }
880}
881
/// Context for sequencing an `EXPLAIN ... PLAN` statement.
#[derive(Debug)]
pub struct ExplainPlanContext {
    /// EXPLAIN BROKEN is internal syntax for showing EXPLAIN output despite an internal error in
    /// the optimizer: we don't immediately bail out from peek sequencing when an internal optimizer
    /// error happens, but go on with trying to show the requested EXPLAIN stage. This can still
    /// succeed if the requested EXPLAIN stage is before the point where the error happened.
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    pub stage: ExplainStage,
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    pub optimizer_trace: OptimizerTrace,
}
896
/// Stages of the `CREATE MATERIALIZED VIEW` sequencing state machine.
#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}
903
/// State carried by [`CreateMaterializedViewStage::Optimize`].
#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
913
/// State carried by [`CreateMaterializedViewStage::Finish`].
#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    /// The ID of this Materialized View in the Catalog.
    item_id: CatalogItemId,
    /// The ID of the durable pTVC backing this Materialized View.
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}
928
/// State carried by [`CreateMaterializedViewStage::Explain`].
#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
937
/// Stages of the `SUBSCRIBE` sequencing state machine.
#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
    Explain(SubscribeExplain),
}
945
/// State carried by [`SubscribeStage::OptimizeMir`].
#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
958
/// State carried by [`SubscribeStage::TimestampOptimizeLir`].
#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    /// MIR plan whose timestamp has not yet been resolved.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
    /// An optional context set iff the state machine is initiated from
    /// sequencing an EXPLAIN for this statement.
    explain_ctx: ExplainContext,
}
972
/// State carried by [`SubscribeStage::Finish`].
#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}
982
/// State carried by [`SubscribeStage::Explain`].
#[derive(Debug)]
pub struct SubscribeExplain {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    df_meta: DataflowMetainfo,
    cluster_id: ComputeInstanceId,
    explain_ctx: ExplainPlanContext,
}
991
/// Stages of the introspection-subscribe sequencing state machine.
#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}
998
/// State carried by [`IntrospectionSubscribeStage::OptimizeMir`].
#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1007
/// State carried by [`IntrospectionSubscribeStage::TimestampOptimizeLir`].
#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    /// MIR plan whose timestamp has not yet been resolved.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1016
/// State carried by [`IntrospectionSubscribeStage::Finish`].
#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1025
/// Stages of the secret-related sequencing state machines
/// (`CREATE SECRET`, key rotation, `ALTER SECRET`).
#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}
1034
/// State carried by [`SecretStage::CreateEnsure`].
#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}
1040
/// State carried by [`SecretStage::CreateFinish`].
#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}
1048
/// State carried by [`SecretStage::RotateKeysEnsure`].
#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}
1054
/// State carried by [`SecretStage::RotateKeysFinish`].
#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    /// Catalog operations to apply when finishing the rotation.
    ops: Vec<crate::catalog::Op>,
}
1060
/// State carried by [`SecretStage::Alter`].
#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}
1066
/// An enum describing which cluster to run a statement on.
///
/// One example usage would be that if a query depends only on system tables, we might
/// automatically run it on the catalog server cluster to benefit from indexes that exist there.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The catalog server cluster.
    CatalogServer,
    /// The current user's active cluster.
    Active,
    /// The cluster selected at the start of a transaction.
    Transaction(ClusterId),
}
1080
/// Result types for each stage of a sequence.
pub(crate) enum StageResult<T> {
    /// A task was spawned that will return the next stage.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task was spawned that will return a response for the client.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage is immediately ready and will execute.
    Immediate(T),
    /// The final stage was executed and is ready to respond to the client.
    Response(ExecuteResponse),
}
1092
/// Common functionality for [Coordinator::sequence_staged].
pub(crate) trait Staged: Send {
    /// Context threaded through the stages; retired with the final result.
    type Ctx: StagedContext;

    /// Mutable access to this stage's [`PlanValidity`].
    fn validity(&mut self) -> &mut PlanValidity;

    /// Returns the next stage or final result.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Prepares a message for the Coordinator.
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether it is safe to SQL cancel this stage.
    fn cancel_enabled(&self) -> bool;
}
1112
/// A context that a [`Staged`] state machine can retire its result into.
pub trait StagedContext {
    /// Consume the context, delivering the final result.
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    /// The session associated with this context, if any.
    fn session(&self) -> Option<&Session>;
}
1117
impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        // Resolves to the inherent `ExecuteContext::retire` (inherent methods
        // take precedence over trait methods), not to this trait method.
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        // Delegates to the inherent `ExecuteContext::session` accessor.
        Some(self.session())
    }
}
1127
// The unit context: used by staged machines (e.g. introspection subscribes)
// that have no client to respond to; results are discarded.
impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}
1135
1136/// Configures a coordinator.
1137pub struct Config {
1138    pub controller_config: ControllerConfig,
1139    pub controller_envd_epoch: NonZeroI64,
1140    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
1141    pub audit_logs_iterator: AuditLogIterator,
1142    pub timestamp_oracle_url: Option<SensitiveUrl>,
1143    pub unsafe_mode: bool,
1144    pub all_features: bool,
1145    pub build_info: &'static BuildInfo,
1146    pub environment_id: EnvironmentId,
1147    pub metrics_registry: MetricsRegistry,
1148    pub now: NowFn,
1149    pub secrets_controller: Arc<dyn SecretsController>,
1150    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1151    pub availability_zones: Vec<String>,
1152    pub cluster_replica_sizes: ClusterReplicaSizeMap,
1153    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
1154    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
1155    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
1156    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
1157    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
1158    pub system_parameter_defaults: BTreeMap<String, String>,
1159    pub storage_usage_client: StorageUsageClient,
1160    pub storage_usage_collection_interval: Duration,
1161    pub storage_usage_retention_period: Option<Duration>,
1162    pub segment_client: Option<mz_segment::Client>,
1163    pub egress_addresses: Vec<IpNet>,
1164    pub remote_system_parameters: Option<BTreeMap<String, String>>,
1165    pub aws_account_id: Option<String>,
1166    pub aws_privatelink_availability_zones: Option<Vec<String>>,
1167    pub connection_context: ConnectionContext,
1168    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
1169    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
1170    pub http_host_name: Option<String>,
1171    pub tracing_handle: TracingHandle,
1172    /// Whether or not to start controllers in read-only mode. This is only
1173    /// meant for use during development of read-only clusters and 0dt upgrades
1174    /// and should go away once we have proper orchestration during upgrades.
1175    pub read_only_controllers: bool,
1176
1177    /// A trigger that signals that the current deployment has caught up with a
1178    /// previous deployment. Only used during 0dt deployment, while in read-only
1179    /// mode.
1180    pub caught_up_trigger: Option<Trigger>,
1181
1182    pub helm_chart_version: Option<String>,
1183    pub license_key: ValidatedLicenseKey,
1184    pub external_login_password_mz_system: Option<Password>,
1185    pub force_builtin_schema_migration: Option<String>,
1186}
1187
/// Metadata about an active connection.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Pgwire specifies that every connection have a 32-bit secret associated
    /// with it, that is known to both the client and the server. Cancellation
    /// requests are required to authenticate with the secret of the connection
    /// that they are targeting.
    secret_key: u32,
    /// The time when the session's connection was initiated.
    connected_at: EpochMillis,
    user: User,
    application_name: String,
    uuid: Uuid,
    conn_id: ConnectionId,
    client_ip: Option<IpAddr>,

    /// Sinks that will need to be dropped when the current transaction, if
    /// any, is cleared.
    drop_sinks: BTreeSet<GlobalId>,

    /// Lock for the Coordinator's deferred statements that is dropped on transaction clear.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Cluster reconfigurations that will need to be
    /// cleaned up when the current transaction is cleared
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Channel on which to send notices to a session.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role that initiated the database context. Fixed for the duration of the connection.
    /// WARNING: This role reference is not updated when the role is dropped.
    /// Consumers should not assume that this role exists.
    authenticated_role: RoleId,
}
1225
impl ConnMeta {
    /// The connection's ID.
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    /// The user that opened this connection.
    pub fn user(&self) -> &User {
        &self.user
    }

    /// The client-reported application name.
    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    /// The role that initiated the connection. See the field docs for the
    /// warning about dropped roles.
    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    /// The connection's UUID.
    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    /// The client's IP address, if known.
    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    /// When the connection was initiated, in epoch milliseconds.
    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}
1255
#[derive(Debug)]
/// A pending transaction waiting to be committed.
pub struct PendingTxn {
    /// Context used to send a response back to the client.
    ctx: ExecuteContext,
    /// Client response for transaction.
    response: Result<PendingTxnResponse, AdapterError>,
    /// The action to take at the end of the transaction.
    action: EndTransactionAction,
}
1266
#[derive(Debug)]
/// The response we'll send for a [`PendingTxn`].
pub enum PendingTxnResponse {
    /// The transaction will be committed.
    Committed {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction will be rolled back.
    Rolledback {
        /// Parameters that will change, and their values, once this transaction is complete.
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    /// Merges the given parameter changes into whichever variant this is.
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        // Both variants carry the same `params` map; pick it out, then extend.
        let params = match self {
            PendingTxnResponse::Committed { params } => params,
            PendingTxnResponse::Rolledback { params } => params,
        };
        params.extend(p);
    }
}
1290
1291impl From<PendingTxnResponse> for ExecuteResponse {
1292    fn from(value: PendingTxnResponse) -> Self {
1293        match value {
1294            PendingTxnResponse::Committed { params } => {
1295                ExecuteResponse::TransactionCommitted { params }
1296            }
1297            PendingTxnResponse::Rolledback { params } => {
1298                ExecuteResponse::TransactionRolledBack { params }
1299            }
1300        }
1301    }
1302}
1303
#[derive(Debug)]
/// A pending read transaction waiting to be linearized along with metadata about its state
pub struct PendingReadTxn {
    /// The transaction type
    txn: PendingRead,
    /// The timestamp context of the transaction.
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    /// When this pending txn was created, i.e. when the transaction ended. Only used for metrics.
    created: Instant,
    /// Number of times we requeued the processing of this pending read txn.
    /// Requeueing is necessary if the time we executed the query is after the current oracle time;
    /// see [`Coordinator::message_linearize_reads`] for more details.
    num_requeues: u64,
    /// Telemetry context.
    otel_ctx: OpenTelemetryContext,
}
1320
impl PendingReadTxn {
    /// Return the timestamp context of the pending read transaction.
    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
        &self.timestamp_context
    }

    /// Consume the pending read, extracting its [`ExecuteContext`]; see
    /// [`PendingRead::take_context`].
    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}
1331
#[derive(Debug)]
/// A pending read transaction waiting to be linearized.
enum PendingRead {
    Read {
        /// The inner transaction.
        txn: PendingTxn,
    },
    ReadThenWrite {
        /// Context used to send a response back to the client.
        ctx: ExecuteContext,
        /// Channel used to alert the transaction that the read has been linearized and send back
        /// `ctx`.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}
1347
impl PendingRead {
    /// Alert the client that the read has been linearized.
    ///
    /// If it is necessary to finalize an execute, return the state necessary to do so
    /// (execution context and result)
    #[instrument(level = "debug")]
    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
        match self {
            PendingRead::Read {
                txn:
                    PendingTxn {
                        mut ctx,
                        response,
                        action,
                    },
                ..
            } => {
                // Apply the end-of-transaction action to the session's
                // variables; `changed` is whatever parameters it reports back.
                let changed = ctx.session_mut().vars_mut().end_transaction(action);
                // Append any parameters that changed to the response.
                let response = response.map(|mut r| {
                    r.extend_params(changed);
                    ExecuteResponse::from(r)
                });

                Some((ctx, response))
            }
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // Hand the context back to the waiting write half.
                // Ignore errors if the caller has hung up.
                let _ = tx.send(Some(ctx));
                None
            }
        }
    }

    /// Static label for this variant; presumably used for metrics/logging — confirm at call site.
    fn label(&self) -> &'static str {
        match self {
            PendingRead::Read { .. } => "read",
            PendingRead::ReadThenWrite { .. } => "read_then_write",
        }
    }

    /// Extract the [`ExecuteContext`]. For `ReadThenWrite`, signals the waiter
    /// (with `None`) that the context has been taken from it.
    pub(crate) fn take_context(self) -> ExecuteContext {
        match self {
            PendingRead::Read { txn, .. } => txn.ctx,
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // Inform the transaction that we've taken their context.
                // Ignore errors if the caller has hung up.
                let _ = tx.send(None);
                ctx
            }
        }
    }
}
1401
/// State that the coordinator must process as part of retiring
/// command execution.  `ExecuteContextExtra::Default` is guaranteed
/// to produce a value that will cause the coordinator to do nothing, and
/// is intended for use by code that invokes the execution processing flow
/// (i.e., `sequence_plan`) without actually being a statement execution.
///
/// This is a pure data struct containing only the statement logging ID.
/// For auto-retire-on-drop behavior, use `ExecuteContextGuard` which wraps
/// this struct and owns the channel for sending retirement messages.
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    /// `Some` iff this execution participates in statement logging.
    statement_uuid: Option<StatementLoggingId>,
}
1416
impl ExecuteContextExtra {
    /// Creates an extra with the given (optional) statement logging ID.
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }
    /// True when there is nothing to log at retirement.
    pub fn is_trivial(&self) -> bool {
        self.statement_uuid.is_none()
    }
    /// The statement logging ID, if any, without consuming the extra.
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
    /// Consume this extra and return the statement UUID for retirement.
    /// This should only be called from code that knows what to do to finish
    /// up logging based on the inner value.
    #[must_use]
    pub(crate) fn retire(self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
}
1435
/// A guard that wraps `ExecuteContextExtra` and owns a channel for sending
/// retirement messages to the coordinator.
///
/// If this guard is dropped with a `Some` `statement_uuid` in its inner
/// `ExecuteContextExtra`, the `Drop` implementation will automatically send a
/// `Message::RetireExecute` to log the statement ending.
/// This handles cases like connection drops where the context cannot be
/// explicitly retired.
/// See <https://github.com/MaterializeInc/database-issues/issues/7304>
#[derive(Debug)]
#[must_use]
pub struct ExecuteContextGuard {
    extra: ExecuteContextExtra,
    /// Channel for sending messages to the coordinator. Used for auto-retiring on drop.
    /// For `Default` instances, this is a dummy sender (receiver already dropped), so
    /// sends will fail silently - which is the desired behavior since Default instances
    /// should only be used for non-logged statements.
    coordinator_tx: mpsc::UnboundedSender<Message>,
}
1455
impl Default for ExecuteContextGuard {
    /// A guard with no statement logging ID and a disconnected sender.
    fn default() -> Self {
        // Create a dummy sender by immediately dropping the receiver.
        // Any send on this channel will fail silently, which is the desired
        // behavior for Default instances (non-logged statements).
        let (tx, _rx) = mpsc::unbounded_channel();
        Self {
            extra: ExecuteContextExtra::default(),
            coordinator_tx: tx,
        }
    }
}
1468
impl ExecuteContextGuard {
    /// Creates a guard that will auto-retire `statement_uuid` (if `Some`) via
    /// `coordinator_tx` when dropped without being defused.
    pub(crate) fn new(
        statement_uuid: Option<StatementLoggingId>,
        coordinator_tx: mpsc::UnboundedSender<Message>,
    ) -> Self {
        Self {
            extra: ExecuteContextExtra::new(statement_uuid),
            coordinator_tx,
        }
    }
    /// True when there is nothing to log at retirement.
    pub fn is_trivial(&self) -> bool {
        self.extra.is_trivial()
    }
    /// The statement logging ID, if any.
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.extra.contents()
    }
    /// Take responsibility for the contents.  This should only be
    /// called from code that knows what to do to finish up logging
    /// based on the inner value.
    ///
    /// Returns the inner `ExecuteContextExtra`, consuming the guard without
    /// triggering the auto-retire behavior.
    pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
        // Taking statement_uuid prevents the Drop impl from sending a retire message
        std::mem::take(&mut self.extra)
    }
}
1496
impl Drop for ExecuteContextGuard {
    fn drop(&mut self) {
        // `take()` leaves `None` behind, so a defused guard (or a re-entrant
        // drop) sends nothing.
        if let Some(statement_uuid) = self.extra.statement_uuid.take() {
            // Auto-retire since the guard was dropped without explicit retirement (likely due
            // to connection drop).
            let msg = Message::RetireExecute {
                data: ExecuteContextExtra {
                    statement_uuid: Some(statement_uuid),
                },
                otel_ctx: OpenTelemetryContext::obtain(),
                reason: StatementEndedExecutionReason::Aborted,
            };
            // Send may fail for Default instances (dummy sender), which is fine since
            // Default instances should only be used for non-logged statements.
            let _ = self.coordinator_tx.send(msg);
        }
    }
}
1515
/// Bundle of state related to statement execution.
///
/// This struct collects a bundle of state that needs to be threaded
/// through various functions as part of statement execution.
/// It is used to finalize execution, by calling `retire`. Finalizing execution
/// involves sending the session back to the pgwire layer so that it
/// may be used to process further commands. It also involves
/// performing some work on the main coordinator thread
/// (e.g., recording the time at which the statement finished
/// executing). The state necessary to perform this work is bundled in
/// the `ExecuteContextGuard` object.
#[derive(Debug)]
pub struct ExecuteContext {
    // Boxed to keep `ExecuteContext` itself small when moved around.
    inner: Box<ExecuteContextInner>,
}
1531
// Deref lets callers access `ExecuteContextInner` fields and methods directly
// on an `ExecuteContext`.
impl std::ops::Deref for ExecuteContext {
    type Target = ExecuteContextInner;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}
1538
// Mutable counterpart of the `Deref` impl above.
impl std::ops::DerefMut for ExecuteContext {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}
1544
/// The boxed payload of an [`ExecuteContext`].
#[derive(Debug)]
pub struct ExecuteContextInner {
    /// Transmitter for the client's `ExecuteResponse`.
    tx: ClientTransmitter<ExecuteResponse>,
    /// Channel to the coordinator, used to send `Message::RetireExecute`.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    session: Session,
    /// Guard carrying statement-logging state; see [`ExecuteContextGuard`].
    extra: ExecuteContextGuard,
}
1552
1553impl ExecuteContext {
1554    pub fn session(&self) -> &Session {
1555        &self.session
1556    }
1557
1558    pub fn session_mut(&mut self) -> &mut Session {
1559        &mut self.session
1560    }
1561
1562    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1563        &self.tx
1564    }
1565
1566    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1567        &mut self.tx
1568    }
1569
1570    pub fn from_parts(
1571        tx: ClientTransmitter<ExecuteResponse>,
1572        internal_cmd_tx: mpsc::UnboundedSender<Message>,
1573        session: Session,
1574        extra: ExecuteContextGuard,
1575    ) -> Self {
1576        Self {
1577            inner: ExecuteContextInner {
1578                tx,
1579                session,
1580                extra,
1581                internal_cmd_tx,
1582            }
1583            .into(),
1584        }
1585    }
1586
1587    /// By calling this function, the caller takes responsibility for
1588    /// dealing with the instance of `ExecuteContextGuard`. This is
1589    /// intended to support protocols (like `COPY FROM`) that involve
1590    /// multiple passes of sending the session back and forth between
1591    /// the coordinator and the pgwire layer. As part of any such
1592    /// protocol, we must ensure that the `ExecuteContextGuard`
1593    /// (possibly wrapped in a new `ExecuteContext`) is passed back to the coordinator for
1594    /// eventual retirement.
1595    pub fn into_parts(
1596        self,
1597    ) -> (
1598        ClientTransmitter<ExecuteResponse>,
1599        mpsc::UnboundedSender<Message>,
1600        Session,
1601        ExecuteContextGuard,
1602    ) {
1603        let ExecuteContextInner {
1604            tx,
1605            internal_cmd_tx,
1606            session,
1607            extra,
1608        } = *self.inner;
1609        (tx, internal_cmd_tx, session, extra)
1610    }
1611
1612    /// Retire the execution, by sending a message to the coordinator.
1613    #[instrument(level = "debug")]
1614    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1615        let ExecuteContextInner {
1616            tx,
1617            internal_cmd_tx,
1618            session,
1619            extra,
1620        } = *self.inner;
1621        let reason = if extra.is_trivial() {
1622            None
1623        } else {
1624            Some((&result).into())
1625        };
1626        tx.send(result, session);
1627        if let Some(reason) = reason {
1628            // Retire the guard to get the inner ExecuteContextExtra without triggering auto-retire
1629            let extra = extra.defuse();
1630            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1631                otel_ctx: OpenTelemetryContext::obtain(),
1632                data: extra,
1633                reason,
1634            }) {
1635                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1636            }
1637        }
1638    }
1639
    /// Returns a shared reference to the `ExecuteContextGuard` carried by
    /// this context.
    pub fn extra(&self) -> &ExecuteContextGuard {
        &self.extra
    }
1643
    /// Returns a mutable reference to the `ExecuteContextGuard` carried by
    /// this context.
    pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
        &mut self.extra
    }
1647}
1648
/// The last known status of every cluster replica process, keyed first by
/// cluster, then by replica, then by process.
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);
1653
1654impl ClusterReplicaStatuses {
1655    pub(crate) fn new() -> ClusterReplicaStatuses {
1656        ClusterReplicaStatuses(BTreeMap::new())
1657    }
1658
1659    /// Initializes the statuses of the specified cluster.
1660    ///
1661    /// Panics if the cluster statuses are already initialized.
1662    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1663        let prev = self.0.insert(cluster_id, BTreeMap::new());
1664        assert_eq!(
1665            prev, None,
1666            "cluster {cluster_id} statuses already initialized"
1667        );
1668    }
1669
1670    /// Initializes the statuses of the specified cluster replica.
1671    ///
1672    /// Panics if the cluster replica statuses are already initialized.
1673    pub(crate) fn initialize_cluster_replica_statuses(
1674        &mut self,
1675        cluster_id: ClusterId,
1676        replica_id: ReplicaId,
1677        num_processes: usize,
1678        time: DateTime<Utc>,
1679    ) {
1680        tracing::info!(
1681            ?cluster_id,
1682            ?replica_id,
1683            ?time,
1684            "initializing cluster replica status"
1685        );
1686        let replica_statuses = self.0.entry(cluster_id).or_default();
1687        let process_statuses = (0..num_processes)
1688            .map(|process_id| {
1689                let status = ClusterReplicaProcessStatus {
1690                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1691                    time: time.clone(),
1692                };
1693                (u64::cast_from(process_id), status)
1694            })
1695            .collect();
1696        let prev = replica_statuses.insert(replica_id, process_statuses);
1697        assert_none!(
1698            prev,
1699            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1700        );
1701    }
1702
1703    /// Removes the statuses of the specified cluster.
1704    ///
1705    /// Panics if the cluster does not exist.
1706    pub(crate) fn remove_cluster_statuses(
1707        &mut self,
1708        cluster_id: &ClusterId,
1709    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1710        let prev = self.0.remove(cluster_id);
1711        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1712    }
1713
1714    /// Removes the statuses of the specified cluster replica.
1715    ///
1716    /// Panics if the cluster or replica does not exist.
1717    pub(crate) fn remove_cluster_replica_statuses(
1718        &mut self,
1719        cluster_id: &ClusterId,
1720        replica_id: &ReplicaId,
1721    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1722        let replica_statuses = self
1723            .0
1724            .get_mut(cluster_id)
1725            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1726        let prev = replica_statuses.remove(replica_id);
1727        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1728    }
1729
1730    /// Inserts or updates the status of the specified cluster replica process.
1731    ///
1732    /// Panics if the cluster or replica does not exist.
1733    pub(crate) fn ensure_cluster_status(
1734        &mut self,
1735        cluster_id: ClusterId,
1736        replica_id: ReplicaId,
1737        process_id: ProcessId,
1738        status: ClusterReplicaProcessStatus,
1739    ) {
1740        let replica_statuses = self
1741            .0
1742            .get_mut(&cluster_id)
1743            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1744            .get_mut(&replica_id)
1745            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1746        replica_statuses.insert(process_id, status);
1747    }
1748
1749    /// Computes the status of the cluster replica as a whole.
1750    ///
1751    /// Panics if `cluster_id` or `replica_id` don't exist.
1752    pub fn get_cluster_replica_status(
1753        &self,
1754        cluster_id: ClusterId,
1755        replica_id: ReplicaId,
1756    ) -> ClusterStatus {
1757        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1758        Self::cluster_replica_status(process_status)
1759    }
1760
1761    /// Computes the status of the cluster replica as a whole.
1762    pub fn cluster_replica_status(
1763        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1764    ) -> ClusterStatus {
1765        process_status
1766            .values()
1767            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1768                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1769                (x, y) => {
1770                    let reason_x = match x {
1771                        ClusterStatus::Offline(reason) => reason,
1772                        ClusterStatus::Online => None,
1773                    };
1774                    let reason_y = match y {
1775                        ClusterStatus::Offline(reason) => reason,
1776                        ClusterStatus::Online => None,
1777                    };
1778                    // Arbitrarily pick the first known not-ready reason.
1779                    ClusterStatus::Offline(reason_x.or(reason_y))
1780                }
1781            })
1782    }
1783
1784    /// Gets the statuses of the given cluster replica.
1785    ///
1786    /// Panics if the cluster or replica does not exist
1787    pub(crate) fn get_cluster_replica_statuses(
1788        &self,
1789        cluster_id: ClusterId,
1790        replica_id: ReplicaId,
1791    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1792        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1793            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1794    }
1795
1796    /// Gets the statuses of the given cluster replica.
1797    pub(crate) fn try_get_cluster_replica_statuses(
1798        &self,
1799        cluster_id: ClusterId,
1800        replica_id: ReplicaId,
1801    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1802        self.try_get_cluster_statuses(cluster_id)
1803            .and_then(|statuses| statuses.get(&replica_id))
1804    }
1805
1806    /// Gets the statuses of the given cluster.
1807    pub(crate) fn try_get_cluster_statuses(
1808        &self,
1809        cluster_id: ClusterId,
1810    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1811        self.0.get(&cluster_id)
1812    }
1813}
1814
/// Glues the external world to the Timely workers.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    /// The controller for the storage and compute layers.
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    /// The catalog in an Arc suitable for readonly references. The Arc allows
    /// us to hand out cheap copies of the catalog to functions that can use it
    /// off of the main coordinator thread. If the coordinator needs to mutate
    /// the catalog, call [`Self::catalog_mut`], which will clone this struct member,
    /// allowing it to be mutated here while the other off-thread references can
    /// read their catalog as long as needed. In the future we would like this
    /// to be a pTVC, but for now this is sufficient.
    catalog: Arc<Catalog>,

    /// A client for persist. Initially, this is only used for reading stashed
    /// peek responses out of batches.
    persist_client: PersistClient,

    /// Channel to manage internal commands from the coordinator to itself.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// Notification that triggers a group commit.
    group_commit_tx: appends::GroupCommitNotifier,

    /// Channel for strict serializable reads ready to commit.
    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    /// Mechanism for totally ordering write and read timestamps, so that all reads
    /// reflect exactly the set of writes that precede them, and no writes that follow.
    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,

    /// A generator for transient [`GlobalId`]s, shareable with other threads.
    transient_id_gen: Arc<TransientIdGen>,
    /// A map from connection ID to metadata about that connection for all
    /// active connections.
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    /// For each transaction, the read holds taken to support any performed reads.
    ///
    /// Upon completing a transaction, these read holds should be dropped.
    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,

    /// Access to the peek fields should be restricted to methods in the [`peek`] API.
    /// A map from pending peek ids to the queue into which responses are sent, and
    /// the connection id of the client that initiated the peek.
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// A map from client connection ids to a set of all pending peeks for that client.
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    /// A map from client connection ids to pending linearize read transaction.
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    /// A map from the compute sink ID to it's state description.
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    /// A map from active webhooks to their invalidation handle.
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
    /// to stage Batches in Persist that we will then link into the shard.
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    /// A map from connection ids to a watch channel that is set to `true` if the connection
    /// received a cancel request.
    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    /// Active introspection subscribes.
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    /// Locks that grant access to a specific object, populated lazily as objects are written to.
    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    /// Plans that are currently deferred and waiting on a write lock.
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    /// Pending writes waiting for a group commit.
    pending_writes: Vec<PendingWriteTxn>,

    /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
    /// table's timestamps, but there are cases where timestamps are not bumped but
    /// we expect the closed timestamps to advance (`AS OF X`, SUBSCRIBing views over
    /// RT sources and tables). To address these, spawn a task that forces table
    /// timestamps to close on a regular interval. This roughly tracks the behavior
    /// of realtime sources that close off timestamps on an interval.
    ///
    /// For non-realtime timelines, nothing pushes the timestamps forward, so we must do
    /// it manually.
    advance_timelines_interval: Interval,

    /// Serialized DDL. DDL must be serialized because:
    /// - Many of them do off-thread work and need to verify the catalog is in a valid state, but
    ///   [`PlanValidity`] does not currently support tracking all changes. Doing that correctly
    ///   seems to be more difficult than it's worth, so we would instead re-plan and re-sequence
    ///   the statements.
    /// - Re-planning a statement is hard because Coordinator and Session state is mutated at
    ///   various points, and we would need to correctly reset those changes before re-planning and
    ///   re-sequencing.
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    /// Handle to secret manager that can create and delete secrets from
    /// an arbitrary secret storage engine.
    secrets_controller: Arc<dyn SecretsController>,
    /// A secrets reader than maintains an in-memory cache, where values have a set TTL.
    caching_secrets_reader: CachingSecretsReader,

    /// Handle to a manager that can create and delete kubernetes resources
    /// (ie: VpcEndpoint objects)
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Persist client for fetching storage metadata such as size metrics.
    storage_usage_client: StorageUsageClient,
    /// The interval at which to collect storage usage information.
    storage_usage_collection_interval: Duration,

    /// Segment analytics client.
    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    /// Coordinator metrics.
    metrics: Metrics,
    /// Optimizer metrics.
    optimizer_metrics: OptimizerMetrics,

    /// Tracing handle.
    tracing_handle: TracingHandle,

    /// Data used by the statement logging feature.
    statement_logging: StatementLogging,

    /// Limit for how many concurrent webhook requests we allow.
    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    /// Optional config for the timestamp oracle. This is _required_ when
    /// a timestamp oracle backend is configured.
    timestamp_oracle_config: Option<TimestampOracleConfig>,

    /// Periodically asks cluster scheduling policies to make their decisions.
    check_cluster_scheduling_policies_interval: Interval,

    /// This keeps the last On/Off decision for each cluster and each scheduling policy.
    /// (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are
    /// periodically cleaned up from this Map.)
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    /// When doing 0dt upgrades/in read-only mode, periodically ask all known
    /// clusters/collections whether they are caught up.
    caught_up_check_interval: Interval,

    /// Context needed to check whether all clusters/collections have caught up.
    /// Only used during 0dt deployment, while in read-only mode.
    caught_up_check: Option<CaughtUpCheckContext>,

    /// Tracks the state associated with the currently installed watchsets.
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    /// Tracks the currently installed watchsets for each connection.
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    /// Tracks the statuses of all cluster replicas.
    cluster_replica_statuses: ClusterReplicaStatuses,

    /// Whether or not to start controllers in read-only mode. This is only
    /// meant for use during development of read-only clusters and 0dt upgrades
    /// and should go away once we have proper orchestration during upgrades.
    read_only_controllers: bool,

    /// Updates to builtin tables that are being buffered while we are in
    /// read-only mode. We apply these all at once when coming out of read-only
    /// mode.
    ///
    /// This is a `Some` while in read-only mode and will be replaced by a
    /// `None` when we transition out of read-only mode and write out any
    /// buffered updates.
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    /// The environment's validated license key, consulted when enforcing
    /// resource limits (e.g. the maximum credit consumption rate).
    license_key: ValidatedLicenseKey,

    /// Pre-allocated pool of user IDs to amortize persist writes across DDL operations.
    user_id_pool: IdPool,
}
1992
1993impl Coordinator {
1994    /// Initializes coordinator state based on the contained catalog. Must be
1995    /// called after creating the coordinator and before calling the
1996    /// `Coordinator::serve` method.
1997    #[instrument(name = "coord::bootstrap")]
1998    pub(crate) async fn bootstrap(
1999        &mut self,
2000        boot_ts: Timestamp,
2001        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
2002        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2003        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2004        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
2005        audit_logs_iterator: AuditLogIterator,
2006    ) -> Result<(), AdapterError> {
2007        let bootstrap_start = Instant::now();
2008        info!("startup: coordinator init: bootstrap beginning");
2009        info!("startup: coordinator init: bootstrap: preamble beginning");
2010
2011        // Initialize cluster replica statuses.
2012        // Gross iterator is to avoid partial borrow issues.
2013        let cluster_statuses: Vec<(_, Vec<_>)> = self
2014            .catalog()
2015            .clusters()
2016            .map(|cluster| {
2017                (
2018                    cluster.id(),
2019                    cluster
2020                        .replicas()
2021                        .map(|replica| {
2022                            (replica.replica_id, replica.config.location.num_processes())
2023                        })
2024                        .collect(),
2025                )
2026            })
2027            .collect();
2028        let now = self.now_datetime();
2029        for (cluster_id, replica_statuses) in cluster_statuses {
2030            self.cluster_replica_statuses
2031                .initialize_cluster_statuses(cluster_id);
2032            for (replica_id, num_processes) in replica_statuses {
2033                self.cluster_replica_statuses
2034                    .initialize_cluster_replica_statuses(
2035                        cluster_id,
2036                        replica_id,
2037                        num_processes,
2038                        now,
2039                    );
2040            }
2041        }
2042
2043        let system_config = self.catalog().system_config();
2044
2045        // Inform metrics about the initial system configuration.
2046        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
2047
2048        // Inform the controllers about their initial configuration.
2049        let compute_config = flags::compute_config(system_config);
2050        let storage_config = flags::storage_config(system_config);
2051        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
2052        let dyncfg_updates = system_config.dyncfg_updates();
2053        self.controller.compute.update_configuration(compute_config);
2054        self.controller.storage.update_parameters(storage_config);
2055        self.controller
2056            .update_orchestrator_scheduling_config(scheduling_config);
2057        self.controller.update_configuration(dyncfg_updates);
2058
2059        self.validate_resource_limit_numeric(
2060            Numeric::zero(),
2061            self.current_credit_consumption_rate(),
2062            |system_vars| {
2063                self.license_key
2064                    .max_credit_consumption_rate()
2065                    .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
2066            },
2067            "cluster replica",
2068            MAX_CREDIT_CONSUMPTION_RATE.name(),
2069        )?;
2070
2071        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
2072            Default::default();
2073
2074        let enable_worker_core_affinity =
2075            self.catalog().system_config().enable_worker_core_affinity();
2076        for instance in self.catalog.clusters() {
2077            self.controller.create_cluster(
2078                instance.id,
2079                ClusterConfig {
2080                    arranged_logs: instance.log_indexes.clone(),
2081                    workload_class: instance.config.workload_class.clone(),
2082                },
2083            )?;
2084            for replica in instance.replicas() {
2085                let role = instance.role();
2086                self.controller.create_replica(
2087                    instance.id,
2088                    replica.replica_id,
2089                    instance.name.clone(),
2090                    replica.name.clone(),
2091                    role,
2092                    replica.config.clone(),
2093                    enable_worker_core_affinity,
2094                )?;
2095            }
2096        }
2097
2098        info!(
2099            "startup: coordinator init: bootstrap: preamble complete in {:?}",
2100            bootstrap_start.elapsed()
2101        );
2102
2103        let init_storage_collections_start = Instant::now();
2104        info!("startup: coordinator init: bootstrap: storage collections init beginning");
2105        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2106            .await;
2107        info!(
2108            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2109            init_storage_collections_start.elapsed()
2110        );
2111
2112        // The storage controller knows about the introspection collections now, so we can start
2113        // sinking introspection updates in the compute controller. It makes sense to do that as
2114        // soon as possible, to avoid updates piling up in the compute controller's internal
2115        // buffers.
2116        self.controller.start_compute_introspection_sink();
2117
2118        let sorting_start = Instant::now();
2119        info!("startup: coordinator init: bootstrap: sorting catalog entries");
2120        let entries = self.bootstrap_sort_catalog_entries();
2121        info!(
2122            "startup: coordinator init: bootstrap: sorting catalog entries complete in {:?}",
2123            sorting_start.elapsed()
2124        );
2125
2126        let optimize_dataflows_start = Instant::now();
2127        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2128        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2129        info!(
2130            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2131            optimize_dataflows_start.elapsed()
2132        );
2133
2134        // We don't need to wait for the cache to update.
2135        let _fut = self.catalog().update_expression_cache(
2136            uncached_local_exprs.into_iter().collect(),
2137            uncached_global_exps.into_iter().collect(),
2138            Default::default(),
2139        );
2140
2141        // Select dataflow as-ofs. This step relies on the storage collections created by
2142        // `bootstrap_storage_collections` and the dataflow plans created by
2143        // `bootstrap_dataflow_plans`.
2144        let bootstrap_as_ofs_start = Instant::now();
2145        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2146        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2147        info!(
2148            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2149            bootstrap_as_ofs_start.elapsed()
2150        );
2151
2152        let postamble_start = Instant::now();
2153        info!("startup: coordinator init: bootstrap: postamble beginning");
2154
2155        let logs: BTreeSet<_> = BUILTINS::logs()
2156            .map(|log| self.catalog().resolve_builtin_log(log))
2157            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2158            .collect();
2159
2160        let mut privatelink_connections = BTreeMap::new();
2161
2162        for entry in &entries {
2163            debug!(
2164                "coordinator init: installing {} {}",
2165                entry.item().typ(),
2166                entry.id()
2167            );
2168            let mut policy = entry.item().initial_logical_compaction_window();
2169            match entry.item() {
2170                // Currently catalog item rebuild assumes that sinks and
2171                // indexes are always built individually and does not store information
2172                // about how it was built. If we start building multiple sinks and/or indexes
2173                // using a single dataflow, we have to make sure the rebuild process re-runs
2174                // the same multiple-build dataflow.
2175                CatalogItem::Source(source) => {
2176                    // Propagate source compaction windows to subsources if needed.
2177                    if source.custom_logical_compaction_window.is_none() {
2178                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2179                            source.data_source
2180                        {
2181                            policy = Some(
2182                                self.catalog()
2183                                    .get_entry(&ingestion_id)
2184                                    .source()
2185                                    .expect("must be source")
2186                                    .custom_logical_compaction_window
2187                                    .unwrap_or_default(),
2188                            );
2189                        }
2190                    }
2191                    policies_to_set
2192                        .entry(policy.expect("sources have a compaction window"))
2193                        .or_insert_with(Default::default)
2194                        .storage_ids
2195                        .insert(source.global_id());
2196                }
2197                CatalogItem::Table(table) => {
2198                    policies_to_set
2199                        .entry(policy.expect("tables have a compaction window"))
2200                        .or_insert_with(Default::default)
2201                        .storage_ids
2202                        .extend(table.global_ids());
2203                }
2204                CatalogItem::Index(idx) => {
2205                    let policy_entry = policies_to_set
2206                        .entry(policy.expect("indexes have a compaction window"))
2207                        .or_insert_with(Default::default);
2208
2209                    if logs.contains(&idx.on) {
2210                        policy_entry
2211                            .compute_ids
2212                            .entry(idx.cluster_id)
2213                            .or_insert_with(BTreeSet::new)
2214                            .insert(idx.global_id());
2215                    } else {
2216                        let df_desc = self
2217                            .catalog()
2218                            .try_get_physical_plan(&idx.global_id())
2219                            .expect("added in `bootstrap_dataflow_plans`")
2220                            .clone();
2221
2222                        let df_meta = self
2223                            .catalog()
2224                            .try_get_dataflow_metainfo(&idx.global_id())
2225                            .expect("added in `bootstrap_dataflow_plans`");
2226
2227                        if self.catalog().state().system_config().enable_mz_notices() {
2228                            // Collect optimization hint updates.
2229                            self.catalog().state().pack_optimizer_notices(
2230                                &mut builtin_table_updates,
2231                                df_meta.optimizer_notices.iter(),
2232                                Diff::ONE,
2233                            );
2234                        }
2235
2236                        // What follows is morally equivalent to `self.ship_dataflow(df, idx.cluster_id)`,
2237                        // but we cannot call that as it will also downgrade the read hold on the index.
2238                        policy_entry
2239                            .compute_ids
2240                            .entry(idx.cluster_id)
2241                            .or_insert_with(Default::default)
2242                            .extend(df_desc.export_ids());
2243
2244                        self.controller
2245                            .compute
2246                            .create_dataflow(idx.cluster_id, df_desc, None)
2247                            .unwrap_or_terminate("cannot fail to create dataflows");
2248                    }
2249                }
2250                CatalogItem::View(_) => (),
2251                CatalogItem::MaterializedView(mview) => {
2252                    policies_to_set
2253                        .entry(policy.expect("materialized views have a compaction window"))
2254                        .or_insert_with(Default::default)
2255                        .storage_ids
2256                        .insert(mview.global_id_writes());
2257
2258                    let mut df_desc = self
2259                        .catalog()
2260                        .try_get_physical_plan(&mview.global_id_writes())
2261                        .expect("added in `bootstrap_dataflow_plans`")
2262                        .clone();
2263
2264                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
2265                        df_desc.set_initial_as_of(initial_as_of);
2266                    }
2267
2268                    // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
2269                    let until = mview
2270                        .refresh_schedule
2271                        .as_ref()
2272                        .and_then(|s| s.last_refresh())
2273                        .and_then(|r| r.try_step_forward());
2274                    if let Some(until) = until {
2275                        df_desc.until.meet_assign(&Antichain::from_elem(until));
2276                    }
2277
2278                    let df_meta = self
2279                        .catalog()
2280                        .try_get_dataflow_metainfo(&mview.global_id_writes())
2281                        .expect("added in `bootstrap_dataflow_plans`");
2282
2283                    if self.catalog().state().system_config().enable_mz_notices() {
2284                        // Collect optimization hint updates.
2285                        self.catalog().state().pack_optimizer_notices(
2286                            &mut builtin_table_updates,
2287                            df_meta.optimizer_notices.iter(),
2288                            Diff::ONE,
2289                        );
2290                    }
2291
2292                    self.ship_dataflow(df_desc, mview.cluster_id, mview.target_replica)
2293                        .await;
2294
2295                    // If this is a replacement MV, it must remain read-only until the replacement
2296                    // gets applied.
2297                    if mview.replacement_target.is_none() {
2298                        self.allow_writes(mview.cluster_id, mview.global_id_writes());
2299                    }
2300                }
2301                CatalogItem::Sink(sink) => {
2302                    policies_to_set
2303                        .entry(CompactionWindow::Default)
2304                        .or_insert_with(Default::default)
2305                        .storage_ids
2306                        .insert(sink.global_id());
2307                }
2308                CatalogItem::Connection(catalog_connection) => {
2309                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2310                        privatelink_connections.insert(
2311                            entry.id(),
2312                            VpcEndpointConfig {
2313                                aws_service_name: conn.service_name.clone(),
2314                                availability_zone_ids: conn.availability_zones.clone(),
2315                            },
2316                        );
2317                    }
2318                }
2319                CatalogItem::ContinualTask(ct) => {
2320                    policies_to_set
2321                        .entry(policy.expect("continual tasks have a compaction window"))
2322                        .or_insert_with(Default::default)
2323                        .storage_ids
2324                        .insert(ct.global_id());
2325
2326                    let mut df_desc = self
2327                        .catalog()
2328                        .try_get_physical_plan(&ct.global_id())
2329                        .expect("added in `bootstrap_dataflow_plans`")
2330                        .clone();
2331
2332                    if let Some(initial_as_of) = ct.initial_as_of.clone() {
2333                        df_desc.set_initial_as_of(initial_as_of);
2334                    }
2335
2336                    let df_meta = self
2337                        .catalog()
2338                        .try_get_dataflow_metainfo(&ct.global_id())
2339                        .expect("added in `bootstrap_dataflow_plans`");
2340
2341                    if self.catalog().state().system_config().enable_mz_notices() {
2342                        // Collect optimization hint updates.
2343                        self.catalog().state().pack_optimizer_notices(
2344                            &mut builtin_table_updates,
2345                            df_meta.optimizer_notices.iter(),
2346                            Diff::ONE,
2347                        );
2348                    }
2349
2350                    self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2351                    self.allow_writes(ct.cluster_id, ct.global_id());
2352                }
2353                // Nothing to do for these cases
2354                CatalogItem::Log(_)
2355                | CatalogItem::Type(_)
2356                | CatalogItem::Func(_)
2357                | CatalogItem::Secret(_) => {}
2358            }
2359        }
2360
2361        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2362            // Clean up any extraneous VpcEndpoints that shouldn't exist.
2363            let existing_vpc_endpoints = cloud_resource_controller
2364                .list_vpc_endpoints()
2365                .await
2366                .context("list vpc endpoints")?;
2367            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2368            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2369            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2370            for id in vpc_endpoints_to_remove {
2371                cloud_resource_controller
2372                    .delete_vpc_endpoint(*id)
2373                    .await
2374                    .context("deleting extraneous vpc endpoint")?;
2375            }
2376
2377            // Ensure desired VpcEndpoints are up to date.
2378            for (id, spec) in privatelink_connections {
2379                cloud_resource_controller
2380                    .ensure_vpc_endpoint(id, spec)
2381                    .await
2382                    .context("ensuring vpc endpoint")?;
2383            }
2384        }
2385
2386        // Having installed all entries, creating all constraints, we can now drop read holds and
2387        // relax read policies.
2388        drop(dataflow_read_holds);
2389        // TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop.
2390        for (cw, policies) in policies_to_set {
2391            self.initialize_read_policies(&policies, cw).await;
2392        }
2393
2394        // Expose mapping from T-shirt sizes to actual sizes
2395        builtin_table_updates.extend(
2396            self.catalog().state().resolve_builtin_table_updates(
2397                self.catalog().state().pack_all_replica_size_updates(),
2398            ),
2399        );
2400
2401        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2402        // When 0dt is enabled, we create new shards for any migrated builtin storage collections.
2403        // In read-only mode, the migrated builtin tables (which are a subset of migrated builtin
2404        // storage collections) need to be back-filled so that any dependent dataflow can be
2405        // hydrated. Additionally, these shards are not registered with the txn-shard, and cannot
2406        // be registered while in read-only, so they are written to directly.
2407        let migrated_updates_fut = if self.controller.read_only() {
2408            let min_timestamp = Timestamp::minimum();
2409            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2410                .extract_if(.., |update| {
2411                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2412                    migrated_storage_collections_0dt.contains(&update.id)
2413                        && self
2414                            .controller
2415                            .storage_collections
2416                            .collection_frontiers(gid)
2417                            .expect("all tables are registered")
2418                            .write_frontier
2419                            .elements()
2420                            == &[min_timestamp]
2421                })
2422                .collect();
2423            if migrated_builtin_table_updates.is_empty() {
2424                futures::future::ready(()).boxed()
2425            } else {
2426                // Group all updates per-table.
2427                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2428                for update in migrated_builtin_table_updates {
2429                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
2430                    grouped_appends.entry(gid).or_default().push(update.data);
2431                }
2432                info!(
2433                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2434                    grouped_appends.keys().collect::<Vec<_>>()
2435                );
2436
2437                // Consolidate Row data, staged batches must already be consolidated.
2438                let mut all_appends = Vec::with_capacity(grouped_appends.len());
2439                for (item_id, table_data) in grouped_appends.into_iter() {
2440                    let mut all_rows = Vec::new();
2441                    let mut all_data = Vec::new();
2442                    for data in table_data {
2443                        match data {
2444                            TableData::Rows(rows) => all_rows.extend(rows),
2445                            TableData::Batches(_) => all_data.push(data),
2446                        }
2447                    }
2448                    differential_dataflow::consolidation::consolidate(&mut all_rows);
2449                    all_data.push(TableData::Rows(all_rows));
2450
2451                    // TODO(parkmycar): Use SmallVec throughout.
2452                    all_appends.push((item_id, all_data));
2453                }
2454
2455                let fut = self
2456                    .controller
2457                    .storage
2458                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2459                    .expect("cannot fail to append");
2460                async {
2461                    fut.await
2462                        .expect("One-shot shouldn't be dropped during bootstrap")
2463                        .unwrap_or_terminate("cannot fail to append")
2464                }
2465                .boxed()
2466            }
2467        } else {
2468            futures::future::ready(()).boxed()
2469        };
2470
2471        info!(
2472            "startup: coordinator init: bootstrap: postamble complete in {:?}",
2473            postamble_start.elapsed()
2474        );
2475
2476        let builtin_update_start = Instant::now();
2477        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2478
2479        if self.controller.read_only() {
2480            info!(
2481                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2482            );
2483
2484            // TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
2485            let audit_join_start = Instant::now();
2486            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2487            let audit_log_updates: Vec<_> = audit_logs_iterator
2488                .map(|(audit_log, ts)| StateUpdate {
2489                    kind: StateUpdateKind::AuditLog(audit_log),
2490                    ts,
2491                    diff: StateDiff::Addition,
2492                })
2493                .collect();
2494            let audit_log_builtin_table_updates = self
2495                .catalog()
2496                .state()
2497                .generate_builtin_table_updates(audit_log_updates);
2498            builtin_table_updates.extend(audit_log_builtin_table_updates);
2499            info!(
2500                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2501                audit_join_start.elapsed()
2502            );
2503            self.buffered_builtin_table_updates
2504                .as_mut()
2505                .expect("in read-only mode")
2506                .append(&mut builtin_table_updates);
2507        } else {
2508            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2509                .await;
2510        };
2511        info!(
2512            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2513            builtin_update_start.elapsed()
2514        );
2515
2516        let cleanup_secrets_start = Instant::now();
2517        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2518        // Cleanup orphaned secrets. Errors during list() or delete() do not
2519        // need to prevent bootstrap from succeeding; we will retry next
2520        // startup.
2521        {
2522            // Destructure Self so we can selectively move fields into the async
2523            // task.
2524            let Self {
2525                secrets_controller,
2526                catalog,
2527                ..
2528            } = self;
2529
2530            let next_user_item_id = catalog.get_next_user_item_id().await?;
2531            let next_system_item_id = catalog.get_next_system_item_id().await?;
2532            let read_only = self.controller.read_only();
2533            // Fetch all IDs from the catalog to future-proof against other
2534            // things using secrets. Today, SECRET and CONNECTION objects use
2535            // secrets_controller.ensure, but more things could in the future
2536            // that would be easy to miss adding here.
2537            let catalog_ids: BTreeSet<CatalogItemId> =
2538                catalog.entries().map(|entry| entry.id()).collect();
2539            let secrets_controller = Arc::clone(secrets_controller);
2540
2541            spawn(|| "cleanup-orphaned-secrets", async move {
2542                if read_only {
2543                    info!(
2544                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2545                    );
2546                    return;
2547                }
2548                info!("coordinator init: cleaning up orphaned secrets");
2549
2550                match secrets_controller.list().await {
2551                    Ok(controller_secrets) => {
2552                        let controller_secrets: BTreeSet<CatalogItemId> =
2553                            controller_secrets.into_iter().collect();
2554                        let orphaned = controller_secrets.difference(&catalog_ids);
2555                        for id in orphaned {
2556                            let id_too_large = match id {
2557                                CatalogItemId::System(id) => *id >= next_system_item_id,
2558                                CatalogItemId::User(id) => *id >= next_user_item_id,
2559                                CatalogItemId::IntrospectionSourceIndex(_)
2560                                | CatalogItemId::Transient(_) => false,
2561                            };
2562                            if id_too_large {
2563                                info!(
2564                                    %next_user_item_id, %next_system_item_id,
2565                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2566                                );
2567                            } else {
2568                                info!("coordinator init: deleting orphaned secret {id}");
2569                                fail_point!("orphan_secrets");
2570                                if let Err(e) = secrets_controller.delete(*id).await {
2571                                    warn!(
2572                                        "Dropping orphaned secret has encountered an error: {}",
2573                                        e
2574                                    );
2575                                }
2576                            }
2577                        }
2578                    }
2579                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2580                }
2581            });
2582        }
2583        info!(
2584            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2585            cleanup_secrets_start.elapsed()
2586        );
2587
2588        // Run all of our final steps concurrently.
2589        let final_steps_start = Instant::now();
2590        info!(
2591            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2592        );
2593        migrated_updates_fut
2594            .instrument(info_span!("coord::bootstrap::final"))
2595            .await;
2596
2597        debug!(
2598            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2599        );
2600        // Announce the completion of initialization.
2601        self.controller.initialization_complete();
2602
2603        // Initialize unified introspection.
2604        self.bootstrap_introspection_subscribes().await;
2605
2606        info!(
2607            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2608            final_steps_start.elapsed()
2609        );
2610
2611        info!(
2612            "startup: coordinator init: bootstrap complete in {:?}",
2613            bootstrap_start.elapsed()
2614        );
2615        Ok(())
2616    }
2617
2618    /// Prepares tables for writing by resetting them to a known state and
2619    /// appending the given builtin table updates. The timestamp oracle
2620    /// will be advanced to the write timestamp of the append when this
2621    /// method returns.
2622    #[allow(clippy::async_yields_async)]
2623    #[instrument]
2624    async fn bootstrap_tables(
2625        &mut self,
2626        entries: &[CatalogEntry],
2627        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2628        audit_logs_iterator: AuditLogIterator,
2629    ) {
2630        /// Smaller helper struct of metadata for bootstrapping tables.
2631        struct TableMetadata<'a> {
2632            id: CatalogItemId,
2633            name: &'a QualifiedItemName,
2634            table: &'a Table,
2635        }
2636
2637        // Filter our entries down to just tables.
2638        let table_metas: Vec<_> = entries
2639            .into_iter()
2640            .filter_map(|entry| {
2641                entry.table().map(|table| TableMetadata {
2642                    id: entry.id(),
2643                    name: entry.name(),
2644                    table,
2645                })
2646            })
2647            .collect();
2648
2649        // Append empty batches to advance the timestamp of all tables.
2650        debug!("coordinator init: advancing all tables to current timestamp");
2651        let WriteTimestamp {
2652            timestamp: write_ts,
2653            advance_to,
2654        } = self.get_local_write_ts().await;
2655        let appends = table_metas
2656            .iter()
2657            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2658            .collect();
2659        // Append the tables in the background. We apply the write timestamp before getting a read
2660        // timestamp and reading a snapshot of each table, so the snapshots will block on their own
2661        // until the appends are complete.
2662        let table_fence_rx = self
2663            .controller
2664            .storage
2665            .append_table(write_ts.clone(), advance_to, appends)
2666            .expect("invalid updates");
2667
2668        self.apply_local_write(write_ts).await;
2669
2670        // Add builtin table updates the clear the contents of all system tables
2671        debug!("coordinator init: resetting system tables");
2672        let read_ts = self.get_local_read_ts().await;
2673
2674        // Filter out the 'mz_storage_usage_by_shard' table since we need to retain that info for
2675        // billing purposes.
2676        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2677            .catalog()
2678            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2679            .into();
2680        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2681            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2682                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2683        };
2684
2685        let mut retraction_tasks = Vec::new();
2686        let mut system_tables: Vec<_> = table_metas
2687            .iter()
2688            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2689            .collect();
2690
2691        // Special case audit events because it's append only.
2692        let (audit_events_idx, _) = system_tables
2693            .iter()
2694            .find_position(|table| {
2695                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2696            })
2697            .expect("mz_audit_events must exist");
2698        let audit_events = system_tables.remove(audit_events_idx);
2699        let audit_log_task = self.bootstrap_audit_log_table(
2700            audit_events.id,
2701            audit_events.name,
2702            audit_events.table,
2703            audit_logs_iterator,
2704            read_ts,
2705        );
2706
2707        for system_table in system_tables {
2708            let table_id = system_table.id;
2709            let full_name = self.catalog().resolve_full_name(system_table.name, None);
2710            debug!("coordinator init: resetting system table {full_name} ({table_id})");
2711
2712            // Fetch the current contents of the table for retraction.
2713            let snapshot_fut = self
2714                .controller
2715                .storage_collections
2716                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2717            let batch_fut = self
2718                .controller
2719                .storage_collections
2720                .create_update_builder(system_table.table.global_id_writes());
2721
2722            let task = spawn(|| format!("snapshot-{table_id}"), async move {
2723                // Create a TimestamplessUpdateBuilder.
2724                let mut batch = batch_fut
2725                    .await
2726                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2727                tracing::info!(?table_id, "starting snapshot");
2728                // Get a cursor which will emit a consolidated snapshot.
2729                let mut snapshot_cursor = snapshot_fut
2730                    .await
2731                    .unwrap_or_terminate("cannot fail to snapshot");
2732
2733                // Retract the current contents, spilling into our builder.
2734                while let Some(values) = snapshot_cursor.next().await {
2735                    for (key, _t, d) in values {
2736                        let d_invert = d.neg();
2737                        batch.add(&key, &(), &d_invert).await;
2738                    }
2739                }
2740                tracing::info!(?table_id, "finished snapshot");
2741
2742                let batch = batch.finish().await;
2743                BuiltinTableUpdate::batch(table_id, batch)
2744            });
2745            retraction_tasks.push(task);
2746        }
2747
2748        let retractions_res = futures::future::join_all(retraction_tasks).await;
2749        for retractions in retractions_res {
2750            builtin_table_updates.push(retractions);
2751        }
2752
2753        let audit_join_start = Instant::now();
2754        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2755        let audit_log_updates = audit_log_task.await;
2756        let audit_log_builtin_table_updates = self
2757            .catalog()
2758            .state()
2759            .generate_builtin_table_updates(audit_log_updates);
2760        builtin_table_updates.extend(audit_log_builtin_table_updates);
2761        info!(
2762            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2763            audit_join_start.elapsed()
2764        );
2765
2766        // Now that the snapshots are complete, the appends must also be complete.
2767        table_fence_rx
2768            .await
2769            .expect("One-shot shouldn't be dropped during bootstrap")
2770            .unwrap_or_terminate("cannot fail to append");
2771
2772        info!("coordinator init: sending builtin table updates");
2773        let (_builtin_updates_fut, write_ts) = self
2774            .builtin_table_update()
2775            .execute(builtin_table_updates)
2776            .await;
2777        info!(?write_ts, "our write ts");
2778        if let Some(write_ts) = write_ts {
2779            self.apply_local_write(write_ts).await;
2780        }
2781    }
2782
2783    /// Prepare updates to the audit log table. The audit log table append only and very large, so
2784    /// we only need to find the events present in `audit_logs_iterator` but not in the audit log
2785    /// table.
2786    #[instrument]
2787    fn bootstrap_audit_log_table<'a>(
2788        &self,
2789        table_id: CatalogItemId,
2790        name: &'a QualifiedItemName,
2791        table: &'a Table,
2792        audit_logs_iterator: AuditLogIterator,
2793        read_ts: Timestamp,
2794    ) -> JoinHandle<Vec<StateUpdate>> {
2795        let full_name = self.catalog().resolve_full_name(name, None);
2796        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2797        let current_contents_fut = self
2798            .controller
2799            .storage_collections
2800            .snapshot(table.global_id_writes(), read_ts);
2801        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2802            let current_contents = current_contents_fut
2803                .await
2804                .unwrap_or_terminate("cannot fail to fetch snapshot");
2805            let contents_len = current_contents.len();
2806            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2807
2808            // Fetch the largest audit log event ID that has been written to the table.
2809            let max_table_id = current_contents
2810                .into_iter()
2811                .filter(|(_, diff)| *diff == 1)
2812                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2813                .sorted()
2814                .rev()
2815                .next();
2816
2817            // Filter audit log catalog updates to those that are not present in the table.
2818            audit_logs_iterator
2819                .take_while(|(audit_log, _)| match max_table_id {
2820                    Some(id) => audit_log.event.sortable_id() > id,
2821                    None => true,
2822                })
2823                .map(|(audit_log, ts)| StateUpdate {
2824                    kind: StateUpdateKind::AuditLog(audit_log),
2825                    ts,
2826                    diff: StateDiff::Addition,
2827                })
2828                .collect::<Vec<_>>()
2829        })
2830    }
2831
2832    /// Initializes all storage collections required by catalog objects in the storage controller.
2833    ///
2834    /// This method takes care of collection creation, as well as migration of existing
2835    /// collections.
2836    ///
2837    /// Creating all storage collections in a single `create_collections` call, rather than on
2838    /// demand, is more efficient as it reduces the number of writes to durable storage. It also
2839    /// allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
2840    /// storage collections, without needing to worry about dependency order.
2841    ///
2842    /// `migrated_storage_collections` is a set of builtin storage collections that have been
2843    /// migrated and should be handled specially.
2844    #[instrument]
2845    async fn bootstrap_storage_collections(
2846        &mut self,
2847        migrated_storage_collections: &BTreeSet<CatalogItemId>,
2848    ) {
2849        let catalog = self.catalog();
2850
2851        let source_desc = |object_id: GlobalId,
2852                           data_source: &DataSourceDesc,
2853                           desc: &RelationDesc,
2854                           timeline: &Timeline| {
2855            let data_source = match data_source.clone() {
2856                // Re-announce the source description.
2857                DataSourceDesc::Ingestion { desc, cluster_id } => {
2858                    let desc = desc.into_inline_connection(catalog.state());
2859                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2860                    DataSource::Ingestion(ingestion)
2861                }
2862                DataSourceDesc::OldSyntaxIngestion {
2863                    desc,
2864                    progress_subsource,
2865                    data_config,
2866                    details,
2867                    cluster_id,
2868                } => {
2869                    let desc = desc.into_inline_connection(catalog.state());
2870                    let data_config = data_config.into_inline_connection(catalog.state());
2871                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2872                    // this will always be a Source or a Table.
2873                    let progress_subsource =
2874                        catalog.get_entry(&progress_subsource).latest_global_id();
2875                    let mut ingestion =
2876                        IngestionDescription::new(desc, cluster_id, progress_subsource);
2877                    let legacy_export = SourceExport {
2878                        storage_metadata: (),
2879                        data_config,
2880                        details,
2881                    };
2882                    ingestion.source_exports.insert(object_id, legacy_export);
2883
2884                    DataSource::Ingestion(ingestion)
2885                }
2886                DataSourceDesc::IngestionExport {
2887                    ingestion_id,
2888                    external_reference: _,
2889                    details,
2890                    data_config,
2891                } => {
2892                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
2893                    // this will always be a Source or a Table.
2894                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2895
2896                    DataSource::IngestionExport {
2897                        ingestion_id,
2898                        details,
2899                        data_config: data_config.into_inline_connection(catalog.state()),
2900                    }
2901                }
2902                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
2903                DataSourceDesc::Progress => DataSource::Progress,
2904                DataSourceDesc::Introspection(introspection) => {
2905                    DataSource::Introspection(introspection)
2906                }
2907                DataSourceDesc::Catalog => DataSource::Other,
2908            };
2909            CollectionDescription {
2910                desc: desc.clone(),
2911                data_source,
2912                since: None,
2913                timeline: Some(timeline.clone()),
2914                primary: None,
2915            }
2916        };
2917
2918        let mut compute_collections = vec![];
2919        let mut collections = vec![];
2920        for entry in catalog.entries() {
2921            match entry.item() {
2922                CatalogItem::Source(source) => {
2923                    collections.push((
2924                        source.global_id(),
2925                        source_desc(
2926                            source.global_id(),
2927                            &source.data_source,
2928                            &source.desc,
2929                            &source.timeline,
2930                        ),
2931                    ));
2932                }
2933                CatalogItem::Table(table) => {
2934                    match &table.data_source {
2935                        TableDataSource::TableWrites { defaults: _ } => {
2936                            let versions: BTreeMap<_, _> = table
2937                                .collection_descs()
2938                                .map(|(gid, version, desc)| (version, (gid, desc)))
2939                                .collect();
2940                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2941                                let next_version = version.bump();
2942                                let primary_collection =
2943                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2944                                let mut collection_desc =
2945                                    CollectionDescription::for_table(desc.clone());
2946                                collection_desc.primary = primary_collection;
2947
2948                                (*gid, collection_desc)
2949                            });
2950                            collections.extend(collection_descs);
2951                        }
2952                        TableDataSource::DataSource {
2953                            desc: data_source_desc,
2954                            timeline,
2955                        } => {
2956                            // TODO(alter_table): Support versioning tables that read from sources.
2957                            soft_assert_eq_or_log!(table.collections.len(), 1);
2958                            let collection_descs =
2959                                table.collection_descs().map(|(gid, _version, desc)| {
2960                                    (
2961                                        gid,
2962                                        source_desc(
2963                                            entry.latest_global_id(),
2964                                            data_source_desc,
2965                                            &desc,
2966                                            timeline,
2967                                        ),
2968                                    )
2969                                });
2970                            collections.extend(collection_descs);
2971                        }
2972                    };
2973                }
2974                CatalogItem::MaterializedView(mv) => {
2975                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
2976                        let collection_desc =
2977                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
2978                        (gid, collection_desc)
2979                    });
2980
2981                    collections.extend(collection_descs);
2982                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
2983                }
2984                CatalogItem::ContinualTask(ct) => {
2985                    let collection_desc =
2986                        CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
2987                    compute_collections.push((ct.global_id(), ct.desc.clone()));
2988                    collections.push((ct.global_id(), collection_desc));
2989                }
2990                CatalogItem::Sink(sink) => {
2991                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2992                    let from_desc = storage_sink_from_entry
2993                        .relation_desc()
2994                        .expect("sinks can only be built on items with descs")
2995                        .into_owned();
2996                    let collection_desc = CollectionDescription {
2997                        // TODO(sinks): make generic once we have more than one sink type.
2998                        desc: KAFKA_PROGRESS_DESC.clone(),
2999                        data_source: DataSource::Sink {
3000                            desc: ExportDescription {
3001                                sink: StorageSinkDesc {
3002                                    from: sink.from,
3003                                    from_desc,
3004                                    connection: sink
3005                                        .connection
3006                                        .clone()
3007                                        .into_inline_connection(self.catalog().state()),
3008                                    envelope: sink.envelope,
3009                                    as_of: Antichain::from_elem(Timestamp::minimum()),
3010                                    with_snapshot: sink.with_snapshot,
3011                                    version: sink.version,
3012                                    from_storage_metadata: (),
3013                                    to_storage_metadata: (),
3014                                    commit_interval: sink.commit_interval,
3015                                },
3016                                instance_id: sink.cluster_id,
3017                            },
3018                        },
3019                        since: None,
3020                        timeline: None,
3021                        primary: None,
3022                    };
3023                    collections.push((sink.global_id, collection_desc));
3024                }
3025                CatalogItem::Log(_)
3026                | CatalogItem::View(_)
3027                | CatalogItem::Index(_)
3028                | CatalogItem::Type(_)
3029                | CatalogItem::Func(_)
3030                | CatalogItem::Secret(_)
3031                | CatalogItem::Connection(_) => (),
3032            }
3033        }
3034
3035        let register_ts = if self.controller.read_only() {
3036            self.get_local_read_ts().await
3037        } else {
3038            // Getting a write timestamp bumps the write timestamp in the
3039            // oracle, which we're not allowed in read-only mode.
3040            self.get_local_write_ts().await.timestamp
3041        };
3042
3043        // New builtin storage collections are by default created with [0] since/upper frontiers. For
3044        // collections that have dependencies on other collections (MVs, CTs), this can violate the
3045        // frontier invariants assumed by as-of selection. For example, as-of selection expects to
3046        // be able to pick up computing a materialized view from its most recent upper, but if that
3047        // upper is [0] it's likely that the required times are not available anymore in the MV
3048        // inputs.
3049        //
3050        // To avoid violating frontier invariants, we need to bump their sinces to times greater
3051        // than all of their upstream storage inputs. To know the since of a storage input, it has
3052        // to be registered with the storage controller first. Which means we need to split out
3053        // builtin storage collections that have storage inputs here, and then process them once
3054        // the input-less collections have been registered.
3055        let mut derived_builtin_storage_collections: Vec<_> = collections
3056            .extract_if(.., |(id, c)| {
3057                // Don't silently overwrite an explicitly specified `since`.
3058                if !id.is_system() || c.since.is_some() {
3059                    return false;
3060                }
3061
3062                use CatalogItem::*;
3063                match &self.catalog.get_entry_by_global_id(id).item {
3064                    // Not storage collections.
3065                    Log(_) | View(_) | Index(_) | Type(_) | Func(_) | Secret(_) | Connection(_) => {
3066                        false
3067                    }
3068                    // Storage collections without dependencies.
3069                    Table(_) | Source(_) => false,
3070                    // Storage collections with dependencies.
3071                    MaterializedView(_) | ContinualTask(_) => true,
3072                    // Storage collections not supported as builtin types.
3073                    Sink(_) => unimplemented!(),
3074                }
3075            })
3076            .collect();
3077
3078        let storage_metadata = self.catalog.state().storage_metadata();
3079        let migrated_storage_collections = migrated_storage_collections
3080            .into_iter()
3081            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
3082            .collect();
3083
3084        // Before possibly creating collections, make sure their schemas are correct.
3085        //
3086        // Across different versions of Materialize the nullability of columns can change based on
3087        // updates to our optimizer.
3088        self.controller
3089            .storage
3090            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
3091            .await
3092            .unwrap_or_terminate("cannot fail to evolve collections");
3093
3094        self.controller
3095            .storage
3096            .create_collections_for_bootstrap(
3097                storage_metadata,
3098                Some(register_ts),
3099                collections,
3100                &migrated_storage_collections,
3101            )
3102            .await
3103            .unwrap_or_terminate("cannot fail to create collections");
3104
3105        // Bump and register the derived builtin collections, now that their (transitive) inputs
3106        // have been registered.
3107        for (gid, collection) in &mut derived_builtin_storage_collections {
3108            let entry = self.catalog.get_entry_by_global_id(gid);
3109            let mut derived_since = Antichain::from_elem(Timestamp::MIN);
3110            for dep_id in self.catalog.state().transitive_uses(entry.id()) {
3111                let entry = self.catalog.state().get_entry(&dep_id);
3112                let dep_gid = entry.latest_global_id();
3113
3114                // It's fine if the storage controller doesn't know about the collection. This
3115                // happens because the dependency is something else than a storage collection, or
3116                // because the dependency is the current collection itself (`transitive_uses` also
3117                // returns the input ID).
3118                if let Ok((since, _)) = self.controller.storage.collection_frontiers(dep_gid) {
3119                    derived_since.join_assign(&since);
3120                }
3121            }
3122            collection.since = Some(derived_since);
3123        }
3124        self.controller
3125            .storage
3126            .create_collections_for_bootstrap(
3127                storage_metadata,
3128                Some(register_ts),
3129                derived_builtin_storage_collections,
3130                &migrated_storage_collections,
3131            )
3132            .await
3133            .unwrap_or_terminate("cannot fail to create collections");
3134
3135        if !self.controller.read_only() {
3136            self.apply_local_write(register_ts).await;
3137        }
3138    }
3139
3140    /// Returns the current list of catalog entries, sorted into an appropriate order for
3141    /// bootstrapping.
3142    ///
3143    /// The returned entries are in dependency order. Indexes are sorted immediately after the
3144    /// objects they index, to ensure that all dependants of these indexed objects can make use of
3145    /// the respective indexes.
3146    fn bootstrap_sort_catalog_entries(&self) -> Vec<CatalogEntry> {
3147        let mut indexes_on = BTreeMap::<_, Vec<_>>::new();
3148        let mut non_indexes = Vec::new();
3149        for entry in self.catalog().entries().cloned() {
3150            if let Some(index) = entry.index() {
3151                let on = self.catalog().get_entry_by_global_id(&index.on);
3152                indexes_on.entry(on.id()).or_default().push(entry);
3153            } else {
3154                non_indexes.push(entry);
3155            }
3156        }
3157
3158        let key_fn = |entry: &CatalogEntry| entry.id;
3159        let dependencies_fn = |entry: &CatalogEntry| entry.uses();
3160        sort_topological(&mut non_indexes, key_fn, dependencies_fn);
3161
3162        let mut result = Vec::new();
3163        for entry in non_indexes {
3164            let id = entry.id();
3165            result.push(entry);
3166            if let Some(mut indexes) = indexes_on.remove(&id) {
3167                result.append(&mut indexes);
3168            }
3169        }
3170
3171        soft_assert_or_log!(
3172            indexes_on.is_empty(),
3173            "indexes with missing dependencies: {indexes_on:?}",
3174        );
3175
3176        result
3177    }
3178
    /// Invokes the optimizer on all indexes and materialized views in the catalog and inserts the
    /// resulting dataflow plans into the catalog state.
    ///
    /// `ordered_catalog_entries` must be sorted in dependency order, with dependencies ordered
    /// before their dependants.
    ///
    /// This method does not perform timestamp selection for the dataflows, nor does it create them
    /// in the compute controller. Both of these steps happen later during bootstrapping.
    ///
    /// Returns a map of expressions that were not cached.
    #[instrument]
    fn bootstrap_dataflow_plans(
        &mut self,
        ordered_catalog_entries: &[CatalogEntry],
        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
        // The optimizer expects to be able to query its `ComputeInstanceSnapshot` for
        // collections the current dataflow can depend on. But since we don't yet install anything
        // on compute instances, the snapshot information is incomplete. We fix that by manually
        // updating `ComputeInstanceSnapshot` objects to ensure they contain collections previously
        // optimized.
        let mut instance_snapshots = BTreeMap::new();
        // Plans we had to (re)compute because no usable cache entry existed; returned to the
        // caller per the doc comment above.
        let mut uncached_expressions = BTreeMap::new();

        // Per-cluster optimizer configuration: system defaults with the cluster's own feature
        // overrides applied on top.
        let optimizer_config = |catalog: &Catalog, cluster_id| {
            let system_config = catalog.system_config();
            let overrides = catalog.get_cluster(cluster_id).config.features();
            OptimizerConfig::from(system_config).override_from(&overrides)
        };

        for entry in ordered_catalog_entries {
            match entry.item() {
                CatalogItem::Index(idx) => {
                    // Collect optimizer parameters.
                    let compute_instance =
                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(idx.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = idx.global_id();

                    // The index may already be installed on the compute instance. For example,
                    // this is the case for introspection indexes.
                    if compute_instance.contains_collection(&global_id) {
                        continue;
                    }

                    let optimizer_config = optimizer_config(&self.catalog, idx.cluster_id);

                    // Use cached plans only when they were produced under the same optimizer
                    // feature flags; otherwise re-optimize from scratch.
                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let (optimized_plan, global_lir_plan) = {
                                    // Build an optimizer for this INDEX.
                                    let mut optimizer = optimize::index::Optimizer::new(
                                        self.owned_catalog(),
                                        compute_instance.clone(),
                                        global_id,
                                        optimizer_config.clone(),
                                        self.optimizer_metrics(),
                                    );

                                    // MIR ⇒ MIR optimization (global)
                                    let index_plan = optimize::index::Index::new(
                                        entry.name().clone(),
                                        idx.on,
                                        idx.keys.to_vec(),
                                    );
                                    let global_mir_plan = optimizer.optimize(index_plan)?;
                                    let optimized_plan = global_mir_plan.df_desc().clone();

                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                    (optimized_plan, global_lir_plan)
                                };

                                let (physical_plan, metainfo) = global_lir_plan.unapply();
                                let metainfo = {
                                    // Pre-allocate a vector of transient GlobalIds for each notice.
                                    let notice_ids =
                                        std::iter::repeat_with(|| self.allocate_transient_id())
                                            .map(|(_item_id, gid)| gid)
                                            .take(metainfo.optimizer_notices.len())
                                            .collect::<Vec<_>>();
                                    // Return a metainfo with rendered notices.
                                    self.catalog().render_notices(
                                        metainfo,
                                        notice_ids,
                                        Some(idx.global_id()),
                                    )
                                };
                                // Remember freshly computed plans so the caller can refresh the
                                // expression cache.
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features: optimizer_config.features.clone(),
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    // Record the plans in the catalog state for later bootstrap stages.
                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
                    catalog.set_physical_plan(idx.global_id(), physical_plan);
                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);

                    // Make the new collection visible to dataflows optimized later on this
                    // cluster (see the `ComputeInstanceSnapshot` note at the top).
                    compute_instance.insert_collection(idx.global_id());
                }
                CatalogItem::MaterializedView(mv) => {
                    // Collect optimizer parameters.
                    let compute_instance =
                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(mv.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = mv.global_id_writes();

                    let optimizer_config = optimizer_config(&self.catalog, mv.cluster_id);

                    // Use cached plans only when they were produced under the same optimizer
                    // feature flags; otherwise re-optimize from scratch.
                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let (_, internal_view_id) = self.allocate_transient_id();
                                let debug_name = self
                                    .catalog()
                                    .resolve_full_name(entry.name(), None)
                                    .to_string();
                                let force_non_monotonic = Default::default();

                                let (optimized_plan, global_lir_plan) = {
                                    // Build an optimizer for this MATERIALIZED VIEW.
                                    let mut optimizer = optimize::materialized_view::Optimizer::new(
                                        self.owned_catalog().as_optimizer_catalog(),
                                        compute_instance.clone(),
                                        global_id,
                                        internal_view_id,
                                        mv.desc.latest().iter_names().cloned().collect(),
                                        mv.non_null_assertions.clone(),
                                        mv.refresh_schedule.clone(),
                                        debug_name,
                                        optimizer_config.clone(),
                                        self.optimizer_metrics(),
                                        force_non_monotonic,
                                    );

                                    // MIR ⇒ MIR optimization (global)
                                    // We make sure to use the HIR SQL type (since MIR SQL types may not be coherent).
                                    let typ = infer_sql_type_for_catalog(
                                        &mv.raw_expr,
                                        &mv.optimized_expr.as_ref().clone(),
                                    );
                                    let global_mir_plan = optimizer
                                        .optimize((mv.optimized_expr.as_ref().clone(), typ))?;
                                    let optimized_plan = global_mir_plan.df_desc().clone();

                                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                    (optimized_plan, global_lir_plan)
                                };

                                let (physical_plan, metainfo) = global_lir_plan.unapply();
                                let metainfo = {
                                    // Pre-allocate a vector of transient GlobalIds for each notice.
                                    let notice_ids =
                                        std::iter::repeat_with(|| self.allocate_transient_id())
                                            .map(|(_item_id, global_id)| global_id)
                                            .take(metainfo.optimizer_notices.len())
                                            .collect::<Vec<_>>();
                                    // Return a metainfo with rendered notices.
                                    self.catalog().render_notices(
                                        metainfo,
                                        notice_ids,
                                        Some(mv.global_id_writes()),
                                    )
                                };
                                // Remember freshly computed plans so the caller can refresh the
                                // expression cache.
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features: optimizer_config.features.clone(),
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    // Record the plans in the catalog state for later bootstrap stages.
                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);

                    // Make the new collection visible to dataflows optimized later on this
                    // cluster (see the `ComputeInstanceSnapshot` note at the top).
                    compute_instance.insert_collection(mv.global_id_writes());
                }
                CatalogItem::ContinualTask(ct) => {
                    // Collect optimizer parameters.
                    let compute_instance =
                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(ct.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = ct.global_id();

                    let optimizer_config = optimizer_config(&self.catalog, ct.cluster_id);

                    // Use cached plans only when they were produced under the same optimizer
                    // feature flags; otherwise re-optimize from scratch.
                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let debug_name = self
                                    .catalog()
                                    .resolve_full_name(entry.name(), None)
                                    .to_string();
                                // Continual-task optimization is delegated to a dedicated helper
                                // rather than inlined like the index/MV pipelines above.
                                let (optimized_plan, physical_plan, metainfo, optimizer_features) =
                                    self.optimize_create_continual_task(
                                        ct,
                                        global_id,
                                        self.owned_catalog(),
                                        debug_name,
                                    )?;
                                // Remember freshly computed plans so the caller can refresh the
                                // expression cache.
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features,
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    // Record the plans in the catalog state for later bootstrap stages.
                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
                    catalog.set_physical_plan(ct.global_id(), physical_plan);
                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);

                    // Make the new collection visible to dataflows optimized later on this
                    // cluster (see the `ComputeInstanceSnapshot` note at the top).
                    compute_instance.insert_collection(ct.global_id());
                }
                // Items that carry no compute dataflow plan: nothing to optimize.
                CatalogItem::Table(_)
                | CatalogItem::Source(_)
                | CatalogItem::Log(_)
                | CatalogItem::View(_)
                | CatalogItem::Sink(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_)
                | CatalogItem::Connection(_) => (),
            }
        }

        // Hand back the expressions that were not served from the cache.
        Ok(uncached_expressions)
    }
3470
3471    /// Selects for each compute dataflow an as-of suitable for bootstrapping it.
3472    ///
3473    /// Returns a set of [`ReadHold`]s that ensures the read frontiers of involved collections stay
3474    /// in place and that must not be dropped before all compute dataflows have been created with
3475    /// the compute controller.
3476    ///
3477    /// This method expects all storage collections and dataflow plans to be available, so it must
3478    /// run after [`Coordinator::bootstrap_storage_collections`] and
3479    /// [`Coordinator::bootstrap_dataflow_plans`].
3480    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3481        let mut catalog_ids = Vec::new();
3482        let mut dataflows = Vec::new();
3483        let mut read_policies = BTreeMap::new();
3484        for entry in self.catalog.entries() {
3485            let gid = match entry.item() {
3486                CatalogItem::Index(idx) => idx.global_id(),
3487                CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3488                CatalogItem::ContinualTask(ct) => ct.global_id(),
3489                CatalogItem::Table(_)
3490                | CatalogItem::Source(_)
3491                | CatalogItem::Log(_)
3492                | CatalogItem::View(_)
3493                | CatalogItem::Sink(_)
3494                | CatalogItem::Type(_)
3495                | CatalogItem::Func(_)
3496                | CatalogItem::Secret(_)
3497                | CatalogItem::Connection(_) => continue,
3498            };
3499            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3500                catalog_ids.push(gid);
3501                dataflows.push(plan.clone());
3502
3503                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3504                    read_policies.insert(gid, compaction_window.into());
3505                }
3506            }
3507        }
3508
3509        let read_ts = self.get_local_read_ts().await;
3510        let read_holds = as_of_selection::run(
3511            &mut dataflows,
3512            &read_policies,
3513            &*self.controller.storage_collections,
3514            read_ts,
3515            self.controller.read_only(),
3516        );
3517
3518        let catalog = self.catalog_mut();
3519        for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3520            catalog.set_physical_plan(id, plan);
3521        }
3522
3523        read_holds
3524    }
3525
    /// Serves the coordinator, receiving commands from users over `cmd_rx`,
    /// internally generated messages over `internal_cmd_rx`, pending
    /// strict-serializable reads over `strict_serializable_reads_rx`, and
    /// group commit permits over `group_commit_rx`.
    ///
    /// You must call `bootstrap` before calling this method.
    ///
    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would
    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
    fn serve(
        mut self,
        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
        group_commit_rx: appends::GroupCommitWaiter,
    ) -> LocalBoxFuture<'static, ()> {
        async move {
            // Watcher that listens for and reports cluster service status changes.
            let mut cluster_events = self.controller.events_stream();
            // Shared with the watchdog task below so that, if the main loop
            // gets stuck, the watchdog can report what it was last handling.
            let last_message = Arc::new(Mutex::new(LastMessage {
                kind: "none",
                stmt: None,
            }));

            // Channel the watchdog uses to measure how long the main loop
            // takes to drain; capacity 1 so an unclaimed permit means the loop
            // has not yet caught up to the previous probe.
            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
            let idle_metric = self.metrics.queue_busy_seconds.clone();
            let last_message_watchdog = Arc::clone(&last_message);

            spawn(|| "coord watchdog", async move {
                // Every 5 seconds, attempt to measure how long it takes for the
                // coord select loop to be empty, because this message is the last
                // processed. If it is idle, this will result in some microseconds
                // of measurement.
                let mut interval = tokio::time::interval(Duration::from_secs(5));
                // If we end up having to wait more than 5 seconds for the coord to respond, then the
                // behavior of Delay results in the interval "restarting" from whenever we yield
                // instead of trying to catch up.
                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

                // Track if we become stuck to de-dupe error reporting.
                let mut coord_stuck = false;

                loop {
                    interval.tick().await;

                    // Wait for space in the channel, if we timeout then the coordinator is stuck!
                    let duration = tokio::time::Duration::from_secs(30);
                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
                    let Ok(maybe_permit) = timeout else {
                        // Only log if we're newly stuck, to prevent logging repeatedly.
                        if !coord_stuck {
                            let last_message = last_message_watchdog.lock().expect("poisoned");
                            tracing::warn!(
                                last_message_kind = %last_message.kind,
                                last_message_sql = %last_message.stmt_to_string(),
                                "coordinator stuck for {duration:?}",
                            );
                        }
                        coord_stuck = true;

                        continue;
                    };

                    // We got a permit, we're not stuck!
                    if coord_stuck {
                        tracing::info!("Coordinator became unstuck");
                    }
                    coord_stuck = false;

                    // If we failed to acquire a permit it's because we're shutting down.
                    let Ok(permit) = maybe_permit else {
                        break;
                    };

                    permit.send(idle_metric.start_timer());
                }
            });

            // One-time startup work before entering the main loop.
            self.schedule_storage_usage_collection().await;
            self.spawn_privatelink_vpc_endpoints_watch_task();
            self.spawn_statement_logging_task();
            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);

            // Report if the handling of a single message takes longer than this threshold.
            let warn_threshold = self
                .catalog()
                .system_config()
                .coord_slow_message_warn_threshold();

            // How many messages we'd like to batch up before processing them. Must be > 0.
            const MESSAGE_BATCH: usize = 64;
            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);

            // Histogram recording how many messages each loop iteration processes.
            let message_batch = self.metrics.message_batch.clone();

            loop {
                // Before adding a branch to this select loop, please ensure that the branch is
                // cancellation safe and add a comment explaining why. You can refer here for more
                // info: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
                select! {
                    // We prioritize internal commands over other commands. However, we work through
                    // batches of commands in some branches of this select, which means that even if
                    // a command generates internal commands, we will work through the current batch
                    // before receiving a new batch of commands.
                    biased;

                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
                    // Receive a batch of commands.
                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
                    // `next()` on any stream is cancel-safe:
                    // https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#cancel-safety
                    // Receive a single command.
                    Some(event) = cluster_events.next() => {
                        messages.push(Message::ClusterEvent(event))
                    },
                    // See [`mz_controller::Controller::Controller::ready`] for notes
                    // on why this is cancel-safe.
                    // Receive a single command.
                    () = self.controller.ready() => {
                        // NOTE: We don't get a `Readiness` back from `ready()`
                        // because the controller wants to keep it and it's not
                        // trivially `Clone` or `Copy`. Hence this accessor.
                        let controller = match self.controller.get_readiness() {
                            Readiness::Storage => ControllerReadiness::Storage,
                            Readiness::Compute => ControllerReadiness::Compute,
                            Readiness::Metrics(_) => ControllerReadiness::Metrics,
                            Readiness::Internal(_) => ControllerReadiness::Internal,
                            Readiness::NotReady => unreachable!("just signaled as ready"),
                        };
                        messages.push(Message::ControllerReady { controller });
                    }
                    // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
                    // Receive a single command.
                    permit = group_commit_rx.ready() => {
                        // If we happen to have batched exactly one user write, use
                        // that span so the `emit_trace_id_notice` hooks up.
                        // Otherwise, the best we can do is invent a new root span
                        // and make it follow from all the Spans in the pending
                        // writes.
                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
                            PendingWriteTxn::User{span, ..} => Some(span),
                            PendingWriteTxn::System{..} => None,
                        });
                        let span = match user_write_spans.exactly_one() {
                            Ok(span) => span.clone(),
                            Err(user_write_spans) => {
                                let span = info_span!(parent: None, "group_commit_notify");
                                for s in user_write_spans {
                                    span.follows_from(s);
                                }
                                span
                            }
                        };
                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
                    },
                    // `recv_many()` on `UnboundedReceiver` is cancellation safe:
                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety-1
                    // Receive a batch of commands.
                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
                        // A count of zero means the channel is closed: all
                        // client handles are gone, so shut down the loop.
                        if count == 0 {
                            break;
                        } else {
                            messages.extend(cmd_messages.drain(..).map(
                                |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
                            ));
                        }
                    },
                    // `recv()` on `UnboundedReceiver` is cancellation safe:
                    // https://docs.rs/tokio/1.38.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
                    // Receive a single command.
                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
                        // Drain any additional already-queued reads so one
                        // `LinearizeReads` message covers the whole batch.
                        let mut pending_read_txns = vec![pending_read_txn];
                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
                            pending_read_txns.push(pending_read_txn);
                        }
                        for (conn_id, pending_read_txn) in pending_read_txns {
                            let prev = self
                                .pending_linearize_read_txns
                                .insert(conn_id, pending_read_txn);
                            soft_assert_or_log!(
                                prev.is_none(),
                                "connections can not have multiple concurrent reads, prev: {prev:?}"
                            )
                        }
                        messages.push(Message::LinearizeReads);
                    }
                    // `tick()` on `Interval` is cancel-safe:
                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
                    // Receive a single command.
                    _ = self.advance_timelines_interval.tick() => {
                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
                        span.follows_from(Span::current());

                        // Group commit sends an `AdvanceTimelines` message when
                        // done, which is what downgrades read holds. In
                        // read-only mode we send this message directly because
                        // we're not doing group commits.
                        if self.controller.read_only() {
                            messages.push(Message::AdvanceTimelines);
                        } else {
                            messages.push(Message::GroupCommitInitiate(span, None));
                        }
                    },
                    // `tick()` on `Interval` is cancel-safe:
                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
                    // Receive a single command.
                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
                        messages.push(Message::CheckSchedulingPolicies);
                    },

                    // `tick()` on `Interval` is cancel-safe:
                    // https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
                    // Receive a single command.
                    _ = self.caught_up_check_interval.tick() => {
                        // We do this directly on the main loop instead of
                        // firing off a message. We are still in read-only mode,
                        // so optimizing for latency, not blocking the main loop
                        // is not that important.
                        self.maybe_check_caught_up().await;

                        continue;
                    },

                    // Process the idle metric at the lowest priority to sample queue non-idle time.
                    // `recv()` on `Receiver` is cancellation safe:
                    // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety
                    // Receive a single command.
                    timer = idle_rx.recv() => {
                        timer.expect("does not drop").observe_duration();
                        self.metrics
                            .message_handling
                            .with_label_values(&["watchdog"])
                            .observe(0.0);
                        continue;
                    }
                };

                // Observe the number of messages we're processing at once.
                message_batch.observe(f64::cast_lossy(messages.len()));

                for msg in messages.drain(..) {
                    // All message processing functions trace. Start a parent span
                    // for them to make it easy to find slow messages.
                    let msg_kind = msg.kind();
                    let span = span!(
                        target: "mz_adapter::coord::handle_message_loop",
                        Level::INFO,
                        "coord::handle_message",
                        kind = msg_kind
                    );
                    // Capture the OTel context now so we can report a trace id
                    // below if handling turns out to be slow.
                    let otel_context = span.context().span().span_context().clone();

                    // Record the last kind of message in case we get stuck. For
                    // execute commands, we additionally stash the user's SQL,
                    // statement, so we can log it in case we get stuck.
                    *last_message.lock().expect("poisoned") = LastMessage {
                        kind: msg_kind,
                        stmt: match &msg {
                            Message::Command(
                                _,
                                Command::Execute {
                                    portal_name,
                                    session,
                                    ..
                                },
                            ) => session
                                .get_portal_unverified(portal_name)
                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
                            _ => None,
                        },
                    };

                    let start = Instant::now();
                    self.handle_message(msg).instrument(span).await;
                    let duration = start.elapsed();

                    self.metrics
                        .message_handling
                        .with_label_values(&[msg_kind])
                        .observe(duration.as_secs_f64());

                    // If something is _really_ slow, print a trace id for debugging, if OTEL is enabled.
                    if duration > warn_threshold {
                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
                        tracing::error!(
                            ?msg_kind,
                            ?trace_id,
                            ?duration,
                            "very slow coordinator message"
                        );
                    }
                }
            }
            // Try and cleanup as a best effort. There may be some async tasks out there holding a
            // reference that prevents us from cleaning up.
            if let Some(catalog) = Arc::into_inner(self.catalog) {
                catalog.expire().await;
            }
        }
        .boxed_local()
    }
3828
3829    /// Obtain a read-only Catalog reference.
3830    fn catalog(&self) -> &Catalog {
3831        &self.catalog
3832    }
3833
    /// Obtain a read-only Catalog snapshot, suitable for giving out to
    /// non-Coordinator thread tasks.
    ///
    /// Cloning the `Arc` is cheap; see `catalog_mut` for how outstanding
    /// snapshots interact with catalog mutation.
    fn owned_catalog(&self) -> Arc<Catalog> {
        Arc::clone(&self.catalog)
    }
3839
    /// Obtain a handle to the optimizer metrics, suitable for giving
    /// out to non-Coordinator thread tasks.
    fn optimizer_metrics(&self) -> OptimizerMetrics {
        self.optimizer_metrics.clone()
    }
3845
    /// Obtain a writeable Catalog reference.
    fn catalog_mut(&mut self) -> &mut Catalog {
        // make_mut will cause any other Arc references (from owned_catalog) to
        // continue to be valid by cloning the catalog, putting it in a new Arc,
        // which lives at self.catalog. If there are no other Arc references,
        // then no clone is made, and it returns a reference to the existing
        // object. This makes this method and owned_catalog both very cheap: at
        // most one clone per catalog mutation, but only if there's a read-only
        // reference to it.
        Arc::make_mut(&mut self.catalog)
    }
3857
3858    /// Refills the user ID pool by allocating IDs from the catalog.
3859    ///
3860    /// Requests `max(min_count, batch_size)` IDs so the pool is never
3861    /// under-filled relative to the configured batch size.
3862    async fn refill_user_id_pool(&mut self, min_count: u64) -> Result<(), AdapterError> {
3863        let batch_size = USER_ID_POOL_BATCH_SIZE.get(self.catalog().system_config().dyncfgs());
3864        let to_allocate = min_count.max(u64::from(batch_size));
3865        let id_ts = self.get_catalog_write_ts().await;
3866        let ids = self.catalog().allocate_user_ids(to_allocate, id_ts).await?;
3867        if let (Some((first_id, _)), Some((last_id, _))) = (ids.first(), ids.last()) {
3868            let start = match first_id {
3869                CatalogItemId::User(id) => *id,
3870                other => {
3871                    return Err(AdapterError::Internal(format!(
3872                        "expected User CatalogItemId, got {other:?}"
3873                    )));
3874                }
3875            };
3876            let end = match last_id {
3877                CatalogItemId::User(id) => *id + 1, // exclusive upper bound
3878                other => {
3879                    return Err(AdapterError::Internal(format!(
3880                        "expected User CatalogItemId, got {other:?}"
3881                    )));
3882                }
3883            };
3884            self.user_id_pool.refill(start, end);
3885        } else {
3886            return Err(AdapterError::Internal(
3887                "catalog returned no user IDs".into(),
3888            ));
3889        }
3890        Ok(())
3891    }
3892
3893    /// Allocates a single user ID, refilling the pool from the catalog if needed.
3894    async fn allocate_user_id(&mut self) -> Result<(CatalogItemId, GlobalId), AdapterError> {
3895        if let Some(id) = self.user_id_pool.allocate() {
3896            return Ok((CatalogItemId::User(id), GlobalId::User(id)));
3897        }
3898        self.refill_user_id_pool(1).await?;
3899        let id = self.user_id_pool.allocate().expect("ID pool just refilled");
3900        Ok((CatalogItemId::User(id), GlobalId::User(id)))
3901    }
3902
3903    /// Allocates `count` user IDs, refilling the pool from the catalog if needed.
3904    async fn allocate_user_ids(
3905        &mut self,
3906        count: u64,
3907    ) -> Result<Vec<(CatalogItemId, GlobalId)>, AdapterError> {
3908        if self.user_id_pool.remaining() < count {
3909            self.refill_user_id_pool(count).await?;
3910        }
3911        let raw_ids = self
3912            .user_id_pool
3913            .allocate_many(count)
3914            .expect("pool has enough IDs after refill");
3915        Ok(raw_ids
3916            .into_iter()
3917            .map(|id| (CatalogItemId::User(id), GlobalId::User(id)))
3918            .collect())
3919    }
3920
    /// Obtain a reference to the coordinator's connection context.
    fn connection_context(&self) -> &ConnectionContext {
        self.controller.connection_context()
    }
3925
    /// Obtain a reference to the coordinator's secret reader, in an `Arc`.
    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
        &self.connection_context().secrets_reader
    }
3930
3931    /// Publishes a notice message to all sessions.
3932    ///
3933    /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
3934    /// so we keep it around.
3935    #[allow(dead_code)]
3936    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3937        for meta in self.active_conns.values() {
3938            let _ = meta.notice_tx.send(notice.clone());
3939        }
3940    }
3941
3942    /// Returns a closure that will publish a notice to all sessions that were active at the time
3943    /// this method was called.
3944    pub(crate) fn broadcast_notice_tx(
3945        &self,
3946    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3947        let senders: Vec<_> = self
3948            .active_conns
3949            .values()
3950            .map(|meta| meta.notice_tx.clone())
3951            .collect();
3952        Box::new(move |notice| {
3953            for tx in senders {
3954                let _ = tx.send(notice.clone());
3955            }
3956        })
3957    }
3958
    /// Returns the metadata of all active connections, keyed by connection ID.
    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
        &self.active_conns
    }
3962
3963    #[instrument(level = "debug")]
3964    pub(crate) fn retire_execution(
3965        &mut self,
3966        reason: StatementEndedExecutionReason,
3967        ctx_extra: ExecuteContextExtra,
3968    ) {
3969        if let Some(uuid) = ctx_extra.retire() {
3970            self.end_statement_execution(uuid, reason);
3971        }
3972    }
3973
3974    /// Creates a new dataflow builder from the catalog and indexes in `self`.
3975    #[instrument(level = "debug")]
3976    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3977        let compute = self
3978            .instance_snapshot(instance)
3979            .expect("compute instance does not exist");
3980        DataflowBuilder::new(self.catalog().state(), compute)
3981    }
3982
    /// Return a reference-less snapshot to the indicated compute instance.
    ///
    /// Returns an [`InstanceMissing`] error if no compute instance with the
    /// given ID exists.
    pub fn instance_snapshot(
        &self,
        id: ComputeInstanceId,
    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
        ComputeInstanceSnapshot::new(&self.controller, id)
    }
3990
3991    /// Call into the compute controller to install a finalized dataflow, and
3992    /// initialize the read policies for its exported readable objects.
3993    ///
3994    /// # Panics
3995    ///
3996    /// Panics if dataflow creation fails.
3997    pub(crate) async fn ship_dataflow(
3998        &mut self,
3999        dataflow: DataflowDescription<Plan>,
4000        instance: ComputeInstanceId,
4001        target_replica: Option<ReplicaId>,
4002    ) {
4003        self.try_ship_dataflow(dataflow, instance, target_replica)
4004            .await
4005            .unwrap_or_terminate("dataflow creation cannot fail");
4006    }
4007
4008    /// Call into the compute controller to install a finalized dataflow, and
4009    /// initialize the read policies for its exported readable objects.
4010    pub(crate) async fn try_ship_dataflow(
4011        &mut self,
4012        dataflow: DataflowDescription<Plan>,
4013        instance: ComputeInstanceId,
4014        target_replica: Option<ReplicaId>,
4015    ) -> Result<(), DataflowCreationError> {
4016        // We must only install read policies for indexes, not for sinks.
4017        // Sinks are write-only compute collections that don't have read policies.
4018        let export_ids = dataflow.exported_index_ids().collect();
4019
4020        self.controller
4021            .compute
4022            .create_dataflow(instance, dataflow, target_replica)?;
4023
4024        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
4025            .await;
4026
4027        Ok(())
4028    }
4029
4030    /// Call into the compute controller to allow writes to the specified IDs
4031    /// from the specified instance. Calling this function multiple times and
4032    /// calling it on a read-only instance has no effect.
4033    pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
4034        self.controller
4035            .compute
4036            .allow_writes(instance, id)
4037            .unwrap_or_terminate("allow_writes cannot fail");
4038    }
4039
4040    /// Like `ship_dataflow`, but also await on builtin table updates.
4041    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
4042        &mut self,
4043        dataflow: DataflowDescription<Plan>,
4044        instance: ComputeInstanceId,
4045        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
4046        target_replica: Option<ReplicaId>,
4047    ) {
4048        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
4049            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, target_replica);
4050            let ((), ()) =
4051                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
4052        } else {
4053            self.ship_dataflow(dataflow, instance, target_replica).await;
4054        }
4055    }
4056
4057    /// Install a _watch set_ in the controller that is automatically associated with the given
4058    /// connection id. The watchset will be automatically cleared if the connection terminates
4059    /// before the watchset completes.
4060    pub fn install_compute_watch_set(
4061        &mut self,
4062        conn_id: ConnectionId,
4063        objects: BTreeSet<GlobalId>,
4064        t: Timestamp,
4065        state: WatchSetResponse,
4066    ) -> Result<(), CollectionLookupError> {
4067        let ws_id = self.controller.install_compute_watch_set(objects, t)?;
4068        self.connection_watch_sets
4069            .entry(conn_id.clone())
4070            .or_default()
4071            .insert(ws_id);
4072        self.installed_watch_sets.insert(ws_id, (conn_id, state));
4073        Ok(())
4074    }
4075
4076    /// Install a _watch set_ in the controller that is automatically associated with the given
4077    /// connection id. The watchset will be automatically cleared if the connection terminates
4078    /// before the watchset completes.
4079    pub fn install_storage_watch_set(
4080        &mut self,
4081        conn_id: ConnectionId,
4082        objects: BTreeSet<GlobalId>,
4083        t: Timestamp,
4084        state: WatchSetResponse,
4085    ) -> Result<(), CollectionMissing> {
4086        let ws_id = self.controller.install_storage_watch_set(objects, t)?;
4087        self.connection_watch_sets
4088            .entry(conn_id.clone())
4089            .or_default()
4090            .insert(ws_id);
4091        self.installed_watch_sets.insert(ws_id, (conn_id, state));
4092        Ok(())
4093    }
4094
4095    /// Cancels pending watchsets associated with the provided connection id.
4096    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
4097        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
4098            for ws_id in ws_ids {
4099                self.installed_watch_sets.remove(&ws_id);
4100            }
4101        }
4102    }
4103
4104    /// Returns the state of the [`Coordinator`] formatted as JSON.
4105    ///
4106    /// The returned value is not guaranteed to be stable and may change at any point in time.
4107    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
4108        // Note: We purposefully use the `Debug` formatting for the value of all fields in the
4109        // returned object as a tradeoff between usability and stability. `serde_json` will fail
4110        // to serialize an object if the keys aren't strings, so `Debug` formatting the values
4111        // prevents a future unrelated change from silently breaking this method.
4112
4113        let global_timelines: BTreeMap<_, _> = self
4114            .global_timelines
4115            .iter()
4116            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
4117            .collect();
4118        let active_conns: BTreeMap<_, _> = self
4119            .active_conns
4120            .iter()
4121            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
4122            .collect();
4123        let txn_read_holds: BTreeMap<_, _> = self
4124            .txn_read_holds
4125            .iter()
4126            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
4127            .collect();
4128        let pending_peeks: BTreeMap<_, _> = self
4129            .pending_peeks
4130            .iter()
4131            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
4132            .collect();
4133        let client_pending_peeks: BTreeMap<_, _> = self
4134            .client_pending_peeks
4135            .iter()
4136            .map(|(id, peek)| {
4137                let peek: BTreeMap<_, _> = peek
4138                    .iter()
4139                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
4140                    .collect();
4141                (id.to_string(), peek)
4142            })
4143            .collect();
4144        let pending_linearize_read_txns: BTreeMap<_, _> = self
4145            .pending_linearize_read_txns
4146            .iter()
4147            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
4148            .collect();
4149
4150        Ok(serde_json::json!({
4151            "global_timelines": global_timelines,
4152            "active_conns": active_conns,
4153            "txn_read_holds": txn_read_holds,
4154            "pending_peeks": pending_peeks,
4155            "client_pending_peeks": client_pending_peeks,
4156            "pending_linearize_read_txns": pending_linearize_read_txns,
4157            "controller": self.controller.dump().await?,
4158        }))
4159    }
4160
    /// Prune all storage usage events from the [`MZ_STORAGE_USAGE_BY_SHARD`] table that are older
    /// than `retention_period`.
    ///
    /// This method will read the entire contents of [`MZ_STORAGE_USAGE_BY_SHARD`] into memory
    /// which can be expensive.
    ///
    /// DO NOT call this method outside of startup. The safety of reading at the current oracle read
    /// timestamp and then writing at whatever the current write timestamp is (instead of
    /// `read_ts + 1`) relies on the fact that there are no outstanding writes during startup.
    ///
    /// Group commit, which this method uses to write the retractions, has builtin fencing, and we
    /// never commit retractions to [`MZ_STORAGE_USAGE_BY_SHARD`] outside of this method, which is
    /// only called once during startup. So we don't have to worry about double/invalid retractions.
    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
        // Resolve the builtin table's catalog item ID and its current global
        // (collection) ID; the latter is what the storage layer is keyed on.
        let item_id = self
            .catalog()
            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
        let read_ts = self.get_local_read_ts().await;
        // Construct the snapshot future here, while `self` is still available, and
        // await it on a background task so startup is not blocked on reading the
        // entire collection.
        let current_contents_fut = self
            .controller
            .storage_collections
            .snapshot(global_id, read_ts);
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        spawn(|| "storage_usage_prune", async move {
            let mut current_contents = current_contents_fut
                .await
                .unwrap_or_terminate("cannot fail to fetch snapshot");
            // Sum up duplicate rows so each remaining row carries a single net diff.
            differential_dataflow::consolidation::consolidate(&mut current_contents);

            // Events collected strictly before this cutoff (in ms since the Unix
            // epoch) are pruned. `u128` matches `Duration::as_millis` and avoids
            // overflow; `saturating_sub` clamps at 0 for very large retentions.
            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
            let mut expired = Vec::new();
            for (row, diff) in current_contents {
                assert_eq!(
                    diff, 1,
                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
                );
                // This logic relies on the definition of `mz_storage_usage_by_shard` not changing.
                // Column index 3 is expected to hold the event's collection timestamp.
                let collection_timestamp = row
                    .unpack()
                    .get(3)
                    .expect("definition of mz_storage_by_shard changed")
                    .unwrap_timestamptz();
                let collection_timestamp = collection_timestamp.timestamp_millis();
                let collection_timestamp: u128 = collection_timestamp
                    .try_into()
                    .expect("all collections happen after Jan 1 1970");
                if collection_timestamp < cutoff_ts {
                    debug!("pruning storage event {row:?}");
                    // Queue a retraction (diff of -1) for the expired event.
                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
                    expired.push(builtin_update);
                }
            }

            // Ignore send errors: they can only occur if the coordinator's
            // main thread has shut down.
            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
        });
    }
4219
4220    fn current_credit_consumption_rate(&self) -> Numeric {
4221        self.catalog()
4222            .user_cluster_replicas()
4223            .filter_map(|replica| match &replica.config.location {
4224                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
4225                ReplicaLocation::Unmanaged(_) => None,
4226            })
4227            .map(|size| {
4228                self.catalog()
4229                    .cluster_replica_sizes()
4230                    .0
4231                    .get(size)
4232                    .expect("location size is validated against the cluster replica sizes")
4233                    .credits_per_hour
4234            })
4235            .sum()
4236    }
4237}
4238
#[cfg(test)]
impl Coordinator {
    #[allow(dead_code)]
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        // `ship_dataflow_new` is not allowed to have a `Result` return because this function is
        // called after `catalog_transact`, after which no errors are allowed. This test exists to
        // prevent us from incorrectly teaching those functions how to return errors (which has
        // happened twice and is the motivation for this test).

        // An arbitrary compute instance ID to satisfy the function calls below. Note that
        // this only works because this function will never run.
        let instance_id = ComputeInstanceId::user(1).expect("1 is a valid ID");

        // The `let _: ()` annotation is the actual check: this line only compiles
        // while `ship_dataflow` returns `()` rather than a `Result`.
        let _: () = self.ship_dataflow(dataflow, instance_id, None).await;
    }
}
4255
/// Contains information about the last message the [`Coordinator`] processed.
struct LastMessage {
    // Static label naming the kind of message; included in the panic-time dump.
    kind: &'static str,
    // The SQL statement associated with the message, if any. Rendered only in
    // redacted form for logging (see `stmt_to_string`).
    stmt: Option<Arc<Statement<Raw>>>,
}
4261
4262impl LastMessage {
4263    /// Returns a redacted version of the statement that is safe for logs.
4264    fn stmt_to_string(&self) -> Cow<'static, str> {
4265        self.stmt
4266            .as_ref()
4267            .map(|stmt| stmt.to_ast_string_redacted().into())
4268            .unwrap_or(Cow::Borrowed("<none>"))
4269    }
4270}
4271
4272impl fmt::Debug for LastMessage {
4273    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4274        f.debug_struct("LastMessage")
4275            .field("kind", &self.kind)
4276            .field("stmt", &self.stmt_to_string())
4277            .finish()
4278    }
4279}
4280
4281impl Drop for LastMessage {
4282    fn drop(&mut self) {
4283        // Only print the last message if we're currently panicking, otherwise we'd spam our logs.
4284        if std::thread::panicking() {
4285            // If we're panicking theres no guarantee `tracing` still works, so print to stderr.
4286            eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4287        }
4288    }
4289}
4290
/// Serves the coordinator based on the provided configuration.
///
/// For a high-level description of the coordinator, see the [crate
/// documentation](crate).
///
/// Returns a handle to the coordinator and a client to communicate with the
/// coordinator.
///
/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 42KB. This would
/// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
pub fn serve(
    Config {
        controller_config,
        controller_envd_epoch,
        mut storage,
        audit_logs_iterator,
        timestamp_oracle_url,
        unsafe_mode,
        all_features,
        build_info,
        environment_id,
        metrics_registry,
        now,
        secrets_controller,
        cloud_resource_controller,
        cluster_replica_sizes,
        builtin_system_cluster_config,
        builtin_catalog_server_cluster_config,
        builtin_probe_cluster_config,
        builtin_support_cluster_config,
        builtin_analytics_cluster_config,
        system_parameter_defaults,
        availability_zones,
        storage_usage_client,
        storage_usage_collection_interval,
        storage_usage_retention_period,
        segment_client,
        egress_addresses,
        aws_account_id,
        aws_privatelink_availability_zones,
        connection_context,
        connection_limit_callback,
        remote_system_parameters,
        webhook_concurrency_limit,
        http_host_name,
        tracing_handle,
        read_only_controllers,
        caught_up_trigger: clusters_caught_up_trigger,
        helm_chart_version,
        license_key,
        external_login_password_mz_system,
        force_builtin_schema_migration,
    }: Config,
) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
    async move {
        let coord_start = Instant::now();
        info!("startup: coordinator init: beginning");
        info!("startup: coordinator init: preamble beginning");

        // Initializing the builtins can be an expensive process and consume a lot of memory. We
        // forcibly initialize it early while the stack is relatively empty to avoid stack
        // overflows later.
        let _builtins = LazyLock::force(&BUILTINS_STATIC);

        // Channels for, respectively: external client commands, coordinator-internal
        // messages, and strict serializable read scheduling. The receive halves are
        // handed to the coordinator's serve loop at the bottom of this function.
        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
            mpsc::unbounded_channel();

        // Validate and process availability zones.
        if !availability_zones.iter().all_unique() {
            coord_bail!("availability zones must be unique");
        }

        // An AWS principal context requires both an account ID and an external ID
        // prefix; with anything less, no principal context is available at all.
        let aws_principal_context = match (
            aws_account_id,
            connection_context.aws_external_id_prefix.clone(),
        ) {
            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
                aws_account_id,
                aws_external_id_prefix,
            }),
            _ => None,
        };

        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));

        info!(
            "startup: coordinator init: preamble complete in {:?}",
            coord_start.elapsed()
        );
        let oracle_init_start = Instant::now();
        info!("startup: coordinator init: timestamp oracle init beginning");

        let timestamp_oracle_config = timestamp_oracle_url
            .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
            .transpose()?;
        let mut initial_timestamps =
            get_initial_oracle_timestamps(&timestamp_oracle_config).await?;

        // Insert an entry for the `EpochMilliseconds` timeline if one doesn't exist,
        // which will ensure that the timeline is initialized since it's required
        // by the system.
        initial_timestamps
            .entry(Timeline::EpochMilliseconds)
            .or_insert_with(mz_repr::Timestamp::minimum);
        let mut timestamp_oracles = BTreeMap::new();
        for (timeline, initial_timestamp) in initial_timestamps {
            Coordinator::ensure_timeline_state_with_initial_time(
                &timeline,
                initial_timestamp,
                now.clone(),
                timestamp_oracle_config.clone(),
                &mut timestamp_oracles,
                read_only_controllers,
            )
            .await;
        }

        // Opening the durable catalog uses one or more timestamps without communicating with
        // the timestamp oracle. Here we make sure to apply the catalog upper with the timestamp
        // oracle to linearize future operations with opening the catalog.
        let catalog_upper = storage.current_upper().await;
        // Choose a time at which to boot. This is used, for example, to prune
        // old storage usage data or migrate audit log entries.
        //
        // This time is usually the current system time, but with protection
        // against backwards time jumps, even across restarts.
        let epoch_millis_oracle = &timestamp_oracles
            .get(&Timeline::EpochMilliseconds)
            .expect("inserted above")
            .oracle;

        let mut boot_ts = if read_only_controllers {
            let read_ts = epoch_millis_oracle.read_ts().await;
            std::cmp::max(read_ts, catalog_upper)
        } else {
            // Getting/applying a write timestamp bumps the write timestamp in the
            // oracle, which we're not allowed in read-only mode.
            epoch_millis_oracle.apply_write(catalog_upper).await;
            epoch_millis_oracle.write_ts().await.timestamp
        };

        info!(
            "startup: coordinator init: timestamp oracle init complete in {:?}",
            oracle_init_start.elapsed()
        );

        let catalog_open_start = Instant::now();
        info!("startup: coordinator init: catalog open beginning");
        let persist_client = controller_config
            .persist_clients
            .open(controller_config.persist_location.clone())
            .await
            .context("opening persist client")?;
        let builtin_item_migration_config =
            BuiltinItemMigrationConfig {
                persist_client: persist_client.clone(),
                read_only: read_only_controllers,
                force_migration: force_builtin_schema_migration,
            }
        ;
        let OpenCatalogResult {
            mut catalog,
            migrated_storage_collections_0dt,
            new_builtin_collections,
            builtin_table_updates,
            cached_global_exprs,
            uncached_local_exprs,
        } = Catalog::open(mz_catalog::config::Config {
            storage,
            metrics_registry: &metrics_registry,
            state: mz_catalog::config::StateConfig {
                unsafe_mode,
                all_features,
                build_info,
                environment_id: environment_id.clone(),
                read_only: read_only_controllers,
                now: now.clone(),
                boot_ts: boot_ts.clone(),
                skip_migrations: false,
                cluster_replica_sizes,
                builtin_system_cluster_config,
                builtin_catalog_server_cluster_config,
                builtin_probe_cluster_config,
                builtin_support_cluster_config,
                builtin_analytics_cluster_config,
                system_parameter_defaults,
                remote_system_parameters,
                availability_zones,
                egress_addresses,
                aws_principal_context,
                aws_privatelink_availability_zones,
                connection_context,
                http_host_name,
                builtin_item_migration_config,
                persist_client: persist_client.clone(),
                enable_expression_cache_override: None,
                helm_chart_version,
                external_login_password_mz_system,
                license_key: license_key.clone(),
            },
        })
        .await?;

        // Opening the catalog uses one or more timestamps, so push the boot timestamp up to the
        // current catalog upper.
        let catalog_upper = catalog.current_upper().await;
        boot_ts = std::cmp::max(boot_ts, catalog_upper);

        if !read_only_controllers {
            epoch_millis_oracle.apply_write(boot_ts).await;
        }

        info!(
            "startup: coordinator init: catalog open complete in {:?}",
            catalog_open_start.elapsed()
        );

        let coord_thread_start = Instant::now();
        info!("startup: coordinator init: coordinator thread start beginning");

        let session_id = catalog.config().session_id;
        let start_instant = catalog.config().start_instant;

        // In order for the coordinator to support Rc and Refcell types, it cannot be
        // sent across threads. Spawn it in a thread and have this parent thread wait
        // for bootstrap completion before proceeding.
        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
        let handle = TokioHandle::current();

        let metrics = Metrics::register_into(&metrics_registry);
        let metrics_clone = metrics.clone();
        let optimizer_metrics = OptimizerMetrics::register_into(
            &metrics_registry,
            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
        );
        let segment_client_clone = segment_client.clone();
        let coord_now = now.clone();
        let advance_timelines_interval =
            tokio::time::interval(catalog.system_config().default_timestamp_interval());
        let mut check_scheduling_policies_interval = tokio::time::interval(
            catalog
                .system_config()
                .cluster_check_scheduling_policies_interval(),
        );
        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let clusters_caught_up_check_interval = if read_only_controllers {
            let dyncfgs = catalog.system_config().dyncfgs();
            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);

            let mut interval = tokio::time::interval(interval);
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        } else {
            // When not in read-only mode, we don't do hydration checks. But we
            // still have to provide _some_ interval. This is large enough that
            // it doesn't matter.
            //
            // TODO(aljoscha): We cannot use Duration::MAX right now because of
            // https://github.com/tokio-rs/tokio/issues/6634. Use that once it's
            // fixed for good.
            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        };

        let clusters_caught_up_check =
            clusters_caught_up_trigger.map(|trigger| {
                let mut exclude_collections: BTreeSet<GlobalId> =
                    new_builtin_collections.into_iter().collect();

                // Migrated MVs can't make progress in read-only mode. Exclude them and all their
                // transitive dependents.
                //
                // TODO: Consider sending `allow_writes` for the dataflows of migrated MVs, which
                //       would allow them to make progress even in read-only mode. This doesn't
                //       work for MVs based on `mz_catalog_raw`, if the leader's version is less
                //       than v26.17, since before that version the catalog shard's frontier wasn't
                //       kept up-to-date with the current time. So this workaround has to remain in
                //       place until upgrades from a version less than v26.17 are no longer
                //       supported.
                let mut todo: Vec<_> = migrated_storage_collections_0dt
                    .iter()
                    .filter(|id| {
                        catalog.state().get_entry(id).is_materialized_view()
                    })
                    .copied()
                    .collect();
                // Worklist traversal over `used_by` edges: collect the global IDs of
                // every transitive dependent of the migrated materialized views.
                while let Some(item_id) = todo.pop() {
                    let entry = catalog.state().get_entry(&item_id);
                    exclude_collections.extend(entry.global_ids());
                    todo.extend_from_slice(entry.used_by());
                }

                CaughtUpCheckContext {
                    trigger,
                    exclude_collections,
                }
            });

        if let Some(TimestampOracleConfig::Postgres(pg_config)) =
            timestamp_oracle_config.as_ref()
        {
            // Apply settings from system vars as early as possible because some
            // of them are locked in right when an oracle is first opened!
            let pg_timestamp_oracle_params =
                flags::timestamp_oracle_config(catalog.system_config());
            pg_timestamp_oracle_params.apply(pg_config);
        }

        // Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
        // system variables change, we update our connection limits.
        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
            Arc::new(move |system_vars: &SystemVars| {
                let limit: u64 = system_vars.max_connections().cast_into();
                let superuser_reserved: u64 =
                    system_vars.superuser_reserved_connections().cast_into();

                // If superuser_reserved >= max_connections, prefer max_connections.
                //
                // In this scenario all normal users would be locked out because all connections
                // would be reserved for superusers so complain if this is the case.
                let superuser_reserved = if superuser_reserved >= limit {
                    tracing::warn!(
                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
                    );
                    limit
                } else {
                    superuser_reserved
                };

                (connection_limit_callback)(limit, superuser_reserved);
            });
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::MAX_CONNECTIONS,
            Arc::clone(&connection_limit_callback),
        );
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
            connection_limit_callback,
        );

        let (group_commit_tx, group_commit_rx) = appends::notifier();

        let parent_span = tracing::Span::current();
        let thread = thread::Builder::new()
            // The Coordinator thread tends to keep a lot of data on its stack. To
            // prevent a stack overflow we allocate a stack three times as big as the default
            // stack.
            .stack_size(3 * stack::STACK_SIZE)
            .name("coordinator".to_string())
            .spawn(move || {
                let span = info_span!(parent: parent_span, "coord::coordinator").entered();

                let controller = handle
                    .block_on({
                        catalog.initialize_controller(
                            controller_config,
                            controller_envd_epoch,
                            read_only_controllers,
                        )
                    })
                    .unwrap_or_terminate("failed to initialize storage_controller");
                // Initializing the controller uses one or more timestamps, so push the boot timestamp up to the
                // current catalog upper.
                let catalog_upper = handle.block_on(catalog.current_upper());
                boot_ts = std::cmp::max(boot_ts, catalog_upper);
                if !read_only_controllers {
                    let epoch_millis_oracle = &timestamp_oracles
                        .get(&Timeline::EpochMilliseconds)
                        .expect("inserted above")
                        .oracle;
                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
                }

                let catalog = Arc::new(catalog);

                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
                let mut coord = Coordinator {
                    controller,
                    catalog,
                    internal_cmd_tx,
                    group_commit_tx,
                    strict_serializable_reads_tx,
                    global_timelines: timestamp_oracles,
                    transient_id_gen: Arc::new(TransientIdGen::new()),
                    active_conns: BTreeMap::new(),
                    txn_read_holds: Default::default(),
                    pending_peeks: BTreeMap::new(),
                    client_pending_peeks: BTreeMap::new(),
                    pending_linearize_read_txns: BTreeMap::new(),
                    serialized_ddl: LockedVecDeque::new(),
                    active_compute_sinks: BTreeMap::new(),
                    active_webhooks: BTreeMap::new(),
                    active_copies: BTreeMap::new(),
                    staged_cancellation: BTreeMap::new(),
                    introspection_subscribes: BTreeMap::new(),
                    write_locks: BTreeMap::new(),
                    deferred_write_ops: BTreeMap::new(),
                    pending_writes: Vec::new(),
                    advance_timelines_interval,
                    secrets_controller,
                    caching_secrets_reader,
                    cloud_resource_controller,
                    storage_usage_client,
                    storage_usage_collection_interval,
                    segment_client,
                    metrics,
                    optimizer_metrics,
                    tracing_handle,
                    statement_logging: StatementLogging::new(coord_now.clone()),
                    webhook_concurrency_limit,
                    timestamp_oracle_config,
                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
                    cluster_scheduling_decisions: BTreeMap::new(),
                    caught_up_check_interval: clusters_caught_up_check_interval,
                    caught_up_check: clusters_caught_up_check,
                    installed_watch_sets: BTreeMap::new(),
                    connection_watch_sets: BTreeMap::new(),
                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
                    read_only_controllers,
                    buffered_builtin_table_updates: Some(Vec::new()),
                    license_key,
                    user_id_pool: IdPool::empty(),
                    persist_client,
                };
                // Bootstrap the coordinator's state, then remove replicas the
                // orchestrator knows about that the catalog does not, and finally
                // prune expired storage usage events (if a retention period is set).
                let bootstrap = handle.block_on(async {
                    coord
                        .bootstrap(
                            boot_ts,
                            migrated_storage_collections_0dt,
                            builtin_table_updates,
                            cached_global_exprs,
                            uncached_local_exprs,
                            audit_logs_iterator,
                        )
                        .await?;
                    coord
                        .controller
                        .remove_orphaned_replicas(
                            coord.catalog().get_next_user_replica_id().await?,
                            coord.catalog().get_next_system_replica_id().await?,
                        )
                        .await
                        .map_err(AdapterError::Orchestrator)?;

                    if let Some(retention_period) = storage_usage_retention_period {
                        coord
                            .prune_storage_usage_events_on_startup(retention_period)
                            .await;
                    }

                    Ok(())
                });
                // Record success before `bootstrap` is moved into the channel send,
                // so we know whether to enter the serve loop below.
                let ok = bootstrap.is_ok();
                drop(span);
                bootstrap_tx
                    .send(bootstrap)
                    .expect("bootstrap_rx is not dropped until it receives this message");
                if ok {
                    handle.block_on(coord.serve(
                        internal_cmd_rx,
                        strict_serializable_reads_rx,
                        cmd_rx,
                        group_commit_rx,
                    ));
                }
            })
            .expect("failed to create coordinator thread");
        // Wait for the coordinator thread to report bootstrap completion before
        // handing the caller a handle and client.
        match bootstrap_rx
            .await
            .expect("bootstrap_tx always sends a message or panics/halts")
        {
            Ok(()) => {
                info!(
                    "startup: coordinator init: coordinator thread start complete in {:?}",
                    coord_thread_start.elapsed()
                );
                info!(
                    "startup: coordinator init: complete in {:?}",
                    coord_start.elapsed()
                );
                let handle = Handle {
                    session_id,
                    start_instant,
                    _thread: thread.join_on_drop(),
                };
                let client = Client::new(
                    build_info,
                    cmd_tx,
                    metrics_clone,
                    now,
                    environment_id,
                    segment_client_clone,
                );
                Ok((handle, client))
            }
            Err(e) => Err(e),
        }
    }
    .boxed()
}
4796
4797// Determines and returns the highest timestamp for each timeline, for all known
4798// timestamp oracle implementations.
4799//
4800// Initially, we did this so that we can switch between implementations of
4801// timestamp oracle, but now we also do this to determine a monotonic boot
4802// timestamp, a timestamp that does not regress across reboots.
4803//
4804// This mostly works, but there can be linearizability violations, because there
4805// is no central moment where we do distributed coordination for all oracle
4806// types. Working around this seems prohibitively hard, maybe even impossible so
4807// we have to live with this window of potential violations during the upgrade
4808// window (which is the only point where we should switch oracle
4809// implementations).
4810async fn get_initial_oracle_timestamps(
4811    timestamp_oracle_config: &Option<TimestampOracleConfig>,
4812) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4813    let mut initial_timestamps = BTreeMap::new();
4814
4815    if let Some(config) = timestamp_oracle_config {
4816        let oracle_timestamps = config.get_all_timelines().await?;
4817
4818        let debug_msg = || {
4819            oracle_timestamps
4820                .iter()
4821                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4822                .join(", ")
4823        };
4824        info!(
4825            "current timestamps from the timestamp oracle: {}",
4826            debug_msg()
4827        );
4828
4829        for (timeline, ts) in oracle_timestamps {
4830            let entry = initial_timestamps
4831                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4832
4833            entry
4834                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4835                .or_insert(ts);
4836        }
4837    } else {
4838        info!("no timestamp oracle configured!");
4839    };
4840
4841    let debug_msg = || {
4842        initial_timestamps
4843            .iter()
4844            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4845            .join(", ")
4846    };
4847    info!("initial oracle timestamps: {}", debug_msg());
4848
4849    Ok(initial_timestamps)
4850}
4851
4852#[instrument]
4853pub async fn load_remote_system_parameters(
4854    storage: &mut Box<dyn OpenableDurableCatalogState>,
4855    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4856    system_parameter_sync_timeout: Duration,
4857) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4858    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4859        tracing::info!("parameter sync on boot: start sync");
4860
4861        // We intentionally block initial startup, potentially forever,
4862        // on initializing LaunchDarkly. This may seem scary, but the
4863        // alternative is even scarier. Over time, we expect that the
4864        // compiled-in default values for the system parameters will
4865        // drift substantially from the defaults configured in
4866        // LaunchDarkly, to the point that starting an environment
4867        // without loading the latest values from LaunchDarkly will
4868        // result in running an untested configuration.
4869        //
4870        // Note this only applies during initial startup. Restarting
4871        // after we've synced once only blocks for a maximum of
4872        // `FRONTEND_SYNC_TIMEOUT` on LaunchDarkly, as it seems
4873        // reasonable to assume that the last-synced configuration was
4874        // valid enough.
4875        //
4876        // This philosophy appears to provide a good balance between not
4877        // running untested configurations in production while also not
4878        // making LaunchDarkly a "tier 1" dependency for existing
4879        // environments.
4880        //
4881        // If this proves to be an issue, we could seek to address the
4882        // configuration drift in a different way--for example, by
4883        // writing a script that runs in CI nightly and checks for
4884        // deviation between the compiled Rust code and LaunchDarkly.
4885        //
4886        // If it is absolutely necessary to bring up a new environment
4887        // while LaunchDarkly is down, the following manual mitigation
4888        // can be performed:
4889        //
4890        //    1. Edit the environmentd startup parameters to omit the
4891        //       LaunchDarkly configuration.
4892        //    2. Boot environmentd.
4893        //    3. Use the catalog-debug tool to run `edit config "{\"key\":\"system_config_synced\"}" "{\"value\": 1}"`.
4894        //    4. Adjust any other parameters as necessary to avoid
4895        //       running a nonstandard configuration in production.
4896        //    5. Edit the environmentd startup parameters to restore the
4897        //       LaunchDarkly configuration, for when LaunchDarkly comes
4898        //       back online.
4899        //    6. Reboot environmentd.
4900        let mut params = SynchronizedParameters::new(SystemVars::default());
4901        let frontend_sync = async {
4902            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4903            frontend.pull(&mut params);
4904            let ops = params
4905                .modified()
4906                .into_iter()
4907                .map(|param| {
4908                    let name = param.name;
4909                    let value = param.value;
4910                    tracing::info!(name, value, initial = true, "sync parameter");
4911                    (name, value)
4912                })
4913                .collect();
4914            tracing::info!("parameter sync on boot: end sync");
4915            Ok(Some(ops))
4916        };
4917        if !storage.has_system_config_synced_once().await? {
4918            frontend_sync.await
4919        } else {
4920            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4921                Ok(ops) => Ok(ops),
4922                Err(TimeoutError::Inner(e)) => Err(e),
4923                Err(TimeoutError::DeadlineElapsed) => {
4924                    tracing::info!("parameter sync on boot: sync has timed out");
4925                    Ok(None)
4926                }
4927            }
4928        }
4929    } else {
4930        Ok(None)
4931    }
4932}
4933
/// The result of a completed watch set, delivered back to the coordinator.
// NOTE(review): exact watch-set semantics live at the producer side, which is
// not visible here — confirm there.
#[derive(Debug)]
pub enum WatchSetResponse {
    /// The dependencies of a logged statement are ready; carries the
    /// statement's logging ID and the lifecycle event to record for it.
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    /// An `ALTER SINK` is ready to proceed; carries the deferred execution state.
    AlterSinkReady(AlterSinkReadyContext),
    /// An `ALTER MATERIALIZED VIEW` replacement is ready to be applied.
    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
}
4940
/// State carried while an `ALTER SINK` waits to become ready.
#[derive(Debug)]
pub struct AlterSinkReadyContext {
    /// The client's execute context. `Some` until the statement is retired;
    /// cleared exactly once, either by `retire` or by the `Drop` impl.
    ctx: Option<ExecuteContext>,
    /// Tracing context for linking continued work back to the original request.
    // NOTE(review): usage not visible in this chunk — confirm at the use site.
    otel_ctx: OpenTelemetryContext,
    /// The plan for the `ALTER SINK` statement being executed.
    plan: AlterSinkPlan,
    /// Validity constraints that must still hold when the deferred work runs.
    plan_validity: PlanValidity,
    /// Read holds kept while waiting — presumably so required inputs remain
    /// readable; confirm which collections they cover at the use site.
    read_hold: ReadHolds<Timestamp>,
}
4949
4950impl AlterSinkReadyContext {
4951    fn ctx(&mut self) -> &mut ExecuteContext {
4952        self.ctx.as_mut().expect("only cleared on drop")
4953    }
4954
4955    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4956        self.ctx
4957            .take()
4958            .expect("only cleared on drop")
4959            .retire(result);
4960    }
4961}
4962
4963impl Drop for AlterSinkReadyContext {
4964    fn drop(&mut self) {
4965        if let Some(ctx) = self.ctx.take() {
4966            ctx.retire(Err(AdapterError::Canceled));
4967        }
4968    }
4969}
4970
/// State carried while an `ALTER MATERIALIZED VIEW` replacement waits to
/// become ready.
#[derive(Debug)]
pub struct AlterMaterializedViewReadyContext {
    /// The client's execute context. `Some` until the statement is retired;
    /// cleared exactly once, either by `retire` or by the `Drop` impl.
    ctx: Option<ExecuteContext>,
    /// Tracing context for linking continued work back to the original request.
    // NOTE(review): usage not visible in this chunk — confirm at the use site.
    otel_ctx: OpenTelemetryContext,
    /// The plan to apply the materialized view replacement.
    plan: plan::AlterMaterializedViewApplyReplacementPlan,
    /// Validity constraints that must still hold when the deferred work runs.
    plan_validity: PlanValidity,
}
4978
4979impl AlterMaterializedViewReadyContext {
4980    fn ctx(&mut self) -> &mut ExecuteContext {
4981        self.ctx.as_mut().expect("only cleared on drop")
4982    }
4983
4984    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4985        self.ctx
4986            .take()
4987            .expect("only cleared on drop")
4988            .retire(result);
4989    }
4990}
4991
4992impl Drop for AlterMaterializedViewReadyContext {
4993    fn drop(&mut self) {
4994        if let Some(ctx) = self.ctx.take() {
4995            ctx.retire(Err(AdapterError::Canceled));
4996        }
4997    }
4998}
4999
/// A struct for tracking the ownership of a lock and a VecDeque to store to-be-done work after the
/// lock is freed.
#[derive(Debug)]
struct LockedVecDeque<T> {
    /// Queued work items, processed front-to-back.
    items: VecDeque<T>,
    /// The lock whose availability gates processing of `items`; ownership is
    /// handed out via `try_lock_owned`.
    lock: Arc<tokio::sync::Mutex<()>>,
}
5007
5008impl<T> LockedVecDeque<T> {
5009    pub fn new() -> Self {
5010        Self {
5011            items: VecDeque::new(),
5012            lock: Arc::new(tokio::sync::Mutex::new(())),
5013        }
5014    }
5015
5016    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
5017        Arc::clone(&self.lock).try_lock_owned()
5018    }
5019
5020    pub fn is_empty(&self) -> bool {
5021        self.items.is_empty()
5022    }
5023
5024    pub fn push_back(&mut self, value: T) {
5025        self.items.push_back(value)
5026    }
5027
5028    pub fn pop_front(&mut self) -> Option<T> {
5029        self.items.pop_front()
5030    }
5031
5032    pub fn remove(&mut self, index: usize) -> Option<T> {
5033        self.items.remove(index)
5034    }
5035
5036    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
5037        self.items.iter()
5038    }
5039}
5040
/// A statement (or plan) whose execution has been deferred, together with the
/// client context needed to eventually execute it and respond.
#[derive(Debug)]
struct DeferredPlanStatement {
    /// The client's execute context, retained so we can respond when the
    /// deferred work eventually runs.
    ctx: ExecuteContext,
    /// The deferred statement or plan.
    ps: PlanStatement,
}
5046
/// The two forms deferred work can take: a raw statement that still needs
/// planning, or an already-produced plan.
#[derive(Debug)]
enum PlanStatement {
    /// A not-yet-planned SQL statement plus its parameters.
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    /// An already-planned statement, along with the IDs it resolved against.
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
    },
}
5058
/// Errors arising from validating a client address against a network policy.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// The address is not contained in any of the policy's allowed rules.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    /// No IP address was available to validate.
    #[error("Access denied missing IP address")]
    MissingIp,
}
5066
5067pub(crate) fn validate_ip_with_policy_rules(
5068    ip: &IpAddr,
5069    rules: &Vec<NetworkPolicyRule>,
5070) -> Result<(), NetworkPolicyError> {
5071    // At the moment we're not handling action or direction
5072    // as those are only able to be "allow" and "ingress" respectively
5073    if rules.iter().any(|r| r.address.0.contains(ip)) {
5074        Ok(())
5075    } else {
5076        Err(NetworkPolicyError::AddressDenied(ip.clone()))
5077    }
5078}
5079
5080pub(crate) fn infer_sql_type_for_catalog(
5081    hir_expr: &HirRelationExpr,
5082    mir_expr: &MirRelationExpr,
5083) -> SqlRelationType {
5084    let mut typ = hir_expr.top_level_typ();
5085    typ.backport_nullability_and_keys(&mir_expr.typ());
5086    typ
5087}
5088
#[cfg(test)]
mod id_pool_tests {
    use super::IdPool;

    #[mz_ore::test]
    fn test_empty_pool() {
        // A freshly-created empty pool has nothing to hand out.
        let mut pool = IdPool::empty();
        assert_eq!(pool.remaining(), 0);
        assert_eq!(pool.allocate(), None);
        assert_eq!(pool.allocate_many(1), None);
    }

    #[mz_ore::test]
    fn test_allocate_single() {
        let mut pool = IdPool::empty();
        pool.refill(10, 13);
        assert_eq!(pool.remaining(), 3);
        // IDs come out in ascending order over the half-open range [10, 13).
        for expected in 10..13 {
            assert_eq!(pool.allocate(), Some(expected));
        }
        // The pool is now exhausted.
        assert_eq!(pool.remaining(), 0);
        assert_eq!(pool.allocate(), None);
    }

    #[mz_ore::test]
    fn test_allocate_many() {
        let mut pool = IdPool::empty();
        pool.refill(100, 105);
        assert_eq!(pool.allocate_many(3), Some(vec![100, 101, 102]));
        assert_eq!(pool.remaining(), 2);
        // A request larger than the remainder fails wholesale...
        assert_eq!(pool.allocate_many(3), None);
        // ...while a request that exactly fits succeeds.
        assert_eq!(pool.allocate_many(2), Some(vec![103, 104]));
        assert_eq!(pool.remaining(), 0);
    }

    #[mz_ore::test]
    fn test_allocate_many_zero() {
        // Allocating zero IDs succeeds without consuming anything.
        let mut pool = IdPool::empty();
        pool.refill(1, 5);
        assert_eq!(pool.allocate_many(0), Some(vec![]));
        assert_eq!(pool.remaining(), 4);
    }

    #[mz_ore::test]
    fn test_refill_resets_pool() {
        let mut pool = IdPool::empty();
        pool.refill(0, 2);
        assert_eq!(pool.allocate(), Some(0));
        // Refilling before exhaustion discards the old range entirely.
        pool.refill(50, 52);
        assert_eq!(pool.allocate(), Some(50));
        assert_eq!(pool.allocate(), Some(51));
        assert_eq!(pool.allocate(), None);
    }

    #[mz_ore::test]
    fn test_mixed_allocate_and_allocate_many() {
        // Single and batch allocations draw from the same cursor.
        let mut pool = IdPool::empty();
        pool.refill(0, 10);
        assert_eq!(pool.allocate(), Some(0));
        assert_eq!(pool.allocate_many(3), Some(vec![1, 2, 3]));
        assert_eq!(pool.allocate(), Some(4));
        assert_eq!(pool.remaining(), 5);
    }

    #[mz_ore::test]
    #[should_panic(expected = "invalid pool range")]
    fn test_refill_invalid_range_panics() {
        // An inverted range (hi < lo) is a programming error.
        let mut pool = IdPool::empty();
        pool.refill(10, 5);
    }
}