1use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::net::IpAddr;
72use std::num::NonZeroI64;
73use std::ops::Neg;
74use std::str::FromStr;
75use std::sync::LazyLock;
76use std::sync::{Arc, Mutex};
77use std::thread;
78use std::time::{Duration, Instant};
79use std::{fmt, mem};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::Itertools;
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::{
95 ENABLE_SCOPED_SYSTEM_PARAMETERS, USER_ID_POOL_BATCH_SIZE,
96 WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL,
97};
98use mz_auth::password::Password;
99use mz_build_info::BuildInfo;
100use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
101use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
102use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
103use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
104use mz_catalog::memory::objects::{
105 CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
106 DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
107};
108use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
109use mz_compute_client::as_of_selection;
110use mz_compute_client::controller::error::{
111 CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
112};
113use mz_compute_types::ComputeInstanceId;
114use mz_compute_types::dataflows::DataflowDescription;
115use mz_compute_types::plan::Plan;
116use mz_controller::clusters::{
117 ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
118};
119use mz_controller::{ControllerConfig, Readiness};
120use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
121use mz_dyncfg::ConfigUpdates;
122use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
123use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
124use mz_orchestrator::OfflineReason;
125use mz_ore::cast::{CastFrom, CastInto, CastLossy};
126use mz_ore::channel::trigger::Trigger;
127use mz_ore::future::TimeoutError;
128use mz_ore::metrics::MetricsRegistry;
129use mz_ore::now::{EpochMillis, NowFn};
130use mz_ore::task::{JoinHandle, spawn};
131use mz_ore::thread::JoinHandleExt;
132use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
133use mz_ore::url::SensitiveUrl;
134use mz_ore::{
135 assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
136};
137use mz_persist_client::PersistClient;
138use mz_persist_client::batch::ProtoBatch;
139use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
140use mz_repr::adt::numeric::Numeric;
141use mz_repr::explain::{ExplainConfig, ExplainFormat};
142use mz_repr::global_id::TransientIdGen;
143use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
144use mz_repr::role_id::RoleId;
145use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
146use mz_secrets::cache::CachingSecretsReader;
147use mz_secrets::{SecretsController, SecretsReader};
148use mz_sql::ast::{Raw, Statement};
149use mz_sql::catalog::{CatalogCluster, EnvironmentId};
150use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
151use mz_sql::optimizer_metrics::OptimizerMetrics;
152use mz_sql::plan::{
153 self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
154 NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
155};
156use mz_sql::session::user::User;
157use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
158use mz_sql_parser::ast::ExplainStage;
159use mz_sql_parser::ast::display::AstDisplay;
160use mz_storage_client::client::TableData;
161use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
162use mz_storage_types::connections::Connection as StorageConnection;
163use mz_storage_types::connections::ConnectionContext;
164use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
165use mz_storage_types::read_holds::ReadHold;
166use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
167use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
168use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
169use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
170use mz_transform::dataflow::DataflowMetainfo;
171use opentelemetry::trace::TraceContextExt;
172use serde::Serialize;
173use thiserror::Error;
174use timely::progress::{Antichain, Timestamp as _};
175use tokio::runtime::Handle as TokioHandle;
176use tokio::select;
177use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
178use tokio::time::{Interval, MissedTickBehavior};
179use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
180use tracing_opentelemetry::OpenTelemetrySpanExt;
181use uuid::Uuid;
182
183use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
184use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
185use crate::client::{Client, Handle};
186use crate::command::{Command, ExecuteResponse};
187use crate::config::{
188 ScopedParameters, ScopedParametersScope, SynchronizedParameters, SystemParameterFrontend,
189 SystemParameterSyncConfig, evaluate_scoped_parameters,
190};
191use crate::coord::appends::{
192 BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
193};
194use crate::coord::caught_up::CaughtUpCheckContext;
195use crate::coord::cluster_scheduling::SchedulingDecision;
196use crate::coord::id_bundle::CollectionIdBundle;
197use crate::coord::introspection::IntrospectionSubscribe;
198use crate::coord::peek::PendingPeek;
199use crate::coord::statement_logging::StatementLogging;
200use crate::coord::timeline::{TimelineContext, TimelineState};
201use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
202use crate::coord::validity::PlanValidity;
203use crate::error::AdapterError;
204use crate::explain::insights::PlanInsightsContext;
205use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
206use crate::metrics::Metrics;
207use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
208use crate::optimize::{self, Optimize, OptimizerConfig};
209use crate::session::{EndTransactionAction, Session};
210use crate::statement_logging::{
211 StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
212};
213use crate::util::{ClientTransmitter, ResultExt, sort_topological};
214use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
215use crate::{AdapterNotice, ReadHolds, flags};
216
217pub(crate) mod appends;
218pub(crate) mod catalog_serving;
219pub(crate) mod cluster_scheduling;
220pub(crate) mod consistency;
221pub(crate) mod id_bundle;
222pub(crate) mod in_memory_oracle;
223pub(crate) mod peek;
224pub(crate) mod read_policy;
225pub(crate) mod read_then_write;
226pub(crate) mod sequencer;
227pub(crate) mod statement_logging;
228pub(crate) mod timeline;
229pub(crate) mod timestamp_selection;
230
231pub mod catalog_implications;
232mod caught_up;
233mod command_handler;
234mod ddl;
235pub(crate) mod group_sync;
236mod indexes;
237mod info_metrics;
238mod introspection;
239mod message_handler;
240mod privatelink_status;
241mod sql;
242mod validity;
243
/// A locally held, half-open range `next..upper` of IDs that can be handed
/// out one at a time or in contiguous batches.
///
/// The pool is exhausted once `next == upper`; [`IdPool::refill`] installs a
/// fresh range.
#[derive(Debug)]
pub(crate) struct IdPool {
    /// The next ID that will be handed out.
    next: u64,
    /// Exclusive upper bound of the range currently held.
    upper: u64,
}

impl IdPool {
    /// Creates a pool that holds no IDs (`0..0`).
    pub fn empty() -> Self {
        Self { next: 0, upper: 0 }
    }

    /// Hands out a single ID, or `None` when the pool is exhausted.
    pub fn allocate(&mut self) -> Option<u64> {
        if self.next >= self.upper {
            return None;
        }
        let id = self.next;
        self.next = id + 1;
        Some(id)
    }

    /// Hands out `n` consecutive IDs in ascending order.
    ///
    /// Returns `None`, leaving the pool untouched, when fewer than `n` IDs
    /// remain. Requesting zero IDs always succeeds with an empty vector.
    pub fn allocate_many(&mut self, n: u64) -> Option<Vec<u64>> {
        if n > self.remaining() {
            return None;
        }
        // `n <= upper - next`, so `first + n` cannot exceed `upper`.
        let first = self.next;
        let end = first + n;
        self.next = end;
        Some((first..end).collect())
    }

    /// Number of IDs that can still be handed out.
    pub fn remaining(&self) -> u64 {
        self.upper - self.next
    }

    /// Replaces the held range with `next..upper`.
    ///
    /// # Panics
    ///
    /// Panics if `next > upper`.
    pub fn refill(&mut self, next: u64, upper: u64) {
        assert!(next <= upper, "invalid pool range: {next}..{upper}");
        self.next = next;
        self.upper = upper;
    }
}
316
/// The set of messages defined by this module; client commands and internal
/// events share this one enum.
///
/// [`Message::kind`] maps every variant to a static string label.
///
/// NOTE(review): presumably these are consumed by the coordinator's main
/// loop; the consumer is outside this view — confirm.
#[derive(Debug)]
pub enum Message {
    /// A [`Command`] paired with an OpenTelemetry context.
    Command(OpenTelemetryContext, Command),
    /// Names the controller component that the message refers to.
    ControllerReady {
        controller: ControllerReadiness,
    },
    /// Carries a [`PurifiedStatementReady`] result bundle.
    PurifiedStatementReady(PurifiedStatementReady),
    /// Carries a [`CreateConnectionValidationReady`] result bundle.
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    /// Carries an [`AlterConnectionValidationReady`] result bundle.
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    TryDeferred {
        /// The connection the deferred work belongs to.
        conn_id: ConnectionId,
        /// Optionally, an owned mutex guard paired with a catalog item ID.
        ///
        /// NOTE(review): presumably a lock already acquired on behalf of the
        /// deferred work — confirm against the handler.
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    /// Carries a tracing span and an optional [`GroupCommitPermit`].
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    AdvanceTimelines,
    /// Wraps a controller [`ClusterEvent`].
    ClusterEvent(ClusterEvent),
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    LinearizeReads,
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        /// One entry per batch; a failed entry carries its error as a string.
        batches: Vec<Result<ProtoBatch, String>>,
    },
    StorageUsageSchedule,
    StorageUsageFetch,
    /// Carries a [`ShardsUsageReferenced`] value.
    StorageUsageUpdate(ShardsUsageReferenced),
    /// Carries a list of builtin table updates.
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    ArrangementSizesSchedule,
    ArrangementSizesSnapshot,
    /// Carries a list of builtin table updates.
    ArrangementSizesPrune(Vec<BuiltinTableUpdate>),
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        /// Why statement execution ended.
        reason: StatementEndedExecutionReason,
    },
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        /// The raw statement, shared through an `Arc`.
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    // The `*StageReady` variants below each pair a stage value with a tracing
    // span and, except for `IntrospectionSubscribeStageReady`, an
    // `ExecuteContext`.
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    /// Unlike the other stage variants, this one carries no `ExecuteContext`.
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    DrainStatementLog,
    /// Carries a batch of [`VpcEndpointEvent`]s.
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    /// Groups of per-cluster [`SchedulingDecision`]s, each group tagged with a
    /// static string.
    ///
    /// NOTE(review): the string presumably names the policy that produced the
    /// group — confirm against the producer.
    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}
428
429impl Message {
430 pub const fn kind(&self) -> &'static str {
432 match self {
433 Message::Command(_, msg) => match msg {
434 Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
435 Command::Startup { .. } => "command-startup",
436 Command::Execute { .. } => "command-execute",
437 Command::Commit { .. } => "command-commit",
438 Command::CancelRequest { .. } => "command-cancel_request",
439 Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
440 Command::GetWebhook { .. } => "command-get_webhook",
441 Command::GetSystemVars { .. } => "command-get_system_vars",
442 Command::SetSystemVars { .. } => "command-set_system_vars",
443 Command::UpdateScopedSystemParameters { .. } => {
444 "command-update_scoped_system_parameters"
445 }
446 Command::InstallScopedSystemParameterFrontend { .. } => {
447 "command-install_scoped_system_parameter_frontend"
448 }
449 Command::Terminate { .. } => "command-terminate",
450 Command::RetireExecute { .. } => "command-retire_execute",
451 Command::CheckConsistency { .. } => "command-check_consistency",
452 Command::Dump { .. } => "command-dump",
453 Command::AuthenticatePassword { .. } => "command-auth_check",
454 Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
455 Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
456 Command::CheckRoleCanLogin { .. } => "command-check_role_can_login",
457 Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
458 Command::GetOracle { .. } => "get-oracle",
459 Command::DetermineRealTimeRecentTimestamp { .. } => {
460 "determine-real-time-recent-timestamp"
461 }
462 Command::GetTransactionReadHoldsBundle { .. } => {
463 "get-transaction-read-holds-bundle"
464 }
465 Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
466 Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
467 Command::ExecuteSubscribe { .. } => "execute-subscribe",
468 Command::CopyToPreflight { .. } => "copy-to-preflight",
469 Command::ExecuteCopyTo { .. } => "execute-copy-to",
470 Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
471 Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
472 Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
473 Command::ExplainTimestamp { .. } => "explain-timestamp",
474 Command::FrontendStatementLogging(..) => "frontend-statement-logging",
475 Command::StartCopyFromStdin { .. } => "start-copy-from-stdin",
476 Command::InjectAuditEvents { .. } => "inject-audit-events",
477 },
478 Message::ControllerReady {
479 controller: ControllerReadiness::Compute,
480 } => "controller_ready(compute)",
481 Message::ControllerReady {
482 controller: ControllerReadiness::Storage,
483 } => "controller_ready(storage)",
484 Message::ControllerReady {
485 controller: ControllerReadiness::Metrics,
486 } => "controller_ready(metrics)",
487 Message::ControllerReady {
488 controller: ControllerReadiness::Internal,
489 } => "controller_ready(internal)",
490 Message::PurifiedStatementReady(_) => "purified_statement_ready",
491 Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
492 Message::TryDeferred { .. } => "try_deferred",
493 Message::GroupCommitInitiate(..) => "group_commit_initiate",
494 Message::AdvanceTimelines => "advance_timelines",
495 Message::ClusterEvent(_) => "cluster_event",
496 Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
497 Message::LinearizeReads => "linearize_reads",
498 Message::StagedBatches { .. } => "staged_batches",
499 Message::StorageUsageSchedule => "storage_usage_schedule",
500 Message::StorageUsageFetch => "storage_usage_fetch",
501 Message::StorageUsageUpdate(_) => "storage_usage_update",
502 Message::StorageUsagePrune(_) => "storage_usage_prune",
503 Message::ArrangementSizesSchedule => "arrangement_sizes_schedule",
504 Message::ArrangementSizesSnapshot => "arrangement_sizes_snapshot",
505 Message::ArrangementSizesPrune(_) => "arrangement_sizes_prune",
506 Message::RetireExecute { .. } => "retire_execute",
507 Message::ExecuteSingleStatementTransaction { .. } => {
508 "execute_single_statement_transaction"
509 }
510 Message::PeekStageReady { .. } => "peek_stage_ready",
511 Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
512 Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
513 Message::CreateViewStageReady { .. } => "create_view_stage_ready",
514 Message::CreateMaterializedViewStageReady { .. } => {
515 "create_materialized_view_stage_ready"
516 }
517 Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
518 Message::IntrospectionSubscribeStageReady { .. } => {
519 "introspection_subscribe_stage_ready"
520 }
521 Message::SecretStageReady { .. } => "secret_stage_ready",
522 Message::ClusterStageReady { .. } => "cluster_stage_ready",
523 Message::DrainStatementLog => "drain_statement_log",
524 Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
525 Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
526 Message::CheckSchedulingPolicies => "check_scheduling_policies",
527 Message::SchedulingDecisions { .. } => "scheduling_decision",
528 Message::DeferredStatementReady => "deferred_statement_ready",
529 }
530 }
531}
532
/// Identifies which controller component a [`Message::ControllerReady`]
/// refers to.
#[derive(Debug)]
pub enum ControllerReadiness {
    /// Labelled `controller_ready(storage)` by [`Message::kind`].
    Storage,
    /// Labelled `controller_ready(compute)` by [`Message::kind`].
    Compute,
    /// Labelled `controller_ready(metrics)` by [`Message::kind`].
    Metrics,
    /// Labelled `controller_ready(internal)` by [`Message::kind`].
    Internal,
}
545
/// Bundles a result of type `T` with the execution context and statement
/// metadata it belongs to.
///
/// NOTE(review): the name suggests the work ran off the main task; the
/// producer is outside this view — confirm.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    /// Execution context; omitted from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The produced value, or the error raised while producing it.
    pub result: Result<T, AdapterError>,
    pub params: Params,
    pub plan_validity: PlanValidity,
    /// The raw statement, shared through an `Arc`.
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}

/// A [`BackgroundWorkResult`] whose payload is a purified statement.
pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
559
/// Bundles a validation result of type `T` for a connection with the
/// execution context and identifiers it belongs to.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    /// Execution context; omitted from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The validated value, or the error raised during validation.
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    /// Catalog item ID of the connection.
    pub connection_id: CatalogItemId,
    /// Global ID of the connection.
    pub connection_gid: GlobalId,
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}

