use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroI64;
use std::ops::Neg;
use std::str::FromStr;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::Context;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
use http::Uri;
use ipnet::IpNet;
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
use mz_build_info::BuildInfo;
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_controller::ControllerConfig;
use mz_controller::clusters::{ClusterConfig, ClusterEvent, ClusterStatus, ProcessId};
use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::{OfflineReason, ServiceProcessMetrics};
use mz_ore::cast::{CastFrom, CastInto, CastLossy};
use mz_ore::channel::trigger::Trigger;
use mz_ore::future::TimeoutError;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::{JoinHandle, spawn};
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_ore::url::SensitiveUrl;
use mz_ore::vec::VecExt;
use mz_ore::{
    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
use mz_secrets::cache::CachingSecretsReader;
use mz_secrets::{SecretsController, SecretsReader};
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::{CatalogCluster, EnvironmentId};
use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::{
    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, NetworkPolicyRule,
    OnTimeoutAction, Params, QueryWhen,
};
use mz_sql::session::user::User;
use mz_sql::session::vars::SystemVars;
use mz_sql_parser::ast::ExplainStage;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
use mz_storage_types::sources::Timeline;
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_timestamp_oracle::WriteTimestamp;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use serde::Serialize;
use thiserror::Error;
use timely::progress::{Antichain, Timestamp as _};
use tokio::runtime::Handle as TokioHandle;
use tokio::select;
use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
use crate::client::{Client, Handle};
use crate::command::{Command, ExecuteResponse};
use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
use crate::coord::appends::{
    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
};
use crate::coord::caught_up::CaughtUpCheckContext;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::introspection::IntrospectionSubscribe;
use crate::coord::peek::PendingPeek;
use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::validity::PlanValidity;
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
use crate::metrics::Metrics;
use crate::optimize::dataflows::{
    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{AdapterNotice, ReadHolds, flags};

pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;

pub mod appends;
mod catalog_serving;
mod caught_up;
pub mod cluster_scheduling;
mod command_handler;
pub mod consistency;
mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
pub mod read_policy;
mod sequencer;
mod sql;
mod validity;

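/// Messages processed by the coordinator's main loop. These are either client
/// commands forwarded from the adapter frontend or internal work that the
/// coordinator schedules for itself.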
#[derive(Debug)]
pub enum Message {
    Command(OpenTelemetryContext, Command),
    ControllerReady,
    PurifiedStatementReady(PurifiedStatementReady),
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    TryDeferred {
        conn_id: ConnectionId,
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    AdvanceTimelines,
    ClusterEvent(ClusterEvent),
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    LinearizeReads,
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    DrainStatementLog,
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}

impl Message {
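    /// Returns a short, static string describing the message kind, suitable
    /// for use as a metrics or logging label.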
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
            },
            Message::ControllerReady => "controller_ready",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}

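/// The result of a task spawned off the coordinator's main loop, bundled with
/// everything needed to resume (and, on failure, retire) the originating
/// statement execution.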
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub params: Params,
    pub plan_validity: PlanValidity,
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}

pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;

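/// Like [`BackgroundWorkResult`], but for background connection validation:
/// carries the IDs of the connection being validated so the coordinator can
/// finish creating or altering it once validation completes.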
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    pub connection_id: CatalogItemId,
    pub connection_gid: GlobalId,
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}

pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
pub type AlterConnectionValidationReady = ValidationReady<Connection>;

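/// The stages a peek (`SELECT` or `COPY TO`) passes through while being
/// sequenced. Each stage owns the state it needs and produces the next stage
/// on success.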
#[derive(Debug)]
pub enum PeekStage {
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    Finish(PeekStageFinish),
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    CopyToPreflight(PeekStageCopyTo),
    CopyToDataflow(PeekStageCopyTo),
}

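/// Context for a `COPY ... TO` statement, describing the destination and the
/// output format.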
#[derive(Debug)]
pub struct CopyToContext {
    /// The `RelationDesc` of the data to be copied.
    pub desc: RelationDesc,
    /// The destination URI.
    pub uri: Uri,
    /// The connection used to write to the destination.
    pub connection: StorageConnection<ReferencedConnection>,
    /// The ID of the connection item.
    pub connection_id: CatalogItemId,
    /// The output format.
    pub format: S3SinkFormat,
    /// The maximum size of each output file.
    pub max_file_size: u64,
    /// The number of output batches, if known.
    pub output_batch_count: Option<u64>,
}

#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
}

#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    imports: BTreeMap<GlobalId, MapFilterProject>,
}

#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}

#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}

#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}

#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    real_time_recency_ts: Option<Timestamp>,
}

#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}

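/// Distinguishes ordinary statement sequencing from the `EXPLAIN` flavors
/// that reuse the same sequencing pipeline.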
#[derive(Debug)]
pub enum ExplainContext {
    /// The ordinary, non-explain variant of the statement.
    None,
    /// The `EXPLAIN <explainee>` version of the statement.
    Plan(ExplainPlanContext),
    /// Generate a notice containing the plan insights, alongside the
    /// statement's regular results.
    PlanInsightsNotice(OptimizerTrace),
    /// `EXPLAIN FILTER PUSHDOWN`.
    Pushdown,
}

impl ExplainContext {
    /// If available, returns the dispatch guard of the contained optimizer
    /// trace, so that optimizer events are captured for its lifetime.
    fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
        let optimizer_trace = match self {
            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
            _ => None,
        };
        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
    }

    fn needs_cluster(&self) -> bool {
        match self {
            ExplainContext::None => true,
            ExplainContext::Plan(..) => false,
            ExplainContext::PlanInsightsNotice(..) => true,
            ExplainContext::Pushdown => false,
        }
    }

    fn needs_plan_insights(&self) -> bool {
        matches!(
            self,
            ExplainContext::Plan(ExplainPlanContext {
                stage: ExplainStage::PlanInsights,
                ..
            }) | ExplainContext::PlanInsightsNotice(_)
        )
    }
}

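/// Everything needed to render an `EXPLAIN` response: the requested stage,
/// format, and config, plus the optimizer trace collected while planning.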
#[derive(Debug)]
pub struct ExplainPlanContext {
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    pub stage: ExplainStage,
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    pub optimizer_trace: OptimizerTrace,
}

#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    item_id: CatalogItemId,
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
}

#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}

#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}

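/// The cluster against which a statement should execute.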
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The builtin catalog server cluster.
    CatalogServer,
    /// The current session's active cluster.
    Active,
    /// The cluster pinned by the current transaction, if any.
    Transaction(ClusterId),
}

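/// The result of running a single [`Staged`] stage: either more work to wait
/// on, or a finished response.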
pub(crate) enum StageResult<T> {
    /// A task was spawned that will return the next stage.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task was spawned that will return the final response.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage is immediately ready.
    Immediate(T),
    /// The final response is ready.
    Response(ExecuteResponse),
}

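/// A sequencing stage that can be executed off the coordinator's main task.
/// Implementors describe how to run one stage and how to wrap the next stage
/// in a [`Message`] that re-enters the coordinator loop.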
pub(crate) trait Staged: Send {
    type Ctx: StagedContext;

    fn validity(&mut self) -> &mut PlanValidity;

    /// Runs this stage, returning the next stage or a final response.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Wraps the next stage in the [`Message`] used to re-enqueue it.
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether a cancellation request can interrupt this stage.
    fn cancel_enabled(&self) -> bool;
}

pub trait StagedContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    fn session(&self) -> Option<&Session>;
}

impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}

impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}

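/// Configuration for a `Coordinator`.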
pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    /// Whether the controllers should start in read-only mode.
    pub read_only_controllers: bool,
    /// Whether to enable zero-downtime (0dt) deployments.
    pub enable_0dt_deployment: bool,

    /// A trigger fired once a zero-downtime deployment has caught up and is
    /// ready for promotion.
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
}

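/// Soft-state metadata about a compute replica.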
#[derive(Clone, Default, Debug, Eq, PartialEq)]
pub struct ReplicaMetadata {
    /// The last known CPU and memory metrics for the replica's processes.
    pub metrics: Option<Vec<ServiceProcessMetrics>>,
}

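/// Metadata about an active connection.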
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Watermark used to verify that a cancellation request for this
    /// connection is legitimate.
    secret_key: u32,
    /// The time when the session's connection was initiated.
    connected_at: EpochMillis,
    user: User,
    application_name: String,
    uuid: Uuid,
    conn_id: ConnectionId,
    client_ip: Option<IpAddr>,

    /// Sinks that will need to be dropped when the current transaction, if
    /// any, is cleared.
    drop_sinks: BTreeSet<GlobalId>,

    /// Lock held for a deferred statement, if any.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Cluster reconfigurations that will need to be cleaned up when the
    /// current transaction is cleared.
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Channel on which to send notices to the session.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role that authenticated the connection, which may differ from the
    /// session's current role.
    authenticated_role: RoleId,
}

impl ConnMeta {
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    pub fn user(&self) -> &User {
        &self.user
    }

    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}

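/// A pending transaction waiting to be committed. Requires responding to the
/// client.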
#[derive(Debug)]
pub struct PendingTxn {
    /// Context used to send a response back to the client.
    ctx: ExecuteContext,
    /// Client response for the transaction.
    response: Result<PendingTxnResponse, AdapterError>,
    /// The action to take at the end of the transaction.
    action: EndTransactionAction,
}

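/// The response we'll send for a [`PendingTxn`].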
#[derive(Debug)]
pub enum PendingTxnResponse {
    /// The transaction will be committed.
    Committed {
        /// Session parameters that changed because of this transaction.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction will be rolled back.
    Rolledback {
        /// Session parameters that changed because of this transaction.
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        match self {
            PendingTxnResponse::Committed { params }
            | PendingTxnResponse::Rolledback { params } => params.extend(p),
        }
    }
}

impl From<PendingTxnResponse> for ExecuteResponse {
    fn from(value: PendingTxnResponse) -> Self {
        match value {
            PendingTxnResponse::Committed { params } => {
                ExecuteResponse::TransactionCommitted { params }
            }
            PendingTxnResponse::Rolledback { params } => {
                ExecuteResponse::TransactionRolledBack { params }
            }
        }
    }
}

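/// A pending read transaction waiting to be linearized, along with metadata
/// used to track how long the read spends queued.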
#[derive(Debug)]
pub struct PendingReadTxn {
    /// The transaction type.
    txn: PendingRead,
    /// The timestamp context of the transaction.
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    /// When the transaction was created.
    created: Instant,
    /// The number of times the transaction was requeued.
    num_requeues: u64,
    /// The OpenTelemetry context with which the transaction was initiated.
    otel_ctx: OpenTelemetryContext,
}

impl PendingReadTxn {
    /// Return the timestamp context of the pending read transaction.
    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
        &self.timestamp_context
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}

/// The kind of read transaction waiting on a linearized timestamp.
#[derive(Debug)]
enum PendingRead {
    /// A plain read, such as a `SELECT`, at the end of a transaction.
    Read {
        /// The inner transaction.
        txn: PendingTxn,
    },
    /// A read that is part of a read-then-write operation, such as an `UPDATE`.
    ReadThenWrite {
        /// The context of the caller.
        ctx: ExecuteContext,
        /// Channel used to alert the caller that the read has been linearized.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}

impl PendingRead {
    /// Alert the caller that the read has been linearized. For a plain read,
    /// this finalizes the transaction and returns the context and response to
    /// send to the client; for a read-then-write, it hands the context back
    /// to the waiting write.
    #[instrument(level = "debug")]
    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
        match self {
            PendingRead::Read {
                txn:
                    PendingTxn {
                        mut ctx,
                        response,
                        action,
                    },
                ..
            } => {
                let changed = ctx.session_mut().vars_mut().end_transaction(action);
                // Append any parameters that changed to the response.
                let response = response.map(|mut r| {
                    r.extend_params(changed);
                    ExecuteResponse::from(r)
                });

                Some((ctx, response))
            }
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // Ignore errors: the caller may have hung up.
                let _ = tx.send(Some(ctx));
                None
            }
        }
    }

    fn label(&self) -> &'static str {
        match self {
            PendingRead::Read { .. } => "read",
            PendingRead::ReadThenWrite { .. } => "read_then_write",
        }
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        match self {
            PendingRead::Read { txn, .. } => txn.ctx,
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // Inform the caller that the read was abandoned; ignore
                // errors if the caller has hung up.
                let _ = tx.send(None);
                ctx
            }
        }
    }
}

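/// State carried alongside a statement execution for statement logging. If
/// the statement is being logged, this wraps its statement logging ID; it
/// must be explicitly retired rather than dropped, so that every logged
/// statement records how its execution ended.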
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    statement_uuid: Option<StatementLoggingId>,
}

impl ExecuteContextExtra {
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }

    pub fn is_trivial(&self) -> bool {
        let Self { statement_uuid } = self;
        statement_uuid.is_none()
    }

    pub fn contents(&self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = self;
        *statement_uuid
    }

    /// Take responsibility for the contents. This should only be called from
    /// code that knows how to finish up statement logging based on the inner
    /// value.
    #[must_use]
    fn retire(mut self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = &mut self;
        statement_uuid.take()
    }
}

impl Drop for ExecuteContextExtra {
    fn drop(&mut self) {
        let Self { statement_uuid } = &*self;
        if let Some(statement_uuid) = statement_uuid {
            // If this hits, the statement will never be marked as finished in
            // the statement log.
            soft_panic_or_log!(
                "execute context for statement {statement_uuid:?} dropped without being properly retired."
            );
        }
    }
}

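/// Bundles everything needed to execute a single statement: the channel for
/// responding to the client, the session, and any statement-logging state.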
#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}

impl std::ops::Deref for ExecuteContext {
    type Target = ExecuteContextInner;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}

impl std::ops::DerefMut for ExecuteContext {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}

#[derive(Debug)]
pub struct ExecuteContextInner {
    tx: ClientTransmitter<ExecuteResponse>,
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    session: Session,
    extra: ExecuteContextExtra,
}

impl ExecuteContext {
    pub fn session(&self) -> &Session {
        &self.session
    }

    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.session
    }

    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
        &self.tx
    }

    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
        &mut self.tx
    }

    pub fn from_parts(
        tx: ClientTransmitter<ExecuteResponse>,
        internal_cmd_tx: mpsc::UnboundedSender<Message>,
        session: Session,
        extra: ExecuteContextExtra,
    ) -> Self {
        Self {
            inner: ExecuteContextInner {
                tx,
                session,
                extra,
                internal_cmd_tx,
            }
            .into(),
        }
    }

    /// By calling this function, the caller takes responsibility for the
    /// contents of `extra`, including eventually retiring it if statement
    /// logging is in use.
    pub fn into_parts(
        self,
    ) -> (
        ClientTransmitter<ExecuteResponse>,
        mpsc::UnboundedSender<Message>,
        Session,
        ExecuteContextExtra,
    ) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        (tx, internal_cmd_tx, session, extra)
    }

    /// Retire the execution: send the result to the client and, if the
    /// statement was logged, notify the coordinator of how it ended.
    #[instrument(level = "debug")]
    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        let reason = if extra.is_trivial() {
            None
        } else {
            Some((&result).into())
        };
        tx.send(result, session);
        if let Some(reason) = reason {
            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
                otel_ctx: OpenTelemetryContext::obtain(),
                data: extra,
                reason,
            }) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        }
    }

    pub fn extra(&self) -> &ExecuteContextExtra {
        &self.extra
    }

    pub fn extra_mut(&mut self) -> &mut ExecuteContextExtra {
        &mut self.extra
    }
}

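/// Tracks the last known status of every process of every replica of every
/// cluster, keyed by cluster, then replica, then process.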
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);

impl ClusterReplicaStatuses {
    pub(crate) fn new() -> ClusterReplicaStatuses {
        ClusterReplicaStatuses(BTreeMap::new())
    }

    /// Initializes the statuses of the given cluster.
    ///
    /// Panics if the cluster statuses are already initialized.
    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
        let prev = self.0.insert(cluster_id, BTreeMap::new());
        assert_eq!(
            prev, None,
            "cluster {cluster_id} statuses already initialized"
        );
    }

    /// Initializes the statuses of the given cluster replica.
    ///
    /// Panics if the replica statuses are already initialized.
    pub(crate) fn initialize_cluster_replica_statuses(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        num_processes: usize,
        time: DateTime<Utc>,
    ) {
        tracing::info!(
            ?cluster_id,
            ?replica_id,
            ?time,
            "initializing cluster replica status"
        );
        let replica_statuses = self.0.entry(cluster_id).or_default();
        let process_statuses = (0..num_processes)
            .map(|process_id| {
                let status = ClusterReplicaProcessStatus {
                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
                    time: time.clone(),
                };
                (u64::cast_from(process_id), status)
            })
            .collect();
        let prev = replica_statuses.insert(replica_id, process_statuses);
        assert_none!(
            prev,
            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
        );
    }

    /// Removes and returns the statuses of the given cluster.
    ///
    /// Panics if the cluster does not exist.
    pub(crate) fn remove_cluster_statuses(
        &mut self,
        cluster_id: &ClusterId,
    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        let prev = self.0.remove(cluster_id);
        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
    }

    /// Removes and returns the statuses of the given cluster replica.
    ///
    /// Panics if the cluster or replica does not exist.
    pub(crate) fn remove_cluster_replica_statuses(
        &mut self,
        cluster_id: &ClusterId,
        replica_id: &ReplicaId,
    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        let replica_statuses = self
            .0
            .get_mut(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
        let prev = replica_statuses.remove(replica_id);
        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    /// Inserts or updates the status of the given cluster replica process.
    ///
    /// Panics if the cluster or replica does not exist.
    pub(crate) fn ensure_cluster_status(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        process_id: ProcessId,
        status: ClusterReplicaProcessStatus,
    ) {
        let replica_statuses = self
            .0
            .get_mut(&cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
            .get_mut(&replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
        replica_statuses.insert(process_id, status);
    }

    /// Computes the status of the given cluster replica.
    ///
    /// Panics if the cluster or replica does not exist.
    pub fn get_cluster_replica_status(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> ClusterStatus {
        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
        Self::cluster_replica_status(process_status)
    }

    /// Computes a replica's status from the statuses of its processes: the
    /// replica is online only if every process is online.
    pub fn cluster_replica_status(
        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
    ) -> ClusterStatus {
        process_status
            .values()
            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
                (x, y) => {
                    let reason_x = match x {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    let reason_y = match y {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    // Keep the first known offline reason.
                    ClusterStatus::Offline(reason_x.or(reason_y))
                }
            })
    }

    /// Gets the statuses of the given cluster replica.
    ///
    /// Panics if the cluster or replica does not exist.
    pub(crate) fn get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    /// Gets the statuses of the given cluster replica, if it exists.
    pub(crate) fn try_get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        self.try_get_cluster_statuses(cluster_id)
            .and_then(|statuses| statuses.get(&replica_id))
    }

    /// Gets the statuses of the given cluster.
    ///
    /// Panics if the cluster does not exist.
    pub(crate) fn get_cluster_statuses(
        &self,
        cluster_id: ClusterId,
    ) -> &BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        self.try_get_cluster_statuses(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
    }

    /// Gets the statuses of the given cluster, if it exists.
    pub(crate) fn try_get_cluster_statuses(
        &self,
        cluster_id: ClusterId,
    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
        self.0.get(&cluster_id)
    }
}

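/// The coordinator: a singleton task that serializes all catalog changes,
/// assigns timestamps to reads and writes, and drives the storage and
/// compute controllers.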
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    /// The controller for the storage and compute layers.
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    /// The catalog, shared as an `Arc` so read-only views of it can be handed
    /// out without blocking the coordinator.
    catalog: Arc<Catalog>,

    /// Channel the coordinator uses to send internal messages to itself.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// Notification channel that triggers a group commit.
    group_commit_tx: appends::GroupCommitNotifier,

    /// Channel for strict serializable reads ready to commit.
    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    /// Per-timeline state used to totally order write and read timestamps.
    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,

    /// Generator for transient `GlobalId`s.
    transient_id_gen: Arc<TransientIdGen>,
    /// Metadata about all active connections.
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    /// Read holds taken on behalf of active transactions.
    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,

    /// Pending peeks, by peek ID.
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// Pending peeks for each client connection.
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    /// Pending linearize read transactions, by connection.
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    /// Active compute sinks, by sink ID.
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    /// Invalidation handles for active webhooks.
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    /// Active `COPY FROM` statements, by connection.
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    /// Watch channels that are flipped to `true` when a connection receives a
    /// cancellation request.
    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    /// Active introspection subscribes.
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    /// Per-object write locks, populated lazily as objects are written to.
    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    /// Write operations deferred while waiting on a write lock.
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    /// Writes awaiting the next group commit.
    pending_writes: Vec<PendingWriteTxn>,

    /// Interval at which timelines are advanced even in the absence of
    /// writes.
    advance_timelines_interval: Interval,

    /// Queue that serializes DDL, since a single DDL statement can take
    /// multiple steps that must not interleave with other DDL.
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    /// Handle to the secret storage backend.
    secrets_controller: Arc<dyn SecretsController>,
    /// A secrets reader that caches values in memory, with a TTL.
    caching_secrets_reader: CachingSecretsReader,

    /// Controller for cloud resources such as VPC endpoints, if running in a
    /// cloud environment.
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Non-persisted replica metadata, for inclusion in system tables.
    /// `None` is a tombstone for replicas that have been dropped.
    transient_replica_metadata: BTreeMap<ReplicaId, Option<ReplicaMetadata>>,

    /// Persist client for fetching storage usage metrics.
    storage_usage_client: StorageUsageClient,
    /// Interval at which to collect storage usage information.
    storage_usage_collection_interval: Duration,

    /// Segment analytics client.
    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    /// Coordinator metrics.
    metrics: Metrics,
    /// Optimizer metrics.
    optimizer_metrics: OptimizerMetrics,

    /// Tracing handle.
    tracing_handle: TracingHandle,

    /// State for the statement logging feature.
    statement_logging: StatementLogging,

    /// Limit on concurrent webhook requests.
    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    /// Config for the Postgres-backed timestamp oracle, when in use.
    pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,

    /// Interval at which to ask cluster scheduling policies for decisions.
    check_cluster_scheduling_policies_interval: Interval,

    /// The last scheduling decision for each cluster and each policy.
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    /// Interval at which to check caught-up criteria during 0dt deployments.
    caught_up_check_interval: Interval,

    /// Context for the caught-up check; only used in read-only mode during
    /// 0dt deployments.
    caught_up_check: Option<CaughtUpCheckContext>,

    /// State for the currently installed watch sets.
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    /// Watch sets installed per connection.
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    /// The last known statuses of all cluster replica processes.
    cluster_replica_statuses: ClusterReplicaStatuses,

    /// Whether to start controllers in read-only mode.
    read_only_controllers: bool,

    /// Builtin table updates buffered while in read-only mode, applied all at
    /// once when coming out of read-only mode.
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    /// The validated license key for this environment.
    license_key: ValidatedLicenseKey,
}

impl Coordinator {
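    /// Initializes coordinator state based on the contained catalog. Must be
    /// called after creating the coordinator and before serving any requests.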
    #[instrument(name = "coord::bootstrap")]
    pub(crate) async fn bootstrap(
        &mut self,
        boot_ts: Timestamp,
        migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
        cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
        uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
        audit_logs_iterator: AuditLogIterator,
    ) -> Result<(), AdapterError> {
        let bootstrap_start = Instant::now();
        info!("startup: coordinator init: bootstrap beginning");
        info!("startup: coordinator init: bootstrap: preamble beginning");

        // Initialize cluster replica statuses. Gather the statuses first so
        // that we can take a mutable borrow of `self` afterwards.
        let cluster_statuses: Vec<(_, Vec<_>)> = self
            .catalog()
            .clusters()
            .map(|cluster| {
                (
                    cluster.id(),
                    cluster
                        .replicas()
                        .map(|replica| {
                            (replica.replica_id, replica.config.location.num_processes())
                        })
                        .collect(),
                )
            })
            .collect();
        let now = self.now_datetime();
        for (cluster_id, replica_statuses) in cluster_statuses {
            self.cluster_replica_statuses
                .initialize_cluster_statuses(cluster_id);
            for (replica_id, num_processes) in replica_statuses {
                self.cluster_replica_statuses
                    .initialize_cluster_replica_statuses(
                        cluster_id,
                        replica_id,
                        num_processes,
                        now,
                    );
            }
        }
        for replica_statuses in self.cluster_replica_statuses.0.values() {
            for (replica_id, processes_statuses) in replica_statuses {
                for (process_id, status) in processes_statuses {
                    let builtin_table_update =
                        self.catalog().state().pack_cluster_replica_status_update(
                            *replica_id,
                            *process_id,
                            status,
                            Diff::ONE,
                        );
                    let builtin_table_update = self
                        .catalog()
                        .state()
                        .resolve_builtin_table_update(builtin_table_update);
                    builtin_table_updates.push(builtin_table_update);
                }
            }
        }

        let system_config = self.catalog().system_config();

        // Push the current dynamic configuration to `mz_metrics`.
        mz_metrics::update_dyncfg(&system_config.dyncfg_updates());

        // Propagate the system configuration to the controllers.
        let compute_config = flags::compute_config(system_config);
        let storage_config = flags::storage_config(system_config);
        let scheduling_config = flags::orchestrator_scheduling_config(system_config);
        let exert_prop = system_config.arrangement_exert_proportionality();
        self.controller.compute.update_configuration(compute_config);
        self.controller.storage.update_parameters(storage_config);
        self.controller
            .update_orchestrator_scheduling_config(scheduling_config);
        self.controller
            .set_arrangement_exert_proportionality(exert_prop);

        let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
            Default::default();

        // Create the clusters and replicas described in the catalog.
        let enable_worker_core_affinity =
            self.catalog().system_config().enable_worker_core_affinity();
        for instance in self.catalog.clusters() {
            self.controller.create_cluster(
                instance.id,
                ClusterConfig {
                    arranged_logs: instance.log_indexes.clone(),
                    workload_class: instance.config.workload_class.clone(),
                },
            )?;
            for replica in instance.replicas() {
                let role = instance.role();
                self.controller.create_replica(
                    instance.id,
                    replica.replica_id,
                    role,
                    replica.config.clone(),
                    enable_worker_core_affinity,
                )?;
            }
        }

        info!(
            "startup: coordinator init: bootstrap: preamble complete in {:?}",
            bootstrap_start.elapsed()
        );

        let init_storage_collections_start = Instant::now();
        info!("startup: coordinator init: bootstrap: storage collections init beginning");
        self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
            .await;
        info!(
            "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
            init_storage_collections_start.elapsed()
        );

        self.controller.start_compute_introspection_sink();

        let optimize_dataflows_start = Instant::now();
        info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
        let entries: Vec<_> = self.catalog().entries().cloned().collect();
        let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
        info!(
            "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
            optimize_dataflows_start.elapsed()
        );

        // Update the expression cache in the background; bootstrap need not
        // wait for it to complete.
        let _fut = self.catalog().update_expression_cache(
            uncached_local_exprs.into_iter().collect(),
            uncached_global_exps.into_iter().collect(),
        );

        let bootstrap_as_ofs_start = Instant::now();
        info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
        let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
        info!(
            "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
            bootstrap_as_ofs_start.elapsed()
        );

        let postamble_start = Instant::now();
        info!("startup: coordinator init: bootstrap: postamble beginning");

        let logs: BTreeSet<_> = BUILTINS::logs()
            .map(|log| self.catalog().resolve_builtin_log(log))
            .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
            .collect();

        let mut privatelink_connections = BTreeMap::new();

        for entry in &entries {
            debug!(
                "coordinator init: installing {} {}",
                entry.item().typ(),
                entry.id()
            );
            let mut policy = entry.item().initial_logical_compaction_window();
            match entry.item() {
                // Subsources inherit the compaction window of their parent
                // source, unless they have a custom window of their own.
                CatalogItem::Source(source) => {
                    if source.custom_logical_compaction_window.is_none() {
                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
                            source.data_source
                        {
                            policy = Some(
                                self.catalog()
                                    .get_entry(&ingestion_id)
                                    .source()
                                    .expect("must be source")
                                    .custom_logical_compaction_window
                                    .unwrap_or_default(),
                            );
                        }
                    }
                    policies_to_set
                        .entry(policy.expect("sources have a compaction window"))
                        .or_insert_with(Default::default)
                        .storage_ids
                        .insert(source.global_id());
                }
                CatalogItem::Table(table) => {
                    policies_to_set
                        .entry(policy.expect("tables have a compaction window"))
                        .or_insert_with(Default::default)
                        .storage_ids
                        .extend(table.global_ids());
                }
                CatalogItem::Index(idx) => {
                    let policy_entry = policies_to_set
                        .entry(policy.expect("indexes have a compaction window"))
                        .or_insert_with(Default::default);

                    if logs.contains(&idx.on) {
                        policy_entry
                            .compute_ids
                            .entry(idx.cluster_id)
                            .or_insert_with(BTreeSet::new)
                            .insert(idx.global_id());
                    } else {
                        let df_desc = self
                            .catalog()
                            .try_get_physical_plan(&idx.global_id())
                            .expect("added in `bootstrap_dataflow_plans`")
                            .clone();

                        let df_meta = self
                            .catalog()
                            .try_get_dataflow_metainfo(&idx.global_id())
                            .expect("added in `bootstrap_dataflow_plans`");

                        if self.catalog().state().system_config().enable_mz_notices() {
                            // Collect optimizer notices for the index.
                            self.catalog().state().pack_optimizer_notices(
                                &mut builtin_table_updates,
                                df_meta.optimizer_notices.iter(),
                                Diff::ONE,
                            );
                        }

                        // Set the compaction policy for everything the index
                        // dataflow exports.
                        policy_entry
                            .compute_ids
                            .entry(idx.cluster_id)
                            .or_insert_with(Default::default)
                            .extend(df_desc.export_ids());

                        self.controller
                            .compute
                            .create_dataflow(idx.cluster_id, df_desc, None)
                            .unwrap_or_terminate("cannot fail to create dataflows");
                    }
                }
                CatalogItem::View(_) => (),
                CatalogItem::MaterializedView(mview) => {
                    policies_to_set
                        .entry(policy.expect("materialized views have a compaction window"))
                        .or_insert_with(Default::default)
                        .storage_ids
                        .insert(mview.global_id());

                    let mut df_desc = self
                        .catalog()
                        .try_get_physical_plan(&mview.global_id())
                        .expect("added in `bootstrap_dataflow_plans`")
                        .clone();

                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
                        df_desc.set_initial_as_of(initial_as_of);
                    }

                    // If the materialized view has a refresh schedule, set
                    // its `until` to just after the last refresh.
                    let until = mview
                        .refresh_schedule
                        .as_ref()
                        .and_then(|s| s.last_refresh())
                        .and_then(|r| r.try_step_forward());
                    if let Some(until) = until {
                        df_desc.until.meet_assign(&Antichain::from_elem(until));
                    }

                    let df_meta = self
                        .catalog()
                        .try_get_dataflow_metainfo(&mview.global_id())
                        .expect("added in `bootstrap_dataflow_plans`");

                    if self.catalog().state().system_config().enable_mz_notices() {
                        // Collect optimizer notices for the materialized view.
                        self.catalog().state().pack_optimizer_notices(
                            &mut builtin_table_updates,
                            df_meta.optimizer_notices.iter(),
                            Diff::ONE,
                        );
                    }

                    self.ship_dataflow(df_desc, mview.cluster_id, None).await;
                }
                CatalogItem::Sink(sink) => {
                    policies_to_set
                        .entry(CompactionWindow::Default)
                        .or_insert_with(Default::default)
                        .storage_ids
                        .insert(sink.global_id());
                }
                CatalogItem::Connection(catalog_connection) => {
                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
                        privatelink_connections.insert(
                            entry.id(),
                            VpcEndpointConfig {
                                aws_service_name: conn.service_name.clone(),
                                availability_zone_ids: conn.availability_zones.clone(),
                            },
                        );
                    }
                }
                CatalogItem::ContinualTask(ct) => {
                    policies_to_set
                        .entry(policy.expect("continual tasks have a compaction window"))
                        .or_insert_with(Default::default)
                        .storage_ids
                        .insert(ct.global_id());

                    let mut df_desc = self
                        .catalog()
                        .try_get_physical_plan(&ct.global_id())
                        .expect("added in `bootstrap_dataflow_plans`")
                        .clone();

                    if let Some(initial_as_of) = ct.initial_as_of.clone() {
                        df_desc.set_initial_as_of(initial_as_of);
                    }

                    let df_meta = self
                        .catalog()
                        .try_get_dataflow_metainfo(&ct.global_id())
                        .expect("added in `bootstrap_dataflow_plans`");

                    if self.catalog().state().system_config().enable_mz_notices() {
                        // Collect optimizer notices for the continual task.
                        self.catalog().state().pack_optimizer_notices(
                            &mut builtin_table_updates,
                            df_meta.optimizer_notices.iter(),
                            Diff::ONE,
                        );
                    }

                    self.ship_dataflow(df_desc, ct.cluster_id, None).await;
                }
                CatalogItem::Log(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_) => {}
            }
        }

        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
            // Delete any VpcEndpoints that no longer have a corresponding
            // AWS PrivateLink connection.
            let existing_vpc_endpoints = cloud_resource_controller.list_vpc_endpoints().await?;
            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
            for id in vpc_endpoints_to_remove {
                cloud_resource_controller.delete_vpc_endpoint(*id).await?;
            }

            // Ensure a VpcEndpoint exists for every AWS PrivateLink connection.
            for (id, spec) in privatelink_connections {
                cloud_resource_controller
                    .ensure_vpc_endpoint(id, spec)
                    .await?;
            }
        }

        drop(dataflow_read_holds);
        // Now that the as-of read holds have been dropped, apply the
        // configured read policies.
        for (cw, policies) in policies_to_set {
            self.initialize_read_policies(&policies, cw).await;
        }

        // Expose the mapping from replica size names to their concrete resources.
        builtin_table_updates.extend(
            self.catalog().state().resolve_builtin_table_updates(
                self.catalog().state().pack_all_replica_size_updates(),
            ),
        );

2191 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2192 let migrated_updates_fut = if self.controller.read_only() {
2198 let min_timestamp = Timestamp::minimum();
2199 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2200 .drain_filter_swapping(|update| {
2201 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2202 migrated_storage_collections_0dt.contains(&update.id)
2203 && self
2204 .controller
2205 .storage_collections
2206 .collection_frontiers(gid)
2207 .expect("all tables are registered")
2208 .write_frontier
2209 .elements()
2210 == &[min_timestamp]
2211 })
2212 .collect();
2213 if migrated_builtin_table_updates.is_empty() {
2214 futures::future::ready(()).boxed()
2215 } else {
2216 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2218 for update in migrated_builtin_table_updates {
2219 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2220 grouped_appends.entry(gid).or_default().push(update.data);
2221 }
2222 info!(
2223 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2224 grouped_appends.keys().collect::<Vec<_>>()
2225 );
2226
2227 let mut all_appends = Vec::with_capacity(grouped_appends.len());
2229 for (gid, table_data) in grouped_appends.into_iter() {
2230 let mut all_rows = Vec::new();
2231 let mut all_data = Vec::new();
2232 for data in table_data {
2233 match data {
2234 TableData::Rows(rows) => all_rows.extend(rows),
2235 TableData::Batches(_) => all_data.push(data),
2236 }
2237 }
2238 differential_dataflow::consolidation::consolidate(&mut all_rows);
2239 all_data.push(TableData::Rows(all_rows));
2240
2241 all_appends.push((gid, all_data));
2243 }
2244
2245 let fut = self
2246 .controller
2247 .storage
2248 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2249 .expect("cannot fail to append");
2250 async {
2251 fut.await
2252 .expect("One-shot shouldn't be dropped during bootstrap")
2253 .unwrap_or_terminate("cannot fail to append")
2254 }
2255 .boxed()
2256 }
2257 } else {
2258 futures::future::ready(()).boxed()
2259 };
2260
2261 info!(
2262 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2263 postamble_start.elapsed()
2264 );
2265
2266 let builtin_update_start = Instant::now();
2267 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2268
2269 if self.controller.read_only() {
2270 info!(
2271 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2272 );
2273
2274 let audit_join_start = Instant::now();
2276 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2277 let audit_log_updates: Vec<_> = audit_logs_iterator
2278 .map(|(audit_log, ts)| StateUpdate {
2279 kind: StateUpdateKind::AuditLog(audit_log),
2280 ts,
2281 diff: StateDiff::Addition,
2282 })
2283 .collect();
2284 let audit_log_builtin_table_updates = self
2285 .catalog()
2286 .state()
2287 .generate_builtin_table_updates(audit_log_updates);
2288 builtin_table_updates.extend(audit_log_builtin_table_updates);
2289 info!(
2290 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2291 audit_join_start.elapsed()
2292 );
2293 self.buffered_builtin_table_updates
2294 .as_mut()
2295 .expect("in read-only mode")
2296 .append(&mut builtin_table_updates);
2297 } else {
2298 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2299 .await;
2300 };
2301 info!(
2302 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2303 builtin_update_start.elapsed()
2304 );
2305
2306 let cleanup_secrets_start = Instant::now();
2307 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2308 {
2312 let Self {
2315 secrets_controller,
2316 catalog,
2317 ..
2318 } = self;
2319
2320 let next_user_item_id = catalog.get_next_user_item_id().await?;
2321 let next_system_item_id = catalog.get_next_system_item_id().await?;
2322 let read_only = self.controller.read_only();
2323 let catalog_ids: BTreeSet<CatalogItemId> =
2328 catalog.entries().map(|entry| entry.id()).collect();
2329 let secrets_controller = Arc::clone(secrets_controller);
2330
2331 spawn(|| "cleanup-orphaned-secrets", async move {
2332 if read_only {
2333 info!(
2334 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2335 );
2336 return;
2337 }
2338 info!("coordinator init: cleaning up orphaned secrets");
2339
2340 match secrets_controller.list().await {
2341 Ok(controller_secrets) => {
2342 let controller_secrets: BTreeSet<CatalogItemId> =
2343 controller_secrets.into_iter().collect();
2344 let orphaned = controller_secrets.difference(&catalog_ids);
2345 for id in orphaned {
2346 let id_too_large = match id {
2347 CatalogItemId::System(id) => *id >= next_system_item_id,
2348 CatalogItemId::User(id) => *id >= next_user_item_id,
2349 CatalogItemId::IntrospectionSourceIndex(_)
2350 | CatalogItemId::Transient(_) => false,
2351 };
2352 if id_too_large {
2353 info!(
2354 %next_user_item_id, %next_system_item_id,
2355 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2356 );
2357 } else {
2358 info!("coordinator init: deleting orphaned secret {id}");
2359 fail_point!("orphan_secrets");
2360 if let Err(e) = secrets_controller.delete(*id).await {
2361 warn!(
2362 "failed to drop orphaned secret {id}: {e}"
2363 );
2365 }
2366 }
2367 }
2368 }
2369 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2370 }
2371 });
2372 }
2373 info!(
2374 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2375 cleanup_secrets_start.elapsed()
2376 );
2377
2378 let final_steps_start = Instant::now();
2380 info!(
2381 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2382 );
2383 migrated_updates_fut
2384 .instrument(info_span!("coord::bootstrap::final"))
2385 .await;
2386
2387 debug!(
2388 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2389 );
2390 self.controller.initialization_complete();
2392
2393 self.bootstrap_introspection_subscribes().await;
2395
2396 info!(
2397 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2398 final_steps_start.elapsed()
2399 );
2400
2401 info!(
2402 "startup: coordinator init: bootstrap complete in {:?}",
2403 bootstrap_start.elapsed()
2404 );
2405 Ok(())
2406 }
2407
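/// Initializes all tables during bootstrap: advances every table to the
/// current write timestamp with an empty append (the result is awaited
/// below as `table_fence_rx`, fencing out writers from previous
/// generations), retracts the current contents of all system tables
/// except `mz_storage_usage_by_shard` and `mz_audit_events` (which is
/// reconciled incrementally instead), and finally writes out the
/// accumulated builtin table updates.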
2408 #[allow(clippy::async_yields_async)]
2413 #[instrument]
2414 async fn bootstrap_tables(
2415 &mut self,
2416 entries: &[CatalogEntry],
2417 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2418 audit_logs_iterator: AuditLogIterator,
2419 ) {
2420 struct TableMetadata<'a> {
2422 id: CatalogItemId,
2423 name: &'a QualifiedItemName,
2424 table: &'a Table,
2425 }
2426
2427 let table_metas: Vec<_> = entries
2429 .iter()
2430 .filter_map(|entry| {
2431 entry.table().map(|table| TableMetadata {
2432 id: entry.id(),
2433 name: entry.name(),
2434 table,
2435 })
2436 })
2437 .collect();
2438
2439 debug!("coordinator init: advancing all tables to current timestamp");
2441 let WriteTimestamp {
2442 timestamp: write_ts,
2443 advance_to,
2444 } = self.get_local_write_ts().await;
2445 let appends = table_metas
2446 .iter()
2447 .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2448 .collect();
2449 let table_fence_rx = self
2453 .controller
2454 .storage
2455 .append_table(write_ts.clone(), advance_to, appends)
2456 .expect("invalid updates");
2457
2458 self.apply_local_write(write_ts).await;
2459
2460 debug!("coordinator init: resetting system tables");
2462 let read_ts = self.get_local_read_ts().await;
2463
2464 let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2467 .catalog()
2468 .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2469 .into();
2470 let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2471 meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2472 && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2473 };
2474
2475 let mut retraction_tasks = Vec::new();
2476 let mut system_tables: Vec<_> = table_metas
2477 .iter()
2478 .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2479 .collect();
2480
2481 let (audit_events_idx, _) = system_tables
2483 .iter()
2484 .find_position(|table| {
2485 table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2486 })
2487 .expect("mz_audit_events must exist");
2488 let audit_events = system_tables.remove(audit_events_idx);
2489 let audit_log_task = self.bootstrap_audit_log_table(
2490 audit_events.id,
2491 audit_events.name,
2492 audit_events.table,
2493 audit_logs_iterator,
2494 read_ts,
2495 );
2496
2497 for system_table in system_tables {
2498 let table_id = system_table.id;
2499 let full_name = self.catalog().resolve_full_name(system_table.name, None);
2500 debug!("coordinator init: resetting system table {full_name} ({table_id})");
2501
2502 let snapshot_fut = self
2504 .controller
2505 .storage_collections
2506 .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2507 let batch_fut = self
2508 .controller
2509 .storage_collections
2510 .create_update_builder(system_table.table.global_id_writes());
2511
2512 let task = spawn(|| format!("snapshot-{table_id}"), async move {
2513 let mut batch = batch_fut
2515 .await
2516 .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2517 tracing::info!(?table_id, "starting snapshot");
2518 let mut snapshot_cursor = snapshot_fut
2520 .await
2521 .unwrap_or_terminate("cannot fail to snapshot");
2522
2523 while let Some(values) = snapshot_cursor.next().await {
2525 for ((key, _val), _t, d) in values {
2526 let key = key.expect("builtin table had errors");
2527 let d_invert = d.neg();
2528 batch.add(&key, &(), &d_invert).await;
2529 }
2530 }
2531 tracing::info!(?table_id, "finished snapshot");
2532
2533 let batch = batch.finish().await;
2534 BuiltinTableUpdate::batch(table_id, batch)
2535 });
2536 retraction_tasks.push(task);
2537 }
2538
2539 let retractions_res = futures::future::join_all(retraction_tasks).await;
2540 for retractions in retractions_res {
2541 let retractions = retractions.expect("cannot fail to fetch snapshot");
2542 builtin_table_updates.push(retractions);
2543 }
2544
2545 let audit_join_start = Instant::now();
2546 info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2547 let audit_log_updates = audit_log_task
2548 .await
2549 .expect("cannot fail to fetch audit log updates");
2550 let audit_log_builtin_table_updates = self
2551 .catalog()
2552 .state()
2553 .generate_builtin_table_updates(audit_log_updates);
2554 builtin_table_updates.extend(audit_log_builtin_table_updates);
2555 info!(
2556 "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2557 audit_join_start.elapsed()
2558 );
2559
2560 table_fence_rx
2562 .await
2563 .expect("One-shot shouldn't be dropped during bootstrap")
2564 .unwrap_or_terminate("cannot fail to append");
2565
2566 info!("coordinator init: sending builtin table updates");
2567 let (_builtin_updates_fut, write_ts) = self
2568 .builtin_table_update()
2569 .execute(builtin_table_updates)
2570 .await;
2571 info!(?write_ts, "coordinator init: wrote builtin table updates at write ts");
2572 if let Some(write_ts) = write_ts {
2573 self.apply_local_write(write_ts).await;
2574 }
2575 }
2576
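/// Spawns a task that reconciles the `mz_audit_events` builtin table
/// with the durable audit log: it snapshots the table at `read_ts`,
/// determines the largest audit event ID already present, and returns
/// additions for every newer event in the durable log.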
2577 #[instrument]
2581 fn bootstrap_audit_log_table<'a>(
2582 &mut self,
2583 table_id: CatalogItemId,
2584 name: &'a QualifiedItemName,
2585 table: &'a Table,
2586 audit_logs_iterator: AuditLogIterator,
2587 read_ts: Timestamp,
2588 ) -> JoinHandle<Vec<StateUpdate>> {
2589 let full_name = self.catalog().resolve_full_name(name, None);
2590 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2591 let current_contents_fut = self
2592 .controller
2593 .storage_collections
2594 .snapshot(table.global_id_writes(), read_ts);
2595 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2596 let current_contents = current_contents_fut
2597 .await
2598 .unwrap_or_terminate("cannot fail to fetch snapshot");
2599 let contents_len = current_contents.len();
2600 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2601
2602 // The largest audit event ID already present in the builtin table, if any.
2604 let max_audit_log_id = current_contents
2605 .into_iter()
2606 .filter(|(_, diff)| *diff == 1)
2607 .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2608 .max();
2610
2611 audit_logs_iterator
2613 .take_while(|(audit_log, _)| match max_audit_log_id {
2614 Some(id) => audit_log.event.sortable_id() > id,
2615 None => true,
2616 })
2617 .map(|(audit_log, ts)| StateUpdate {
2618 kind: StateUpdateKind::AuditLog(audit_log),
2619 ts,
2620 diff: StateDiff::Addition,
2621 })
2622 .collect::<Vec<_>>()
2623 })
2624 }
2625
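/// Registers all catalog collections (sources, tables, materialized
/// views, continual tasks, and sinks) with the storage controller. The
/// schemas of compute-maintained collections are first evolved to pick
/// up nullability changes, and newly added builtin continual tasks are
/// created separately because their initial `since` must be derived
/// from read holds on their inputs.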
2626 #[instrument]
2639 async fn bootstrap_storage_collections(
2640 &mut self,
2641 migrated_storage_collections: &BTreeSet<CatalogItemId>,
2642 ) {
2643 let catalog = self.catalog();
2644 let source_status_collection_id = catalog
2645 .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2646 let source_status_collection_id = catalog
2647 .get_entry(&source_status_collection_id)
2648 .latest_global_id();
2649
2650 let source_desc =
2651 |data_source: &DataSourceDesc, desc: &RelationDesc, timeline: &Timeline| {
2652 let (data_source, status_collection_id) = match data_source.clone() {
2653 DataSourceDesc::Ingestion {
2655 ingestion_desc:
2656 mz_sql::plan::Ingestion {
2657 desc,
2658 progress_subsource,
2659 },
2660 cluster_id,
2661 } => {
2662 let desc = desc.into_inline_connection(catalog.state());
2663 let progress_subsource =
2666 catalog.get_entry(&progress_subsource).latest_global_id();
2667 let ingestion = mz_storage_types::sources::IngestionDescription::new(
2668 desc,
2669 cluster_id,
2670 progress_subsource,
2671 );
2672
2673 (
2674 DataSource::Ingestion(ingestion.clone()),
2675 Some(source_status_collection_id),
2676 )
2677 }
2678 DataSourceDesc::IngestionExport {
2679 ingestion_id,
2680 external_reference: _,
2681 details,
2682 data_config,
2683 } => {
2684 let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2687 (
2688 DataSource::IngestionExport {
2689 ingestion_id,
2690 details,
2691 data_config: data_config.into_inline_connection(catalog.state()),
2692 },
2693 Some(source_status_collection_id),
2694 )
2695 }
2696 DataSourceDesc::Webhook { .. } => {
2697 (DataSource::Webhook, Some(source_status_collection_id))
2698 }
2699 DataSourceDesc::Progress => (DataSource::Progress, None),
2700 DataSourceDesc::Introspection(introspection) => {
2701 (DataSource::Introspection(introspection), None)
2702 }
2703 };
2704 CollectionDescription {
2705 desc: desc.clone(),
2706 data_source,
2707 since: None,
2708 status_collection_id,
2709 timeline: Some(timeline.clone()),
2710 }
2711 };
2712
2713 let mut compute_collections = vec![];
2714 let mut collections = vec![];
2715 let mut new_builtin_continual_tasks = vec![];
2716 for entry in catalog.entries() {
2717 match entry.item() {
2718 CatalogItem::Source(source) => {
2719 collections.push((
2720 source.global_id(),
2721 source_desc(&source.data_source, &source.desc, &source.timeline),
2722 ));
2723 }
2724 CatalogItem::Table(table) => {
2725 match &table.data_source {
2726 TableDataSource::TableWrites { defaults: _ } => {
2727 let versions: BTreeMap<_, _> = table
2728 .collection_descs()
2729 .map(|(gid, version, desc)| (version, (gid, desc)))
2730 .collect();
2731 let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2732 let next_version = version.bump();
2733 let primary_collection =
2734 versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2735 let collection_desc = CollectionDescription::for_table(
2736 desc.clone(),
2737 primary_collection,
2738 );
2739
2740 (*gid, collection_desc)
2741 });
2742 collections.extend(collection_descs);
2743 }
2744 TableDataSource::DataSource {
2745 desc: data_source_desc,
2746 timeline,
2747 } => {
2748 soft_assert_eq_or_log!(table.collections.len(), 1);
2750 let collection_descs =
2751 table.collection_descs().map(|(gid, _version, desc)| {
2752 (gid, source_desc(data_source_desc, &desc, timeline))
2753 });
2754 collections.extend(collection_descs);
2755 }
2756 };
2757 }
2758 CatalogItem::MaterializedView(mv) => {
2759 let collection_desc = CollectionDescription {
2760 desc: mv.desc.clone(),
2761 data_source: DataSource::Other,
2762 since: mv.initial_as_of.clone(),
2763 status_collection_id: None,
2764 timeline: None,
2765 };
2766 compute_collections.push((mv.global_id(), mv.desc.clone()));
2767 collections.push((mv.global_id(), collection_desc));
2768 }
2769 CatalogItem::ContinualTask(ct) => {
2770 let collection_desc = CollectionDescription {
2771 desc: ct.desc.clone(),
2772 data_source: DataSource::Other,
2773 since: ct.initial_as_of.clone(),
2774 status_collection_id: None,
2775 timeline: None,
2776 };
2777 if ct.global_id().is_system() && collection_desc.since.is_none() {
2778 new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2782 } else {
2783 compute_collections.push((ct.global_id(), ct.desc.clone()));
2784 collections.push((ct.global_id(), collection_desc));
2785 }
2786 }
2787 CatalogItem::Sink(sink) => {
2788 let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2789 let from_desc = storage_sink_from_entry
2790 .desc(&self.catalog().resolve_full_name(
2791 storage_sink_from_entry.name(),
2792 storage_sink_from_entry.conn_id(),
2793 ))
2794 .expect("sinks can only be built on items with descs")
2795 .into_owned();
2796 let collection_desc = CollectionDescription {
2797 desc: KAFKA_PROGRESS_DESC.clone(),
2799 data_source: DataSource::Sink {
2800 desc: ExportDescription {
2801 sink: StorageSinkDesc {
2802 from: sink.from,
2803 from_desc,
2804 connection: sink
2805 .connection
2806 .clone()
2807 .into_inline_connection(self.catalog().state()),
2808 envelope: sink.envelope,
2809 as_of: Antichain::from_elem(Timestamp::minimum()),
2810 with_snapshot: sink.with_snapshot,
2811 version: sink.version,
2812 from_storage_metadata: (),
2813 to_storage_metadata: (),
2814 },
2815 instance_id: sink.cluster_id,
2816 },
2817 },
2818 since: None,
2819 status_collection_id: None,
2820 timeline: None,
2821 };
2822 collections.push((sink.global_id, collection_desc));
2823 }
2824 _ => (),
2825 }
2826 }
2827
2828 let register_ts = if self.controller.read_only() {
2829 self.get_local_read_ts().await
2830 } else {
2831 self.get_local_write_ts().await.timestamp
2834 };
2835
2836 let storage_metadata = self.catalog.state().storage_metadata();
2837 let migrated_storage_collections = migrated_storage_collections
2838 .iter()
2839 .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2840 .collect();
2841
2842 self.controller
2847 .storage
2848 .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2849 .await
2850 .unwrap_or_terminate("cannot fail to evolve collections");
2851
2852 self.controller
2853 .storage
2854 .create_collections_for_bootstrap(
2855 storage_metadata,
2856 Some(register_ts),
2857 collections,
2858 &migrated_storage_collections,
2859 )
2860 .await
2861 .unwrap_or_terminate("cannot fail to create collections");
2862
2863 self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2864 .await;
2865
2866 if !self.controller.read_only() {
2867 self.apply_local_write(register_ts).await;
2868 }
2869 }
2870
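/// Creates the storage collections for newly added builtin continual
/// tasks. These have no durable `initial_as_of`, so a `since` is chosen
/// here by optimizing each task and acquiring read holds on its inputs,
/// excluding the task's own output, which is only being created now.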
2871 async fn bootstrap_builtin_continual_tasks(
2878 &mut self,
2879 mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
2881 ) {
2882 for (id, collection) in &mut collections {
2883 let entry = self.catalog.get_entry_by_global_id(id);
2884 let ct = match &entry.item {
2885 CatalogItem::ContinualTask(ct) => ct.clone(),
2886 _ => unreachable!("only called with continual task builtins"),
2887 };
2888 let debug_name = self
2889 .catalog()
2890 .resolve_full_name(entry.name(), None)
2891 .to_string();
2892 let (_optimized_plan, physical_plan, _metainfo) = self
2893 .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
2894 .expect("builtin CT should optimize successfully");
2895
2896 let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
2898 id_bundle.storage_ids.remove(id);
2900 let read_holds = self.acquire_read_holds(&id_bundle);
2901 let as_of = read_holds.least_valid_read();
2902
2903 collection.since = Some(as_of.clone());
2904 }
2905 self.controller
2906 .storage
2907 .create_collections(self.catalog.state().storage_metadata(), None, collections)
2908 .await
2909 .unwrap_or_terminate("cannot fail to create collections");
2910 }
2911
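/// Optimizes all indexes, materialized views, and continual tasks and
/// stores the resulting optimized and physical plans in the catalog.
/// Cached global expressions are reused when their optimizer features
/// match the current configuration; everything optimized from scratch
/// is returned so the caller can write it back to the expression cache.
/// `ordered_catalog_entries` is expected in dependency order, so that
/// each object is planned after its dependencies.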
2912 #[instrument]
2923 fn bootstrap_dataflow_plans(
2924 &mut self,
2925 ordered_catalog_entries: &[CatalogEntry],
2926 mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2927 ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
2928 let mut instance_snapshots = BTreeMap::new();
2934 let mut uncached_expressions = BTreeMap::new();
2935
2936 let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
2937
2938 for entry in ordered_catalog_entries {
2939 match entry.item() {
2940 CatalogItem::Index(idx) => {
2941 let compute_instance =
2943 instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
2944 self.instance_snapshot(idx.cluster_id)
2945 .expect("compute instance exists")
2946 });
2947 let global_id = idx.global_id();
2948
2949 if compute_instance.contains_collection(&global_id) {
2952 continue;
2953 }
2954
2955 let (optimized_plan, physical_plan, metainfo) =
2956 match cached_global_exprs.remove(&global_id) {
2957 Some(global_expressions)
2958 if global_expressions.optimizer_features
2959 == optimizer_config.features =>
2960 {
2961 debug!("global expression cache hit for {global_id:?}");
2962 (
2963 global_expressions.global_mir,
2964 global_expressions.physical_plan,
2965 global_expressions.dataflow_metainfos,
2966 )
2967 }
2968 Some(_) | None => {
2969 let (optimized_plan, global_lir_plan) = {
2970 let mut optimizer = optimize::index::Optimizer::new(
2972 self.owned_catalog(),
2973 compute_instance.clone(),
2974 global_id,
2975 optimizer_config.clone(),
2976 self.optimizer_metrics(),
2977 );
2978
2979 let index_plan = optimize::index::Index::new(
2981 entry.name().clone(),
2982 idx.on,
2983 idx.keys.to_vec(),
2984 );
2985 let global_mir_plan = optimizer.optimize(index_plan)?;
2986 let optimized_plan = global_mir_plan.df_desc().clone();
2987
2988 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
2990
2991 (optimized_plan, global_lir_plan)
2992 };
2993
2994 let (physical_plan, metainfo) = global_lir_plan.unapply();
2995 let metainfo = {
2996 let notice_ids =
2998 std::iter::repeat_with(|| self.allocate_transient_id())
2999 .map(|(_item_id, gid)| gid)
3000 .take(metainfo.optimizer_notices.len())
3001 .collect::<Vec<_>>();
3002 self.catalog().render_notices(
3004 metainfo,
3005 notice_ids,
3006 Some(idx.global_id()),
3007 )
3008 };
3009 uncached_expressions.insert(
3010 global_id,
3011 GlobalExpressions {
3012 global_mir: optimized_plan.clone(),
3013 physical_plan: physical_plan.clone(),
3014 dataflow_metainfos: metainfo.clone(),
3015 optimizer_features: OptimizerFeatures::from(
3016 self.catalog().system_config(),
3017 ),
3018 },
3019 );
3020 (optimized_plan, physical_plan, metainfo)
3021 }
3022 };
3023
3024 let catalog = self.catalog_mut();
3025 catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3026 catalog.set_physical_plan(idx.global_id(), physical_plan);
3027 catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3028
3029 compute_instance.insert_collection(idx.global_id());
3030 }
3031 CatalogItem::MaterializedView(mv) => {
3032 let compute_instance =
3034 instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3035 self.instance_snapshot(mv.cluster_id)
3036 .expect("compute instance exists")
3037 });
3038 let global_id = mv.global_id();
3039
3040 let (optimized_plan, physical_plan, metainfo) =
3041 match cached_global_exprs.remove(&global_id) {
3042 Some(global_expressions)
3043 if global_expressions.optimizer_features
3044 == optimizer_config.features =>
3045 {
3046 debug!("global expression cache hit for {global_id:?}");
3047 (
3048 global_expressions.global_mir,
3049 global_expressions.physical_plan,
3050 global_expressions.dataflow_metainfos,
3051 )
3052 }
3053 Some(_) | None => {
3054 let (_, internal_view_id) = self.allocate_transient_id();
3055 let debug_name = self
3056 .catalog()
3057 .resolve_full_name(entry.name(), None)
3058 .to_string();
3059 let force_non_monotonic = Default::default();
3060
3061 let (optimized_plan, global_lir_plan) = {
3062 let mut optimizer = optimize::materialized_view::Optimizer::new(
3064 self.owned_catalog().as_optimizer_catalog(),
3065 compute_instance.clone(),
3066 global_id,
3067 internal_view_id,
3068 mv.desc.iter_names().cloned().collect(),
3069 mv.non_null_assertions.clone(),
3070 mv.refresh_schedule.clone(),
3071 debug_name,
3072 optimizer_config.clone(),
3073 self.optimizer_metrics(),
3074 force_non_monotonic,
3075 );
3076
3077 let global_mir_plan =
3079 optimizer.optimize(mv.optimized_expr.as_ref().clone())?;
3080 let optimized_plan = global_mir_plan.df_desc().clone();
3081
3082 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3084
3085 (optimized_plan, global_lir_plan)
3086 };
3087
3088 let (physical_plan, metainfo) = global_lir_plan.unapply();
3089 let metainfo = {
3090 let notice_ids =
3092 std::iter::repeat_with(|| self.allocate_transient_id())
3093 .map(|(_item_id, global_id)| global_id)
3094 .take(metainfo.optimizer_notices.len())
3095 .collect::<Vec<_>>();
3096 self.catalog().render_notices(
3098 metainfo,
3099 notice_ids,
3100 Some(mv.global_id()),
3101 )
3102 };
3103 uncached_expressions.insert(
3104 global_id,
3105 GlobalExpressions {
3106 global_mir: optimized_plan.clone(),
3107 physical_plan: physical_plan.clone(),
3108 dataflow_metainfos: metainfo.clone(),
3109 optimizer_features: OptimizerFeatures::from(
3110 self.catalog().system_config(),
3111 ),
3112 },
3113 );
3114 (optimized_plan, physical_plan, metainfo)
3115 }
3116 };
3117
3118 let catalog = self.catalog_mut();
3119 catalog.set_optimized_plan(mv.global_id(), optimized_plan);
3120 catalog.set_physical_plan(mv.global_id(), physical_plan);
3121 catalog.set_dataflow_metainfo(mv.global_id(), metainfo);
3122
3123 compute_instance.insert_collection(mv.global_id());
3124 }
3125 CatalogItem::ContinualTask(ct) => {
3126 let compute_instance =
3127 instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3128 self.instance_snapshot(ct.cluster_id)
3129 .expect("compute instance exists")
3130 });
3131 let global_id = ct.global_id();
3132
3133 let (optimized_plan, physical_plan, metainfo) =
3134 match cached_global_exprs.remove(&global_id) {
3135 Some(global_expressions)
3136 if global_expressions.optimizer_features
3137 == optimizer_config.features =>
3138 {
3139 debug!("global expression cache hit for {global_id:?}");
3140 (
3141 global_expressions.global_mir,
3142 global_expressions.physical_plan,
3143 global_expressions.dataflow_metainfos,
3144 )
3145 }
3146 Some(_) | None => {
3147 let debug_name = self
3148 .catalog()
3149 .resolve_full_name(entry.name(), None)
3150 .to_string();
3151 let (optimized_plan, physical_plan, metainfo) = self
3152 .optimize_create_continual_task(
3153 ct,
3154 global_id,
3155 self.owned_catalog(),
3156 debug_name,
3157 )?;
3158 uncached_expressions.insert(
3159 global_id,
3160 GlobalExpressions {
3161 global_mir: optimized_plan.clone(),
3162 physical_plan: physical_plan.clone(),
3163 dataflow_metainfos: metainfo.clone(),
3164 optimizer_features: OptimizerFeatures::from(
3165 self.catalog().system_config(),
3166 ),
3167 },
3168 );
3169 (optimized_plan, physical_plan, metainfo)
3170 }
3171 };
3172
3173 let catalog = self.catalog_mut();
3174 catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3175 catalog.set_physical_plan(ct.global_id(), physical_plan);
3176 catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3177
3178 compute_instance.insert_collection(ct.global_id());
3179 }
3180 _ => (),
3181 }
3182 }
3183
3184 Ok(uncached_expressions)
3185 }
3186
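/// Runs as-of selection for all dataflow plans computed earlier, stores
/// the updated plans in the catalog, and returns the read holds that
/// keep the dataflows' inputs readable at the chosen as-ofs. The caller
/// must keep these alive until the dataflows have been shipped.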
3187 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3197 let mut catalog_ids = Vec::new();
3198 let mut dataflows = Vec::new();
3199 let mut read_policies = BTreeMap::new();
3200 for entry in self.catalog.entries() {
3201 let gid = match entry.item() {
3202 CatalogItem::Index(idx) => idx.global_id(),
3203 CatalogItem::MaterializedView(mv) => mv.global_id(),
3204 CatalogItem::ContinualTask(ct) => ct.global_id(),
3205 CatalogItem::Table(_)
3206 | CatalogItem::Source(_)
3207 | CatalogItem::Log(_)
3208 | CatalogItem::View(_)
3209 | CatalogItem::Sink(_)
3210 | CatalogItem::Type(_)
3211 | CatalogItem::Func(_)
3212 | CatalogItem::Secret(_)
3213 | CatalogItem::Connection(_) => continue,
3214 };
3215 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3216 catalog_ids.push(gid);
3217 dataflows.push(plan.clone());
3218
3219 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3220 read_policies.insert(gid, compaction_window.into());
3221 }
3222 }
3223 }
3224
3225 let read_ts = self.get_local_read_ts().await;
3226 let read_holds = as_of_selection::run(
3227 &mut dataflows,
3228 &read_policies,
3229 &*self.controller.storage_collections,
3230 read_ts,
3231 );
3232
3233 let catalog = self.catalog_mut();
3234 for (id, plan) in catalog_ids.into_iter().zip(dataflows) {
3235 catalog.set_physical_plan(id, plan);
3236 }
3237
3238 read_holds
3239 }
3240
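/// Serves the coordinator's main message loop as a future that must run
/// on the dedicated coordinator thread. Messages are received in
/// batches of up to `MESSAGE_BATCH`, with a bias towards internal
/// commands, and handled strictly one at a time; per-message timings
/// and the `LastMessage` record consumed by the watchdog task are
/// updated along the way.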
3241 fn serve(
3250 mut self,
3251 mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3252 mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3253 mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3254 group_commit_rx: appends::GroupCommitWaiter,
3255 ) -> LocalBoxFuture<'static, ()> {
3256 async move {
3257 let mut cluster_events = self.controller.events_stream();
3259 let last_message = Arc::new(Mutex::new(LastMessage {
3260 kind: "none",
3261 stmt: None,
3262 }));
3263
3264 let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3265 let idle_metric = self.metrics.queue_busy_seconds.with_label_values(&[]);
3266 let last_message_watchdog = Arc::clone(&last_message);
3267
3268 spawn(|| "coord watchdog", async move {
3269 let mut interval = tokio::time::interval(Duration::from_secs(5));
3274 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3278
3279 let mut coord_stuck = false;
3281
3282 loop {
3283 interval.tick().await;
3284
3285 let duration = tokio::time::Duration::from_secs(30);
3287 let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3288 let Ok(maybe_permit) = timeout else {
3289 if !coord_stuck {
3291 let last_message = last_message_watchdog.lock().expect("poisoned");
3292 tracing::warn!(
3293 last_message_kind = %last_message.kind,
3294 last_message_sql = %last_message.stmt_to_string(),
3295 "coordinator stuck for {duration:?}",
3296 );
3297 }
3298 coord_stuck = true;
3299
3300 continue;
3301 };
3302
3303 if coord_stuck {
3305 tracing::info!("Coordinator became unstuck");
3306 }
3307 coord_stuck = false;
3308
3309 let Ok(permit) = maybe_permit else {
3311 break;
3312 };
3313
3314 permit.send(idle_metric.start_timer());
3315 }
3316 });
3317
3318 self.schedule_storage_usage_collection().await;
3319 self.spawn_privatelink_vpc_endpoints_watch_task();
3320 self.spawn_statement_logging_task();
3321 flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3322
3323 let warn_threshold = self
3325 .catalog()
3326 .system_config()
3327 .coord_slow_message_warn_threshold();
3328
3329 const MESSAGE_BATCH: usize = 64;
3331 let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3332 let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3333
3334 let message_batch = self.metrics.message_batch.with_label_values(&[]);
3337
3338 loop {
3339 select! {
3343 biased;
3348
3349 _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3353 Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
3357 () = self.controller.ready() => {
3361 messages.push(Message::ControllerReady);
3362 }
3363 permit = group_commit_rx.ready() => {
3366 let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3372 PendingWriteTxn::User{span, ..} => Some(span),
3373 PendingWriteTxn::System{..} => None,
3374 });
3375 let span = match user_write_spans.exactly_one() {
3376 Ok(span) => span.clone(),
3377 Err(user_write_spans) => {
3378 let span = info_span!(parent: None, "group_commit_notify");
3379 for s in user_write_spans {
3380 span.follows_from(s);
3381 }
3382 span
3383 }
3384 };
3385 messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3386 },
3387 count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3391 if count == 0 {
3392 break;
3393 } else {
3394 messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
3395 }
3396 },
3397 Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3401 let mut pending_read_txns = vec![pending_read_txn];
3402 while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3403 pending_read_txns.push(pending_read_txn);
3404 }
3405 for (conn_id, pending_read_txn) in pending_read_txns {
3406 let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
3407 soft_assert_or_log!(
3408 prev.is_none(),
3409 "connections cannot have multiple concurrent reads, prev: {prev:?}"
3410 )
3411 }
3412 messages.push(Message::LinearizeReads);
3413 }
3414 _ = self.advance_timelines_interval.tick() => {
3418 let span = info_span!(parent: None, "coord::advance_timelines_interval");
3419 span.follows_from(Span::current());
3420
3421 if self.controller.read_only() {
3426 messages.push(Message::AdvanceTimelines);
3427 } else {
3428 messages.push(Message::GroupCommitInitiate(span, None));
3429 }
3430 },
3431 _ = self.check_cluster_scheduling_policies_interval.tick() => {
3435 messages.push(Message::CheckSchedulingPolicies);
3436 },
3437
3438 _ = self.caught_up_check_interval.tick() => {
3442 self.maybe_check_caught_up().await;
3447
3448 continue;
3449 },
3450
3451 timer = idle_rx.recv() => {
3456 timer.expect("does not drop").observe_duration();
3457 self.metrics
3458 .message_handling
3459 .with_label_values(&["watchdog"])
3460 .observe(0.0);
3461 continue;
3462 }
3463 };
3464
3465 message_batch.observe(f64::cast_lossy(messages.len()));
3467
3468 for msg in messages.drain(..) {
3469 let msg_kind = msg.kind();
3472 let span = span!(
3473 target: "mz_adapter::coord::handle_message_loop",
3474 Level::INFO,
3475 "coord::handle_message",
3476 kind = msg_kind
3477 );
3478 let otel_context = span.context().span().span_context().clone();
3479
3480 *last_message.lock().expect("poisoned") = LastMessage {
3484 kind: msg_kind,
3485 stmt: match &msg {
3486 Message::Command(
3487 _,
3488 Command::Execute {
3489 portal_name,
3490 session,
3491 ..
3492 },
3493 ) => session
3494 .get_portal_unverified(portal_name)
3495 .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3496 _ => None,
3497 },
3498 };
3499
3500 let start = Instant::now();
3501 self.handle_message(msg).instrument(span).await;
3502 let duration = start.elapsed();
3503
3504 self.metrics
3505 .message_handling
3506 .with_label_values(&[msg_kind])
3507 .observe(duration.as_secs_f64());
3508
3509 if duration > warn_threshold {
3511 let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3512 tracing::error!(
3513 ?msg_kind,
3514 ?trace_id,
3515 ?duration,
3516 "very slow coordinator message"
3517 );
3518 }
3519 }
3520 }
3521 if let Some(catalog) = Arc::into_inner(self.catalog) {
3524 catalog.expire().await;
3525 }
3526 }
3527 .boxed_local()
3528 }
3529
3530 fn catalog(&self) -> &Catalog {
3532 &self.catalog
3533 }
3534
3535 fn owned_catalog(&self) -> Arc<Catalog> {
3538 Arc::clone(&self.catalog)
3539 }
3540
3541 fn optimizer_metrics(&self) -> OptimizerMetrics {
3544 self.optimizer_metrics.clone()
3545 }
3546
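/// Returns a mutable reference to the catalog. Since the catalog is
/// shared behind an `Arc`, this clones the inner value if any other
/// reference is outstanding (`Arc::make_mut`); readers holding the old
/// `Arc` keep observing the previous catalog snapshot.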
3547 fn catalog_mut(&mut self) -> &mut Catalog {
3549 Arc::make_mut(&mut self.catalog)
3557 }
3558
3559 fn connection_context(&self) -> &ConnectionContext {
3561 self.controller.connection_context()
3562 }
3563
3564 fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3566 &self.connection_context().secrets_reader
3567 }
3568
3569 #[allow(dead_code)]
3574 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3575 for meta in self.active_conns.values() {
3576 let _ = meta.notice_tx.send(notice.clone());
3577 }
3578 }
3579
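/// Returns a closure that broadcasts a notice to every connection that
/// was active when this method was called, allowing a notice to be sent
/// from a spawned task without a reference to the coordinator. A
/// minimal usage sketch (the notice variant here is only illustrative):
///
/// ```ignore
/// let broadcast = coord.broadcast_notice_tx();
/// mz_ore::task::spawn(|| "notify-conns", async move {
///     broadcast(AdapterNotice::DroppedActiveDatabase { name: "db".into() });
/// });
/// ```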
3580 pub(crate) fn broadcast_notice_tx(
3583 &self,
3584 ) -> Box<dyn FnOnce(AdapterNotice) + Send + 'static> {
3585 let senders: Vec<_> = self
3586 .active_conns
3587 .values()
3588 .map(|meta| meta.notice_tx.clone())
3589 .collect();
3590 Box::new(move |notice| {
3591 for tx in senders {
3592 let _ = tx.send(notice.clone());
3593 }
3594 })
3595 }
3596
3597 pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3598 &self.active_conns
3599 }
3600
3601 #[instrument(level = "debug")]
3602 pub(crate) fn retire_execution(
3603 &mut self,
3604 reason: StatementEndedExecutionReason,
3605 ctx_extra: ExecuteContextExtra,
3606 ) {
3607 if let Some(uuid) = ctx_extra.retire() {
3608 self.end_statement_execution(uuid, reason);
3609 }
3610 }
3611
3612 #[instrument(level = "debug")]
3614 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder {
3615 let compute = self
3616 .instance_snapshot(instance)
3617 .expect("compute instance does not exist");
3618 DataflowBuilder::new(self.catalog().state(), compute)
3619 }
3620
3621 pub fn instance_snapshot(
3623 &self,
3624 id: ComputeInstanceId,
3625 ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3626 ComputeInstanceSnapshot::new(&self.controller, id)
3627 }
3628
3629 pub(crate) async fn ship_dataflow(
3632 &mut self,
3633 dataflow: DataflowDescription<Plan>,
3634 instance: ComputeInstanceId,
3635 subscribe_target_replica: Option<ReplicaId>,
3636 ) {
3637 let export_ids = dataflow.exported_index_ids().collect();
3640
3641 self.controller
3642 .compute
3643 .create_dataflow(instance, dataflow, subscribe_target_replica)
3644 .unwrap_or_terminate("dataflow creation cannot fail");
3645
3646 self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3647 .await;
3648 }
3649
3650 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3652 &mut self,
3653 dataflow: DataflowDescription<Plan>,
3654 instance: ComputeInstanceId,
3655 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3656 ) {
3657 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3658 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3659 let ((), ()) =
3660 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3661 } else {
3662 self.ship_dataflow(dataflow, instance, None).await;
3663 }
3664 }
3665
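/// Installs a watch set in the compute controller that fires once all
/// `objects` have advanced to timestamp `t`; the watch set is
/// associated with `conn_id` so that `state` can be delivered to that
/// connection (or cancelled with it) when the watch set completes.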
3666 pub fn install_compute_watch_set(
3670 &mut self,
3671 conn_id: ConnectionId,
3672 objects: BTreeSet<GlobalId>,
3673 t: Timestamp,
3674 state: WatchSetResponse,
3675 ) {
3676 let ws_id = self.controller.install_compute_watch_set(objects, t);
3677 self.connection_watch_sets
3678 .entry(conn_id.clone())
3679 .or_default()
3680 .insert(ws_id);
3681 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3682 }
3683
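/// Like `install_compute_watch_set`, but for collections maintained by
/// the storage controller.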
3684 pub fn install_storage_watch_set(
3688 &mut self,
3689 conn_id: ConnectionId,
3690 objects: BTreeSet<GlobalId>,
3691 t: Timestamp,
3692 state: WatchSetResponse,
3693 ) {
3694 let ws_id = self.controller.install_storage_watch_set(objects, t);
3695 self.connection_watch_sets
3696 .entry(conn_id.clone())
3697 .or_default()
3698 .insert(ws_id);
3699 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3700 }
3701
3702 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3704 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3705 for ws_id in ws_ids {
3706 self.installed_watch_sets.remove(&ws_id);
3707 }
3708 }
3709 }
3710
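/// Dumps the coordinator's in-memory state (timelines, active
/// connections, read holds, pending peeks and linearized reads, plus
/// the controller's own dump) as a JSON object for debugging. Values
/// are rendered with their `Debug` representations, so the output is
/// informational rather than machine-parseable.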
3711 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3715 let global_timelines: BTreeMap<_, _> = self
3721 .global_timelines
3722 .iter()
3723 .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3724 .collect();
3725 let active_conns: BTreeMap<_, _> = self
3726 .active_conns
3727 .iter()
3728 .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3729 .collect();
3730 let txn_read_holds: BTreeMap<_, _> = self
3731 .txn_read_holds
3732 .iter()
3733 .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3734 .collect();
3735 let pending_peeks: BTreeMap<_, _> = self
3736 .pending_peeks
3737 .iter()
3738 .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3739 .collect();
3740 let client_pending_peeks: BTreeMap<_, _> = self
3741 .client_pending_peeks
3742 .iter()
3743 .map(|(id, peek)| {
3744 let peek: BTreeMap<_, _> = peek
3745 .iter()
3746 .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3747 .collect();
3748 (id.to_string(), peek)
3749 })
3750 .collect();
3751 let pending_linearize_read_txns: BTreeMap<_, _> = self
3752 .pending_linearize_read_txns
3753 .iter()
3754 .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3755 .collect();
3756
3757 let map = serde_json::Map::from_iter([
3758 (
3759 "global_timelines".to_string(),
3760 serde_json::to_value(global_timelines)?,
3761 ),
3762 (
3763 "active_conns".to_string(),
3764 serde_json::to_value(active_conns)?,
3765 ),
3766 (
3767 "txn_read_holds".to_string(),
3768 serde_json::to_value(txn_read_holds)?,
3769 ),
3770 (
3771 "pending_peeks".to_string(),
3772 serde_json::to_value(pending_peeks)?,
3773 ),
3774 (
3775 "client_pending_peeks".to_string(),
3776 serde_json::to_value(client_pending_peeks)?,
3777 ),
3778 (
3779 "pending_linearize_read_txns".to_string(),
3780 serde_json::to_value(pending_linearize_read_txns)?,
3781 ),
3782 ("controller".to_string(), self.controller.dump().await?),
3783 ]);
3784 Ok(serde_json::Value::Object(map))
3785 }
3786
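/// Spawns a task that prunes `mz_storage_usage_by_shard` on startup: it
/// snapshots and consolidates the table at the boot read timestamp,
/// collects a retraction for every row whose collection timestamp is
/// older than `retention_period`, and submits the retractions via an
/// internal `Message::StorageUsagePrune` command.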
3787 async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
3801 let item_id = self
3802 .catalog()
3803 .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3804 let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3805 let read_ts = self.get_local_read_ts().await;
3806 let current_contents_fut = self
3807 .controller
3808 .storage_collections
3809 .snapshot(global_id, read_ts);
3810 let internal_cmd_tx = self.internal_cmd_tx.clone();
3811 spawn(|| "storage_usage_prune", async move {
3812 let mut current_contents = current_contents_fut
3813 .await
3814 .unwrap_or_terminate("cannot fail to fetch snapshot");
3815 differential_dataflow::consolidation::consolidate(&mut current_contents);
3816
3817 let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3818 let mut expired = Vec::new();
3819 for (row, diff) in current_contents {
3820 assert_eq!(
3821 diff, 1,
3822 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3823 );
3824 let collection_timestamp = row
3826 .unpack()
3827 .get(3)
3828 .expect("definition of mz_storage_usage_by_shard changed")
3829 .unwrap_timestamptz();
3830 let collection_timestamp = collection_timestamp.timestamp_millis();
3831 let collection_timestamp: u128 = collection_timestamp
3832 .try_into()
3833 .expect("all collections happen after Jan 1 1970");
3834 if collection_timestamp < cutoff_ts {
3835 debug!("pruning storage event {row:?}");
3836 let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3837 expired.push(builtin_update);
3838 }
3839 }
3840
3841 let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3843 });
3844 }
3845}
3846
3847#[cfg(test)]
3848impl Coordinator {
3849 #[allow(dead_code)]
3850 async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
3851 let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
3859
3860 let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
3861 }
3862}
3863
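/// The most recent message handled by the coordinator's main loop,
/// recorded so the watchdog task and the panic handler can report what
/// the coordinator was working on when it got stuck or crashed.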
3864struct LastMessage {
3866 kind: &'static str,
3867 stmt: Option<Arc<Statement<Raw>>>,
3868}
3869
3870impl LastMessage {
3871 fn stmt_to_string(&self) -> Cow<'static, str> {
3873 self.stmt
3874 .as_ref()
3875 .map(|stmt| stmt.to_ast_string_redacted().into())
3876 .unwrap_or("<none>".into())
3877 }
3878}
3879
3880impl fmt::Debug for LastMessage {
3881 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3882 f.debug_struct("LastMessage")
3883 .field("kind", &self.kind)
3884 .field("stmt", &self.stmt_to_string())
3885 .finish()
3886 }
3887}
3888
3889impl Drop for LastMessage {
3890 fn drop(&mut self) {
3891 if std::thread::panicking() {
3893 eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
3895 }
3896 }
3897}
3898
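/// Serves the coordinator based on the provided configuration: it
/// initializes the timestamp oracles, opens the catalog, and spawns a
/// dedicated `coordinator` thread (with an enlarged stack) that runs
/// bootstrap and then the main message loop. On successful bootstrap it
/// returns a `Handle` joined to that thread and a `Client` for issuing
/// commands.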
3899pub fn serve(
3911 Config {
3912 controller_config,
3913 controller_envd_epoch,
3914 mut storage,
3915 audit_logs_iterator,
3916 timestamp_oracle_url,
3917 unsafe_mode,
3918 all_features,
3919 build_info,
3920 environment_id,
3921 metrics_registry,
3922 now,
3923 secrets_controller,
3924 cloud_resource_controller,
3925 cluster_replica_sizes,
3926 builtin_system_cluster_config,
3927 builtin_catalog_server_cluster_config,
3928 builtin_probe_cluster_config,
3929 builtin_support_cluster_config,
3930 builtin_analytics_cluster_config,
3931 system_parameter_defaults,
3932 availability_zones,
3933 storage_usage_client,
3934 storage_usage_collection_interval,
3935 storage_usage_retention_period,
3936 segment_client,
3937 egress_addresses,
3938 aws_account_id,
3939 aws_privatelink_availability_zones,
3940 connection_context,
3941 connection_limit_callback,
3942 remote_system_parameters,
3943 webhook_concurrency_limit,
3944 http_host_name,
3945 tracing_handle,
3946 read_only_controllers,
3947 enable_0dt_deployment,
3948 caught_up_trigger: clusters_caught_up_trigger,
3949 helm_chart_version,
3950 license_key,
3951 }: Config,
3952) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
3953 async move {
3954 let coord_start = Instant::now();
3955 info!("startup: coordinator init: beginning");
3956 info!("startup: coordinator init: preamble beginning");
3957
3958 let _builtins = LazyLock::force(&BUILTINS_STATIC);
3962
3963 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
3964 let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
3965 let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
3966 mpsc::unbounded_channel();
3967
3968 if !availability_zones.iter().all_unique() {
3970 coord_bail!("availability zones must be unique");
3971 }
3972
3973 let aws_principal_context = match (
3974 aws_account_id,
3975 connection_context.aws_external_id_prefix.clone(),
3976 ) {
3977 (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
3978 aws_account_id,
3979 aws_external_id_prefix,
3980 }),
3981 _ => None,
3982 };
3983
3984 let aws_privatelink_availability_zones = aws_privatelink_availability_zones
3985 .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
3986
3987 info!(
3988 "startup: coordinator init: preamble complete in {:?}",
3989 coord_start.elapsed()
3990 );
3991 let oracle_init_start = Instant::now();
3992 info!("startup: coordinator init: timestamp oracle init beginning");
3993
3994 let pg_timestamp_oracle_config = timestamp_oracle_url
3995 .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
3996 let mut initial_timestamps =
3997 get_initial_oracle_timestamps(&pg_timestamp_oracle_config).await?;
3998
3999 initial_timestamps
4003 .entry(Timeline::EpochMilliseconds)
4004 .or_insert_with(mz_repr::Timestamp::minimum);
4005 let mut timestamp_oracles = BTreeMap::new();
4006 for (timeline, initial_timestamp) in initial_timestamps {
4007 Coordinator::ensure_timeline_state_with_initial_time(
4008 &timeline,
4009 initial_timestamp,
4010 now.clone(),
4011 pg_timestamp_oracle_config.clone(),
4012 &mut timestamp_oracles,
4013 read_only_controllers,
4014 )
4015 .await;
4016 }
4017
4018 let catalog_upper = storage.current_upper().await;
4022 let epoch_millis_oracle = &timestamp_oracles
4028 .get(&Timeline::EpochMilliseconds)
4029 .expect("inserted above")
4030 .oracle;
4031
4032 let mut boot_ts = if read_only_controllers {
4033 let read_ts = epoch_millis_oracle.read_ts().await;
4034 std::cmp::max(read_ts, catalog_upper)
4035 } else {
4036 epoch_millis_oracle.apply_write(catalog_upper).await;
4039 epoch_millis_oracle.write_ts().await.timestamp
4040 };
4041
4042 info!(
4043 "startup: coordinator init: timestamp oracle init complete in {:?}",
4044 oracle_init_start.elapsed()
4045 );
4046
4047 let catalog_open_start = Instant::now();
4048 info!("startup: coordinator init: catalog open beginning");
4049 let persist_client = controller_config
4050 .persist_clients
4051 .open(controller_config.persist_location.clone())
4052 .await
4053 .context("opening persist client")?;
4054 let builtin_item_migration_config = BuiltinItemMigrationConfig {
4056 persist_client: persist_client.clone(),
4057 read_only: read_only_controllers,
4058 };
4060 let OpenCatalogResult {
4061 mut catalog,
4062 migrated_storage_collections_0dt,
4063 new_builtin_collections,
4064 builtin_table_updates,
4065 cached_global_exprs,
4066 uncached_local_exprs,
4067 } = Catalog::open(mz_catalog::config::Config {
4068 storage,
4069 metrics_registry: &metrics_registry,
4070 state: mz_catalog::config::StateConfig {
4071 unsafe_mode,
4072 all_features,
4073 build_info,
4074 environment_id: environment_id.clone(),
4075 read_only: read_only_controllers,
4076 now: now.clone(),
4077 boot_ts: boot_ts.clone(),
4078 skip_migrations: false,
4079 cluster_replica_sizes,
4080 builtin_system_cluster_config,
4081 builtin_catalog_server_cluster_config,
4082 builtin_probe_cluster_config,
4083 builtin_support_cluster_config,
4084 builtin_analytics_cluster_config,
4085 system_parameter_defaults,
4086 remote_system_parameters,
4087 availability_zones,
4088 egress_addresses,
4089 aws_principal_context,
4090 aws_privatelink_availability_zones,
4091 connection_context,
4092 http_host_name,
4093 builtin_item_migration_config,
4094 persist_client: persist_client.clone(),
4095 enable_expression_cache_override: None,
4096 enable_0dt_deployment,
4097 helm_chart_version,
4098 },
4099 })
4100 .await?;
4101
4102 let catalog_upper = catalog.current_upper().await;
4105 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4106
4107 if !read_only_controllers {
4108 epoch_millis_oracle.apply_write(boot_ts).await;
4109 }
4110
4111 info!(
4112 "startup: coordinator init: catalog open complete in {:?}",
4113 catalog_open_start.elapsed()
4114 );
4115
4116 let coord_thread_start = Instant::now();
4117 info!("startup: coordinator init: coordinator thread start beginning");
4118
4119 let session_id = catalog.config().session_id;
4120 let start_instant = catalog.config().start_instant;
4121
4122 let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4126 let handle = TokioHandle::current();
4127
4128 let metrics = Metrics::register_into(&metrics_registry);
4129 let metrics_clone = metrics.clone();
4130 let optimizer_metrics = OptimizerMetrics::register_into(
4131 &metrics_registry,
4132 catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4133 );
4134 let segment_client_clone = segment_client.clone();
4135 let coord_now = now.clone();
4136 let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4137 let mut check_scheduling_policies_interval = tokio::time::interval(
4138 catalog
4139 .system_config()
4140 .cluster_check_scheduling_policies_interval(),
4141 );
4142 check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4143
4144 let clusters_caught_up_check_interval = if read_only_controllers {
4145 let dyncfgs = catalog.system_config().dyncfgs();
4146 let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4147
4148 let mut interval = tokio::time::interval(interval);
4149 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4150 interval
4151 } else {
4152 let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4160 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4161 interval
4162 };
4163
4164 let clusters_caught_up_check =
4165 clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
4166 trigger,
4167 exclude_collections: new_builtin_collections.into_iter().collect(),
4168 });
4169
4170 if let Some(config) = pg_timestamp_oracle_config.as_ref() {
4171 let pg_timestamp_oracle_params =
4174 flags::pg_timstamp_oracle_config(catalog.system_config());
4175 pg_timestamp_oracle_params.apply(config);
4176 }
4177
4178 let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4181 Arc::new(move |system_vars: &SystemVars| {
4182 let limit: u64 = system_vars.max_connections().cast_into();
4183 let superuser_reserved: u64 =
4184 system_vars.superuser_reserved_connections().cast_into();
4185
4186 let superuser_reserved = if superuser_reserved >= limit {
4191 tracing::warn!(
4192 "superuser_reserved ({superuser_reserved}) is greater than or equal to max connections ({limit})!"
4193 );
4194 limit
4195 } else {
4196 superuser_reserved
4197 };
4198
4199 (connection_limit_callback)(limit, superuser_reserved);
4200 });
4201 catalog.system_config_mut().register_callback(
4202 &mz_sql::session::vars::MAX_CONNECTIONS,
4203 Arc::clone(&connection_limit_callback),
4204 );
4205 catalog.system_config_mut().register_callback(
4206 &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4207 connection_limit_callback,
4208 );
4209
4210 let (group_commit_tx, group_commit_rx) = appends::notifier();
4211
4212 let parent_span = tracing::Span::current();
4213 let thread = thread::Builder::new()
4214 .stack_size(3 * stack::STACK_SIZE)
4218 .name("coordinator".to_string())
4219 .spawn(move || {
4220 let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4221
4222 let controller = handle
4223 .block_on({
4224 catalog.initialize_controller(
4225 controller_config,
4226 controller_envd_epoch,
4227 read_only_controllers,
4228 )
4229 })
4230 .unwrap_or_terminate("failed to initialize storage_controller");
4231 let catalog_upper = handle.block_on(catalog.current_upper());
4234 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4235 if !read_only_controllers {
4236 let epoch_millis_oracle = &timestamp_oracles
4237 .get(&Timeline::EpochMilliseconds)
4238 .expect("inserted above")
4239 .oracle;
4240 handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4241 }
4242
4243 let catalog = Arc::new(catalog);
4244
4245 let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4246 let mut coord = Coordinator {
4247 controller,
4248 catalog,
4249 internal_cmd_tx,
4250 group_commit_tx,
4251 strict_serializable_reads_tx,
4252 global_timelines: timestamp_oracles,
4253 transient_id_gen: Arc::new(TransientIdGen::new()),
4254 active_conns: BTreeMap::new(),
4255 txn_read_holds: Default::default(),
4256 pending_peeks: BTreeMap::new(),
4257 client_pending_peeks: BTreeMap::new(),
4258 pending_linearize_read_txns: BTreeMap::new(),
4259 serialized_ddl: LockedVecDeque::new(),
4260 active_compute_sinks: BTreeMap::new(),
4261 active_webhooks: BTreeMap::new(),
4262 active_copies: BTreeMap::new(),
4263 staged_cancellation: BTreeMap::new(),
4264 introspection_subscribes: BTreeMap::new(),
4265 write_locks: BTreeMap::new(),
4266 deferred_write_ops: BTreeMap::new(),
4267 pending_writes: Vec::new(),
4268 advance_timelines_interval,
4269 secrets_controller,
4270 caching_secrets_reader,
4271 cloud_resource_controller,
4272 transient_replica_metadata: BTreeMap::new(),
4273 storage_usage_client,
4274 storage_usage_collection_interval,
4275 segment_client,
4276 metrics,
4277 optimizer_metrics,
4278 tracing_handle,
4279 statement_logging: StatementLogging::new(coord_now.clone()),
4280 webhook_concurrency_limit,
4281 pg_timestamp_oracle_config,
4282 check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4283 cluster_scheduling_decisions: BTreeMap::new(),
4284 caught_up_check_interval: clusters_caught_up_check_interval,
4285 caught_up_check: clusters_caught_up_check,
4286 installed_watch_sets: BTreeMap::new(),
4287 connection_watch_sets: BTreeMap::new(),
4288 cluster_replica_statuses: ClusterReplicaStatuses::new(),
4289 read_only_controllers,
4290 buffered_builtin_table_updates: Some(Vec::new()),
4291 license_key,
4292 };
                let bootstrap = handle.block_on(async {
                    coord
                        .bootstrap(
                            boot_ts,
                            migrated_storage_collections_0dt,
                            builtin_table_updates,
                            cached_global_exprs,
                            uncached_local_exprs,
                            audit_logs_iterator,
                        )
                        .await?;
                    coord
                        .controller
                        .remove_orphaned_replicas(
                            coord.catalog().get_next_user_replica_id().await?,
                            coord.catalog().get_next_system_replica_id().await?,
                        )
                        .await
                        .map_err(AdapterError::Orchestrator)?;

                    if let Some(retention_period) = storage_usage_retention_period {
                        coord
                            .prune_storage_usage_events_on_startup(retention_period)
                            .await;
                    }

                    Ok(())
                });
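                // Report the bootstrap result before serving: the caller is blocked
                // on `bootstrap_rx`, and the serve loop is only entered on success.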
                let ok = bootstrap.is_ok();
                drop(span);
                bootstrap_tx
                    .send(bootstrap)
                    .expect("bootstrap_rx is not dropped until it receives this message");
                if ok {
                    handle.block_on(coord.serve(
                        internal_cmd_rx,
                        strict_serializable_reads_rx,
                        cmd_rx,
                        group_commit_rx,
                    ));
                }
            })
            .expect("failed to create coordinator thread");
        match bootstrap_rx
            .await
            .expect("bootstrap_tx always sends a message or panics/halts")
        {
            Ok(()) => {
                info!(
                    "startup: coordinator init: coordinator thread start complete in {:?}",
                    coord_thread_start.elapsed()
                );
                info!(
                    "startup: coordinator init: complete in {:?}",
                    coord_start.elapsed()
                );
                let handle = Handle {
                    session_id,
                    start_instant,
                    _thread: thread.join_on_drop(),
                };
                let client = Client::new(
                    build_info,
                    cmd_tx.clone(),
                    metrics_clone,
                    now,
                    environment_id,
                    segment_client_clone,
                );
                Ok((handle, client))
            }
            Err(e) => Err(e),
        }
    }
    .boxed()
}

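/// Returns the initial timestamp to use for each timeline, as recorded by the
/// durable (postgres-backed) timestamp oracle, when one is configured. If a
/// timeline is reported more than once, the maximum timestamp wins.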
async fn get_initial_oracle_timestamps(
    pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
    let mut initial_timestamps = BTreeMap::new();

    if let Some(pg_timestamp_oracle_config) = pg_timestamp_oracle_config {
        let postgres_oracle_timestamps =
            PostgresTimestampOracle::<NowFn>::get_all_timelines(pg_timestamp_oracle_config.clone())
                .await?;

        let debug_msg = || {
            postgres_oracle_timestamps
                .iter()
                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
                .join(", ")
        };
        info!(
            "current timestamps from the postgres-backed timestamp oracle: {}",
            debug_msg()
        );

        for (timeline, ts) in postgres_oracle_timestamps {
            let entry = initial_timestamps
                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));

            entry
                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
                .or_insert(ts);
        }
    } else {
        info!("no postgres URL configured for the postgres-backed timestamp oracle");
    }

    let debug_msg = || {
        initial_timestamps
            .iter()
            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
            .join(", ")
    };
    info!("initial oracle timestamps: {}", debug_msg());

    Ok(initial_timestamps)
}

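/// Pulls system parameter values from the remote parameter frontend, if one is
/// configured, returning them as name/value pairs to apply to the catalog. On
/// a first boot (no successful sync recorded yet) the sync may block
/// indefinitely; on later boots it is bounded by
/// `system_parameter_sync_timeout`, and hitting the timeout is not an error.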
#[instrument]
pub async fn load_remote_system_parameters(
    storage: &mut Box<dyn OpenableDurableCatalogState>,
    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
    system_parameter_sync_timeout: Duration,
) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
        tracing::info!("parameter sync on boot: start sync");

        let mut params = SynchronizedParameters::new(SystemVars::default());
        let frontend_sync = async {
            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
            frontend.pull(&mut params);
            let ops = params
                .modified()
                .into_iter()
                .map(|param| {
                    let name = param.name;
                    let value = param.value;
                    tracing::info!(name, value, initial = true, "sync parameter");
                    (name, value)
                })
                .collect();
            tracing::info!("parameter sync on boot: end sync");
            Ok(Some(ops))
        };
        if !storage.has_system_config_synced_once().await? {
            frontend_sync.await
        } else {
            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
                Ok(ops) => Ok(ops),
                Err(TimeoutError::Inner(e)) => Err(e),
                Err(TimeoutError::DeadlineElapsed) => {
                    tracing::info!("parameter sync on boot: sync timed out");
                    Ok(None)
                }
            }
        }
    } else {
        Ok(None)
    }
}

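/// Notifications delivered to the coordinator when a watch set it installed
/// becomes ready.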
#[derive(Debug)]
pub(crate) enum WatchSetResponse {
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    AlterSinkReady(AlterSinkReadyContext),
}

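/// State carried across the asynchronous part of `ALTER SINK`: the execute
/// context, the validated plan, and read holds keeping the sink's dependencies
/// readable until the alter completes. `ctx` is an `Option` so that `Drop` can
/// retire it with a cancellation error if the operation never finishes.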
#[derive(Debug)]
pub(crate) struct AlterSinkReadyContext {
    ctx: Option<ExecuteContext>,
    otel_ctx: OpenTelemetryContext,
    plan: AlterSinkPlan,
    plan_validity: PlanValidity,
    resolved_ids: ResolvedIds,
    read_hold: ReadHolds<Timestamp>,
}

impl AlterSinkReadyContext {
    fn ctx(&mut self) -> &mut ExecuteContext {
        self.ctx.as_mut().expect("only cleared on drop")
    }

    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
        self.ctx
            .take()
            .expect("only cleared on drop")
            .retire(result);
    }
}

impl Drop for AlterSinkReadyContext {
    fn drop(&mut self) {
        if let Some(ctx) = self.ctx.take() {
            ctx.retire(Err(AdapterError::Canceled));
        }
    }
}

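/// A `VecDeque` paired with a shared async mutex. The mutex does not guard the
/// queue's data (mutation goes through `&mut self`); instead, holding the owned
/// guard marks the queue as busy so that concurrent tasks can coordinate, as
/// with the coordinator's serialized DDL queue.
///
/// A minimal usage sketch; `try_lock_owned` fails if another task already
/// holds the guard:
/// ```ignore
/// let mut queue = LockedVecDeque::new();
/// queue.push_back("op");
/// if let Ok(_busy) = queue.try_lock_owned() {
///     while let Some(op) = queue.pop_front() {
///         // Process `op` while holding the guard.
///     }
/// }
/// ```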
#[derive(Debug)]
struct LockedVecDeque<T> {
    items: VecDeque<T>,
    lock: Arc<tokio::sync::Mutex<()>>,
}

impl<T> LockedVecDeque<T> {
    pub fn new() -> Self {
        Self {
            items: VecDeque::new(),
            lock: Arc::new(tokio::sync::Mutex::new(())),
        }
    }

    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
        Arc::clone(&self.lock).try_lock_owned()
    }

    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    pub fn push_back(&mut self, value: T) {
        self.items.push_back(value)
    }

    pub fn pop_front(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    pub fn remove(&mut self, index: usize) -> Option<T> {
        self.items.remove(index)
    }

    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
        self.items.iter()
    }
}

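/// A deferred statement, together with the execute context needed to run it
/// later.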
#[derive(Debug)]
struct DeferredPlanStatement {
    ctx: ExecuteContext,
    ps: PlanStatement,
}

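/// The stage a statement was in when its execution was deferred: either a raw
/// statement that still needs planning, or a finished plan with its resolved
/// dependencies.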
#[derive(Debug)]
enum PlanStatement {
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
    },
}

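/// Errors produced when validating a client connection against a network
/// policy.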
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    #[error("Access denied: missing IP address")]
    MissingIp,
}

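/// Returns `Ok(())` if `ip` is permitted by at least one of `rules`, and
/// `AddressDenied` otherwise.
///
/// A sketch of intended use; `policy.rules` and the surrounding names are
/// illustrative:
/// ```ignore
/// match validate_ip_with_policy_rules(&client_ip, &policy.rules) {
///     Ok(()) => { /* admit the connection */ }
///     Err(NetworkPolicyError::AddressDenied(ip)) => warn!(%ip, "denied by network policy"),
///     Err(NetworkPolicyError::MissingIp) => warn!("denied: no client IP available"),
/// }
/// ```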
pub(crate) fn validate_ip_with_policy_rules(
    ip: &IpAddr,
    rules: &[NetworkPolicyRule],
) -> Result<(), NetworkPolicyError> {
    // A single matching rule is sufficient to admit the address.
    if rules.iter().any(|r| r.address.0.contains(ip)) {
        Ok(())
    } else {
        Err(NetworkPolicyError::AddressDenied(*ip))
    }
}