use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroI64;
use std::ops::Neg;
use std::str::FromStr;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::Context;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
use http::Uri;
use ipnet::IpNet;
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
use mz_auth::password::Password;
use mz_build_info::BuildInfo;
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_controller::clusters::{
    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
};
use mz_controller::{ControllerConfig, Readiness};
use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::OfflineReason;
use mz_ore::cast::{CastFrom, CastInto, CastLossy};
use mz_ore::channel::trigger::Trigger;
use mz_ore::future::TimeoutError;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::{JoinHandle, spawn};
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_ore::url::SensitiveUrl;
use mz_ore::{
    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
};
use mz_persist_client::PersistClient;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
use mz_repr::adt::numeric::Numeric;
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
use mz_secrets::cache::CachingSecretsReader;
use mz_secrets::{SecretsController, SecretsReader};
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::{CatalogCluster, EnvironmentId};
use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::{
    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, NetworkPolicyRule,
    OnTimeoutAction, Params, QueryWhen,
};
use mz_sql::session::user::User;
use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
use mz_sql_parser::ast::ExplainStage;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
use mz_timestamp_oracle::WriteTimestamp;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use serde::Serialize;
use thiserror::Error;
use timely::progress::{Antichain, Timestamp as _};
use tokio::runtime::Handle as TokioHandle;
use tokio::select;
use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
use crate::client::{Client, Handle};
use crate::command::{Command, ExecuteResponse};
use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
use crate::coord::appends::{
    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
};
use crate::coord::caught_up::CaughtUpCheckContext;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::introspection::IntrospectionSubscribe;
use crate::coord::peek::PendingPeek;
use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::validity::PlanValidity;
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
use crate::metrics::Metrics;
use crate::optimize::dataflows::{
    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{AdapterNotice, ReadHolds, flags};

pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;

pub mod appends;
mod catalog_serving;
mod caught_up;
pub mod cluster_scheduling;
mod command_handler;
pub mod consistency;
mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
pub mod read_policy;
mod sequencer;
mod sql;
mod validity;

#[derive(Debug)]
pub enum Message {
    Command(OpenTelemetryContext, Command),
    ControllerReady {
        controller: ControllerReadiness,
    },
    PurifiedStatementReady(PurifiedStatementReady),
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    TryDeferred {
        conn_id: ConnectionId,
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    AdvanceTimelines,
    ClusterEvent(ClusterEvent),
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    LinearizeReads,
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    DrainStatementLog,
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}
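
// `Message`s travel over the coordinator's internal command channel (see
// `internal_cmd_tx` below). As a rough sketch of the wiring, not the actual
// coordinator loop, a sender half forwards work back into the single message
// loop, which drains it asynchronously:
//
//     let (internal_cmd_tx, mut internal_cmd_rx) = mpsc::unbounded_channel::<Message>();
//     internal_cmd_tx
//         .send(Message::AdvanceTimelines)
//         .expect("coordinator message loop still running");
//     // Elsewhere, the loop drains the channel:
//     while let Some(msg) = internal_cmd_rx.recv().await {
//         // dispatch on the message, e.g. keyed by `msg.kind()`
//     }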

impl Message {
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
            },
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}
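
// A hedged sketch of how these labels could feed a metrics counter; the
// `message_kind_total` counter here is hypothetical, not one this crate
// necessarily defines:
//
//     fn observe(message_kind_total: &prometheus::CounterVec, msg: &Message) {
//         message_kind_total.with_label_values(&[msg.kind()]).inc();
//     }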

#[derive(Debug)]
pub enum ControllerReadiness {
    Storage,
    Compute,
    Metrics,
    Internal,
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub params: Params,
    pub plan_validity: PlanValidity,
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}

pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;

#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    pub connection_id: CatalogItemId,
    pub connection_gid: GlobalId,
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}

pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
pub type AlterConnectionValidationReady = ValidationReady<Connection>;

#[derive(Debug)]
pub enum PeekStage {
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    Finish(PeekStageFinish),
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    CopyToPreflight(PeekStageCopyTo),
    CopyToDataflow(PeekStageCopyTo),
}
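
// On the happy path a peek advances through these stages roughly in
// declaration order, with the EXPLAIN and COPY TO variants branching off
// after optimization. A simplified sketch of the progression (the real
// transitions are driven by the sequencer, not a function like this):
//
//     fn next_stage_name(stage: &PeekStage) -> &'static str {
//         match stage {
//             PeekStage::LinearizeTimestamp(_) => "real_time_recency",
//             PeekStage::RealTimeRecency(_) => "timestamp_read_hold",
//             PeekStage::TimestampReadHold(_) => "optimize",
//             PeekStage::Optimize(_) => "finish | explain_plan | explain_pushdown | copy_to",
//             _ => "terminal",
//         }
//     }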

#[derive(Debug)]
pub struct CopyToContext {
    pub desc: RelationDesc,
    pub uri: Uri,
    pub connection: StorageConnection<ReferencedConnection>,
    pub connection_id: CatalogItemId,
    pub format: S3SinkFormat,
    pub max_file_size: u64,
    pub output_batch_count: Option<u64>,
}

#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
}

#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    imports: BTreeMap<GlobalId, MapFilterProject>,
}

#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}

#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}

#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}

#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    real_time_recency_ts: Option<Timestamp>,
}

#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}

#[derive(Debug)]
pub enum ExplainContext {
    None,
    Plan(ExplainPlanContext),
    PlanInsightsNotice(OptimizerTrace),
    Pushdown,
}

impl ExplainContext {
    fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
        let optimizer_trace = match self {
            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
            _ => None,
        };
        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
    }

    fn needs_cluster(&self) -> bool {
        match self {
            ExplainContext::None => true,
            ExplainContext::Plan(..) => false,
            ExplainContext::PlanInsightsNotice(..) => true,
            ExplainContext::Pushdown => false,
        }
    }

    fn needs_plan_insights(&self) -> bool {
        matches!(
            self,
            ExplainContext::Plan(ExplainPlanContext {
                stage: ExplainStage::PlanInsights,
                ..
            }) | ExplainContext::PlanInsightsNotice(_)
        )
    }
}
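
// For orientation, how the flags above combine: a plain `SELECT`
// (`ExplainContext::None`) runs on a real cluster and installs no optimizer
// trace, while `EXPLAIN PLAN` (`ExplainContext::Plan`) does the reverse.
// A minimal sketch:
//
//     let ctx = ExplainContext::None;
//     assert!(ctx.needs_cluster());
//     assert!(!ctx.needs_plan_insights());
//     assert!(ctx.dispatch_guard().is_none());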

#[derive(Debug)]
pub struct ExplainPlanContext {
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    pub stage: ExplainStage,
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    pub optimizer_trace: OptimizerTrace,
}

#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    item_id: CatalogItemId,
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
}

#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}

#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    CatalogServer,
    Active,
    Transaction(ClusterId),
}

pub(crate) enum StageResult<T> {
    Handle(JoinHandle<Result<T, AdapterError>>),
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    Immediate(T),
    Response(ExecuteResponse),
}

pub(crate) trait Staged: Send {
    type Ctx: StagedContext;

    fn validity(&mut self) -> &mut PlanValidity;

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    fn cancel_enabled(&self) -> bool;
}
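
// A hedged sketch of how a `Staged` state machine might be driven: call
// `stage`, then either continue immediately, retire with a response, or hand
// the follow-up back to the message loop. The real driver (in the sequencer)
// also handles the spawned-task variants, cancellation, and `PlanValidity`
// re-checks, all elided here:
//
//     async fn drive<S: Staged>(
//         coord: &mut Coordinator,
//         mut ctx: S::Ctx,
//         mut stage: S,
//     ) -> Result<(), AdapterError> {
//         loop {
//             match stage.stage(coord, &mut ctx).await? {
//                 StageResult::Immediate(next) => stage = *next,
//                 StageResult::Response(resp) => return Ok(ctx.retire(Ok(resp))),
//                 // Handle/HandleRetire: await the task off the main loop,
//                 // then re-enqueue `stage.message(ctx, span)` on the channel.
//                 _ => unimplemented!("spawned-task variants elided"),
//             }
//         }
//     }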

pub trait StagedContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    fn session(&self) -> Option<&Session>;
}

impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}

impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}

pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    pub connection_limit_callback: Box<dyn Fn(u64, u64) + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    pub read_only_controllers: bool,

    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
}

#[derive(Debug, Serialize)]
pub struct ConnMeta {
    secret_key: u32,
    connected_at: EpochMillis,
    user: User,
    application_name: String,
    uuid: Uuid,
    conn_id: ConnectionId,
    client_ip: Option<IpAddr>,

    drop_sinks: BTreeSet<GlobalId>,

    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    pending_cluster_alters: BTreeSet<ClusterId>,

    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    authenticated_role: RoleId,
}

impl ConnMeta {
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    pub fn user(&self) -> &User {
        &self.user
    }

    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}

#[derive(Debug)]
pub struct PendingTxn {
    ctx: ExecuteContext,
    response: Result<PendingTxnResponse, AdapterError>,
    action: EndTransactionAction,
}

#[derive(Debug)]
pub enum PendingTxnResponse {
    Committed {
        params: BTreeMap<&'static str, String>,
    },
    Rolledback {
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        match self {
            PendingTxnResponse::Committed { params }
            | PendingTxnResponse::Rolledback { params } => params.extend(p),
        }
    }
}

impl From<PendingTxnResponse> for ExecuteResponse {
    fn from(value: PendingTxnResponse) -> Self {
        match value {
            PendingTxnResponse::Committed { params } => {
                ExecuteResponse::TransactionCommitted { params }
            }
            PendingTxnResponse::Rolledback { params } => {
                ExecuteResponse::TransactionRolledBack { params }
            }
        }
    }
}
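
// A small sketch of the intended flow: session variable changes recorded at
// transaction end are folded into the response before it is converted for
// the client:
//
//     let mut response = PendingTxnResponse::Committed {
//         params: BTreeMap::new(),
//     };
//     response.extend_params([("transaction_isolation", "serializable".to_string())]);
//     let resp: ExecuteResponse = response.into();
//     // `resp` is `ExecuteResponse::TransactionCommitted` with the merged params.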

#[derive(Debug)]
pub struct PendingReadTxn {
    txn: PendingRead,
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    created: Instant,
    num_requeues: u64,
    otel_ctx: OpenTelemetryContext,
}

impl PendingReadTxn {
    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
        &self.timestamp_context
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}

#[derive(Debug)]
enum PendingRead {
    Read {
        txn: PendingTxn,
    },
    ReadThenWrite {
        ctx: ExecuteContext,
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}

impl PendingRead {
    #[instrument(level = "debug")]
    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
        match self {
            PendingRead::Read {
                txn:
                    PendingTxn {
                        mut ctx,
                        response,
                        action,
                    },
                ..
            } => {
                let changed = ctx.session_mut().vars_mut().end_transaction(action);
                let response = response.map(|mut r| {
                    r.extend_params(changed);
                    ExecuteResponse::from(r)
                });

                Some((ctx, response))
            }
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                let _ = tx.send(Some(ctx));
                None
            }
        }
    }

    fn label(&self) -> &'static str {
        match self {
            PendingRead::Read { .. } => "read",
            PendingRead::ReadThenWrite { .. } => "read_then_write",
        }
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        match self {
            PendingRead::Read { txn, .. } => txn.ctx,
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                let _ = tx.send(None);
                ctx
            }
        }
    }
}
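
// To summarize the two cases above: a plain `Read` yields its context and
// response so the client can be answered, while `ReadThenWrite` hands the
// context back to the waiting write path through its oneshot channel. Sketch:
//
//     match pending_read.finish() {
//         Some((ctx, response)) => ctx.retire(response), // linearized read: respond
//         None => {} // read-then-write: ctx was forwarded through `tx`
//     }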

#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    statement_uuid: Option<StatementLoggingId>,
}

impl ExecuteContextExtra {
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }
    pub fn is_trivial(&self) -> bool {
        let Self { statement_uuid } = self;
        statement_uuid.is_none()
    }
    pub fn contents(&self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = self;
        *statement_uuid
    }
    #[must_use]
    fn retire(mut self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = &mut self;
        statement_uuid.take()
    }
}
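
// The contract enforced by the `Drop` impl below: any non-trivial
// `ExecuteContextExtra` must have its `StatementLoggingId` taken out (via
// `retire`) before it is dropped, or we log a soft panic. Sketch of the
// trivial case, which is always safe to drop:
//
//     let extra = ExecuteContextExtra::new(None);
//     assert!(extra.is_trivial()); // nothing to log, dropping is fine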

impl Drop for ExecuteContextExtra {
    fn drop(&mut self) {
        let Self { statement_uuid } = &*self;
        if let Some(statement_uuid) = statement_uuid {
            soft_panic_or_log!(
                "execute context for statement {statement_uuid:?} dropped without being properly retired."
            );
        }
    }
}

#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}

impl std::ops::Deref for ExecuteContext {
    type Target = ExecuteContextInner;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}

impl std::ops::DerefMut for ExecuteContext {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}

#[derive(Debug)]
pub struct ExecuteContextInner {
    tx: ClientTransmitter<ExecuteResponse>,
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    session: Session,
    extra: ExecuteContextExtra,
}

impl ExecuteContext {
    pub fn session(&self) -> &Session {
        &self.session
    }

    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.session
    }

    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
        &self.tx
    }

    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
        &mut self.tx
    }

    pub fn from_parts(
        tx: ClientTransmitter<ExecuteResponse>,
        internal_cmd_tx: mpsc::UnboundedSender<Message>,
        session: Session,
        extra: ExecuteContextExtra,
    ) -> Self {
        Self {
            inner: ExecuteContextInner {
                tx,
                session,
                extra,
                internal_cmd_tx,
            }
            .into(),
        }
    }

    pub fn into_parts(
        self,
    ) -> (
        ClientTransmitter<ExecuteResponse>,
        mpsc::UnboundedSender<Message>,
        Session,
        ExecuteContextExtra,
    ) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        (tx, internal_cmd_tx, session, extra)
    }

    #[instrument(level = "debug")]
    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        let reason = if extra.is_trivial() {
            None
        } else {
            Some((&result).into())
        };
        tx.send(result, session);
        if let Some(reason) = reason {
            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
                otel_ctx: OpenTelemetryContext::obtain(),
                data: extra,
                reason,
            }) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        }
    }

    pub fn extra(&self) -> &ExecuteContextExtra {
        &self.extra
    }

    pub fn extra_mut(&mut self) -> &mut ExecuteContextExtra {
        &mut self.extra
    }
}
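
// Lifecycle sketch: an `ExecuteContext` is assembled from its parts when a
// statement begins and must be consumed exactly once via `retire`, which
// both answers the client and, for logged statements, emits
// `Message::RetireExecute`. Assuming `tx`, `internal_cmd_tx`, and `session`
// are in scope (and with `ExecuteResponse::EmptyQuery` standing in for any
// response):
//
//     let ctx = ExecuteContext::from_parts(
//         tx,
//         internal_cmd_tx,
//         session,
//         ExecuteContextExtra::default(),
//     );
//     ctx.retire(Ok(ExecuteResponse::EmptyQuery));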

#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);

impl ClusterReplicaStatuses {
    pub(crate) fn new() -> ClusterReplicaStatuses {
        ClusterReplicaStatuses(BTreeMap::new())
    }

    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
        let prev = self.0.insert(cluster_id, BTreeMap::new());
        assert_eq!(
            prev, None,
            "cluster {cluster_id} statuses already initialized"
        );
    }

    pub(crate) fn initialize_cluster_replica_statuses(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        num_processes: usize,
        time: DateTime<Utc>,
    ) {
        tracing::info!(
            ?cluster_id,
            ?replica_id,
            ?time,
            "initializing cluster replica status"
        );
        let replica_statuses = self.0.entry(cluster_id).or_default();
        let process_statuses = (0..num_processes)
            .map(|process_id| {
                let status = ClusterReplicaProcessStatus {
                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
                    time: time.clone(),
                };
                (u64::cast_from(process_id), status)
            })
            .collect();
        let prev = replica_statuses.insert(replica_id, process_statuses);
        assert_none!(
            prev,
            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
        );
    }

    pub(crate) fn remove_cluster_statuses(
        &mut self,
        cluster_id: &ClusterId,
    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        let prev = self.0.remove(cluster_id);
        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
    }

    pub(crate) fn remove_cluster_replica_statuses(
        &mut self,
        cluster_id: &ClusterId,
        replica_id: &ReplicaId,
    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        let replica_statuses = self
            .0
            .get_mut(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
        let prev = replica_statuses.remove(replica_id);
        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(crate) fn ensure_cluster_status(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        process_id: ProcessId,
        status: ClusterReplicaProcessStatus,
    ) {
        let replica_statuses = self
            .0
            .get_mut(&cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
            .get_mut(&replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
        replica_statuses.insert(process_id, status);
    }

    pub fn get_cluster_replica_status(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> ClusterStatus {
        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
        Self::cluster_replica_status(process_status)
    }

    pub fn cluster_replica_status(
        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
    ) -> ClusterStatus {
        process_status
            .values()
            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
                (x, y) => {
                    let reason_x = match x {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    let reason_y = match y {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    ClusterStatus::Offline(reason_x.or(reason_y))
                }
            })
    }

    pub(crate) fn get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(crate) fn try_get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        self.try_get_cluster_statuses(cluster_id)
            .and_then(|statuses| statuses.get(&replica_id))
    }

    pub(crate) fn try_get_cluster_statuses(
        &self,
        cluster_id: ClusterId,
    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
        self.0.get(&cluster_id)
    }
}
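
// The fold in `cluster_replica_status` means a replica is `Online` only if
// every one of its processes is; otherwise it is `Offline` with the first
// concrete reason encountered. Sketch:
//
//     let statuses = BTreeMap::from([
//         (0u64, ClusterReplicaProcessStatus {
//             status: ClusterStatus::Online,
//             time: Utc::now(),
//         }),
//         (1u64, ClusterReplicaProcessStatus {
//             status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
//             time: Utc::now(),
//         }),
//     ]);
//     assert_eq!(
//         ClusterReplicaStatuses::cluster_replica_status(&statuses),
//         ClusterStatus::Offline(Some(OfflineReason::Initializing)),
//     );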
1634
1635#[derive(Derivative)]
1637#[derivative(Debug)]
1638pub struct Coordinator {
1639 #[derivative(Debug = "ignore")]
1641 controller: mz_controller::Controller,
1642 catalog: Arc<Catalog>,
1650
1651 persist_client: PersistClient,
1654
1655 internal_cmd_tx: mpsc::UnboundedSender<Message>,
1657 group_commit_tx: appends::GroupCommitNotifier,
1659
1660 strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,
1662
1663 global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
1666
1667 transient_id_gen: Arc<TransientIdGen>,
1669 active_conns: BTreeMap<ConnectionId, ConnMeta>,
1672
1673 txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,
1677
1678 pending_peeks: BTreeMap<Uuid, PendingPeek>,
1682 client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
1684
1685 pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
1687
1688 active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
1690 active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
1692 active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,
1695
1696 staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
1699 introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,
1701
1702 write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
1704 deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,
1706
1707 pending_writes: Vec<PendingWriteTxn>,
1709
1710 advance_timelines_interval: Interval,
1720
1721 serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
1730
1731 secrets_controller: Arc<dyn SecretsController>,
1734 caching_secrets_reader: CachingSecretsReader,
1736
1737 cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1740
1741 storage_usage_client: StorageUsageClient,
1743 storage_usage_collection_interval: Duration,
1745
1746 #[derivative(Debug = "ignore")]
1748 segment_client: Option<mz_segment::Client>,
1749
1750 metrics: Metrics,
1752 optimizer_metrics: OptimizerMetrics,
1754
1755 tracing_handle: TracingHandle,
1757
1758 statement_logging: StatementLogging,
1760
1761 webhook_concurrency_limit: WebhookConcurrencyLimiter,
1763
1764 pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,
1768
1769 check_cluster_scheduling_policies_interval: Interval,
1771
1772 cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,
1776
1777 caught_up_check_interval: Interval,
1780
1781 caught_up_check: Option<CaughtUpCheckContext>,
1784
1785 installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,
1787
1788 connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,
1790
1791 cluster_replica_statuses: ClusterReplicaStatuses,
1793
1794 read_only_controllers: bool,
1798
1799 buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,
1807
1808 license_key: ValidatedLicenseKey,
1809}
1810
1811impl Coordinator {
1812 #[instrument(name = "coord::bootstrap")]
1816 pub(crate) async fn bootstrap(
1817 &mut self,
1818 boot_ts: Timestamp,
1819 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1820 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1821 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1822 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1823 audit_logs_iterator: AuditLogIterator,
1824 ) -> Result<(), AdapterError> {
1825 let bootstrap_start = Instant::now();
1826 info!("startup: coordinator init: bootstrap beginning");
1827 info!("startup: coordinator init: bootstrap: preamble beginning");
1828
1829 let cluster_statuses: Vec<(_, Vec<_>)> = self
1832 .catalog()
1833 .clusters()
1834 .map(|cluster| {
1835 (
1836 cluster.id(),
1837 cluster
1838 .replicas()
1839 .map(|replica| {
1840 (replica.replica_id, replica.config.location.num_processes())
1841 })
1842 .collect(),
1843 )
1844 })
1845 .collect();
1846 let now = self.now_datetime();
1847 for (cluster_id, replica_statuses) in cluster_statuses {
1848 self.cluster_replica_statuses
1849 .initialize_cluster_statuses(cluster_id);
1850 for (replica_id, num_processes) in replica_statuses {
1851 self.cluster_replica_statuses
1852 .initialize_cluster_replica_statuses(
1853 cluster_id,
1854 replica_id,
1855 num_processes,
1856 now,
1857 );
1858 }
1859 }
1860
1861 let system_config = self.catalog().system_config();
1862
1863 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1865
1866 let compute_config = flags::compute_config(system_config);
1868 let storage_config = flags::storage_config(system_config);
1869 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1870 let dyncfg_updates = system_config.dyncfg_updates();
1871 self.controller.compute.update_configuration(compute_config);
1872 self.controller.storage.update_parameters(storage_config);
1873 self.controller
1874 .update_orchestrator_scheduling_config(scheduling_config);
1875 self.controller.update_configuration(dyncfg_updates);
1876
1877 self.validate_resource_limit_numeric(
1878 Numeric::zero(),
1879 self.current_credit_consumption_rate(),
1880 |system_vars| {
1881 self.license_key
1882 .max_credit_consumption_rate()
1883 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1884 },
1885 "cluster replica",
1886 MAX_CREDIT_CONSUMPTION_RATE.name(),
1887 )?;
1888
1889 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1890 Default::default();
1891
1892 let enable_worker_core_affinity =
1893 self.catalog().system_config().enable_worker_core_affinity();
1894 for instance in self.catalog.clusters() {
1895 self.controller.create_cluster(
1896 instance.id,
1897 ClusterConfig {
1898 arranged_logs: instance.log_indexes.clone(),
1899 workload_class: instance.config.workload_class.clone(),
1900 },
1901 )?;
1902 for replica in instance.replicas() {
1903 let role = instance.role();
1904 self.controller.create_replica(
1905 instance.id,
1906 replica.replica_id,
1907 instance.name.clone(),
1908 replica.name.clone(),
1909 role,
1910 replica.config.clone(),
1911 enable_worker_core_affinity,
1912 )?;
1913 }
1914 }
1915
1916 info!(
1917 "startup: coordinator init: bootstrap: preamble complete in {:?}",
1918 bootstrap_start.elapsed()
1919 );
1920
1921 let init_storage_collections_start = Instant::now();
1922 info!("startup: coordinator init: bootstrap: storage collections init beginning");
1923 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
1924 .await;
1925 info!(
1926 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
1927 init_storage_collections_start.elapsed()
1928 );
1929
1930 self.controller.start_compute_introspection_sink();
1935
1936 let optimize_dataflows_start = Instant::now();
1937 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
1938 let entries: Vec<_> = self.catalog().entries().cloned().collect();
1939 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
1940 info!(
1941 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
1942 optimize_dataflows_start.elapsed()
1943 );
1944
1945 let _fut = self.catalog().update_expression_cache(
1947 uncached_local_exprs.into_iter().collect(),
1948 uncached_global_exps.into_iter().collect(),
1949 );
1950
1951 let bootstrap_as_ofs_start = Instant::now();
1955 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
1956 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
1957 info!(
1958 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
1959 bootstrap_as_ofs_start.elapsed()
1960 );
1961
1962 let postamble_start = Instant::now();
1963 info!("startup: coordinator init: bootstrap: postamble beginning");
1964
1965 let logs: BTreeSet<_> = BUILTINS::logs()
1966 .map(|log| self.catalog().resolve_builtin_log(log))
1967 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
1968 .collect();
1969
1970 let mut privatelink_connections = BTreeMap::new();
1971
1972 for entry in &entries {
1973 debug!(
1974 "coordinator init: installing {} {}",
1975 entry.item().typ(),
1976 entry.id()
1977 );
1978 let mut policy = entry.item().initial_logical_compaction_window();
1979 match entry.item() {
1980 CatalogItem::Source(source) => {
1986 if source.custom_logical_compaction_window.is_none() {
1988 if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
1989 source.data_source
1990 {
1991 policy = Some(
1992 self.catalog()
1993 .get_entry(&ingestion_id)
1994 .source()
1995 .expect("must be source")
1996 .custom_logical_compaction_window
1997 .unwrap_or_default(),
1998 );
1999 }
2000 }
2001 policies_to_set
2002 .entry(policy.expect("sources have a compaction window"))
2003 .or_insert_with(Default::default)
2004 .storage_ids
2005 .insert(source.global_id());
2006 }
2007 CatalogItem::Table(table) => {
2008 policies_to_set
2009 .entry(policy.expect("tables have a compaction window"))
2010 .or_insert_with(Default::default)
2011 .storage_ids
2012 .extend(table.global_ids());
2013 }
2014 CatalogItem::Index(idx) => {
2015 let policy_entry = policies_to_set
2016 .entry(policy.expect("indexes have a compaction window"))
2017 .or_insert_with(Default::default);
2018
2019 if logs.contains(&idx.on) {
2020 policy_entry
2021 .compute_ids
2022 .entry(idx.cluster_id)
2023 .or_insert_with(BTreeSet::new)
2024 .insert(idx.global_id());
2025 } else {
2026 let df_desc = self
2027 .catalog()
2028 .try_get_physical_plan(&idx.global_id())
2029 .expect("added in `bootstrap_dataflow_plans`")
2030 .clone();
2031
2032 let df_meta = self
2033 .catalog()
2034 .try_get_dataflow_metainfo(&idx.global_id())
2035 .expect("added in `bootstrap_dataflow_plans`");
2036
2037 if self.catalog().state().system_config().enable_mz_notices() {
2038 self.catalog().state().pack_optimizer_notices(
2040 &mut builtin_table_updates,
2041 df_meta.optimizer_notices.iter(),
2042 Diff::ONE,
2043 );
2044 }
2045
2046 policy_entry
2049 .compute_ids
2050 .entry(idx.cluster_id)
2051 .or_insert_with(Default::default)
2052 .extend(df_desc.export_ids());
2053
2054 self.controller
2055 .compute
2056 .create_dataflow(idx.cluster_id, df_desc, None)
2057 .unwrap_or_terminate("cannot fail to create dataflows");
2058 }
2059 }
2060 CatalogItem::View(_) => (),
2061 CatalogItem::MaterializedView(mview) => {
2062 policies_to_set
2063 .entry(policy.expect("materialized views have a compaction window"))
2064 .or_insert_with(Default::default)
2065 .storage_ids
2066 .insert(mview.global_id());
2067
2068 let mut df_desc = self
2069 .catalog()
2070 .try_get_physical_plan(&mview.global_id())
2071 .expect("added in `bootstrap_dataflow_plans`")
2072 .clone();
2073
2074 if let Some(initial_as_of) = mview.initial_as_of.clone() {
2075 df_desc.set_initial_as_of(initial_as_of);
2076 }
2077
2078 let until = mview
2080 .refresh_schedule
2081 .as_ref()
2082 .and_then(|s| s.last_refresh())
2083 .and_then(|r| r.try_step_forward());
2084 if let Some(until) = until {
2085 df_desc.until.meet_assign(&Antichain::from_elem(until));
2086 }
2087
2088 let df_meta = self
2089 .catalog()
2090 .try_get_dataflow_metainfo(&mview.global_id())
2091 .expect("added in `bootstrap_dataflow_plans`");
2092
2093 if self.catalog().state().system_config().enable_mz_notices() {
2094 self.catalog().state().pack_optimizer_notices(
2096 &mut builtin_table_updates,
2097 df_meta.optimizer_notices.iter(),
2098 Diff::ONE,
2099 );
2100 }
2101
2102 self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2103 }
2104 CatalogItem::Sink(sink) => {
2105 policies_to_set
2106 .entry(CompactionWindow::Default)
2107 .or_insert_with(Default::default)
2108 .storage_ids
2109 .insert(sink.global_id());
2110 }
2111 CatalogItem::Connection(catalog_connection) => {
2112 if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2113 privatelink_connections.insert(
2114 entry.id(),
2115 VpcEndpointConfig {
2116 aws_service_name: conn.service_name.clone(),
2117 availability_zone_ids: conn.availability_zones.clone(),
2118 },
2119 );
2120 }
2121 }
2122 CatalogItem::ContinualTask(ct) => {
2123 policies_to_set
2124 .entry(policy.expect("continual tasks have a compaction window"))
2125 .or_insert_with(Default::default)
2126 .storage_ids
2127 .insert(ct.global_id());
2128
2129 let mut df_desc = self
2130 .catalog()
2131 .try_get_physical_plan(&ct.global_id())
2132 .expect("added in `bootstrap_dataflow_plans`")
2133 .clone();
2134
2135 if let Some(initial_as_of) = ct.initial_as_of.clone() {
2136 df_desc.set_initial_as_of(initial_as_of);
2137 }
2138
2139 let df_meta = self
2140 .catalog()
2141 .try_get_dataflow_metainfo(&ct.global_id())
2142 .expect("added in `bootstrap_dataflow_plans`");
2143
2144 if self.catalog().state().system_config().enable_mz_notices() {
2145 self.catalog().state().pack_optimizer_notices(
2147 &mut builtin_table_updates,
2148 df_meta.optimizer_notices.iter(),
2149 Diff::ONE,
2150 );
2151 }
2152
2153 self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2154 }
2155 CatalogItem::Log(_)
2157 | CatalogItem::Type(_)
2158 | CatalogItem::Func(_)
2159 | CatalogItem::Secret(_) => {}
2160 }
2161 }
2162
2163 if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2164 let existing_vpc_endpoints = cloud_resource_controller
2166 .list_vpc_endpoints()
2167 .await
2168 .context("list vpc endpoints")?;
2169 let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2170 let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2171 let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2172 for id in vpc_endpoints_to_remove {
2173 cloud_resource_controller
2174 .delete_vpc_endpoint(*id)
2175 .await
2176 .context("deleting extraneous vpc endpoint")?;
2177 }
2178
2179 for (id, spec) in privatelink_connections {
2181 cloud_resource_controller
2182 .ensure_vpc_endpoint(id, spec)
2183 .await
2184 .context("ensuring vpc endpoint")?;
2185 }
2186 }
2187
2188 drop(dataflow_read_holds);
2191 for (cw, policies) in policies_to_set {
2193 self.initialize_read_policies(&policies, cw).await;
2194 }
2195
2196 builtin_table_updates.extend(
2198 self.catalog().state().resolve_builtin_table_updates(
2199 self.catalog().state().pack_all_replica_size_updates(),
2200 ),
2201 );
2202
2203 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2204 let migrated_updates_fut = if self.controller.read_only() {
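// In read-only mode we cannot write at the current timestamp, but the
// shards backing migrated builtin tables are brand new, with their write
// frontier still at the minimum timestamp, so their updates can be appended
// directly starting at the minimum timestamp.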
2210 let min_timestamp = Timestamp::minimum();
2211 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2212 .extract_if(.., |update| {
2213 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2214 migrated_storage_collections_0dt.contains(&update.id)
2215 && self
2216 .controller
2217 .storage_collections
2218 .collection_frontiers(gid)
2219 .expect("all tables are registered")
2220 .write_frontier
2221 .elements()
2222 == &[min_timestamp]
2223 })
2224 .collect();
2225 if migrated_builtin_table_updates.is_empty() {
2226 futures::future::ready(()).boxed()
2227 } else {
2228 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2230 for update in migrated_builtin_table_updates {
2231 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2232 grouped_appends.entry(gid).or_default().push(update.data);
2233 }
2234 info!(
2235 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2236 grouped_appends.keys().collect::<Vec<_>>()
2237 );
2238
2239 let mut all_appends = Vec::with_capacity(grouped_appends.len());
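// Consolidate loose rows per table before appending; pre-encoded batches
// are passed through unchanged.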
2241 for (item_id, table_data) in grouped_appends.into_iter() {
2242 let mut all_rows = Vec::new();
2243 let mut all_data = Vec::new();
2244 for data in table_data {
2245 match data {
2246 TableData::Rows(rows) => all_rows.extend(rows),
2247 TableData::Batches(_) => all_data.push(data),
2248 }
2249 }
2250 differential_dataflow::consolidation::consolidate(&mut all_rows);
2251 all_data.push(TableData::Rows(all_rows));
2252
2253 all_appends.push((item_id, all_data));
2255 }
2256
2257 let fut = self
2258 .controller
2259 .storage
2260 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2261 .expect("cannot fail to append");
2262 async {
2263 fut.await
2264 .expect("One-shot shouldn't be dropped during bootstrap")
2265 .unwrap_or_terminate("cannot fail to append")
2266 }
2267 .boxed()
2268 }
2269 } else {
2270 futures::future::ready(()).boxed()
2271 };
2272
2273 info!(
2274 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2275 postamble_start.elapsed()
2276 );
2277
2278 let builtin_update_start = Instant::now();
2279 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2280
2281 if self.controller.read_only() {
2282 info!(
2283 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2284 );
2285
2286 let audit_join_start = Instant::now();
2288 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2289 let audit_log_updates: Vec<_> = audit_logs_iterator
2290 .map(|(audit_log, ts)| StateUpdate {
2291 kind: StateUpdateKind::AuditLog(audit_log),
2292 ts,
2293 diff: StateDiff::Addition,
2294 })
2295 .collect();
2296 let audit_log_builtin_table_updates = self
2297 .catalog()
2298 .state()
2299 .generate_builtin_table_updates(audit_log_updates);
2300 builtin_table_updates.extend(audit_log_builtin_table_updates);
2301 info!(
2302 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2303 audit_join_start.elapsed()
2304 );
2305 self.buffered_builtin_table_updates
2306 .as_mut()
2307 .expect("in read-only mode")
2308 .append(&mut builtin_table_updates);
2309 } else {
2310 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2311 .await;
2312 };
2313 info!(
2314 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2315 builtin_update_start.elapsed()
2316 );
2317
2318 let cleanup_secrets_start = Instant::now();
2319 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2320 {
2324 let Self {
2327 secrets_controller,
2328 catalog,
2329 ..
2330 } = self;
2331
2332 let next_user_item_id = catalog.get_next_user_item_id().await?;
2333 let next_system_item_id = catalog.get_next_system_item_id().await?;
2334 let read_only = self.controller.read_only();
2335 let catalog_ids: BTreeSet<CatalogItemId> =
2340 catalog.entries().map(|entry| entry.id()).collect();
2341 let secrets_controller = Arc::clone(secrets_controller);
2342
2343 spawn(|| "cleanup-orphaned-secrets", async move {
2344 if read_only {
2345 info!(
2346 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2347 );
2348 return;
2349 }
2350 info!("coordinator init: cleaning up orphaned secrets");
2351
2352 match secrets_controller.list().await {
2353 Ok(controller_secrets) => {
2354 let controller_secrets: BTreeSet<CatalogItemId> =
2355 controller_secrets.into_iter().collect();
2356 let orphaned = controller_secrets.difference(&catalog_ids);
2357 for id in orphaned {
2358 let id_too_large = match id {
2359 CatalogItemId::System(id) => *id >= next_system_item_id,
2360 CatalogItemId::User(id) => *id >= next_user_item_id,
2361 CatalogItemId::IntrospectionSourceIndex(_)
2362 | CatalogItemId::Transient(_) => false,
2363 };
2364 if id_too_large {
2365 info!(
2366 %next_user_item_id, %next_system_item_id,
2367 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2368 );
2369 } else {
2370 info!("coordinator init: deleting orphaned secret {id}");
2371 fail_point!("orphan_secrets");
2372 if let Err(e) = secrets_controller.delete(*id).await {
2373 warn!("failed to delete orphaned secret {id}: {e}");
2377 }
2378 }
2379 }
2380 }
2381 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2382 }
2383 });
2384 }
2385 info!(
2386 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2387 cleanup_secrets_start.elapsed()
2388 );
2389
2390 let final_steps_start = Instant::now();
2392 info!(
2393 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2394 );
2395 migrated_updates_fut
2396 .instrument(info_span!("coord::bootstrap::final"))
2397 .await;
2398
2399 debug!(
2400 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2401 );
2402 self.controller.initialization_complete();
2404
2405 self.bootstrap_introspection_subscribes().await;
2407
2408 info!(
2409 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2410 final_steps_start.elapsed()
2411 );
2412
2413 info!(
2414 "startup: coordinator init: bootstrap complete in {:?}",
2415 bootstrap_start.elapsed()
2416 );
2417 Ok(())
2418 }
2419
2420 #[allow(clippy::async_yields_async)]
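/// Advances all tables to the current write timestamp, resets the contents
/// of system tables to match the catalog (`mz_storage_usage_by_shard` is
/// preserved as-is and `mz_audit_events` is reconciled incrementally), and
/// finally appends `builtin_table_updates`.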
2425 #[instrument]
2426 async fn bootstrap_tables(
2427 &mut self,
2428 entries: &[CatalogEntry],
2429 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2430 audit_logs_iterator: AuditLogIterator,
2431 ) {
2432 struct TableMetadata<'a> {
2434 id: CatalogItemId,
2435 name: &'a QualifiedItemName,
2436 table: &'a Table,
2437 }
2438
2439 let table_metas: Vec<_> = entries
2441 .into_iter()
2442 .filter_map(|entry| {
2443 entry.table().map(|table| TableMetadata {
2444 id: entry.id(),
2445 name: entry.name(),
2446 table,
2447 })
2448 })
2449 .collect();
2450
2451 debug!("coordinator init: advancing all tables to current timestamp");
2453 let WriteTimestamp {
2454 timestamp: write_ts,
2455 advance_to,
2456 } = self.get_local_write_ts().await;
2457 let appends = table_metas
2458 .iter()
2459 .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2460 .collect();
2461 let table_fence_rx = self
2465 .controller
2466 .storage
2467 .append_table(write_ts.clone(), advance_to, appends)
2468 .expect("invalid updates");
2469
2470 self.apply_local_write(write_ts).await;
2471
2472 debug!("coordinator init: resetting system tables");
2474 let read_ts = self.get_local_read_ts().await;
2475
2476 let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2479 .catalog()
2480 .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2481 .into();
2482 let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2483 meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2484 && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2485 };
2486
2487 let mut retraction_tasks = Vec::new();
2488 let mut system_tables: Vec<_> = table_metas
2489 .iter()
2490 .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2491 .collect();
2492
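// `mz_audit_events` is reconciled incrementally against the durable audit
// log rather than retracted and rewritten wholesale, so pull it out of the
// generic reset path.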
2493 let (audit_events_idx, _) = system_tables
2495 .iter()
2496 .find_position(|table| {
2497 table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2498 })
2499 .expect("mz_audit_events must exist");
2500 let audit_events = system_tables.remove(audit_events_idx);
2501 let audit_log_task = self.bootstrap_audit_log_table(
2502 audit_events.id,
2503 audit_events.name,
2504 audit_events.table,
2505 audit_logs_iterator,
2506 read_ts,
2507 );
2508
2509 for system_table in system_tables {
2510 let table_id = system_table.id;
2511 let full_name = self.catalog().resolve_full_name(system_table.name, None);
2512 debug!("coordinator init: resetting system table {full_name} ({table_id})");
2513
2514 let snapshot_fut = self
2516 .controller
2517 .storage_collections
2518 .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2519 let batch_fut = self
2520 .controller
2521 .storage_collections
2522 .create_update_builder(system_table.table.global_id_writes());
2523
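// For every remaining system table, spawn a task that snapshots its current
// contents and emits a batch of retractions, so the table can be rewritten
// from catalog state.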
2524 let task = spawn(|| format!("snapshot-{table_id}"), async move {
2525 let mut batch = batch_fut
2527 .await
2528 .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2529 tracing::info!(?table_id, "starting snapshot");
2530 let mut snapshot_cursor = snapshot_fut
2532 .await
2533 .unwrap_or_terminate("cannot fail to snapshot");
2534
2535 while let Some(values) = snapshot_cursor.next().await {
2537 for ((key, _val), _t, d) in values {
2538 let key = key.expect("builtin table had errors");
2539 let d_invert = d.neg();
2540 batch.add(&key, &(), &d_invert).await;
2541 }
2542 }
2543 tracing::info!(?table_id, "finished snapshot");
2544
2545 let batch = batch.finish().await;
2546 BuiltinTableUpdate::batch(table_id, batch)
2547 });
2548 retraction_tasks.push(task);
2549 }
2550
2551 let retractions_res = futures::future::join_all(retraction_tasks).await;
2552 for retractions in retractions_res {
2553 let retractions = retractions.expect("cannot fail to fetch snapshot");
2554 builtin_table_updates.push(retractions);
2555 }
2556
2557 let audit_join_start = Instant::now();
2558 info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2559 let audit_log_updates = audit_log_task
2560 .await
2561 .expect("cannot fail to fetch audit log updates");
2562 let audit_log_builtin_table_updates = self
2563 .catalog()
2564 .state()
2565 .generate_builtin_table_updates(audit_log_updates);
2566 builtin_table_updates.extend(audit_log_builtin_table_updates);
2567 info!(
2568 "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2569 audit_join_start.elapsed()
2570 );
2571
2572 table_fence_rx
2574 .await
2575 .expect("One-shot shouldn't be dropped during bootstrap")
2576 .unwrap_or_terminate("cannot fail to append");
2577
2578 info!("coordinator init: sending builtin table updates");
2579 let (_builtin_updates_fut, write_ts) = self
2580 .builtin_table_update()
2581 .execute(builtin_table_updates)
2582 .await;
2583 info!(?write_ts, "coordinator init: appended builtin table updates");
2584 if let Some(write_ts) = write_ts {
2585 self.apply_local_write(write_ts).await;
2586 }
2587 }
2588
2589 #[instrument]
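/// Returns a task that reconciles the `mz_audit_events` table with the
/// durable audit log: it snapshots the table's current contents and yields
/// state updates only for audit log events not yet reflected there, i.e.,
/// those with sortable IDs greater than the largest ID already present.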
2593 fn bootstrap_audit_log_table<'a>(
2594 &mut self,
2595 table_id: CatalogItemId,
2596 name: &'a QualifiedItemName,
2597 table: &'a Table,
2598 audit_logs_iterator: AuditLogIterator,
2599 read_ts: Timestamp,
2600 ) -> JoinHandle<Vec<StateUpdate>> {
2601 let full_name = self.catalog().resolve_full_name(name, None);
2602 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2603 let current_contents_fut = self
2604 .controller
2605 .storage_collections
2606 .snapshot(table.global_id_writes(), read_ts);
2607 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2608 let current_contents = current_contents_fut
2609 .await
2610 .unwrap_or_terminate("cannot fail to fetch snapshot");
2611 let contents_len = current_contents.len();
2612 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2613
2614 let max_audit_event_id = current_contents
2616 .into_iter()
2617 .filter(|(_, diff)| *diff == 1)
2618 .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2619 .max();
2622
2623 audit_logs_iterator
2625 .take_while(|(audit_log, _)| match max_audit_event_id {
2626 Some(id) => audit_log.event.sortable_id() > id,
2627 None => true,
2628 })
2629 .map(|(audit_log, ts)| StateUpdate {
2630 kind: StateUpdateKind::AuditLog(audit_log),
2631 ts,
2632 diff: StateDiff::Addition,
2633 })
2634 .collect::<Vec<_>>()
2635 })
2636 }
2637
2638 #[instrument]
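/// Initializes all storage collections required by catalog objects: sources,
/// tables, materialized views, continual tasks, and sink progress
/// collections. Collections migrated during startup are passed through to
/// `create_collections_for_bootstrap`; builtin continual tasks that do not
/// yet have an initial as-of are deferred to
/// `bootstrap_builtin_continual_tasks`.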
2651 async fn bootstrap_storage_collections(
2652 &mut self,
2653 migrated_storage_collections: &BTreeSet<CatalogItemId>,
2654 ) {
2655 let catalog = self.catalog();
2656 let source_status_collection_id = catalog
2657 .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2658 let source_status_collection_id = catalog
2659 .get_entry(&source_status_collection_id)
2660 .latest_global_id();
2661
2662 let source_desc = |object_id: GlobalId,
2663 data_source: &DataSourceDesc,
2664 desc: &RelationDesc,
2665 timeline: &Timeline| {
2666 let (data_source, status_collection_id) = match data_source.clone() {
2667 DataSourceDesc::Ingestion { desc, cluster_id } => {
2669 let desc = desc.into_inline_connection(catalog.state());
2670 let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2671
2672 (
2673 DataSource::Ingestion(ingestion),
2674 Some(source_status_collection_id),
2675 )
2676 }
2677 DataSourceDesc::OldSyntaxIngestion {
2678 desc,
2679 progress_subsource,
2680 data_config,
2681 details,
2682 cluster_id,
2683 } => {
2684 let desc = desc.into_inline_connection(catalog.state());
2685 let data_config = data_config.into_inline_connection(catalog.state());
2686 let progress_subsource =
2689 catalog.get_entry(&progress_subsource).latest_global_id();
2690 let mut ingestion =
2691 IngestionDescription::new(desc, cluster_id, progress_subsource);
2692 let legacy_export = SourceExport {
2693 storage_metadata: (),
2694 data_config,
2695 details,
2696 };
2697 ingestion.source_exports.insert(object_id, legacy_export);
2698
2699 (
2700 DataSource::Ingestion(ingestion),
2701 Some(source_status_collection_id),
2702 )
2703 }
2704 DataSourceDesc::IngestionExport {
2705 ingestion_id,
2706 external_reference: _,
2707 details,
2708 data_config,
2709 } => {
2710 let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2713 (
2714 DataSource::IngestionExport {
2715 ingestion_id,
2716 details,
2717 data_config: data_config.into_inline_connection(catalog.state()),
2718 },
2719 Some(source_status_collection_id),
2720 )
2721 }
2722 DataSourceDesc::Webhook { .. } => {
2723 (DataSource::Webhook, Some(source_status_collection_id))
2724 }
2725 DataSourceDesc::Progress => (DataSource::Progress, None),
2726 DataSourceDesc::Introspection(introspection) => {
2727 (DataSource::Introspection(introspection), None)
2728 }
2729 };
2730 CollectionDescription {
2731 desc: desc.clone(),
2732 data_source,
2733 since: None,
2734 status_collection_id,
2735 timeline: Some(timeline.clone()),
2736 }
2737 };
2738
2739 let mut compute_collections = vec![];
2740 let mut collections = vec![];
2741 let mut new_builtin_continual_tasks = vec![];
2742 for entry in catalog.entries() {
2743 match entry.item() {
2744 CatalogItem::Source(source) => {
2745 collections.push((
2746 source.global_id(),
2747 source_desc(
2748 source.global_id(),
2749 &source.data_source,
2750 &source.desc,
2751 &source.timeline,
2752 ),
2753 ));
2754 }
2755 CatalogItem::Table(table) => {
2756 match &table.data_source {
2757 TableDataSource::TableWrites { defaults: _ } => {
2758 let versions: BTreeMap<_, _> = table
2759 .collection_descs()
2760 .map(|(gid, version, desc)| (version, (gid, desc)))
2761 .collect();
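// Each table version's collection designates the collection of the next
// version (if any) as its primary.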
2762 let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2763 let next_version = version.bump();
2764 let primary_collection =
2765 versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2766 let collection_desc = CollectionDescription::for_table(
2767 desc.clone(),
2768 primary_collection,
2769 );
2770
2771 (*gid, collection_desc)
2772 });
2773 collections.extend(collection_descs);
2774 }
2775 TableDataSource::DataSource {
2776 desc: data_source_desc,
2777 timeline,
2778 } => {
2779 soft_assert_eq_or_log!(table.collections.len(), 1);
2781 let collection_descs =
2782 table.collection_descs().map(|(gid, _version, desc)| {
2783 (
2784 gid,
2785 source_desc(
2786 entry.latest_global_id(),
2787 data_source_desc,
2788 &desc,
2789 timeline,
2790 ),
2791 )
2792 });
2793 collections.extend(collection_descs);
2794 }
2795 };
2796 }
2797 CatalogItem::MaterializedView(mv) => {
2798 let collection_desc = CollectionDescription {
2799 desc: mv.desc.clone(),
2800 data_source: DataSource::Other,
2801 since: mv.initial_as_of.clone(),
2802 status_collection_id: None,
2803 timeline: None,
2804 };
2805 compute_collections.push((mv.global_id(), mv.desc.clone()));
2806 collections.push((mv.global_id(), collection_desc));
2807 }
2808 CatalogItem::ContinualTask(ct) => {
2809 let collection_desc = CollectionDescription {
2810 desc: ct.desc.clone(),
2811 data_source: DataSource::Other,
2812 since: ct.initial_as_of.clone(),
2813 status_collection_id: None,
2814 timeline: None,
2815 };
2816 if ct.global_id().is_system() && collection_desc.since.is_none() {
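// A builtin continual task without a stored initial as-of is new in this
// boot. Defer creating its collection until a since can be derived from
// read holds on its inputs; see `bootstrap_builtin_continual_tasks`.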
2817 new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2821 } else {
2822 compute_collections.push((ct.global_id(), ct.desc.clone()));
2823 collections.push((ct.global_id(), collection_desc));
2824 }
2825 }
2826 CatalogItem::Sink(sink) => {
2827 let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2828 let from_desc = storage_sink_from_entry
2829 .desc(&self.catalog().resolve_full_name(
2830 storage_sink_from_entry.name(),
2831 storage_sink_from_entry.conn_id(),
2832 ))
2833 .expect("sinks can only be built on items with descs")
2834 .into_owned();
2835 let collection_desc = CollectionDescription {
2836 desc: KAFKA_PROGRESS_DESC.clone(),
2838 data_source: DataSource::Sink {
2839 desc: ExportDescription {
2840 sink: StorageSinkDesc {
2841 from: sink.from,
2842 from_desc,
2843 connection: sink
2844 .connection
2845 .clone()
2846 .into_inline_connection(self.catalog().state()),
2847 envelope: sink.envelope,
2848 as_of: Antichain::from_elem(Timestamp::minimum()),
2849 with_snapshot: sink.with_snapshot,
2850 version: sink.version,
2851 from_storage_metadata: (),
2852 to_storage_metadata: (),
2853 },
2854 instance_id: sink.cluster_id,
2855 },
2856 },
2857 since: None,
2858 status_collection_id: None,
2859 timeline: None,
2860 };
2861 collections.push((sink.global_id, collection_desc));
2862 }
2863 _ => (),
2864 }
2865 }
2866
2867 let register_ts = if self.controller.read_only() {
2868 self.get_local_read_ts().await
2869 } else {
2870 self.get_local_write_ts().await.timestamp
2873 };
2874
2875 let storage_metadata = self.catalog.state().storage_metadata();
2876 let migrated_storage_collections = migrated_storage_collections
2877 .into_iter()
2878 .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2879 .collect();
2880
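// Let the storage controller update the persisted schemas of
// compute-maintained collections first, e.g. when a column's nullability
// has changed since the collection was created.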
2881 self.controller
2886 .storage
2887 .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2888 .await
2889 .unwrap_or_terminate("cannot fail to evolve collections");
2890
2891 self.controller
2892 .storage
2893 .create_collections_for_bootstrap(
2894 storage_metadata,
2895 Some(register_ts),
2896 collections,
2897 &migrated_storage_collections,
2898 )
2899 .await
2900 .unwrap_or_terminate("cannot fail to create collections");
2901
2902 self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2903 .await;
2904
2905 if !self.controller.read_only() {
2906 self.apply_local_write(register_ts).await;
2907 }
2908 }
2909
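/// Creates the storage collections for new builtin continual tasks.
///
/// Each task is optimized to obtain a physical plan, read holds are acquired
/// on its inputs, and the resulting least valid read frontier becomes the
/// collection's initial `since`.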
2910 async fn bootstrap_builtin_continual_tasks(
2917 &mut self,
2918 mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
2920 ) {
2921 for (id, collection) in &mut collections {
2922 let entry = self.catalog.get_entry_by_global_id(id);
2923 let ct = match &entry.item {
2924 CatalogItem::ContinualTask(ct) => ct.clone(),
2925 _ => unreachable!("only called with continual task builtins"),
2926 };
2927 let debug_name = self
2928 .catalog()
2929 .resolve_full_name(entry.name(), None)
2930 .to_string();
2931 let (_optimized_plan, physical_plan, _metainfo) = self
2932 .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
2933 .expect("builtin CT should optimize successfully");
2934
2935 let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
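// The continual task reads from its own output, but that collection does
// not exist yet, so it must not contribute to the as-of computation.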
2937 id_bundle.storage_ids.remove(id);
2939 let read_holds = self.acquire_read_holds(&id_bundle);
2940 let as_of = read_holds.least_valid_read();
2941
2942 collection.since = Some(as_of.clone());
2943 }
2944 self.controller
2945 .storage
2946 .create_collections(self.catalog.state().storage_metadata(), None, collections)
2947 .await
2948 .unwrap_or_terminate("cannot fail to create collections");
2949 }
2950
2951 #[instrument]
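/// Computes the optimized (MIR) plan, physical (LIR) plan, and dataflow
/// metainfo for every index, materialized view, and continual task, and
/// stores them in the catalog.
///
/// Entries from the global expression cache are reused when their optimizer
/// features match the current configuration; expressions that had to be
/// recomputed are returned so the caller can write them back to the cache.
/// The entries are expected to arrive in dependency order, so that each plan
/// is built against instance state that already contains its dependencies.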
2962 fn bootstrap_dataflow_plans(
2963 &mut self,
2964 ordered_catalog_entries: &[CatalogEntry],
2965 mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2966 ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
2967 let mut instance_snapshots = BTreeMap::new();
2973 let mut uncached_expressions = BTreeMap::new();
2974
2975 let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
2976
2977 for entry in ordered_catalog_entries {
2978 match entry.item() {
2979 CatalogItem::Index(idx) => {
2980 let compute_instance =
2982 instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
2983 self.instance_snapshot(idx.cluster_id)
2984 .expect("compute instance exists")
2985 });
2986 let global_id = idx.global_id();
2987
2988 if compute_instance.contains_collection(&global_id) {
2991 continue;
2992 }
2993
2994 let (optimized_plan, physical_plan, metainfo) =
2995 match cached_global_exprs.remove(&global_id) {
2996 Some(global_expressions)
2997 if global_expressions.optimizer_features
2998 == optimizer_config.features =>
2999 {
3000 debug!("global expression cache hit for {global_id:?}");
3001 (
3002 global_expressions.global_mir,
3003 global_expressions.physical_plan,
3004 global_expressions.dataflow_metainfos,
3005 )
3006 }
3007 Some(_) | None => {
3008 let (optimized_plan, global_lir_plan) = {
3009 let mut optimizer = optimize::index::Optimizer::new(
3011 self.owned_catalog(),
3012 compute_instance.clone(),
3013 global_id,
3014 optimizer_config.clone(),
3015 self.optimizer_metrics(),
3016 );
3017
3018 let index_plan = optimize::index::Index::new(
3020 entry.name().clone(),
3021 idx.on,
3022 idx.keys.to_vec(),
3023 );
3024 let global_mir_plan = optimizer.optimize(index_plan)?;
3025 let optimized_plan = global_mir_plan.df_desc().clone();
3026
3027 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3029
3030 (optimized_plan, global_lir_plan)
3031 };
3032
3033 let (physical_plan, metainfo) = global_lir_plan.unapply();
3034 let metainfo = {
3035 let notice_ids =
3037 std::iter::repeat_with(|| self.allocate_transient_id())
3038 .map(|(_item_id, gid)| gid)
3039 .take(metainfo.optimizer_notices.len())
3040 .collect::<Vec<_>>();
3041 self.catalog().render_notices(
3043 metainfo,
3044 notice_ids,
3045 Some(idx.global_id()),
3046 )
3047 };
3048 uncached_expressions.insert(
3049 global_id,
3050 GlobalExpressions {
3051 global_mir: optimized_plan.clone(),
3052 physical_plan: physical_plan.clone(),
3053 dataflow_metainfos: metainfo.clone(),
3054 optimizer_features: OptimizerFeatures::from(
3055 self.catalog().system_config(),
3056 ),
3057 },
3058 );
3059 (optimized_plan, physical_plan, metainfo)
3060 }
3061 };
3062
3063 let catalog = self.catalog_mut();
3064 catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3065 catalog.set_physical_plan(idx.global_id(), physical_plan);
3066 catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3067
3068 compute_instance.insert_collection(idx.global_id());
3069 }
3070 CatalogItem::MaterializedView(mv) => {
3071 let compute_instance =
3073 instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3074 self.instance_snapshot(mv.cluster_id)
3075 .expect("compute instance exists")
3076 });
3077 let global_id = mv.global_id();
3078
3079 let (optimized_plan, physical_plan, metainfo) =
3080 match cached_global_exprs.remove(&global_id) {
3081 Some(global_expressions)
3082 if global_expressions.optimizer_features
3083 == optimizer_config.features =>
3084 {
3085 debug!("global expression cache hit for {global_id:?}");
3086 (
3087 global_expressions.global_mir,
3088 global_expressions.physical_plan,
3089 global_expressions.dataflow_metainfos,
3090 )
3091 }
3092 Some(_) | None => {
3093 let (_, internal_view_id) = self.allocate_transient_id();
3094 let debug_name = self
3095 .catalog()
3096 .resolve_full_name(entry.name(), None)
3097 .to_string();
3098 let force_non_monotonic = Default::default();
3099
3100 let (optimized_plan, global_lir_plan) = {
3101 let mut optimizer = optimize::materialized_view::Optimizer::new(
3103 self.owned_catalog().as_optimizer_catalog(),
3104 compute_instance.clone(),
3105 global_id,
3106 internal_view_id,
3107 mv.desc.iter_names().cloned().collect(),
3108 mv.non_null_assertions.clone(),
3109 mv.refresh_schedule.clone(),
3110 debug_name,
3111 optimizer_config.clone(),
3112 self.optimizer_metrics(),
3113 force_non_monotonic,
3114 );
3115
3116 let global_mir_plan =
3118 optimizer.optimize(mv.optimized_expr.as_ref().clone())?;
3119 let optimized_plan = global_mir_plan.df_desc().clone();
3120
3121 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3123
3124 (optimized_plan, global_lir_plan)
3125 };
3126
3127 let (physical_plan, metainfo) = global_lir_plan.unapply();
3128 let metainfo = {
3129 let notice_ids =
3131 std::iter::repeat_with(|| self.allocate_transient_id())
3132 .map(|(_item_id, global_id)| global_id)
3133 .take(metainfo.optimizer_notices.len())
3134 .collect::<Vec<_>>();
3135 self.catalog().render_notices(
3137 metainfo,
3138 notice_ids,
3139 Some(mv.global_id()),
3140 )
3141 };
3142 uncached_expressions.insert(
3143 global_id,
3144 GlobalExpressions {
3145 global_mir: optimized_plan.clone(),
3146 physical_plan: physical_plan.clone(),
3147 dataflow_metainfos: metainfo.clone(),
3148 optimizer_features: OptimizerFeatures::from(
3149 self.catalog().system_config(),
3150 ),
3151 },
3152 );
3153 (optimized_plan, physical_plan, metainfo)
3154 }
3155 };
3156
3157 let catalog = self.catalog_mut();
3158 catalog.set_optimized_plan(mv.global_id(), optimized_plan);
3159 catalog.set_physical_plan(mv.global_id(), physical_plan);
3160 catalog.set_dataflow_metainfo(mv.global_id(), metainfo);
3161
3162 compute_instance.insert_collection(mv.global_id());
3163 }
3164 CatalogItem::ContinualTask(ct) => {
3165 let compute_instance =
3166 instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3167 self.instance_snapshot(ct.cluster_id)
3168 .expect("compute instance exists")
3169 });
3170 let global_id = ct.global_id();
3171
3172 let (optimized_plan, physical_plan, metainfo) =
3173 match cached_global_exprs.remove(&global_id) {
3174 Some(global_expressions)
3175 if global_expressions.optimizer_features
3176 == optimizer_config.features =>
3177 {
3178 debug!("global expression cache hit for {global_id:?}");
3179 (
3180 global_expressions.global_mir,
3181 global_expressions.physical_plan,
3182 global_expressions.dataflow_metainfos,
3183 )
3184 }
3185 Some(_) | None => {
3186 let debug_name = self
3187 .catalog()
3188 .resolve_full_name(entry.name(), None)
3189 .to_string();
3190 let (optimized_plan, physical_plan, metainfo) = self
3191 .optimize_create_continual_task(
3192 ct,
3193 global_id,
3194 self.owned_catalog(),
3195 debug_name,
3196 )?;
3197 uncached_expressions.insert(
3198 global_id,
3199 GlobalExpressions {
3200 global_mir: optimized_plan.clone(),
3201 physical_plan: physical_plan.clone(),
3202 dataflow_metainfos: metainfo.clone(),
3203 optimizer_features: OptimizerFeatures::from(
3204 self.catalog().system_config(),
3205 ),
3206 },
3207 );
3208 (optimized_plan, physical_plan, metainfo)
3209 }
3210 };
3211
3212 let catalog = self.catalog_mut();
3213 catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3214 catalog.set_physical_plan(ct.global_id(), physical_plan);
3215 catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3216
3217 compute_instance.insert_collection(ct.global_id());
3218 }
3219 _ => (),
3220 }
3221 }
3222
3223 Ok(uncached_expressions)
3224 }
3225
3226 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
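// Collect the physical plans of all installed compute dataflows, together
// with their compaction-window read policies, and let centralized as-of
// selection pick as-ofs and acquire matching read holds. The updated plans
// are then written back into the catalog.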
3236 let mut catalog_ids = Vec::new();
3237 let mut dataflows = Vec::new();
3238 let mut read_policies = BTreeMap::new();
3239 for entry in self.catalog.entries() {
3240 let gid = match entry.item() {
3241 CatalogItem::Index(idx) => idx.global_id(),
3242 CatalogItem::MaterializedView(mv) => mv.global_id(),
3243 CatalogItem::ContinualTask(ct) => ct.global_id(),
3244 CatalogItem::Table(_)
3245 | CatalogItem::Source(_)
3246 | CatalogItem::Log(_)
3247 | CatalogItem::View(_)
3248 | CatalogItem::Sink(_)
3249 | CatalogItem::Type(_)
3250 | CatalogItem::Func(_)
3251 | CatalogItem::Secret(_)
3252 | CatalogItem::Connection(_) => continue,
3253 };
3254 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3255 catalog_ids.push(gid);
3256 dataflows.push(plan.clone());
3257
3258 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3259 read_policies.insert(gid, compaction_window.into());
3260 }
3261 }
3262 }
3263
3264 let read_ts = self.get_local_read_ts().await;
3265 let read_holds = as_of_selection::run(
3266 &mut dataflows,
3267 &read_policies,
3268 &*self.controller.storage_collections,
3269 read_ts,
3270 self.controller.read_only(),
3271 );
3272
3273 let catalog = self.catalog_mut();
3274 for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3275 catalog.set_physical_plan(id, plan);
3276 }
3277
3278 read_holds
3279 }
3280
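/// Runs the coordinator's main event loop, consuming the coordinator and
/// processing commands, internal messages, and controller events in batches
/// until the client command channel closes.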
3281 fn serve(
3290 mut self,
3291 mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3292 mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3293 mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3294 group_commit_rx: appends::GroupCommitWaiter,
3295 ) -> LocalBoxFuture<'static, ()> {
3296 async move {
3297 let mut cluster_events = self.controller.events_stream();
3299 let last_message = Arc::new(Mutex::new(LastMessage {
3300 kind: "none",
3301 stmt: None,
3302 }));
3303
3304 let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3305 let idle_metric = self.metrics.queue_busy_seconds.clone();
3306 let last_message_watchdog = Arc::clone(&last_message);
3307
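// Watchdog: every 5s, offer the coordinator a timer through a bounded
// channel. If the coordinator does not accept it within 30s, log the last
// message it was handling so a stuck coordinator can be diagnosed.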
3308 spawn(|| "coord watchdog", async move {
3309 let mut interval = tokio::time::interval(Duration::from_secs(5));
3314 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3318
3319 let mut coord_stuck = false;
3321
3322 loop {
3323 interval.tick().await;
3324
3325 let duration = tokio::time::Duration::from_secs(30);
3327 let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3328 let Ok(maybe_permit) = timeout else {
3329 if !coord_stuck {
3331 let last_message = last_message_watchdog.lock().expect("poisoned");
3332 tracing::warn!(
3333 last_message_kind = %last_message.kind,
3334 last_message_sql = %last_message.stmt_to_string(),
3335 "coordinator stuck for {duration:?}",
3336 );
3337 }
3338 coord_stuck = true;
3339
3340 continue;
3341 };
3342
3343 if coord_stuck {
3345 tracing::info!("Coordinator became unstuck");
3346 }
3347 coord_stuck = false;
3348
3349 let Ok(permit) = maybe_permit else {
3351 break;
3352 };
3353
3354 permit.send(idle_metric.start_timer());
3355 }
3356 });
3357
3358 self.schedule_storage_usage_collection().await;
3359 self.spawn_privatelink_vpc_endpoints_watch_task();
3360 self.spawn_statement_logging_task();
3361 flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3362
3363 let warn_threshold = self
3365 .catalog()
3366 .system_config()
3367 .coord_slow_message_warn_threshold();
3368
3369 const MESSAGE_BATCH: usize = 64;
3371 let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3372 let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3373
3374 let message_batch = self.metrics.message_batch.clone();
3375
3376 loop {
3377 select! {
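// `biased` makes polling deterministic, top to bottom, so internal
// commands and controller readiness take priority over new client
// commands.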
3381 biased;
3386
3387 _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3391 Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
3395 () = self.controller.ready() => {
3399 let controller = match self.controller.get_readiness() {
3403 Readiness::Storage => ControllerReadiness::Storage,
3404 Readiness::Compute => ControllerReadiness::Compute,
3405 Readiness::Metrics(_) => ControllerReadiness::Metrics,
3406 Readiness::Internal(_) => ControllerReadiness::Internal,
3407 Readiness::NotReady => unreachable!("just signaled as ready"),
3408 };
3409 messages.push(Message::ControllerReady { controller });
3410 }
3411 permit = group_commit_rx.ready() => {
3414 let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3420 PendingWriteTxn::User{span, ..} => Some(span),
3421 PendingWriteTxn::System{..} => None,
3422 });
3423 let span = match user_write_spans.exactly_one() {
3424 Ok(span) => span.clone(),
3425 Err(user_write_spans) => {
3426 let span = info_span!(parent: None, "group_commit_notify");
3427 for s in user_write_spans {
3428 span.follows_from(s);
3429 }
3430 span
3431 }
3432 };
3433 messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3434 },
3435 count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3439 if count == 0 {
3440 break;
3441 } else {
3442 messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
3443 }
3444 },
3445 Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3449 let mut pending_read_txns = vec![pending_read_txn];
3450 while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3451 pending_read_txns.push(pending_read_txn);
3452 }
3453 for (conn_id, pending_read_txn) in pending_read_txns {
3454 let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
3455 soft_assert_or_log!(
3456 prev.is_none(),
3457 "connections can not have multiple concurrent reads, prev: {prev:?}"
3458 )
3459 }
3460 messages.push(Message::LinearizeReads);
3461 }
3462 _ = self.advance_timelines_interval.tick() => {
3466 let span = info_span!(parent: None, "coord::advance_timelines_interval");
3467 span.follows_from(Span::current());
3468
3469 if self.controller.read_only() {
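// In read-only mode nothing can be written, so advance the timelines
// directly; otherwise initiate a group commit, which advances the
// timelines as part of committing.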
3474 messages.push(Message::AdvanceTimelines);
3475 } else {
3476 messages.push(Message::GroupCommitInitiate(span, None));
3477 }
3478 },
3479 _ = self.check_cluster_scheduling_policies_interval.tick() => {
3483 messages.push(Message::CheckSchedulingPolicies);
3484 },
3485
3486 _ = self.caught_up_check_interval.tick() => {
3490 self.maybe_check_caught_up().await;
3495
3496 continue;
3497 },
3498
3499 timer = idle_rx.recv() => {
3504 timer.expect("does not drop").observe_duration();
3505 self.metrics
3506 .message_handling
3507 .with_label_values(&["watchdog"])
3508 .observe(0.0);
3509 continue;
3510 }
3511 };
3512
3513 message_batch.observe(f64::cast_lossy(messages.len()));
3515
3516 for msg in messages.drain(..) {
3517 let msg_kind = msg.kind();
3520 let span = span!(
3521 target: "mz_adapter::coord::handle_message_loop",
3522 Level::INFO,
3523 "coord::handle_message",
3524 kind = msg_kind
3525 );
3526 let otel_context = span.context().span().span_context().clone();
3527
3528 *last_message.lock().expect("poisoned") = LastMessage {
3532 kind: msg_kind,
3533 stmt: match &msg {
3534 Message::Command(
3535 _,
3536 Command::Execute {
3537 portal_name,
3538 session,
3539 ..
3540 },
3541 ) => session
3542 .get_portal_unverified(portal_name)
3543 .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3544 _ => None,
3545 },
3546 };
3547
3548 let start = Instant::now();
3549 self.handle_message(msg).instrument(span).await;
3550 let duration = start.elapsed();
3551
3552 self.metrics
3553 .message_handling
3554 .with_label_values(&[msg_kind])
3555 .observe(duration.as_secs_f64());
3556
3557 if duration > warn_threshold {
3559 let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3560 tracing::error!(
3561 ?msg_kind,
3562 ?trace_id,
3563 ?duration,
3564 "very slow coordinator message"
3565 );
3566 }
3567 }
3568 }
3569 if let Some(catalog) = Arc::into_inner(self.catalog) {
3572 catalog.expire().await;
3573 }
3574 }
3575 .boxed_local()
3576 }
3577
3578 fn catalog(&self) -> &Catalog {
3580 &self.catalog
3581 }
3582
3583 fn owned_catalog(&self) -> Arc<Catalog> {
3586 Arc::clone(&self.catalog)
3587 }
3588
3589 fn optimizer_metrics(&self) -> OptimizerMetrics {
3592 self.optimizer_metrics.clone()
3593 }
3594
3595 fn catalog_mut(&mut self) -> &mut Catalog {
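// `Arc::make_mut` provides copy-on-write semantics: if any catalog
// snapshot is still held elsewhere, the catalog is cloned before being
// mutated, so existing snapshots are never invalidated.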
3597 Arc::make_mut(&mut self.catalog)
3605 }
3606
3607 fn connection_context(&self) -> &ConnectionContext {
3609 self.controller.connection_context()
3610 }
3611
3612 fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3614 &self.connection_context().secrets_reader
3615 }
3616
3617 #[allow(dead_code)]
3622 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3623 for meta in self.active_conns.values() {
3624 let _ = meta.notice_tx.send(notice.clone());
3625 }
3626 }
3627
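/// Returns a closure that broadcasts a notice to all connections that were
/// active when this method was called. Unlike `broadcast_notice`, the
/// closure does not borrow the coordinator, so it can be moved into spawned
/// tasks.
///
/// Illustrative use, assuming some `notice: AdapterNotice` (not compiled):
/// ```ignore
/// let broadcast = coord.broadcast_notice_tx();
/// mz_ore::task::spawn(|| "broadcast-notice", async move {
///     broadcast(notice);
/// });
/// ```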
3628 pub(crate) fn broadcast_notice_tx(
3631 &self,
3632 ) -> Box<dyn FnOnce(AdapterNotice) + Send + 'static> {
3633 let senders: Vec<_> = self
3634 .active_conns
3635 .values()
3636 .map(|meta| meta.notice_tx.clone())
3637 .collect();
3638 Box::new(move |notice| {
3639 for tx in senders {
3640 let _ = tx.send(notice.clone());
3641 }
3642 })
3643 }
3644
3645 pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3646 &self.active_conns
3647 }
3648
3649 #[instrument(level = "debug")]
3650 pub(crate) fn retire_execution(
3651 &mut self,
3652 reason: StatementEndedExecutionReason,
3653 ctx_extra: ExecuteContextExtra,
3654 ) {
3655 if let Some(uuid) = ctx_extra.retire() {
3656 self.end_statement_execution(uuid, reason);
3657 }
3658 }
3659
3660 #[instrument(level = "debug")]
3662 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3663 let compute = self
3664 .instance_snapshot(instance)
3665 .expect("compute instance does not exist");
3666 DataflowBuilder::new(self.catalog().state(), compute)
3667 }
3668
3669 pub fn instance_snapshot(
3671 &self,
3672 id: ComputeInstanceId,
3673 ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3674 ComputeInstanceSnapshot::new(&self.controller, id)
3675 }
3676
3677 pub(crate) async fn ship_dataflow(
3680 &mut self,
3681 dataflow: DataflowDescription<Plan>,
3682 instance: ComputeInstanceId,
3683 subscribe_target_replica: Option<ReplicaId>,
3684 ) {
3685 let export_ids = dataflow.exported_index_ids().collect();
3688
3689 self.controller
3690 .compute
3691 .create_dataflow(instance, dataflow, subscribe_target_replica)
3692 .unwrap_or_terminate("dataflow creation cannot fail");
3693
3694 self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3695 .await;
3696 }
3697
3698 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3700 &mut self,
3701 dataflow: DataflowDescription<Plan>,
3702 instance: ComputeInstanceId,
3703 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3704 ) {
3705 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3706 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3707 let ((), ()) =
3708 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3709 } else {
3710 self.ship_dataflow(dataflow, instance, None).await;
3711 }
3712 }
3713
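/// Installs a watch set in the controller for the given compute collections
/// at timestamp `t`, records it against `conn_id` so it can be cancelled if
/// the connection goes away, and stashes `state` to complete the pending
/// response once the watch set fires.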
3714 pub fn install_compute_watch_set(
3718 &mut self,
3719 conn_id: ConnectionId,
3720 objects: BTreeSet<GlobalId>,
3721 t: Timestamp,
3722 state: WatchSetResponse,
3723 ) {
3724 let ws_id = self.controller.install_compute_watch_set(objects, t);
3725 self.connection_watch_sets
3726 .entry(conn_id.clone())
3727 .or_default()
3728 .insert(ws_id);
3729 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3730 }
3731
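/// Like `install_compute_watch_set`, but watches storage collections.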
3732 pub fn install_storage_watch_set(
3736 &mut self,
3737 conn_id: ConnectionId,
3738 objects: BTreeSet<GlobalId>,
3739 t: Timestamp,
3740 state: WatchSetResponse,
3741 ) {
3742 let ws_id = self.controller.install_storage_watch_set(objects, t);
3743 self.connection_watch_sets
3744 .entry(conn_id.clone())
3745 .or_default()
3746 .insert(ws_id);
3747 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3748 }
3749
3750 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3752 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3753 for ws_id in ws_ids {
3754 self.installed_watch_sets.remove(&ws_id);
3755 }
3756 }
3757 }
3758
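/// Dumps the coordinator's in-memory state (timelines, active connections,
/// transaction read holds, pending peeks and reads) together with the
/// controller's state as a JSON object, for debugging and introspection.
///
/// Illustrative use (not compiled):
/// ```ignore
/// let state = coord.dump().await?;
/// println!("{}", serde_json::to_string_pretty(&state)?);
/// ```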
3759 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3763 let global_timelines: BTreeMap<_, _> = self
3769 .global_timelines
3770 .iter()
3771 .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3772 .collect();
3773 let active_conns: BTreeMap<_, _> = self
3774 .active_conns
3775 .iter()
3776 .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3777 .collect();
3778 let txn_read_holds: BTreeMap<_, _> = self
3779 .txn_read_holds
3780 .iter()
3781 .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3782 .collect();
3783 let pending_peeks: BTreeMap<_, _> = self
3784 .pending_peeks
3785 .iter()
3786 .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3787 .collect();
3788 let client_pending_peeks: BTreeMap<_, _> = self
3789 .client_pending_peeks
3790 .iter()
3791 .map(|(id, peek)| {
3792 let peek: BTreeMap<_, _> = peek
3793 .iter()
3794 .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3795 .collect();
3796 (id.to_string(), peek)
3797 })
3798 .collect();
3799 let pending_linearize_read_txns: BTreeMap<_, _> = self
3800 .pending_linearize_read_txns
3801 .iter()
3802 .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3803 .collect();
3804
3805 let map = serde_json::Map::from_iter([
3806 (
3807 "global_timelines".to_string(),
3808 serde_json::to_value(global_timelines)?,
3809 ),
3810 (
3811 "active_conns".to_string(),
3812 serde_json::to_value(active_conns)?,
3813 ),
3814 (
3815 "txn_read_holds".to_string(),
3816 serde_json::to_value(txn_read_holds)?,
3817 ),
3818 (
3819 "pending_peeks".to_string(),
3820 serde_json::to_value(pending_peeks)?,
3821 ),
3822 (
3823 "client_pending_peeks".to_string(),
3824 serde_json::to_value(client_pending_peeks)?,
3825 ),
3826 (
3827 "pending_linearize_read_txns".to_string(),
3828 serde_json::to_value(pending_linearize_read_txns)?,
3829 ),
3830 ("controller".to_string(), self.controller.dump().await?),
3831 ]);
3832 Ok(serde_json::Value::Object(map))
3833 }
3834
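/// Retracts entries of `mz_storage_usage_by_shard` whose collection
/// timestamp falls outside `retention_period`, measured against the current
/// read timestamp. The retractions are sent through the internal command
/// channel and applied asynchronously.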
3835 async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
3849 let item_id = self
3850 .catalog()
3851 .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3852 let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3853 let read_ts = self.get_local_read_ts().await;
3854 let current_contents_fut = self
3855 .controller
3856 .storage_collections
3857 .snapshot(global_id, read_ts);
3858 let internal_cmd_tx = self.internal_cmd_tx.clone();
3859 spawn(|| "storage_usage_prune", async move {
3860 let mut current_contents = current_contents_fut
3861 .await
3862 .unwrap_or_terminate("cannot fail to fetch snapshot");
3863 differential_dataflow::consolidation::consolidate(&mut current_contents);
3864
3865 let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3866 let mut expired = Vec::new();
3867 for (row, diff) in current_contents {
3868 assert_eq!(
3869 diff, 1,
3870 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3871 );
3872 let collection_timestamp = row
3874 .unpack()
3875 .get(3)
3876 .expect("definition of mz_storage_usage_by_shard changed")
3877 .unwrap_timestamptz();
3878 let collection_timestamp = collection_timestamp.timestamp_millis();
3879 let collection_timestamp: u128 = collection_timestamp
3880 .try_into()
3881 .expect("all collections happen after Jan 1 1970");
3882 if collection_timestamp < cutoff_ts {
3883 debug!("pruning storage event {row:?}");
3884 let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3885 expired.push(builtin_update);
3886 }
3887 }
3888
3889 let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3891 });
3892 }
3893
3894 fn current_credit_consumption_rate(&self) -> Numeric {
3895 self.catalog()
3896 .user_cluster_replicas()
3897 .filter_map(|replica| match &replica.config.location {
3898 ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
3899 ReplicaLocation::Unmanaged(_) => None,
3900 })
3901 .map(|size| {
3902 self.catalog()
3903 .cluster_replica_sizes()
3904 .0
3905 .get(size)
3906 .expect("location size is validated against the cluster replica sizes")
3907 .credits_per_hour
3908 })
3909 .sum()
3910 }
3911}
3912
3913#[cfg(test)]
3914impl Coordinator {
3915 #[allow(dead_code)]
3916 async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
3917 let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
3925
3926 let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
3927 }
3928}
3929
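/// The last message the coordinator handled, retained for watchdog logging
/// and panic dumps.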
3930struct LastMessage {
3932 kind: &'static str,
3933 stmt: Option<Arc<Statement<Raw>>>,
3934}
3935
3936impl LastMessage {
3937 fn stmt_to_string(&self) -> Cow<'static, str> {
3939 self.stmt
3940 .as_ref()
3941 .map(|stmt| stmt.to_ast_string_redacted().into())
3942 .unwrap_or(Cow::Borrowed("<none>"))
3943 }
3944}
3945
3946impl fmt::Debug for LastMessage {
3947 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3948 f.debug_struct("LastMessage")
3949 .field("kind", &self.kind)
3950 .field("stmt", &self.stmt_to_string())
3951 .finish()
3952 }
3953}
3954
3955impl Drop for LastMessage {
3956 fn drop(&mut self) {
3957 if std::thread::panicking() {
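// Print directly to stderr: during a panic the tracing stack may itself
// be unusable, so `eprintln!` is the most reliable way to get the dump out.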
3959 eprintln!("Coordinator panicking, dumping last message\n{self:?}");
3961 }
3962 }
3963}
3964
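/// Serves the coordinator based on the provided `Config`: spawns a dedicated
/// coordinator thread, bootstraps it, and yields a `Handle` to the
/// coordinator plus a `Client` for submitting commands.
///
/// Illustrative use (not compiled):
/// ```ignore
/// let (handle, client) = serve(config).await?;
/// ```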
3965pub fn serve(
3977 Config {
3978 controller_config,
3979 controller_envd_epoch,
3980 mut storage,
3981 audit_logs_iterator,
3982 timestamp_oracle_url,
3983 unsafe_mode,
3984 all_features,
3985 build_info,
3986 environment_id,
3987 metrics_registry,
3988 now,
3989 secrets_controller,
3990 cloud_resource_controller,
3991 cluster_replica_sizes,
3992 builtin_system_cluster_config,
3993 builtin_catalog_server_cluster_config,
3994 builtin_probe_cluster_config,
3995 builtin_support_cluster_config,
3996 builtin_analytics_cluster_config,
3997 system_parameter_defaults,
3998 availability_zones,
3999 storage_usage_client,
4000 storage_usage_collection_interval,
4001 storage_usage_retention_period,
4002 segment_client,
4003 egress_addresses,
4004 aws_account_id,
4005 aws_privatelink_availability_zones,
4006 connection_context,
4007 connection_limit_callback,
4008 remote_system_parameters,
4009 webhook_concurrency_limit,
4010 http_host_name,
4011 tracing_handle,
4012 read_only_controllers,
4013 caught_up_trigger: clusters_caught_up_trigger,
4014 helm_chart_version,
4015 license_key,
4016 external_login_password_mz_system,
4017 }: Config,
4018) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4019 async move {
4020 let coord_start = Instant::now();
4021 info!("startup: coordinator init: beginning");
4022 info!("startup: coordinator init: preamble beginning");
4023
4024 let _builtins = LazyLock::force(&BUILTINS_STATIC);
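// Force the builtins `LazyLock` now, so that the one-time initialization
// cost is paid up front rather than during some later, latency-sensitive
// operation.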
4028
4029 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4030 let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4031 let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4032 mpsc::unbounded_channel();
4033
4034 if !availability_zones.iter().all_unique() {
4036 coord_bail!("availability zones must be unique");
4037 }
4038
4039 let aws_principal_context = match (
4040 aws_account_id,
4041 connection_context.aws_external_id_prefix.clone(),
4042 ) {
4043 (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4044 aws_account_id,
4045 aws_external_id_prefix,
4046 }),
4047 _ => None,
4048 };
4049
4050 let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4051 .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4052
4053 info!(
4054 "startup: coordinator init: preamble complete in {:?}",
4055 coord_start.elapsed()
4056 );
4057 let oracle_init_start = Instant::now();
4058 info!("startup: coordinator init: timestamp oracle init beginning");
4059
4060 let pg_timestamp_oracle_config = timestamp_oracle_url
4061 .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
4062 let mut initial_timestamps =
4063 get_initial_oracle_timestamps(&pg_timestamp_oracle_config).await?;
4064
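// An oracle must always exist for the `EpochMilliseconds` timeline, even on
// a first boot when no initial timestamp has been persisted yet.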
4065 initial_timestamps
4069 .entry(Timeline::EpochMilliseconds)
4070 .or_insert_with(mz_repr::Timestamp::minimum);
4071 let mut timestamp_oracles = BTreeMap::new();
4072 for (timeline, initial_timestamp) in initial_timestamps {
4073 Coordinator::ensure_timeline_state_with_initial_time(
4074 &timeline,
4075 initial_timestamp,
4076 now.clone(),
4077 pg_timestamp_oracle_config.clone(),
4078 &mut timestamp_oracles,
4079 read_only_controllers,
4080 )
4081 .await;
4082 }
4083
4084 let catalog_upper = storage.current_upper().await;
4088 let epoch_millis_oracle = &timestamp_oracles
4094 .get(&Timeline::EpochMilliseconds)
4095 .expect("inserted above")
4096 .oracle;
4097
4098 let mut boot_ts = if read_only_controllers {
4099 let read_ts = epoch_millis_oracle.read_ts().await;
4100 std::cmp::max(read_ts, catalog_upper)
4101 } else {
4102 epoch_millis_oracle.apply_write(catalog_upper).await;
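// The oracle is now at least `catalog_upper`, so the write timestamp
// allocated here is guaranteed to be beyond anything already written to
// the catalog.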
4105 epoch_millis_oracle.write_ts().await.timestamp
4106 };
4107
4108 info!(
4109 "startup: coordinator init: timestamp oracle init complete in {:?}",
4110 oracle_init_start.elapsed()
4111 );
4112
4113 let catalog_open_start = Instant::now();
4114 info!("startup: coordinator init: catalog open beginning");
4115 let persist_client = controller_config
4116 .persist_clients
4117 .open(controller_config.persist_location.clone())
4118 .await
4119 .context("opening persist client")?;
4120 let builtin_item_migration_config = BuiltinItemMigrationConfig {
4122 persist_client: persist_client.clone(),
4123 read_only: read_only_controllers,
4124 };
4126 let OpenCatalogResult {
4127 mut catalog,
4128 migrated_storage_collections_0dt,
4129 new_builtin_collections,
4130 builtin_table_updates,
4131 cached_global_exprs,
4132 uncached_local_exprs,
4133 } = Catalog::open(mz_catalog::config::Config {
4134 storage,
4135 metrics_registry: &metrics_registry,
4136 state: mz_catalog::config::StateConfig {
4137 unsafe_mode,
4138 all_features,
4139 build_info,
4140 environment_id: environment_id.clone(),
4141 read_only: read_only_controllers,
4142 now: now.clone(),
4143 boot_ts: boot_ts.clone(),
4144 skip_migrations: false,
4145 cluster_replica_sizes,
4146 builtin_system_cluster_config,
4147 builtin_catalog_server_cluster_config,
4148 builtin_probe_cluster_config,
4149 builtin_support_cluster_config,
4150 builtin_analytics_cluster_config,
4151 system_parameter_defaults,
4152 remote_system_parameters,
4153 availability_zones,
4154 egress_addresses,
4155 aws_principal_context,
4156 aws_privatelink_availability_zones,
4157 connection_context,
4158 http_host_name,
4159 builtin_item_migration_config,
4160 persist_client: persist_client.clone(),
4161 enable_expression_cache_override: None,
4162 helm_chart_version,
4163 external_login_password_mz_system,
4164 license_key: license_key.clone(),
4165 },
4166 })
4167 .await?;
4168
4169 let catalog_upper = catalog.current_upper().await;
4172 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4173
4174 if !read_only_controllers {
4175 epoch_millis_oracle.apply_write(boot_ts).await;
4176 }
4177
4178 info!(
4179 "startup: coordinator init: catalog open complete in {:?}",
4180 catalog_open_start.elapsed()
4181 );
4182
4183 let coord_thread_start = Instant::now();
4184 info!("startup: coordinator init: coordinator thread start beginning");
4185
4186 let session_id = catalog.config().session_id;
4187 let start_instant = catalog.config().start_instant;
4188
4189 let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4193 let handle = TokioHandle::current();
4194
4195 let metrics = Metrics::register_into(&metrics_registry);
4196 let metrics_clone = metrics.clone();
4197 let optimizer_metrics = OptimizerMetrics::register_into(
4198 &metrics_registry,
4199 catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4200 );
4201 let segment_client_clone = segment_client.clone();
4202 let coord_now = now.clone();
4203 let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4204 let mut check_scheduling_policies_interval = tokio::time::interval(
4205 catalog
4206 .system_config()
4207 .cluster_check_scheduling_policies_interval(),
4208 );
4209 check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4210
4211 let clusters_caught_up_check_interval = if read_only_controllers {
4212 let dyncfgs = catalog.system_config().dyncfgs();
4213 let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4214
4215 let mut interval = tokio::time::interval(interval);
4216 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4217 interval
4218 } else {
4219 let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4227 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4228 interval
4229 };
4230
4231 let clusters_caught_up_check =
4232 clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
4233 trigger,
4234 exclude_collections: new_builtin_collections.into_iter().collect(),
4235 });
4236
4237 if let Some(config) = pg_timestamp_oracle_config.as_ref() {
4238 let pg_timestamp_oracle_params =
4241 flags::pg_timstamp_oracle_config(catalog.system_config());
4242 pg_timestamp_oracle_params.apply(config);
4243 }
4244
4245 let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4248 Arc::new(move |system_vars: &SystemVars| {
4249 let limit: u64 = system_vars.max_connections().cast_into();
4250 let superuser_reserved: u64 =
4251 system_vars.superuser_reserved_connections().cast_into();
4252
4253 let superuser_reserved = if superuser_reserved >= limit {
4258 tracing::warn!(
4259 "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
4260 );
4261 limit
4262 } else {
4263 superuser_reserved
4264 };
4265
4266 (connection_limit_callback)(limit, superuser_reserved);
4267 });
4268 catalog.system_config_mut().register_callback(
4269 &mz_sql::session::vars::MAX_CONNECTIONS,
4270 Arc::clone(&connection_limit_callback),
4271 );
4272 catalog.system_config_mut().register_callback(
4273 &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4274 connection_limit_callback,
4275 );
4276
4277 let (group_commit_tx, group_commit_rx) = appends::notifier();
4278
4279 let parent_span = tracing::Span::current();
4280 let thread = thread::Builder::new()
4281 .stack_size(3 * stack::STACK_SIZE)
4285 .name("coordinator".to_string())
4286 .spawn(move || {
4287 let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4288
4289 let controller = handle
4290 .block_on({
4291 catalog.initialize_controller(
4292 controller_config,
4293 controller_envd_epoch,
4294 read_only_controllers,
4295 )
4296 })
4297 .unwrap_or_terminate("failed to initialize storage_controller");
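// Initializing the controller may have advanced the catalog upper, so fold
// the new upper into `boot_ts` once more.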
4298 let catalog_upper = handle.block_on(catalog.current_upper());
4301 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4302 if !read_only_controllers {
4303 let epoch_millis_oracle = &timestamp_oracles
4304 .get(&Timeline::EpochMilliseconds)
4305 .expect("inserted above")
4306 .oracle;
4307 handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4308 }
4309
                let catalog = Arc::new(catalog);

                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
                let mut coord = Coordinator {
                    controller,
                    catalog,
                    internal_cmd_tx,
                    group_commit_tx,
                    strict_serializable_reads_tx,
                    global_timelines: timestamp_oracles,
                    transient_id_gen: Arc::new(TransientIdGen::new()),
                    active_conns: BTreeMap::new(),
                    txn_read_holds: Default::default(),
                    pending_peeks: BTreeMap::new(),
                    client_pending_peeks: BTreeMap::new(),
                    pending_linearize_read_txns: BTreeMap::new(),
                    serialized_ddl: LockedVecDeque::new(),
                    active_compute_sinks: BTreeMap::new(),
                    active_webhooks: BTreeMap::new(),
                    active_copies: BTreeMap::new(),
                    staged_cancellation: BTreeMap::new(),
                    introspection_subscribes: BTreeMap::new(),
                    write_locks: BTreeMap::new(),
                    deferred_write_ops: BTreeMap::new(),
                    pending_writes: Vec::new(),
                    advance_timelines_interval,
                    secrets_controller,
                    caching_secrets_reader,
                    cloud_resource_controller,
                    storage_usage_client,
                    storage_usage_collection_interval,
                    segment_client,
                    metrics,
                    optimizer_metrics,
                    tracing_handle,
                    statement_logging: StatementLogging::new(coord_now.clone()),
                    webhook_concurrency_limit,
                    pg_timestamp_oracle_config,
                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
                    cluster_scheduling_decisions: BTreeMap::new(),
                    caught_up_check_interval: clusters_caught_up_check_interval,
                    caught_up_check: clusters_caught_up_check,
                    installed_watch_sets: BTreeMap::new(),
                    connection_watch_sets: BTreeMap::new(),
                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
                    read_only_controllers,
                    buffered_builtin_table_updates: Some(Vec::new()),
                    license_key,
                    persist_client,
                };
                let bootstrap = handle.block_on(async {
                    coord
                        .bootstrap(
                            boot_ts,
                            migrated_storage_collections_0dt,
                            builtin_table_updates,
                            cached_global_exprs,
                            uncached_local_exprs,
                            audit_logs_iterator,
                        )
                        .await?;
                    coord
                        .controller
                        .remove_orphaned_replicas(
                            coord.catalog().get_next_user_replica_id().await?,
                            coord.catalog().get_next_system_replica_id().await?,
                        )
                        .await
                        .map_err(AdapterError::Orchestrator)?;

                    if let Some(retention_period) = storage_usage_retention_period {
                        coord
                            .prune_storage_usage_events_on_startup(retention_period)
                            .await;
                    }

                    Ok(())
                });
                let ok = bootstrap.is_ok();
                drop(span);
                bootstrap_tx
                    .send(bootstrap)
                    .expect("bootstrap_rx is not dropped until it receives this message");
                if ok {
                    handle.block_on(coord.serve(
                        internal_cmd_rx,
                        strict_serializable_reads_rx,
                        cmd_rx,
                        group_commit_rx,
                    ));
                }
            })
            .expect("failed to create coordinator thread");
        match bootstrap_rx
            .await
            .expect("bootstrap_tx always sends a message or panics/halts")
        {
            Ok(()) => {
                info!(
                    "startup: coordinator init: coordinator thread start complete in {:?}",
                    coord_thread_start.elapsed()
                );
                info!(
                    "startup: coordinator init: complete in {:?}",
                    coord_start.elapsed()
                );
                let handle = Handle {
                    session_id,
                    start_instant,
                    _thread: thread.join_on_drop(),
                };
                let client = Client::new(
                    build_info,
                    cmd_tx.clone(),
                    metrics_clone,
                    now,
                    environment_id,
                    segment_client_clone,
                );
                Ok((handle, client))
            }
            Err(e) => Err(e),
        }
    }
    .boxed()
}

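/// Returns the most recent timestamp recorded for each timeline by the
/// postgres-backed timestamp oracle, or an empty map if no oracle is
/// configured. The result is used to seed the in-memory timestamp oracles
/// on boot.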
async fn get_initial_oracle_timestamps(
    pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
    let mut initial_timestamps = BTreeMap::new();

    if let Some(pg_timestamp_oracle_config) = pg_timestamp_oracle_config {
        let postgres_oracle_timestamps =
            PostgresTimestampOracle::<NowFn>::get_all_timelines(pg_timestamp_oracle_config.clone())
                .await?;

        let debug_msg = || {
            postgres_oracle_timestamps
                .iter()
                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
                .join(", ")
        };
        info!(
            "current timestamps from the postgres-backed timestamp oracle: {}",
            debug_msg()
        );

        for (timeline, ts) in postgres_oracle_timestamps {
            let entry = initial_timestamps
                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));

            entry
                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
                .or_insert(ts);
        }
    } else {
        info!("no postgres url configured for the postgres-backed timestamp oracle");
    };

    let debug_msg = || {
        initial_timestamps
            .iter()
            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
            .join(", ")
    };
    info!("initial oracle timestamps: {}", debug_msg());

    Ok(initial_timestamps)
}

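/// Pulls system parameters from the remote frontend, if one is configured,
/// and returns the modified parameters as name/value pairs. After the first
/// successful sync, the pull is bounded by `system_parameter_sync_timeout`;
/// on timeout, `None` is returned rather than blocking startup.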
#[instrument]
pub async fn load_remote_system_parameters(
    storage: &mut Box<dyn OpenableDurableCatalogState>,
    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
    system_parameter_sync_timeout: Duration,
) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
        tracing::info!("parameter sync on boot: start sync");

        let mut params = SynchronizedParameters::new(SystemVars::default());
        let frontend_sync = async {
            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
            frontend.pull(&mut params);
            let ops = params
                .modified()
                .into_iter()
                .map(|param| {
                    let name = param.name;
                    let value = param.value;
                    tracing::info!(name, value, initial = true, "sync parameter");
                    (name, value)
                })
                .collect();
            tracing::info!("parameter sync on boot: end sync");
            Ok(Some(ops))
        };
        if !storage.has_system_config_synced_once().await? {
            frontend_sync.await
        } else {
            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
                Ok(ops) => Ok(ops),
                Err(TimeoutError::Inner(e)) => Err(e),
                Err(TimeoutError::DeadlineElapsed) => {
                    tracing::info!("parameter sync on boot: sync has timed out");
                    Ok(None)
                }
            }
        }
    } else {
        Ok(None)
    }
}

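/// Responses delivered when a watch set installed by the coordinator fires.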
#[derive(Debug)]
pub enum WatchSetResponse {
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    AlterSinkReady(AlterSinkReadyContext),
}

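/// State needed to finish an `ALTER SINK` once the new sink's dependencies are
/// ready. Dropping this context retires the execution with a cancellation
/// error (see the `Drop` impl below).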
#[derive(Debug)]
pub struct AlterSinkReadyContext {
    ctx: Option<ExecuteContext>,
    otel_ctx: OpenTelemetryContext,
    plan: AlterSinkPlan,
    plan_validity: PlanValidity,
    read_hold: ReadHolds<Timestamp>,
}

impl AlterSinkReadyContext {
    fn ctx(&mut self) -> &mut ExecuteContext {
        self.ctx.as_mut().expect("only cleared on drop")
    }

    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
        self.ctx
            .take()
            .expect("only cleared on drop")
            .retire(result);
    }
}

impl Drop for AlterSinkReadyContext {
    fn drop(&mut self) {
        if let Some(ctx) = self.ctx.take() {
            ctx.retire(Err(AdapterError::Canceled));
        }
    }
}

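/// A `VecDeque` paired with a tokio mutex whose guard can be handed out to
/// serialize processing of the queued items (used for serialized DDL).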
#[derive(Debug)]
struct LockedVecDeque<T> {
    items: VecDeque<T>,
    lock: Arc<tokio::sync::Mutex<()>>,
}

impl<T> LockedVecDeque<T> {
    pub fn new() -> Self {
        Self {
            items: VecDeque::new(),
            lock: Arc::new(tokio::sync::Mutex::new(())),
        }
    }

    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
        Arc::clone(&self.lock).try_lock_owned()
    }

    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    pub fn push_back(&mut self, value: T) {
        self.items.push_back(value)
    }

    pub fn pop_front(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    pub fn remove(&mut self, index: usize) -> Option<T> {
        self.items.remove(index)
    }

    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
        self.items.iter()
    }
}

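/// A statement (or plan) whose execution was deferred, along with the
/// execution context needed to resume it.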
#[derive(Debug)]
struct DeferredPlanStatement {
    ctx: ExecuteContext,
    ps: PlanStatement,
}

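/// Either a statement that still needs planning or an already-planned `Plan`.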
#[derive(Debug)]
enum PlanStatement {
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
    },
}

#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    #[error("Access denied: missing IP address")]
    MissingIp,
}

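/// Returns `Ok` if any of the given network policy rules contains `ip`, and an
/// `AddressDenied` error otherwise.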
pub(crate) fn validate_ip_with_policy_rules(
    ip: &IpAddr,
    rules: &[NetworkPolicyRule],
) -> Result<(), NetworkPolicyError> {
    if rules.iter().any(|r| r.address.0.contains(ip)) {
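        // At least one rule's CIDR block contains the address: allow the
        // connection.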
        Ok(())
    } else {
        Err(NetworkPolicyError::AddressDenied(*ip))
    }
}