/// A [`ValidationReady`] carrying a `CreateConnectionPlan`.
pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
/// A [`ValidationReady`] carrying a catalog `Connection`.
pub type AlterConnectionValidationReady = ValidationReady<Connection>;
575
/// The stages of a peek, each wrapping the data that stage carries.
///
/// NOTE(review): the declaration order presumably mirrors the order in which
/// stages run; the sequencing logic is outside this view — confirm.
#[derive(Debug)]
pub enum PeekStage {
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    Finish(PeekStageFinish),
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    /// Carries the same payload type as [`PeekStage::CopyToDataflow`].
    CopyToPreflight(PeekStageCopyTo),
    /// Carries the same payload type as [`PeekStage::CopyToPreflight`].
    CopyToDataflow(PeekStageCopyTo),
}
593
/// Parameters describing a copy-to operation.
///
/// NOTE(review): the `S3SinkFormat` field suggests an S3 target; confirm
/// against the code that builds this context.
#[derive(Debug)]
pub struct CopyToContext {
    /// Relation description of the data being copied.
    pub desc: RelationDesc,
    /// Destination URI.
    pub uri: Uri,
    /// Storage connection, holding references rather than inlined connections.
    pub connection: StorageConnection<ReferencedConnection>,
    /// Catalog item ID of the connection.
    pub connection_id: CatalogItemId,
    /// Output format.
    pub format: S3SinkFormat,
    /// Maximum file size — presumably in bytes; TODO confirm units.
    pub max_file_size: u64,
    /// Optional output batch count; its meaning when `None` is decided by the
    /// consumer, which is outside this view.
    pub output_batch_count: Option<u64>,
}
614
/// Data carried by [`PeekStage::LinearizeTimestamp`].
#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    /// Optional cap on the query result size — presumably in bytes; TODO confirm.
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    /// Replica to target, if one was chosen.
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    optimizer: optimize::PeekOptimizer,
    explain_ctx: ExplainContext,
}
628
/// Data carried by [`PeekStage::RealTimeRecency`].
///
/// Has the fields of [`PeekStageLinearizeTimestamp`] plus `oracle_read_ts`.
#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    /// Optional cap on the query result size — presumably in bytes; TODO confirm.
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    /// Replica to target, if one was chosen.
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    /// Read timestamp, if one is available — presumably obtained from the
    /// timestamp oracle; TODO confirm.
    oracle_read_ts: Option<Timestamp>,
    optimizer: optimize::PeekOptimizer,
    explain_ctx: ExplainContext,
}
643
/// Data carried by [`PeekStage::TimestampReadHold`].
///
/// Has the fields of [`PeekStageRealTimeRecency`] plus `real_time_recency_ts`.
#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    /// Optional cap on the query result size — presumably in bytes; TODO confirm.
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    /// Replica to target, if one was chosen.
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    /// Read timestamp, if one is available — presumably obtained from the
    /// timestamp oracle; TODO confirm.
    oracle_read_ts: Option<Timestamp>,
    /// Real-time-recency timestamp, if one was determined.
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: optimize::PeekOptimizer,
    explain_ctx: ExplainContext,
}
659
/// Data carried by [`PeekStage::Optimize`].
///
/// Unlike the earlier peek stages, this one carries a collection ID bundle and
/// a [`TimestampDetermination`] instead of a timeline context and raw
/// timestamps.
#[derive(Debug)]
pub struct PeekStageOptimize {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    /// Optional cap on the query result size — presumably in bytes; TODO confirm.
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    /// Replica to target, if one was chosen.
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination,
    optimizer: optimize::PeekOptimizer,
    explain_ctx: ExplainContext,
}
674
/// Data carried by [`PeekStage::Finish`], including the optimized
/// `GlobalLirPlan` for the peek.
#[derive(Debug)]
pub struct PeekStageFinish {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    /// Optional cap on the query result size — presumably in bytes; TODO confirm.
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    /// Replica to target, if one was chosen.
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    /// Optimizer trace kept for plan insights, when one was collected.
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    /// Boxed plan-insights context, when present.
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    /// Epoch-millisecond value recorded for the end of optimization.
    optimization_finished_at: EpochMillis,
}
693
/// Data shared by [`PeekStage::CopyToPreflight`] and
/// [`PeekStage::CopyToDataflow`].
#[derive(Debug)]
pub struct PeekStageCopyTo {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    /// Epoch-millisecond value recorded for the end of optimization.
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}
702
/// Data carried by [`PeekStage::ExplainPlan`].
#[derive(Debug)]
pub struct PeekStageExplainPlan {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    /// Dataflow metadata from the transform layer.
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    /// Boxed plan-insights context, when present.
    insights_ctx: Option<Box<PlanInsightsContext>>,
}
711
/// Data carried by [`PeekStage::ExplainPushdown`].
#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    determination: TimestampDetermination,
    /// A `MapFilterProject` per imported collection, keyed by global ID.
    imports: BTreeMap<GlobalId, MapFilterProject>,
}
718
/// The stages of index creation, each wrapping the data that stage carries.
#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}
725
/// Data carried by [`CreateIndexStage::Optimize`].
#[derive(Debug)]
pub struct CreateIndexOptimize {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}
735
/// Data carried by [`CreateIndexStage::Finish`], including both the MIR and
/// LIR forms of the optimized index plan.
#[derive(Debug)]
pub struct CreateIndexFinish {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    /// Catalog item ID — presumably the one allocated for the new index; TODO confirm.
    item_id: CatalogItemId,
    /// Global ID — presumably the one allocated for the new index; TODO confirm.
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}
747
/// Data carried by [`CreateIndexStage::Explain`].
#[derive(Debug)]
pub struct CreateIndexExplain {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    /// Global ID of the exported index.
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    /// Dataflow metadata from the transform layer.
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
756
/// The stages of view creation, each wrapping the data that stage carries.
#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}
763
/// Data carried by [`CreateViewStage::Optimize`].
#[derive(Debug)]
pub struct CreateViewOptimize {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}
773
/// Data carried by [`CreateViewStage::Finish`], including the optimized MIR
/// expression for the view.
#[derive(Debug)]
pub struct CreateViewFinish {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    /// Catalog item ID — presumably the one allocated for the new view; TODO confirm.
    item_id: CatalogItemId,
    /// Global ID — presumably the one allocated for the new view; TODO confirm.
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}
786
/// Data carried by [`CreateViewStage::Explain`].
#[derive(Debug)]
pub struct CreateViewExplain {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    /// Global ID — presumably of the view being explained; TODO confirm.
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}
794
/// The stages of an explain-timestamp operation, each wrapping the data that
/// stage carries.
#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}
801
/// Data carried by [`ExplainTimestampStage::Optimize`].
#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}
808
/// Data carried by [`ExplainTimestampStage::RealTimeRecency`].
///
/// Holds an already optimized MIR expression rather than the original plan.
#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    /// Requested explain output format.
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}
817
/// Data carried by [`ExplainTimestampStage::Finish`].
///
/// Has the fields of [`ExplainTimestampRealTimeRecency`] plus `source_ids`
/// and `real_time_recency_ts`.
#[derive(Debug)]
pub struct ExplainTimestampFinish {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    /// Requested explain output format.
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    /// Real-time-recency timestamp, if one was determined.
    real_time_recency_ts: Option<Timestamp>,
}
828
/// The stages of altering a cluster, each wrapping the data that stage
/// carries.
#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}
835
/// Data carried by [`ClusterStage::Alter`].
#[derive(Debug)]
pub struct AlterCluster {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}
841
/// Data carried by [`ClusterStage::WaitForHydrated`].
#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    /// The managed-cluster configuration being moved to.
    new_config: ClusterVariantManaged,
    workload_class: Option<String>,
    /// Point in time associated with the timeout — presumably the deadline
    /// after which `on_timeout` applies; TODO confirm.
    timeout_time: Instant,
    /// Action to take on timeout.
    on_timeout: OnTimeoutAction,
}
851
/// Data carried by [`ClusterStage::Finalize`].
///
/// Has the fields of [`AlterClusterWaitForHydrated`] minus the timeout
/// settings.
#[derive(Debug)]
pub struct AlterClusterFinalize {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    /// The managed-cluster configuration being moved to.
    new_config: ClusterVariantManaged,
    workload_class: Option<String>,
}
859
/// Describes whether, and in which way, a statement is being explained.
///
/// See `needs_cluster` and `needs_plan_insights` on this type for how the
/// variants are distinguished.
#[derive(Debug)]
pub enum ExplainContext {
    /// No explain-related data is attached.
    None,
    /// Carries a full [`ExplainPlanContext`].
    Plan(ExplainPlanContext),
    /// Carries only an optimizer trace.
    PlanInsightsNotice(OptimizerTrace),
    /// Carries no data — presumably selects filter-pushdown explanation;
    /// TODO confirm.
    Pushdown,
}
872
873impl ExplainContext {
874 pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
878 let optimizer_trace = match self {
879 ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
880 ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
881 _ => None,
882 };
883 optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
884 }
885
886 pub(crate) fn needs_cluster(&self) -> bool {
887 match self {
888 ExplainContext::None => true,
889 ExplainContext::Plan(..) => false,
890 ExplainContext::PlanInsightsNotice(..) => true,
891 ExplainContext::Pushdown => false,
892 }
893 }
894
895 pub(crate) fn needs_plan_insights(&self) -> bool {
896 matches!(
897 self,
898 ExplainContext::Plan(ExplainPlanContext {
899 stage: ExplainStage::PlanInsights,
900 ..
901 }) | ExplainContext::PlanInsightsNotice(_)
902 )
903 }
904}
905
/// Settings and state for explaining a plan; carried by
/// [`ExplainContext::Plan`].
#[derive(Debug)]
pub struct ExplainPlanContext {
    /// NOTE(review): semantics are not visible here — presumably marks an
    /// explain that should tolerate a broken plan; TODO confirm.
    pub broken: bool,
    pub config: ExplainConfig,
    /// Requested explain output format.
    pub format: ExplainFormat,
    /// The stage to explain; `ExplainContext::needs_plan_insights` checks it
    /// against `ExplainStage::PlanInsights`.
    pub stage: ExplainStage,
    /// Optional global ID — presumably of an existing object being
    /// re-planned; TODO confirm.
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    /// The trace that `ExplainContext::dispatch_guard` obtains its guard from.
    pub optimizer_trace: OptimizerTrace,
}
920
/// The stages of materialized-view creation, each wrapping the data that
/// stage carries.
#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}
927
/// Data carried by [`CreateMaterializedViewStage::Optimize`].
#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}
937
/// Data carried by [`CreateMaterializedViewStage::Finish`], including the
/// local MIR, global MIR and global LIR forms of the optimized plan.
#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    /// Catalog item ID — presumably the one allocated for the new
    /// materialized view; TODO confirm.
    item_id: CatalogItemId,
    /// Global ID — presumably the one allocated for the new materialized
    /// view; TODO confirm.
    global_id: GlobalId,
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}
952
/// Data carried by [`CreateMaterializedViewStage::Explain`].
#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    /// Global ID — presumably of the materialized view being explained;
    /// TODO confirm.
    global_id: GlobalId,
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    /// Dataflow metadata from the transform layer.
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
961
/// The stages of a subscribe, each wrapping the data that stage carries.
#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
    Explain(SubscribeExplain),
}
969
/// Data carried by [`SubscribeStage::OptimizeMir`].
#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    /// Global IDs of the subscribe's dependencies.
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    /// Replica to target, if one was chosen.
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}
982
/// Data carried by [`SubscribeStage::TimestampOptimizeLir`].
///
/// Holds the subscribe optimizer together with a global MIR plan in the
/// `Unresolved` state.
#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    /// Global IDs of the subscribe's dependencies.
    dependency_ids: BTreeSet<GlobalId>,
    /// Replica to target, if one was chosen.
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}
996
/// Data carried by [`SubscribeStage::Finish`], including the global LIR plan
/// for the subscribe.
#[derive(Debug)]
pub struct SubscribeFinish {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    /// Replica to target, if one was chosen.
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    /// Global IDs of the subscribe's dependencies.
    dependency_ids: BTreeSet<GlobalId>,
}
1006
/// Data carried by [`SubscribeStage::Explain`].
#[derive(Debug)]
pub struct SubscribeExplain {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    /// Dataflow metadata from the transform layer.
    df_meta: DataflowMetainfo,
    cluster_id: ComputeInstanceId,
    explain_ctx: ExplainPlanContext,
}
1015
/// The stages of an introspection subscribe, each wrapping the data that
/// stage carries.
///
/// Unlike [`SubscribeStage`], there is no explain stage.
#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}
1022
/// Data carried by [`IntrospectionSubscribeStage::OptimizeMir`].
#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    /// Global ID of the subscribe.
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    /// The replica; mandatory here, unlike in [`SubscribeOptimizeMir`].
    replica_id: ReplicaId,
}
1031
/// Data carried by [`IntrospectionSubscribeStage::TimestampOptimizeLir`].
///
/// Holds the subscribe optimizer together with a global MIR plan in the
/// `Unresolved` state.
#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1040
/// Data carried by [`IntrospectionSubscribeStage::Finish`], including the
/// global LIR plan and a set of read holds.
#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    /// Read holds — presumably kept so the subscribe's inputs stay readable
    /// until it is installed; TODO confirm.
    read_holds: ReadHolds,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1049
/// The stages of secret operations (create, rotate keys, alter), each
/// wrapping the data that stage carries.
#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}
1058
/// Data carried by [`SecretStage::CreateEnsure`].
#[derive(Debug)]
pub struct CreateSecretEnsure {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}
1064
/// Data carried by [`SecretStage::CreateFinish`].
///
/// Has the fields of [`CreateSecretEnsure`] plus the two IDs.
#[derive(Debug)]
pub struct CreateSecretFinish {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    /// Catalog item ID — presumably the one allocated for the new secret; TODO confirm.
    item_id: CatalogItemId,
    /// Global ID — presumably the one allocated for the new secret; TODO confirm.
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}
1072
/// Data carried by [`SecretStage::RotateKeysEnsure`].
#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    /// Catalog item ID — presumably of the item whose keys are rotated;
    /// TODO confirm.
    id: CatalogItemId,
}
1078
/// Data carried by [`SecretStage::RotateKeysFinish`].
#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    /// Catalog operations carried by this stage — presumably applied when
    /// the stage runs; TODO confirm.
    ops: Vec<crate::catalog::Op>,
}
1084
/// Data carried by [`SecretStage::Alter`].
#[derive(Debug)]
pub struct AlterSecret {
    /// Plan-validity record carried alongside the stage data.
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}
1090
/// Selects a cluster, either symbolically or by explicit ID.
///
/// NOTE(review): how each variant is resolved to a concrete cluster is
/// decided by code outside this view.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// Presumably the builtin catalog-server cluster; TODO confirm.
    CatalogServer,
    /// Presumably the session's currently active cluster; TODO confirm.
    Active,
    /// An explicitly identified cluster — presumably the one already chosen
    /// for the current transaction; TODO confirm.
    Transaction(ClusterId),
}
1104
/// The outcome of running one stage of a [`Staged`] operation, where `T` is
/// the type of the follow-up stage.
pub(crate) enum StageResult<T> {
    /// A task handle that resolves to the next stage or an error.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task handle that resolves to a final response or an error.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage, available right away.
    Immediate(T),
    /// A final response, available right away.
    Response(ExecuteResponse),
}
1116
/// An operation that is executed as a sequence of stages against the
/// [`Coordinator`].
///
/// NOTE(review): the loop that drives implementors is outside this view; the
/// method notes below describe only what the signatures show.
pub(crate) trait Staged: Send {
    /// The context type threaded through the stages and used to retire the
    /// operation.
    type Ctx: StagedContext;

    /// Gives mutable access to the [`PlanValidity`] of the current stage.
    fn validity(&mut self) -> &mut PlanValidity;

    /// Consumes the current stage and runs it with mutable access to the
    /// coordinator and the context.
    ///
    /// Returns a [`StageResult`] holding either the boxed next stage or a
    /// final response, directly or through a task handle.
    ///
    /// # Errors
    ///
    /// Returns an [`AdapterError`] when the stage fails.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Converts this stage, its context and a tracing span into a
    /// [`Message`].
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Reports whether cancellation is enabled for this stage.
    fn cancel_enabled(&self) -> bool;
}
1136
/// The context threaded through a [`Staged`] operation.
pub trait StagedContext {
    /// Consumes the context, handing it the operation's final result.
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    /// Returns the session associated with this context, if there is one.
    fn session(&self) -> Option<&Session>;
}
1141
/// Lets an [`ExecuteContext`] serve as the context of a staged operation.
impl StagedContext for ExecuteContext {
    /// Forwards `result` to `self.retire(..)`.
    ///
    /// NOTE(review): presumably this resolves to an inherent
    /// `ExecuteContext::retire` defined outside this view (an inherent method
    /// takes precedence over the trait method); otherwise the call would
    /// recurse — confirm.
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    /// Always returns `Some`, wrapping the value of `self.session()`.
    ///
    /// NOTE(review): as with `retire`, this presumably calls an inherent
    /// `ExecuteContext::session` defined outside this view — confirm.
    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}
1151
/// Context for staged operations that have no execution context: retiring
/// does nothing and there is no session.
impl StagedContext for () {
    /// Discards the result; there is nothing to respond to.
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    /// Always returns `None`.
    fn session(&self) -> Option<&Session> {
        None
    }
}
1159
/// Configuration consumed when constructing a coordinator.
///
/// NOTE(review): the per-field notes below are derived from field names and types only; confirm
/// details against the code that consumes `Config`.
pub struct Config {
    /// Configuration for the controller.
    pub controller_config: ControllerConfig,
    /// Epoch value for the controller; non-zero by construction of its type.
    pub controller_envd_epoch: NonZeroI64,
    /// Handle to the durable catalog state.
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    /// Iterator over audit log entries.
    pub audit_logs_iterator: AuditLogIterator,
    /// URL of the timestamp oracle, if one is configured.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    /// Build information for the running binary.
    pub build_info: &'static BuildInfo,
    /// Identifier of the environment.
    pub environment_id: EnvironmentId,
    /// Registry in which metrics are registered.
    pub metrics_registry: MetricsRegistry,
    /// Function used to obtain the current time.
    pub now: NowFn,
    /// Controller used to manage secrets.
    pub secrets_controller: Arc<dyn SecretsController>,
    /// Controller for cloud resources, if one is available.
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    /// Map describing the available cluster replica sizes.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    // Bootstrap configuration for each of the builtin clusters.
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Default values for system parameters, keyed by parameter name.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Client used to collect storage usage.
    pub storage_usage_client: StorageUsageClient,
    /// Interval between storage usage collections.
    pub storage_usage_collection_interval: Duration,
    /// Retention period for storage usage records, if any.
    pub storage_usage_retention_period: Option<Duration>,
    /// Segment client, if one is configured.
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    /// System parameter values obtained from a remote source, if any, keyed by parameter name.
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    /// Callback taking two `u64` values — presumably invoked when connection limits change;
    /// TODO confirm the meaning of each argument.
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    /// Limiter for concurrent webhook requests.
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    /// Handle for the tracing subsystem.
    pub tracing_handle: TracingHandle,
    /// Whether controllers are in read-only mode.
    pub read_only_controllers: bool,

    /// Optional trigger — presumably fired once this deployment has caught up; TODO confirm.
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    /// The validated license key.
    pub license_key: ValidatedLicenseKey,
    /// Optional password — presumably for external logins as the `mz_system` user.
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}
1211
/// Metadata about an active connection.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Secret key associated with the connection — presumably the pgwire cancellation secret;
    /// TODO confirm.
    secret_key: u32,
    /// When the connection was established, in milliseconds since the epoch.
    connected_at: EpochMillis,
    /// The user of the connection.
    user: User,
    /// The application name reported for the connection.
    application_name: String,
    /// UUID associated with the connection.
    uuid: Uuid,
    /// The connection's ID.
    conn_id: ConnectionId,
    /// IP address of the client, when known.
    client_ip: Option<IpAddr>,

    /// Global IDs of sinks tied to this connection — presumably dropped when the connection
    /// terminates.
    drop_sinks: BTreeSet<GlobalId>,

    /// An owned mutex guard held on behalf of the connection, if any. Excluded from
    /// serialization.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// IDs of clusters with pending alterations associated with this connection.
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Sender for delivering notices to the connection. Excluded from serialization.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role the connection authenticated as.
    authenticated_role: RoleId,
}
1249
/// Read-only accessors for [`ConnMeta`].
impl ConnMeta {
    /// Returns the connection's ID.
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    /// Returns the connection's user.
    pub fn user(&self) -> &User {
        &self.user
    }

    /// Returns the application name recorded for the connection.
    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    /// Returns the ID of the role the connection authenticated as.
    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    /// Returns the UUID associated with the connection.
    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    /// Returns the client's IP address, when known.
    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    /// Returns when the connection was established.
    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}
