use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroI64;
use std::ops::Neg;
use std::str::FromStr;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::Context;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
use http::Uri;
use ipnet::IpNet;
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
use mz_auth::password::Password;
use mz_build_info::BuildInfo;
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_controller::clusters::{
    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
};
use mz_controller::{ControllerConfig, Readiness};
use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::OfflineReason;
use mz_ore::cast::{CastFrom, CastInto, CastLossy};
use mz_ore::channel::trigger::Trigger;
use mz_ore::future::TimeoutError;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::{JoinHandle, spawn};
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_ore::url::SensitiveUrl;
use mz_ore::{
    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
};
use mz_persist_client::PersistClient;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
use mz_repr::adt::numeric::Numeric;
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
use mz_secrets::cache::CachingSecretsReader;
use mz_secrets::{SecretsController, SecretsReader};
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::{CatalogCluster, EnvironmentId};
use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::{
    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, NetworkPolicyRule,
    OnTimeoutAction, Params, QueryWhen,
};
use mz_sql::session::user::User;
use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
use mz_sql_parser::ast::ExplainStage;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
use mz_timestamp_oracle::WriteTimestamp;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use serde::Serialize;
use thiserror::Error;
use timely::progress::{Antichain, Timestamp as _};
use tokio::runtime::Handle as TokioHandle;
use tokio::select;
use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
use crate::client::{Client, Handle};
use crate::command::{Command, ExecuteResponse};
use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
use crate::coord::appends::{
    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
};
use crate::coord::caught_up::CaughtUpCheckContext;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::introspection::IntrospectionSubscribe;
use crate::coord::peek::PendingPeek;
use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::validity::PlanValidity;
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
use crate::metrics::Metrics;
use crate::optimize::dataflows::{
    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{AdapterNotice, ReadHolds, flags};

pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;

pub mod appends;
mod catalog_serving;
mod caught_up;
pub mod cluster_scheduling;
mod command_handler;
pub mod consistency;
mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
pub mod read_policy;
mod sequencer;
mod sql;
mod validity;

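/// Messages handled by the coordinator's main loop, sent either by external
/// clients (wrapped in a [`Command`]) or by internal tasks reporting back to
/// the coordinator.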
#[derive(Debug)]
pub enum Message {
    Command(OpenTelemetryContext, Command),
    ControllerReady {
        controller: ControllerReadiness,
    },
    PurifiedStatementReady(PurifiedStatementReady),
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    TryDeferred {
        conn_id: ConnectionId,
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    AdvanceTimelines,
    ClusterEvent(ClusterEvent),
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    LinearizeReads,
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    DrainStatementLog,
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}

impl Message {
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
            },
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}

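/// The part of the controller that signaled readiness and now needs to be
/// serviced by the coordinator.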
#[derive(Debug)]
pub enum ControllerReadiness {
    Storage,
    Compute,
    Metrics,
    Internal,
}

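/// The result of a unit of work that ran off the coordinator's main loop and
/// is now ready to be handed back, together with the state needed to resume
/// sequencing the originating statement.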
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub params: Params,
    pub plan_validity: PlanValidity,
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}

pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;

#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    pub connection_id: CatalogItemId,
    pub connection_gid: GlobalId,
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}

pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
pub type AlterConnectionValidationReady = ValidationReady<Connection>;

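/// The stages that a peek (e.g. a `SELECT` or `COPY ... TO`) moves through.
/// Each stage is processed as a separate [`Message`], allowing the
/// coordinator to interleave other work while a statement is in flight.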
#[derive(Debug)]
pub enum PeekStage {
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    Finish(PeekStageFinish),
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    CopyToPreflight(PeekStageCopyTo),
    CopyToDataflow(PeekStageCopyTo),
}

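/// Context needed to sequence a `COPY ... TO <url>` statement.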
#[derive(Debug)]
pub struct CopyToContext {
    pub desc: RelationDesc,
    pub uri: Uri,
    pub connection: StorageConnection<ReferencedConnection>,
    pub connection_id: CatalogItemId,
    pub format: S3SinkFormat,
    pub max_file_size: u64,
    pub output_batch_count: Option<u64>,
}

#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
}

#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    imports: BTreeMap<GlobalId, MapFilterProject>,
}

#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}

#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}

#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}

#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    real_time_recency_ts: Option<Timestamp>,
}

#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}

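/// How the optimizer's output should be surfaced: not at all for a regular
/// query (`None`), as an `EXPLAIN` result (`Plan`), as a plan-insights notice
/// attached to a regular query (`PlanInsightsNotice`), or as an
/// `EXPLAIN FILTER PUSHDOWN` result (`Pushdown`).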
#[derive(Debug)]
pub enum ExplainContext {
    None,
    Plan(ExplainPlanContext),
    PlanInsightsNotice(OptimizerTrace),
    Pushdown,
}

impl ExplainContext {
    fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
        let optimizer_trace = match self {
            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
            _ => None,
        };
        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
    }

    fn needs_cluster(&self) -> bool {
        match self {
            ExplainContext::None => true,
            ExplainContext::Plan(..) => false,
            ExplainContext::PlanInsightsNotice(..) => true,
            ExplainContext::Pushdown => false,
        }
    }

    fn needs_plan_insights(&self) -> bool {
        matches!(
            self,
            ExplainContext::Plan(ExplainPlanContext {
                stage: ExplainStage::PlanInsights,
                ..
            }) | ExplainContext::PlanInsightsNotice(_)
        )
    }
}

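/// Configuration for an `EXPLAIN` of an optimizer plan, including the stage
/// to report and the trace that collects plans as optimization proceeds.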
#[derive(Debug)]
pub struct ExplainPlanContext {
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    pub stage: ExplainStage,
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    pub optimizer_trace: OptimizerTrace,
}

#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    item_id: CatalogItemId,
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
}

#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}

#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}

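/// The cluster a statement should be routed to.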
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    CatalogServer,
    Active,
    Transaction(ClusterId),
}

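/// What a [`Staged`] stage produced: either a task handle whose output is the
/// next stage (`Handle`) or the final response (`HandleRetire`), the next
/// stage directly (`Immediate`), or the final response itself (`Response`).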
pub(crate) enum StageResult<T> {
    Handle(JoinHandle<Result<T, AdapterError>>),
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    Immediate(T),
    Response(ExecuteResponse),
}

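/// A multi-stage operation that the coordinator drives forward, alternating
/// between work on the main loop (`stage`) and work spawned off it (via
/// [`StageResult::Handle`]), re-checking [`PlanValidity`] between stages.
///
/// A minimal sketch of the driving loop, with a hypothetical `drive` helper
/// shown for illustration only:
///
/// ```ignore
/// async fn drive<S: Staged>(coord: &mut Coordinator, mut ctx: S::Ctx, mut stage: S) {
///     loop {
///         match stage.stage(coord, &mut ctx).await {
///             // The stage completed on the main loop; continue immediately.
///             Ok(StageResult::Immediate(next)) => stage = *next,
///             // The stage produced the final response; retire the context.
///             Ok(StageResult::Response(resp)) => return ctx.retire(Ok(resp)),
///             Err(err) => return ctx.retire(Err(err)),
///             // `Handle`/`HandleRetire` instead await the spawned task and
///             // re-enter the loop by sending `Staged::message` back to the
///             // coordinator (elided here).
///             Ok(_) => unimplemented!(),
///         }
///     }
/// }
/// ```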
pub(crate) trait Staged: Send {
    type Ctx: StagedContext;

    fn validity(&mut self) -> &mut PlanValidity;

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    fn cancel_enabled(&self) -> bool;
}

pub trait StagedContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    fn session(&self) -> Option<&Session>;
}

impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}

impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}

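/// Configuration used to construct the coordinator.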
pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    pub connection_limit_callback: Box<dyn Fn(u64, u64) + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    pub read_only_controllers: bool,

    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}

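/// Metadata the coordinator tracks for each active connection.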
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    secret_key: u32,
    connected_at: EpochMillis,
    user: User,
    application_name: String,
    uuid: Uuid,
    conn_id: ConnectionId,
    client_ip: Option<IpAddr>,

    drop_sinks: BTreeSet<GlobalId>,

    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    pending_cluster_alters: BTreeSet<ClusterId>,

    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    authenticated_role: RoleId,
}

impl ConnMeta {
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    pub fn user(&self) -> &User {
        &self.user
    }

    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}

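/// A transaction waiting to be finished (committed or rolled back) once its
/// outstanding work completes.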
#[derive(Debug)]
pub struct PendingTxn {
    ctx: ExecuteContext,
    response: Result<PendingTxnResponse, AdapterError>,
    action: EndTransactionAction,
}

#[derive(Debug)]
pub enum PendingTxnResponse {
    Committed {
        params: BTreeMap<&'static str, String>,
    },
    Rolledback {
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        match self {
            PendingTxnResponse::Committed { params }
            | PendingTxnResponse::Rolledback { params } => params.extend(p),
        }
    }
}

impl From<PendingTxnResponse> for ExecuteResponse {
    fn from(value: PendingTxnResponse) -> Self {
        match value {
            PendingTxnResponse::Committed { params } => {
                ExecuteResponse::TransactionCommitted { params }
            }
            PendingTxnResponse::Rolledback { params } => {
                ExecuteResponse::TransactionRolledBack { params }
            }
        }
    }
}

#[derive(Debug)]
pub struct PendingReadTxn {
    txn: PendingRead,
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    created: Instant,
    num_requeues: u64,
    otel_ctx: OpenTelemetryContext,
}

impl PendingReadTxn {
    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
        &self.timestamp_context
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}

#[derive(Debug)]
enum PendingRead {
    Read {
        txn: PendingTxn,
    },
    ReadThenWrite {
        ctx: ExecuteContext,
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}

impl PendingRead {
    #[instrument(level = "debug")]
    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
        match self {
            PendingRead::Read {
                txn:
                    PendingTxn {
                        mut ctx,
                        response,
                        action,
                    },
                ..
            } => {
                let changed = ctx.session_mut().vars_mut().end_transaction(action);
                let response = response.map(|mut r| {
                    r.extend_params(changed);
                    ExecuteResponse::from(r)
                });

                Some((ctx, response))
            }
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                let _ = tx.send(Some(ctx));
                None
            }
        }
    }

    fn label(&self) -> &'static str {
        match self {
            PendingRead::Read { .. } => "read",
            PendingRead::ReadThenWrite { .. } => "read_then_write",
        }
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        match self {
            PendingRead::Read { txn, .. } => txn.ctx,
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                let _ = tx.send(None);
                ctx
            }
        }
    }
}

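/// State tied to statement logging that must accompany a statement execution
/// for its whole lifetime. It is `#[must_use]` and has to be explicitly
/// retired; dropping it with an un-retired statement logs a soft panic (see
/// the `Drop` impl below).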
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    statement_uuid: Option<StatementLoggingId>,
}

impl ExecuteContextExtra {
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }

    pub fn is_trivial(&self) -> bool {
        let Self { statement_uuid } = self;
        statement_uuid.is_none()
    }

    pub fn contents(&self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = self;
        *statement_uuid
    }

    #[must_use]
    fn retire(mut self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = &mut self;
        statement_uuid.take()
    }
}

impl Drop for ExecuteContextExtra {
    fn drop(&mut self) {
        let Self { statement_uuid } = &*self;
        if let Some(statement_uuid) = statement_uuid {
            soft_panic_or_log!(
                "execute context for statement {statement_uuid:?} dropped without being properly retired."
            );
        }
    }
}

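/// Everything a statement execution carries with it: the channel on which to
/// send the response, the session, and the statement-logging state in
/// [`ExecuteContextExtra`]. Kept behind a `Box` so values stay cheap to move
/// through [`Message`]s.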
#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}

impl std::ops::Deref for ExecuteContext {
    type Target = ExecuteContextInner;

    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}

impl std::ops::DerefMut for ExecuteContext {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}

#[derive(Debug)]
pub struct ExecuteContextInner {
    tx: ClientTransmitter<ExecuteResponse>,
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    session: Session,
    extra: ExecuteContextExtra,
}

impl ExecuteContext {
    pub fn session(&self) -> &Session {
        &self.session
    }

    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.session
    }

    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
        &self.tx
    }

    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
        &mut self.tx
    }

    pub fn from_parts(
        tx: ClientTransmitter<ExecuteResponse>,
        internal_cmd_tx: mpsc::UnboundedSender<Message>,
        session: Session,
        extra: ExecuteContextExtra,
    ) -> Self {
        Self {
            inner: ExecuteContextInner {
                tx,
                session,
                extra,
                internal_cmd_tx,
            }
            .into(),
        }
    }

    pub fn into_parts(
        self,
    ) -> (
        ClientTransmitter<ExecuteResponse>,
        mpsc::UnboundedSender<Message>,
        Session,
        ExecuteContextExtra,
    ) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        (tx, internal_cmd_tx, session, extra)
    }

    #[instrument(level = "debug")]
    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        let reason = if extra.is_trivial() {
            None
        } else {
            Some((&result).into())
        };
        tx.send(result, session);
        if let Some(reason) = reason {
            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
                otel_ctx: OpenTelemetryContext::obtain(),
                data: extra,
                reason,
            }) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        }
    }

    pub fn extra(&self) -> &ExecuteContextExtra {
        &self.extra
    }

    pub fn extra_mut(&mut self) -> &mut ExecuteContextExtra {
        &mut self.extra
    }
}

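/// The last known status of every process of every cluster replica, keyed by
/// cluster, replica, and process ID.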
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);

impl ClusterReplicaStatuses {
    pub(crate) fn new() -> ClusterReplicaStatuses {
        ClusterReplicaStatuses(BTreeMap::new())
    }

    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
        let prev = self.0.insert(cluster_id, BTreeMap::new());
        assert_eq!(
            prev, None,
            "cluster {cluster_id} statuses already initialized"
        );
    }

    pub(crate) fn initialize_cluster_replica_statuses(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        num_processes: usize,
        time: DateTime<Utc>,
    ) {
        tracing::info!(
            ?cluster_id,
            ?replica_id,
            ?time,
            "initializing cluster replica status"
        );
        let replica_statuses = self.0.entry(cluster_id).or_default();
        let process_statuses = (0..num_processes)
            .map(|process_id| {
                let status = ClusterReplicaProcessStatus {
                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
                    time: time.clone(),
                };
                (u64::cast_from(process_id), status)
            })
            .collect();
        let prev = replica_statuses.insert(replica_id, process_statuses);
        assert_none!(
            prev,
            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
        );
    }

    pub(crate) fn remove_cluster_statuses(
        &mut self,
        cluster_id: &ClusterId,
    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        let prev = self.0.remove(cluster_id);
        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
    }

    pub(crate) fn remove_cluster_replica_statuses(
        &mut self,
        cluster_id: &ClusterId,
        replica_id: &ReplicaId,
    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        let replica_statuses = self
            .0
            .get_mut(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
        let prev = replica_statuses.remove(replica_id);
        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(crate) fn ensure_cluster_status(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        process_id: ProcessId,
        status: ClusterReplicaProcessStatus,
    ) {
        let replica_statuses = self
            .0
            .get_mut(&cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
            .get_mut(&replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
        replica_statuses.insert(process_id, status);
    }

    pub fn get_cluster_replica_status(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> ClusterStatus {
        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
        Self::cluster_replica_status(process_status)
    }

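    /// Computes a replica's status from its per-process statuses: the replica
    /// is `Online` only if every process is `Online`; otherwise it is
    /// `Offline` with the first offline reason found, if any.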
    pub fn cluster_replica_status(
        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
    ) -> ClusterStatus {
        process_status
            .values()
            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
                (x, y) => {
                    let reason_x = match x {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    let reason_y = match y {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    ClusterStatus::Offline(reason_x.or(reason_y))
                }
            })
    }

    pub(crate) fn get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(crate) fn try_get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        self.try_get_cluster_statuses(cluster_id)
            .and_then(|statuses| statuses.get(&replica_id))
    }

    pub(crate) fn try_get_cluster_statuses(
        &self,
        cluster_id: ClusterId,
    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
        self.0.get(&cluster_id)
    }
}

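/// The coordinator: the single task that owns the catalog, serializes DDL and
/// statement sequencing against it, and delegates dataflow execution to the
/// compute and storage controllers.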
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    catalog: Arc<Catalog>,

    persist_client: PersistClient,

    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    group_commit_tx: appends::GroupCommitNotifier,

    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,

    transient_id_gen: Arc<TransientIdGen>,
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,

    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    pending_writes: Vec<PendingWriteTxn>,

    advance_timelines_interval: Interval,

    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    secrets_controller: Arc<dyn SecretsController>,
    caching_secrets_reader: CachingSecretsReader,

    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    storage_usage_client: StorageUsageClient,
    storage_usage_collection_interval: Duration,

    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    metrics: Metrics,
    optimizer_metrics: OptimizerMetrics,

    tracing_handle: TracingHandle,

    statement_logging: StatementLogging,

    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,

    check_cluster_scheduling_policies_interval: Interval,

    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    caught_up_check_interval: Interval,

    caught_up_check: Option<CaughtUpCheckContext>,

    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    cluster_replica_statuses: ClusterReplicaStatuses,

    read_only_controllers: bool,

    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    license_key: ValidatedLicenseKey,
}

impl Coordinator {
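    /// Initializes coordinator state from the durable catalog at startup:
    /// registers clusters and replicas with the controller, applies system
    /// configuration, creates storage collections, ships dataflows for
    /// indexes, materialized views, and continual tasks, sets read policies,
    /// and (re)hydrates builtin tables.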
1813 #[instrument(name = "coord::bootstrap")]
1817 pub(crate) async fn bootstrap(
1818 &mut self,
1819 boot_ts: Timestamp,
1820 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1821 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1822 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1823 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1824 audit_logs_iterator: AuditLogIterator,
1825 ) -> Result<(), AdapterError> {
1826 let bootstrap_start = Instant::now();
1827 info!("startup: coordinator init: bootstrap beginning");
1828 info!("startup: coordinator init: bootstrap: preamble beginning");
1829
1830 let cluster_statuses: Vec<(_, Vec<_>)> = self
1833 .catalog()
1834 .clusters()
1835 .map(|cluster| {
1836 (
1837 cluster.id(),
1838 cluster
1839 .replicas()
1840 .map(|replica| {
1841 (replica.replica_id, replica.config.location.num_processes())
1842 })
1843 .collect(),
1844 )
1845 })
1846 .collect();
1847 let now = self.now_datetime();
1848 for (cluster_id, replica_statuses) in cluster_statuses {
1849 self.cluster_replica_statuses
1850 .initialize_cluster_statuses(cluster_id);
1851 for (replica_id, num_processes) in replica_statuses {
1852 self.cluster_replica_statuses
1853 .initialize_cluster_replica_statuses(
1854 cluster_id,
1855 replica_id,
1856 num_processes,
1857 now,
1858 );
1859 }
1860 }
1861
1862 let system_config = self.catalog().system_config();
1863
1864 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1866
1867 let compute_config = flags::compute_config(system_config);
1869 let storage_config = flags::storage_config(system_config);
1870 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1871 let dyncfg_updates = system_config.dyncfg_updates();
1872 self.controller.compute.update_configuration(compute_config);
1873 self.controller.storage.update_parameters(storage_config);
1874 self.controller
1875 .update_orchestrator_scheduling_config(scheduling_config);
1876 self.controller.update_configuration(dyncfg_updates);
1877
1878 self.validate_resource_limit_numeric(
1879 Numeric::zero(),
1880 self.current_credit_consumption_rate(),
1881 |system_vars| {
1882 self.license_key
1883 .max_credit_consumption_rate()
1884 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1885 },
1886 "cluster replica",
1887 MAX_CREDIT_CONSUMPTION_RATE.name(),
1888 )?;
1889
1890 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1891 Default::default();
1892
1893 let enable_worker_core_affinity =
1894 self.catalog().system_config().enable_worker_core_affinity();
1895 for instance in self.catalog.clusters() {
1896 self.controller.create_cluster(
1897 instance.id,
1898 ClusterConfig {
1899 arranged_logs: instance.log_indexes.clone(),
1900 workload_class: instance.config.workload_class.clone(),
1901 },
1902 )?;
1903 for replica in instance.replicas() {
1904 let role = instance.role();
1905 self.controller.create_replica(
1906 instance.id,
1907 replica.replica_id,
1908 instance.name.clone(),
1909 replica.name.clone(),
1910 role,
1911 replica.config.clone(),
1912 enable_worker_core_affinity,
1913 )?;
1914 }
1915 }
1916
1917 info!(
1918 "startup: coordinator init: bootstrap: preamble complete in {:?}",
1919 bootstrap_start.elapsed()
1920 );
1921
1922 let init_storage_collections_start = Instant::now();
1923 info!("startup: coordinator init: bootstrap: storage collections init beginning");
1924 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
1925 .await;
1926 info!(
1927 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
1928 init_storage_collections_start.elapsed()
1929 );
1930
1931 self.controller.start_compute_introspection_sink();
1936
1937 let optimize_dataflows_start = Instant::now();
1938 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
1939 let entries: Vec<_> = self.catalog().entries().cloned().collect();
1940 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
1941 info!(
1942 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
1943 optimize_dataflows_start.elapsed()
1944 );
1945
1946 let _fut = self.catalog().update_expression_cache(
1948 uncached_local_exprs.into_iter().collect(),
1949 uncached_global_exps.into_iter().collect(),
1950 );
1951
1952 let bootstrap_as_ofs_start = Instant::now();
1956 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
1957 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
1958 info!(
1959 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
1960 bootstrap_as_ofs_start.elapsed()
1961 );
1962
1963 let postamble_start = Instant::now();
1964 info!("startup: coordinator init: bootstrap: postamble beginning");
1965
1966 let logs: BTreeSet<_> = BUILTINS::logs()
1967 .map(|log| self.catalog().resolve_builtin_log(log))
1968 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
1969 .collect();
1970
1971 let mut privatelink_connections = BTreeMap::new();
1972
1973 for entry in &entries {
1974 debug!(
1975 "coordinator init: installing {} {}",
1976 entry.item().typ(),
1977 entry.id()
1978 );
1979 let mut policy = entry.item().initial_logical_compaction_window();
1980 match entry.item() {
1981 CatalogItem::Source(source) => {
1987 if source.custom_logical_compaction_window.is_none() {
1989 if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
1990 source.data_source
1991 {
1992 policy = Some(
1993 self.catalog()
1994 .get_entry(&ingestion_id)
1995 .source()
1996 .expect("must be source")
1997 .custom_logical_compaction_window
1998 .unwrap_or_default(),
1999 );
2000 }
2001 }
2002 policies_to_set
2003 .entry(policy.expect("sources have a compaction window"))
2004 .or_insert_with(Default::default)
2005 .storage_ids
2006 .insert(source.global_id());
2007 }
2008 CatalogItem::Table(table) => {
2009 policies_to_set
2010 .entry(policy.expect("tables have a compaction window"))
2011 .or_insert_with(Default::default)
2012 .storage_ids
2013 .extend(table.global_ids());
2014 }
2015 CatalogItem::Index(idx) => {
2016 let policy_entry = policies_to_set
2017 .entry(policy.expect("indexes have a compaction window"))
2018 .or_insert_with(Default::default);
2019
2020 if logs.contains(&idx.on) {
2021 policy_entry
2022 .compute_ids
2023 .entry(idx.cluster_id)
2024 .or_insert_with(BTreeSet::new)
2025 .insert(idx.global_id());
2026 } else {
2027 let df_desc = self
2028 .catalog()
2029 .try_get_physical_plan(&idx.global_id())
2030 .expect("added in `bootstrap_dataflow_plans`")
2031 .clone();
2032
2033 let df_meta = self
2034 .catalog()
2035 .try_get_dataflow_metainfo(&idx.global_id())
2036 .expect("added in `bootstrap_dataflow_plans`");
2037
2038 if self.catalog().state().system_config().enable_mz_notices() {
2039 self.catalog().state().pack_optimizer_notices(
2041 &mut builtin_table_updates,
2042 df_meta.optimizer_notices.iter(),
2043 Diff::ONE,
2044 );
2045 }
2046
2047 policy_entry
2050 .compute_ids
2051 .entry(idx.cluster_id)
2052 .or_insert_with(Default::default)
2053 .extend(df_desc.export_ids());
2054
2055 self.controller
2056 .compute
2057 .create_dataflow(idx.cluster_id, df_desc, None)
2058 .unwrap_or_terminate("cannot fail to create dataflows");
2059 }
2060 }
2061 CatalogItem::View(_) => (),
2062 CatalogItem::MaterializedView(mview) => {
2063 policies_to_set
2064 .entry(policy.expect("materialized views have a compaction window"))
2065 .or_insert_with(Default::default)
2066 .storage_ids
2067 .insert(mview.global_id());
2068
2069 let mut df_desc = self
2070 .catalog()
2071 .try_get_physical_plan(&mview.global_id())
2072 .expect("added in `bootstrap_dataflow_plans`")
2073 .clone();
2074
2075 if let Some(initial_as_of) = mview.initial_as_of.clone() {
2076 df_desc.set_initial_as_of(initial_as_of);
2077 }
2078
2079 let until = mview
2081 .refresh_schedule
2082 .as_ref()
2083 .and_then(|s| s.last_refresh())
2084 .and_then(|r| r.try_step_forward());
2085 if let Some(until) = until {
2086 df_desc.until.meet_assign(&Antichain::from_elem(until));
2087 }
2088
2089 let df_meta = self
2090 .catalog()
2091 .try_get_dataflow_metainfo(&mview.global_id())
2092 .expect("added in `bootstrap_dataflow_plans`");
2093
2094 if self.catalog().state().system_config().enable_mz_notices() {
2095 self.catalog().state().pack_optimizer_notices(
2097 &mut builtin_table_updates,
2098 df_meta.optimizer_notices.iter(),
2099 Diff::ONE,
2100 );
2101 }
2102
2103 self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2104 self.allow_writes(mview.cluster_id, mview.global_id());
2105 }
2106 CatalogItem::Sink(sink) => {
2107 policies_to_set
2108 .entry(CompactionWindow::Default)
2109 .or_insert_with(Default::default)
2110 .storage_ids
2111 .insert(sink.global_id());
2112 }
2113 CatalogItem::Connection(catalog_connection) => {
2114 if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2115 privatelink_connections.insert(
2116 entry.id(),
2117 VpcEndpointConfig {
2118 aws_service_name: conn.service_name.clone(),
2119 availability_zone_ids: conn.availability_zones.clone(),
2120 },
2121 );
2122 }
2123 }
2124 CatalogItem::ContinualTask(ct) => {
2125 policies_to_set
2126 .entry(policy.expect("continual tasks have a compaction window"))
2127 .or_insert_with(Default::default)
2128 .storage_ids
2129 .insert(ct.global_id());
2130
2131 let mut df_desc = self
2132 .catalog()
2133 .try_get_physical_plan(&ct.global_id())
2134 .expect("added in `bootstrap_dataflow_plans`")
2135 .clone();
2136
2137 if let Some(initial_as_of) = ct.initial_as_of.clone() {
2138 df_desc.set_initial_as_of(initial_as_of);
2139 }
2140
2141 let df_meta = self
2142 .catalog()
2143 .try_get_dataflow_metainfo(&ct.global_id())
2144 .expect("added in `bootstrap_dataflow_plans`");
2145
2146 if self.catalog().state().system_config().enable_mz_notices() {
2147 self.catalog().state().pack_optimizer_notices(
2149 &mut builtin_table_updates,
2150 df_meta.optimizer_notices.iter(),
2151 Diff::ONE,
2152 );
2153 }
2154
2155 self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2156 self.allow_writes(ct.cluster_id, ct.global_id());
2157 }
2158 CatalogItem::Log(_)
2160 | CatalogItem::Type(_)
2161 | CatalogItem::Func(_)
2162 | CatalogItem::Secret(_) => {}
2163 }
2164 }
2165
2166 if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2167 let existing_vpc_endpoints = cloud_resource_controller
2169 .list_vpc_endpoints()
2170 .await
2171 .context("list vpc endpoints")?;
2172 let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2173 let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2174 let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2175 for id in vpc_endpoints_to_remove {
2176 cloud_resource_controller
2177 .delete_vpc_endpoint(*id)
2178 .await
2179 .context("deleting extraneous vpc endpoint")?;
2180 }
2181
2182 for (id, spec) in privatelink_connections {
2184 cloud_resource_controller
2185 .ensure_vpc_endpoint(id, spec)
2186 .await
2187 .context("ensuring vpc endpoint")?;
2188 }
2189 }
2190
2191 drop(dataflow_read_holds);
2194 for (cw, policies) in policies_to_set {
2196 self.initialize_read_policies(&policies, cw).await;
2197 }
2198
2199 builtin_table_updates.extend(
2201 self.catalog().state().resolve_builtin_table_updates(
2202 self.catalog().state().pack_all_replica_size_updates(),
2203 ),
2204 );
2205
2206 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
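// In read-only mode, builtin table updates cannot go through the normal
// write path. Updates that target storage collections migrated by this
// deploy generation are split out here and appended directly to the new
// (still empty) shards; all remaining updates are buffered until the
// coordinator leaves read-only mode.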
2207 let migrated_updates_fut = if self.controller.read_only() {
2213 let min_timestamp = Timestamp::minimum();
2214 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2215 .extract_if(.., |update| {
2216 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2217 migrated_storage_collections_0dt.contains(&update.id)
2218 && self
2219 .controller
2220 .storage_collections
2221 .collection_frontiers(gid)
2222 .expect("all tables are registered")
2223 .write_frontier
2224 .elements()
2225 == &[min_timestamp]
2226 })
2227 .collect();
2228 if migrated_builtin_table_updates.is_empty() {
2229 futures::future::ready(()).boxed()
2230 } else {
2231 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2233 for update in migrated_builtin_table_updates {
2234 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2235 grouped_appends.entry(gid).or_default().push(update.data);
2236 }
2237 info!(
2238 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2239 grouped_appends.keys().collect::<Vec<_>>()
2240 );
2241
2242 let mut all_appends = Vec::with_capacity(grouped_appends.len());
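// Consolidate all row updates for each table into a single
// `TableData::Rows` entry; pre-encoded batches are passed through
// unchanged.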
2244 for (item_id, table_data) in grouped_appends.into_iter() {
2245 let mut all_rows = Vec::new();
2246 let mut all_data = Vec::new();
2247 for data in table_data {
2248 match data {
2249 TableData::Rows(rows) => all_rows.extend(rows),
2250 TableData::Batches(_) => all_data.push(data),
2251 }
2252 }
2253 differential_dataflow::consolidation::consolidate(&mut all_rows);
2254 all_data.push(TableData::Rows(all_rows));
2255
2256 all_appends.push((item_id, all_data));
2258 }
2259
2260 let fut = self
2261 .controller
2262 .storage
2263 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2264 .expect("cannot fail to append");
2265 async {
2266 fut.await
2267 .expect("One-shot shouldn't be dropped during bootstrap")
2268 .unwrap_or_terminate("cannot fail to append")
2269 }
2270 .boxed()
2271 }
2272 } else {
2273 futures::future::ready(()).boxed()
2274 };
2275
2276 info!(
2277 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2278 postamble_start.elapsed()
2279 );
2280
2281 let builtin_update_start = Instant::now();
2282 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2283
2284 if self.controller.read_only() {
2285 info!(
2286 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2287 );
2288
2289 let audit_join_start = Instant::now();
2291 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2292 let audit_log_updates: Vec<_> = audit_logs_iterator
2293 .map(|(audit_log, ts)| StateUpdate {
2294 kind: StateUpdateKind::AuditLog(audit_log),
2295 ts,
2296 diff: StateDiff::Addition,
2297 })
2298 .collect();
2299 let audit_log_builtin_table_updates = self
2300 .catalog()
2301 .state()
2302 .generate_builtin_table_updates(audit_log_updates);
2303 builtin_table_updates.extend(audit_log_builtin_table_updates);
2304 info!(
2305 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2306 audit_join_start.elapsed()
2307 );
2308 self.buffered_builtin_table_updates
2309 .as_mut()
2310 .expect("in read-only mode")
2311 .append(&mut builtin_table_updates);
2312 } else {
2313 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2314 .await;
2315 };
2316 info!(
2317 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2318 builtin_update_start.elapsed()
2319 );
2320
2321 let cleanup_secrets_start = Instant::now();
2322 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2323 {
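// Clean up orphaned secrets: secrets that the secrets controller knows
// about but the catalog does not (e.g. because a secret's creation failed
// partway through). Secrets whose IDs exceed the catalog's current ID
// allocators are left alone, since they were likely created by a newer
// deploy generation.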
2327 let Self {
2330 secrets_controller,
2331 catalog,
2332 ..
2333 } = self;
2334
2335 let next_user_item_id = catalog.get_next_user_item_id().await?;
2336 let next_system_item_id = catalog.get_next_system_item_id().await?;
2337 let read_only = self.controller.read_only();
2338 let catalog_ids: BTreeSet<CatalogItemId> =
2343 catalog.entries().map(|entry| entry.id()).collect();
2344 let secrets_controller = Arc::clone(secrets_controller);
2345
2346 spawn(|| "cleanup-orphaned-secrets", async move {
2347 if read_only {
2348 info!(
2349 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2350 );
2351 return;
2352 }
2353 info!("coordinator init: cleaning up orphaned secrets");
2354
2355 match secrets_controller.list().await {
2356 Ok(controller_secrets) => {
2357 let controller_secrets: BTreeSet<CatalogItemId> =
2358 controller_secrets.into_iter().collect();
2359 let orphaned = controller_secrets.difference(&catalog_ids);
2360 for id in orphaned {
2361 let id_too_large = match id {
2362 CatalogItemId::System(id) => *id >= next_system_item_id,
2363 CatalogItemId::User(id) => *id >= next_user_item_id,
2364 CatalogItemId::IntrospectionSourceIndex(_)
2365 | CatalogItemId::Transient(_) => false,
2366 };
2367 if id_too_large {
2368 info!(
2369 %next_user_item_id, %next_system_item_id,
2370 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2371 );
2372 } else {
2373 info!("coordinator init: deleting orphaned secret {id}");
2374 fail_point!("orphan_secrets");
2375 if let Err(e) = secrets_controller.delete(*id).await {
warn!("failed to delete orphaned secret {id}: {e}");
2380 }
2381 }
2382 }
2383 }
2384 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2385 }
2386 });
2387 }
2388 info!(
2389 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2390 cleanup_secrets_start.elapsed()
2391 );
2392
2393 let final_steps_start = Instant::now();
2395 info!(
2396 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2397 );
2398 migrated_updates_fut
2399 .instrument(info_span!("coord::bootstrap::final"))
2400 .await;
2401
2402 debug!(
2403 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2404 );
2405 self.controller.initialization_complete();
2407
2408 self.bootstrap_introspection_subscribes().await;
2410
2411 info!(
2412 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2413 final_steps_start.elapsed()
2414 );
2415
2416 info!(
2417 "startup: coordinator init: bootstrap complete in {:?}",
2418 bootstrap_start.elapsed()
2419 );
2420 Ok(())
2421 }
2422
2423 #[allow(clippy::async_yields_async)]
2428 #[instrument]
2429 async fn bootstrap_tables(
2430 &mut self,
2431 entries: &[CatalogEntry],
2432 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2433 audit_logs_iterator: AuditLogIterator,
2434 ) {
2435 struct TableMetadata<'a> {
2437 id: CatalogItemId,
2438 name: &'a QualifiedItemName,
2439 table: &'a Table,
2440 }
2441
2442 let table_metas: Vec<_> = entries
.iter()
2445 .filter_map(|entry| {
2446 entry.table().map(|table| TableMetadata {
2447 id: entry.id(),
2448 name: entry.name(),
2449 table,
2450 })
2451 })
2452 .collect();
2453
2454 debug!("coordinator init: advancing all tables to current timestamp");
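// Append an empty batch to every table at the current write timestamp.
// This advances all tables to "now", and the returned future doubles as a
// fence (hence `table_fence_rx`); it is awaited below, after the more
// expensive snapshot work has been kicked off.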
2456 let WriteTimestamp {
2457 timestamp: write_ts,
2458 advance_to,
2459 } = self.get_local_write_ts().await;
2460 let appends = table_metas
2461 .iter()
2462 .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2463 .collect();
2464 let table_fence_rx = self
2468 .controller
2469 .storage
2470 .append_table(write_ts.clone(), advance_to, appends)
2471 .expect("invalid updates");
2472
2473 self.apply_local_write(write_ts).await;
2474
2475 debug!("coordinator init: resetting system tables");
2477 let read_ts = self.get_local_read_ts().await;
2478
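// `mz_storage_usage_by_shard` is exempt from the reset below, since its
// contents are long-lived usage history rather than state that is rebuilt
// on startup; old events are instead pruned by
// `prune_storage_usage_events_on_startup`.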
2479 let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2482 .catalog()
2483 .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2484 .into();
2485 let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2486 meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2487 && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2488 };
2489
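// Reset every other system table by snapshotting its current contents and
// writing back the corresponding retractions, leaving it logically empty
// before fresh updates are appended. The audit log is reconciled rather
// than reset and is handled separately below.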
2490 let mut retraction_tasks = Vec::new();
2491 let mut system_tables: Vec<_> = table_metas
2492 .iter()
2493 .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2494 .collect();
2495
2496 let (audit_events_idx, _) = system_tables
2498 .iter()
2499 .find_position(|table| {
2500 table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2501 })
2502 .expect("mz_audit_events must exist");
2503 let audit_events = system_tables.remove(audit_events_idx);
2504 let audit_log_task = self.bootstrap_audit_log_table(
2505 audit_events.id,
2506 audit_events.name,
2507 audit_events.table,
2508 audit_logs_iterator,
2509 read_ts,
2510 );
2511
2512 for system_table in system_tables {
2513 let table_id = system_table.id;
2514 let full_name = self.catalog().resolve_full_name(system_table.name, None);
2515 debug!("coordinator init: resetting system table {full_name} ({table_id})");
2516
2517 let snapshot_fut = self
2519 .controller
2520 .storage_collections
2521 .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2522 let batch_fut = self
2523 .controller
2524 .storage_collections
2525 .create_update_builder(system_table.table.global_id_writes());
2526
2527 let task = spawn(|| format!("snapshot-{table_id}"), async move {
2528 let mut batch = batch_fut
2530 .await
2531 .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2532 tracing::info!(?table_id, "starting snapshot");
2533 let mut snapshot_cursor = snapshot_fut
2535 .await
2536 .unwrap_or_terminate("cannot fail to snapshot");
2537
2538 while let Some(values) = snapshot_cursor.next().await {
2540 for ((key, _val), _t, d) in values {
2541 let key = key.expect("builtin table had errors");
2542 let d_invert = d.neg();
2543 batch.add(&key, &(), &d_invert).await;
2544 }
2545 }
2546 tracing::info!(?table_id, "finished snapshot");
2547
2548 let batch = batch.finish().await;
2549 BuiltinTableUpdate::batch(table_id, batch)
2550 });
2551 retraction_tasks.push(task);
2552 }
2553
2554 let retractions_res = futures::future::join_all(retraction_tasks).await;
2555 for retractions in retractions_res {
2556 let retractions = retractions.expect("cannot fail to fetch snapshot");
2557 builtin_table_updates.push(retractions);
2558 }
2559
2560 let audit_join_start = Instant::now();
2561 info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2562 let audit_log_updates = audit_log_task
2563 .await
2564 .expect("cannot fail to fetch audit log updates");
2565 let audit_log_builtin_table_updates = self
2566 .catalog()
2567 .state()
2568 .generate_builtin_table_updates(audit_log_updates);
2569 builtin_table_updates.extend(audit_log_builtin_table_updates);
2570 info!(
2571 "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2572 audit_join_start.elapsed()
2573 );
2574
2575 table_fence_rx
2577 .await
2578 .expect("One-shot shouldn't be dropped during bootstrap")
2579 .unwrap_or_terminate("cannot fail to append");
2580
2581 info!("coordinator init: sending builtin table updates");
2582 let (_builtin_updates_fut, write_ts) = self
2583 .builtin_table_update()
2584 .execute(builtin_table_updates)
2585 .await;
2586 info!(?write_ts, "our write ts");
2587 if let Some(write_ts) = write_ts {
2588 self.apply_local_write(write_ts).await;
2589 }
2590 }
2591
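/// Returns a task that computes the audit log updates that are not yet
/// reflected in the `mz_audit_events` builtin table, so the table can be
/// brought up to date without rewriting its existing contents.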
2592 #[instrument]
2596 fn bootstrap_audit_log_table<'a>(
2597 &mut self,
2598 table_id: CatalogItemId,
2599 name: &'a QualifiedItemName,
2600 table: &'a Table,
2601 audit_logs_iterator: AuditLogIterator,
2602 read_ts: Timestamp,
2603 ) -> JoinHandle<Vec<StateUpdate>> {
2604 let full_name = self.catalog().resolve_full_name(name, None);
2605 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2606 let current_contents_fut = self
2607 .controller
2608 .storage_collections
2609 .snapshot(table.global_id_writes(), read_ts);
2610 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2611 let current_contents = current_contents_fut
2612 .await
2613 .unwrap_or_terminate("cannot fail to fetch snapshot");
2614 let contents_len = current_contents.len();
2615 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2616
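// Find the largest audit event ID already present in the table. The
// audit log iterator appears to yield events newest-first, so below we
// keep taking events until we hit one that the table already contains.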
let max_table_id = current_contents
.into_iter()
.filter(|(_, diff)| *diff == 1)
.map(|(row, _diff)| row.unpack_first().unwrap_uint64())
.max();
2625
2626 audit_logs_iterator
2628 .take_while(|(audit_log, _)| match max_table_id {
2629 Some(id) => audit_log.event.sortable_id() > id,
2630 None => true,
2631 })
2632 .map(|(audit_log, ts)| StateUpdate {
2633 kind: StateUpdateKind::AuditLog(audit_log),
2634 ts,
2635 diff: StateDiff::Addition,
2636 })
2637 .collect::<Vec<_>>()
2638 })
2639 }
2640
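/// Registers a storage collection for every catalog item that needs one,
/// choosing an appropriate data source description for each kind of item.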
2641 #[instrument]
2654 async fn bootstrap_storage_collections(
2655 &mut self,
2656 migrated_storage_collections: &BTreeSet<CatalogItemId>,
2657 ) {
2658 let catalog = self.catalog();
2659 let source_status_collection_id = catalog
2660 .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2661 let source_status_collection_id = catalog
2662 .get_entry(&source_status_collection_id)
2663 .latest_global_id();
2664
2665 let source_desc = |object_id: GlobalId,
2666 data_source: &DataSourceDesc,
2667 desc: &RelationDesc,
2668 timeline: &Timeline| {
2669 let (data_source, status_collection_id) = match data_source.clone() {
2670 DataSourceDesc::Ingestion { desc, cluster_id } => {
2672 let desc = desc.into_inline_connection(catalog.state());
2673 let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2674
2675 (
2676 DataSource::Ingestion(ingestion),
2677 Some(source_status_collection_id),
2678 )
2679 }
2680 DataSourceDesc::OldSyntaxIngestion {
2681 desc,
2682 progress_subsource,
2683 data_config,
2684 details,
2685 cluster_id,
2686 } => {
2687 let desc = desc.into_inline_connection(catalog.state());
2688 let data_config = data_config.into_inline_connection(catalog.state());
2689 let progress_subsource =
2692 catalog.get_entry(&progress_subsource).latest_global_id();
2693 let mut ingestion =
2694 IngestionDescription::new(desc, cluster_id, progress_subsource);
2695 let legacy_export = SourceExport {
2696 storage_metadata: (),
2697 data_config,
2698 details,
2699 };
2700 ingestion.source_exports.insert(object_id, legacy_export);
2701
2702 (
2703 DataSource::Ingestion(ingestion),
2704 Some(source_status_collection_id),
2705 )
2706 }
2707 DataSourceDesc::IngestionExport {
2708 ingestion_id,
2709 external_reference: _,
2710 details,
2711 data_config,
2712 } => {
2713 let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2716 (
2717 DataSource::IngestionExport {
2718 ingestion_id,
2719 details,
2720 data_config: data_config.into_inline_connection(catalog.state()),
2721 },
2722 Some(source_status_collection_id),
2723 )
2724 }
2725 DataSourceDesc::Webhook { .. } => {
2726 (DataSource::Webhook, Some(source_status_collection_id))
2727 }
2728 DataSourceDesc::Progress => (DataSource::Progress, None),
2729 DataSourceDesc::Introspection(introspection) => {
2730 (DataSource::Introspection(introspection), None)
2731 }
2732 };
2733 CollectionDescription {
2734 desc: desc.clone(),
2735 data_source,
2736 since: None,
2737 status_collection_id,
2738 timeline: Some(timeline.clone()),
2739 }
2740 };
2741
2742 let mut compute_collections = vec![];
2743 let mut collections = vec![];
2744 let mut new_builtin_continual_tasks = vec![];
2745 for entry in catalog.entries() {
2746 match entry.item() {
2747 CatalogItem::Source(source) => {
2748 collections.push((
2749 source.global_id(),
2750 source_desc(
2751 source.global_id(),
2752 &source.data_source,
2753 &source.desc,
2754 &source.timeline,
2755 ),
2756 ));
2757 }
2758 CatalogItem::Table(table) => {
2759 match &table.data_source {
2760 TableDataSource::TableWrites { defaults: _ } => {
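// A table that has been altered has one collection per version. Each
// collection records the next version's collection (if any) as its
// primary, presumably so that writes are routed to the newest version.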
2761 let versions: BTreeMap<_, _> = table
2762 .collection_descs()
2763 .map(|(gid, version, desc)| (version, (gid, desc)))
2764 .collect();
2765 let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2766 let next_version = version.bump();
2767 let primary_collection =
2768 versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2769 let collection_desc = CollectionDescription::for_table(
2770 desc.clone(),
2771 primary_collection,
2772 );
2773
2774 (*gid, collection_desc)
2775 });
2776 collections.extend(collection_descs);
2777 }
2778 TableDataSource::DataSource {
2779 desc: data_source_desc,
2780 timeline,
2781 } => {
2782 soft_assert_eq_or_log!(table.collections.len(), 1);
2784 let collection_descs =
2785 table.collection_descs().map(|(gid, _version, desc)| {
2786 (
2787 gid,
2788 source_desc(
2789 entry.latest_global_id(),
2790 data_source_desc,
2791 &desc,
2792 timeline,
2793 ),
2794 )
2795 });
2796 collections.extend(collection_descs);
2797 }
2798 };
2799 }
2800 CatalogItem::MaterializedView(mv) => {
2801 let collection_desc = CollectionDescription {
2802 desc: mv.desc.clone(),
2803 data_source: DataSource::Other,
2804 since: mv.initial_as_of.clone(),
2805 status_collection_id: None,
2806 timeline: None,
2807 };
2808 compute_collections.push((mv.global_id(), mv.desc.clone()));
2809 collections.push((mv.global_id(), collection_desc));
2810 }
2811 CatalogItem::ContinualTask(ct) => {
2812 let collection_desc = CollectionDescription {
2813 desc: ct.desc.clone(),
2814 data_source: DataSource::Other,
2815 since: ct.initial_as_of.clone(),
2816 status_collection_id: None,
2817 timeline: None,
2818 };
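// A system continual task without a recorded initial as-of is new in this
// build; it cannot be registered until an as-of has been chosen, which
// happens in `bootstrap_builtin_continual_tasks` below.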
2819 if ct.global_id().is_system() && collection_desc.since.is_none() {
2820 new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2824 } else {
2825 compute_collections.push((ct.global_id(), ct.desc.clone()));
2826 collections.push((ct.global_id(), collection_desc));
2827 }
2828 }
2829 CatalogItem::Sink(sink) => {
2830 let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2831 let from_desc = storage_sink_from_entry
2832 .desc(&self.catalog().resolve_full_name(
2833 storage_sink_from_entry.name(),
2834 storage_sink_from_entry.conn_id(),
2835 ))
2836 .expect("sinks can only be built on items with descs")
2837 .into_owned();
2838 let collection_desc = CollectionDescription {
2839 desc: KAFKA_PROGRESS_DESC.clone(),
2841 data_source: DataSource::Sink {
2842 desc: ExportDescription {
2843 sink: StorageSinkDesc {
2844 from: sink.from,
2845 from_desc,
2846 connection: sink
2847 .connection
2848 .clone()
2849 .into_inline_connection(self.catalog().state()),
2850 envelope: sink.envelope,
2851 as_of: Antichain::from_elem(Timestamp::minimum()),
2852 with_snapshot: sink.with_snapshot,
2853 version: sink.version,
2854 from_storage_metadata: (),
2855 to_storage_metadata: (),
2856 },
2857 instance_id: sink.cluster_id,
2858 },
2859 },
2860 since: None,
2861 status_collection_id: None,
2862 timeline: None,
2863 };
2864 collections.push((sink.global_id, collection_desc));
2865 }
2866 _ => (),
2867 }
2868 }
2869
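// In read-only mode no new write timestamp may be minted, so collections
// are registered at the current read timestamp; otherwise the write
// timestamp is applied below, once registration has completed.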
2870 let register_ts = if self.controller.read_only() {
2871 self.get_local_read_ts().await
2872 } else {
2873 self.get_local_write_ts().await.timestamp
2876 };
2877
2878 let storage_metadata = self.catalog.state().storage_metadata();
2879 let migrated_storage_collections = migrated_storage_collections
.iter()
2881 .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2882 .collect();
2883
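// Give the storage controller a chance to evolve the persisted schemas of
// compute collections whose nullability has changed before the collections
// are created.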
2884 self.controller
2889 .storage
2890 .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2891 .await
2892 .unwrap_or_terminate("cannot fail to evolve collections");
2893
2894 self.controller
2895 .storage
2896 .create_collections_for_bootstrap(
2897 storage_metadata,
2898 Some(register_ts),
2899 collections,
2900 &migrated_storage_collections,
2901 )
2902 .await
2903 .unwrap_or_terminate("cannot fail to create collections");
2904
2905 self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2906 .await;
2907
2908 if !self.controller.read_only() {
2909 self.apply_local_write(register_ts).await;
2910 }
2911 }
2912
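/// Chooses an initial as-of for each new builtin continual task, based on
/// read holds over its inputs, and then creates the corresponding storage
/// collections.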
2913 async fn bootstrap_builtin_continual_tasks(
2920 &mut self,
2921 mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
2923 ) {
2924 for (id, collection) in &mut collections {
2925 let entry = self.catalog.get_entry_by_global_id(id);
2926 let ct = match &entry.item {
2927 CatalogItem::ContinualTask(ct) => ct.clone(),
2928 _ => unreachable!("only called with continual task builtins"),
2929 };
2930 let debug_name = self
2931 .catalog()
2932 .resolve_full_name(entry.name(), None)
2933 .to_string();
2934 let (_optimized_plan, physical_plan, _metainfo) = self
2935 .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
2936 .expect("builtin CT should optimize successfully");
2937
2938 let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
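// The as-of must be derived from the task's inputs only; the task's own
// output collection also appears in the import bundle but does not exist
// yet, so remove it before acquiring read holds.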
2940 id_bundle.storage_ids.remove(id);
2942 let read_holds = self.acquire_read_holds(&id_bundle);
2943 let as_of = read_holds.least_valid_read();
2944
2945 collection.since = Some(as_of.clone());
2946 }
2947 self.controller
2948 .storage
2949 .create_collections(self.catalog.state().storage_metadata(), None, collections)
2950 .await
2951 .unwrap_or_terminate("cannot fail to create collections");
2952 }
2953
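/// (Re)optimizes all indexes, materialized views, and continual tasks and
/// records the resulting plans in the catalog's in-memory state. Cached
/// global expressions are reused where possible; plans that had to be
/// re-optimized are returned so the caller can refresh the expression
/// cache.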
2954 #[instrument]
2965 fn bootstrap_dataflow_plans(
2966 &mut self,
2967 ordered_catalog_entries: &[CatalogEntry],
2968 mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2969 ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
2970 let mut instance_snapshots = BTreeMap::new();
2976 let mut uncached_expressions = BTreeMap::new();
2977
2978 let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
2979
2980 for entry in ordered_catalog_entries {
2981 match entry.item() {
2982 CatalogItem::Index(idx) => {
2983 let compute_instance =
2985 instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
2986 self.instance_snapshot(idx.cluster_id)
2987 .expect("compute instance exists")
2988 });
2989 let global_id = idx.global_id();
2990
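// Skip indexes whose collections already exist on the compute instance.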
2991 if compute_instance.contains_collection(&global_id) {
2994 continue;
2995 }
2996
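// Reuse the cached plans only if they were produced with the same
// optimizer feature flags; otherwise fall through and re-optimize.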
2997 let (optimized_plan, physical_plan, metainfo) =
2998 match cached_global_exprs.remove(&global_id) {
2999 Some(global_expressions)
3000 if global_expressions.optimizer_features
3001 == optimizer_config.features =>
3002 {
3003 debug!("global expression cache hit for {global_id:?}");
3004 (
3005 global_expressions.global_mir,
3006 global_expressions.physical_plan,
3007 global_expressions.dataflow_metainfos,
3008 )
3009 }
3010 Some(_) | None => {
3011 let (optimized_plan, global_lir_plan) = {
3012 let mut optimizer = optimize::index::Optimizer::new(
3014 self.owned_catalog(),
3015 compute_instance.clone(),
3016 global_id,
3017 optimizer_config.clone(),
3018 self.optimizer_metrics(),
3019 );
3020
3021 let index_plan = optimize::index::Index::new(
3023 entry.name().clone(),
3024 idx.on,
3025 idx.keys.to_vec(),
3026 );
3027 let global_mir_plan = optimizer.optimize(index_plan)?;
3028 let optimized_plan = global_mir_plan.df_desc().clone();
3029
3030 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3032
3033 (optimized_plan, global_lir_plan)
3034 };
3035
3036 let (physical_plan, metainfo) = global_lir_plan.unapply();
3037 let metainfo = {
3038 let notice_ids =
3040 std::iter::repeat_with(|| self.allocate_transient_id())
3041 .map(|(_item_id, gid)| gid)
3042 .take(metainfo.optimizer_notices.len())
3043 .collect::<Vec<_>>();
3044 self.catalog().render_notices(
3046 metainfo,
3047 notice_ids,
3048 Some(idx.global_id()),
3049 )
3050 };
3051 uncached_expressions.insert(
3052 global_id,
3053 GlobalExpressions {
3054 global_mir: optimized_plan.clone(),
3055 physical_plan: physical_plan.clone(),
3056 dataflow_metainfos: metainfo.clone(),
3057 optimizer_features: OptimizerFeatures::from(
3058 self.catalog().system_config(),
3059 ),
3060 },
3061 );
3062 (optimized_plan, physical_plan, metainfo)
3063 }
3064 };
3065
3066 let catalog = self.catalog_mut();
3067 catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3068 catalog.set_physical_plan(idx.global_id(), physical_plan);
3069 catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3070
3071 compute_instance.insert_collection(idx.global_id());
3072 }
3073 CatalogItem::MaterializedView(mv) => {
3074 let compute_instance =
3076 instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3077 self.instance_snapshot(mv.cluster_id)
3078 .expect("compute instance exists")
3079 });
3080 let global_id = mv.global_id();
3081
3082 let (optimized_plan, physical_plan, metainfo) =
3083 match cached_global_exprs.remove(&global_id) {
3084 Some(global_expressions)
3085 if global_expressions.optimizer_features
3086 == optimizer_config.features =>
3087 {
3088 debug!("global expression cache hit for {global_id:?}");
3089 (
3090 global_expressions.global_mir,
3091 global_expressions.physical_plan,
3092 global_expressions.dataflow_metainfos,
3093 )
3094 }
3095 Some(_) | None => {
3096 let (_, internal_view_id) = self.allocate_transient_id();
3097 let debug_name = self
3098 .catalog()
3099 .resolve_full_name(entry.name(), None)
3100 .to_string();
3101 let force_non_monotonic = Default::default();
3102
3103 let (optimized_plan, global_lir_plan) = {
3104 let mut optimizer = optimize::materialized_view::Optimizer::new(
3106 self.owned_catalog().as_optimizer_catalog(),
3107 compute_instance.clone(),
3108 global_id,
3109 internal_view_id,
3110 mv.desc.iter_names().cloned().collect(),
3111 mv.non_null_assertions.clone(),
3112 mv.refresh_schedule.clone(),
3113 debug_name,
3114 optimizer_config.clone(),
3115 self.optimizer_metrics(),
3116 force_non_monotonic,
3117 );
3118
3119 let global_mir_plan =
3121 optimizer.optimize(mv.optimized_expr.as_ref().clone())?;
3122 let optimized_plan = global_mir_plan.df_desc().clone();
3123
3124 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3126
3127 (optimized_plan, global_lir_plan)
3128 };
3129
3130 let (physical_plan, metainfo) = global_lir_plan.unapply();
3131 let metainfo = {
3132 let notice_ids =
3134 std::iter::repeat_with(|| self.allocate_transient_id())
3135 .map(|(_item_id, global_id)| global_id)
3136 .take(metainfo.optimizer_notices.len())
3137 .collect::<Vec<_>>();
3138 self.catalog().render_notices(
3140 metainfo,
3141 notice_ids,
3142 Some(mv.global_id()),
3143 )
3144 };
3145 uncached_expressions.insert(
3146 global_id,
3147 GlobalExpressions {
3148 global_mir: optimized_plan.clone(),
3149 physical_plan: physical_plan.clone(),
3150 dataflow_metainfos: metainfo.clone(),
3151 optimizer_features: OptimizerFeatures::from(
3152 self.catalog().system_config(),
3153 ),
3154 },
3155 );
3156 (optimized_plan, physical_plan, metainfo)
3157 }
3158 };
3159
3160 let catalog = self.catalog_mut();
3161 catalog.set_optimized_plan(mv.global_id(), optimized_plan);
3162 catalog.set_physical_plan(mv.global_id(), physical_plan);
3163 catalog.set_dataflow_metainfo(mv.global_id(), metainfo);
3164
3165 compute_instance.insert_collection(mv.global_id());
3166 }
3167 CatalogItem::ContinualTask(ct) => {
3168 let compute_instance =
3169 instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3170 self.instance_snapshot(ct.cluster_id)
3171 .expect("compute instance exists")
3172 });
3173 let global_id = ct.global_id();
3174
3175 let (optimized_plan, physical_plan, metainfo) =
3176 match cached_global_exprs.remove(&global_id) {
3177 Some(global_expressions)
3178 if global_expressions.optimizer_features
3179 == optimizer_config.features =>
3180 {
3181 debug!("global expression cache hit for {global_id:?}");
3182 (
3183 global_expressions.global_mir,
3184 global_expressions.physical_plan,
3185 global_expressions.dataflow_metainfos,
3186 )
3187 }
3188 Some(_) | None => {
3189 let debug_name = self
3190 .catalog()
3191 .resolve_full_name(entry.name(), None)
3192 .to_string();
3193 let (optimized_plan, physical_plan, metainfo) = self
3194 .optimize_create_continual_task(
3195 ct,
3196 global_id,
3197 self.owned_catalog(),
3198 debug_name,
3199 )?;
3200 uncached_expressions.insert(
3201 global_id,
3202 GlobalExpressions {
3203 global_mir: optimized_plan.clone(),
3204 physical_plan: physical_plan.clone(),
3205 dataflow_metainfos: metainfo.clone(),
3206 optimizer_features: OptimizerFeatures::from(
3207 self.catalog().system_config(),
3208 ),
3209 },
3210 );
3211 (optimized_plan, physical_plan, metainfo)
3212 }
3213 };
3214
3215 let catalog = self.catalog_mut();
3216 catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3217 catalog.set_physical_plan(ct.global_id(), physical_plan);
3218 catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3219
3220 compute_instance.insert_collection(ct.global_id());
3221 }
3222 _ => (),
3223 }
3224 }
3225
3226 Ok(uncached_expressions)
3227 }
3228
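/// Selects as-ofs for all pre-planned dataflows and returns the read holds
/// that keep those as-ofs valid. The holds must be kept alive until the
/// dataflows have been shipped.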
3229 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3239 let mut catalog_ids = Vec::new();
3240 let mut dataflows = Vec::new();
3241 let mut read_policies = BTreeMap::new();
3242 for entry in self.catalog.entries() {
3243 let gid = match entry.item() {
3244 CatalogItem::Index(idx) => idx.global_id(),
3245 CatalogItem::MaterializedView(mv) => mv.global_id(),
3246 CatalogItem::ContinualTask(ct) => ct.global_id(),
3247 CatalogItem::Table(_)
3248 | CatalogItem::Source(_)
3249 | CatalogItem::Log(_)
3250 | CatalogItem::View(_)
3251 | CatalogItem::Sink(_)
3252 | CatalogItem::Type(_)
3253 | CatalogItem::Func(_)
3254 | CatalogItem::Secret(_)
3255 | CatalogItem::Connection(_) => continue,
3256 };
3257 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3258 catalog_ids.push(gid);
3259 dataflows.push(plan.clone());
3260
3261 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3262 read_policies.insert(gid, compaction_window.into());
3263 }
3264 }
3265 }
3266
3267 let read_ts = self.get_local_read_ts().await;
3268 let read_holds = as_of_selection::run(
3269 &mut dataflows,
3270 &read_policies,
3271 &*self.controller.storage_collections,
3272 read_ts,
3273 self.controller.read_only(),
3274 );
3275
3276 let catalog = self.catalog_mut();
3277 for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3278 catalog.set_physical_plan(id, plan);
3279 }
3280
3281 read_holds
3282 }
3283
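/// Serves the coordinator's main message loop until all command senders
/// have been dropped.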
3284 fn serve(
3293 mut self,
3294 mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3295 mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3296 mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3297 group_commit_rx: appends::GroupCommitWaiter,
3298 ) -> LocalBoxFuture<'static, ()> {
3299 async move {
3300 let mut cluster_events = self.controller.events_stream();
3302 let last_message = Arc::new(Mutex::new(LastMessage {
3303 kind: "none",
3304 stmt: None,
3305 }));
3306
3307 let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3308 let idle_metric = self.metrics.queue_busy_seconds.clone();
3309 let last_message_watchdog = Arc::clone(&last_message);
3310
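// Watchdog: every tick, offer the main loop a timer through a bounded
// channel. If the loop does not accept it within the timeout, it is likely
// stuck on a single message, so log what it was last doing.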
3311 spawn(|| "coord watchdog", async move {
3312 let mut interval = tokio::time::interval(Duration::from_secs(5));
3317 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3321
3322 let mut coord_stuck = false;
3324
3325 loop {
3326 interval.tick().await;
3327
3328 let duration = tokio::time::Duration::from_secs(30);
3330 let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3331 let Ok(maybe_permit) = timeout else {
3332 if !coord_stuck {
3334 let last_message = last_message_watchdog.lock().expect("poisoned");
3335 tracing::warn!(
3336 last_message_kind = %last_message.kind,
3337 last_message_sql = %last_message.stmt_to_string(),
3338 "coordinator stuck for {duration:?}",
3339 );
3340 }
3341 coord_stuck = true;
3342
3343 continue;
3344 };
3345
3346 if coord_stuck {
3348 tracing::info!("Coordinator became unstuck");
3349 }
3350 coord_stuck = false;
3351
3352 let Ok(permit) = maybe_permit else {
3354 break;
3355 };
3356
3357 permit.send(idle_metric.start_timer());
3358 }
3359 });
3360
3361 self.schedule_storage_usage_collection().await;
3362 self.spawn_privatelink_vpc_endpoints_watch_task();
3363 self.spawn_statement_logging_task();
3364 flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3365
3366 let warn_threshold = self
3368 .catalog()
3369 .system_config()
3370 .coord_slow_message_warn_threshold();
3371
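// Drain messages in batches so that a burst of commands is handled
// without re-entering the select! below for every single message.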
3372 const MESSAGE_BATCH: usize = 64;
3374 let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3375 let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3376
3377 let message_batch = self.metrics.message_batch.clone();
3378
3379 loop {
3380 select! {
3384 biased;
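// `biased` polls the arms below in declaration order, prioritizing
// internal commands over all other message sources.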
3389
3390 _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3394 Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
3398 () = self.controller.ready() => {
3402 let controller = match self.controller.get_readiness() {
3406 Readiness::Storage => ControllerReadiness::Storage,
3407 Readiness::Compute => ControllerReadiness::Compute,
3408 Readiness::Metrics(_) => ControllerReadiness::Metrics,
3409 Readiness::Internal(_) => ControllerReadiness::Internal,
3410 Readiness::NotReady => unreachable!("just signaled as ready"),
3411 };
3412 messages.push(Message::ControllerReady { controller });
3413 }
3414 permit = group_commit_rx.ready() => {
3417 let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3423 PendingWriteTxn::User{span, ..} => Some(span),
3424 PendingWriteTxn::System{..} => None,
3425 });
3426 let span = match user_write_spans.exactly_one() {
3427 Ok(span) => span.clone(),
3428 Err(user_write_spans) => {
3429 let span = info_span!(parent: None, "group_commit_notify");
3430 for s in user_write_spans {
3431 span.follows_from(s);
3432 }
3433 span
3434 }
3435 };
3436 messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3437 },
3438 count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3442 if count == 0 {
3443 break;
3444 } else {
3445 messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
3446 }
3447 },
3448 Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3452 let mut pending_read_txns = vec![pending_read_txn];
3453 while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3454 pending_read_txns.push(pending_read_txn);
3455 }
3456 for (conn_id, pending_read_txn) in pending_read_txns {
3457 let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
3458 soft_assert_or_log!(
3459 prev.is_none(),
"connections cannot have multiple concurrent reads, prev: {prev:?}"
3461 )
3462 }
3463 messages.push(Message::LinearizeReads);
3464 }
3465 _ = self.advance_timelines_interval.tick() => {
3469 let span = info_span!(parent: None, "coord::advance_timelines_interval");
3470 span.follows_from(Span::current());
3471
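// In read-only mode no writes can be issued, so advance timelines
// directly instead of going through a group commit.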
3472 if self.controller.read_only() {
3477 messages.push(Message::AdvanceTimelines);
3478 } else {
3479 messages.push(Message::GroupCommitInitiate(span, None));
3480 }
3481 },
3482 _ = self.check_cluster_scheduling_policies_interval.tick() => {
3486 messages.push(Message::CheckSchedulingPolicies);
3487 },
3488
3489 _ = self.caught_up_check_interval.tick() => {
3493 self.maybe_check_caught_up().await;
3498
3499 continue;
3500 },
3501
3502 timer = idle_rx.recv() => {
3507 timer.expect("does not drop").observe_duration();
3508 self.metrics
3509 .message_handling
3510 .with_label_values(&["watchdog"])
3511 .observe(0.0);
3512 continue;
3513 }
3514 };
3515
3516 message_batch.observe(f64::cast_lossy(messages.len()));
3518
3519 for msg in messages.drain(..) {
3520 let msg_kind = msg.kind();
3523 let span = span!(
3524 target: "mz_adapter::coord::handle_message_loop",
3525 Level::INFO,
3526 "coord::handle_message",
3527 kind = msg_kind
3528 );
3529 let otel_context = span.context().span().span_context().clone();
3530
3531 *last_message.lock().expect("poisoned") = LastMessage {
3535 kind: msg_kind,
3536 stmt: match &msg {
3537 Message::Command(
3538 _,
3539 Command::Execute {
3540 portal_name,
3541 session,
3542 ..
3543 },
3544 ) => session
3545 .get_portal_unverified(portal_name)
3546 .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3547 _ => None,
3548 },
3549 };
3550
3551 let start = Instant::now();
3552 self.handle_message(msg).instrument(span).await;
3553 let duration = start.elapsed();
3554
3555 self.metrics
3556 .message_handling
3557 .with_label_values(&[msg_kind])
3558 .observe(duration.as_secs_f64());
3559
3560 if duration > warn_threshold {
3562 let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3563 tracing::error!(
3564 ?msg_kind,
3565 ?trace_id,
3566 ?duration,
3567 "very slow coordinator message"
3568 );
3569 }
3570 }
3571 }
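// The loop above only exits once all command senders have been dropped.
// If this was the last reference to the catalog, give it a chance to
// clean up after itself before the coordinator task finishes.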
3572 if let Some(catalog) = Arc::into_inner(self.catalog) {
3575 catalog.expire().await;
3576 }
3577 }
3578 .boxed_local()
3579 }
3580
3581 fn catalog(&self) -> &Catalog {
3583 &self.catalog
3584 }
3585
3586 fn owned_catalog(&self) -> Arc<Catalog> {
3589 Arc::clone(&self.catalog)
3590 }
3591
3592 fn optimizer_metrics(&self) -> OptimizerMetrics {
3595 self.optimizer_metrics.clone()
3596 }
3597
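// NOTE: `Arc::make_mut` clones the underlying catalog whenever other
// references to it are live, which can be expensive; obtain a mutable
// catalog sparingly.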
3598 fn catalog_mut(&mut self) -> &mut Catalog {
3600 Arc::make_mut(&mut self.catalog)
3608 }
3609
3610 fn connection_context(&self) -> &ConnectionContext {
3612 self.controller.connection_context()
3613 }
3614
3615 fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3617 &self.connection_context().secrets_reader
3618 }
3619
3620 #[allow(dead_code)]
3625 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3626 for meta in self.active_conns.values() {
3627 let _ = meta.notice_tx.send(notice.clone());
3628 }
3629 }
3630
3631 pub(crate) fn broadcast_notice_tx(
3634 &self,
) -> Box<dyn FnOnce(AdapterNotice) + Send + 'static> {
3636 let senders: Vec<_> = self
3637 .active_conns
3638 .values()
3639 .map(|meta| meta.notice_tx.clone())
3640 .collect();
3641 Box::new(move |notice| {
3642 for tx in senders {
3643 let _ = tx.send(notice.clone());
3644 }
3645 })
3646 }
3647
3648 pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3649 &self.active_conns
3650 }
3651
3652 #[instrument(level = "debug")]
3653 pub(crate) fn retire_execution(
3654 &mut self,
3655 reason: StatementEndedExecutionReason,
3656 ctx_extra: ExecuteContextExtra,
3657 ) {
3658 if let Some(uuid) = ctx_extra.retire() {
3659 self.end_statement_execution(uuid, reason);
3660 }
3661 }
3662
3663 #[instrument(level = "debug")]
3665 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3666 let compute = self
3667 .instance_snapshot(instance)
3668 .expect("compute instance does not exist");
3669 DataflowBuilder::new(self.catalog().state(), compute)
3670 }
3671
3672 pub fn instance_snapshot(
3674 &self,
3675 id: ComputeInstanceId,
3676 ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3677 ComputeInstanceSnapshot::new(&self.controller, id)
3678 }
3679
3680 pub(crate) async fn ship_dataflow(
3683 &mut self,
3684 dataflow: DataflowDescription<Plan>,
3685 instance: ComputeInstanceId,
3686 subscribe_target_replica: Option<ReplicaId>,
3687 ) {
3688 let export_ids = dataflow.exported_index_ids().collect();
3691
3692 self.controller
3693 .compute
3694 .create_dataflow(instance, dataflow, subscribe_target_replica)
3695 .unwrap_or_terminate("dataflow creation cannot fail");
3696
3697 self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3698 .await;
3699 }
3700
3701 pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
3705 self.controller
3706 .compute
3707 .allow_writes(instance, id)
3708 .unwrap_or_terminate("allow_writes cannot fail");
3709 }
3710
3711 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3713 &mut self,
3714 dataflow: DataflowDescription<Plan>,
3715 instance: ComputeInstanceId,
3716 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3717 ) {
3718 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3719 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3720 let ((), ()) =
3721 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3722 } else {
3723 self.ship_dataflow(dataflow, instance, None).await;
3724 }
3725 }
3726
3727 pub fn install_compute_watch_set(
3731 &mut self,
3732 conn_id: ConnectionId,
3733 objects: BTreeSet<GlobalId>,
3734 t: Timestamp,
3735 state: WatchSetResponse,
3736 ) {
3737 let ws_id = self.controller.install_compute_watch_set(objects, t);
3738 self.connection_watch_sets
3739 .entry(conn_id.clone())
3740 .or_default()
3741 .insert(ws_id);
3742 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3743 }
3744
3745 pub fn install_storage_watch_set(
3749 &mut self,
3750 conn_id: ConnectionId,
3751 objects: BTreeSet<GlobalId>,
3752 t: Timestamp,
3753 state: WatchSetResponse,
3754 ) {
3755 let ws_id = self.controller.install_storage_watch_set(objects, t);
3756 self.connection_watch_sets
3757 .entry(conn_id.clone())
3758 .or_default()
3759 .insert(ws_id);
3760 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3761 }
3762
3763 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3765 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3766 for ws_id in ws_ids {
3767 self.installed_watch_sets.remove(&ws_id);
3768 }
3769 }
3770 }
3771
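/// Dumps the coordinator's in-memory state as JSON for debugging. Fields
/// without a natural JSON representation are rendered via their `Debug`
/// impls.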
3772 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3776 let global_timelines: BTreeMap<_, _> = self
3782 .global_timelines
3783 .iter()
3784 .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3785 .collect();
3786 let active_conns: BTreeMap<_, _> = self
3787 .active_conns
3788 .iter()
3789 .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3790 .collect();
3791 let txn_read_holds: BTreeMap<_, _> = self
3792 .txn_read_holds
3793 .iter()
3794 .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3795 .collect();
3796 let pending_peeks: BTreeMap<_, _> = self
3797 .pending_peeks
3798 .iter()
3799 .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3800 .collect();
3801 let client_pending_peeks: BTreeMap<_, _> = self
3802 .client_pending_peeks
3803 .iter()
3804 .map(|(id, peek)| {
3805 let peek: BTreeMap<_, _> = peek
3806 .iter()
3807 .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3808 .collect();
3809 (id.to_string(), peek)
3810 })
3811 .collect();
3812 let pending_linearize_read_txns: BTreeMap<_, _> = self
3813 .pending_linearize_read_txns
3814 .iter()
3815 .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3816 .collect();
3817
3818 let map = serde_json::Map::from_iter([
3819 (
3820 "global_timelines".to_string(),
3821 serde_json::to_value(global_timelines)?,
3822 ),
3823 (
3824 "active_conns".to_string(),
3825 serde_json::to_value(active_conns)?,
3826 ),
3827 (
3828 "txn_read_holds".to_string(),
3829 serde_json::to_value(txn_read_holds)?,
3830 ),
3831 (
3832 "pending_peeks".to_string(),
3833 serde_json::to_value(pending_peeks)?,
3834 ),
3835 (
3836 "client_pending_peeks".to_string(),
3837 serde_json::to_value(client_pending_peeks)?,
3838 ),
3839 (
3840 "pending_linearize_read_txns".to_string(),
3841 serde_json::to_value(pending_linearize_read_txns)?,
3842 ),
3843 ("controller".to_string(), self.controller.dump().await?),
3844 ]);
3845 Ok(serde_json::Value::Object(map))
3846 }
3847
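/// Retracts storage usage events that are older than the configured
/// retention period. The retractions are computed on a background task and
/// handed back to the coordinator as a `StorageUsagePrune` message.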
3848 async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
3862 let item_id = self
3863 .catalog()
3864 .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3865 let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3866 let read_ts = self.get_local_read_ts().await;
3867 let current_contents_fut = self
3868 .controller
3869 .storage_collections
3870 .snapshot(global_id, read_ts);
3871 let internal_cmd_tx = self.internal_cmd_tx.clone();
3872 spawn(|| "storage_usage_prune", async move {
3873 let mut current_contents = current_contents_fut
3874 .await
3875 .unwrap_or_terminate("cannot fail to fetch snapshot");
3876 differential_dataflow::consolidation::consolidate(&mut current_contents);
3877
3878 let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3879 let mut expired = Vec::new();
3880 for (row, diff) in current_contents {
3881 assert_eq!(
3882 diff, 1,
3883 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3884 );
3885 let collection_timestamp = row
3887 .unpack()
3888 .get(3)
.expect("definition of mz_storage_usage_by_shard changed")
3890 .unwrap_timestamptz();
3891 let collection_timestamp = collection_timestamp.timestamp_millis();
3892 let collection_timestamp: u128 = collection_timestamp
3893 .try_into()
3894 .expect("all collections happen after Jan 1 1970");
3895 if collection_timestamp < cutoff_ts {
3896 debug!("pruning storage event {row:?}");
3897 let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3898 expired.push(builtin_update);
3899 }
3900 }
3901
3902 let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3904 });
3905 }
3906
3907 fn current_credit_consumption_rate(&self) -> Numeric {
3908 self.catalog()
3909 .user_cluster_replicas()
3910 .filter_map(|replica| match &replica.config.location {
3911 ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
3912 ReplicaLocation::Unmanaged(_) => None,
3913 })
3914 .map(|size| {
3915 self.catalog()
3916 .cluster_replica_sizes()
3917 .0
3918 .get(size)
3919 .expect("location size is validated against the cluster replica sizes")
3920 .credits_per_hour
3921 })
3922 .sum()
3923 }
3924}
3925
3926#[cfg(test)]
3927impl Coordinator {
3928 #[allow(dead_code)]
3929 async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
3930 let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
3938
3939 let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
3940 }
3941}
3942
3943struct LastMessage {
3945 kind: &'static str,
3946 stmt: Option<Arc<Statement<Raw>>>,
3947}
3948
3949impl LastMessage {
3950 fn stmt_to_string(&self) -> Cow<'static, str> {
3952 self.stmt
3953 .as_ref()
3954 .map(|stmt| stmt.to_ast_string_redacted().into())
3955 .unwrap_or(Cow::Borrowed("<none>"))
3956 }
3957}
3958
3959impl fmt::Debug for LastMessage {
3960 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3961 f.debug_struct("LastMessage")
3962 .field("kind", &self.kind)
3963 .field("stmt", &self.stmt_to_string())
3964 .finish()
3965 }
3966}
3967
3968impl Drop for LastMessage {
3969 fn drop(&mut self) {
3970 if std::thread::panicking() {
3972 eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
3974 }
3975 }
3976}
3977
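/// Serves the coordinator: opens the catalog, spawns the dedicated
/// coordinator thread, runs bootstrap there, and returns a handle and a
/// client for interacting with the coordinator.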
3978pub fn serve(
3990 Config {
3991 controller_config,
3992 controller_envd_epoch,
3993 mut storage,
3994 audit_logs_iterator,
3995 timestamp_oracle_url,
3996 unsafe_mode,
3997 all_features,
3998 build_info,
3999 environment_id,
4000 metrics_registry,
4001 now,
4002 secrets_controller,
4003 cloud_resource_controller,
4004 cluster_replica_sizes,
4005 builtin_system_cluster_config,
4006 builtin_catalog_server_cluster_config,
4007 builtin_probe_cluster_config,
4008 builtin_support_cluster_config,
4009 builtin_analytics_cluster_config,
4010 system_parameter_defaults,
4011 availability_zones,
4012 storage_usage_client,
4013 storage_usage_collection_interval,
4014 storage_usage_retention_period,
4015 segment_client,
4016 egress_addresses,
4017 aws_account_id,
4018 aws_privatelink_availability_zones,
4019 connection_context,
4020 connection_limit_callback,
4021 remote_system_parameters,
4022 webhook_concurrency_limit,
4023 http_host_name,
4024 tracing_handle,
4025 read_only_controllers,
4026 caught_up_trigger: clusters_caught_up_trigger,
4027 helm_chart_version,
4028 license_key,
4029 external_login_password_mz_system,
4030 force_builtin_schema_migration,
4031 }: Config,
4032) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4033 async move {
4034 let coord_start = Instant::now();
4035 info!("startup: coordinator init: beginning");
4036 info!("startup: coordinator init: preamble beginning");
4037
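// Force initialization of the static builtins up front, rather than paying
// that cost at some arbitrary first use later on.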
4038 let _builtins = LazyLock::force(&BUILTINS_STATIC);
4042
4043 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4044 let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4045 let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4046 mpsc::unbounded_channel();
4047
4048 if !availability_zones.iter().all_unique() {
4050 coord_bail!("availability zones must be unique");
4051 }
4052
4053 let aws_principal_context = match (
4054 aws_account_id,
4055 connection_context.aws_external_id_prefix.clone(),
4056 ) {
4057 (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4058 aws_account_id,
4059 aws_external_id_prefix,
4060 }),
4061 _ => None,
4062 };
4063
4064 let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4065 .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4066
4067 info!(
4068 "startup: coordinator init: preamble complete in {:?}",
4069 coord_start.elapsed()
4070 );
4071 let oracle_init_start = Instant::now();
4072 info!("startup: coordinator init: timestamp oracle init beginning");
4073
4074 let pg_timestamp_oracle_config = timestamp_oracle_url
4075 .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
4076 let mut initial_timestamps =
4077 get_initial_oracle_timestamps(&pg_timestamp_oracle_config).await?;
4078
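// The `EpochMilliseconds` timeline must always exist, even in a fresh
// environment with no previously persisted timestamps.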
4079 initial_timestamps
4083 .entry(Timeline::EpochMilliseconds)
4084 .or_insert_with(mz_repr::Timestamp::minimum);
4085 let mut timestamp_oracles = BTreeMap::new();
4086 for (timeline, initial_timestamp) in initial_timestamps {
4087 Coordinator::ensure_timeline_state_with_initial_time(
4088 &timeline,
4089 initial_timestamp,
4090 now.clone(),
4091 pg_timestamp_oracle_config.clone(),
4092 &mut timestamp_oracles,
4093 read_only_controllers,
4094 )
4095 .await;
4096 }
4097
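// The boot timestamp must cover the catalog upper. In read-only mode the
// oracle must not be advanced, so take the max of the oracle's read
// timestamp and the catalog upper; otherwise fold the catalog upper into
// the oracle and mint a proper write timestamp.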
4098 let catalog_upper = storage.current_upper().await;
let epoch_millis_oracle = &timestamp_oracles
4108 .get(&Timeline::EpochMilliseconds)
4109 .expect("inserted above")
4110 .oracle;
4111
4112 let mut boot_ts = if read_only_controllers {
4113 let read_ts = epoch_millis_oracle.read_ts().await;
4114 std::cmp::max(read_ts, catalog_upper)
4115 } else {
4116 epoch_millis_oracle.apply_write(catalog_upper).await;
4119 epoch_millis_oracle.write_ts().await.timestamp
4120 };
4121
4122 info!(
4123 "startup: coordinator init: timestamp oracle init complete in {:?}",
4124 oracle_init_start.elapsed()
4125 );
4126
4127 let catalog_open_start = Instant::now();
4128 info!("startup: coordinator init: catalog open beginning");
4129 let persist_client = controller_config
4130 .persist_clients
4131 .open(controller_config.persist_location.clone())
4132 .await
4133 .context("opening persist client")?;
let builtin_item_migration_config = BuiltinItemMigrationConfig {
persist_client: persist_client.clone(),
read_only: read_only_controllers,
force_migration: force_builtin_schema_migration,
};
4141 let OpenCatalogResult {
4142 mut catalog,
4143 migrated_storage_collections_0dt,
4144 new_builtin_collections,
4145 builtin_table_updates,
4146 cached_global_exprs,
4147 uncached_local_exprs,
4148 } = Catalog::open(mz_catalog::config::Config {
4149 storage,
4150 metrics_registry: &metrics_registry,
4151 state: mz_catalog::config::StateConfig {
4152 unsafe_mode,
4153 all_features,
4154 build_info,
4155 deploy_generation: controller_config.deploy_generation,
4156 environment_id: environment_id.clone(),
4157 read_only: read_only_controllers,
4158 now: now.clone(),
4159 boot_ts: boot_ts.clone(),
4160 skip_migrations: false,
4161 cluster_replica_sizes,
4162 builtin_system_cluster_config,
4163 builtin_catalog_server_cluster_config,
4164 builtin_probe_cluster_config,
4165 builtin_support_cluster_config,
4166 builtin_analytics_cluster_config,
4167 system_parameter_defaults,
4168 remote_system_parameters,
4169 availability_zones,
4170 egress_addresses,
4171 aws_principal_context,
4172 aws_privatelink_availability_zones,
4173 connection_context,
4174 http_host_name,
4175 builtin_item_migration_config,
4176 persist_client: persist_client.clone(),
4177 enable_expression_cache_override: None,
4178 helm_chart_version,
4179 external_login_password_mz_system,
4180 license_key: license_key.clone(),
4181 },
4182 })
4183 .await?;
4184
4185 let catalog_upper = catalog.current_upper().await;
4188 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4189
4190 if !read_only_controllers {
4191 epoch_millis_oracle.apply_write(boot_ts).await;
4192 }
4193
4194 info!(
4195 "startup: coordinator init: catalog open complete in {:?}",
4196 catalog_open_start.elapsed()
4197 );
4198
4199 let coord_thread_start = Instant::now();
4200 info!("startup: coordinator init: coordinator thread start beginning");
4201
4202 let session_id = catalog.config().session_id;
4203 let start_instant = catalog.config().start_instant;
4204
4205 let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4209 let handle = TokioHandle::current();
4210
4211 let metrics = Metrics::register_into(&metrics_registry);
4212 let metrics_clone = metrics.clone();
4213 let optimizer_metrics = OptimizerMetrics::register_into(
4214 &metrics_registry,
4215 catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4216 );
4217 let segment_client_clone = segment_client.clone();
4218 let coord_now = now.clone();
4219 let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4220 let mut check_scheduling_policies_interval = tokio::time::interval(
4221 catalog
4222 .system_config()
4223 .cluster_check_scheduling_policies_interval(),
4224 );
4225 check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4226
4227 let clusters_caught_up_check_interval = if read_only_controllers {
4228 let dyncfgs = catalog.system_config().dyncfgs();
4229 let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4230
4231 let mut interval = tokio::time::interval(interval);
4232 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4233 interval
4234 } else {
4235 let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4243 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4244 interval
4245 };
4246
4247 let clusters_caught_up_check =
4248 clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
4249 trigger,
4250 exclude_collections: new_builtin_collections.into_iter().collect(),
4251 });
4252
4253 if let Some(config) = pg_timestamp_oracle_config.as_ref() {
4254 let pg_timestamp_oracle_params =
4257 flags::pg_timstamp_oracle_config(catalog.system_config());
4258 pg_timestamp_oracle_params.apply(config);
4259 }
4260
4261 let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4264 Arc::new(move |system_vars: &SystemVars| {
4265 let limit: u64 = system_vars.max_connections().cast_into();
4266 let superuser_reserved: u64 =
4267 system_vars.superuser_reserved_connections().cast_into();
4268
4269 let superuser_reserved = if superuser_reserved >= limit {
tracing::warn!(
"superuser_reserved ({superuser_reserved}) is greater than or equal to max connections ({limit}); capping it at the limit"
);
4277 limit
4278 } else {
4279 superuser_reserved
4280 };
4281
4282 (connection_limit_callback)(limit, superuser_reserved);
4283 });
4284 catalog.system_config_mut().register_callback(
4285 &mz_sql::session::vars::MAX_CONNECTIONS,
4286 Arc::clone(&connection_limit_callback),
4287 );
4288 catalog.system_config_mut().register_callback(
4289 &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4290 connection_limit_callback,
4291 );
4292
4293 let (group_commit_tx, group_commit_rx) = appends::notifier();
4294
4295 let parent_span = tracing::Span::current();
4296 let thread = thread::Builder::new()
4297 .stack_size(3 * stack::STACK_SIZE)
4301 .name("coordinator".to_string())
4302 .spawn(move || {
4303 let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4304
4305 let controller = handle
4306 .block_on({
4307 catalog.initialize_controller(
4308 controller_config,
4309 controller_envd_epoch,
4310 read_only_controllers,
4311 )
4312 })
.unwrap_or_terminate("failed to initialize controller");
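// Initializing the controller may have advanced the catalog's upper; fold
// that back into the boot timestamp (and, when writable, into the oracle).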
                let catalog_upper = handle.block_on(catalog.current_upper());
                boot_ts = std::cmp::max(boot_ts, catalog_upper);
                if !read_only_controllers {
                    let epoch_millis_oracle = &timestamp_oracles
                        .get(&Timeline::EpochMilliseconds)
                        .expect("inserted above")
                        .oracle;
                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
                }

                let catalog = Arc::new(catalog);

                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
                let mut coord = Coordinator {
                    controller,
                    catalog,
                    internal_cmd_tx,
                    group_commit_tx,
                    strict_serializable_reads_tx,
                    global_timelines: timestamp_oracles,
                    transient_id_gen: Arc::new(TransientIdGen::new()),
                    active_conns: BTreeMap::new(),
                    txn_read_holds: Default::default(),
                    pending_peeks: BTreeMap::new(),
                    client_pending_peeks: BTreeMap::new(),
                    pending_linearize_read_txns: BTreeMap::new(),
                    serialized_ddl: LockedVecDeque::new(),
                    active_compute_sinks: BTreeMap::new(),
                    active_webhooks: BTreeMap::new(),
                    active_copies: BTreeMap::new(),
                    staged_cancellation: BTreeMap::new(),
                    introspection_subscribes: BTreeMap::new(),
                    write_locks: BTreeMap::new(),
                    deferred_write_ops: BTreeMap::new(),
                    pending_writes: Vec::new(),
                    advance_timelines_interval,
                    secrets_controller,
                    caching_secrets_reader,
                    cloud_resource_controller,
                    storage_usage_client,
                    storage_usage_collection_interval,
                    segment_client,
                    metrics,
                    optimizer_metrics,
                    tracing_handle,
                    statement_logging: StatementLogging::new(coord_now.clone()),
                    webhook_concurrency_limit,
                    pg_timestamp_oracle_config,
                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
                    cluster_scheduling_decisions: BTreeMap::new(),
                    caught_up_check_interval: clusters_caught_up_check_interval,
                    caught_up_check: clusters_caught_up_check,
                    installed_watch_sets: BTreeMap::new(),
                    connection_watch_sets: BTreeMap::new(),
                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
                    read_only_controllers,
                    buffered_builtin_table_updates: Some(Vec::new()),
                    license_key,
                    persist_client,
                };
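                // Bootstrap the coordinator and clean up any orphaned
                // resources, reporting the outcome over `bootstrap_tx` before
                // entering the serve loop.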
                let bootstrap = handle.block_on(async {
                    coord
                        .bootstrap(
                            boot_ts,
                            migrated_storage_collections_0dt,
                            builtin_table_updates,
                            cached_global_exprs,
                            uncached_local_exprs,
                            audit_logs_iterator,
                        )
                        .await?;
                    coord
                        .controller
                        .remove_orphaned_replicas(
                            coord.catalog().get_next_user_replica_id().await?,
                            coord.catalog().get_next_system_replica_id().await?,
                        )
                        .await
                        .map_err(AdapterError::Orchestrator)?;

                    if let Some(retention_period) = storage_usage_retention_period {
                        coord
                            .prune_storage_usage_events_on_startup(retention_period)
                            .await;
                    }

                    Ok(())
                });
                let ok = bootstrap.is_ok();
                drop(span);
                bootstrap_tx
                    .send(bootstrap)
                    .expect("bootstrap_rx is not dropped until it receives this message");
                if ok {
                    handle.block_on(coord.serve(
                        internal_cmd_rx,
                        strict_serializable_reads_rx,
                        cmd_rx,
                        group_commit_rx,
                    ));
                }
            })
            .expect("failed to create coordinator thread");
        match bootstrap_rx
            .await
            .expect("bootstrap_tx always sends a message or panics/halts")
        {
            Ok(()) => {
                info!(
                    "startup: coordinator init: coordinator thread start complete in {:?}",
                    coord_thread_start.elapsed()
                );
                info!(
                    "startup: coordinator init: complete in {:?}",
                    coord_start.elapsed()
                );
                let handle = Handle {
                    session_id,
                    start_instant,
                    _thread: thread.join_on_drop(),
                };
                let client = Client::new(
                    build_info,
                    cmd_tx.clone(),
                    metrics_clone,
                    now,
                    environment_id,
                    segment_client_clone,
                );
                Ok((handle, client))
            }
            Err(e) => Err(e),
        }
    }
    .boxed()
}

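/// Returns the most recent timestamp recorded for each timeline, as reported
/// by the postgres-backed timestamp oracle (when one is configured).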
async fn get_initial_oracle_timestamps(
    pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
    let mut initial_timestamps = BTreeMap::new();

    if let Some(pg_timestamp_oracle_config) = pg_timestamp_oracle_config {
        let postgres_oracle_timestamps =
            PostgresTimestampOracle::<NowFn>::get_all_timelines(pg_timestamp_oracle_config.clone())
                .await?;

        let debug_msg = || {
            postgres_oracle_timestamps
                .iter()
                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
                .join(", ")
        };
        info!(
            "current timestamps from the postgres-backed timestamp oracle: {}",
            debug_msg()
        );

        for (timeline, ts) in postgres_oracle_timestamps {
            let entry = initial_timestamps
                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));

            entry
                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
                .or_insert(ts);
        }
    } else {
        info!("no postgres URL configured for the postgres-backed timestamp oracle");
    }

    let debug_msg = || {
        initial_timestamps
            .iter()
            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
            .join(", ")
    };
    info!("initial oracle timestamps: {}", debug_msg());

    Ok(initial_timestamps)
}

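/// Pulls system parameter values from the remote frontend (when one is
/// configured) so they can be applied during boot. Until a first successful
/// sync has been recorded, this blocks without a timeout; afterwards the pull
/// is bounded by `system_parameter_sync_timeout`.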
#[instrument]
pub async fn load_remote_system_parameters(
    storage: &mut Box<dyn OpenableDurableCatalogState>,
    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
    system_parameter_sync_timeout: Duration,
) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
        tracing::info!("parameter sync on boot: start sync");

        let mut params = SynchronizedParameters::new(SystemVars::default());
        let frontend_sync = async {
            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
            frontend.pull(&mut params);
            let ops = params
                .modified()
                .into_iter()
                .map(|param| {
                    let name = param.name;
                    let value = param.value;
                    tracing::info!(name, value, initial = true, "sync parameter");
                    (name, value)
                })
                .collect();
            tracing::info!("parameter sync on boot: end sync");
            Ok(Some(ops))
        };
        if !storage.has_system_config_synced_once().await? {
            frontend_sync.await
        } else {
            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
                Ok(ops) => Ok(ops),
                Err(TimeoutError::Inner(e)) => Err(e),
                Err(TimeoutError::DeadlineElapsed) => {
                    tracing::info!("parameter sync on boot: sync has timed out");
                    Ok(None)
                }
            }
        }
    } else {
        Ok(None)
    }
}

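/// Responses produced when an installed watch set completes.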
#[derive(Debug)]
pub enum WatchSetResponse {
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    AlterSinkReady(AlterSinkReadyContext),
}

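/// State carried by an in-flight `ALTER SINK` while it waits for the sink's
/// new dependencies to become ready.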
#[derive(Debug)]
pub struct AlterSinkReadyContext {
    ctx: Option<ExecuteContext>,
    otel_ctx: OpenTelemetryContext,
    plan: AlterSinkPlan,
    plan_validity: PlanValidity,
    read_hold: ReadHolds<Timestamp>,
}

impl AlterSinkReadyContext {
    fn ctx(&mut self) -> &mut ExecuteContext {
        self.ctx.as_mut().expect("only cleared on drop")
    }

    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
        self.ctx
            .take()
            .expect("only cleared on drop")
            .retire(result);
    }
}

impl Drop for AlterSinkReadyContext {
    fn drop(&mut self) {
        if let Some(ctx) = self.ctx.take() {
            ctx.retire(Err(AdapterError::Canceled));
        }
    }
}

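/// A queue paired with a shared tokio mutex, so that one task at a time can
/// claim ownership of draining it; used to serialize DDL execution.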
#[derive(Debug)]
struct LockedVecDeque<T> {
    items: VecDeque<T>,
    lock: Arc<tokio::sync::Mutex<()>>,
}

impl<T> LockedVecDeque<T> {
    pub fn new() -> Self {
        Self {
            items: VecDeque::new(),
            lock: Arc::new(tokio::sync::Mutex::new(())),
        }
    }

    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
        Arc::clone(&self.lock).try_lock_owned()
    }

    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    pub fn push_back(&mut self, value: T) {
        self.items.push_back(value)
    }

    pub fn pop_front(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    pub fn remove(&mut self, index: usize) -> Option<T> {
        self.items.remove(index)
    }

    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
        self.items.iter()
    }
}

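/// A statement (or plan) whose execution has been deferred, along with the
/// context needed to resume it.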
#[derive(Debug)]
struct DeferredPlanStatement {
    ctx: ExecuteContext,
    ps: PlanStatement,
}

#[derive(Debug)]
enum PlanStatement {
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
    },
}

#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    #[error("Access denied missing IP address")]
    MissingIp,
}

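/// Returns `Ok(())` if `ip` is admitted by at least one of the given network
/// policy rules, and a `NetworkPolicyError::AddressDenied` error otherwise.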
pub(crate) fn validate_ip_with_policy_rules(
    ip: &IpAddr,
    rules: &[NetworkPolicyRule],
) -> Result<(), NetworkPolicyError> {
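    // A rule matches when its address range contains `ip`; any match admits
    // the connection.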
    if rules.iter().any(|r| r.address.0.contains(ip)) {
        Ok(())
    } else {
        Err(NetworkPolicyError::AddressDenied(*ip))
    }
}