use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroI64;
use std::ops::Neg;
use std::str::FromStr;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::Context;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
use http::Uri;
use ipnet::IpNet;
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
use mz_auth::password::Password;
use mz_build_info::BuildInfo;
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_controller::clusters::{ClusterConfig, ClusterEvent, ClusterStatus, ProcessId};
use mz_controller::{ControllerConfig, Readiness};
use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::OfflineReason;
use mz_ore::cast::{CastFrom, CastInto, CastLossy};
use mz_ore::channel::trigger::Trigger;
use mz_ore::future::TimeoutError;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::{JoinHandle, spawn};
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_ore::url::SensitiveUrl;
use mz_ore::{
    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
};
use mz_persist_client::PersistClient;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
use mz_secrets::cache::CachingSecretsReader;
use mz_secrets::{SecretsController, SecretsReader};
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::{CatalogCluster, EnvironmentId};
use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::{
    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, NetworkPolicyRule,
    OnTimeoutAction, Params, QueryWhen,
};
use mz_sql::session::user::User;
use mz_sql::session::vars::SystemVars;
use mz_sql_parser::ast::ExplainStage;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
use mz_timestamp_oracle::WriteTimestamp;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use serde::Serialize;
use thiserror::Error;
use timely::progress::{Antichain, Timestamp as _};
use tokio::runtime::Handle as TokioHandle;
use tokio::select;
use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
use crate::client::{Client, Handle};
use crate::command::{Command, ExecuteResponse};
use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
use crate::coord::appends::{
    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
};
use crate::coord::caught_up::CaughtUpCheckContext;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::introspection::IntrospectionSubscribe;
use crate::coord::peek::PendingPeek;
use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::validity::PlanValidity;
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
use crate::metrics::Metrics;
use crate::optimize::dataflows::{
    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{AdapterNotice, ReadHolds, flags};

pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;

pub mod appends;
mod catalog_serving;
mod caught_up;
pub mod cluster_scheduling;
mod command_handler;
pub mod consistency;
mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
pub mod read_policy;
mod sequencer;
mod sql;
mod validity;

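/// A message that can be sent to the coordinator's main loop to request work.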
#[derive(Debug)]
pub enum Message {
    Command(OpenTelemetryContext, Command),
    ControllerReady {
        controller: ControllerReadiness,
    },
    PurifiedStatementReady(PurifiedStatementReady),
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    TryDeferred {
        conn_id: ConnectionId,
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    AdvanceTimelines,
    ClusterEvent(ClusterEvent),
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    LinearizeReads,
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    DrainStatementLog,
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}

impl Message {
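    /// Returns a static string identifying the kind of message, e.g. for logging.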
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
            },
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}

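/// The part of the controller that signaled readiness and now requires processing.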
#[derive(Debug)]
pub enum ControllerReadiness {
    Storage,
    Compute,
    Metrics,
    Internal,
}

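/// The result of a background task that the coordinator must finish processing on its main loop.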
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub params: Params,
    pub plan_validity: PlanValidity,
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}

pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;

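/// The result of a background connection validation, handed back to the coordinator to finish
/// creating or altering the connection.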
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    pub connection_id: CatalogItemId,
    pub connection_gid: GlobalId,
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}

pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
pub type AlterConnectionValidationReady = ValidationReady<Connection>;

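/// The stages a peek (e.g. a `SELECT` or `COPY ... TO`) passes through during sequencing.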
#[derive(Debug)]
pub enum PeekStage {
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    Finish(PeekStageFinish),
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    CopyToPreflight(PeekStageCopyTo),
    CopyToDataflow(PeekStageCopyTo),
}

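/// Context needed to execute a `COPY ... TO <url>` statement.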
#[derive(Debug)]
pub struct CopyToContext {
    pub desc: RelationDesc,
    pub uri: Uri,
    pub connection: StorageConnection<ReferencedConnection>,
    pub connection_id: CatalogItemId,
    pub format: S3SinkFormat,
    pub max_file_size: u64,
    pub output_batch_count: Option<u64>,
}

#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
}

#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    imports: BTreeMap<GlobalId, MapFilterProject>,
}

#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}

#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}

#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}

#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    real_time_recency_ts: Option<Timestamp>,
}

#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}

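/// Tracks whether (and how) the statement being sequenced is an `EXPLAIN`.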
#[derive(Debug)]
pub enum ExplainContext {
    None,
    Plan(ExplainPlanContext),
    PlanInsightsNotice(OptimizerTrace),
    Pushdown,
}

impl ExplainContext {
    fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
        let optimizer_trace = match self {
            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
            _ => None,
        };
        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
    }

    fn needs_cluster(&self) -> bool {
        match self {
            ExplainContext::None => true,
            ExplainContext::Plan(..) => false,
            ExplainContext::PlanInsightsNotice(..) => true,
            ExplainContext::Pushdown => false,
        }
    }

    fn needs_plan_insights(&self) -> bool {
        matches!(
            self,
            ExplainContext::Plan(ExplainPlanContext {
                stage: ExplainStage::PlanInsights,
                ..
            }) | ExplainContext::PlanInsightsNotice(_)
        )
    }
}

#[derive(Debug)]
pub struct ExplainPlanContext {
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    pub stage: ExplainStage,
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    pub optimizer_trace: OptimizerTrace,
}

#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    item_id: CatalogItemId,
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
}

#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}

#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}

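/// The cluster against which a statement should be executed.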
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    CatalogServer,
    Active,
    Transaction(ClusterId),
}

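/// The result of processing a [`Staged`] stage: either more work (possibly running on a
/// separate task) or a final response for the client.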
pub(crate) enum StageResult<T> {
    Handle(JoinHandle<Result<T, AdapterError>>),
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    Immediate(T),
    Response(ExecuteResponse),
}

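/// A single stage of statement execution that the coordinator can process, yielding either the
/// next stage or a finished response.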
pub(crate) trait Staged: Send {
    type Ctx: StagedContext;

    fn validity(&mut self) -> &mut PlanValidity;

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    fn cancel_enabled(&self) -> bool;
}

pub trait StagedContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    fn session(&self) -> Option<&Session>;
}

impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}

impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}

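/// Configuration for a [`Coordinator`].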
pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    pub read_only_controllers: bool,

    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
}

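/// Metadata about an active connection.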
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    secret_key: u32,
    connected_at: EpochMillis,
    user: User,
    application_name: String,
    uuid: Uuid,
    conn_id: ConnectionId,
    client_ip: Option<IpAddr>,

    drop_sinks: BTreeSet<GlobalId>,

    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    pending_cluster_alters: BTreeSet<ClusterId>,

    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    authenticated_role: RoleId,
}

impl ConnMeta {
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    pub fn user(&self) -> &User {
        &self.user
    }

    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}

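/// A pending transaction waiting to be committed or rolled back.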
#[derive(Debug)]
pub struct PendingTxn {
    ctx: ExecuteContext,
    response: Result<PendingTxnResponse, AdapterError>,
    action: EndTransactionAction,
}

#[derive(Debug)]
pub enum PendingTxnResponse {
    Committed {
        params: BTreeMap<&'static str, String>,
    },
    Rolledback {
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        match self {
            PendingTxnResponse::Committed { params }
            | PendingTxnResponse::Rolledback { params } => params.extend(p),
        }
    }
}

impl From<PendingTxnResponse> for ExecuteResponse {
    fn from(value: PendingTxnResponse) -> Self {
        match value {
            PendingTxnResponse::Committed { params } => {
                ExecuteResponse::TransactionCommitted { params }
            }
            PendingTxnResponse::Rolledback { params } => {
                ExecuteResponse::TransactionRolledBack { params }
            }
        }
    }
}

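/// A pending read transaction waiting on the coordinator (e.g. to be linearized) before it can
/// complete, along with its timestamp context.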
#[derive(Debug)]
pub struct PendingReadTxn {
    txn: PendingRead,
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    created: Instant,
    num_requeues: u64,
    otel_ctx: OpenTelemetryContext,
}

impl PendingReadTxn {
    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
        &self.timestamp_context
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}

#[derive(Debug)]
enum PendingRead {
    Read {
        txn: PendingTxn,
    },
    ReadThenWrite {
        ctx: ExecuteContext,
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}

impl PendingRead {
    #[instrument(level = "debug")]
    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
        match self {
            PendingRead::Read {
                txn:
                    PendingTxn {
                        mut ctx,
                        response,
                        action,
                    },
                ..
            } => {
                let changed = ctx.session_mut().vars_mut().end_transaction(action);
                let response = response.map(|mut r| {
                    r.extend_params(changed);
                    ExecuteResponse::from(r)
                });

                Some((ctx, response))
            }
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                let _ = tx.send(Some(ctx));
                None
            }
        }
    }

    fn label(&self) -> &'static str {
        match self {
            PendingRead::Read { .. } => "read",
            PendingRead::ReadThenWrite { .. } => "read_then_write",
        }
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        match self {
            PendingRead::Read { txn, .. } => txn.ctx,
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                let _ = tx.send(None);
                ctx
            }
        }
    }
}

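/// State carried by an [`ExecuteContext`] that must be explicitly retired so that statement
/// logging records how the statement ended; dropping it without retiring is treated as a bug.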
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    statement_uuid: Option<StatementLoggingId>,
}

impl ExecuteContextExtra {
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }
    pub fn is_trivial(&self) -> bool {
        let Self { statement_uuid } = self;
        statement_uuid.is_none()
    }
    pub fn contents(&self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = self;
        *statement_uuid
    }
    #[must_use]
    fn retire(mut self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = &mut self;
        statement_uuid.take()
    }
}

impl Drop for ExecuteContextExtra {
    fn drop(&mut self) {
        let Self { statement_uuid } = &*self;
        if let Some(statement_uuid) = statement_uuid {
            soft_panic_or_log!(
                "execute context for statement {statement_uuid:?} dropped without being properly retired."
            );
        }
    }
}

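/// Everything the coordinator needs to finish responding to a statement: the client
/// transmitter, the session, and any statement-logging state.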
#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}

impl std::ops::Deref for ExecuteContext {
    type Target = ExecuteContextInner;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}

impl std::ops::DerefMut for ExecuteContext {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}

#[derive(Debug)]
pub struct ExecuteContextInner {
    tx: ClientTransmitter<ExecuteResponse>,
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    session: Session,
    extra: ExecuteContextExtra,
}

impl ExecuteContext {
    pub fn session(&self) -> &Session {
        &self.session
    }

    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.session
    }

    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
        &self.tx
    }

    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
        &mut self.tx
    }

    pub fn from_parts(
        tx: ClientTransmitter<ExecuteResponse>,
        internal_cmd_tx: mpsc::UnboundedSender<Message>,
        session: Session,
        extra: ExecuteContextExtra,
    ) -> Self {
        Self {
            inner: ExecuteContextInner {
                tx,
                session,
                extra,
                internal_cmd_tx,
            }
            .into(),
        }
    }

    pub fn into_parts(
        self,
    ) -> (
        ClientTransmitter<ExecuteResponse>,
        mpsc::UnboundedSender<Message>,
        Session,
        ExecuteContextExtra,
    ) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        (tx, internal_cmd_tx, session, extra)
    }

    #[instrument(level = "debug")]
    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        let reason = if extra.is_trivial() {
            None
        } else {
            Some((&result).into())
        };
        tx.send(result, session);
        if let Some(reason) = reason {
            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
                otel_ctx: OpenTelemetryContext::obtain(),
                data: extra,
                reason,
            }) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        }
    }

    pub fn extra(&self) -> &ExecuteContextExtra {
        &self.extra
    }

    pub fn extra_mut(&mut self) -> &mut ExecuteContextExtra {
        &mut self.extra
    }
}

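/// Tracks the last known status of each process of each cluster replica.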
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);

impl ClusterReplicaStatuses {
    pub(crate) fn new() -> ClusterReplicaStatuses {
        ClusterReplicaStatuses(BTreeMap::new())
    }

    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
        let prev = self.0.insert(cluster_id, BTreeMap::new());
        assert_eq!(
            prev, None,
            "cluster {cluster_id} statuses already initialized"
        );
    }

    pub(crate) fn initialize_cluster_replica_statuses(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        num_processes: usize,
        time: DateTime<Utc>,
    ) {
        tracing::info!(
            ?cluster_id,
            ?replica_id,
            ?time,
            "initializing cluster replica status"
        );
        let replica_statuses = self.0.entry(cluster_id).or_default();
        let process_statuses = (0..num_processes)
            .map(|process_id| {
                let status = ClusterReplicaProcessStatus {
                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
                    time: time.clone(),
                };
                (u64::cast_from(process_id), status)
            })
            .collect();
        let prev = replica_statuses.insert(replica_id, process_statuses);
        assert_none!(
            prev,
            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
        );
    }

    pub(crate) fn remove_cluster_statuses(
        &mut self,
        cluster_id: &ClusterId,
    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        let prev = self.0.remove(cluster_id);
        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
    }

    pub(crate) fn remove_cluster_replica_statuses(
        &mut self,
        cluster_id: &ClusterId,
        replica_id: &ReplicaId,
    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        let replica_statuses = self
            .0
            .get_mut(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
        let prev = replica_statuses.remove(replica_id);
        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(crate) fn ensure_cluster_status(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        process_id: ProcessId,
        status: ClusterReplicaProcessStatus,
    ) {
        let replica_statuses = self
            .0
            .get_mut(&cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
            .get_mut(&replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
        replica_statuses.insert(process_id, status);
    }

    pub fn get_cluster_replica_status(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> ClusterStatus {
        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
        Self::cluster_replica_status(process_status)
    }

    pub fn cluster_replica_status(
        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
    ) -> ClusterStatus {
        process_status
            .values()
            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
                (x, y) => {
                    let reason_x = match x {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    let reason_y = match y {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    ClusterStatus::Offline(reason_x.or(reason_y))
                }
            })
    }

    pub(crate) fn get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(crate) fn try_get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        self.try_get_cluster_statuses(cluster_id)
            .and_then(|statuses| statuses.get(&replica_id))
    }

    pub(crate) fn try_get_cluster_statuses(
        &self,
        cluster_id: ClusterId,
    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
        self.0.get(&cluster_id)
    }
}

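/// State maintained by the coordinator, which holds the catalog, the controller, and all
/// in-flight client and internal work.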
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    catalog: Arc<Catalog>,

    persist_client: PersistClient,

    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    group_commit_tx: appends::GroupCommitNotifier,

    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,

    transient_id_gen: Arc<TransientIdGen>,
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,

    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    pending_writes: Vec<PendingWriteTxn>,

    advance_timelines_interval: Interval,

    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    secrets_controller: Arc<dyn SecretsController>,
    caching_secrets_reader: CachingSecretsReader,

    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    storage_usage_client: StorageUsageClient,
    storage_usage_collection_interval: Duration,

    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    metrics: Metrics,
    optimizer_metrics: OptimizerMetrics,

    tracing_handle: TracingHandle,

    statement_logging: StatementLogging,

    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,

    check_cluster_scheduling_policies_interval: Interval,

    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    caught_up_check_interval: Interval,

    caught_up_check: Option<CaughtUpCheckContext>,

    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    cluster_replica_statuses: ClusterReplicaStatuses,

    read_only_controllers: bool,

    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    license_key: ValidatedLicenseKey,
}

impl Coordinator {
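    /// Initializes coordinator state from the contents of the catalog (clusters and replicas,
    /// dataflow plans, read policies, builtin table updates). Intended to run once at startup,
    /// before the coordinator begins serving commands.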
1809 #[instrument(name = "coord::bootstrap")]
1813 pub(crate) async fn bootstrap(
1814 &mut self,
1815 boot_ts: Timestamp,
1816 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1817 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1818 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1819 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1820 audit_logs_iterator: AuditLogIterator,
1821 ) -> Result<(), AdapterError> {
1822 let bootstrap_start = Instant::now();
1823 info!("startup: coordinator init: bootstrap beginning");
1824 info!("startup: coordinator init: bootstrap: preamble beginning");
1825
1826 let cluster_statuses: Vec<(_, Vec<_>)> = self
1829 .catalog()
1830 .clusters()
1831 .map(|cluster| {
1832 (
1833 cluster.id(),
1834 cluster
1835 .replicas()
1836 .map(|replica| {
1837 (replica.replica_id, replica.config.location.num_processes())
1838 })
1839 .collect(),
1840 )
1841 })
1842 .collect();
1843 let now = self.now_datetime();
1844 for (cluster_id, replica_statuses) in cluster_statuses {
1845 self.cluster_replica_statuses
1846 .initialize_cluster_statuses(cluster_id);
1847 for (replica_id, num_processes) in replica_statuses {
1848 self.cluster_replica_statuses
1849 .initialize_cluster_replica_statuses(
1850 cluster_id,
1851 replica_id,
1852 num_processes,
1853 now,
1854 );
1855 }
1856 }
1857
1858 let system_config = self.catalog().system_config();
1859
1860 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1862
1863 let compute_config = flags::compute_config(system_config);
1865 let storage_config = flags::storage_config(system_config);
1866 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1867 let dyncfg_updates = system_config.dyncfg_updates();
1868 self.controller.compute.update_configuration(compute_config);
1869 self.controller.storage.update_parameters(storage_config);
1870 self.controller
1871 .update_orchestrator_scheduling_config(scheduling_config);
1872 self.controller.update_configuration(dyncfg_updates);
1873
1874 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1875 Default::default();
1876
1877 let enable_worker_core_affinity =
1878 self.catalog().system_config().enable_worker_core_affinity();
1879 for instance in self.catalog.clusters() {
1880 self.controller.create_cluster(
1881 instance.id,
1882 ClusterConfig {
1883 arranged_logs: instance.log_indexes.clone(),
1884 workload_class: instance.config.workload_class.clone(),
1885 },
1886 )?;
1887 for replica in instance.replicas() {
1888 let role = instance.role();
1889 self.controller.create_replica(
1890 instance.id,
1891 replica.replica_id,
1892 instance.name.clone(),
1893 replica.name.clone(),
1894 role,
1895 replica.config.clone(),
1896 enable_worker_core_affinity,
1897 )?;
1898 }
1899 }
1900
1901 info!(
1902 "startup: coordinator init: bootstrap: preamble complete in {:?}",
1903 bootstrap_start.elapsed()
1904 );
1905
1906 let init_storage_collections_start = Instant::now();
1907 info!("startup: coordinator init: bootstrap: storage collections init beginning");
1908 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
1909 .await;
1910 info!(
1911 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
1912 init_storage_collections_start.elapsed()
1913 );
1914
1915 self.controller.start_compute_introspection_sink();
1920
1921 let optimize_dataflows_start = Instant::now();
1922 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
1923 let entries: Vec<_> = self.catalog().entries().cloned().collect();
1924 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
1925 info!(
1926 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
1927 optimize_dataflows_start.elapsed()
1928 );
1929
1930 let _fut = self.catalog().update_expression_cache(
1932 uncached_local_exprs.into_iter().collect(),
1933 uncached_global_exps.into_iter().collect(),
1934 );
1935
1936 let bootstrap_as_ofs_start = Instant::now();
1940 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
1941 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
1942 info!(
1943 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
1944 bootstrap_as_ofs_start.elapsed()
1945 );
1946
1947 let postamble_start = Instant::now();
1948 info!("startup: coordinator init: bootstrap: postamble beginning");
1949
1950 let logs: BTreeSet<_> = BUILTINS::logs()
1951 .map(|log| self.catalog().resolve_builtin_log(log))
1952 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
1953 .collect();
1954
1955 let mut privatelink_connections = BTreeMap::new();
1956
1957 for entry in &entries {
1958 debug!(
1959 "coordinator init: installing {} {}",
1960 entry.item().typ(),
1961 entry.id()
1962 );
1963 let mut policy = entry.item().initial_logical_compaction_window();
1964 match entry.item() {
1965 CatalogItem::Source(source) => {
1971 if source.custom_logical_compaction_window.is_none() {
1973 if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
1974 source.data_source
1975 {
1976 policy = Some(
1977 self.catalog()
1978 .get_entry(&ingestion_id)
1979 .source()
1980 .expect("must be source")
1981 .custom_logical_compaction_window
1982 .unwrap_or_default(),
1983 );
1984 }
1985 }
1986 policies_to_set
1987 .entry(policy.expect("sources have a compaction window"))
1988 .or_insert_with(Default::default)
1989 .storage_ids
1990 .insert(source.global_id());
1991 }
1992 CatalogItem::Table(table) => {
1993 policies_to_set
1994 .entry(policy.expect("tables have a compaction window"))
1995 .or_insert_with(Default::default)
1996 .storage_ids
1997 .extend(table.global_ids());
1998 }
1999 CatalogItem::Index(idx) => {
2000 let policy_entry = policies_to_set
2001 .entry(policy.expect("indexes have a compaction window"))
2002 .or_insert_with(Default::default);
2003
2004 if logs.contains(&idx.on) {
2005 policy_entry
2006 .compute_ids
2007 .entry(idx.cluster_id)
2008 .or_insert_with(BTreeSet::new)
2009 .insert(idx.global_id());
2010 } else {
2011 let df_desc = self
2012 .catalog()
2013 .try_get_physical_plan(&idx.global_id())
2014 .expect("added in `bootstrap_dataflow_plans`")
2015 .clone();
2016
2017 let df_meta = self
2018 .catalog()
2019 .try_get_dataflow_metainfo(&idx.global_id())
2020 .expect("added in `bootstrap_dataflow_plans`");
2021
2022 if self.catalog().state().system_config().enable_mz_notices() {
2023 self.catalog().state().pack_optimizer_notices(
2025 &mut builtin_table_updates,
2026 df_meta.optimizer_notices.iter(),
2027 Diff::ONE,
2028 );
2029 }
2030
2031 policy_entry
2034 .compute_ids
2035 .entry(idx.cluster_id)
2036 .or_insert_with(Default::default)
2037 .extend(df_desc.export_ids());
2038
2039 self.controller
2040 .compute
2041 .create_dataflow(idx.cluster_id, df_desc, None)
2042 .unwrap_or_terminate("cannot fail to create dataflows");
2043 }
2044 }
2045 CatalogItem::View(_) => (),
2046 CatalogItem::MaterializedView(mview) => {
2047 policies_to_set
2048 .entry(policy.expect("materialized views have a compaction window"))
2049 .or_insert_with(Default::default)
2050 .storage_ids
2051 .insert(mview.global_id());
2052
2053 let mut df_desc = self
2054 .catalog()
2055 .try_get_physical_plan(&mview.global_id())
2056 .expect("added in `bootstrap_dataflow_plans`")
2057 .clone();
2058
2059 if let Some(initial_as_of) = mview.initial_as_of.clone() {
2060 df_desc.set_initial_as_of(initial_as_of);
2061 }
2062
2063 let until = mview
2065 .refresh_schedule
2066 .as_ref()
2067 .and_then(|s| s.last_refresh())
2068 .and_then(|r| r.try_step_forward());
2069 if let Some(until) = until {
2070 df_desc.until.meet_assign(&Antichain::from_elem(until));
2071 }
2072
2073 let df_meta = self
2074 .catalog()
2075 .try_get_dataflow_metainfo(&mview.global_id())
2076 .expect("added in `bootstrap_dataflow_plans`");
2077
2078 if self.catalog().state().system_config().enable_mz_notices() {
2079 self.catalog().state().pack_optimizer_notices(
2081 &mut builtin_table_updates,
2082 df_meta.optimizer_notices.iter(),
2083 Diff::ONE,
2084 );
2085 }
2086
2087 self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2088 }
2089 CatalogItem::Sink(sink) => {
2090 policies_to_set
2091 .entry(CompactionWindow::Default)
2092 .or_insert_with(Default::default)
2093 .storage_ids
2094 .insert(sink.global_id());
2095 }
2096 CatalogItem::Connection(catalog_connection) => {
2097 if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2098 privatelink_connections.insert(
2099 entry.id(),
2100 VpcEndpointConfig {
2101 aws_service_name: conn.service_name.clone(),
2102 availability_zone_ids: conn.availability_zones.clone(),
2103 },
2104 );
2105 }
2106 }
2107 CatalogItem::ContinualTask(ct) => {
2108 policies_to_set
2109 .entry(policy.expect("continual tasks have a compaction window"))
2110 .or_insert_with(Default::default)
2111 .storage_ids
2112 .insert(ct.global_id());
2113
2114 let mut df_desc = self
2115 .catalog()
2116 .try_get_physical_plan(&ct.global_id())
2117 .expect("added in `bootstrap_dataflow_plans`")
2118 .clone();
2119
2120 if let Some(initial_as_of) = ct.initial_as_of.clone() {
2121 df_desc.set_initial_as_of(initial_as_of);
2122 }
2123
2124 let df_meta = self
2125 .catalog()
2126 .try_get_dataflow_metainfo(&ct.global_id())
2127 .expect("added in `bootstrap_dataflow_plans`");
2128
2129 if self.catalog().state().system_config().enable_mz_notices() {
2130 self.catalog().state().pack_optimizer_notices(
2132 &mut builtin_table_updates,
2133 df_meta.optimizer_notices.iter(),
2134 Diff::ONE,
2135 );
2136 }
2137
2138 self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2139 }
2140 CatalogItem::Log(_)
2142 | CatalogItem::Type(_)
2143 | CatalogItem::Func(_)
2144 | CatalogItem::Secret(_) => {}
2145 }
2146 }
2147
2148 if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2149 let existing_vpc_endpoints = cloud_resource_controller
2151 .list_vpc_endpoints()
2152 .await
2153 .context("list vpc endpoints")?;
2154 let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2155 let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2156 let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2157 for id in vpc_endpoints_to_remove {
2158 cloud_resource_controller
2159 .delete_vpc_endpoint(*id)
2160 .await
2161 .context("deleting extraneous vpc endpoint")?;
2162 }
2163
2164 for (id, spec) in privatelink_connections {
2166 cloud_resource_controller
2167 .ensure_vpc_endpoint(id, spec)
2168 .await
2169 .context("ensuring vpc endpoint")?;
2170 }
2171 }
2172
2173 drop(dataflow_read_holds);
2176 for (cw, policies) in policies_to_set {
2178 self.initialize_read_policies(&policies, cw).await;
2179 }
2180
2181 builtin_table_updates.extend(
2183 self.catalog().state().resolve_builtin_table_updates(
2184 self.catalog().state().pack_all_replica_size_updates(),
2185 ),
2186 );
2187
2188 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2189 let migrated_updates_fut = if self.controller.read_only() {
2195 let min_timestamp = Timestamp::minimum();
2196 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2197 .extract_if(.., |update| {
2198 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2199 migrated_storage_collections_0dt.contains(&update.id)
2200 && self
2201 .controller
2202 .storage_collections
2203 .collection_frontiers(gid)
2204 .expect("all tables are registered")
2205 .write_frontier
2206 .elements()
2207 == &[min_timestamp]
2208 })
2209 .collect();
2210 if migrated_builtin_table_updates.is_empty() {
2211 futures::future::ready(()).boxed()
2212 } else {
2213 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2215 for update in migrated_builtin_table_updates {
2216 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2217 grouped_appends.entry(gid).or_default().push(update.data);
2218 }
2219 info!(
2220 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2221 grouped_appends.keys().collect::<Vec<_>>()
2222 );
2223
2224 let mut all_appends = Vec::with_capacity(grouped_appends.len());
2226 for (item_id, table_data) in grouped_appends.into_iter() {
2227 let mut all_rows = Vec::new();
2228 let mut all_data = Vec::new();
2229 for data in table_data {
2230 match data {
2231 TableData::Rows(rows) => all_rows.extend(rows),
2232 TableData::Batches(_) => all_data.push(data),
2233 }
2234 }
2235 differential_dataflow::consolidation::consolidate(&mut all_rows);
2236 all_data.push(TableData::Rows(all_rows));
2237
2238 all_appends.push((item_id, all_data));
2240 }
2241
2242 let fut = self
2243 .controller
2244 .storage
2245 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2246 .expect("cannot fail to append");
2247 async {
2248 fut.await
2249 .expect("One-shot shouldn't be dropped during bootstrap")
2250 .unwrap_or_terminate("cannot fail to append")
2251 }
2252 .boxed()
2253 }
2254 } else {
2255 futures::future::ready(()).boxed()
2256 };
2257
2258 info!(
2259 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2260 postamble_start.elapsed()
2261 );
2262
2263 let builtin_update_start = Instant::now();
2264 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2265
2266 if self.controller.read_only() {
2267 info!(
2268 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2269 );
2270
2271 let audit_join_start = Instant::now();
2273 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2274 let audit_log_updates: Vec<_> = audit_logs_iterator
2275 .map(|(audit_log, ts)| StateUpdate {
2276 kind: StateUpdateKind::AuditLog(audit_log),
2277 ts,
2278 diff: StateDiff::Addition,
2279 })
2280 .collect();
2281 let audit_log_builtin_table_updates = self
2282 .catalog()
2283 .state()
2284 .generate_builtin_table_updates(audit_log_updates);
2285 builtin_table_updates.extend(audit_log_builtin_table_updates);
2286 info!(
2287 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2288 audit_join_start.elapsed()
2289 );
2290 self.buffered_builtin_table_updates
2291 .as_mut()
2292 .expect("in read-only mode")
2293 .append(&mut builtin_table_updates);
2294 } else {
2295 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2296 .await;
2297 };
2298 info!(
2299 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2300 builtin_update_start.elapsed()
2301 );
2302
2303 let cleanup_secrets_start = Instant::now();
2304 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2305 {
2309 let Self {
2312 secrets_controller,
2313 catalog,
2314 ..
2315 } = self;
2316
2317 let next_user_item_id = catalog.get_next_user_item_id().await?;
2318 let next_system_item_id = catalog.get_next_system_item_id().await?;
2319 let read_only = self.controller.read_only();
2320 let catalog_ids: BTreeSet<CatalogItemId> =
2325 catalog.entries().map(|entry| entry.id()).collect();
2326 let secrets_controller = Arc::clone(secrets_controller);
2327
2328 spawn(|| "cleanup-orphaned-secrets", async move {
2329 if read_only {
2330 info!(
2331 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2332 );
2333 return;
2334 }
2335 info!("coordinator init: cleaning up orphaned secrets");
2336
2337 match secrets_controller.list().await {
2338 Ok(controller_secrets) => {
2339 let controller_secrets: BTreeSet<CatalogItemId> =
2340 controller_secrets.into_iter().collect();
2341 let orphaned = controller_secrets.difference(&catalog_ids);
2342 for id in orphaned {
2343 let id_too_large = match id {
2344 CatalogItemId::System(id) => *id >= next_system_item_id,
2345 CatalogItemId::User(id) => *id >= next_user_item_id,
2346 CatalogItemId::IntrospectionSourceIndex(_)
2347 | CatalogItemId::Transient(_) => false,
2348 };
2349 if id_too_large {
2350 info!(
2351 %next_user_item_id, %next_system_item_id,
2352 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2353 );
2354 } else {
2355 info!("coordinator init: deleting orphaned secret {id}");
2356 fail_point!("orphan_secrets");
2357 if let Err(e) = secrets_controller.delete(*id).await {
2358 warn!(
2359 "Dropping orphaned secret has encountered an error: {}",
2360 e
2361 );
2362 }
2363 }
2364 }
2365 }
2366 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2367 }
2368 });
2369 }
2370 info!(
2371 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2372 cleanup_secrets_start.elapsed()
2373 );
2374
2375 let final_steps_start = Instant::now();
2377 info!(
2378 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2379 );
2380 migrated_updates_fut
2381 .instrument(info_span!("coord::bootstrap::final"))
2382 .await;
2383
2384 debug!(
2385 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2386 );
2387 self.controller.initialization_complete();
2389
2390 self.bootstrap_introspection_subscribes().await;
2392
2393 info!(
2394 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2395 final_steps_start.elapsed()
2396 );
2397
2398 info!(
2399 "startup: coordinator init: bootstrap complete in {:?}",
2400 bootstrap_start.elapsed()
2401 );
2402 Ok(())
2403 }
2404
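/// Prepares builtin tables for use: advances every table to the current write
/// timestamp, retracts the current contents of system tables (except
/// `mz_storage_usage_by_shard`, which is retained, and `mz_audit_events`,
/// which is reconciled incrementally), folds in any new audit log events, and
/// finally writes out all accumulated builtin table updates.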
2405 #[allow(clippy::async_yields_async)]
2410 #[instrument]
2411 async fn bootstrap_tables(
2412 &mut self,
2413 entries: &[CatalogEntry],
2414 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2415 audit_logs_iterator: AuditLogIterator,
2416 ) {
2417 struct TableMetadata<'a> {
2419 id: CatalogItemId,
2420 name: &'a QualifiedItemName,
2421 table: &'a Table,
2422 }
2423
2424 let table_metas: Vec<_> = entries
2426 .into_iter()
2427 .filter_map(|entry| {
2428 entry.table().map(|table| TableMetadata {
2429 id: entry.id(),
2430 name: entry.name(),
2431 table,
2432 })
2433 })
2434 .collect();
2435
2436 debug!("coordinator init: advancing all tables to current timestamp");
2438 let WriteTimestamp {
2439 timestamp: write_ts,
2440 advance_to,
2441 } = self.get_local_write_ts().await;
2442 let appends = table_metas
2443 .iter()
2444 .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2445 .collect();
2446 let table_fence_rx = self
2450 .controller
2451 .storage
2452 .append_table(write_ts.clone(), advance_to, appends)
2453 .expect("invalid updates");
2454
2455 self.apply_local_write(write_ts).await;
2456
2457 debug!("coordinator init: resetting system tables");
2459 let read_ts = self.get_local_read_ts().await;
2460
2461 let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2464 .catalog()
2465 .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2466 .into();
2467 let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2468 meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2469 && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2470 };
2471
2472 let mut retraction_tasks = Vec::new();
2473 let mut system_tables: Vec<_> = table_metas
2474 .iter()
2475 .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2476 .collect();
2477
2478 let (audit_events_idx, _) = system_tables
2480 .iter()
2481 .find_position(|table| {
2482 table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2483 })
2484 .expect("mz_audit_events must exist");
2485 let audit_events = system_tables.remove(audit_events_idx);
2486 let audit_log_task = self.bootstrap_audit_log_table(
2487 audit_events.id,
2488 audit_events.name,
2489 audit_events.table,
2490 audit_logs_iterator,
2491 read_ts,
2492 );
2493
2494 for system_table in system_tables {
2495 let table_id = system_table.id;
2496 let full_name = self.catalog().resolve_full_name(system_table.name, None);
2497 debug!("coordinator init: resetting system table {full_name} ({table_id})");
2498
2499 let snapshot_fut = self
2501 .controller
2502 .storage_collections
2503 .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2504 let batch_fut = self
2505 .controller
2506 .storage_collections
2507 .create_update_builder(system_table.table.global_id_writes());
2508
2509 let task = spawn(|| format!("snapshot-{table_id}"), async move {
2510 let mut batch = batch_fut
2512 .await
2513 .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2514 tracing::info!(?table_id, "starting snapshot");
2515 let mut snapshot_cursor = snapshot_fut
2517 .await
2518 .unwrap_or_terminate("cannot fail to snapshot");
2519
2520 while let Some(values) = snapshot_cursor.next().await {
2522 for ((key, _val), _t, d) in values {
2523 let key = key.expect("builtin table had errors");
2524 let d_invert = d.neg();
2525 batch.add(&key, &(), &d_invert).await;
2526 }
2527 }
2528 tracing::info!(?table_id, "finished snapshot");
2529
2530 let batch = batch.finish().await;
2531 BuiltinTableUpdate::batch(table_id, batch)
2532 });
2533 retraction_tasks.push(task);
2534 }
2535
2536 let retractions_res = futures::future::join_all(retraction_tasks).await;
2537 for retractions in retractions_res {
2538 let retractions = retractions.expect("cannot fail to fetch snapshot");
2539 builtin_table_updates.push(retractions);
2540 }
2541
2542 let audit_join_start = Instant::now();
2543 info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2544 let audit_log_updates = audit_log_task
2545 .await
2546 .expect("cannot fail to fetch audit log updates");
2547 let audit_log_builtin_table_updates = self
2548 .catalog()
2549 .state()
2550 .generate_builtin_table_updates(audit_log_updates);
2551 builtin_table_updates.extend(audit_log_builtin_table_updates);
2552 info!(
2553 "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2554 audit_join_start.elapsed()
2555 );
2556
2557 table_fence_rx
2559 .await
2560 .expect("One-shot shouldn't be dropped during bootstrap")
2561 .unwrap_or_terminate("cannot fail to append");
2562
2563 info!("coordinator init: sending builtin table updates");
2564 let (_builtin_updates_fut, write_ts) = self
2565 .builtin_table_update()
2566 .execute(builtin_table_updates)
2567 .await;
2568 info!(?write_ts, "our write ts");
2569 if let Some(write_ts) = write_ts {
2570 self.apply_local_write(write_ts).await;
2571 }
2572 }
2573
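/// Spawns a task that reconciles the `mz_audit_events` builtin table with the
/// durable audit log: it snapshots the table, finds the largest event ID
/// already present, and returns state updates for the audit log entries with
/// larger IDs that still need to be written.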
2574 #[instrument]
2578 fn bootstrap_audit_log_table<'a>(
2579 &mut self,
2580 table_id: CatalogItemId,
2581 name: &'a QualifiedItemName,
2582 table: &'a Table,
2583 audit_logs_iterator: AuditLogIterator,
2584 read_ts: Timestamp,
2585 ) -> JoinHandle<Vec<StateUpdate>> {
2586 let full_name = self.catalog().resolve_full_name(name, None);
2587 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2588 let current_contents_fut = self
2589 .controller
2590 .storage_collections
2591 .snapshot(table.global_id_writes(), read_ts);
2592 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2593 let current_contents = current_contents_fut
2594 .await
2595 .unwrap_or_terminate("cannot fail to fetch snapshot");
2596 let contents_len = current_contents.len();
2597 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2598
2599 let max_table_id = current_contents
2601 .into_iter()
2602 .filter(|(_, diff)| *diff == 1)
2603 .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2604 .sorted()
2605 .rev()
2606 .next();
2607
2608 audit_logs_iterator
2610 .take_while(|(audit_log, _)| match max_table_id {
2611 Some(id) => audit_log.event.sortable_id() > id,
2612 None => true,
2613 })
2614 .map(|(audit_log, ts)| StateUpdate {
2615 kind: StateUpdateKind::AuditLog(audit_log),
2616 ts,
2617 diff: StateDiff::Addition,
2618 })
2619 .collect::<Vec<_>>()
2620 })
2621 }
2622
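/// Registers all catalog items backed by storage collections (sources,
/// tables, materialized views, continual tasks, and sinks) with the storage
/// controller. Collections migrated during startup are passed through so the
/// controller can treat them specially, and builtin continual tasks without a
/// `since` are created separately via `bootstrap_builtin_continual_tasks`.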
2623 #[instrument]
2636 async fn bootstrap_storage_collections(
2637 &mut self,
2638 migrated_storage_collections: &BTreeSet<CatalogItemId>,
2639 ) {
2640 let catalog = self.catalog();
2641 let source_status_collection_id = catalog
2642 .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2643 let source_status_collection_id = catalog
2644 .get_entry(&source_status_collection_id)
2645 .latest_global_id();
2646
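// Helper that translates a catalog `DataSourceDesc` into the storage
// controller's `CollectionDescription`, inlining connection details and
// pointing sources at the source status history collection where applicable.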
2647 let source_desc = |object_id: GlobalId,
2648 data_source: &DataSourceDesc,
2649 desc: &RelationDesc,
2650 timeline: &Timeline| {
2651 let (data_source, status_collection_id) = match data_source.clone() {
2652 DataSourceDesc::Ingestion { desc, cluster_id } => {
2654 let desc = desc.into_inline_connection(catalog.state());
2655 let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2656
2657 (
2658 DataSource::Ingestion(ingestion),
2659 Some(source_status_collection_id),
2660 )
2661 }
2662 DataSourceDesc::OldSyntaxIngestion {
2663 desc,
2664 progress_subsource,
2665 data_config,
2666 details,
2667 cluster_id,
2668 } => {
2669 let desc = desc.into_inline_connection(catalog.state());
2670 let data_config = data_config.into_inline_connection(catalog.state());
2671 let progress_subsource =
2674 catalog.get_entry(&progress_subsource).latest_global_id();
2675 let mut ingestion =
2676 IngestionDescription::new(desc, cluster_id, progress_subsource);
2677 let legacy_export = SourceExport {
2678 storage_metadata: (),
2679 data_config,
2680 details,
2681 };
2682 ingestion.source_exports.insert(object_id, legacy_export);
2683
2684 (
2685 DataSource::Ingestion(ingestion),
2686 Some(source_status_collection_id),
2687 )
2688 }
2689 DataSourceDesc::IngestionExport {
2690 ingestion_id,
2691 external_reference: _,
2692 details,
2693 data_config,
2694 } => {
2695 let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2698 (
2699 DataSource::IngestionExport {
2700 ingestion_id,
2701 details,
2702 data_config: data_config.into_inline_connection(catalog.state()),
2703 },
2704 Some(source_status_collection_id),
2705 )
2706 }
2707 DataSourceDesc::Webhook { .. } => {
2708 (DataSource::Webhook, Some(source_status_collection_id))
2709 }
2710 DataSourceDesc::Progress => (DataSource::Progress, None),
2711 DataSourceDesc::Introspection(introspection) => {
2712 (DataSource::Introspection(introspection), None)
2713 }
2714 };
2715 CollectionDescription {
2716 desc: desc.clone(),
2717 data_source,
2718 since: None,
2719 status_collection_id,
2720 timeline: Some(timeline.clone()),
2721 }
2722 };
2723
2724 let mut compute_collections = vec![];
2725 let mut collections = vec![];
2726 let mut new_builtin_continual_tasks = vec![];
2727 for entry in catalog.entries() {
2728 match entry.item() {
2729 CatalogItem::Source(source) => {
2730 collections.push((
2731 source.global_id(),
2732 source_desc(
2733 source.global_id(),
2734 &source.data_source,
2735 &source.desc,
2736 &source.timeline,
2737 ),
2738 ));
2739 }
2740 CatalogItem::Table(table) => {
2741 match &table.data_source {
2742 TableDataSource::TableWrites { defaults: _ } => {
2743 let versions: BTreeMap<_, _> = table
2744 .collection_descs()
2745 .map(|(gid, version, desc)| (version, (gid, desc)))
2746 .collect();
2747 let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2748 let next_version = version.bump();
2749 let primary_collection =
2750 versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2751 let collection_desc = CollectionDescription::for_table(
2752 desc.clone(),
2753 primary_collection,
2754 );
2755
2756 (*gid, collection_desc)
2757 });
2758 collections.extend(collection_descs);
2759 }
2760 TableDataSource::DataSource {
2761 desc: data_source_desc,
2762 timeline,
2763 } => {
2764 soft_assert_eq_or_log!(table.collections.len(), 1);
2766 let collection_descs =
2767 table.collection_descs().map(|(gid, _version, desc)| {
2768 (
2769 gid,
2770 source_desc(
2771 entry.latest_global_id(),
2772 data_source_desc,
2773 &desc,
2774 timeline,
2775 ),
2776 )
2777 });
2778 collections.extend(collection_descs);
2779 }
2780 };
2781 }
2782 CatalogItem::MaterializedView(mv) => {
2783 let collection_desc = CollectionDescription {
2784 desc: mv.desc.clone(),
2785 data_source: DataSource::Other,
2786 since: mv.initial_as_of.clone(),
2787 status_collection_id: None,
2788 timeline: None,
2789 };
2790 compute_collections.push((mv.global_id(), mv.desc.clone()));
2791 collections.push((mv.global_id(), collection_desc));
2792 }
2793 CatalogItem::ContinualTask(ct) => {
2794 let collection_desc = CollectionDescription {
2795 desc: ct.desc.clone(),
2796 data_source: DataSource::Other,
2797 since: ct.initial_as_of.clone(),
2798 status_collection_id: None,
2799 timeline: None,
2800 };
2801 if ct.global_id().is_system() && collection_desc.since.is_none() {
2802 new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2806 } else {
2807 compute_collections.push((ct.global_id(), ct.desc.clone()));
2808 collections.push((ct.global_id(), collection_desc));
2809 }
2810 }
2811 CatalogItem::Sink(sink) => {
2812 let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2813 let from_desc = storage_sink_from_entry
2814 .desc(&self.catalog().resolve_full_name(
2815 storage_sink_from_entry.name(),
2816 storage_sink_from_entry.conn_id(),
2817 ))
2818 .expect("sinks can only be built on items with descs")
2819 .into_owned();
2820 let collection_desc = CollectionDescription {
2821 desc: KAFKA_PROGRESS_DESC.clone(),
2823 data_source: DataSource::Sink {
2824 desc: ExportDescription {
2825 sink: StorageSinkDesc {
2826 from: sink.from,
2827 from_desc,
2828 connection: sink
2829 .connection
2830 .clone()
2831 .into_inline_connection(self.catalog().state()),
2832 envelope: sink.envelope,
2833 as_of: Antichain::from_elem(Timestamp::minimum()),
2834 with_snapshot: sink.with_snapshot,
2835 version: sink.version,
2836 from_storage_metadata: (),
2837 to_storage_metadata: (),
2838 },
2839 instance_id: sink.cluster_id,
2840 },
2841 },
2842 since: None,
2843 status_collection_id: None,
2844 timeline: None,
2845 };
2846 collections.push((sink.global_id, collection_desc));
2847 }
2848 _ => (),
2849 }
2850 }
2851
2852 let register_ts = if self.controller.read_only() {
2853 self.get_local_read_ts().await
2854 } else {
2855 self.get_local_write_ts().await.timestamp
2858 };
2859
2860 let storage_metadata = self.catalog.state().storage_metadata();
2861 let migrated_storage_collections = migrated_storage_collections
2862 .into_iter()
2863 .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2864 .collect();
2865
2866 self.controller
2871 .storage
2872 .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2873 .await
2874 .unwrap_or_terminate("cannot fail to evolve collections");
2875
2876 self.controller
2877 .storage
2878 .create_collections_for_bootstrap(
2879 storage_metadata,
2880 Some(register_ts),
2881 collections,
2882 &migrated_storage_collections,
2883 )
2884 .await
2885 .unwrap_or_terminate("cannot fail to create collections");
2886
2887 self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2888 .await;
2889
2890 if !self.controller.read_only() {
2891 self.apply_local_write(register_ts).await;
2892 }
2893 }
2894
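/// Creates storage collections for newly added builtin continual tasks. Each
/// task's plan is optimized to determine its inputs, read holds are acquired
/// on those inputs, and the least valid read time is used as the collection's
/// initial `since` before the collections are created.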
2895 async fn bootstrap_builtin_continual_tasks(
2902 &mut self,
2903 mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
2905 ) {
2906 for (id, collection) in &mut collections {
2907 let entry = self.catalog.get_entry_by_global_id(id);
2908 let ct = match &entry.item {
2909 CatalogItem::ContinualTask(ct) => ct.clone(),
2910 _ => unreachable!("only called with continual task builtins"),
2911 };
2912 let debug_name = self
2913 .catalog()
2914 .resolve_full_name(entry.name(), None)
2915 .to_string();
2916 let (_optimized_plan, physical_plan, _metainfo) = self
2917 .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
2918 .expect("builtin CT should optimize successfully");
2919
2920 let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
2922 id_bundle.storage_ids.remove(id);
2924 let read_holds = self.acquire_read_holds(&id_bundle);
2925 let as_of = read_holds.least_valid_read();
2926
2927 collection.since = Some(as_of.clone());
2928 }
2929 self.controller
2930 .storage
2931 .create_collections(self.catalog.state().storage_metadata(), None, collections)
2932 .await
2933 .unwrap_or_terminate("cannot fail to create collections");
2934 }
2935
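/// Computes optimized (MIR) and physical (LIR) dataflow plans for every
/// index, materialized view, and continual task in `ordered_catalog_entries`,
/// skipping collections that already exist on their compute instance. Plans
/// are reused from the global expression cache when the cached optimizer
/// features match the current configuration; otherwise the object is
/// re-optimized and collected in the returned map so the cache can be
/// refreshed. All plans and dataflow metainfo are recorded in the catalog's
/// in-memory state.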
2936 #[instrument]
2947 fn bootstrap_dataflow_plans(
2948 &mut self,
2949 ordered_catalog_entries: &[CatalogEntry],
2950 mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2951 ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
2952 let mut instance_snapshots = BTreeMap::new();
2958 let mut uncached_expressions = BTreeMap::new();
2959
2960 let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
2961
2962 for entry in ordered_catalog_entries {
2963 match entry.item() {
2964 CatalogItem::Index(idx) => {
2965 let compute_instance =
2967 instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
2968 self.instance_snapshot(idx.cluster_id)
2969 .expect("compute instance exists")
2970 });
2971 let global_id = idx.global_id();
2972
2973 if compute_instance.contains_collection(&global_id) {
2976 continue;
2977 }
2978
2979 let (optimized_plan, physical_plan, metainfo) =
2980 match cached_global_exprs.remove(&global_id) {
2981 Some(global_expressions)
2982 if global_expressions.optimizer_features
2983 == optimizer_config.features =>
2984 {
2985 debug!("global expression cache hit for {global_id:?}");
2986 (
2987 global_expressions.global_mir,
2988 global_expressions.physical_plan,
2989 global_expressions.dataflow_metainfos,
2990 )
2991 }
2992 Some(_) | None => {
2993 let (optimized_plan, global_lir_plan) = {
2994 let mut optimizer = optimize::index::Optimizer::new(
2996 self.owned_catalog(),
2997 compute_instance.clone(),
2998 global_id,
2999 optimizer_config.clone(),
3000 self.optimizer_metrics(),
3001 );
3002
3003 let index_plan = optimize::index::Index::new(
3005 entry.name().clone(),
3006 idx.on,
3007 idx.keys.to_vec(),
3008 );
3009 let global_mir_plan = optimizer.optimize(index_plan)?;
3010 let optimized_plan = global_mir_plan.df_desc().clone();
3011
3012 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3014
3015 (optimized_plan, global_lir_plan)
3016 };
3017
3018 let (physical_plan, metainfo) = global_lir_plan.unapply();
3019 let metainfo = {
3020 let notice_ids =
3022 std::iter::repeat_with(|| self.allocate_transient_id())
3023 .map(|(_item_id, gid)| gid)
3024 .take(metainfo.optimizer_notices.len())
3025 .collect::<Vec<_>>();
3026 self.catalog().render_notices(
3028 metainfo,
3029 notice_ids,
3030 Some(idx.global_id()),
3031 )
3032 };
3033 uncached_expressions.insert(
3034 global_id,
3035 GlobalExpressions {
3036 global_mir: optimized_plan.clone(),
3037 physical_plan: physical_plan.clone(),
3038 dataflow_metainfos: metainfo.clone(),
3039 optimizer_features: OptimizerFeatures::from(
3040 self.catalog().system_config(),
3041 ),
3042 },
3043 );
3044 (optimized_plan, physical_plan, metainfo)
3045 }
3046 };
3047
3048 let catalog = self.catalog_mut();
3049 catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3050 catalog.set_physical_plan(idx.global_id(), physical_plan);
3051 catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3052
3053 compute_instance.insert_collection(idx.global_id());
3054 }
3055 CatalogItem::MaterializedView(mv) => {
3056 let compute_instance =
3058 instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3059 self.instance_snapshot(mv.cluster_id)
3060 .expect("compute instance exists")
3061 });
3062 let global_id = mv.global_id();
3063
3064 let (optimized_plan, physical_plan, metainfo) =
3065 match cached_global_exprs.remove(&global_id) {
3066 Some(global_expressions)
3067 if global_expressions.optimizer_features
3068 == optimizer_config.features =>
3069 {
3070 debug!("global expression cache hit for {global_id:?}");
3071 (
3072 global_expressions.global_mir,
3073 global_expressions.physical_plan,
3074 global_expressions.dataflow_metainfos,
3075 )
3076 }
3077 Some(_) | None => {
3078 let (_, internal_view_id) = self.allocate_transient_id();
3079 let debug_name = self
3080 .catalog()
3081 .resolve_full_name(entry.name(), None)
3082 .to_string();
3083 let force_non_monotonic = Default::default();
3084
3085 let (optimized_plan, global_lir_plan) = {
3086 let mut optimizer = optimize::materialized_view::Optimizer::new(
3088 self.owned_catalog().as_optimizer_catalog(),
3089 compute_instance.clone(),
3090 global_id,
3091 internal_view_id,
3092 mv.desc.iter_names().cloned().collect(),
3093 mv.non_null_assertions.clone(),
3094 mv.refresh_schedule.clone(),
3095 debug_name,
3096 optimizer_config.clone(),
3097 self.optimizer_metrics(),
3098 force_non_monotonic,
3099 );
3100
3101 let global_mir_plan =
3103 optimizer.optimize(mv.optimized_expr.as_ref().clone())?;
3104 let optimized_plan = global_mir_plan.df_desc().clone();
3105
3106 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3108
3109 (optimized_plan, global_lir_plan)
3110 };
3111
3112 let (physical_plan, metainfo) = global_lir_plan.unapply();
3113 let metainfo = {
3114 let notice_ids =
3116 std::iter::repeat_with(|| self.allocate_transient_id())
3117 .map(|(_item_id, global_id)| global_id)
3118 .take(metainfo.optimizer_notices.len())
3119 .collect::<Vec<_>>();
3120 self.catalog().render_notices(
3122 metainfo,
3123 notice_ids,
3124 Some(mv.global_id()),
3125 )
3126 };
3127 uncached_expressions.insert(
3128 global_id,
3129 GlobalExpressions {
3130 global_mir: optimized_plan.clone(),
3131 physical_plan: physical_plan.clone(),
3132 dataflow_metainfos: metainfo.clone(),
3133 optimizer_features: OptimizerFeatures::from(
3134 self.catalog().system_config(),
3135 ),
3136 },
3137 );
3138 (optimized_plan, physical_plan, metainfo)
3139 }
3140 };
3141
3142 let catalog = self.catalog_mut();
3143 catalog.set_optimized_plan(mv.global_id(), optimized_plan);
3144 catalog.set_physical_plan(mv.global_id(), physical_plan);
3145 catalog.set_dataflow_metainfo(mv.global_id(), metainfo);
3146
3147 compute_instance.insert_collection(mv.global_id());
3148 }
3149 CatalogItem::ContinualTask(ct) => {
3150 let compute_instance =
3151 instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3152 self.instance_snapshot(ct.cluster_id)
3153 .expect("compute instance exists")
3154 });
3155 let global_id = ct.global_id();
3156
3157 let (optimized_plan, physical_plan, metainfo) =
3158 match cached_global_exprs.remove(&global_id) {
3159 Some(global_expressions)
3160 if global_expressions.optimizer_features
3161 == optimizer_config.features =>
3162 {
3163 debug!("global expression cache hit for {global_id:?}");
3164 (
3165 global_expressions.global_mir,
3166 global_expressions.physical_plan,
3167 global_expressions.dataflow_metainfos,
3168 )
3169 }
3170 Some(_) | None => {
3171 let debug_name = self
3172 .catalog()
3173 .resolve_full_name(entry.name(), None)
3174 .to_string();
3175 let (optimized_plan, physical_plan, metainfo) = self
3176 .optimize_create_continual_task(
3177 ct,
3178 global_id,
3179 self.owned_catalog(),
3180 debug_name,
3181 )?;
3182 uncached_expressions.insert(
3183 global_id,
3184 GlobalExpressions {
3185 global_mir: optimized_plan.clone(),
3186 physical_plan: physical_plan.clone(),
3187 dataflow_metainfos: metainfo.clone(),
3188 optimizer_features: OptimizerFeatures::from(
3189 self.catalog().system_config(),
3190 ),
3191 },
3192 );
3193 (optimized_plan, physical_plan, metainfo)
3194 }
3195 };
3196
3197 let catalog = self.catalog_mut();
3198 catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3199 catalog.set_physical_plan(ct.global_id(), physical_plan);
3200 catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3201
3202 compute_instance.insert_collection(ct.global_id());
3203 }
3204 _ => (),
3205 }
3206 }
3207
3208 Ok(uncached_expressions)
3209 }
3210
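/// Selects initial as-ofs for all in-memory dataflow plans: collects the
/// physical plans and initial compaction windows of indexes, materialized
/// views, and continual tasks, runs as-of selection against the current read
/// timestamp, writes the (possibly updated) plans back into the catalog, and
/// returns read holds that keep the chosen as-ofs valid until the dataflows
/// are shipped.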
3211 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3221 let mut catalog_ids = Vec::new();
3222 let mut dataflows = Vec::new();
3223 let mut read_policies = BTreeMap::new();
3224 for entry in self.catalog.entries() {
3225 let gid = match entry.item() {
3226 CatalogItem::Index(idx) => idx.global_id(),
3227 CatalogItem::MaterializedView(mv) => mv.global_id(),
3228 CatalogItem::ContinualTask(ct) => ct.global_id(),
3229 CatalogItem::Table(_)
3230 | CatalogItem::Source(_)
3231 | CatalogItem::Log(_)
3232 | CatalogItem::View(_)
3233 | CatalogItem::Sink(_)
3234 | CatalogItem::Type(_)
3235 | CatalogItem::Func(_)
3236 | CatalogItem::Secret(_)
3237 | CatalogItem::Connection(_) => continue,
3238 };
3239 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3240 catalog_ids.push(gid);
3241 dataflows.push(plan.clone());
3242
3243 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3244 read_policies.insert(gid, compaction_window.into());
3245 }
3246 }
3247 }
3248
3249 let read_ts = self.get_local_read_ts().await;
3250 let read_holds = as_of_selection::run(
3251 &mut dataflows,
3252 &read_policies,
3253 &*self.controller.storage_collections,
3254 read_ts,
3255 self.controller.read_only(),
3256 );
3257
3258 let catalog = self.catalog_mut();
3259 for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3260 catalog.set_physical_plan(id, plan);
3261 }
3262
3263 read_holds
3264 }
3265
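/// Runs the coordinator's main event loop until the command channel closes.
/// Messages from internal commands, cluster events, controller readiness,
/// group commits, client commands, strict serializable reads, and various
/// timers are drained into batches and handled one at a time; a watchdog task
/// reports when the loop stops making progress.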
3266 fn serve(
3275 mut self,
3276 mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3277 mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3278 mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3279 group_commit_rx: appends::GroupCommitWaiter,
3280 ) -> LocalBoxFuture<'static, ()> {
3281 async move {
3282 let mut cluster_events = self.controller.events_stream();
3284 let last_message = Arc::new(Mutex::new(LastMessage {
3285 kind: "none",
3286 stmt: None,
3287 }));
3288
3289 let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3290 let idle_metric = self.metrics.queue_busy_seconds.with_label_values(&[]);
3291 let last_message_watchdog = Arc::clone(&last_message);
3292
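// Watchdog: every 5s, try to reserve a slot on the bounded (size 1) idle
// channel and send a freshly started timer down it; the main loop observes
// the timer when it gets around to it, measuring how busy the queue is. If
// the reservation cannot be made within 30s the coordinator is considered
// stuck and the last message it was handling is logged.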
3293 spawn(|| "coord watchdog", async move {
3294 let mut interval = tokio::time::interval(Duration::from_secs(5));
3299 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3303
3304 let mut coord_stuck = false;
3306
3307 loop {
3308 interval.tick().await;
3309
3310 let duration = tokio::time::Duration::from_secs(30);
3312 let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3313 let Ok(maybe_permit) = timeout else {
3314 if !coord_stuck {
3316 let last_message = last_message_watchdog.lock().expect("poisoned");
3317 tracing::warn!(
3318 last_message_kind = %last_message.kind,
3319 last_message_sql = %last_message.stmt_to_string(),
3320 "coordinator stuck for {duration:?}",
3321 );
3322 }
3323 coord_stuck = true;
3324
3325 continue;
3326 };
3327
3328 if coord_stuck {
3330 tracing::info!("Coordinator became unstuck");
3331 }
3332 coord_stuck = false;
3333
3334 let Ok(permit) = maybe_permit else {
3336 break;
3337 };
3338
3339 permit.send(idle_metric.start_timer());
3340 }
3341 });
3342
3343 self.schedule_storage_usage_collection().await;
3344 self.spawn_privatelink_vpc_endpoints_watch_task();
3345 self.spawn_statement_logging_task();
3346 flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3347
3348 let warn_threshold = self
3350 .catalog()
3351 .system_config()
3352 .coord_slow_message_warn_threshold();
3353
3354 const MESSAGE_BATCH: usize = 64;
3356 let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3357 let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3358
3359 let message_batch = self.metrics
3360 .message_batch
3361 .with_label_values(&[]);
3362
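// Main loop: drain up to MESSAGE_BATCH messages per iteration. The `select!`
// is `biased`, so earlier branches (internal commands, cluster events,
// controller readiness, group commits) are polled before new client commands,
// reads, and timer ticks.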
3363 loop {
3364 select! {
3368 biased;
3373
3374 _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3378 Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
3382 () = self.controller.ready() => {
3386 let controller = match self.controller.get_readiness() {
3390 Readiness::Storage => ControllerReadiness::Storage,
3391 Readiness::Compute => ControllerReadiness::Compute,
3392 Readiness::Metrics(_) => ControllerReadiness::Metrics,
3393 Readiness::Internal(_) => ControllerReadiness::Internal,
3394 Readiness::NotReady => unreachable!("just signaled as ready"),
3395 };
3396 messages.push(Message::ControllerReady { controller });
3397 }
3398 permit = group_commit_rx.ready() => {
3401 let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3407 PendingWriteTxn::User{span, ..} => Some(span),
3408 PendingWriteTxn::System{..} => None,
3409 });
3410 let span = match user_write_spans.exactly_one() {
3411 Ok(span) => span.clone(),
3412 Err(user_write_spans) => {
3413 let span = info_span!(parent: None, "group_commit_notify");
3414 for s in user_write_spans {
3415 span.follows_from(s);
3416 }
3417 span
3418 }
3419 };
3420 messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3421 },
3422 count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3426 if count == 0 {
3427 break;
3428 } else {
3429 messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
3430 }
3431 },
3432 Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3436 let mut pending_read_txns = vec![pending_read_txn];
3437 while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3438 pending_read_txns.push(pending_read_txn);
3439 }
3440 for (conn_id, pending_read_txn) in pending_read_txns {
3441 let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
3442 soft_assert_or_log!(
3443 prev.is_none(),
3444 "connections can not have multiple concurrent reads, prev: {prev:?}"
3445 )
3446 }
3447 messages.push(Message::LinearizeReads);
3448 }
3449 _ = self.advance_timelines_interval.tick() => {
3453 let span = info_span!(parent: None, "coord::advance_timelines_interval");
3454 span.follows_from(Span::current());
3455
3456 if self.controller.read_only() {
3461 messages.push(Message::AdvanceTimelines);
3462 } else {
3463 messages.push(Message::GroupCommitInitiate(span, None));
3464 }
3465 },
3466 _ = self.check_cluster_scheduling_policies_interval.tick() => {
3470 messages.push(Message::CheckSchedulingPolicies);
3471 },
3472
3473 _ = self.caught_up_check_interval.tick() => {
3477 self.maybe_check_caught_up().await;
3482
3483 continue;
3484 },
3485
3486 timer = idle_rx.recv() => {
3491 timer.expect("does not drop").observe_duration();
3492 self.metrics
3493 .message_handling
3494 .with_label_values(&["watchdog"])
3495 .observe(0.0);
3496 continue;
3497 }
3498 };
3499
3500 message_batch.observe(f64::cast_lossy(messages.len()));
3502
3503 for msg in messages.drain(..) {
3504 let msg_kind = msg.kind();
3507 let span = span!(
3508 target: "mz_adapter::coord::handle_message_loop",
3509 Level::INFO,
3510 "coord::handle_message",
3511 kind = msg_kind
3512 );
3513 let otel_context = span.context().span().span_context().clone();
3514
3515 *last_message.lock().expect("poisoned") = LastMessage {
3519 kind: msg_kind,
3520 stmt: match &msg {
3521 Message::Command(
3522 _,
3523 Command::Execute {
3524 portal_name,
3525 session,
3526 ..
3527 },
3528 ) => session
3529 .get_portal_unverified(portal_name)
3530 .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3531 _ => None,
3532 },
3533 };
3534
3535 let start = Instant::now();
3536 self.handle_message(msg).instrument(span).await;
3537 let duration = start.elapsed();
3538
3539 self.metrics
3540 .message_handling
3541 .with_label_values(&[msg_kind])
3542 .observe(duration.as_secs_f64());
3543
3544 if duration > warn_threshold {
3546 let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3547 tracing::error!(
3548 ?msg_kind,
3549 ?trace_id,
3550 ?duration,
3551 "very slow coordinator message"
3552 );
3553 }
3554 }
3555 }
3556 if let Some(catalog) = Arc::into_inner(self.catalog) {
3559 catalog.expire().await;
3560 }
3561 }
3562 .boxed_local()
3563 }
3564
3565 fn catalog(&self) -> &Catalog {
3567 &self.catalog
3568 }
3569
3570 fn owned_catalog(&self) -> Arc<Catalog> {
3573 Arc::clone(&self.catalog)
3574 }
3575
3576 fn optimizer_metrics(&self) -> OptimizerMetrics {
3579 self.optimizer_metrics.clone()
3580 }
3581
3582 fn catalog_mut(&mut self) -> &mut Catalog {
3584 Arc::make_mut(&mut self.catalog)
3592 }
3593
3594 fn connection_context(&self) -> &ConnectionContext {
3596 self.controller.connection_context()
3597 }
3598
3599 fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3601 &self.connection_context().secrets_reader
3602 }
3603
3604 #[allow(dead_code)]
3609 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3610 for meta in self.active_conns.values() {
3611 let _ = meta.notice_tx.send(notice.clone());
3612 }
3613 }
3614
3615 pub(crate) fn broadcast_notice_tx(
3618 &self,
3619 ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3620 let senders: Vec<_> = self
3621 .active_conns
3622 .values()
3623 .map(|meta| meta.notice_tx.clone())
3624 .collect();
3625 Box::new(move |notice| {
3626 for tx in senders {
3627 let _ = tx.send(notice.clone());
3628 }
3629 })
3630 }
3631
3632 pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3633 &self.active_conns
3634 }
3635
3636 #[instrument(level = "debug")]
3637 pub(crate) fn retire_execution(
3638 &mut self,
3639 reason: StatementEndedExecutionReason,
3640 ctx_extra: ExecuteContextExtra,
3641 ) {
3642 if let Some(uuid) = ctx_extra.retire() {
3643 self.end_statement_execution(uuid, reason);
3644 }
3645 }
3646
3647 #[instrument(level = "debug")]
3649 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3650 let compute = self
3651 .instance_snapshot(instance)
3652 .expect("compute instance does not exist");
3653 DataflowBuilder::new(self.catalog().state(), compute)
3654 }
3655
3656 pub fn instance_snapshot(
3658 &self,
3659 id: ComputeInstanceId,
3660 ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3661 ComputeInstanceSnapshot::new(&self.controller, id)
3662 }
3663
3664 pub(crate) async fn ship_dataflow(
3667 &mut self,
3668 dataflow: DataflowDescription<Plan>,
3669 instance: ComputeInstanceId,
3670 subscribe_target_replica: Option<ReplicaId>,
3671 ) {
3672 let export_ids = dataflow.exported_index_ids().collect();
3675
3676 self.controller
3677 .compute
3678 .create_dataflow(instance, dataflow, subscribe_target_replica)
3679 .unwrap_or_terminate("dataflow creation cannot fail");
3680
3681 self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3682 .await;
3683 }
3684
3685 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3687 &mut self,
3688 dataflow: DataflowDescription<Plan>,
3689 instance: ComputeInstanceId,
3690 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3691 ) {
3692 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3693 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3694 let ((), ()) =
3695 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3696 } else {
3697 self.ship_dataflow(dataflow, instance, None).await;
3698 }
3699 }
3700
3701 pub fn install_compute_watch_set(
3705 &mut self,
3706 conn_id: ConnectionId,
3707 objects: BTreeSet<GlobalId>,
3708 t: Timestamp,
3709 state: WatchSetResponse,
3710 ) {
3711 let ws_id = self.controller.install_compute_watch_set(objects, t);
3712 self.connection_watch_sets
3713 .entry(conn_id.clone())
3714 .or_default()
3715 .insert(ws_id);
3716 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3717 }
3718
3719 pub fn install_storage_watch_set(
3723 &mut self,
3724 conn_id: ConnectionId,
3725 objects: BTreeSet<GlobalId>,
3726 t: Timestamp,
3727 state: WatchSetResponse,
3728 ) {
3729 let ws_id = self.controller.install_storage_watch_set(objects, t);
3730 self.connection_watch_sets
3731 .entry(conn_id.clone())
3732 .or_default()
3733 .insert(ws_id);
3734 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3735 }
3736
3737 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3739 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3740 for ws_id in ws_ids {
3741 self.installed_watch_sets.remove(&ws_id);
3742 }
3743 }
3744 }
3745
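/// Dumps selected in-memory coordinator state (timelines, active connections,
/// transaction read holds, pending peeks and linearized reads, plus the
/// controller's own dump) as a JSON object for debugging and introspection.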
3746 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3750 let global_timelines: BTreeMap<_, _> = self
3756 .global_timelines
3757 .iter()
3758 .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3759 .collect();
3760 let active_conns: BTreeMap<_, _> = self
3761 .active_conns
3762 .iter()
3763 .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3764 .collect();
3765 let txn_read_holds: BTreeMap<_, _> = self
3766 .txn_read_holds
3767 .iter()
3768 .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3769 .collect();
3770 let pending_peeks: BTreeMap<_, _> = self
3771 .pending_peeks
3772 .iter()
3773 .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3774 .collect();
3775 let client_pending_peeks: BTreeMap<_, _> = self
3776 .client_pending_peeks
3777 .iter()
3778 .map(|(id, peek)| {
3779 let peek: BTreeMap<_, _> = peek
3780 .iter()
3781 .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3782 .collect();
3783 (id.to_string(), peek)
3784 })
3785 .collect();
3786 let pending_linearize_read_txns: BTreeMap<_, _> = self
3787 .pending_linearize_read_txns
3788 .iter()
3789 .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3790 .collect();
3791
3792 let map = serde_json::Map::from_iter([
3793 (
3794 "global_timelines".to_string(),
3795 serde_json::to_value(global_timelines)?,
3796 ),
3797 (
3798 "active_conns".to_string(),
3799 serde_json::to_value(active_conns)?,
3800 ),
3801 (
3802 "txn_read_holds".to_string(),
3803 serde_json::to_value(txn_read_holds)?,
3804 ),
3805 (
3806 "pending_peeks".to_string(),
3807 serde_json::to_value(pending_peeks)?,
3808 ),
3809 (
3810 "client_pending_peeks".to_string(),
3811 serde_json::to_value(client_pending_peeks)?,
3812 ),
3813 (
3814 "pending_linearize_read_txns".to_string(),
3815 serde_json::to_value(pending_linearize_read_txns)?,
3816 ),
3817 ("controller".to_string(), self.controller.dump().await?),
3818 ]);
3819 Ok(serde_json::Value::Object(map))
3820 }
3821
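/// Prunes `mz_storage_usage_by_shard` on startup: snapshots the collection at
/// the current read timestamp and, on a background task, sends a
/// `StorageUsagePrune` message containing retractions for every usage event
/// older than the configured retention period.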
3822 async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
3836 let item_id = self
3837 .catalog()
3838 .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3839 let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3840 let read_ts = self.get_local_read_ts().await;
3841 let current_contents_fut = self
3842 .controller
3843 .storage_collections
3844 .snapshot(global_id, read_ts);
3845 let internal_cmd_tx = self.internal_cmd_tx.clone();
3846 spawn(|| "storage_usage_prune", async move {
3847 let mut current_contents = current_contents_fut
3848 .await
3849 .unwrap_or_terminate("cannot fail to fetch snapshot");
3850 differential_dataflow::consolidation::consolidate(&mut current_contents);
3851
3852 let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3853 let mut expired = Vec::new();
3854 for (row, diff) in current_contents {
3855 assert_eq!(
3856 diff, 1,
3857 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3858 );
3859 let collection_timestamp = row
3861 .unpack()
3862 .get(3)
3863 .expect("definition of mz_storage_by_shard changed")
3864 .unwrap_timestamptz();
3865 let collection_timestamp = collection_timestamp.timestamp_millis();
3866 let collection_timestamp: u128 = collection_timestamp
3867 .try_into()
3868 .expect("all collections happen after Jan 1 1970");
3869 if collection_timestamp < cutoff_ts {
3870 debug!("pruning storage event {row:?}");
3871 let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3872 expired.push(builtin_update);
3873 }
3874 }
3875
3876 let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3878 });
3879 }
3880}
3881
3882#[cfg(test)]
3883impl Coordinator {
3884 #[allow(dead_code)]
3885 async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
3886 let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
3894
3895 let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
3896 }
3897}
3898
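/// The most recent message the coordinator's main loop started handling.
/// Shared with the watchdog task so a stuck coordinator can report what it
/// was working on; also printed if the coordinator thread panics.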
3899struct LastMessage {
3901 kind: &'static str,
3902 stmt: Option<Arc<Statement<Raw>>>,
3903}
3904
3905impl LastMessage {
3906 fn stmt_to_string(&self) -> Cow<'static, str> {
3908 self.stmt
3909 .as_ref()
3910 .map(|stmt| stmt.to_ast_string_redacted().into())
3911 .unwrap_or(Cow::Borrowed("<none>"))
3912 }
3913}
3914
3915impl fmt::Debug for LastMessage {
3916 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3917 f.debug_struct("LastMessage")
3918 .field("kind", &self.kind)
3919 .field("stmt", &self.stmt_to_string())
3920 .finish()
3921 }
3922}
3923
3924impl Drop for LastMessage {
3925 fn drop(&mut self) {
3926 if std::thread::panicking() {
3928 eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
3930 }
3931 }
3932}
3933
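/// Starts the coordinator: initializes timestamp oracles, opens the catalog,
/// and spins up a dedicated `coordinator` thread (with an enlarged stack)
/// that runs bootstrap and then the main serve loop. Returns a `Handle` for
/// joining the coordinator on shutdown and a `Client` for submitting commands
/// once bootstrap has succeeded.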
3934pub fn serve(
3946 Config {
3947 controller_config,
3948 controller_envd_epoch,
3949 mut storage,
3950 audit_logs_iterator,
3951 timestamp_oracle_url,
3952 unsafe_mode,
3953 all_features,
3954 build_info,
3955 environment_id,
3956 metrics_registry,
3957 now,
3958 secrets_controller,
3959 cloud_resource_controller,
3960 cluster_replica_sizes,
3961 builtin_system_cluster_config,
3962 builtin_catalog_server_cluster_config,
3963 builtin_probe_cluster_config,
3964 builtin_support_cluster_config,
3965 builtin_analytics_cluster_config,
3966 system_parameter_defaults,
3967 availability_zones,
3968 storage_usage_client,
3969 storage_usage_collection_interval,
3970 storage_usage_retention_period,
3971 segment_client,
3972 egress_addresses,
3973 aws_account_id,
3974 aws_privatelink_availability_zones,
3975 connection_context,
3976 connection_limit_callback,
3977 remote_system_parameters,
3978 webhook_concurrency_limit,
3979 http_host_name,
3980 tracing_handle,
3981 read_only_controllers,
3982 caught_up_trigger: clusters_caught_up_trigger,
3983 helm_chart_version,
3984 license_key,
3985 external_login_password_mz_system,
3986 }: Config,
3987) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
3988 async move {
3989 let coord_start = Instant::now();
3990 info!("startup: coordinator init: beginning");
3991 info!("startup: coordinator init: preamble beginning");
3992
3993 let _builtins = LazyLock::force(&BUILTINS_STATIC);
3997
3998 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
3999 let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4000 let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4001 mpsc::unbounded_channel();
4002
4003 if !availability_zones.iter().all_unique() {
4005 coord_bail!("availability zones must be unique");
4006 }
4007
4008 let aws_principal_context = match (
4009 aws_account_id,
4010 connection_context.aws_external_id_prefix.clone(),
4011 ) {
4012 (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4013 aws_account_id,
4014 aws_external_id_prefix,
4015 }),
4016 _ => None,
4017 };
4018
4019 let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4020 .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4021
4022 info!(
4023 "startup: coordinator init: preamble complete in {:?}",
4024 coord_start.elapsed()
4025 );
4026 let oracle_init_start = Instant::now();
4027 info!("startup: coordinator init: timestamp oracle init beginning");
4028
4029 let pg_timestamp_oracle_config = timestamp_oracle_url
4030 .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
4031 let mut initial_timestamps =
4032 get_initial_oracle_timestamps(&pg_timestamp_oracle_config).await?;
4033
4034 initial_timestamps
4038 .entry(Timeline::EpochMilliseconds)
4039 .or_insert_with(mz_repr::Timestamp::minimum);
4040 let mut timestamp_oracles = BTreeMap::new();
4041 for (timeline, initial_timestamp) in initial_timestamps {
4042 Coordinator::ensure_timeline_state_with_initial_time(
4043 &timeline,
4044 initial_timestamp,
4045 now.clone(),
4046 pg_timestamp_oracle_config.clone(),
4047 &mut timestamp_oracles,
4048 read_only_controllers,
4049 )
4050 .await;
4051 }
4052
4053 let catalog_upper = storage.current_upper().await;
4063 let epoch_millis_oracle = &timestamp_oracles
4063 .get(&Timeline::EpochMilliseconds)
4064 .expect("inserted above")
4065 .oracle;
4066
4067 let mut boot_ts = if read_only_controllers {
4068 let read_ts = epoch_millis_oracle.read_ts().await;
4069 std::cmp::max(read_ts, catalog_upper)
4070 } else {
4071 epoch_millis_oracle.apply_write(catalog_upper).await;
4074 epoch_millis_oracle.write_ts().await.timestamp
4075 };
4076
4077 info!(
4078 "startup: coordinator init: timestamp oracle init complete in {:?}",
4079 oracle_init_start.elapsed()
4080 );
4081
4082 let catalog_open_start = Instant::now();
4083 info!("startup: coordinator init: catalog open beginning");
4084 let persist_client = controller_config
4085 .persist_clients
4086 .open(controller_config.persist_location.clone())
4087 .await
4088 .context("opening persist client")?;
4089 let builtin_item_migration_config =
4090 BuiltinItemMigrationConfig {
4091 persist_client: persist_client.clone(),
4092 read_only: read_only_controllers,
4093 };
4095 let OpenCatalogResult {
4096 mut catalog,
4097 migrated_storage_collections_0dt,
4098 new_builtin_collections,
4099 builtin_table_updates,
4100 cached_global_exprs,
4101 uncached_local_exprs,
4102 } = Catalog::open(mz_catalog::config::Config {
4103 storage,
4104 metrics_registry: &metrics_registry,
4105 state: mz_catalog::config::StateConfig {
4106 unsafe_mode,
4107 all_features,
4108 build_info,
4109 environment_id: environment_id.clone(),
4110 read_only: read_only_controllers,
4111 now: now.clone(),
4112 boot_ts: boot_ts.clone(),
4113 skip_migrations: false,
4114 cluster_replica_sizes,
4115 builtin_system_cluster_config,
4116 builtin_catalog_server_cluster_config,
4117 builtin_probe_cluster_config,
4118 builtin_support_cluster_config,
4119 builtin_analytics_cluster_config,
4120 system_parameter_defaults,
4121 remote_system_parameters,
4122 availability_zones,
4123 egress_addresses,
4124 aws_principal_context,
4125 aws_privatelink_availability_zones,
4126 connection_context,
4127 http_host_name,
4128 builtin_item_migration_config,
4129 persist_client: persist_client.clone(),
4130 enable_expression_cache_override: None,
4131 helm_chart_version,
4132 external_login_password_mz_system,
4133 license_key: license_key.clone(),
4134 },
4135 })
4136 .await?;
4137
4138 let catalog_upper = catalog.current_upper().await;
4141 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4142
4143 if !read_only_controllers {
4144 epoch_millis_oracle.apply_write(boot_ts).await;
4145 }
4146
4147 info!(
4148 "startup: coordinator init: catalog open complete in {:?}",
4149 catalog_open_start.elapsed()
4150 );
4151
4152 let coord_thread_start = Instant::now();
4153 info!("startup: coordinator init: coordinator thread start beginning");
4154
4155 let session_id = catalog.config().session_id;
4156 let start_instant = catalog.config().start_instant;
4157
4158 let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4162 let handle = TokioHandle::current();
4163
4164 let metrics = Metrics::register_into(&metrics_registry);
4165 let metrics_clone = metrics.clone();
4166 let optimizer_metrics = OptimizerMetrics::register_into(
4167 &metrics_registry,
4168 catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4169 );
4170 let segment_client_clone = segment_client.clone();
4171 let coord_now = now.clone();
4172 let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4173 let mut check_scheduling_policies_interval = tokio::time::interval(
4174 catalog
4175 .system_config()
4176 .cluster_check_scheduling_policies_interval(),
4177 );
4178 check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4179
4180 let clusters_caught_up_check_interval = if read_only_controllers {
4181 let dyncfgs = catalog.system_config().dyncfgs();
4182 let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4183
4184 let mut interval = tokio::time::interval(interval);
4185 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4186 interval
4187 } else {
4188 let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4196 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4197 interval
4198 };
4199
4200 let clusters_caught_up_check =
4201 clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
4202 trigger,
4203 exclude_collections: new_builtin_collections.into_iter().collect(),
4204 });
4205
4206 if let Some(config) = pg_timestamp_oracle_config.as_ref() {
4207 let pg_timestamp_oracle_params =
4210 flags::pg_timstamp_oracle_config(catalog.system_config());
4211 pg_timestamp_oracle_params.apply(config);
4212 }
4213
4214 let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4217 Arc::new(move |system_vars: &SystemVars| {
4218 let limit: u64 = system_vars.max_connections().cast_into();
4219 let superuser_reserved: u64 =
4220 system_vars.superuser_reserved_connections().cast_into();
4221
4222 let superuser_reserved = if superuser_reserved >= limit {
4227 tracing::warn!(
4228 "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
4229 );
4230 limit
4231 } else {
4232 superuser_reserved
4233 };
4234
4235 (connection_limit_callback)(limit, superuser_reserved);
4236 });
4237 catalog.system_config_mut().register_callback(
4238 &mz_sql::session::vars::MAX_CONNECTIONS,
4239 Arc::clone(&connection_limit_callback),
4240 );
4241 catalog.system_config_mut().register_callback(
4242 &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4243 connection_limit_callback,
4244 );
4245
4246 let (group_commit_tx, group_commit_rx) = appends::notifier();
4247
4248 let parent_span = tracing::Span::current();
4249 let thread = thread::Builder::new()
4250 .stack_size(3 * stack::STACK_SIZE)
4254 .name("coordinator".to_string())
4255 .spawn(move || {
4256 let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4257
4258 let controller = handle
4259 .block_on({
4260 catalog.initialize_controller(
4261 controller_config,
4262 controller_envd_epoch,
4263 read_only_controllers,
4264 )
4265 })
4266 .unwrap_or_terminate("failed to initialize storage_controller");
4267 let catalog_upper = handle.block_on(catalog.current_upper());
4270 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4271 if !read_only_controllers {
4272 let epoch_millis_oracle = &timestamp_oracles
4273 .get(&Timeline::EpochMilliseconds)
4274 .expect("inserted above")
4275 .oracle;
4276 handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4277 }
4278
                let catalog = Arc::new(catalog);

                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
                let mut coord = Coordinator {
                    controller,
                    catalog,
                    internal_cmd_tx,
                    group_commit_tx,
                    strict_serializable_reads_tx,
                    global_timelines: timestamp_oracles,
                    transient_id_gen: Arc::new(TransientIdGen::new()),
                    active_conns: BTreeMap::new(),
                    txn_read_holds: Default::default(),
                    pending_peeks: BTreeMap::new(),
                    client_pending_peeks: BTreeMap::new(),
                    pending_linearize_read_txns: BTreeMap::new(),
                    serialized_ddl: LockedVecDeque::new(),
                    active_compute_sinks: BTreeMap::new(),
                    active_webhooks: BTreeMap::new(),
                    active_copies: BTreeMap::new(),
                    staged_cancellation: BTreeMap::new(),
                    introspection_subscribes: BTreeMap::new(),
                    write_locks: BTreeMap::new(),
                    deferred_write_ops: BTreeMap::new(),
                    pending_writes: Vec::new(),
                    advance_timelines_interval,
                    secrets_controller,
                    caching_secrets_reader,
                    cloud_resource_controller,
                    storage_usage_client,
                    storage_usage_collection_interval,
                    segment_client,
                    metrics,
                    optimizer_metrics,
                    tracing_handle,
                    statement_logging: StatementLogging::new(coord_now.clone()),
                    webhook_concurrency_limit,
                    pg_timestamp_oracle_config,
                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
                    cluster_scheduling_decisions: BTreeMap::new(),
                    caught_up_check_interval: clusters_caught_up_check_interval,
                    caught_up_check: clusters_caught_up_check,
                    installed_watch_sets: BTreeMap::new(),
                    connection_watch_sets: BTreeMap::new(),
                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
                    read_only_controllers,
                    buffered_builtin_table_updates: Some(Vec::new()),
                    license_key,
                    persist_client,
                };
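                // Bootstrap the coordinator: initialize its state, ask the
                // controller to remove orphaned replicas, and prune storage
                // usage events older than the retention period, if one is
                // configured. The result is sent back on `bootstrap_tx`
                // before the coordinator starts serving commands.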
                let bootstrap = handle.block_on(async {
                    coord
                        .bootstrap(
                            boot_ts,
                            migrated_storage_collections_0dt,
                            builtin_table_updates,
                            cached_global_exprs,
                            uncached_local_exprs,
                            audit_logs_iterator,
                        )
                        .await?;
                    coord
                        .controller
                        .remove_orphaned_replicas(
                            coord.catalog().get_next_user_replica_id().await?,
                            coord.catalog().get_next_system_replica_id().await?,
                        )
                        .await
                        .map_err(AdapterError::Orchestrator)?;

                    if let Some(retention_period) = storage_usage_retention_period {
                        coord
                            .prune_storage_usage_events_on_startup(retention_period)
                            .await;
                    }

                    Ok(())
                });
                let ok = bootstrap.is_ok();
                drop(span);
                bootstrap_tx
                    .send(bootstrap)
                    .expect("bootstrap_rx is not dropped until it receives this message");
                if ok {
                    handle.block_on(coord.serve(
                        internal_cmd_rx,
                        strict_serializable_reads_rx,
                        cmd_rx,
                        group_commit_rx,
                    ));
                }
            })
            .expect("failed to create coordinator thread");
        match bootstrap_rx
            .await
            .expect("bootstrap_tx always sends a message or panics/halts")
        {
            Ok(()) => {
                info!(
                    "startup: coordinator init: coordinator thread start complete in {:?}",
                    coord_thread_start.elapsed()
                );
                info!(
                    "startup: coordinator init: complete in {:?}",
                    coord_start.elapsed()
                );
                let handle = Handle {
                    session_id,
                    start_instant,
                    _thread: thread.join_on_drop(),
                };
                let client = Client::new(
                    build_info,
                    cmd_tx.clone(),
                    metrics_clone,
                    now,
                    environment_id,
                    segment_client_clone,
                );
                Ok((handle, client))
            }
            Err(e) => Err(e),
        }
    }
    .boxed()
}

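/// Returns the current timestamp recorded for each timeline by the
/// postgres-backed timestamp oracle, if one is configured.
///
/// If no postgres timestamp oracle is configured, the returned map is empty.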
async fn get_initial_oracle_timestamps(
    pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
    let mut initial_timestamps = BTreeMap::new();

    if let Some(pg_timestamp_oracle_config) = pg_timestamp_oracle_config {
        let postgres_oracle_timestamps =
            PostgresTimestampOracle::<NowFn>::get_all_timelines(pg_timestamp_oracle_config.clone())
                .await?;

        let debug_msg = || {
            postgres_oracle_timestamps
                .iter()
                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
                .join(", ")
        };
        info!(
            "current timestamps from the postgres-backed timestamp oracle: {}",
            debug_msg()
        );

        for (timeline, ts) in postgres_oracle_timestamps {
            let entry = initial_timestamps
                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));

            entry
                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
                .or_insert(ts);
        }
    } else {
        info!("no postgres url for postgres-backed timestamp oracle configured!");
    };

    let debug_msg = || {
        initial_timestamps
            .iter()
            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
            .join(", ")
    };
    info!("initial oracle timestamps: {}", debug_msg());

    Ok(initial_timestamps)
}

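/// Pulls system parameter values from the remote parameter sync frontend, if
/// one is configured, and returns the modified parameters as name/value pairs.
///
/// The very first sync on a fresh environment is allowed to block until it
/// completes; once a sync has succeeded at least once, later boots bound the
/// sync by `system_parameter_sync_timeout` and return `Ok(None)` on timeout.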
#[instrument]
pub async fn load_remote_system_parameters(
    storage: &mut Box<dyn OpenableDurableCatalogState>,
    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
    system_parameter_sync_timeout: Duration,
) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
        tracing::info!("parameter sync on boot: start sync");

        let mut params = SynchronizedParameters::new(SystemVars::default());
        let frontend_sync = async {
            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
            frontend.pull(&mut params);
            let ops = params
                .modified()
                .into_iter()
                .map(|param| {
                    let name = param.name;
                    let value = param.value;
                    tracing::info!(name, value, initial = true, "sync parameter");
                    (name, value)
                })
                .collect();
            tracing::info!("parameter sync on boot: end sync");
            Ok(Some(ops))
        };
        if !storage.has_system_config_synced_once().await? {
            frontend_sync.await
        } else {
            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
                Ok(ops) => Ok(ops),
                Err(TimeoutError::Inner(e)) => Err(e),
                Err(TimeoutError::DeadlineElapsed) => {
                    tracing::info!("parameter sync on boot: sync has timed out");
                    Ok(None)
                }
            }
        }
    } else {
        Ok(None)
    }
}

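/// Notifications delivered to the coordinator when a previously installed
/// watch set becomes ready.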
#[derive(Debug)]
pub enum WatchSetResponse {
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    AlterSinkReady(AlterSinkReadyContext),
}

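/// State held by the coordinator while an `ALTER SINK` waits for the sink's
/// new dependencies to become ready.
///
/// Dropping the context without retiring it retires the contained
/// `ExecuteContext` with `AdapterError::Canceled` (see the `Drop` impl below).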
#[derive(Debug)]
pub struct AlterSinkReadyContext {
    ctx: Option<ExecuteContext>,
    otel_ctx: OpenTelemetryContext,
    plan: AlterSinkPlan,
    plan_validity: PlanValidity,
    resolved_ids: ResolvedIds,
    read_hold: ReadHolds<Timestamp>,
}

impl AlterSinkReadyContext {
    fn ctx(&mut self) -> &mut ExecuteContext {
        self.ctx.as_mut().expect("only cleared on drop")
    }

    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
        self.ctx
            .take()
            .expect("only cleared on drop")
            .retire(result);
    }
}

impl Drop for AlterSinkReadyContext {
    fn drop(&mut self) {
        if let Some(ctx) = self.ctx.take() {
            ctx.retire(Err(AdapterError::Canceled));
        }
    }
}

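/// A `VecDeque` paired with a tokio mutex that is used as a token rather than
/// as a data guard: holding the owned guard marks who is currently entitled
/// to drain the queue (see `Coordinator::serialized_ddl`), while mutating the
/// queue itself still requires `&mut self`.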
#[derive(Debug)]
struct LockedVecDeque<T> {
    items: VecDeque<T>,
    lock: Arc<tokio::sync::Mutex<()>>,
}

impl<T> LockedVecDeque<T> {
    pub fn new() -> Self {
        Self {
            items: VecDeque::new(),
            lock: Arc::new(tokio::sync::Mutex::new(())),
        }
    }

    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
        Arc::clone(&self.lock).try_lock_owned()
    }

    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    pub fn push_back(&mut self, value: T) {
        self.items.push_back(value)
    }

    pub fn pop_front(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    pub fn remove(&mut self, index: usize) -> Option<T> {
        self.items.remove(index)
    }

    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
        self.items.iter()
    }
}

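/// A statement whose execution has been deferred, along with the
/// `ExecuteContext` required to resume it later.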
#[derive(Debug)]
struct DeferredPlanStatement {
    ctx: ExecuteContext,
    ps: PlanStatement,
}

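/// The two forms a deferred statement can take: a raw statement that still
/// needs to be planned, or an already planned statement together with its
/// resolved dependencies.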
#[derive(Debug)]
enum PlanStatement {
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
    },
}

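/// Errors raised when a client connection is rejected by a network policy.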
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    #[error("Access denied: missing IP address")]
    MissingIp,
}

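/// Returns `Ok(())` if `ip` is contained in any of the policy rules' address
/// blocks, and `NetworkPolicyError::AddressDenied` otherwise.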
pub(crate) fn validate_ip_with_policy_rules(
    ip: &IpAddr,
    rules: &Vec<NetworkPolicyRule>,
) -> Result<(), NetworkPolicyError> {
    if rules.iter().any(|r| r.address.0.contains(ip)) {
        Ok(())
    } else {
        Err(NetworkPolicyError::AddressDenied(ip.clone()))
    }
}