1279
/// A transaction whose end (commit or rollback) is pending.
#[derive(Debug)]
pub struct PendingTxn {
    /// The execution context of the statement ending the transaction.
    ctx: ExecuteContext,
    /// The response to deliver, or the error that occurred.
    response: Result<PendingTxnResponse, AdapterError>,
    /// The action with which the transaction ends; passed to the session variables'
    /// `end_transaction` when the pending read is finished.
    action: EndTransactionAction,
}
1290
/// The response for a transaction that is pending completion.
///
/// Converts into the corresponding `ExecuteResponse` variant via `From`.
#[derive(Debug)]
pub enum PendingTxnResponse {
    /// The transaction will be committed.
    Committed {
        /// Parameter name/value pairs to report with the response.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction will be rolled back.
    Rolledback {
        /// Parameter name/value pairs to report with the response.
        params: BTreeMap<&'static str, String>,
    },
}
1305
1306impl PendingTxnResponse {
1307 pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1308 match self {
1309 PendingTxnResponse::Committed { params }
1310 | PendingTxnResponse::Rolledback { params } => params.extend(p),
1311 }
1312 }
1313}
1314
1315impl From<PendingTxnResponse> for ExecuteResponse {
1316 fn from(value: PendingTxnResponse) -> Self {
1317 match value {
1318 PendingTxnResponse::Committed { params } => {
1319 ExecuteResponse::TransactionCommitted { params }
1320 }
1321 PendingTxnResponse::Rolledback { params } => {
1322 ExecuteResponse::TransactionRolledBack { params }
1323 }
1324 }
1325 }
1326}
1327
/// A read transaction that is pending, together with bookkeeping about how long it has waited.
#[derive(Debug)]
pub struct PendingReadTxn {
    /// The pending read itself.
    txn: PendingRead,
    /// The timestamp context of the transaction.
    timestamp_context: TimestampContext,
    /// The instant recorded for this pending read — presumably when it was created.
    created: Instant,
    /// Counter of requeues — presumably how many times this read has been put back in the
    /// queue; TODO confirm where it is incremented.
    num_requeues: u64,
    /// The OpenTelemetry context captured for this transaction.
    otel_ctx: OpenTelemetryContext,
}
1344
1345impl PendingReadTxn {
1346 pub fn timestamp_context(&self) -> &TimestampContext {
1348 &self.timestamp_context
1349 }
1350
1351 pub(crate) fn take_context(self) -> ExecuteContext {
1352 self.txn.take_context()
1353 }
1354}
1355
/// The kinds of read that can be pending.
#[derive(Debug)]
enum PendingRead {
    /// A plain read, carrying the transaction that is waiting to end.
    Read {
        /// The pending transaction.
        txn: PendingTxn,
    },
    /// A read that is part of a read-then-write operation.
    ReadThenWrite {
        /// The execution context of the operation.
        ctx: ExecuteContext,
        /// Channel through which the context is handed back: `Some(ctx)` when the read
        /// finishes, or `None` when the context has been taken away instead.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}
1371
1372impl PendingRead {
1373 #[instrument(level = "debug")]
1378 pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1379 match self {
1380 PendingRead::Read {
1381 txn:
1382 PendingTxn {
1383 mut ctx,
1384 response,
1385 action,
1386 },
1387 ..
1388 } => {
1389 let changed = ctx.session_mut().vars_mut().end_transaction(action);
1390 let response = response.map(|mut r| {
1392 r.extend_params(changed);
1393 ExecuteResponse::from(r)
1394 });
1395
1396 Some((ctx, response))
1397 }
1398 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1399 let _ = tx.send(Some(ctx));
1401 None
1402 }
1403 }
1404 }
1405
1406 fn label(&self) -> &'static str {
1407 match self {
1408 PendingRead::Read { .. } => "read",
1409 PendingRead::ReadThenWrite { .. } => "read_then_write",
1410 }
1411 }
1412
1413 pub(crate) fn take_context(self) -> ExecuteContext {
1414 match self {
1415 PendingRead::Read { txn, .. } => txn.ctx,
1416 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1417 let _ = tx.send(None);
1420 ctx
1421 }
1422 }
1423 }
1424}
1425
/// Extra per-statement state carried alongside an execution: the statement logging ID, if any.
///
/// Marked `#[must_use]`, so silently discarding a value produces a compiler warning.
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    /// The statement logging ID, or `None` when there is nothing to track.
    statement_uuid: Option<StatementLoggingId>,
}
1440
1441impl ExecuteContextExtra {
1442 pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1443 Self { statement_uuid }
1444 }
1445 pub fn is_trivial(&self) -> bool {
1446 self.statement_uuid.is_none()
1447 }
1448 pub fn contents(&self) -> Option<StatementLoggingId> {
1449 self.statement_uuid
1450 }
1451 #[must_use]
1455 pub(crate) fn retire(self) -> Option<StatementLoggingId> {
1456 self.statement_uuid
1457 }
1458}
1459
/// A guard around [`ExecuteContextExtra`].
///
/// If the guard is dropped while it still holds a statement logging ID, its `Drop`
/// implementation sends a `Message::RetireExecute` with reason
/// `StatementEndedExecutionReason::Aborted` over `coordinator_tx`. Calling `defuse` extracts the
/// inner value so that no such message is sent.
#[derive(Debug)]
#[must_use]
pub struct ExecuteContextGuard {
    /// The guarded state.
    extra: ExecuteContextExtra,
    /// Channel over which the retire message is sent when the guard is dropped.
    coordinator_tx: mpsc::UnboundedSender<Message>,
}
1479
1480impl Default for ExecuteContextGuard {
1481 fn default() -> Self {
1482 let (tx, _rx) = mpsc::unbounded_channel();
1486 Self {
1487 extra: ExecuteContextExtra::default(),
1488 coordinator_tx: tx,
1489 }
1490 }
1491}
1492
1493impl ExecuteContextGuard {
1494 pub(crate) fn new(
1495 statement_uuid: Option<StatementLoggingId>,
1496 coordinator_tx: mpsc::UnboundedSender<Message>,
1497 ) -> Self {
1498 Self {
1499 extra: ExecuteContextExtra::new(statement_uuid),
1500 coordinator_tx,
1501 }
1502 }
1503 pub fn is_trivial(&self) -> bool {
1504 self.extra.is_trivial()
1505 }
1506 pub fn contents(&self) -> Option<StatementLoggingId> {
1507 self.extra.contents()
1508 }
1509 pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
1516 std::mem::take(&mut self.extra)
1518 }
1519}
1520
1521impl Drop for ExecuteContextGuard {
1522 fn drop(&mut self) {
1523 if let Some(statement_uuid) = self.extra.statement_uuid.take() {
1524 let msg = Message::RetireExecute {
1527 data: ExecuteContextExtra {
1528 statement_uuid: Some(statement_uuid),
1529 },
1530 otel_ctx: OpenTelemetryContext::obtain(),
1531 reason: StatementEndedExecutionReason::Aborted,
1532 };
1533 let _ = self.coordinator_tx.send(msg);
1536 }
1537 }
1538}
1539
/// The context in which a statement is executed.
///
/// The state lives in a boxed [`ExecuteContextInner`], so the context itself is a single pointer
/// wide; it dereferences to the inner struct.
#[derive(Debug)]
pub struct ExecuteContext {
    /// The boxed state.
    inner: Box<ExecuteContextInner>,
}
1555
1556impl std::ops::Deref for ExecuteContext {
1557 type Target = ExecuteContextInner;
1558 fn deref(&self) -> &Self::Target {
1559 &*self.inner
1560 }
1561}
1562
1563impl std::ops::DerefMut for ExecuteContext {
1564 fn deref_mut(&mut self) -> &mut Self::Target {
1565 &mut *self.inner
1566 }
1567}
1568
/// The state behind an [`ExecuteContext`].
#[derive(Debug)]
pub struct ExecuteContextInner {
    /// Transmitter used to send the final `ExecuteResponse` (together with the session) back to
    /// the client.
    tx: ClientTransmitter<ExecuteResponse>,
    /// Sender for coordinator-internal messages; used to send `Message::RetireExecute` when the
    /// context is retired.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// The session on whose behalf the statement executes.
    session: Session,
    /// Guard holding the statement logging state for this execution.
    extra: ExecuteContextGuard,
}
1576
1577impl ExecuteContext {
1578 pub fn session(&self) -> &Session {
1579 &self.session
1580 }
1581
1582 pub fn session_mut(&mut self) -> &mut Session {
1583 &mut self.session
1584 }
1585
1586 pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1587 &self.tx
1588 }
1589
1590 pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1591 &mut self.tx
1592 }
1593
1594 pub fn from_parts(
1595 tx: ClientTransmitter<ExecuteResponse>,
1596 internal_cmd_tx: mpsc::UnboundedSender<Message>,
1597 session: Session,
1598 extra: ExecuteContextGuard,
1599 ) -> Self {
1600 Self {
1601 inner: ExecuteContextInner {
1602 tx,
1603 session,
1604 extra,
1605 internal_cmd_tx,
1606 }
1607 .into(),
1608 }
1609 }
1610
1611 pub fn into_parts(
1620 self,
1621 ) -> (
1622 ClientTransmitter<ExecuteResponse>,
1623 mpsc::UnboundedSender<Message>,
1624 Session,
1625 ExecuteContextGuard,
1626 ) {
1627 let ExecuteContextInner {
1628 tx,
1629 internal_cmd_tx,
1630 session,
1631 extra,
1632 } = *self.inner;
1633 (tx, internal_cmd_tx, session, extra)
1634 }
1635
1636 #[instrument(level = "debug")]
1638 pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1639 let ExecuteContextInner {
1640 tx,
1641 internal_cmd_tx,
1642 session,
1643 extra,
1644 } = *self.inner;
1645 let reason = if extra.is_trivial() {
1646 None
1647 } else {
1648 Some((&result).into())
1649 };
1650 tx.send(result, session);
1651 if let Some(reason) = reason {
1652 let extra = extra.defuse();
1654 if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1655 otel_ctx: OpenTelemetryContext::obtain(),
1656 data: extra,
1657 reason,
1658 }) {
1659 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1660 }
1661 }
1662 }
1663
1664 pub fn extra(&self) -> &ExecuteContextGuard {
1665 &self.extra
1666 }
1667
1668 pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
1669 &mut self.extra
1670 }
1671}
1672
/// Per-process statuses of cluster replicas.
///
/// The nesting is cluster ID → replica ID → process ID → process status.
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);
1677
1678impl ClusterReplicaStatuses {
1679 pub(crate) fn new() -> ClusterReplicaStatuses {
1680 ClusterReplicaStatuses(BTreeMap::new())
1681 }
1682
1683 pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1687 let prev = self.0.insert(cluster_id, BTreeMap::new());
1688 assert_eq!(
1689 prev, None,
1690 "cluster {cluster_id} statuses already initialized"
1691 );
1692 }
1693
1694 pub(crate) fn initialize_cluster_replica_statuses(
1698 &mut self,
1699 cluster_id: ClusterId,
1700 replica_id: ReplicaId,
1701 num_processes: usize,
1702 time: DateTime<Utc>,
1703 ) {
1704 tracing::info!(
1705 ?cluster_id,
1706 ?replica_id,
1707 ?time,
1708 "initializing cluster replica status"
1709 );
1710 let replica_statuses = self.0.entry(cluster_id).or_default();
1711 let process_statuses = (0..num_processes)
1712 .map(|process_id| {
1713 let status = ClusterReplicaProcessStatus {
1714 status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1715 time: time.clone(),
1716 };
1717 (u64::cast_from(process_id), status)
1718 })
1719 .collect();
1720 let prev = replica_statuses.insert(replica_id, process_statuses);
1721 assert_none!(
1722 prev,
1723 "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1724 );
1725 }
1726
1727 pub(crate) fn remove_cluster_statuses(
1731 &mut self,
1732 cluster_id: &ClusterId,
1733 ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1734 let prev = self.0.remove(cluster_id);
1735 prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1736 }
1737
1738 pub(crate) fn remove_cluster_replica_statuses(
1742 &mut self,
1743 cluster_id: &ClusterId,
1744 replica_id: &ReplicaId,
1745 ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1746 let replica_statuses = self
1747 .0
1748 .get_mut(cluster_id)
1749 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1750 let prev = replica_statuses.remove(replica_id);
1751 prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1752 }
1753
1754 pub(crate) fn ensure_cluster_status(
1758 &mut self,
1759 cluster_id: ClusterId,
1760 replica_id: ReplicaId,
1761 process_id: ProcessId,
1762 status: ClusterReplicaProcessStatus,
1763 ) {
1764 let replica_statuses = self
1765 .0
1766 .get_mut(&cluster_id)
1767 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1768 .get_mut(&replica_id)
1769 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1770 replica_statuses.insert(process_id, status);
1771 }
1772
1773 pub fn get_cluster_replica_status(
1777 &self,
1778 cluster_id: ClusterId,
1779 replica_id: ReplicaId,
1780 ) -> ClusterStatus {
1781 let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1782 Self::cluster_replica_status(process_status)
1783 }
1784
1785 pub fn cluster_replica_status(
1787 process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1788 ) -> ClusterStatus {
1789 process_status
1790 .values()
1791 .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1792 (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1793 (x, y) => {
1794 let reason_x = match x {
1795 ClusterStatus::Offline(reason) => reason,
1796 ClusterStatus::Online => None,
1797 };
1798 let reason_y = match y {
1799 ClusterStatus::Offline(reason) => reason,
1800 ClusterStatus::Online => None,
1801 };
1802 ClusterStatus::Offline(reason_x.or(reason_y))
1804 }
1805 })
1806 }
1807
1808 pub(crate) fn get_cluster_replica_statuses(
1812 &self,
1813 cluster_id: ClusterId,
1814 replica_id: ReplicaId,
1815 ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1816 self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1817 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1818 }
1819
1820 pub(crate) fn try_get_cluster_replica_statuses(
1822 &self,
1823 cluster_id: ClusterId,
1824 replica_id: ReplicaId,
1825 ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1826 self.try_get_cluster_statuses(cluster_id)
1827 .and_then(|statuses| statuses.get(&replica_id))
1828 }
1829
1830 pub(crate) fn try_get_cluster_statuses(
1832 &self,
1833 cluster_id: ClusterId,
1834 ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1835 self.0.get(&cluster_id)
1836 }
1837}
1838
/// The coordinator's state.
///
/// NOTE(review): the per-field notes below are derived from field names and types only; confirm
/// details against the code that uses each field.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    /// The controller for compute and storage. Omitted from `Debug` output.
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    /// The catalog, shared behind an `Arc`.
    catalog: Arc<Catalog>,

    /// Client for persist.
    persist_client: PersistClient,

    /// Sender for coordinator-internal messages.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// Notifier used to signal group commits.
    group_commit_tx: appends::GroupCommitNotifier,

    /// Sender over which pending strict-serializable reads are submitted, tagged with the
    /// connection they belong to.
    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    /// State for each timeline.
    global_timelines: BTreeMap<Timeline, TimelineState>,

    /// Generator for transient IDs.
    transient_id_gen: Arc<TransientIdGen>,
    /// Metadata for each active connection, keyed by connection ID.
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    /// Read holds held on behalf of transactions, keyed by connection ID.
    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds>,

    /// Pending peeks, keyed by UUID.
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// For each connection, the UUIDs of its pending peeks and the cluster each is tied to.
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    /// Read transactions pending linearization, keyed by connection ID.
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    /// Active compute sinks, keyed by global ID.
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    /// Invalidators for active webhooks, keyed by catalog item ID.
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    /// Active `COPY FROM` operations, keyed by connection ID.
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    /// A watch channel (sender and receiver) per connection — presumably signalling
    /// cancellation requests for that connection.
    connection_cancel_watches: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    /// Introspection subscribes, keyed by global ID.
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    /// A shared async mutex per catalog item.
    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    /// Deferred write operations, keyed by connection ID.
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    /// Write transactions that are pending.
    pending_writes: Vec<PendingWriteTxn>,

    /// Interval at which timelines are advanced.
    advance_timelines_interval: Interval,

    /// Queue of deferred plan statements for serialized DDL.
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    /// Controller used to manage secrets.
    secrets_controller: Arc<dyn SecretsController>,
    /// Secrets reader with caching.
    caching_secrets_reader: CachingSecretsReader,

    /// Controller for cloud resources, if one is available.
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Client used to collect storage usage.
    storage_usage_client: StorageUsageClient,
    /// Interval between storage usage collections.
    storage_usage_collection_interval: Duration,

    /// Segment client, if one is configured. Omitted from `Debug` output.
    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    /// Coordinator metrics.
    metrics: Metrics,
    /// Optimizer metrics.
    optimizer_metrics: OptimizerMetrics,

    /// Handle for the tracing subsystem.
    tracing_handle: TracingHandle,

    /// Statement logging state.
    statement_logging: StatementLogging,

    /// Limiter for concurrent webhook requests.
    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    /// Configuration for the timestamp oracle, if any.
    timestamp_oracle_config: Option<TimestampOracleConfig>,

    /// Interval at which cluster scheduling policies are checked.
    check_cluster_scheduling_policies_interval: Interval,

    /// Scheduling decisions per cluster, keyed by a static string — presumably the policy name.
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    /// Interval at which the caught-up check runs.
    caught_up_check_interval: Interval,

    /// Context for the caught-up check, if one is in effect.
    caught_up_check: Option<CaughtUpCheckContext>,

    /// Metrics registry for catalog information.
    catalog_info_metrics_registry: MetricsRegistry,

    /// Frontend used to evaluate scoped system parameters, if configured; when absent,
    /// `resolve_scoped_for_new_objects` returns without doing anything.
    scoped_frontend: Option<Arc<SystemParameterFrontend>>,

    /// Installed watch sets, keyed by ID, with the owning connection and the response to give.
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    /// The IDs of the watch sets belonging to each connection.
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    /// Per-process statuses of cluster replicas.
    cluster_replica_statuses: ClusterReplicaStatuses,

    /// Whether controllers are in read-only mode.
    read_only_controllers: bool,

    /// Builtin table updates that are being buffered, if buffering is in effect.
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    /// The validated license key.
    license_key: ValidatedLicenseKey,

    /// Pool of user IDs.
    user_id_pool: IdPool,
}
2035
2036impl Coordinator {
2037 pub(crate) async fn reconcile_scoped_system_parameters(
2055 &mut self,
2056 scoped: ScopedParameters,
2057 prune_scope: Option<ScopedParametersScope>,
2058 ) {
2059 if self.catalog().state().scoped_system_parameters() == &scoped {
2062 return;
2063 }
2064
2065 if let Err(e) = self
2073 .catalog_transact(
2074 None,
2075 vec![crate::catalog::Op::UpdateScopedSystemParameters {
2076 scoped,
2077 prune_scope,
2078 }],
2079 )
2080 .await
2081 {
2082 tracing::warn!("failed to persist scoped system parameters: {e}");
2083 }
2084 }
2085
2086 pub(crate) async fn resolve_scoped_for_new_objects(
2099 &mut self,
2100 clusters: &BTreeSet<ClusterId>,
2101 replicas: &BTreeSet<ReplicaId>,
2102 ) {
2103 if clusters.is_empty() && replicas.is_empty() {
2104 return;
2105 }
2106 let Some(frontend) = self.scoped_frontend.clone() else {
2107 return;
2108 };
2109 let catalog = self.catalog();
2110 if !ENABLE_SCOPED_SYSTEM_PARAMETERS.get(catalog.system_config().dyncfgs()) {
2111 return;
2112 }
2113
2114 let params = SynchronizedParameters::new(catalog.system_config().clone());
2117 let evaluated =
2118 evaluate_scoped_parameters(&frontend, ¶ms, catalog, Some(clusters), Some(replicas));
2119 let merged = catalog.state().scoped_system_parameters().merge(&evaluated);
2120 let prune_scope = ScopedParametersScope {
2124 clusters: catalog.clusters().map(|cluster| cluster.id).collect(),
2125 replicas: catalog
2126 .clusters()
2127 .flat_map(|cluster| cluster.replicas().map(|replica| replica.replica_id))
2128 .collect(),
2129 };
2130 self.reconcile_scoped_system_parameters(merged, Some(prune_scope))
2131 .await;
2132 }
2133
2134 pub(crate) fn push_replica_dyncfg_overrides(&mut self) {
2140 let replica_overrides = self
2143 .catalog()
2144 .state()
2145 .scoped_system_parameters()
2146 .replica
2147 .clone();
2148
2149 let dyncfgs = self.catalog().system_config().dyncfgs();
2150 let mut instance_overrides: BTreeMap<
2151 ComputeInstanceId,
2152 BTreeMap<ReplicaId, ConfigUpdates>,
2153 > = BTreeMap::new();
2154 for cluster in self.catalog().clusters() {
2155 for replica in cluster.replicas() {
2156 let Some(values) = replica_overrides.get(&replica.replica_id) else {
2157 continue;
2158 };
2159 let mut updates = ConfigUpdates::default();
2160 for (name, value) in values {
2161 let Some(entry) = dyncfgs.entry(name) else {
2162 continue;
2165 };
2166 match entry.parse_val(value) {
2167 Ok(val) => updates.add_dynamic(name, val),
2168 Err(e) => {
2169 tracing::warn!(%name, %value, "cannot parse scoped override: {e}")
2170 }
2171 }
2172 }
2173 if !updates.updates.is_empty() {
2174 instance_overrides
2175 .entry(cluster.id)
2176 .or_default()
2177 .insert(replica.replica_id, updates);
2178 }
2179 }
2180 }
2181
2182 self.controller
2193 .compute
2194 .update_replica_dyncfg_overrides(instance_overrides);
2195 let compute_config = crate::flags::compute_config(self.catalog().system_config());
2201 self.controller.compute.update_configuration(compute_config);
2202 }
2203
2204 pub(crate) fn cluster_scoped_optimizer_overrides(
2208 &self,
2209 cluster_id: ClusterId,
2210 ) -> OptimizerFeatureOverrides {
2211 self.catalog()
2212 .state()
2213 .cluster_scoped_optimizer_overrides(cluster_id)
2214 }
2215
2216 #[instrument(name = "coord::bootstrap")]
2220 pub(crate) async fn bootstrap(
2221 &mut self,
2222 boot_ts: Timestamp,
2223 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
2224 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2225 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2226 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
2227 audit_logs_iterator: AuditLogIterator,
2228 ) -> Result<(), AdapterError> {
2229 let bootstrap_start = Instant::now();
2230 info!("startup: coordinator init: bootstrap beginning");
2231 info!("startup: coordinator init: bootstrap: preamble beginning");
2232
2233 let cluster_statuses: Vec<(_, Vec<_>)> = self
2236 .catalog()
2237 .clusters()
2238 .map(|cluster| {
2239 (
2240 cluster.id(),
2241 cluster
2242 .replicas()
2243 .map(|replica| {
2244 (replica.replica_id, replica.config.location.num_processes())
2245 })
2246 .collect(),
2247 )
2248 })
2249 .collect();
2250 let now = self.now_datetime();
2251 for (cluster_id, replica_statuses) in cluster_statuses {
2252 self.cluster_replica_statuses
2253 .initialize_cluster_statuses(cluster_id);
2254 for (replica_id, num_processes) in replica_statuses {
2255 self.cluster_replica_statuses
2256 .initialize_cluster_replica_statuses(
2257 cluster_id,
2258 replica_id,
2259 num_processes,
2260 now,
2261 );
2262 }
2263 }
2264
2265 let system_config = self.catalog().system_config();
2266
2267 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
2269
2270 let compute_config = flags::compute_config(system_config);
2272 let storage_config = flags::storage_config(system_config);
2273 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
2274 let dyncfg_updates = system_config.dyncfg_updates();
2275 self.controller.compute.update_configuration(compute_config);
2276 self.controller.storage.update_parameters(storage_config);
2277 self.controller
2278 .update_orchestrator_scheduling_config(scheduling_config);
2279 self.controller.update_configuration(dyncfg_updates);
2280
2281 let enforce_credit_limit_at_bootstrap = !matches!(
2286 self.license_key.expiration_behavior,
2287 ExpirationBehavior::DisableClusterCreation,
2288 );
2289 if enforce_credit_limit_at_bootstrap {
2290 self.validate_resource_limit_numeric(
2291 Numeric::zero(),
2292 self.current_credit_consumption_rate(),
2293 |system_vars| {
2294 self.license_key
2295 .max_credit_consumption_rate()
2296 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
2297 },
2298 "cluster replica",
2299 MAX_CREDIT_CONSUMPTION_RATE.name(),
2300 )?;
2301 }
2302
2303 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
2304 Default::default();
2305
2306 let enable_worker_core_affinity =
2307 self.catalog().system_config().enable_worker_core_affinity();
2308 let enable_storage_introspection_logs = self
2309 .catalog()
2310 .system_config()
2311 .enable_storage_introspection_logs();
2312 for instance in self.catalog.clusters() {
2313 self.controller.create_cluster(
2314 instance.id,
2315 ClusterConfig {
2316 arranged_logs: instance.log_indexes.clone(),
2317 workload_class: instance.config.workload_class.clone(),
2318 },
2319 )?;
2320 for replica in instance.replicas() {
2321 let role = instance.role();
2322 self.controller.create_replica(
2323 instance.id,
2324 replica.replica_id,
2325 instance.name.clone(),
2326 replica.name.clone(),
2327 role,
2328 replica.config.clone(),
2329 enable_worker_core_affinity,
2330 enable_storage_introspection_logs,
2331 )?;
2332 }
2333 }
2334
2335 self.push_replica_dyncfg_overrides();
2347
2348 info!(
2349 "startup: coordinator init: bootstrap: preamble complete in {:?}",
2350 bootstrap_start.elapsed()
2351 );
2352
2353 let init_storage_collections_start = Instant::now();
2354 info!("startup: coordinator init: bootstrap: storage collections init beginning");
2355 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2356 .await;
2357 info!(
2358 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2359 init_storage_collections_start.elapsed()
2360 );
2361
2362 self.controller.start_compute_introspection_sink();
2367
2368 let sorting_start = Instant::now();
2369 info!("startup: coordinator init: bootstrap: sorting catalog entries");
2370 let entries = self.bootstrap_sort_catalog_entries();
2371 info!(
2372 "startup: coordinator init: bootstrap: sorting catalog entries complete in {:?}",
2373 sorting_start.elapsed()
2374 );
2375
2376 let optimize_dataflows_start = Instant::now();
2377 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2378 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2379 info!(
2380 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2381 optimize_dataflows_start.elapsed()
2382 );
2383
2384 let _fut = self.catalog().update_expression_cache(
2386 uncached_local_exprs.into_iter().collect(),
2387 uncached_global_exps.into_iter().collect(),
2388 Default::default(),
2389 );
2390
2391 let bootstrap_as_ofs_start = Instant::now();
2395 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2396 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2397 info!(
2398 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2399 bootstrap_as_ofs_start.elapsed()
2400 );
2401
2402 let postamble_start = Instant::now();
2403 info!("startup: coordinator init: bootstrap: postamble beginning");
2404
2405 let logs: BTreeSet<_> = BUILTINS::logs()
2406 .map(|log| self.catalog().resolve_builtin_log(log))
2407 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2408 .collect();
2409
2410 let mut privatelink_connections = BTreeMap::new();
2411
2412 for entry in &entries {
2413 debug!(
2414 "coordinator init: installing {} {}",
2415 entry.item().typ(),
2416 entry.id()
2417 );
2418 let mut policy = entry.item().initial_logical_compaction_window();
2419 match entry.item() {
2420 CatalogItem::Source(source) => {
2426 if source.custom_logical_compaction_window.is_none() {
2428 if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2429 source.data_source
2430 {
2431 policy = Some(
2432 self.catalog()
2433 .get_entry(&ingestion_id)
2434 .source()
2435 .expect("must be source")
2436 .custom_logical_compaction_window
2437 .unwrap_or_default(),
2438 );
2439 }
2440 }
2441 policies_to_set
2442 .entry(policy.expect("sources have a compaction window"))
2443 .or_insert_with(Default::default)
2444 .storage_ids
2445 .insert(source.global_id());
2446 }
2447 CatalogItem::Table(table) => {
2448 policies_to_set
2449 .entry(policy.expect("tables have a compaction window"))
2450 .or_insert_with(Default::default)
2451 .storage_ids
2452 .extend(table.global_ids());
2453 }
2454 CatalogItem::Index(idx) => {
2455 let policy_entry = policies_to_set
2456 .entry(policy.expect("indexes have a compaction window"))
2457 .or_insert_with(Default::default);
2458
2459 if logs.contains(&idx.on) {
2460 policy_entry
2461 .compute_ids
2462 .entry(idx.cluster_id)
2463 .or_insert_with(BTreeSet::new)
2464 .insert(idx.global_id());
2465 } else {
2466 let df_desc = self
2467 .catalog()
2468 .try_get_physical_plan(&idx.global_id())
2469 .expect("added in `bootstrap_dataflow_plans`")
2470 .clone();
2471
2472 let df_meta = self
2473 .catalog()
2474 .try_get_dataflow_metainfo(&idx.global_id())
2475 .expect("added in `bootstrap_dataflow_plans`");
2476
2477 if self.catalog().state().system_config().enable_mz_notices() {
2478 self.catalog().state().pack_optimizer_notices(
2480 &mut builtin_table_updates,
2481 df_meta.optimizer_notices.iter(),
2482 Diff::ONE,
2483 );
2484 }
2485
2486 policy_entry
2489 .compute_ids
2490 .entry(idx.cluster_id)
2491 .or_insert_with(Default::default)
2492 .extend(df_desc.export_ids());
2493
2494 self.controller
2495 .compute
2496 .create_dataflow(idx.cluster_id, df_desc, None)
2497 .unwrap_or_terminate("cannot fail to create dataflows");
2498 }
2499 }
2500 CatalogItem::View(_) => (),
2501 CatalogItem::MaterializedView(mview) => {
2502 policies_to_set
2503 .entry(policy.expect("materialized views have a compaction window"))
2504 .or_insert_with(Default::default)
2505 .storage_ids
2506 .insert(mview.global_id_writes());
2507
2508 let mut df_desc = self
2509 .catalog()
2510 .try_get_physical_plan(&mview.global_id_writes())
2511 .expect("added in `bootstrap_dataflow_plans`")
2512 .clone();
2513
2514 if let Some(initial_as_of) = mview.initial_as_of.clone() {
2515 df_desc.set_initial_as_of(initial_as_of);
2516 }
2517
2518 let until = mview
2520 .refresh_schedule
2521 .as_ref()
2522 .and_then(|s| s.last_refresh())
2523 .and_then(|r| r.try_step_forward());
2524 if let Some(until) = until {
2525 df_desc.until.meet_assign(&Antichain::from_elem(until));
2526 }
2527
2528 let df_meta = self
2529 .catalog()
2530 .try_get_dataflow_metainfo(&mview.global_id_writes())
2531 .expect("added in `bootstrap_dataflow_plans`");
2532
2533 if self.catalog().state().system_config().enable_mz_notices() {
2534 self.catalog().state().pack_optimizer_notices(
2536 &mut builtin_table_updates,
2537 df_meta.optimizer_notices.iter(),
2538 Diff::ONE,
2539 );
2540 }
2541
2542 self.ship_dataflow(df_desc, mview.cluster_id, mview.target_replica)
2543 .await;
2544
2545 if mview.replacement_target.is_none() {
2548 self.allow_writes(mview.cluster_id, mview.global_id_writes());
2549 }
2550 }
2551 CatalogItem::Sink(sink) => {
2552 policies_to_set
2553 .entry(CompactionWindow::Default)
2554 .or_insert_with(Default::default)
2555 .storage_ids
2556 .insert(sink.global_id());
2557 }
2558 CatalogItem::Connection(catalog_connection) => {
2559 if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2560 privatelink_connections.insert(
2561 entry.id(),
2562 VpcEndpointConfig {
2563 aws_service_name: conn.service_name.clone(),
2564 availability_zone_ids: conn.availability_zones.clone(),
2565 },
2566 );
2567 }
2568 }
2569 CatalogItem::Log(_)
2571 | CatalogItem::Type(_)
2572 | CatalogItem::Func(_)
2573 | CatalogItem::Secret(_) => {}
2574 }
2575 }
2576
2577 if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2578 let existing_vpc_endpoints = cloud_resource_controller
2580 .list_vpc_endpoints()
2581 .await
2582 .context("list vpc endpoints")?;
2583 let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2584 let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2585 let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2586 for id in vpc_endpoints_to_remove {
2587 cloud_resource_controller
2588 .delete_vpc_endpoint(*id)
2589 .await
2590 .context("deleting extraneous vpc endpoint")?;
2591 }
2592
2593 for (id, spec) in privatelink_connections {
2595 cloud_resource_controller
2596 .ensure_vpc_endpoint(id, spec)
2597 .await
2598 .context("ensuring vpc endpoint")?;
2599 }
2600 }
2601
2602 drop(dataflow_read_holds);
2605 for (cw, policies) in policies_to_set {
2607 self.initialize_read_policies(&policies, cw).await;
2608 }
2609
2610 builtin_table_updates.extend(
2612 self.catalog().state().resolve_builtin_table_updates(
2613 self.catalog().state().pack_all_replica_size_updates(),
2614 ),
2615 );
2616
2617 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2618 let migrated_updates_fut = if self.controller.read_only() {
2624 let min_timestamp = Timestamp::minimum();
2625 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2626 .extract_if(.., |update| {
2627 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2628 migrated_storage_collections_0dt.contains(&update.id)
2629 && self
2630 .controller
2631 .storage_collections
2632 .collection_frontiers(gid)
2633 .expect("all tables are registered")
2634 .write_frontier
2635 .elements()
2636 == &[min_timestamp]
2637 })
2638 .collect();
2639 if migrated_builtin_table_updates.is_empty() {
2640 futures::future::ready(()).boxed()
2641 } else {
2642 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2644 for update in migrated_builtin_table_updates {
2645 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2646 grouped_appends.entry(gid).or_default().push(update.data);
2647 }
2648 info!(
2649 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2650 grouped_appends.keys().collect::<Vec<_>>()
2651 );
2652
2653 let mut all_appends = Vec::with_capacity(grouped_appends.len());
2655 for (item_id, table_data) in grouped_appends.into_iter() {
2656 let mut all_rows = Vec::new();
2657 let mut all_data = Vec::new();
2658 for data in table_data {
2659 match data {
2660 TableData::Rows(rows) => all_rows.extend(rows),
2661 TableData::Batches(_) => all_data.push(data),
2662 }
2663 }
2664 differential_dataflow::consolidation::consolidate(&mut all_rows);
2665 all_data.push(TableData::Rows(all_rows));
2666
2667 all_appends.push((item_id, all_data));
2669 }
2670
2671 let fut = self
2672 .controller
2673 .storage
2674 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2675 .expect("cannot fail to append");
2676 async {
2677 fut.await
2678 .expect("One-shot shouldn't be dropped during bootstrap")
2679 .unwrap_or_terminate("cannot fail to append")
2680 }
2681 .boxed()
2682 }
2683 } else {
2684 futures::future::ready(()).boxed()
2685 };
2686
2687 info!(
2688 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2689 postamble_start.elapsed()
2690 );
2691
2692 let builtin_update_start = Instant::now();
2693 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2694
2695 if self.controller.read_only() {
2696 info!(
2697 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2698 );
2699
2700 let audit_join_start = Instant::now();
2702 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2703 let audit_log_updates: Vec<_> = audit_logs_iterator
2704 .map(|(audit_log, ts)| StateUpdate {
2705 kind: StateUpdateKind::AuditLog(audit_log),
2706 ts,
2707 diff: StateDiff::Addition,
2708 })
2709 .collect();
2710 let audit_log_builtin_table_updates = self
2711 .catalog()
2712 .state()
2713 .generate_builtin_table_updates(audit_log_updates);
2714 builtin_table_updates.extend(audit_log_builtin_table_updates);
2715 info!(
2716 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2717 audit_join_start.elapsed()
2718 );
2719 self.buffered_builtin_table_updates
2720 .as_mut()
2721 .expect("in read-only mode")
2722 .append(&mut builtin_table_updates);
2723 } else {
2724 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2725 .await;
2726 };
2727 info!(
2728 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2729 builtin_update_start.elapsed()
2730 );
2731
2732 let cleanup_secrets_start = Instant::now();
2733 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2734 {
2738 let Self {
2741 secrets_controller,
2742 catalog,
2743 ..
2744 } = self;
2745
2746 let next_user_item_id = catalog.get_next_user_item_id().await?;
2747 let next_system_item_id = catalog.get_next_system_item_id().await?;
2748 let read_only = self.controller.read_only();
2749 let catalog_ids: BTreeSet<CatalogItemId> =
2754 catalog.entries().map(|entry| entry.id()).collect();
2755 let secrets_controller = Arc::clone(secrets_controller);
2756
2757 spawn(|| "cleanup-orphaned-secrets", async move {
2758 if read_only {
2759 info!(
2760 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2761 );
2762 return;
2763 }
2764 info!("coordinator init: cleaning up orphaned secrets");
2765
2766 match secrets_controller.list().await {
2767 Ok(controller_secrets) => {
2768 let controller_secrets: BTreeSet<CatalogItemId> =
2769 controller_secrets.into_iter().collect();
2770 let orphaned = controller_secrets.difference(&catalog_ids);
2771 for id in orphaned {
2772 let id_too_large = match id {
2773 CatalogItemId::System(id) => *id >= next_system_item_id,
2774 CatalogItemId::User(id) => *id >= next_user_item_id,
2775 CatalogItemId::IntrospectionSourceIndex(_)
2776 | CatalogItemId::Transient(_) => false,
2777 };
2778 if id_too_large {
2779 info!(
2780 %next_user_item_id, %next_system_item_id,
2781 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2782 );
2783 } else {
2784 info!("coordinator init: deleting orphaned secret {id}");
2785 fail_point!("orphan_secrets");
2786 if let Err(e) = secrets_controller.delete(*id).await {
2787 warn!(
2788 "Dropping orphaned secret has encountered an error: {}",
2789 e
2790 );
2791 }
2792 }
2793 }
2794 }
2795 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2796 }
2797 });
2798 }
2799 info!(
2800 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2801 cleanup_secrets_start.elapsed()
2802 );
2803
2804 let final_steps_start = Instant::now();
2806 info!(
2807 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2808 );
2809 migrated_updates_fut
2810 .instrument(info_span!("coord::bootstrap::final"))
2811 .await;
2812
2813 debug!(
2814 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2815 );
2816 self.controller.initialization_complete();
2818
2819 self.bootstrap_introspection_subscribes().await;
2821
2822 info!(
2823 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2824 final_steps_start.elapsed()
2825 );
2826
2827 info!(
2828 "startup: coordinator init: bootstrap complete in {:?}",
2829 bootstrap_start.elapsed()
2830 );
2831 Ok(())
2832 }
2833
2834 #[allow(clippy::async_yields_async)]
2839 #[instrument]
2840 async fn bootstrap_tables(
2841 &mut self,
2842 entries: &[CatalogEntry],
2843 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2844 audit_logs_iterator: AuditLogIterator,
2845 ) {
2846 struct TableMetadata<'a> {
2848 id: CatalogItemId,
2849 name: &'a QualifiedItemName,
2850 table: &'a Table,
2851 }
2852
2853 let table_metas: Vec<_> = entries
2855 .into_iter()
2856 .filter_map(|entry| {
2857 entry.table().map(|table| TableMetadata {
2858 id: entry.id(),
2859 name: entry.name(),
2860 table,
2861 })
2862 })
2863 .collect();
2864
2865 debug!("coordinator init: advancing all tables to current timestamp");
2867 let WriteTimestamp {
2868 timestamp: write_ts,
2869 advance_to,
2870 } = self.get_local_write_ts().await;
2871 let appends = table_metas
2872 .iter()
2873 .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2874 .collect();
2875 let table_fence_rx = self
2879 .controller
2880 .storage
2881 .append_table(write_ts.clone(), advance_to, appends)
2882 .expect("invalid updates");
2883
2884 self.apply_local_write(write_ts).await;
2885
2886 debug!("coordinator init: resetting system tables");
2888 let read_ts = self.get_local_read_ts().await;
2889
2890 let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2893 .catalog()
2894 .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2895 .into();
2896 let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2897 meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2898 && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2899 };
2900
2901 let mut retraction_tasks = Vec::new();
2902 let mut system_tables: Vec<_> = table_metas
2903 .iter()
2904 .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2905 .collect();
2906
2907 let (audit_events_idx, _) = system_tables
2909 .iter()
2910 .find_position(|table| {
2911 table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2912 })
2913 .expect("mz_audit_events must exist");
2914 let audit_events = system_tables.remove(audit_events_idx);
2915 let audit_log_task = self.bootstrap_audit_log_table(
2916 audit_events.id,
2917 audit_events.name,
2918 audit_events.table,
2919 audit_logs_iterator,
2920 read_ts,
2921 );
2922
2923 for system_table in system_tables {
2924 let table_id = system_table.id;
2925 let full_name = self.catalog().resolve_full_name(system_table.name, None);
2926 debug!("coordinator init: resetting system table {full_name} ({table_id})");
2927
2928 let snapshot_fut = self
2930 .controller
2931 .storage_collections
2932 .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2933 let batch_fut = self
2934 .controller
2935 .storage_collections
2936 .create_update_builder(system_table.table.global_id_writes());
2937
2938 let task = spawn(|| format!("snapshot-{table_id}"), async move {
2939 let mut batch = batch_fut
2941 .await
2942 .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2943 tracing::info!(?table_id, "starting snapshot");
2944 let mut snapshot_cursor = snapshot_fut
2946 .await
2947 .unwrap_or_terminate("cannot fail to snapshot");
2948
2949 while let Some(values) = snapshot_cursor.next().await {
2951 for (key, _t, d) in values {
2952 let d_invert = d.neg();
2953 batch.add(&key, &(), &d_invert).await;
2954 }
2955 }
2956 tracing::info!(?table_id, "finished snapshot");
2957
2958 let batch = batch.finish().await;
2959 BuiltinTableUpdate::batch(table_id, batch)
2960 });
2961 retraction_tasks.push(task);
2962 }
2963
2964 let retractions_res = futures::future::join_all(retraction_tasks).await;
2965 for retractions in retractions_res {
2966 builtin_table_updates.push(retractions);
2967 }
2968
2969 let audit_join_start = Instant::now();
2970 info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2971 let audit_log_updates = audit_log_task.await;
2972 let audit_log_builtin_table_updates = self
2973 .catalog()
2974 .state()
2975 .generate_builtin_table_updates(audit_log_updates);
2976 builtin_table_updates.extend(audit_log_builtin_table_updates);
2977 info!(
2978 "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2979 audit_join_start.elapsed()
2980 );
2981
2982 table_fence_rx
2984 .await
2985 .expect("One-shot shouldn't be dropped during bootstrap")
2986 .unwrap_or_terminate("cannot fail to append");
2987
2988 info!("coordinator init: sending builtin table updates");
2989 let (_builtin_updates_fut, write_ts) = self
2990 .builtin_table_update()
2991 .execute(builtin_table_updates)
2992 .await;
2993 info!(?write_ts, "our write ts");
2994 if let Some(write_ts) = write_ts {
2995 self.apply_local_write(write_ts).await;
2996 }
2997 }
2998
2999 #[instrument]
3003 fn bootstrap_audit_log_table<'a>(
3004 &self,
3005 table_id: CatalogItemId,
3006 name: &'a QualifiedItemName,
3007 table: &'a Table,
3008 audit_logs_iterator: AuditLogIterator,
3009 read_ts: Timestamp,
3010 ) -> JoinHandle<Vec<StateUpdate>> {
3011 let full_name = self.catalog().resolve_full_name(name, None);
3012 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
3013 let current_contents_fut = self
3014 .controller
3015 .storage_collections
3016 .snapshot(table.global_id_writes(), read_ts);
3017 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
3018 let current_contents = current_contents_fut
3019 .await
3020 .unwrap_or_terminate("cannot fail to fetch snapshot");
3021 let contents_len = current_contents.len();
3022 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
3023
3024 let max_table_id = current_contents
3026 .into_iter()
3027 .filter(|(_, diff)| *diff == 1)
3028 .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
3029 .sorted()
3030 .rev()
3031 .next();
3032
3033 audit_logs_iterator
3035 .take_while(|(audit_log, _)| match max_table_id {
3036 Some(id) => audit_log.event.sortable_id() > id,
3037 None => true,
3038 })
3039 .map(|(audit_log, ts)| StateUpdate {
3040 kind: StateUpdateKind::AuditLog(audit_log),
3041 ts,
3042 diff: StateDiff::Addition,
3043 })
3044 .collect::<Vec<_>>()
3045 })
3046 }
3047
/// Creates the storage collections backing the catalog's sources, tables, materialized views,
/// and sinks in the storage controller.
///
/// A `CollectionDescription` is built for every such item. The collections are then created
/// through `create_collections_for_bootstrap` in rounds, where each round contains the
/// collections whose dependencies are not waiting for creation anymore.
///
/// `migrated_storage_collections` is expanded to the global IDs of the given items and
/// passed along to `create_collections_for_bootstrap`.
///
/// Outside of read-only mode, a write timestamp is allocated as the registration timestamp
/// and applied via `apply_local_write` at the end. In read-only mode the current read
/// timestamp is used instead and nothing is applied.
#[instrument]
async fn bootstrap_storage_collections(
    &mut self,
    migrated_storage_collections: &BTreeSet<CatalogItemId>,
) {
    let catalog = self.catalog();

    // Builds the `CollectionDescription` of a collection from its catalog `DataSourceDesc`.
    // Used for sources and for tables that are backed by a data source.
    let source_desc = |object_id: GlobalId,
                       data_source: &DataSourceDesc,
                       desc: &RelationDesc,
                       timeline: &Timeline| {
        let data_source = match data_source.clone() {
            DataSourceDesc::Ingestion { desc, cluster_id } => {
                let desc = desc.into_inline_connection(catalog.state());
                let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
                DataSource::Ingestion(ingestion)
            }
            DataSourceDesc::OldSyntaxIngestion {
                desc,
                progress_subsource,
                data_config,
                details,
                cluster_id,
            } => {
                let desc = desc.into_inline_connection(catalog.state());
                let data_config = data_config.into_inline_connection(catalog.state());
                // Unlike above, the ingestion is created with the global ID of the progress
                // subsource, and `object_id` is registered as one of its source exports.
                let progress_subsource =
                    catalog.get_entry(&progress_subsource).latest_global_id();
                let mut ingestion =
                    IngestionDescription::new(desc, cluster_id, progress_subsource);
                let legacy_export = SourceExport {
                    storage_metadata: (),
                    data_config,
                    details,
                };
                ingestion.source_exports.insert(object_id, legacy_export);

                DataSource::Ingestion(ingestion)
            }
            DataSourceDesc::IngestionExport {
                ingestion_id,
                external_reference: _,
                details,
                data_config,
            } => {
                // The catalog refers to the ingestion by item ID, the storage layer by
                // global ID.
                let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();

                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config: data_config.into_inline_connection(catalog.state()),
                }
            }
            DataSourceDesc::Webhook { .. } => DataSource::Webhook,
            DataSourceDesc::Progress => DataSource::Progress,
            DataSourceDesc::Introspection(introspection) => {
                DataSource::Introspection(introspection)
            }
            DataSourceDesc::Catalog => DataSource::Other,
        };
        CollectionDescription {
            desc: desc.clone(),
            data_source,
            since: None,
            timeline: Some(timeline.clone()),
            primary: None,
        }
    };

    // `compute_collections` holds the materialized views (global ID and latest relation
    // description) handed to `evolve_nullability_for_bootstrap` below; `collections` holds
    // every collection to create.
    let mut compute_collections = vec![];
    let mut collections = vec![];
    for entry in catalog.entries() {
        match entry.item() {
            CatalogItem::Source(source) => {
                collections.push((
                    source.global_id(),
                    source_desc(
                        source.global_id(),
                        &source.data_source,
                        &source.desc,
                        &source.timeline,
                    ),
                ));
            }
            CatalogItem::Table(table) => {
                match &table.data_source {
                    TableDataSource::TableWrites { defaults: _ } => {
                        // Every version of the table gets its own collection. The `primary`
                        // of a version is the collection of the next version, or `None` for
                        // the newest one.
                        let versions: BTreeMap<_, _> = table
                            .collection_descs()
                            .map(|(gid, version, desc)| (version, (gid, desc)))
                            .collect();
                        let collection_descs = versions.iter().map(|(version, (gid, desc))| {
                            let next_version = version.bump();
                            let primary_collection =
                                versions.get(&next_version).map(|(gid, _desc)| gid).copied();
                            let mut collection_desc =
                                CollectionDescription::for_table(desc.clone());
                            collection_desc.primary = primary_collection;

                            (*gid, collection_desc)
                        });
                        collections.extend(collection_descs);
                    }
                    TableDataSource::DataSource {
                        desc: data_source_desc,
                        timeline,
                    } => {
                        // Tables backed by a data source are expected to have exactly one
                        // collection.
                        soft_assert_eq_or_log!(table.collections.len(), 1);
                        let collection_descs =
                            table.collection_descs().map(|(gid, _version, desc)| {
                                (
                                    gid,
                                    source_desc(
                                        entry.latest_global_id(),
                                        data_source_desc,
                                        &desc,
                                        timeline,
                                    ),
                                )
                            });
                        collections.extend(collection_descs);
                    }
                };
            }
            CatalogItem::MaterializedView(mv) => {
                let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
                    let collection_desc =
                        CollectionDescription::for_other(desc, mv.initial_as_of.clone());
                    (gid, collection_desc)
                });

                collections.extend(collection_descs);
                compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
            }
            CatalogItem::Sink(sink) => {
                let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
                let from_desc = storage_sink_from_entry
                    .relation_desc()
                    .expect("sinks can only be built on items with descs")
                    .into_owned();
                let collection_desc = CollectionDescription {
                    // The sink's own collection uses `KAFKA_PROGRESS_DESC`; the description of
                    // the exported relation is carried in `from_desc`.
                    desc: KAFKA_PROGRESS_DESC.clone(),
                    data_source: DataSource::Sink {
                        desc: ExportDescription {
                            sink: StorageSinkDesc {
                                from: sink.from,
                                from_desc,
                                connection: sink
                                    .connection
                                    .clone()
                                    .into_inline_connection(self.catalog().state()),
                                envelope: sink.envelope,
                                as_of: Antichain::from_elem(Timestamp::minimum()),
                                with_snapshot: sink.with_snapshot,
                                version: sink.version,
                                from_storage_metadata: (),
                                to_storage_metadata: (),
                                commit_interval: sink.commit_interval,
                            },
                            instance_id: sink.cluster_id,
                        },
                    },
                    since: None,
                    timeline: None,
                    primary: None,
                };
                collections.push((sink.global_id, collection_desc));
            }
            // These item kinds contribute no storage collection here.
            CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::Index(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_) => (),
        }
    }

    // Read-only mode does not allocate a write timestamp; it registers at the current read
    // timestamp instead.
    let register_ts = if self.controller.read_only() {
        self.get_local_read_ts().await
    } else {
        self.get_local_write_ts().await.timestamp
    };

    let storage_metadata = self.catalog.state().storage_metadata();
    // Translate the migrated item IDs into all of their global IDs.
    let migrated_storage_collections = migrated_storage_collections
        .into_iter()
        .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
        .collect();

    // Runs before any collection is created.
    // NOTE(review): presumably this reconciles the nullability of already persisted
    // materialized view schemas with the current relation descriptions -- confirm against
    // the storage controller.
    self.controller
        .storage
        .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
        .await
        .unwrap_or_terminate("cannot fail to evolve collections");

    // Collections that still have to be created, keyed by global ID.
    let mut pending: BTreeMap<_, _> = collections.into_iter().collect();

    // For every pending collection, the set of other pending collections it transitively
    // uses. Dependencies are resolved to their latest global ID, and those that are not
    // pending collections are left out.
    let transitive_dep_gids: BTreeMap<_, _> = pending
        .keys()
        .map(|gid| {
            let entry = self.catalog.get_entry_by_global_id(gid);
            let item_id = entry.id();
            let deps = self.catalog.state().transitive_uses(item_id);
            let dep_gids: BTreeSet<_> = deps
                .filter(|dep_id| *dep_id != item_id)
                .map(|dep_id| self.catalog.get_entry(&dep_id).latest_global_id())
                .filter(|dep_gid| pending.contains_key(dep_gid))
                .collect();
            (*gid, dep_gids)
        })
        .collect();

    // Create the collections round by round until none are left.
    while !pending.is_empty() {
        // A collection is ready once none of its dependencies is pending anymore.
        let ready_gids: BTreeSet<_> = pending
            .keys()
            .filter(|gid| {
                let mut deps = transitive_dep_gids[gid].iter();
                !deps.any(|dep_gid| pending.contains_key(dep_gid))
            })
            .copied()
            .collect();
        let mut ready: Vec<_> = pending
            .extract_if(.., |gid, _| ready_gids.contains(gid))
            .collect();

        for (gid, collection) in &mut ready {
            // Only system collections without an explicit since get a derived one.
            if !gid.is_system() || collection.since.is_some() {
                continue;
            }

            // The derived since is the join of the sinces of all dependencies, which were
            // created in an earlier round. Without dependencies it stays at the minimum
            // timestamp.
            let mut derived_since = Antichain::from_elem(Timestamp::MIN);
            for dep_gid in &transitive_dep_gids[gid] {
                let (since, _) = self
                    .controller
                    .storage
                    .collection_frontiers(*dep_gid)
                    .expect("previously registered");
                derived_since.join_assign(&since);
            }
            collection.since = Some(derived_since);
        }

        if ready.is_empty() {
            // No collection became ready although some are pending, so the dependencies
            // form a cycle. Report it and create everything that is left in one go, which
            // also ends the loop. These collections get no derived since.
            soft_panic_or_log!(
                "cycle in storage collections: {:?}",
                pending.keys().collect::<Vec<_>>(),
            );
            ready = mem::take(&mut pending).into_iter().collect();
        }

        self.controller
            .storage
            .create_collections_for_bootstrap(
                storage_metadata,
                Some(register_ts),
                ready,
                &migrated_storage_collections,
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");
    }

    // Only a write timestamp has to be applied; in read-only mode none was allocated.
    if !self.controller.read_only() {
        self.apply_local_write(register_ts).await;
    }
}
3363
3364 fn bootstrap_sort_catalog_entries(&self) -> Vec<CatalogEntry> {
3371 let mut indexes_on = BTreeMap::<_, Vec<_>>::new();
3372 let mut non_indexes = Vec::new();
3373 for entry in self.catalog().entries().cloned() {
3374 if let Some(index) = entry.index() {
3375 let on = self.catalog().get_entry_by_global_id(&index.on);
3376 indexes_on.entry(on.id()).or_default().push(entry);
3377 } else {
3378 non_indexes.push(entry);
3379 }
3380 }
3381
3382 let key_fn = |entry: &CatalogEntry| entry.id;
3383 let dependencies_fn = |entry: &CatalogEntry| entry.uses();
3384 sort_topological(&mut non_indexes, key_fn, dependencies_fn);
3385
3386 let mut result = Vec::new();
3387 for entry in non_indexes {
3388 let id = entry.id();
3389 result.push(entry);
3390 if let Some(mut indexes) = indexes_on.remove(&id) {
3391 result.append(&mut indexes);
3392 }
3393 }
3394
3395 soft_assert_or_log!(
3396 indexes_on.is_empty(),
3397 "indexes with missing dependencies: {indexes_on:?}",
3398 );
3399
3400 result
3401 }
3402
3403 #[instrument]
3414 fn bootstrap_dataflow_plans(
3415 &mut self,
3416 ordered_catalog_entries: &[CatalogEntry],
3417 mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
3418 ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
3419 let mut instance_snapshots = BTreeMap::new();
3425 let mut uncached_expressions = BTreeMap::new();
3426
3427 let optimizer_config = |catalog: &Catalog, cluster_id| {
3428 let system_config = catalog.system_config();
3429 let overrides = catalog.get_cluster(cluster_id).config.features();
3430 OptimizerConfig::from(system_config)
3431 .override_from(&overrides)
3432 .override_from(
3435 &catalog
3436 .state()
3437 .cluster_scoped_optimizer_overrides(cluster_id),
3438 )
3439 };
3440
3441 for entry in ordered_catalog_entries {
3442 match entry.item() {
3443 CatalogItem::Index(idx) => {
3444 let compute_instance =
3446 instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
3447 self.instance_snapshot(idx.cluster_id)
3448 .expect("compute instance exists")
3449 });
3450 let global_id = idx.global_id();
3451
3452 if compute_instance.contains_collection(&global_id) {
3455 continue;
3456 }
3457
3458 let optimizer_config = optimizer_config(&self.catalog, idx.cluster_id);
3459
3460 let (optimized_plan, physical_plan, metainfo) =
3461 match cached_global_exprs.remove(&global_id) {
3462 Some(global_expressions)
3463 if global_expressions.optimizer_features
3464 == optimizer_config.features =>
3465 {
3466 debug!("global expression cache hit for {global_id:?}");
3467 (
3468 global_expressions.global_mir,
3469 global_expressions.physical_plan,
3470 global_expressions.dataflow_metainfos,
3471 )
3472 }
3473 Some(_) | None => {
3474 let (optimized_plan, global_lir_plan) = {
3475 let mut optimizer = optimize::index::Optimizer::new(
3477 self.owned_catalog(),
3478 compute_instance.clone(),
3479 global_id,
3480 optimizer_config.clone(),
3481 self.optimizer_metrics(),
3482 );
3483
3484 let index_plan = optimize::index::Index::new(
3486 entry.name().clone(),
3487 idx.on,
3488 idx.keys.to_vec(),
3489 );
3490 let global_mir_plan = optimizer.optimize(index_plan)?;
3491 let optimized_plan = global_mir_plan.df_desc().clone();
3492
3493 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3495
3496 (optimized_plan, global_lir_plan)
3497 };
3498
3499 let (physical_plan, metainfo) = global_lir_plan.unapply();
3500 let metainfo = {
3501 let notice_ids =
3503 std::iter::repeat_with(|| self.allocate_transient_id())
3504 .map(|(_item_id, gid)| gid)
3505 .take(metainfo.optimizer_notices.len())
3506 .collect::<Vec<_>>();
3507 self.catalog().render_notices(
3509 metainfo,
3510 notice_ids,
3511 Some(idx.global_id()),
3512 )
3513 };
3514 uncached_expressions.insert(
3515 global_id,
3516 GlobalExpressions {
3517 global_mir: optimized_plan.clone(),
3518 physical_plan: physical_plan.clone(),
3519 dataflow_metainfos: metainfo.clone(),
3520 optimizer_features: optimizer_config.features.clone(),
3521 },
3522 );
3523 (optimized_plan, physical_plan, metainfo)
3524 }
3525 };
3526
3527 let catalog = self.catalog_mut();
3528 catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3529 catalog.set_physical_plan(idx.global_id(), physical_plan);
3530 catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3531
3532 compute_instance.insert_collection(idx.global_id());
3533 }
3534 CatalogItem::MaterializedView(mv) => {
3535 let compute_instance =
3537 instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3538 self.instance_snapshot(mv.cluster_id)
3539 .expect("compute instance exists")
3540 });
3541 let global_id = mv.global_id_writes();
3542
3543 let optimizer_config = optimizer_config(&self.catalog, mv.cluster_id);
3544
3545 let (optimized_plan, physical_plan, metainfo) = match cached_global_exprs
3546 .remove(&global_id)
3547 {
3548 Some(global_expressions)
3549 if global_expressions.optimizer_features
3550 == optimizer_config.features =>
3551 {
3552 debug!("global expression cache hit for {global_id:?}");
3553 (
3554 global_expressions.global_mir,
3555 global_expressions.physical_plan,
3556 global_expressions.dataflow_metainfos,
3557 )
3558 }
3559 Some(_) | None => {
3560 let (_, internal_view_id) = self.allocate_transient_id();
3561 let debug_name = self
3562 .catalog()
3563 .resolve_full_name(entry.name(), None)
3564 .to_string();
3565
3566 let (optimized_plan, global_lir_plan) = {
3567 let mut optimizer = optimize::materialized_view::Optimizer::new(
3569 self.owned_catalog().as_optimizer_catalog(),
3570 compute_instance.clone(),
3571 global_id,
3572 internal_view_id,
3573 mv.desc.latest().iter_names().cloned().collect(),
3574 mv.non_null_assertions.clone(),
3575 mv.refresh_schedule.clone(),
3576 debug_name,
3577 optimizer_config.clone(),
3578 self.optimizer_metrics(),
3579 );
3580
3581 let typ = infer_sql_type_for_catalog(
3584 &mv.raw_expr,
3585 &mv.locally_optimized_expr.as_ref().clone(),
3586 );
3587 let global_mir_plan = optimizer
3588 .optimize((mv.locally_optimized_expr.as_ref().clone(), typ))?;
3589 let optimized_plan = global_mir_plan.df_desc().clone();
3590
3591 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3593
3594 (optimized_plan, global_lir_plan)
3595 };
3596
3597 let (physical_plan, metainfo) = global_lir_plan.unapply();
3598 let metainfo = {
3599 let notice_ids =
3601 std::iter::repeat_with(|| self.allocate_transient_id())
3602 .map(|(_item_id, global_id)| global_id)
3603 .take(metainfo.optimizer_notices.len())
3604 .collect::<Vec<_>>();
3605 self.catalog().render_notices(
3607 metainfo,
3608 notice_ids,
3609 Some(mv.global_id_writes()),
3610 )
3611 };
3612 uncached_expressions.insert(
3613 global_id,
3614 GlobalExpressions {
3615 global_mir: optimized_plan.clone(),
3616 physical_plan: physical_plan.clone(),
3617 dataflow_metainfos: metainfo.clone(),
3618 optimizer_features: optimizer_config.features.clone(),
3619 },
3620 );
3621 (optimized_plan, physical_plan, metainfo)
3622 }
3623 };
3624
3625 let catalog = self.catalog_mut();
3626 catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
3627 catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
3628 catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);
3629
3630 compute_instance.insert_collection(mv.global_id_writes());
3631 }
3632 CatalogItem::Table(_)
3633 | CatalogItem::Source(_)
3634 | CatalogItem::Log(_)
3635 | CatalogItem::View(_)
3636 | CatalogItem::Sink(_)
3637 | CatalogItem::Type(_)
3638 | CatalogItem::Func(_)
3639 | CatalogItem::Secret(_)
3640 | CatalogItem::Connection(_) => (),
3641 }
3642 }
3643
3644 Ok(uncached_expressions)
3645 }
3646
3647 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold> {
3657 let mut catalog_ids = Vec::new();
3658 let mut dataflows = Vec::new();
3659 let mut read_policies = BTreeMap::new();
3660 for entry in self.catalog.entries() {
3661 let gid = match entry.item() {
3662 CatalogItem::Index(idx) => idx.global_id(),
3663 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3664 CatalogItem::Table(_)
3665 | CatalogItem::Source(_)
3666 | CatalogItem::Log(_)
3667 | CatalogItem::View(_)
3668 | CatalogItem::Sink(_)
3669 | CatalogItem::Type(_)
3670 | CatalogItem::Func(_)
3671 | CatalogItem::Secret(_)
3672 | CatalogItem::Connection(_) => continue,
3673 };
3674 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3675 catalog_ids.push(gid);
3676 dataflows.push(plan.clone());
3677
3678 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3679 read_policies.insert(gid, compaction_window.into());
3680 }
3681 }
3682 }
3683
3684 let read_ts = self.get_local_read_ts().await;
3685 let read_holds = as_of_selection::run(
3686 &mut dataflows,
3687 &read_policies,
3688 &*self.controller.storage_collections,
3689 read_ts,
3690 self.controller.read_only(),
3691 );
3692
3693 let catalog = self.catalog_mut();
3694 for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3695 catalog.set_physical_plan(id, plan);
3696 }
3697
3698 read_holds
3699 }
3700
/// Runs the coordinator's main loop until the client command channel is closed.
///
/// Consumes the coordinator and returns a boxed, non-`Send` future. The loop
/// gathers work from several sources into a batch of [`Message`]s and then
/// handles the batch sequentially via `handle_message`.
///
/// * `internal_cmd_rx` - messages the coordinator (or tasks it spawned) sent to itself.
/// * `strict_serializable_reads_rx` - pending read transactions, keyed by connection,
///   that are stashed in `pending_linearize_read_txns` and followed by a
///   `Message::LinearizeReads`.
/// * `cmd_rx` - commands from clients; when this channel reports zero received
///   messages the loop exits.
/// * `group_commit_rx` - readiness source that triggers `Message::GroupCommitInitiate`.
fn serve(
    mut self,
    mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
    mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
    mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
    group_commit_rx: appends::GroupCommitWaiter,
) -> LocalBoxFuture<'static, ()> {
    async move {
        let mut cluster_events = self.controller.events_stream();
        // Shared with the watchdog task so it can report what the coordinator was
        // last working on when it appears stuck.
        let last_message = Arc::new(Mutex::new(LastMessage {
            kind: "none",
            stmt: None,
        }));

        // Capacity-1 channel used by the watchdog to hand a running timer to the loop.
        let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
        let idle_metric = self.metrics.queue_busy_seconds.clone();
        let last_message_watchdog = Arc::clone(&last_message);

        spawn(|| "coord watchdog", async move {
            // Probe the coordinator every 5 seconds.
            let mut interval = tokio::time::interval(Duration::from_secs(5));
            // If a probe takes longer than the period, schedule the next tick relative
            // to when we return instead of bursting to catch up.
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            // Tracks whether a "stuck" warning has already been emitted, to avoid
            // logging it repeatedly.
            let mut coord_stuck = false;

            loop {
                interval.tick().await;

                // Wait for a free slot in the idle channel; timing out means the main
                // loop has not drained the previous timer for 30 seconds.
                let duration = tokio::time::Duration::from_secs(30);
                let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
                let Ok(maybe_permit) = timeout else {
                    // Only log on the transition into the stuck state.
                    if !coord_stuck {
                        let last_message = last_message_watchdog.lock().expect("poisoned");
                        tracing::warn!(
                            last_message_kind = %last_message.kind,
                            last_message_sql = %last_message.stmt_to_string(),
                            "coordinator stuck for {duration:?}",
                        );
                    }
                    coord_stuck = true;

                    continue;
                };

                // The reservation completed in time, so the coordinator is responsive.
                if coord_stuck {
                    tracing::info!("Coordinator became unstuck");
                }
                coord_stuck = false;

                // Reserving fails once the receiving side is gone; stop the watchdog.
                let Ok(permit) = maybe_permit else {
                    break;
                };

                // Start a timer now; the main loop observes it when it dequeues it.
                permit.send(idle_metric.start_timer());
            }
        });

        // Kick off periodic collections and background tasks before entering the loop.
        self.schedule_storage_usage_collection().await;
        self.schedule_arrangement_sizes_collection().await;
        self.spawn_privatelink_vpc_endpoints_watch_task();
        self.spawn_statement_logging_task();
        self.spawn_catalog_info_metrics_task();
        flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);

        // Read once, before the loop starts; messages slower than this are logged below.
        let warn_threshold = self
            .catalog()
            .system_config()
            .coord_slow_message_warn_threshold();

        // Upper bound on how many channel messages are pulled per `recv_many` call.
        const MESSAGE_BATCH: usize = 64;
        let mut messages = Vec::with_capacity(MESSAGE_BATCH);
        let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);

        let message_batch = self.metrics.message_batch.clone();

        loop {
            select! {
                // `biased` makes `select!` poll the branches in the order written, so
                // earlier branches take priority over later ones.
                biased;

                // Internal messages are appended directly to the batch.
                _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
                Some(event) = cluster_events.next() => {
                    messages.push(Message::ClusterEvent(event))
                },
                () = self.controller.ready() => {
                    // Translate the controller's readiness into the message payload.
                    let controller = match self.controller.get_readiness() {
                        Readiness::Storage => ControllerReadiness::Storage,
                        Readiness::Compute => ControllerReadiness::Compute,
                        Readiness::Metrics(_) => ControllerReadiness::Metrics,
                        Readiness::Internal(_) => ControllerReadiness::Internal,
                        Readiness::NotReady => unreachable!("just signaled as ready"),
                    };
                    messages.push(Message::ControllerReady { controller });
                }
                permit = group_commit_rx.ready() => {
                    // Collect the spans of pending user writes. With exactly one, reuse
                    // its span; otherwise create a fresh root span that follows from
                    // each of them.
                    let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
                        PendingWriteTxn::User{span, ..} => Some(span),
                        PendingWriteTxn::System{..} => None,
                    });
                    let span = match user_write_spans.exactly_one() {
                        Ok(span) => span.clone(),
                        Err(user_write_spans) => {
                            let span = info_span!(parent: None, "group_commit_notify");
                            for s in user_write_spans {
                                span.follows_from(s);
                            }
                            span
                        }
                    };
                    messages.push(Message::GroupCommitInitiate(span, Some(permit)));
                },
                count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
                    // Zero received messages means the client command channel is closed:
                    // leave the main loop and shut down.
                    if count == 0 {
                        break;
                    } else {
                        messages.extend(cmd_messages.drain(..).map(
                            |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
                        ));
                    }
                },
                Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
                    // Drain everything that is immediately available so that a single
                    // `LinearizeReads` message covers all of it.
                    let mut pending_read_txns = vec![pending_read_txn];
                    while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
                        pending_read_txns.push(pending_read_txn);
                    }
                    for (conn_id, pending_read_txn) in pending_read_txns {
                        let prev = self
                            .pending_linearize_read_txns
                            .insert(conn_id, pending_read_txn);
                        soft_assert_or_log!(
                            prev.is_none(),
                            "connections can not have multiple concurrent reads, prev: {prev:?}"
                        )
                    }
                    messages.push(Message::LinearizeReads);
                }
                _ = self.advance_timelines_interval.tick() => {
                    let span = info_span!(parent: None, "coord::advance_timelines_interval");
                    span.follows_from(Span::current());

                    // In read-only mode only advance timelines; otherwise initiate a
                    // group commit without a permit.
                    if self.controller.read_only() {
                        messages.push(Message::AdvanceTimelines);
                    } else {
                        messages.push(Message::GroupCommitInitiate(span, None));
                    }
                },
                _ = self.check_cluster_scheduling_policies_interval.tick() => {
                    messages.push(Message::CheckSchedulingPolicies);
                },

                _ = self.caught_up_check_interval.tick() => {
                    // Handled inline; no message is enqueued, so skip batch processing.
                    self.maybe_check_caught_up().await;

                    continue;
                },

                timer = idle_rx.recv() => {
                    // Reached only when no earlier branch was ready: stop the watchdog's
                    // timer and record a zero-duration "watchdog" observation.
                    timer.expect("does not drop").observe_duration();
                    self.metrics
                        .message_handling
                        .with_label_values(&["watchdog"])
                        .observe(0.0);
                    continue;
                }
            };

            // Record how many messages were gathered in this iteration.
            message_batch.observe(f64::cast_lossy(messages.len()));

            for msg in messages.drain(..) {
                let msg_kind = msg.kind();
                let span = span!(
                    target: "mz_adapter::coord::handle_message_loop",
                    Level::INFO,
                    "coord::handle_message",
                    kind = msg_kind
                );
                let otel_context = span.context().span().span_context().clone();

                // Remember what is about to be handled, so the watchdog (and the
                // panic-time `Drop` of `LastMessage`) can report it.
                *last_message.lock().expect("poisoned") = LastMessage {
                    kind: msg_kind,
                    stmt: match &msg {
                        Message::Command(
                            _,
                            Command::Execute {
                                portal_name,
                                session,
                                ..
                            },
                        ) => session
                            .get_portal_unverified(portal_name)
                            .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
                        _ => None,
                    },
                };

                let start = Instant::now();
                self.handle_message(msg).instrument(span).await;
                let duration = start.elapsed();

                self.metrics
                    .message_handling
                    .with_label_values(&[msg_kind])
                    .observe(duration.as_secs_f64());

                // Log messages that exceeded the threshold, including the trace id
                // when the OpenTelemetry context is valid.
                if duration > warn_threshold {
                    let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
                    tracing::error!(
                        ?msg_kind,
                        ?trace_id,
                        ?duration,
                        "very slow coordinator message"
                    );
                }
            }
        }
        // Expire the catalog on shutdown, but only if this is the last `Arc` handle
        // to it (`Arc::into_inner` returns `None` otherwise).
        if let Some(catalog) = Arc::into_inner(self.catalog) {
            catalog.expire().await;
        }
    }
    .boxed_local()
}
4005
/// Returns a shared reference to the coordinator's catalog.
fn catalog(&self) -> &Catalog {
    &self.catalog
}
4010
/// Returns a new `Arc` handle to the catalog (a reference-count bump, not a deep copy).
fn owned_catalog(&self) -> Arc<Catalog> {
    Arc::clone(&self.catalog)
}
4016
/// Returns a clone of the coordinator's optimizer metrics handle.
fn optimizer_metrics(&self) -> OptimizerMetrics {
    self.optimizer_metrics.clone()
}
4022
/// Returns a mutable reference to the catalog.
///
/// Uses `Arc::make_mut`, so if other `Arc` handles to the catalog are still
/// alive the catalog is cloned first and the coordinator switches to the clone.
fn catalog_mut(&mut self) -> &mut Catalog {
    Arc::make_mut(&mut self.catalog)
}
4034
4035 async fn refill_user_id_pool(&mut self, min_count: u64) -> Result<(), AdapterError> {
4040 let batch_size = USER_ID_POOL_BATCH_SIZE.get(self.catalog().system_config().dyncfgs());
4041 let to_allocate = min_count.max(u64::from(batch_size));
4042 let id_ts = self.get_catalog_write_ts().await;
4043 let ids = self.catalog().allocate_user_ids(to_allocate, id_ts).await?;
4044 if let (Some((first_id, _)), Some((last_id, _))) = (ids.first(), ids.last()) {
4045 let start = match first_id {
4046 CatalogItemId::User(id) => *id,
4047 other => {
4048 return Err(AdapterError::Internal(format!(
4049 "expected User CatalogItemId, got {other:?}"
4050 )));
4051 }
4052 };
4053 let end = match last_id {
4054 CatalogItemId::User(id) => *id + 1, other => {
4056 return Err(AdapterError::Internal(format!(
4057 "expected User CatalogItemId, got {other:?}"
4058 )));
4059 }
4060 };
4061 self.user_id_pool.refill(start, end);
4062 } else {
4063 return Err(AdapterError::Internal(
4064 "catalog returned no user IDs".into(),
4065 ));
4066 }
4067 Ok(())
4068 }
4069
4070 async fn allocate_user_id(&mut self) -> Result<(CatalogItemId, GlobalId), AdapterError> {
4072 if let Some(id) = self.user_id_pool.allocate() {
4073 return Ok((CatalogItemId::User(id), GlobalId::User(id)));
4074 }
4075 self.refill_user_id_pool(1).await?;
4076 let id = self.user_id_pool.allocate().expect("ID pool just refilled");
4077 Ok((CatalogItemId::User(id), GlobalId::User(id)))
4078 }
4079
4080 async fn allocate_user_ids(
4082 &mut self,
4083 count: u64,
4084 ) -> Result<Vec<(CatalogItemId, GlobalId)>, AdapterError> {
4085 if self.user_id_pool.remaining() < count {
4086 self.refill_user_id_pool(count).await?;
4087 }
4088 let raw_ids = self
4089 .user_id_pool
4090 .allocate_many(count)
4091 .expect("pool has enough IDs after refill");
4092 Ok(raw_ids
4093 .into_iter()
4094 .map(|id| (CatalogItemId::User(id), GlobalId::User(id)))
4095 .collect())
4096 }
4097
/// Returns the connection context held by the controller.
fn connection_context(&self) -> &ConnectionContext {
    self.controller.connection_context()
}
4102
4103 fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
4105 &self.connection_context().secrets_reader
4106 }
4107
4108 #[allow(dead_code)]
4113 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
4114 for meta in self.active_conns.values() {
4115 let _ = meta.notice_tx.send(notice.clone());
4116 }
4117 }
4118
4119 pub(crate) fn broadcast_notice_tx(
4122 &self,
4123 ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
4124 let senders: Vec<_> = self
4125 .active_conns
4126 .values()
4127 .map(|meta| meta.notice_tx.clone())
4128 .collect();
4129 Box::new(move |notice| {
4130 for tx in senders {
4131 let _ = tx.send(notice.clone());
4132 }
4133 })
4134 }
4135
/// Returns the map of currently active connections, keyed by connection ID.
pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
    &self.active_conns
}
4139
4140 #[instrument(level = "debug")]
4141 pub(crate) fn retire_execution(
4142 &mut self,
4143 reason: StatementEndedExecutionReason,
4144 ctx_extra: ExecuteContextExtra,
4145 ) {
4146 if let Some(uuid) = ctx_extra.retire() {
4147 self.end_statement_execution(uuid, reason);
4148 }
4149 }
4150
4151 #[instrument(level = "debug")]
4153 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
4154 let compute = self
4155 .instance_snapshot(instance)
4156 .expect("compute instance does not exist");
4157 DataflowBuilder::new(self.catalog().state(), compute)
4158 }
4159
/// Returns a [`ComputeInstanceSnapshot`] of the compute instance `id`, built
/// from the controller.
///
/// # Errors
///
/// Returns [`InstanceMissing`] when the snapshot cannot be created.
pub fn instance_snapshot(
    &self,
    id: ComputeInstanceId,
) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
    ComputeInstanceSnapshot::new(&self.controller, id)
}
4167
4168 pub(crate) async fn ship_dataflow(
4175 &mut self,
4176 dataflow: DataflowDescription<Plan>,
4177 instance: ComputeInstanceId,
4178 target_replica: Option<ReplicaId>,
4179 ) {
4180 self.try_ship_dataflow(dataflow, instance, target_replica)
4181 .await
4182 .unwrap_or_terminate("dataflow creation cannot fail");
4183 }
4184
4185 pub(crate) async fn try_ship_dataflow(
4188 &mut self,
4189 dataflow: DataflowDescription<Plan>,
4190 instance: ComputeInstanceId,
4191 target_replica: Option<ReplicaId>,
4192 ) -> Result<(), DataflowCreationError> {
4193 let export_ids = dataflow.exported_index_ids().collect();
4196
4197 self.controller
4198 .compute
4199 .create_dataflow(instance, dataflow, target_replica)?;
4200
4201 self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
4202 .await;
4203
4204 Ok(())
4205 }
4206
4207 pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
4211 self.controller
4212 .compute
4213 .allow_writes(instance, id)
4214 .unwrap_or_terminate("allow_writes cannot fail");
4215 }
4216
4217 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
4219 &mut self,
4220 dataflow: DataflowDescription<Plan>,
4221 instance: ComputeInstanceId,
4222 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
4223 target_replica: Option<ReplicaId>,
4224 ) {
4225 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
4226 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, target_replica);
4227 let ((), ()) =
4228 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
4229 } else {
4230 self.ship_dataflow(dataflow, instance, target_replica).await;
4231 }
4232 }
4233
4234 pub fn install_compute_watch_set(
4238 &mut self,
4239 conn_id: ConnectionId,
4240 objects: BTreeSet<GlobalId>,
4241 t: Timestamp,
4242 state: WatchSetResponse,
4243 ) -> Result<(), CollectionLookupError> {
4244 let ws_id = self.controller.install_compute_watch_set(objects, t)?;
4245 self.connection_watch_sets
4246 .entry(conn_id.clone())
4247 .or_default()
4248 .insert(ws_id);
4249 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4250 Ok(())
4251 }
4252
4253 pub fn install_storage_watch_set(
4257 &mut self,
4258 conn_id: ConnectionId,
4259 objects: BTreeSet<GlobalId>,
4260 t: Timestamp,
4261 state: WatchSetResponse,
4262 ) -> Result<(), CollectionMissing> {
4263 let ws_id = self.controller.install_storage_watch_set(objects, t)?;
4264 self.connection_watch_sets
4265 .entry(conn_id.clone())
4266 .or_default()
4267 .insert(ws_id);
4268 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4269 Ok(())
4270 }
4271
4272 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
4274 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
4275 for ws_id in ws_ids {
4276 self.installed_watch_sets.remove(&ws_id);
4277 }
4278 }
4279 }
4280
4281 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
4285 let global_timelines: BTreeMap<_, _> = self
4291 .global_timelines
4292 .iter()
4293 .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
4294 .collect();
4295 let active_conns: BTreeMap<_, _> = self
4296 .active_conns
4297 .iter()
4298 .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
4299 .collect();
4300 let txn_read_holds: BTreeMap<_, _> = self
4301 .txn_read_holds
4302 .iter()
4303 .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
4304 .collect();
4305 let pending_peeks: BTreeMap<_, _> = self
4306 .pending_peeks
4307 .iter()
4308 .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
4309 .collect();
4310 let client_pending_peeks: BTreeMap<_, _> = self
4311 .client_pending_peeks
4312 .iter()
4313 .map(|(id, peek)| {
4314 let peek: BTreeMap<_, _> = peek
4315 .iter()
4316 .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
4317 .collect();
4318 (id.to_string(), peek)
4319 })
4320 .collect();
4321 let pending_linearize_read_txns: BTreeMap<_, _> = self
4322 .pending_linearize_read_txns
4323 .iter()
4324 .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
4325 .collect();
4326
4327 Ok(serde_json::json!({
4328 "global_timelines": global_timelines,
4329 "active_conns": active_conns,
4330 "txn_read_holds": txn_read_holds,
4331 "pending_peeks": pending_peeks,
4332 "client_pending_peeks": client_pending_peeks,
4333 "pending_linearize_read_txns": pending_linearize_read_txns,
4334 "controller": self.controller.dump().await?,
4335 }))
4336 }
4337
4338 async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
4352 let item_id = self
4353 .catalog()
4354 .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
4355 let global_id = self.catalog.get_entry(&item_id).latest_global_id();
4356 let read_ts = self.get_local_read_ts().await;
4357 let current_contents_fut = self
4358 .controller
4359 .storage_collections
4360 .snapshot(global_id, read_ts);
4361 let internal_cmd_tx = self.internal_cmd_tx.clone();
4362 spawn(|| "storage_usage_prune", async move {
4363 let mut current_contents = current_contents_fut
4364 .await
4365 .unwrap_or_terminate("cannot fail to fetch snapshot");
4366 differential_dataflow::consolidation::consolidate(&mut current_contents);
4367
4368 let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4369 let mut expired = Vec::new();
4370 for (row, diff) in current_contents {
4371 assert_eq!(
4372 diff, 1,
4373 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4374 );
4375 let collection_timestamp = row
4377 .unpack()
4378 .get(3)
4379 .expect("definition of mz_storage_by_shard changed")
4380 .unwrap_timestamptz();
4381 let collection_timestamp = collection_timestamp.timestamp_millis();
4382 let collection_timestamp: u128 = collection_timestamp
4383 .try_into()
4384 .expect("all collections happen after Jan 1 1970");
4385 if collection_timestamp < cutoff_ts {
4386 debug!("pruning storage event {row:?}");
4387 let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
4388 expired.push(builtin_update);
4389 }
4390 }
4391
4392 let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
4394 });
4395 }
4396
4397 async fn prune_arrangement_sizes_history_on_startup(&self) {
4406 if self.controller.read_only() {
4408 return;
4409 }
4410
4411 let retention_period = mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_HISTORY_RETENTION_PERIOD
4412 .get(self.catalog().system_config().dyncfgs());
4413 let item_id = self
4414 .catalog()
4415 .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);
4416 let global_id = self.catalog.get_entry(&item_id).latest_global_id();
4417 let read_ts = self.get_local_read_ts().await;
4418 let current_contents_fut = self
4419 .controller
4420 .storage_collections
4421 .snapshot(global_id, read_ts);
4422 let internal_cmd_tx = self.internal_cmd_tx.clone();
4423 spawn(|| "arrangement_sizes_history_prune", async move {
4424 let mut current_contents = current_contents_fut
4425 .await
4426 .unwrap_or_terminate("cannot fail to fetch snapshot");
4427 differential_dataflow::consolidation::consolidate(&mut current_contents);
4428
4429 let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4430 let expired =
4431 arrangement_sizes_expired_retractions(current_contents, cutoff_ts, item_id);
4432
4433 let _ = internal_cmd_tx.send(Message::ArrangementSizesPrune(expired));
4437 });
4438 }
4439
4440 fn current_credit_consumption_rate(&self) -> Numeric {
4441 self.catalog()
4442 .user_cluster_replicas()
4443 .filter_map(|replica| match &replica.config.location {
4444 ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
4445 ReplicaLocation::Unmanaged(_) => None,
4446 })
4447 .map(|size| {
4448 self.catalog()
4449 .cluster_replica_sizes()
4450 .0
4451 .get(size)
4452 .expect("location size is validated against the cluster replica sizes")
4453 .credits_per_hour
4454 })
4455 .sum()
4456 }
4457}
4458
4459fn arrangement_sizes_expired_retractions(
4467 rows: impl IntoIterator<Item = (mz_repr::Row, i64)>,
4468 cutoff_ts: u128,
4469 item_id: CatalogItemId,
4470) -> Vec<BuiltinTableUpdate> {
4471 let mut expired = Vec::new();
4472 for (row, diff) in rows {
4473 assert_eq!(
4474 diff, 1,
4475 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4476 );
4477 let collection_timestamp = row
4478 .unpack()
4479 .get(3)
4480 .expect("definition of mz_object_arrangement_size_history changed")
4481 .unwrap_timestamptz()
4482 .timestamp_millis();
4483 let collection_timestamp: u128 = collection_timestamp
4484 .try_into()
4485 .expect("all collections happen after Jan 1 1970");
4486 if collection_timestamp < cutoff_ts {
4487 expired.push(BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE));
4488 }
4489 }
4490 expired
4491}
4492
#[cfg(test)]
impl Coordinator {
    /// Compile-time check that `ship_dataflow` resolves to `()`, i.e. that it
    /// has no error for callers to handle. Not meant to be called.
    #[allow(dead_code)]
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        let instance = ComputeInstanceId::user(1).expect("1 is a valid ID");

        // The `()` pattern stops compiling if `ship_dataflow` starts returning a value.
        let () = self.ship_dataflow(dataflow, instance, None).await;
    }
}
4509
/// Describes the message the coordinator most recently started handling.
///
/// Updated by the main loop right before each message is handled; read by the
/// watchdog task when the coordinator appears stuck, and printed from `Drop`
/// while panicking.
struct LastMessage {
    /// The message kind, as reported by `Message::kind` (`"none"` initially).
    kind: &'static str,
    /// The statement of the executed portal, when the message was an `Execute`
    /// command whose portal has one.
    stmt: Option<Arc<Statement<Raw>>>,
}
4515
4516impl LastMessage {
4517 fn stmt_to_string(&self) -> Cow<'static, str> {
4519 self.stmt
4520 .as_ref()
4521 .map(|stmt| stmt.to_ast_string_redacted().into())
4522 .unwrap_or(Cow::Borrowed("<none>"))
4523 }
4524}
4525
4526impl fmt::Debug for LastMessage {
4527 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4528 f.debug_struct("LastMessage")
4529 .field("kind", &self.kind)
4530 .field("stmt", &self.stmt_to_string())
4531 .finish()
4532 }
4533}
4534
4535impl Drop for LastMessage {
4536 fn drop(&mut self) {
4537 if std::thread::panicking() {
4539 eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4541 }
4542 }
4543}
4544
4545pub fn serve(
4557 Config {
4558 controller_config,
4559 controller_envd_epoch,
4560 mut storage,
4561 audit_logs_iterator,
4562 timestamp_oracle_url,
4563 unsafe_mode,
4564 all_features,
4565 build_info,
4566 environment_id,
4567 metrics_registry,
4568 now,
4569 secrets_controller,
4570 cloud_resource_controller,
4571 cluster_replica_sizes,
4572 builtin_system_cluster_config,
4573 builtin_catalog_server_cluster_config,
4574 builtin_probe_cluster_config,
4575 builtin_support_cluster_config,
4576 builtin_analytics_cluster_config,
4577 system_parameter_defaults,
4578 availability_zones,
4579 storage_usage_client,
4580 storage_usage_collection_interval,
4581 storage_usage_retention_period,
4582 segment_client,
4583 egress_addresses,
4584 aws_account_id,
4585 aws_privatelink_availability_zones,
4586 connection_context,
4587 connection_limit_callback,
4588 remote_system_parameters,
4589 webhook_concurrency_limit,
4590 http_host_name,
4591 tracing_handle,
4592 read_only_controllers,
4593 caught_up_trigger: clusters_caught_up_trigger,
4594 helm_chart_version,
4595 license_key,
4596 external_login_password_mz_system,
4597 force_builtin_schema_migration,
4598 }: Config,
4599) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4600 async move {
4601 let coord_start = Instant::now();
4602 info!("startup: coordinator init: beginning");
4603 info!("startup: coordinator init: preamble beginning");
4604
4605 let _builtins = LazyLock::force(&BUILTINS_STATIC);
4609
4610 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4611 let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4612 let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4613 mpsc::unbounded_channel();
4614
4615 if !availability_zones.iter().all_unique() {
4617 coord_bail!("availability zones must be unique");
4618 }
4619
4620 let aws_principal_context = match (
4621 aws_account_id,
4622 connection_context.aws_external_id_prefix.clone(),
4623 ) {
4624 (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4625 aws_account_id,
4626 aws_external_id_prefix,
4627 }),
4628 _ => None,
4629 };
4630
4631 let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4632 .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4633
4634 info!(
4635 "startup: coordinator init: preamble complete in {:?}",
4636 coord_start.elapsed()
4637 );
4638 let oracle_init_start = Instant::now();
4639 info!("startup: coordinator init: timestamp oracle init beginning");
4640
4641 let timestamp_oracle_config = timestamp_oracle_url
4642 .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
4643 .transpose()?;
4644 let mut initial_timestamps =
4645 get_initial_oracle_timestamps(×tamp_oracle_config).await?;
4646
4647 initial_timestamps
4651 .entry(Timeline::EpochMilliseconds)
4652 .or_insert_with(mz_repr::Timestamp::minimum);
4653 let mut timestamp_oracles = BTreeMap::new();
4654 for (timeline, initial_timestamp) in initial_timestamps {
4655 Coordinator::ensure_timeline_state_with_initial_time(
4656 &timeline,
4657 initial_timestamp,
4658 now.clone(),
4659 timestamp_oracle_config.clone(),
4660 &mut timestamp_oracles,
4661 read_only_controllers,
4662 )
4663 .await;
4664 }
4665
4666 let catalog_upper = storage.current_upper().await;
4670 let epoch_millis_oracle = ×tamp_oracles
4676 .get(&Timeline::EpochMilliseconds)
4677 .expect("inserted above")
4678 .oracle;
4679
4680 let mut boot_ts = if read_only_controllers {
4681 let read_ts = epoch_millis_oracle.read_ts().await;
4682 std::cmp::max(read_ts, catalog_upper)
4683 } else {
4684 epoch_millis_oracle.apply_write(catalog_upper).await;
4687 epoch_millis_oracle.write_ts().await.timestamp
4688 };
4689
4690 info!(
4691 "startup: coordinator init: timestamp oracle init complete in {:?}",
4692 oracle_init_start.elapsed()
4693 );
4694
4695 let catalog_open_start = Instant::now();
4696 info!("startup: coordinator init: catalog open beginning");
4697 let persist_client = controller_config
4698 .persist_clients
4699 .open(controller_config.persist_location.clone())
4700 .await
4701 .context("opening persist client")?;
4702 let builtin_item_migration_config =
4703 BuiltinItemMigrationConfig {
4704 persist_client: persist_client.clone(),
4705 read_only: read_only_controllers,
4706 force_migration: force_builtin_schema_migration,
4707 }
4708 ;
4709 let OpenCatalogResult {
4710 mut catalog,
4711 migrated_storage_collections_0dt,
4712 new_builtin_collections,
4713 builtin_table_updates,
4714 cached_global_exprs,
4715 uncached_local_exprs,
4716 } = Catalog::open(mz_catalog::config::Config {
4717 storage,
4718 metrics_registry: &metrics_registry,
4719 state: mz_catalog::config::StateConfig {
4720 unsafe_mode,
4721 all_features,
4722 build_info,
4723 environment_id: environment_id.clone(),
4724 read_only: read_only_controllers,
4725 now: now.clone(),
4726 boot_ts: boot_ts.clone(),
4727 skip_migrations: false,
4728 cluster_replica_sizes,
4729 builtin_system_cluster_config,
4730 builtin_catalog_server_cluster_config,
4731 builtin_probe_cluster_config,
4732 builtin_support_cluster_config,
4733 builtin_analytics_cluster_config,
4734 system_parameter_defaults,
4735 remote_system_parameters,
4736 availability_zones,
4737 egress_addresses,
4738 aws_principal_context,
4739 aws_privatelink_availability_zones,
4740 connection_context,
4741 http_host_name,
4742 builtin_item_migration_config,
4743 persist_client: persist_client.clone(),
4744 enable_expression_cache_override: None,
4745 helm_chart_version,
4746 external_login_password_mz_system,
4747 license_key: license_key.clone(),
4748 },
4749 })
4750 .await?;
4751
4752 let catalog_upper = catalog.current_upper().await;
4755 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4756
4757 if !read_only_controllers {
4758 epoch_millis_oracle.apply_write(boot_ts).await;
4759 }
4760
4761 info!(
4762 "startup: coordinator init: catalog open complete in {:?}",
4763 catalog_open_start.elapsed()
4764 );
4765
4766 let coord_thread_start = Instant::now();
4767 info!("startup: coordinator init: coordinator thread start beginning");
4768
4769 let session_id = catalog.config().session_id;
4770 let start_instant = catalog.config().start_instant;
4771
4772 let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4776 let handle = TokioHandle::current();
4777
4778 let metrics = Metrics::register_into(&metrics_registry);
4779 let metrics_clone = metrics.clone();
4780 let optimizer_metrics = OptimizerMetrics::register_into(
4781 &metrics_registry,
4782 catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4783 );
4784 let segment_client_clone = segment_client.clone();
4785 let coord_now = now.clone();
4786 let advance_timelines_interval =
4787 tokio::time::interval(catalog.system_config().default_timestamp_interval());
4788 let mut check_scheduling_policies_interval = tokio::time::interval(
4789 catalog
4790 .system_config()
4791 .cluster_check_scheduling_policies_interval(),
4792 );
4793 check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4794
4795 let clusters_caught_up_check_interval = if read_only_controllers {
4796 let dyncfgs = catalog.system_config().dyncfgs();
4797 let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4798
4799 let mut interval = tokio::time::interval(interval);
4800 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4801 interval
4802 } else {
4803 let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4811 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4812 interval
4813 };
4814
4815 let clusters_caught_up_check =
4816 clusters_caught_up_trigger.map(|trigger| {
4817 let mut exclude_collections: BTreeSet<GlobalId> =
4818 new_builtin_collections.into_iter().collect();
4819
4820 let mut todo: Vec<_> = migrated_storage_collections_0dt
4830 .iter()
4831 .filter(|id| {
4832 catalog.state().get_entry(id).is_materialized_view()
4833 })
4834 .copied()
4835 .collect();
4836 while let Some(item_id) = todo.pop() {
4837 let entry = catalog.state().get_entry(&item_id);
4838 exclude_collections.extend(entry.global_ids());
4839 todo.extend_from_slice(entry.used_by());
4840 }
4841
4842 CaughtUpCheckContext {
4843 trigger,
4844 exclude_collections,
4845 }
4846 });
4847
4848 if let Some(TimestampOracleConfig::Postgres(pg_config)) =
4849 timestamp_oracle_config.as_ref()
4850 {
4851 let pg_timestamp_oracle_params =
4854 flags::timestamp_oracle_config(catalog.system_config());
4855 pg_timestamp_oracle_params.apply(pg_config);
4856 }
4857
4858 let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4861 Arc::new(move |system_vars: &SystemVars| {
4862 let limit: u64 = system_vars.max_connections().cast_into();
4863 let superuser_reserved: u64 =
4864 system_vars.superuser_reserved_connections().cast_into();
4865
4866 let superuser_reserved = if superuser_reserved >= limit {
4871 tracing::warn!(
4872 "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
4873 );
4874 limit
4875 } else {
4876 superuser_reserved
4877 };
4878
4879 (connection_limit_callback)(limit, superuser_reserved);
4880 });
4881 catalog.system_config_mut().register_callback(
4882 &mz_sql::session::vars::MAX_CONNECTIONS,
4883 Arc::clone(&connection_limit_callback),
4884 );
4885 catalog.system_config_mut().register_callback(
4886 &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4887 connection_limit_callback,
4888 );
4889
4890 let (group_commit_tx, group_commit_rx) = appends::notifier();
4891
4892 let parent_span = tracing::Span::current();
4893 let thread = thread::Builder::new()
4894 .stack_size(3 * stack::STACK_SIZE)
4898 .name("coordinator".to_string())
4899 .spawn(move || {
4900 let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4901
4902 let controller = handle
4903 .block_on({
4904 catalog.initialize_controller(
4905 controller_config,
4906 controller_envd_epoch,
4907 read_only_controllers,
4908 )
4909 })
4910 .unwrap_or_terminate("failed to initialize storage_controller");
4911 let catalog_upper = handle.block_on(catalog.current_upper());
4914 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4915 if !read_only_controllers {
4916 let epoch_millis_oracle = ×tamp_oracles
4917 .get(&Timeline::EpochMilliseconds)
4918 .expect("inserted above")
4919 .oracle;
4920 handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4921 }
4922
4923 let catalog = Arc::new(catalog);
4924
4925 let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4926 let mut coord = Coordinator {
4927 controller,
4928 catalog,
4929 internal_cmd_tx,
4930 group_commit_tx,
4931 strict_serializable_reads_tx,
4932 global_timelines: timestamp_oracles,
4933 transient_id_gen: Arc::new(TransientIdGen::new()),
4934 active_conns: BTreeMap::new(),
4935 txn_read_holds: Default::default(),
4936 pending_peeks: BTreeMap::new(),
4937 client_pending_peeks: BTreeMap::new(),
4938 pending_linearize_read_txns: BTreeMap::new(),
4939 serialized_ddl: LockedVecDeque::new(),
4940 active_compute_sinks: BTreeMap::new(),
4941 active_webhooks: BTreeMap::new(),
4942 active_copies: BTreeMap::new(),
4943 connection_cancel_watches: BTreeMap::new(),
4944 introspection_subscribes: BTreeMap::new(),
4945 write_locks: BTreeMap::new(),
4946 deferred_write_ops: BTreeMap::new(),
4947 pending_writes: Vec::new(),
4948 advance_timelines_interval,
4949 secrets_controller,
4950 caching_secrets_reader,
4951 cloud_resource_controller,
4952 storage_usage_client,
4953 storage_usage_collection_interval,
4954 segment_client,
4955 metrics,
4956 catalog_info_metrics_registry: metrics_registry.clone(),
4957 scoped_frontend: None,
4958 optimizer_metrics,
4959 tracing_handle,
4960 statement_logging: StatementLogging::new(coord_now.clone()),
4961 webhook_concurrency_limit,
4962 timestamp_oracle_config,
4963 check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4964 cluster_scheduling_decisions: BTreeMap::new(),
4965 caught_up_check_interval: clusters_caught_up_check_interval,
4966 caught_up_check: clusters_caught_up_check,
4967 installed_watch_sets: BTreeMap::new(),
4968 connection_watch_sets: BTreeMap::new(),
4969 cluster_replica_statuses: ClusterReplicaStatuses::new(),
4970 read_only_controllers,
4971 buffered_builtin_table_updates: Some(Vec::new()),
4972 license_key,
4973 user_id_pool: IdPool::empty(),
4974 persist_client,
4975 };
4976 let bootstrap = handle.block_on(async {
4977 coord
4978 .bootstrap(
4979 boot_ts,
4980 migrated_storage_collections_0dt,
4981 builtin_table_updates,
4982 cached_global_exprs,
4983 uncached_local_exprs,
4984 audit_logs_iterator,
4985 )
4986 .await?;
4987 coord
4988 .controller
4989 .remove_orphaned_replicas(
4990 coord.catalog().get_next_user_replica_id().await?,
4991 coord.catalog().get_next_system_replica_id().await?,
4992 )
4993 .await
4994 .map_err(AdapterError::Orchestrator)?;
4995
4996 if let Some(retention_period) = storage_usage_retention_period {
4997 coord
4998 .prune_storage_usage_events_on_startup(retention_period)
4999 .await;
5000 }
5001
5002 coord.prune_arrangement_sizes_history_on_startup().await;
5003
5004 Ok(())
5005 });
5006 let ok = bootstrap.is_ok();
5007 drop(span);
5008 bootstrap_tx
5009 .send(bootstrap)
5010 .expect("bootstrap_rx is not dropped until it receives this message");
5011 if ok {
5012 handle.block_on(coord.serve(
5013 internal_cmd_rx,
5014 strict_serializable_reads_rx,
5015 cmd_rx,
5016 group_commit_rx,
5017 ));
5018 }
5019 })
5020 .expect("failed to create coordinator thread");
5021 match bootstrap_rx
5022 .await
5023 .expect("bootstrap_tx always sends a message or panics/halts")
5024 {
5025 Ok(()) => {
5026 info!(
5027 "startup: coordinator init: coordinator thread start complete in {:?}",
5028 coord_thread_start.elapsed()
5029 );
5030 info!(
5031 "startup: coordinator init: complete in {:?}",
5032 coord_start.elapsed()
5033 );
5034 let handle = Handle {
5035 session_id,
5036 start_instant,
5037 _thread: thread.join_on_drop(),
5038 };
5039 let client = Client::new(
5040 build_info,
5041 cmd_tx,
5042 metrics_clone,
5043 now,
5044 environment_id,
5045 segment_client_clone,
5046 );
5047 Ok((handle, client))
5048 }
5049 Err(e) => Err(e),
5050 }
5051 }
5052 .boxed()
5053}
5054
5055async fn get_initial_oracle_timestamps(
5069 timestamp_oracle_config: &Option<TimestampOracleConfig>,
5070) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
5071 let mut initial_timestamps = BTreeMap::new();
5072
5073 if let Some(config) = timestamp_oracle_config {
5074 let oracle_timestamps = config.get_all_timelines().await?;
5075
5076 let debug_msg = || {
5077 oracle_timestamps
5078 .iter()
5079 .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
5080 .join(", ")
5081 };
5082 info!(
5083 "current timestamps from the timestamp oracle: {}",
5084 debug_msg()
5085 );
5086
5087 for (timeline, ts) in oracle_timestamps {
5088 let entry = initial_timestamps
5089 .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
5090
5091 entry
5092 .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
5093 .or_insert(ts);
5094 }
5095 } else {
5096 info!("no timestamp oracle configured!");
5097 };
5098
5099 let debug_msg = || {
5100 initial_timestamps
5101 .iter()
5102 .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
5103 .join(", ")
5104 };
5105 info!("initial oracle timestamps: {}", debug_msg());
5106
5107 Ok(initial_timestamps)
5108}
5109
5110#[instrument]
5111pub async fn load_remote_system_parameters(
5112 storage: &mut Box<dyn OpenableDurableCatalogState>,
5113 system_parameter_sync_config: Option<SystemParameterSyncConfig>,
5114 system_parameter_sync_timeout: Duration,
5115) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
5116 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
5117 tracing::info!("parameter sync on boot: start sync");
5118
5119 let mut params = SynchronizedParameters::new(SystemVars::default());
5159 let frontend_sync = async {
5160 let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
5161 frontend.pull(&mut params);
5162 let ops = params
5163 .modified()
5164 .into_iter()
5165 .map(|param| {
5166 let name = param.name;
5167 let value = param.value;
5168 tracing::info!(name, value, initial = true, "sync parameter");
5169 (name, value)
5170 })
5171 .collect();
5172 tracing::info!("parameter sync on boot: end sync");
5173 Ok(Some(ops))
5174 };
5175 if !storage.has_system_config_synced_once().await? {
5176 frontend_sync.await
5177 } else {
5178 match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
5179 Ok(ops) => Ok(ops),
5180 Err(TimeoutError::Inner(e)) => Err(e),
5181 Err(TimeoutError::DeadlineElapsed) => {
5182 tracing::info!("parameter sync on boot: sync has timed out");
5183 Ok(None)
5184 }
5185 }
5186 }
5187 } else {
5188 Ok(None)
5189 }
5190}
5191
/// Payload associated with a watch set, one variant per kind of waiting
/// operation.
// NOTE(review): presumably handed back to the coordinator once the watched
// objects reach the awaited state — confirm against the watch-set code.
#[derive(Debug)]
pub enum WatchSetResponse {
    /// Carries the statement logging id and the lifecycle event to record.
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    /// Carries the state needed to continue an `ALTER SINK`.
    AlterSinkReady(AlterSinkReadyContext),
    /// Carries the state needed to continue an `ALTER MATERIALIZED VIEW`.
    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
}
5198
5199#[derive(Debug)]
5200pub struct AlterSinkReadyContext {
5201 ctx: Option<ExecuteContext>,
5202 otel_ctx: OpenTelemetryContext,
5203 plan: AlterSinkPlan,
5204 plan_validity: PlanValidity,
5205 read_hold: ReadHolds,
5206}
5207
5208impl AlterSinkReadyContext {
5209 fn ctx(&mut self) -> &mut ExecuteContext {
5210 self.ctx.as_mut().expect("only cleared on drop")
5211 }
5212
5213 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
5214 self.ctx
5215 .take()
5216 .expect("only cleared on drop")
5217 .retire(result);
5218 }
5219}
5220
5221impl Drop for AlterSinkReadyContext {
5222 fn drop(&mut self) {
5223 if let Some(ctx) = self.ctx.take() {
5224 ctx.retire(Err(AdapterError::Canceled));
5225 }
5226 }
5227}
5228
5229#[derive(Debug)]
5230pub struct AlterMaterializedViewReadyContext {
5231 ctx: Option<ExecuteContext>,
5232 otel_ctx: OpenTelemetryContext,
5233 plan: plan::AlterMaterializedViewApplyReplacementPlan,
5234 plan_validity: PlanValidity,
5235}
5236
5237impl AlterMaterializedViewReadyContext {
5238 fn ctx(&mut self) -> &mut ExecuteContext {
5239 self.ctx.as_mut().expect("only cleared on drop")
5240 }
5241
5242 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
5243 self.ctx
5244 .take()
5245 .expect("only cleared on drop")
5246 .retire(result);
5247 }
5248}
5249
5250impl Drop for AlterMaterializedViewReadyContext {
5251 fn drop(&mut self) {
5252 if let Some(ctx) = self.ctx.take() {
5253 ctx.retire(Err(AdapterError::Canceled));
5254 }
5255 }
5256}
5257
5258#[derive(Debug)]
5261struct LockedVecDeque<T> {
5262 items: VecDeque<T>,
5263 lock: Arc<tokio::sync::Mutex<()>>,
5264}
5265
5266impl<T> LockedVecDeque<T> {
5267 pub fn new() -> Self {
5268 Self {
5269 items: VecDeque::new(),
5270 lock: Arc::new(tokio::sync::Mutex::new(())),
5271 }
5272 }
5273
5274 pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
5275 Arc::clone(&self.lock).try_lock_owned()
5276 }
5277
5278 pub fn is_empty(&self) -> bool {
5279 self.items.is_empty()
5280 }
5281
5282 pub fn push_back(&mut self, value: T) {
5283 self.items.push_back(value)
5284 }
5285
5286 pub fn pop_front(&mut self) -> Option<T> {
5287 self.items.pop_front()
5288 }
5289
5290 pub fn remove(&mut self, index: usize) -> Option<T> {
5291 self.items.remove(index)
5292 }
5293
5294 pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
5295 self.items.iter()
5296 }
5297}
5298
/// An execute context paired with the statement or plan it belongs to.
// NOTE(review): the name suggests execution was postponed and will be resumed
// later — confirm against the code that enqueues these.
#[derive(Debug)]
struct DeferredPlanStatement {
    /// The execute context of the statement.
    ctx: ExecuteContext,
    /// The statement or plan itself.
    ps: PlanStatement,
}
5304
/// Either a raw SQL statement with its parameters or a SQL plan with its
/// resolved ids.
#[derive(Debug)]
enum PlanStatement {
    /// A raw statement together with its parameters.
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    /// A plan together with two sets of resolved ids.
    // NOTE(review): `sql_impl_resolved_ids` presumably tracks ids referenced
    // by SQL-implemented functions — confirm.
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
        sql_impl_resolved_ids: ResolvedIds,
    },
}
5317
/// Errors produced when a connection is rejected by network policy checks.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// The given address was denied.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    /// No IP address was available to check.
    #[error("Access denied missing IP address")]
    MissingIp,
}
5325
5326pub(crate) fn validate_ip_with_policy_rules(
5327 ip: &IpAddr,
5328 rules: &Vec<NetworkPolicyRule>,
5329) -> Result<(), NetworkPolicyError> {
5330 if rules.iter().any(|r| r.address.0.contains(ip)) {
5333 Ok(())
5334 } else {
5335 Err(NetworkPolicyError::AddressDenied(ip.clone()))
5336 }
5337}
5338
5339pub(crate) fn infer_sql_type_for_catalog(
5340 hir_expr: &HirRelationExpr,
5341 mir_expr: &MirRelationExpr,
5342) -> SqlRelationType {
5343 let mut typ = hir_expr.top_level_typ();
5344 typ.backport_nullability_and_keys(&mir_expr.typ());
5345 typ
5346}
5347
/// Unit tests for `IdPool` allocation and refill behavior.
#[cfg(test)]
mod id_pool_tests {
    use super::IdPool;

    /// An empty pool has nothing remaining and refuses every allocation.
    #[mz_ore::test]
    fn test_empty_pool() {
        let mut ids = IdPool::empty();
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
        assert_eq!(ids.allocate_many(1), None);
    }

    /// Single allocations walk the refilled range in order and then run dry.
    #[mz_ore::test]
    fn test_allocate_single() {
        let mut ids = IdPool::empty();
        ids.refill(10, 13);
        assert_eq!(ids.remaining(), 3);
        for expected in 10..13 {
            assert_eq!(ids.allocate(), Some(expected));
        }
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
    }

    /// Batch allocation hands out consecutive ids; a request larger than what
    /// remains fails and leaves the pool untouched.
    #[mz_ore::test]
    fn test_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(100, 105);

        let first_batch = ids.allocate_many(3);
        assert_eq!(first_batch, Some(vec![100, 101, 102]));
        assert_eq!(ids.remaining(), 2);

        // Only two ids are left, so asking for three must fail.
        let too_many = ids.allocate_many(3);
        assert_eq!(too_many, None);

        // The failed request consumed nothing.
        let rest = ids.allocate_many(2);
        assert_eq!(rest, Some(vec![103, 104]));
        assert_eq!(ids.remaining(), 0);
    }

    /// Requesting zero ids succeeds with an empty batch and consumes nothing.
    #[mz_ore::test]
    fn test_allocate_many_zero() {
        let mut ids = IdPool::empty();
        ids.refill(1, 5);
        let nothing = ids.allocate_many(0);
        assert_eq!(nothing, Some(vec![]));
        assert_eq!(ids.remaining(), 4);
    }

    /// A refill replaces whatever was left of the previous range.
    #[mz_ore::test]
    fn test_refill_resets_pool() {
        let mut ids = IdPool::empty();
        ids.refill(0, 2);
        assert_eq!(ids.allocate(), Some(0));

        // Id 1 from the old range is discarded by the refill.
        ids.refill(50, 52);
        for expected in 50..52 {
            assert_eq!(ids.allocate(), Some(expected));
        }
        assert_eq!(ids.allocate(), None);
    }

    /// Single and batch allocations draw from the same cursor.
    #[mz_ore::test]
    fn test_mixed_allocate_and_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(0, 10);

        let first = ids.allocate();
        assert_eq!(first, Some(0));

        let batch = ids.allocate_many(3);
        assert_eq!(batch, Some(vec![1, 2, 3]));

        let next = ids.allocate();
        assert_eq!(next, Some(4));

        assert_eq!(ids.remaining(), 5);
    }

    /// Refilling with a start greater than the end panics.
    #[mz_ore::test]
    #[should_panic(expected = "invalid pool range")]
    fn test_refill_invalid_range_panics() {
        let mut ids = IdPool::empty();
        ids.refill(10, 5);
    }
}
5422
/// Unit tests for `arrangement_sizes_expired_retractions`.
#[cfg(test)]
mod arrangement_sizes_pruner_tests {
    use mz_repr::catalog_item_id::CatalogItemId;
    use mz_repr::{Datum, Row};

    use super::arrangement_sizes_expired_retractions;

    /// Builds a four-column history row whose last column is `ts_ms`
    /// (milliseconds) converted to a `TimestampTz` datum.
    fn history_row(ts_ms: i64) -> Row {
        let dt = mz_ore::now::to_datetime(ts_ms.try_into().expect("non-negative"));
        let ts_datum = Datum::TimestampTz(dt.try_into().expect("fits in TimestampTz"));
        let datums = [
            Datum::String("r1"),
            Datum::String("u1"),
            Datum::Int64(123),
            ts_datum,
        ];
        Row::pack_slice(&datums)
    }

    /// An arbitrary item id passed to the function under test.
    fn item_id() -> CatalogItemId {
        CatalogItemId::User(42)
    }

    /// No rows in means no retractions out.
    #[mz_ore::test]
    fn empty_input_produces_no_retractions() {
        let retractions = arrangement_sizes_expired_retractions(Vec::new(), 1_000, item_id());
        assert!(retractions.is_empty());
    }

    /// With a cutoff of 1000, only the rows at 100 and 500 are retracted; the
    /// rows at 1000 and 5000 are kept.
    #[mz_ore::test]
    fn retracts_only_rows_strictly_before_cutoff() {
        let rows: Vec<_> = [100, 500, 1_000, 5_000]
            .into_iter()
            .map(|ts_ms| (history_row(ts_ms), 1))
            .collect();
        let retractions = arrangement_sizes_expired_retractions(rows, 1_000, item_id());
        assert_eq!(retractions.len(), 2);
    }

    /// A negative diff in the input is rejected with a panic.
    #[mz_ore::test]
    #[should_panic(expected = "consolidated contents should not contain retractions")]
    fn retraction_in_input_panics() {
        let rows = vec![(history_row(100), -1)];
        let _ = arrangement_sizes_expired_retractions(rows, 1_000, item_id());
    }
}