1use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::fmt;
72use std::net::IpAddr;
73use std::num::NonZeroI64;
74use std::ops::Neg;
75use std::str::FromStr;
76use std::sync::LazyLock;
77use std::sync::{Arc, Mutex};
78use std::thread;
79use std::time::{Duration, Instant};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::{Either, Itertools};
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
95use mz_auth::password::Password;
96use mz_build_info::BuildInfo;
97use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
98use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
99use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
100use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
101use mz_catalog::memory::objects::{
102 CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
103 DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
104};
105use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
106use mz_compute_client::as_of_selection;
107use mz_compute_client::controller::error::{DataflowCreationError, InstanceMissing};
108use mz_compute_types::ComputeInstanceId;
109use mz_compute_types::dataflows::DataflowDescription;
110use mz_compute_types::plan::Plan;
111use mz_controller::clusters::{
112 ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
113};
114use mz_controller::{ControllerConfig, Readiness};
115use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
116use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
117use mz_license_keys::ValidatedLicenseKey;
118use mz_orchestrator::OfflineReason;
119use mz_ore::cast::{CastFrom, CastInto, CastLossy};
120use mz_ore::channel::trigger::Trigger;
121use mz_ore::future::TimeoutError;
122use mz_ore::metrics::MetricsRegistry;
123use mz_ore::now::{EpochMillis, NowFn};
124use mz_ore::task::{JoinHandle, spawn};
125use mz_ore::thread::JoinHandleExt;
126use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
127use mz_ore::url::SensitiveUrl;
128use mz_ore::{
129 assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
130};
131use mz_persist_client::PersistClient;
132use mz_persist_client::batch::ProtoBatch;
133use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
134use mz_repr::adt::numeric::Numeric;
135use mz_repr::explain::{ExplainConfig, ExplainFormat};
136use mz_repr::global_id::TransientIdGen;
137use mz_repr::optimize::OptimizerFeatures;
138use mz_repr::role_id::RoleId;
139use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
140use mz_secrets::cache::CachingSecretsReader;
141use mz_secrets::{SecretsController, SecretsReader};
142use mz_sql::ast::{Raw, Statement};
143use mz_sql::catalog::{CatalogCluster, EnvironmentId};
144use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
145use mz_sql::optimizer_metrics::OptimizerMetrics;
146use mz_sql::plan::{
147 self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, NetworkPolicyRule,
148 OnTimeoutAction, Params, QueryWhen,
149};
150use mz_sql::session::user::User;
151use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
152use mz_sql_parser::ast::ExplainStage;
153use mz_sql_parser::ast::display::AstDisplay;
154use mz_storage_client::client::TableData;
155use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
156use mz_storage_types::connections::Connection as StorageConnection;
157use mz_storage_types::connections::ConnectionContext;
158use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
159use mz_storage_types::read_holds::ReadHold;
160use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
161use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
162use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
163use mz_timestamp_oracle::WriteTimestamp;
164use mz_timestamp_oracle::postgres_oracle::{
165 PostgresTimestampOracle, PostgresTimestampOracleConfig,
166};
167use mz_transform::dataflow::DataflowMetainfo;
168use opentelemetry::trace::TraceContextExt;
169use serde::Serialize;
170use thiserror::Error;
171use timely::progress::{Antichain, Timestamp as _};
172use tokio::runtime::Handle as TokioHandle;
173use tokio::select;
174use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
175use tokio::time::{Interval, MissedTickBehavior};
176use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
177use tracing_opentelemetry::OpenTelemetrySpanExt;
178use uuid::Uuid;
179
180use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
181use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
182use crate::client::{Client, Handle};
183use crate::command::{Command, ExecuteResponse};
184use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
185use crate::coord::appends::{
186 BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
187};
188use crate::coord::caught_up::CaughtUpCheckContext;
189use crate::coord::cluster_scheduling::SchedulingDecision;
190use crate::coord::id_bundle::CollectionIdBundle;
191use crate::coord::introspection::IntrospectionSubscribe;
192use crate::coord::peek::PendingPeek;
193use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
194use crate::coord::timeline::{TimelineContext, TimelineState};
195use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
196use crate::coord::validity::PlanValidity;
197use crate::error::AdapterError;
198use crate::explain::insights::PlanInsightsContext;
199use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
200use crate::metrics::Metrics;
201use crate::optimize::dataflows::{
202 ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
203};
204use crate::optimize::{self, Optimize, OptimizerConfig};
205use crate::session::{EndTransactionAction, Session};
206use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
207use crate::util::{ClientTransmitter, ResultExt};
208use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
209use crate::{AdapterNotice, ReadHolds, flags};
210
211pub(crate) mod appends;
212pub(crate) mod catalog_serving;
213pub(crate) mod cluster_scheduling;
214pub(crate) mod consistency;
215pub(crate) mod id_bundle;
216pub(crate) mod in_memory_oracle;
217pub(crate) mod peek;
218pub(crate) mod read_policy;
219pub(crate) mod sequencer;
220pub(crate) mod statement_logging;
221pub(crate) mod timeline;
222pub(crate) mod timestamp_selection;
223
224pub mod catalog_implications;
225mod caught_up;
226mod command_handler;
227mod ddl;
228mod indexes;
229mod introspection;
230mod message_handler;
231mod privatelink_status;
232mod sql;
233mod validity;
234
235#[derive(Debug)]
236pub enum Message {
237 Command(OpenTelemetryContext, Command),
238 ControllerReady {
239 controller: ControllerReadiness,
240 },
241 PurifiedStatementReady(PurifiedStatementReady),
242 CreateConnectionValidationReady(CreateConnectionValidationReady),
243 AlterConnectionValidationReady(AlterConnectionValidationReady),
244 TryDeferred {
245 conn_id: ConnectionId,
247 acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
257 },
258 GroupCommitInitiate(Span, Option<GroupCommitPermit>),
260 DeferredStatementReady,
261 AdvanceTimelines,
262 ClusterEvent(ClusterEvent),
263 CancelPendingPeeks {
264 conn_id: ConnectionId,
265 },
266 LinearizeReads,
267 StagedBatches {
268 conn_id: ConnectionId,
269 table_id: CatalogItemId,
270 batches: Vec<Result<ProtoBatch, String>>,
271 },
272 StorageUsageSchedule,
273 StorageUsageFetch,
274 StorageUsageUpdate(ShardsUsageReferenced),
275 StorageUsagePrune(Vec<BuiltinTableUpdate>),
276 RetireExecute {
279 data: ExecuteContextExtra,
280 otel_ctx: OpenTelemetryContext,
281 reason: StatementEndedExecutionReason,
282 },
283 ExecuteSingleStatementTransaction {
284 ctx: ExecuteContext,
285 otel_ctx: OpenTelemetryContext,
286 stmt: Arc<Statement<Raw>>,
287 params: mz_sql::plan::Params,
288 },
289 PeekStageReady {
290 ctx: ExecuteContext,
291 span: Span,
292 stage: PeekStage,
293 },
294 CreateIndexStageReady {
295 ctx: ExecuteContext,
296 span: Span,
297 stage: CreateIndexStage,
298 },
299 CreateViewStageReady {
300 ctx: ExecuteContext,
301 span: Span,
302 stage: CreateViewStage,
303 },
304 CreateMaterializedViewStageReady {
305 ctx: ExecuteContext,
306 span: Span,
307 stage: CreateMaterializedViewStage,
308 },
309 SubscribeStageReady {
310 ctx: ExecuteContext,
311 span: Span,
312 stage: SubscribeStage,
313 },
314 IntrospectionSubscribeStageReady {
315 span: Span,
316 stage: IntrospectionSubscribeStage,
317 },
318 SecretStageReady {
319 ctx: ExecuteContext,
320 span: Span,
321 stage: SecretStage,
322 },
323 ClusterStageReady {
324 ctx: ExecuteContext,
325 span: Span,
326 stage: ClusterStage,
327 },
328 ExplainTimestampStageReady {
329 ctx: ExecuteContext,
330 span: Span,
331 stage: ExplainTimestampStage,
332 },
333 DrainStatementLog,
334 PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
335 CheckSchedulingPolicies,
336
337 SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
342}
343
344impl Message {
345 pub const fn kind(&self) -> &'static str {
347 match self {
348 Message::Command(_, msg) => match msg {
349 Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
350 Command::Startup { .. } => "command-startup",
351 Command::Execute { .. } => "command-execute",
352 Command::Commit { .. } => "command-commit",
353 Command::CancelRequest { .. } => "command-cancel_request",
354 Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
355 Command::GetWebhook { .. } => "command-get_webhook",
356 Command::GetSystemVars { .. } => "command-get_system_vars",
357 Command::SetSystemVars { .. } => "command-set_system_vars",
358 Command::Terminate { .. } => "command-terminate",
359 Command::RetireExecute { .. } => "command-retire_execute",
360 Command::CheckConsistency { .. } => "command-check_consistency",
361 Command::Dump { .. } => "command-dump",
362 Command::AuthenticatePassword { .. } => "command-auth_check",
363 Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
364 Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
365 Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
366 Command::GetOracle { .. } => "get-oracle",
367 Command::DetermineRealTimeRecentTimestamp { .. } => {
368 "determine-real-time-recent-timestamp"
369 }
370 Command::GetTransactionReadHoldsBundle { .. } => {
371 "get-transaction-read-holds-bundle"
372 }
373 Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
374 Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
375 Command::ExecuteCopyTo { .. } => "execute-copy-to",
376 },
377 Message::ControllerReady {
378 controller: ControllerReadiness::Compute,
379 } => "controller_ready(compute)",
380 Message::ControllerReady {
381 controller: ControllerReadiness::Storage,
382 } => "controller_ready(storage)",
383 Message::ControllerReady {
384 controller: ControllerReadiness::Metrics,
385 } => "controller_ready(metrics)",
386 Message::ControllerReady {
387 controller: ControllerReadiness::Internal,
388 } => "controller_ready(internal)",
389 Message::PurifiedStatementReady(_) => "purified_statement_ready",
390 Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
391 Message::TryDeferred { .. } => "try_deferred",
392 Message::GroupCommitInitiate(..) => "group_commit_initiate",
393 Message::AdvanceTimelines => "advance_timelines",
394 Message::ClusterEvent(_) => "cluster_event",
395 Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
396 Message::LinearizeReads => "linearize_reads",
397 Message::StagedBatches { .. } => "staged_batches",
398 Message::StorageUsageSchedule => "storage_usage_schedule",
399 Message::StorageUsageFetch => "storage_usage_fetch",
400 Message::StorageUsageUpdate(_) => "storage_usage_update",
401 Message::StorageUsagePrune(_) => "storage_usage_prune",
402 Message::RetireExecute { .. } => "retire_execute",
403 Message::ExecuteSingleStatementTransaction { .. } => {
404 "execute_single_statement_transaction"
405 }
406 Message::PeekStageReady { .. } => "peek_stage_ready",
407 Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
408 Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
409 Message::CreateViewStageReady { .. } => "create_view_stage_ready",
410 Message::CreateMaterializedViewStageReady { .. } => {
411 "create_materialized_view_stage_ready"
412 }
413 Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
414 Message::IntrospectionSubscribeStageReady { .. } => {
415 "introspection_subscribe_stage_ready"
416 }
417 Message::SecretStageReady { .. } => "secret_stage_ready",
418 Message::ClusterStageReady { .. } => "cluster_stage_ready",
419 Message::DrainStatementLog => "drain_statement_log",
420 Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
421 Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
422 Message::CheckSchedulingPolicies => "check_scheduling_policies",
423 Message::SchedulingDecisions { .. } => "scheduling_decision",
424 Message::DeferredStatementReady => "deferred_statement_ready",
425 }
426 }
427}
428
429#[derive(Debug)]
431pub enum ControllerReadiness {
432 Storage,
434 Compute,
436 Metrics,
438 Internal,
440}
441
442#[derive(Derivative)]
443#[derivative(Debug)]
444pub struct BackgroundWorkResult<T> {
445 #[derivative(Debug = "ignore")]
446 pub ctx: ExecuteContext,
447 pub result: Result<T, AdapterError>,
448 pub params: Params,
449 pub plan_validity: PlanValidity,
450 pub original_stmt: Arc<Statement<Raw>>,
451 pub otel_ctx: OpenTelemetryContext,
452}
453
454pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
455
456#[derive(Derivative)]
457#[derivative(Debug)]
458pub struct ValidationReady<T> {
459 #[derivative(Debug = "ignore")]
460 pub ctx: ExecuteContext,
461 pub result: Result<T, AdapterError>,
462 pub resolved_ids: ResolvedIds,
463 pub connection_id: CatalogItemId,
464 pub connection_gid: GlobalId,
465 pub plan_validity: PlanValidity,
466 pub otel_ctx: OpenTelemetryContext,
467}
468
469pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
470pub type AlterConnectionValidationReady = ValidationReady<Connection>;
471
472#[derive(Debug)]
473pub enum PeekStage {
474 LinearizeTimestamp(PeekStageLinearizeTimestamp),
476 RealTimeRecency(PeekStageRealTimeRecency),
477 TimestampReadHold(PeekStageTimestampReadHold),
478 Optimize(PeekStageOptimize),
479 Finish(PeekStageFinish),
481 ExplainPlan(PeekStageExplainPlan),
483 ExplainPushdown(PeekStageExplainPushdown),
484 CopyToPreflight(PeekStageCopyTo),
486 CopyToDataflow(PeekStageCopyTo),
488}
489
490#[derive(Debug)]
491pub struct CopyToContext {
492 pub desc: RelationDesc,
494 pub uri: Uri,
496 pub connection: StorageConnection<ReferencedConnection>,
498 pub connection_id: CatalogItemId,
500 pub format: S3SinkFormat,
502 pub max_file_size: u64,
504 pub output_batch_count: Option<u64>,
509}
510
511#[derive(Debug)]
512pub struct PeekStageLinearizeTimestamp {
513 validity: PlanValidity,
514 plan: mz_sql::plan::SelectPlan,
515 max_query_result_size: Option<u64>,
516 source_ids: BTreeSet<GlobalId>,
517 target_replica: Option<ReplicaId>,
518 timeline_context: TimelineContext,
519 optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
520 explain_ctx: ExplainContext,
523}
524
525#[derive(Debug)]
526pub struct PeekStageRealTimeRecency {
527 validity: PlanValidity,
528 plan: mz_sql::plan::SelectPlan,
529 max_query_result_size: Option<u64>,
530 source_ids: BTreeSet<GlobalId>,
531 target_replica: Option<ReplicaId>,
532 timeline_context: TimelineContext,
533 oracle_read_ts: Option<Timestamp>,
534 optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
535 explain_ctx: ExplainContext,
538}
539
540#[derive(Debug)]
541pub struct PeekStageTimestampReadHold {
542 validity: PlanValidity,
543 plan: mz_sql::plan::SelectPlan,
544 max_query_result_size: Option<u64>,
545 source_ids: BTreeSet<GlobalId>,
546 target_replica: Option<ReplicaId>,
547 timeline_context: TimelineContext,
548 oracle_read_ts: Option<Timestamp>,
549 real_time_recency_ts: Option<mz_repr::Timestamp>,
550 optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
551 explain_ctx: ExplainContext,
554}
555
556#[derive(Debug)]
557pub struct PeekStageOptimize {
558 validity: PlanValidity,
559 plan: mz_sql::plan::SelectPlan,
560 max_query_result_size: Option<u64>,
561 source_ids: BTreeSet<GlobalId>,
562 id_bundle: CollectionIdBundle,
563 target_replica: Option<ReplicaId>,
564 determination: TimestampDetermination<mz_repr::Timestamp>,
565 optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
566 explain_ctx: ExplainContext,
569}
570
571#[derive(Debug)]
572pub struct PeekStageFinish {
573 validity: PlanValidity,
574 plan: mz_sql::plan::SelectPlan,
575 max_query_result_size: Option<u64>,
576 id_bundle: CollectionIdBundle,
577 target_replica: Option<ReplicaId>,
578 source_ids: BTreeSet<GlobalId>,
579 determination: TimestampDetermination<mz_repr::Timestamp>,
580 cluster_id: ComputeInstanceId,
581 finishing: RowSetFinishing,
582 plan_insights_optimizer_trace: Option<OptimizerTrace>,
585 insights_ctx: Option<Box<PlanInsightsContext>>,
586 global_lir_plan: optimize::peek::GlobalLirPlan,
587 optimization_finished_at: EpochMillis,
588}
589
590#[derive(Debug)]
591pub struct PeekStageCopyTo {
592 validity: PlanValidity,
593 optimizer: optimize::copy_to::Optimizer,
594 global_lir_plan: optimize::copy_to::GlobalLirPlan,
595 optimization_finished_at: EpochMillis,
596 source_ids: BTreeSet<GlobalId>,
597}
598
599#[derive(Debug)]
600pub struct PeekStageExplainPlan {
601 validity: PlanValidity,
602 optimizer: optimize::peek::Optimizer,
603 df_meta: DataflowMetainfo,
604 explain_ctx: ExplainPlanContext,
605 insights_ctx: Option<Box<PlanInsightsContext>>,
606}
607
608#[derive(Debug)]
609pub struct PeekStageExplainPushdown {
610 validity: PlanValidity,
611 determination: TimestampDetermination<mz_repr::Timestamp>,
612 imports: BTreeMap<GlobalId, MapFilterProject>,
613}
614
615#[derive(Debug)]
616pub enum CreateIndexStage {
617 Optimize(CreateIndexOptimize),
618 Finish(CreateIndexFinish),
619 Explain(CreateIndexExplain),
620}
621
622#[derive(Debug)]
623pub struct CreateIndexOptimize {
624 validity: PlanValidity,
625 plan: plan::CreateIndexPlan,
626 resolved_ids: ResolvedIds,
627 explain_ctx: ExplainContext,
630}
631
632#[derive(Debug)]
633pub struct CreateIndexFinish {
634 validity: PlanValidity,
635 item_id: CatalogItemId,
636 global_id: GlobalId,
637 plan: plan::CreateIndexPlan,
638 resolved_ids: ResolvedIds,
639 global_mir_plan: optimize::index::GlobalMirPlan,
640 global_lir_plan: optimize::index::GlobalLirPlan,
641}
642
643#[derive(Debug)]
644pub struct CreateIndexExplain {
645 validity: PlanValidity,
646 exported_index_id: GlobalId,
647 plan: plan::CreateIndexPlan,
648 df_meta: DataflowMetainfo,
649 explain_ctx: ExplainPlanContext,
650}
651
652#[derive(Debug)]
653pub enum CreateViewStage {
654 Optimize(CreateViewOptimize),
655 Finish(CreateViewFinish),
656 Explain(CreateViewExplain),
657}
658
659#[derive(Debug)]
660pub struct CreateViewOptimize {
661 validity: PlanValidity,
662 plan: plan::CreateViewPlan,
663 resolved_ids: ResolvedIds,
664 explain_ctx: ExplainContext,
667}
668
669#[derive(Debug)]
670pub struct CreateViewFinish {
671 validity: PlanValidity,
672 item_id: CatalogItemId,
674 global_id: GlobalId,
676 plan: plan::CreateViewPlan,
677 resolved_ids: ResolvedIds,
679 optimized_expr: OptimizedMirRelationExpr,
680}
681
682#[derive(Debug)]
683pub struct CreateViewExplain {
684 validity: PlanValidity,
685 id: GlobalId,
686 plan: plan::CreateViewPlan,
687 explain_ctx: ExplainPlanContext,
688}
689
690#[derive(Debug)]
691pub enum ExplainTimestampStage {
692 Optimize(ExplainTimestampOptimize),
693 RealTimeRecency(ExplainTimestampRealTimeRecency),
694 Finish(ExplainTimestampFinish),
695}
696
697#[derive(Debug)]
698pub struct ExplainTimestampOptimize {
699 validity: PlanValidity,
700 plan: plan::ExplainTimestampPlan,
701 cluster_id: ClusterId,
702}
703
704#[derive(Debug)]
705pub struct ExplainTimestampRealTimeRecency {
706 validity: PlanValidity,
707 format: ExplainFormat,
708 optimized_plan: OptimizedMirRelationExpr,
709 cluster_id: ClusterId,
710 when: QueryWhen,
711}
712
713#[derive(Debug)]
714pub struct ExplainTimestampFinish {
715 validity: PlanValidity,
716 format: ExplainFormat,
717 optimized_plan: OptimizedMirRelationExpr,
718 cluster_id: ClusterId,
719 source_ids: BTreeSet<GlobalId>,
720 when: QueryWhen,
721 real_time_recency_ts: Option<Timestamp>,
722}
723
724#[derive(Debug)]
725pub enum ClusterStage {
726 Alter(AlterCluster),
727 WaitForHydrated(AlterClusterWaitForHydrated),
728 Finalize(AlterClusterFinalize),
729}
730
731#[derive(Debug)]
732pub struct AlterCluster {
733 validity: PlanValidity,
734 plan: plan::AlterClusterPlan,
735}
736
737#[derive(Debug)]
738pub struct AlterClusterWaitForHydrated {
739 validity: PlanValidity,
740 plan: plan::AlterClusterPlan,
741 new_config: ClusterVariantManaged,
742 timeout_time: Instant,
743 on_timeout: OnTimeoutAction,
744}
745
746#[derive(Debug)]
747pub struct AlterClusterFinalize {
748 validity: PlanValidity,
749 plan: plan::AlterClusterPlan,
750 new_config: ClusterVariantManaged,
751}
752
753#[derive(Debug)]
754pub enum ExplainContext {
755 None,
757 Plan(ExplainPlanContext),
759 PlanInsightsNotice(OptimizerTrace),
762 Pushdown,
764}
765
766impl ExplainContext {
767 pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
771 let optimizer_trace = match self {
772 ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
773 ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
774 _ => None,
775 };
776 optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
777 }
778
779 pub(crate) fn needs_cluster(&self) -> bool {
780 match self {
781 ExplainContext::None => true,
782 ExplainContext::Plan(..) => false,
783 ExplainContext::PlanInsightsNotice(..) => true,
784 ExplainContext::Pushdown => false,
785 }
786 }
787
788 pub(crate) fn needs_plan_insights(&self) -> bool {
789 matches!(
790 self,
791 ExplainContext::Plan(ExplainPlanContext {
792 stage: ExplainStage::PlanInsights,
793 ..
794 }) | ExplainContext::PlanInsightsNotice(_)
795 )
796 }
797}
798
799#[derive(Debug)]
800pub struct ExplainPlanContext {
801 pub broken: bool,
806 pub config: ExplainConfig,
807 pub format: ExplainFormat,
808 pub stage: ExplainStage,
809 pub replan: Option<GlobalId>,
810 pub desc: Option<RelationDesc>,
811 pub optimizer_trace: OptimizerTrace,
812}
813
814#[derive(Debug)]
815pub enum CreateMaterializedViewStage {
816 Optimize(CreateMaterializedViewOptimize),
817 Finish(CreateMaterializedViewFinish),
818 Explain(CreateMaterializedViewExplain),
819}
820
821#[derive(Debug)]
822pub struct CreateMaterializedViewOptimize {
823 validity: PlanValidity,
824 plan: plan::CreateMaterializedViewPlan,
825 resolved_ids: ResolvedIds,
826 explain_ctx: ExplainContext,
829}
830
831#[derive(Debug)]
832pub struct CreateMaterializedViewFinish {
833 item_id: CatalogItemId,
835 global_id: GlobalId,
837 validity: PlanValidity,
838 plan: plan::CreateMaterializedViewPlan,
839 resolved_ids: ResolvedIds,
840 local_mir_plan: optimize::materialized_view::LocalMirPlan,
841 global_mir_plan: optimize::materialized_view::GlobalMirPlan,
842 global_lir_plan: optimize::materialized_view::GlobalLirPlan,
843}
844
845#[derive(Debug)]
846pub struct CreateMaterializedViewExplain {
847 global_id: GlobalId,
848 validity: PlanValidity,
849 plan: plan::CreateMaterializedViewPlan,
850 df_meta: DataflowMetainfo,
851 explain_ctx: ExplainPlanContext,
852}
853
854#[derive(Debug)]
855pub enum SubscribeStage {
856 OptimizeMir(SubscribeOptimizeMir),
857 TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
858 Finish(SubscribeFinish),
859}
860
861#[derive(Debug)]
862pub struct SubscribeOptimizeMir {
863 validity: PlanValidity,
864 plan: plan::SubscribePlan,
865 timeline: TimelineContext,
866 dependency_ids: BTreeSet<GlobalId>,
867 cluster_id: ComputeInstanceId,
868 replica_id: Option<ReplicaId>,
869}
870
871#[derive(Debug)]
872pub struct SubscribeTimestampOptimizeLir {
873 validity: PlanValidity,
874 plan: plan::SubscribePlan,
875 timeline: TimelineContext,
876 optimizer: optimize::subscribe::Optimizer,
877 global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
878 dependency_ids: BTreeSet<GlobalId>,
879 replica_id: Option<ReplicaId>,
880}
881
882#[derive(Debug)]
883pub struct SubscribeFinish {
884 validity: PlanValidity,
885 cluster_id: ComputeInstanceId,
886 replica_id: Option<ReplicaId>,
887 plan: plan::SubscribePlan,
888 global_lir_plan: optimize::subscribe::GlobalLirPlan,
889 dependency_ids: BTreeSet<GlobalId>,
890}
891
892#[derive(Debug)]
893pub enum IntrospectionSubscribeStage {
894 OptimizeMir(IntrospectionSubscribeOptimizeMir),
895 TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
896 Finish(IntrospectionSubscribeFinish),
897}
898
899#[derive(Debug)]
900pub struct IntrospectionSubscribeOptimizeMir {
901 validity: PlanValidity,
902 plan: plan::SubscribePlan,
903 subscribe_id: GlobalId,
904 cluster_id: ComputeInstanceId,
905 replica_id: ReplicaId,
906}
907
908#[derive(Debug)]
909pub struct IntrospectionSubscribeTimestampOptimizeLir {
910 validity: PlanValidity,
911 optimizer: optimize::subscribe::Optimizer,
912 global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
913 cluster_id: ComputeInstanceId,
914 replica_id: ReplicaId,
915}
916
917#[derive(Debug)]
918pub struct IntrospectionSubscribeFinish {
919 validity: PlanValidity,
920 global_lir_plan: optimize::subscribe::GlobalLirPlan,
921 read_holds: ReadHolds<Timestamp>,
922 cluster_id: ComputeInstanceId,
923 replica_id: ReplicaId,
924}
925
926#[derive(Debug)]
927pub enum SecretStage {
928 CreateEnsure(CreateSecretEnsure),
929 CreateFinish(CreateSecretFinish),
930 RotateKeysEnsure(RotateKeysSecretEnsure),
931 RotateKeysFinish(RotateKeysSecretFinish),
932 Alter(AlterSecret),
933}
934
935#[derive(Debug)]
936pub struct CreateSecretEnsure {
937 validity: PlanValidity,
938 plan: plan::CreateSecretPlan,
939}
940
941#[derive(Debug)]
942pub struct CreateSecretFinish {
943 validity: PlanValidity,
944 item_id: CatalogItemId,
945 global_id: GlobalId,
946 plan: plan::CreateSecretPlan,
947}
948
949#[derive(Debug)]
950pub struct RotateKeysSecretEnsure {
951 validity: PlanValidity,
952 id: CatalogItemId,
953}
954
955#[derive(Debug)]
956pub struct RotateKeysSecretFinish {
957 validity: PlanValidity,
958 ops: Vec<crate::catalog::Op>,
959}
960
961#[derive(Debug)]
962pub struct AlterSecret {
963 validity: PlanValidity,
964 plan: plan::AlterSecretPlan,
965}
966
967#[derive(Debug, Copy, Clone, PartialEq, Eq)]
972pub enum TargetCluster {
973 CatalogServer,
975 Active,
977 Transaction(ClusterId),
979}
980
981pub(crate) enum StageResult<T> {
983 Handle(JoinHandle<Result<T, AdapterError>>),
985 HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
987 Immediate(T),
989 Response(ExecuteResponse),
991}
992
993pub(crate) trait Staged: Send {
995 type Ctx: StagedContext;
996
997 fn validity(&mut self) -> &mut PlanValidity;
998
999 async fn stage(
1001 self,
1002 coord: &mut Coordinator,
1003 ctx: &mut Self::Ctx,
1004 ) -> Result<StageResult<Box<Self>>, AdapterError>;
1005
1006 fn message(self, ctx: Self::Ctx, span: Span) -> Message;
1008
1009 fn cancel_enabled(&self) -> bool;
1011}
1012
1013pub trait StagedContext {
1014 fn retire(self, result: Result<ExecuteResponse, AdapterError>);
1015 fn session(&self) -> Option<&Session>;
1016}
1017
1018impl StagedContext for ExecuteContext {
1019 fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1020 self.retire(result);
1021 }
1022
1023 fn session(&self) -> Option<&Session> {
1024 Some(self.session())
1025 }
1026}
1027
1028impl StagedContext for () {
1029 fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}
1030
1031 fn session(&self) -> Option<&Session> {
1032 None
1033 }
1034}
1035
1036pub struct Config {
1038 pub controller_config: ControllerConfig,
1039 pub controller_envd_epoch: NonZeroI64,
1040 pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
1041 pub audit_logs_iterator: AuditLogIterator,
1042 pub timestamp_oracle_url: Option<SensitiveUrl>,
1043 pub unsafe_mode: bool,
1044 pub all_features: bool,
1045 pub build_info: &'static BuildInfo,
1046 pub environment_id: EnvironmentId,
1047 pub metrics_registry: MetricsRegistry,
1048 pub now: NowFn,
1049 pub secrets_controller: Arc<dyn SecretsController>,
1050 pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1051 pub availability_zones: Vec<String>,
1052 pub cluster_replica_sizes: ClusterReplicaSizeMap,
1053 pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
1054 pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
1055 pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
1056 pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
1057 pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
1058 pub system_parameter_defaults: BTreeMap<String, String>,
1059 pub storage_usage_client: StorageUsageClient,
1060 pub storage_usage_collection_interval: Duration,
1061 pub storage_usage_retention_period: Option<Duration>,
1062 pub segment_client: Option<mz_segment::Client>,
1063 pub egress_addresses: Vec<IpNet>,
1064 pub remote_system_parameters: Option<BTreeMap<String, String>>,
1065 pub aws_account_id: Option<String>,
1066 pub aws_privatelink_availability_zones: Option<Vec<String>>,
1067 pub connection_context: ConnectionContext,
1068 pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
1069 pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
1070 pub http_host_name: Option<String>,
1071 pub tracing_handle: TracingHandle,
1072 pub read_only_controllers: bool,
1076
1077 pub caught_up_trigger: Option<Trigger>,
1081
1082 pub helm_chart_version: Option<String>,
1083 pub license_key: ValidatedLicenseKey,
1084 pub external_login_password_mz_system: Option<Password>,
1085 pub force_builtin_schema_migration: Option<String>,
1086}
1087
1088#[derive(Debug, Serialize)]
1090pub struct ConnMeta {
1091 secret_key: u32,
1096 connected_at: EpochMillis,
1098 user: User,
1099 application_name: String,
1100 uuid: Uuid,
1101 conn_id: ConnectionId,
1102 client_ip: Option<IpAddr>,
1103
1104 drop_sinks: BTreeSet<GlobalId>,
1107
1108 #[serde(skip)]
1110 deferred_lock: Option<OwnedMutexGuard<()>>,
1111
1112 pending_cluster_alters: BTreeSet<ClusterId>,
1115
1116 #[serde(skip)]
1118 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
1119
1120 authenticated_role: RoleId,
1124}
1125
1126impl ConnMeta {
1127 pub fn conn_id(&self) -> &ConnectionId {
1128 &self.conn_id
1129 }
1130
1131 pub fn user(&self) -> &User {
1132 &self.user
1133 }
1134
1135 pub fn application_name(&self) -> &str {
1136 &self.application_name
1137 }
1138
1139 pub fn authenticated_role_id(&self) -> &RoleId {
1140 &self.authenticated_role
1141 }
1142
1143 pub fn uuid(&self) -> Uuid {
1144 self.uuid
1145 }
1146
1147 pub fn client_ip(&self) -> Option<IpAddr> {
1148 self.client_ip
1149 }
1150
1151 pub fn connected_at(&self) -> EpochMillis {
1152 self.connected_at
1153 }
1154}
1155
1156#[derive(Debug)]
1157pub struct PendingTxn {
1159 ctx: ExecuteContext,
1161 response: Result<PendingTxnResponse, AdapterError>,
1163 action: EndTransactionAction,
1165}
1166
1167#[derive(Debug)]
1168pub enum PendingTxnResponse {
1170 Committed {
1172 params: BTreeMap<&'static str, String>,
1174 },
1175 Rolledback {
1177 params: BTreeMap<&'static str, String>,
1179 },
1180}
1181
1182impl PendingTxnResponse {
1183 pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1184 match self {
1185 PendingTxnResponse::Committed { params }
1186 | PendingTxnResponse::Rolledback { params } => params.extend(p),
1187 }
1188 }
1189}
1190
1191impl From<PendingTxnResponse> for ExecuteResponse {
1192 fn from(value: PendingTxnResponse) -> Self {
1193 match value {
1194 PendingTxnResponse::Committed { params } => {
1195 ExecuteResponse::TransactionCommitted { params }
1196 }
1197 PendingTxnResponse::Rolledback { params } => {
1198 ExecuteResponse::TransactionRolledBack { params }
1199 }
1200 }
1201 }
1202}
1203
1204#[derive(Debug)]
1205pub struct PendingReadTxn {
1207 txn: PendingRead,
1209 timestamp_context: TimestampContext<mz_repr::Timestamp>,
1211 created: Instant,
1213 num_requeues: u64,
1217 otel_ctx: OpenTelemetryContext,
1219}
1220
1221impl PendingReadTxn {
1222 pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
1224 &self.timestamp_context
1225 }
1226
1227 pub(crate) fn take_context(self) -> ExecuteContext {
1228 self.txn.take_context()
1229 }
1230}
1231
1232#[derive(Debug)]
1233enum PendingRead {
1235 Read {
1236 txn: PendingTxn,
1238 },
1239 ReadThenWrite {
1240 ctx: ExecuteContext,
1242 tx: oneshot::Sender<Option<ExecuteContext>>,
1245 },
1246}
1247
1248impl PendingRead {
1249 #[instrument(level = "debug")]
1254 pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1255 match self {
1256 PendingRead::Read {
1257 txn:
1258 PendingTxn {
1259 mut ctx,
1260 response,
1261 action,
1262 },
1263 ..
1264 } => {
1265 let changed = ctx.session_mut().vars_mut().end_transaction(action);
1266 let response = response.map(|mut r| {
1268 r.extend_params(changed);
1269 ExecuteResponse::from(r)
1270 });
1271
1272 Some((ctx, response))
1273 }
1274 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1275 let _ = tx.send(Some(ctx));
1277 None
1278 }
1279 }
1280 }
1281
1282 fn label(&self) -> &'static str {
1283 match self {
1284 PendingRead::Read { .. } => "read",
1285 PendingRead::ReadThenWrite { .. } => "read_then_write",
1286 }
1287 }
1288
1289 pub(crate) fn take_context(self) -> ExecuteContext {
1290 match self {
1291 PendingRead::Read { txn, .. } => txn.ctx,
1292 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1293 let _ = tx.send(None);
1296 ctx
1297 }
1298 }
1299 }
1300}
1301
1302#[derive(Debug, Default)]
1313#[must_use]
1314pub struct ExecuteContextExtra {
1315 statement_uuid: Option<StatementLoggingId>,
1316}
1317
1318impl ExecuteContextExtra {
1319 pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1320 Self { statement_uuid }
1321 }
1322 pub fn is_trivial(&self) -> bool {
1323 let Self { statement_uuid } = self;
1324 statement_uuid.is_none()
1325 }
1326 pub fn contents(&self) -> Option<StatementLoggingId> {
1327 let Self { statement_uuid } = self;
1328 *statement_uuid
1329 }
1330 #[must_use]
1334 fn retire(mut self) -> Option<StatementLoggingId> {
1335 let Self { statement_uuid } = &mut self;
1336 statement_uuid.take()
1337 }
1338}
1339
1340impl Drop for ExecuteContextExtra {
1341 fn drop(&mut self) {
1342 let Self { statement_uuid } = &*self;
1343 if let Some(statement_uuid) = statement_uuid {
1344 soft_panic_or_log!(
1348 "execute context for statement {statement_uuid:?} dropped without being properly retired."
1349 );
1350 }
1351 }
1352}
1353
1354#[derive(Debug)]
1367pub struct ExecuteContext {
1368 inner: Box<ExecuteContextInner>,
1369}
1370
1371impl std::ops::Deref for ExecuteContext {
1372 type Target = ExecuteContextInner;
1373 fn deref(&self) -> &Self::Target {
1374 &*self.inner
1375 }
1376}
1377
1378impl std::ops::DerefMut for ExecuteContext {
1379 fn deref_mut(&mut self) -> &mut Self::Target {
1380 &mut *self.inner
1381 }
1382}
1383
1384#[derive(Debug)]
1385pub struct ExecuteContextInner {
1386 tx: ClientTransmitter<ExecuteResponse>,
1387 internal_cmd_tx: mpsc::UnboundedSender<Message>,
1388 session: Session,
1389 extra: ExecuteContextExtra,
1390}
1391
1392impl ExecuteContext {
1393 pub fn session(&self) -> &Session {
1394 &self.session
1395 }
1396
1397 pub fn session_mut(&mut self) -> &mut Session {
1398 &mut self.session
1399 }
1400
1401 pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1402 &self.tx
1403 }
1404
1405 pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1406 &mut self.tx
1407 }
1408
1409 pub fn from_parts(
1410 tx: ClientTransmitter<ExecuteResponse>,
1411 internal_cmd_tx: mpsc::UnboundedSender<Message>,
1412 session: Session,
1413 extra: ExecuteContextExtra,
1414 ) -> Self {
1415 Self {
1416 inner: ExecuteContextInner {
1417 tx,
1418 session,
1419 extra,
1420 internal_cmd_tx,
1421 }
1422 .into(),
1423 }
1424 }
1425
1426 pub fn into_parts(
1435 self,
1436 ) -> (
1437 ClientTransmitter<ExecuteResponse>,
1438 mpsc::UnboundedSender<Message>,
1439 Session,
1440 ExecuteContextExtra,
1441 ) {
1442 let ExecuteContextInner {
1443 tx,
1444 internal_cmd_tx,
1445 session,
1446 extra,
1447 } = *self.inner;
1448 (tx, internal_cmd_tx, session, extra)
1449 }
1450
1451 #[instrument(level = "debug")]
1453 pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1454 let ExecuteContextInner {
1455 tx,
1456 internal_cmd_tx,
1457 session,
1458 extra,
1459 } = *self.inner;
1460 let reason = if extra.is_trivial() {
1461 None
1462 } else {
1463 Some((&result).into())
1464 };
1465 tx.send(result, session);
1466 if let Some(reason) = reason {
1467 if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1468 otel_ctx: OpenTelemetryContext::obtain(),
1469 data: extra,
1470 reason,
1471 }) {
1472 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1473 }
1474 }
1475 }
1476
1477 pub fn extra(&self) -> &ExecuteContextExtra {
1478 &self.extra
1479 }
1480
1481 pub fn extra_mut(&mut self) -> &mut ExecuteContextExtra {
1482 &mut self.extra
1483 }
1484}
1485
1486#[derive(Debug)]
1487struct ClusterReplicaStatuses(
1488 BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
1489);
1490
1491impl ClusterReplicaStatuses {
1492 pub(crate) fn new() -> ClusterReplicaStatuses {
1493 ClusterReplicaStatuses(BTreeMap::new())
1494 }
1495
1496 pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1500 let prev = self.0.insert(cluster_id, BTreeMap::new());
1501 assert_eq!(
1502 prev, None,
1503 "cluster {cluster_id} statuses already initialized"
1504 );
1505 }
1506
1507 pub(crate) fn initialize_cluster_replica_statuses(
1511 &mut self,
1512 cluster_id: ClusterId,
1513 replica_id: ReplicaId,
1514 num_processes: usize,
1515 time: DateTime<Utc>,
1516 ) {
1517 tracing::info!(
1518 ?cluster_id,
1519 ?replica_id,
1520 ?time,
1521 "initializing cluster replica status"
1522 );
1523 let replica_statuses = self.0.entry(cluster_id).or_default();
1524 let process_statuses = (0..num_processes)
1525 .map(|process_id| {
1526 let status = ClusterReplicaProcessStatus {
1527 status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1528 time: time.clone(),
1529 };
1530 (u64::cast_from(process_id), status)
1531 })
1532 .collect();
1533 let prev = replica_statuses.insert(replica_id, process_statuses);
1534 assert_none!(
1535 prev,
1536 "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1537 );
1538 }
1539
1540 pub(crate) fn remove_cluster_statuses(
1544 &mut self,
1545 cluster_id: &ClusterId,
1546 ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1547 let prev = self.0.remove(cluster_id);
1548 prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1549 }
1550
1551 pub(crate) fn remove_cluster_replica_statuses(
1555 &mut self,
1556 cluster_id: &ClusterId,
1557 replica_id: &ReplicaId,
1558 ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1559 let replica_statuses = self
1560 .0
1561 .get_mut(cluster_id)
1562 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1563 let prev = replica_statuses.remove(replica_id);
1564 prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1565 }
1566
1567 pub(crate) fn ensure_cluster_status(
1571 &mut self,
1572 cluster_id: ClusterId,
1573 replica_id: ReplicaId,
1574 process_id: ProcessId,
1575 status: ClusterReplicaProcessStatus,
1576 ) {
1577 let replica_statuses = self
1578 .0
1579 .get_mut(&cluster_id)
1580 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1581 .get_mut(&replica_id)
1582 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1583 replica_statuses.insert(process_id, status);
1584 }
1585
1586 pub fn get_cluster_replica_status(
1590 &self,
1591 cluster_id: ClusterId,
1592 replica_id: ReplicaId,
1593 ) -> ClusterStatus {
1594 let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1595 Self::cluster_replica_status(process_status)
1596 }
1597
1598 pub fn cluster_replica_status(
1600 process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1601 ) -> ClusterStatus {
1602 process_status
1603 .values()
1604 .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1605 (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1606 (x, y) => {
1607 let reason_x = match x {
1608 ClusterStatus::Offline(reason) => reason,
1609 ClusterStatus::Online => None,
1610 };
1611 let reason_y = match y {
1612 ClusterStatus::Offline(reason) => reason,
1613 ClusterStatus::Online => None,
1614 };
1615 ClusterStatus::Offline(reason_x.or(reason_y))
1617 }
1618 })
1619 }
1620
1621 pub(crate) fn get_cluster_replica_statuses(
1625 &self,
1626 cluster_id: ClusterId,
1627 replica_id: ReplicaId,
1628 ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1629 self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1630 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1631 }
1632
1633 pub(crate) fn try_get_cluster_replica_statuses(
1635 &self,
1636 cluster_id: ClusterId,
1637 replica_id: ReplicaId,
1638 ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1639 self.try_get_cluster_statuses(cluster_id)
1640 .and_then(|statuses| statuses.get(&replica_id))
1641 }
1642
1643 pub(crate) fn try_get_cluster_statuses(
1645 &self,
1646 cluster_id: ClusterId,
1647 ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1648 self.0.get(&cluster_id)
1649 }
1650}
1651
1652#[derive(Derivative)]
1654#[derivative(Debug)]
1655pub struct Coordinator {
1656 #[derivative(Debug = "ignore")]
1658 controller: mz_controller::Controller,
1659 catalog: Arc<Catalog>,
1667
1668 persist_client: PersistClient,
1671
1672 internal_cmd_tx: mpsc::UnboundedSender<Message>,
1674 group_commit_tx: appends::GroupCommitNotifier,
1676
1677 strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,
1679
1680 global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
1683
1684 transient_id_gen: Arc<TransientIdGen>,
1686 active_conns: BTreeMap<ConnectionId, ConnMeta>,
1689
1690 txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,
1694
1695 pending_peeks: BTreeMap<Uuid, PendingPeek>,
1699 client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
1701
1702 pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
1704
1705 active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
1707 active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
1709 active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,
1712
1713 staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
1716 introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,
1718
1719 write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
1721 deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,
1723
1724 pending_writes: Vec<PendingWriteTxn>,
1726
1727 advance_timelines_interval: Interval,
1737
1738 serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
1747
1748 secrets_controller: Arc<dyn SecretsController>,
1751 caching_secrets_reader: CachingSecretsReader,
1753
1754 cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1757
1758 storage_usage_client: StorageUsageClient,
1760 storage_usage_collection_interval: Duration,
1762
1763 #[derivative(Debug = "ignore")]
1765 segment_client: Option<mz_segment::Client>,
1766
1767 metrics: Metrics,
1769 optimizer_metrics: OptimizerMetrics,
1771
1772 tracing_handle: TracingHandle,
1774
1775 statement_logging: StatementLogging,
1777
1778 webhook_concurrency_limit: WebhookConcurrencyLimiter,
1780
1781 pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,
1785
1786 check_cluster_scheduling_policies_interval: Interval,
1788
1789 cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,
1793
1794 caught_up_check_interval: Interval,
1797
1798 caught_up_check: Option<CaughtUpCheckContext>,
1801
1802 installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,
1804
1805 connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,
1807
1808 cluster_replica_statuses: ClusterReplicaStatuses,
1810
1811 read_only_controllers: bool,
1815
1816 buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,
1824
1825 license_key: ValidatedLicenseKey,
1826}
1827
1828impl Coordinator {
1829 #[instrument(name = "coord::bootstrap")]
1833 pub(crate) async fn bootstrap(
1834 &mut self,
1835 boot_ts: Timestamp,
1836 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1837 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1838 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1839 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1840 audit_logs_iterator: AuditLogIterator,
1841 ) -> Result<(), AdapterError> {
1842 let bootstrap_start = Instant::now();
1843 info!("startup: coordinator init: bootstrap beginning");
1844 info!("startup: coordinator init: bootstrap: preamble beginning");
1845
1846 let cluster_statuses: Vec<(_, Vec<_>)> = self
1849 .catalog()
1850 .clusters()
1851 .map(|cluster| {
1852 (
1853 cluster.id(),
1854 cluster
1855 .replicas()
1856 .map(|replica| {
1857 (replica.replica_id, replica.config.location.num_processes())
1858 })
1859 .collect(),
1860 )
1861 })
1862 .collect();
1863 let now = self.now_datetime();
1864 for (cluster_id, replica_statuses) in cluster_statuses {
1865 self.cluster_replica_statuses
1866 .initialize_cluster_statuses(cluster_id);
1867 for (replica_id, num_processes) in replica_statuses {
1868 self.cluster_replica_statuses
1869 .initialize_cluster_replica_statuses(
1870 cluster_id,
1871 replica_id,
1872 num_processes,
1873 now,
1874 );
1875 }
1876 }
1877
1878 let system_config = self.catalog().system_config();
1879
1880 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1882
1883 let compute_config = flags::compute_config(system_config);
1885 let storage_config = flags::storage_config(system_config);
1886 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1887 let dyncfg_updates = system_config.dyncfg_updates();
1888 self.controller.compute.update_configuration(compute_config);
1889 self.controller.storage.update_parameters(storage_config);
1890 self.controller
1891 .update_orchestrator_scheduling_config(scheduling_config);
1892 self.controller.update_configuration(dyncfg_updates);
1893
1894 self.validate_resource_limit_numeric(
1895 Numeric::zero(),
1896 self.current_credit_consumption_rate(),
1897 |system_vars| {
1898 self.license_key
1899 .max_credit_consumption_rate()
1900 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1901 },
1902 "cluster replica",
1903 MAX_CREDIT_CONSUMPTION_RATE.name(),
1904 )?;
1905
1906 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1907 Default::default();
1908
1909 let enable_worker_core_affinity =
1910 self.catalog().system_config().enable_worker_core_affinity();
1911 for instance in self.catalog.clusters() {
1912 self.controller.create_cluster(
1913 instance.id,
1914 ClusterConfig {
1915 arranged_logs: instance.log_indexes.clone(),
1916 workload_class: instance.config.workload_class.clone(),
1917 },
1918 )?;
1919 for replica in instance.replicas() {
1920 let role = instance.role();
1921 self.controller.create_replica(
1922 instance.id,
1923 replica.replica_id,
1924 instance.name.clone(),
1925 replica.name.clone(),
1926 role,
1927 replica.config.clone(),
1928 enable_worker_core_affinity,
1929 )?;
1930 }
1931 }
1932
1933 info!(
1934 "startup: coordinator init: bootstrap: preamble complete in {:?}",
1935 bootstrap_start.elapsed()
1936 );
1937
1938 let init_storage_collections_start = Instant::now();
1939 info!("startup: coordinator init: bootstrap: storage collections init beginning");
1940 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
1941 .await;
1942 info!(
1943 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
1944 init_storage_collections_start.elapsed()
1945 );
1946
1947 self.controller.start_compute_introspection_sink();
1952
1953 let optimize_dataflows_start = Instant::now();
1954 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
1955 let entries: Vec<_> = self.catalog().entries().cloned().collect();
1956 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
1957 info!(
1958 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
1959 optimize_dataflows_start.elapsed()
1960 );
1961
1962 let _fut = self.catalog().update_expression_cache(
1964 uncached_local_exprs.into_iter().collect(),
1965 uncached_global_exps.into_iter().collect(),
1966 );
1967
1968 let bootstrap_as_ofs_start = Instant::now();
1972 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
1973 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
1974 info!(
1975 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
1976 bootstrap_as_ofs_start.elapsed()
1977 );
1978
1979 let postamble_start = Instant::now();
1980 info!("startup: coordinator init: bootstrap: postamble beginning");
1981
1982 let logs: BTreeSet<_> = BUILTINS::logs()
1983 .map(|log| self.catalog().resolve_builtin_log(log))
1984 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
1985 .collect();
1986
1987 let mut privatelink_connections = BTreeMap::new();
1988
1989 for entry in &entries {
1990 debug!(
1991 "coordinator init: installing {} {}",
1992 entry.item().typ(),
1993 entry.id()
1994 );
1995 let mut policy = entry.item().initial_logical_compaction_window();
1996 match entry.item() {
1997 CatalogItem::Source(source) => {
2003 if source.custom_logical_compaction_window.is_none() {
2005 if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2006 source.data_source
2007 {
2008 policy = Some(
2009 self.catalog()
2010 .get_entry(&ingestion_id)
2011 .source()
2012 .expect("must be source")
2013 .custom_logical_compaction_window
2014 .unwrap_or_default(),
2015 );
2016 }
2017 }
2018 policies_to_set
2019 .entry(policy.expect("sources have a compaction window"))
2020 .or_insert_with(Default::default)
2021 .storage_ids
2022 .insert(source.global_id());
2023 }
2024 CatalogItem::Table(table) => {
2025 policies_to_set
2026 .entry(policy.expect("tables have a compaction window"))
2027 .or_insert_with(Default::default)
2028 .storage_ids
2029 .extend(table.global_ids());
2030 }
2031 CatalogItem::Index(idx) => {
2032 let policy_entry = policies_to_set
2033 .entry(policy.expect("indexes have a compaction window"))
2034 .or_insert_with(Default::default);
2035
2036 if logs.contains(&idx.on) {
2037 policy_entry
2038 .compute_ids
2039 .entry(idx.cluster_id)
2040 .or_insert_with(BTreeSet::new)
2041 .insert(idx.global_id());
2042 } else {
2043 let df_desc = self
2044 .catalog()
2045 .try_get_physical_plan(&idx.global_id())
2046 .expect("added in `bootstrap_dataflow_plans`")
2047 .clone();
2048
2049 let df_meta = self
2050 .catalog()
2051 .try_get_dataflow_metainfo(&idx.global_id())
2052 .expect("added in `bootstrap_dataflow_plans`");
2053
2054 if self.catalog().state().system_config().enable_mz_notices() {
2055 self.catalog().state().pack_optimizer_notices(
2057 &mut builtin_table_updates,
2058 df_meta.optimizer_notices.iter(),
2059 Diff::ONE,
2060 );
2061 }
2062
2063 policy_entry
2066 .compute_ids
2067 .entry(idx.cluster_id)
2068 .or_insert_with(Default::default)
2069 .extend(df_desc.export_ids());
2070
2071 self.controller
2072 .compute
2073 .create_dataflow(idx.cluster_id, df_desc, None)
2074 .unwrap_or_terminate("cannot fail to create dataflows");
2075 }
2076 }
2077 CatalogItem::View(_) => (),
2078 CatalogItem::MaterializedView(mview) => {
2079 policies_to_set
2080 .entry(policy.expect("materialized views have a compaction window"))
2081 .or_insert_with(Default::default)
2082 .storage_ids
2083 .insert(mview.global_id_writes());
2084
2085 let mut df_desc = self
2086 .catalog()
2087 .try_get_physical_plan(&mview.global_id_writes())
2088 .expect("added in `bootstrap_dataflow_plans`")
2089 .clone();
2090
2091 if let Some(initial_as_of) = mview.initial_as_of.clone() {
2092 df_desc.set_initial_as_of(initial_as_of);
2093 }
2094
2095 let until = mview
2097 .refresh_schedule
2098 .as_ref()
2099 .and_then(|s| s.last_refresh())
2100 .and_then(|r| r.try_step_forward());
2101 if let Some(until) = until {
2102 df_desc.until.meet_assign(&Antichain::from_elem(until));
2103 }
2104
2105 let df_meta = self
2106 .catalog()
2107 .try_get_dataflow_metainfo(&mview.global_id_writes())
2108 .expect("added in `bootstrap_dataflow_plans`");
2109
2110 if self.catalog().state().system_config().enable_mz_notices() {
2111 self.catalog().state().pack_optimizer_notices(
2113 &mut builtin_table_updates,
2114 df_meta.optimizer_notices.iter(),
2115 Diff::ONE,
2116 );
2117 }
2118
2119 self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2120
2121 if mview.replacement_target.is_none() {
2124 self.allow_writes(mview.cluster_id, mview.global_id_writes());
2125 }
2126 }
2127 CatalogItem::Sink(sink) => {
2128 policies_to_set
2129 .entry(CompactionWindow::Default)
2130 .or_insert_with(Default::default)
2131 .storage_ids
2132 .insert(sink.global_id());
2133 }
2134 CatalogItem::Connection(catalog_connection) => {
2135 if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2136 privatelink_connections.insert(
2137 entry.id(),
2138 VpcEndpointConfig {
2139 aws_service_name: conn.service_name.clone(),
2140 availability_zone_ids: conn.availability_zones.clone(),
2141 },
2142 );
2143 }
2144 }
2145 CatalogItem::ContinualTask(ct) => {
2146 policies_to_set
2147 .entry(policy.expect("continual tasks have a compaction window"))
2148 .or_insert_with(Default::default)
2149 .storage_ids
2150 .insert(ct.global_id());
2151
2152 let mut df_desc = self
2153 .catalog()
2154 .try_get_physical_plan(&ct.global_id())
2155 .expect("added in `bootstrap_dataflow_plans`")
2156 .clone();
2157
2158 if let Some(initial_as_of) = ct.initial_as_of.clone() {
2159 df_desc.set_initial_as_of(initial_as_of);
2160 }
2161
2162 let df_meta = self
2163 .catalog()
2164 .try_get_dataflow_metainfo(&ct.global_id())
2165 .expect("added in `bootstrap_dataflow_plans`");
2166
2167 if self.catalog().state().system_config().enable_mz_notices() {
2168 self.catalog().state().pack_optimizer_notices(
2170 &mut builtin_table_updates,
2171 df_meta.optimizer_notices.iter(),
2172 Diff::ONE,
2173 );
2174 }
2175
2176 self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2177 self.allow_writes(ct.cluster_id, ct.global_id());
2178 }
2179 CatalogItem::Log(_)
2181 | CatalogItem::Type(_)
2182 | CatalogItem::Func(_)
2183 | CatalogItem::Secret(_) => {}
2184 }
2185 }
2186
2187 if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2188 let existing_vpc_endpoints = cloud_resource_controller
2190 .list_vpc_endpoints()
2191 .await
2192 .context("list vpc endpoints")?;
2193 let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2194 let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2195 let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2196 for id in vpc_endpoints_to_remove {
2197 cloud_resource_controller
2198 .delete_vpc_endpoint(*id)
2199 .await
2200 .context("deleting extraneous vpc endpoint")?;
2201 }
2202
2203 for (id, spec) in privatelink_connections {
2205 cloud_resource_controller
2206 .ensure_vpc_endpoint(id, spec)
2207 .await
2208 .context("ensuring vpc endpoint")?;
2209 }
2210 }
2211
2212 drop(dataflow_read_holds);
2215 for (cw, policies) in policies_to_set {
2217 self.initialize_read_policies(&policies, cw).await;
2218 }
2219
2220 builtin_table_updates.extend(
2222 self.catalog().state().resolve_builtin_table_updates(
2223 self.catalog().state().pack_all_replica_size_updates(),
2224 ),
2225 );
2226
2227 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
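// In read-only mode, updates for migrated builtin tables whose new shards have never
// been written are applied right away (from the minimum timestamp) so that readers of
// the new deploy generation see their contents; all other builtin table updates are
// buffered until the deployment leaves read-only mode.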
2228 let migrated_updates_fut = if self.controller.read_only() {
2234 let min_timestamp = Timestamp::minimum();
2235 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2236 .extract_if(.., |update| {
2237 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2238 migrated_storage_collections_0dt.contains(&update.id)
2239 && self
2240 .controller
2241 .storage_collections
2242 .collection_frontiers(gid)
2243 .expect("all tables are registered")
2244 .write_frontier
2245 .elements()
2246 == &[min_timestamp]
2247 })
2248 .collect();
2249 if migrated_builtin_table_updates.is_empty() {
2250 futures::future::ready(()).boxed()
2251 } else {
2252 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2254 for update in migrated_builtin_table_updates {
2255 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2256 grouped_appends.entry(gid).or_default().push(update.data);
2257 }
2258 info!(
2259 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2260 grouped_appends.keys().collect::<Vec<_>>()
2261 );
2262
2263 let mut all_appends = Vec::with_capacity(grouped_appends.len());
2265 for (item_id, table_data) in grouped_appends.into_iter() {
2266 let mut all_rows = Vec::new();
2267 let mut all_data = Vec::new();
2268 for data in table_data {
2269 match data {
2270 TableData::Rows(rows) => all_rows.extend(rows),
2271 TableData::Batches(_) => all_data.push(data),
2272 }
2273 }
2274 differential_dataflow::consolidation::consolidate(&mut all_rows);
2275 all_data.push(TableData::Rows(all_rows));
2276
2277 all_appends.push((item_id, all_data));
2279 }
2280
2281 let fut = self
2282 .controller
2283 .storage
2284 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2285 .expect("cannot fail to append");
2286 async {
2287 fut.await
2288 .expect("One-shot shouldn't be dropped during bootstrap")
2289 .unwrap_or_terminate("cannot fail to append")
2290 }
2291 .boxed()
2292 }
2293 } else {
2294 futures::future::ready(()).boxed()
2295 };
2296
2297 info!(
2298 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2299 postamble_start.elapsed()
2300 );
2301
2302 let builtin_update_start = Instant::now();
2303 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2304
2305 if self.controller.read_only() {
2306 info!(
2307 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2308 );
2309
2310 let audit_join_start = Instant::now();
2312 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2313 let audit_log_updates: Vec<_> = audit_logs_iterator
2314 .map(|(audit_log, ts)| StateUpdate {
2315 kind: StateUpdateKind::AuditLog(audit_log),
2316 ts,
2317 diff: StateDiff::Addition,
2318 })
2319 .collect();
2320 let audit_log_builtin_table_updates = self
2321 .catalog()
2322 .state()
2323 .generate_builtin_table_updates(audit_log_updates);
2324 builtin_table_updates.extend(audit_log_builtin_table_updates);
2325 info!(
2326 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2327 audit_join_start.elapsed()
2328 );
2329 self.buffered_builtin_table_updates
2330 .as_mut()
2331 .expect("in read-only mode")
2332 .append(&mut builtin_table_updates);
2333 } else {
2334 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2335 .await;
2336 };
2337 info!(
2338 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2339 builtin_update_start.elapsed()
2340 );
2341
2342 let cleanup_secrets_start = Instant::now();
2343 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2344 {
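// Clean up orphaned secrets in the background: anything the secrets controller knows
// about that has no catalog entry is deleted, except in read-only mode and except for
// IDs beyond the current allocators, which were likely created by a newer deploy
// generation.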
2348 let Self {
2351 secrets_controller,
2352 catalog,
2353 ..
2354 } = self;
2355
2356 let next_user_item_id = catalog.get_next_user_item_id().await?;
2357 let next_system_item_id = catalog.get_next_system_item_id().await?;
2358 let read_only = self.controller.read_only();
2359 let catalog_ids: BTreeSet<CatalogItemId> =
2364 catalog.entries().map(|entry| entry.id()).collect();
2365 let secrets_controller = Arc::clone(secrets_controller);
2366
2367 spawn(|| "cleanup-orphaned-secrets", async move {
2368 if read_only {
2369 info!(
2370 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2371 );
2372 return;
2373 }
2374 info!("coordinator init: cleaning up orphaned secrets");
2375
2376 match secrets_controller.list().await {
2377 Ok(controller_secrets) => {
2378 let controller_secrets: BTreeSet<CatalogItemId> =
2379 controller_secrets.into_iter().collect();
2380 let orphaned = controller_secrets.difference(&catalog_ids);
2381 for id in orphaned {
2382 let id_too_large = match id {
2383 CatalogItemId::System(id) => *id >= next_system_item_id,
2384 CatalogItemId::User(id) => *id >= next_user_item_id,
2385 CatalogItemId::IntrospectionSourceIndex(_)
2386 | CatalogItemId::Transient(_) => false,
2387 };
2388 if id_too_large {
2389 info!(
2390 %next_user_item_id, %next_system_item_id,
2391 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2392 );
2393 } else {
2394 info!("coordinator init: deleting orphaned secret {id}");
2395 fail_point!("orphan_secrets");
2396 if let Err(e) = secrets_controller.delete(*id).await {
2397 warn!(
2398 "Dropping orphaned secret has encountered an error: {}",
2399 e
2400 );
2401 }
2402 }
2403 }
2404 }
2405 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2406 }
2407 });
2408 }
2409 info!(
2410 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2411 cleanup_secrets_start.elapsed()
2412 );
2413
2414 let final_steps_start = Instant::now();
2416 info!(
2417 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2418 );
2419 migrated_updates_fut
2420 .instrument(info_span!("coord::bootstrap::final"))
2421 .await;
2422
2423 debug!(
2424 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2425 );
2426 self.controller.initialization_complete();
2428
2429 self.bootstrap_introspection_subscribes().await;
2431
2432 info!(
2433 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2434 final_steps_start.elapsed()
2435 );
2436
2437 info!(
2438 "startup: coordinator init: bootstrap complete in {:?}",
2439 bootstrap_start.elapsed()
2440 );
2441 Ok(())
2442 }
2443
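/// Advances all tables to the current write timestamp, resets the contents of the
/// writable system tables by retracting their previous contents, reconciles the audit
/// log table, and finally appends `builtin_table_updates`.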
2444 #[allow(clippy::async_yields_async)]
2449 #[instrument]
2450 async fn bootstrap_tables(
2451 &mut self,
2452 entries: &[CatalogEntry],
2453 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2454 audit_logs_iterator: AuditLogIterator,
2455 ) {
2456 struct TableMetadata<'a> {
2458 id: CatalogItemId,
2459 name: &'a QualifiedItemName,
2460 table: &'a Table,
2461 }
2462
2463 let table_metas: Vec<_> = entries
2465 .into_iter()
2466 .filter_map(|entry| {
2467 entry.table().map(|table| TableMetadata {
2468 id: entry.id(),
2469 name: entry.name(),
2470 table,
2471 })
2472 })
2473 .collect();
2474
2475 debug!("coordinator init: advancing all tables to current timestamp");
2477 let WriteTimestamp {
2478 timestamp: write_ts,
2479 advance_to,
2480 } = self.get_local_write_ts().await;
2481 let appends = table_metas
2482 .iter()
2483 .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2484 .collect();
2485 let table_fence_rx = self
2489 .controller
2490 .storage
2491 .append_table(write_ts.clone(), advance_to, appends)
2492 .expect("invalid updates");
2493
2494 self.apply_local_write(write_ts).await;
2495
2496 debug!("coordinator init: resetting system tables");
2498 let read_ts = self.get_local_read_ts().await;
2499
2500 let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2503 .catalog()
2504 .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2505 .into();
2506 let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2507 meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2508 && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2509 };
2510
2511 let mut retraction_tasks = Vec::new();
2512 let mut system_tables: Vec<_> = table_metas
2513 .iter()
2514 .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2515 .collect();
2516
2517 let (audit_events_idx, _) = system_tables
2519 .iter()
2520 .find_position(|table| {
2521 table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2522 })
2523 .expect("mz_audit_events must exist");
2524 let audit_events = system_tables.remove(audit_events_idx);
2525 let audit_log_task = self.bootstrap_audit_log_table(
2526 audit_events.id,
2527 audit_events.name,
2528 audit_events.table,
2529 audit_logs_iterator,
2530 read_ts,
2531 );
2532
2533 for system_table in system_tables {
2534 let table_id = system_table.id;
2535 let full_name = self.catalog().resolve_full_name(system_table.name, None);
2536 debug!("coordinator init: resetting system table {full_name} ({table_id})");
2537
2538 let snapshot_fut = self
2540 .controller
2541 .storage_collections
2542 .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2543 let batch_fut = self
2544 .controller
2545 .storage_collections
2546 .create_update_builder(system_table.table.global_id_writes());
2547
2548 let task = spawn(|| format!("snapshot-{table_id}"), async move {
2549 let mut batch = batch_fut
2551 .await
2552 .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2553 tracing::info!(?table_id, "starting snapshot");
2554 let mut snapshot_cursor = snapshot_fut
2556 .await
2557 .unwrap_or_terminate("cannot fail to snapshot");
2558
2559 while let Some(values) = snapshot_cursor.next().await {
2561 for ((key, _val), _t, d) in values {
2562 let key = key.expect("builtin table had errors");
2563 let d_invert = d.neg();
2564 batch.add(&key, &(), &d_invert).await;
2565 }
2566 }
2567 tracing::info!(?table_id, "finished snapshot");
2568
2569 let batch = batch.finish().await;
2570 BuiltinTableUpdate::batch(table_id, batch)
2571 });
2572 retraction_tasks.push(task);
2573 }
2574
2575 let retractions_res = futures::future::join_all(retraction_tasks).await;
2576 for retractions in retractions_res {
2577 builtin_table_updates.push(retractions);
2578 }
2579
2580 let audit_join_start = Instant::now();
2581 info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2582 let audit_log_updates = audit_log_task.await;
2583 let audit_log_builtin_table_updates = self
2584 .catalog()
2585 .state()
2586 .generate_builtin_table_updates(audit_log_updates);
2587 builtin_table_updates.extend(audit_log_builtin_table_updates);
2588 info!(
2589 "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2590 audit_join_start.elapsed()
2591 );
2592
2593 table_fence_rx
2595 .await
2596 .expect("One-shot shouldn't be dropped during bootstrap")
2597 .unwrap_or_terminate("cannot fail to append");
2598
2599 info!("coordinator init: sending builtin table updates");
2600 let (_builtin_updates_fut, write_ts) = self
2601 .builtin_table_update()
2602 .execute(builtin_table_updates)
2603 .await;
2604 info!(?write_ts, "our write ts");
2605 if let Some(write_ts) = write_ts {
2606 self.apply_local_write(write_ts).await;
2607 }
2608 }
2609
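/// Spawns a task that determines which audit log events from the durable catalog are
/// not yet present in the `mz_audit_events` table and returns them as state updates
/// for the caller to append.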
2610 #[instrument]
2614 fn bootstrap_audit_log_table<'a>(
2615 &mut self,
2616 table_id: CatalogItemId,
2617 name: &'a QualifiedItemName,
2618 table: &'a Table,
2619 audit_logs_iterator: AuditLogIterator,
2620 read_ts: Timestamp,
2621 ) -> JoinHandle<Vec<StateUpdate>> {
2622 let full_name = self.catalog().resolve_full_name(name, None);
2623 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2624 let current_contents_fut = self
2625 .controller
2626 .storage_collections
2627 .snapshot(table.global_id_writes(), read_ts);
2628 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2629 let current_contents = current_contents_fut
2630 .await
2631 .unwrap_or_terminate("cannot fail to fetch snapshot");
2632 let contents_len = current_contents.len();
2633 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2634
2635 let max_table_id = current_contents
2637 .into_iter()
2638 .filter(|(_, diff)| *diff == 1)
2639 .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2640 .sorted()
2641 .rev()
2642 .next();
2643
2644 audit_logs_iterator
2646 .take_while(|(audit_log, _)| match max_table_id {
2647 Some(id) => audit_log.event.sortable_id() > id,
2648 None => true,
2649 })
2650 .map(|(audit_log, ts)| StateUpdate {
2651 kind: StateUpdateKind::AuditLog(audit_log),
2652 ts,
2653 diff: StateDiff::Addition,
2654 })
2655 .collect::<Vec<_>>()
2656 })
2657 }
2658
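/// Registers all storage collections required by catalog items (sources, tables,
/// materialized views, continual tasks, and sinks) with the storage controller,
/// handling migrated collections and creating newly added builtin continual tasks.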
2659 #[instrument]
2672 async fn bootstrap_storage_collections(
2673 &mut self,
2674 migrated_storage_collections: &BTreeSet<CatalogItemId>,
2675 ) {
2676 let catalog = self.catalog();
2677 let source_status_collection_id = catalog
2678 .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2679 let source_status_collection_id = catalog
2680 .get_entry(&source_status_collection_id)
2681 .latest_global_id();
2682
2683 let source_desc = |object_id: GlobalId,
2684 data_source: &DataSourceDesc,
2685 desc: &RelationDesc,
2686 timeline: &Timeline| {
2687 let (data_source, status_collection_id) = match data_source.clone() {
2688 DataSourceDesc::Ingestion { desc, cluster_id } => {
2690 let desc = desc.into_inline_connection(catalog.state());
2691 let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2692
2693 (
2694 DataSource::Ingestion(ingestion),
2695 Some(source_status_collection_id),
2696 )
2697 }
2698 DataSourceDesc::OldSyntaxIngestion {
2699 desc,
2700 progress_subsource,
2701 data_config,
2702 details,
2703 cluster_id,
2704 } => {
2705 let desc = desc.into_inline_connection(catalog.state());
2706 let data_config = data_config.into_inline_connection(catalog.state());
2707 let progress_subsource =
2710 catalog.get_entry(&progress_subsource).latest_global_id();
2711 let mut ingestion =
2712 IngestionDescription::new(desc, cluster_id, progress_subsource);
2713 let legacy_export = SourceExport {
2714 storage_metadata: (),
2715 data_config,
2716 details,
2717 };
2718 ingestion.source_exports.insert(object_id, legacy_export);
2719
2720 (
2721 DataSource::Ingestion(ingestion),
2722 Some(source_status_collection_id),
2723 )
2724 }
2725 DataSourceDesc::IngestionExport {
2726 ingestion_id,
2727 external_reference: _,
2728 details,
2729 data_config,
2730 } => {
2731 let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2734 (
2735 DataSource::IngestionExport {
2736 ingestion_id,
2737 details,
2738 data_config: data_config.into_inline_connection(catalog.state()),
2739 },
2740 Some(source_status_collection_id),
2741 )
2742 }
2743 DataSourceDesc::Webhook { .. } => {
2744 (DataSource::Webhook, Some(source_status_collection_id))
2745 }
2746 DataSourceDesc::Progress => (DataSource::Progress, None),
2747 DataSourceDesc::Introspection(introspection) => {
2748 (DataSource::Introspection(introspection), None)
2749 }
2750 };
2751 CollectionDescription {
2752 desc: desc.clone(),
2753 data_source,
2754 since: None,
2755 status_collection_id,
2756 timeline: Some(timeline.clone()),
2757 primary: None,
2758 }
2759 };
2760
2761 let mut compute_collections = vec![];
2762 let mut collections = vec![];
2763 let mut new_builtin_continual_tasks = vec![];
2764 for entry in catalog.entries() {
2765 match entry.item() {
2766 CatalogItem::Source(source) => {
2767 collections.push((
2768 source.global_id(),
2769 source_desc(
2770 source.global_id(),
2771 &source.data_source,
2772 &source.desc,
2773 &source.timeline,
2774 ),
2775 ));
2776 }
2777 CatalogItem::Table(table) => {
2778 match &table.data_source {
2779 TableDataSource::TableWrites { defaults: _ } => {
2780 let versions: BTreeMap<_, _> = table
2781 .collection_descs()
2782 .map(|(gid, version, desc)| (version, (gid, desc)))
2783 .collect();
2784 let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2785 let next_version = version.bump();
2786 let primary_collection =
2787 versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2788 let mut collection_desc =
2789 CollectionDescription::for_table(desc.clone());
2790 collection_desc.primary = primary_collection;
2791
2792 (*gid, collection_desc)
2793 });
2794 collections.extend(collection_descs);
2795 }
2796 TableDataSource::DataSource {
2797 desc: data_source_desc,
2798 timeline,
2799 } => {
2800 soft_assert_eq_or_log!(table.collections.len(), 1);
2802 let collection_descs =
2803 table.collection_descs().map(|(gid, _version, desc)| {
2804 (
2805 gid,
2806 source_desc(
2807 entry.latest_global_id(),
2808 data_source_desc,
2809 &desc,
2810 timeline,
2811 ),
2812 )
2813 });
2814 collections.extend(collection_descs);
2815 }
2816 };
2817 }
2818 CatalogItem::MaterializedView(mv) => {
2819 let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
2820 let collection_desc =
2821 CollectionDescription::for_other(desc, mv.initial_as_of.clone());
2822 (gid, collection_desc)
2823 });
2824
2825 collections.extend(collection_descs);
2826 compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
2827 }
2828 CatalogItem::ContinualTask(ct) => {
2829 let collection_desc =
2830 CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
2831 if ct.global_id().is_system() && collection_desc.since.is_none() {
2832 new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2836 } else {
2837 compute_collections.push((ct.global_id(), ct.desc.clone()));
2838 collections.push((ct.global_id(), collection_desc));
2839 }
2840 }
2841 CatalogItem::Sink(sink) => {
2842 let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2843 let from_desc = storage_sink_from_entry
2844 .desc(&self.catalog().resolve_full_name(
2845 storage_sink_from_entry.name(),
2846 storage_sink_from_entry.conn_id(),
2847 ))
2848 .expect("sinks can only be built on items with descs")
2849 .into_owned();
2850 let collection_desc = CollectionDescription {
2851 desc: KAFKA_PROGRESS_DESC.clone(),
2853 data_source: DataSource::Sink {
2854 desc: ExportDescription {
2855 sink: StorageSinkDesc {
2856 from: sink.from,
2857 from_desc,
2858 connection: sink
2859 .connection
2860 .clone()
2861 .into_inline_connection(self.catalog().state()),
2862 envelope: sink.envelope,
2863 as_of: Antichain::from_elem(Timestamp::minimum()),
2864 with_snapshot: sink.with_snapshot,
2865 version: sink.version,
2866 from_storage_metadata: (),
2867 to_storage_metadata: (),
2868 },
2869 instance_id: sink.cluster_id,
2870 },
2871 },
2872 since: None,
2873 status_collection_id: None,
2874 timeline: None,
2875 primary: None,
2876 };
2877 collections.push((sink.global_id, collection_desc));
2878 }
2879 _ => (),
2880 }
2881 }
2882
2883 let register_ts = if self.controller.read_only() {
2884 self.get_local_read_ts().await
2885 } else {
2886 self.get_local_write_ts().await.timestamp
2889 };
2890
2891 let storage_metadata = self.catalog.state().storage_metadata();
2892 let migrated_storage_collections = migrated_storage_collections
2893 .into_iter()
2894 .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2895 .collect();
2896
2897 self.controller
2902 .storage
2903 .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2904 .await
2905 .unwrap_or_terminate("cannot fail to evolve collections");
2906
2907 self.controller
2908 .storage
2909 .create_collections_for_bootstrap(
2910 storage_metadata,
2911 Some(register_ts),
2912 collections,
2913 &migrated_storage_collections,
2914 )
2915 .await
2916 .unwrap_or_terminate("cannot fail to create collections");
2917
2918 self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2919 .await;
2920
2921 if !self.controller.read_only() {
2922 self.apply_local_write(register_ts).await;
2923 }
2924 }
2925
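/// Creates storage collections for newly added builtin continual tasks. Each task is
/// optimized to find its dataflow inputs, and the collection's initial `since` is
/// taken from read holds on those inputs.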
2926 async fn bootstrap_builtin_continual_tasks(
2933 &mut self,
2934 mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
2936 ) {
2937 for (id, collection) in &mut collections {
2938 let entry = self.catalog.get_entry_by_global_id(id);
2939 let ct = match &entry.item {
2940 CatalogItem::ContinualTask(ct) => ct.clone(),
2941 _ => unreachable!("only called with continual task builtins"),
2942 };
2943 let debug_name = self
2944 .catalog()
2945 .resolve_full_name(entry.name(), None)
2946 .to_string();
2947 let (_optimized_plan, physical_plan, _metainfo) = self
2948 .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
2949 .expect("builtin CT should optimize successfully");
2950
2951 let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
2953 id_bundle.storage_ids.remove(id);
2955 let read_holds = self.acquire_read_holds(&id_bundle);
2956 let as_of = read_holds.least_valid_read();
2957
2958 collection.since = Some(as_of.clone());
2959 }
2960 self.controller
2961 .storage
2962 .create_collections(self.catalog.state().storage_metadata(), None, collections)
2963 .await
2964 .unwrap_or_terminate("cannot fail to create collections");
2965 }
2966
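/// Optimizes the dataflows for all indexes, materialized views, and continual tasks,
/// reusing cached global expressions when the optimizer features match, and stores the
/// resulting plans and metainfos in the catalog. Returns the expressions that had to
/// be optimized from scratch so they can be added to the expression cache.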
2967 #[instrument]
2978 fn bootstrap_dataflow_plans(
2979 &mut self,
2980 ordered_catalog_entries: &[CatalogEntry],
2981 mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2982 ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
2983 let mut instance_snapshots = BTreeMap::new();
2989 let mut uncached_expressions = BTreeMap::new();
2990
2991 let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
2992
2993 for entry in ordered_catalog_entries {
2994 match entry.item() {
2995 CatalogItem::Index(idx) => {
2996 let compute_instance =
2998 instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
2999 self.instance_snapshot(idx.cluster_id)
3000 .expect("compute instance exists")
3001 });
3002 let global_id = idx.global_id();
3003
3004 if compute_instance.contains_collection(&global_id) {
3007 continue;
3008 }
3009
3010 let (optimized_plan, physical_plan, metainfo) =
3011 match cached_global_exprs.remove(&global_id) {
3012 Some(global_expressions)
3013 if global_expressions.optimizer_features
3014 == optimizer_config.features =>
3015 {
3016 debug!("global expression cache hit for {global_id:?}");
3017 (
3018 global_expressions.global_mir,
3019 global_expressions.physical_plan,
3020 global_expressions.dataflow_metainfos,
3021 )
3022 }
3023 Some(_) | None => {
3024 let (optimized_plan, global_lir_plan) = {
3025 let mut optimizer = optimize::index::Optimizer::new(
3027 self.owned_catalog(),
3028 compute_instance.clone(),
3029 global_id,
3030 optimizer_config.clone(),
3031 self.optimizer_metrics(),
3032 );
3033
3034 let index_plan = optimize::index::Index::new(
3036 entry.name().clone(),
3037 idx.on,
3038 idx.keys.to_vec(),
3039 );
3040 let global_mir_plan = optimizer.optimize(index_plan)?;
3041 let optimized_plan = global_mir_plan.df_desc().clone();
3042
3043 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3045
3046 (optimized_plan, global_lir_plan)
3047 };
3048
3049 let (physical_plan, metainfo) = global_lir_plan.unapply();
3050 let metainfo = {
3051 let notice_ids =
3053 std::iter::repeat_with(|| self.allocate_transient_id())
3054 .map(|(_item_id, gid)| gid)
3055 .take(metainfo.optimizer_notices.len())
3056 .collect::<Vec<_>>();
3057 self.catalog().render_notices(
3059 metainfo,
3060 notice_ids,
3061 Some(idx.global_id()),
3062 )
3063 };
3064 uncached_expressions.insert(
3065 global_id,
3066 GlobalExpressions {
3067 global_mir: optimized_plan.clone(),
3068 physical_plan: physical_plan.clone(),
3069 dataflow_metainfos: metainfo.clone(),
3070 optimizer_features: OptimizerFeatures::from(
3071 self.catalog().system_config(),
3072 ),
3073 },
3074 );
3075 (optimized_plan, physical_plan, metainfo)
3076 }
3077 };
3078
3079 let catalog = self.catalog_mut();
3080 catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3081 catalog.set_physical_plan(idx.global_id(), physical_plan);
3082 catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3083
3084 compute_instance.insert_collection(idx.global_id());
3085 }
3086 CatalogItem::MaterializedView(mv) => {
3087 let compute_instance =
3089 instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3090 self.instance_snapshot(mv.cluster_id)
3091 .expect("compute instance exists")
3092 });
3093 let global_id = mv.global_id_writes();
3094
3095 let (optimized_plan, physical_plan, metainfo) =
3096 match cached_global_exprs.remove(&global_id) {
3097 Some(global_expressions)
3098 if global_expressions.optimizer_features
3099 == optimizer_config.features =>
3100 {
3101 debug!("global expression cache hit for {global_id:?}");
3102 (
3103 global_expressions.global_mir,
3104 global_expressions.physical_plan,
3105 global_expressions.dataflow_metainfos,
3106 )
3107 }
3108 Some(_) | None => {
3109 let (_, internal_view_id) = self.allocate_transient_id();
3110 let debug_name = self
3111 .catalog()
3112 .resolve_full_name(entry.name(), None)
3113 .to_string();
3114 let force_non_monotonic = Default::default();
3115
3116 let (optimized_plan, global_lir_plan) = {
3117 let mut optimizer = optimize::materialized_view::Optimizer::new(
3119 self.owned_catalog().as_optimizer_catalog(),
3120 compute_instance.clone(),
3121 global_id,
3122 internal_view_id,
3123 mv.desc.latest().iter_names().cloned().collect(),
3124 mv.non_null_assertions.clone(),
3125 mv.refresh_schedule.clone(),
3126 debug_name,
3127 optimizer_config.clone(),
3128 self.optimizer_metrics(),
3129 force_non_monotonic,
3130 );
3131
3132 let global_mir_plan =
3134 optimizer.optimize(mv.optimized_expr.as_ref().clone())?;
3135 let optimized_plan = global_mir_plan.df_desc().clone();
3136
3137 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3139
3140 (optimized_plan, global_lir_plan)
3141 };
3142
3143 let (physical_plan, metainfo) = global_lir_plan.unapply();
3144 let metainfo = {
3145 let notice_ids =
3147 std::iter::repeat_with(|| self.allocate_transient_id())
3148 .map(|(_item_id, global_id)| global_id)
3149 .take(metainfo.optimizer_notices.len())
3150 .collect::<Vec<_>>();
3151 self.catalog().render_notices(
3153 metainfo,
3154 notice_ids,
3155 Some(mv.global_id_writes()),
3156 )
3157 };
3158 uncached_expressions.insert(
3159 global_id,
3160 GlobalExpressions {
3161 global_mir: optimized_plan.clone(),
3162 physical_plan: physical_plan.clone(),
3163 dataflow_metainfos: metainfo.clone(),
3164 optimizer_features: OptimizerFeatures::from(
3165 self.catalog().system_config(),
3166 ),
3167 },
3168 );
3169 (optimized_plan, physical_plan, metainfo)
3170 }
3171 };
3172
3173 let catalog = self.catalog_mut();
3174 catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
3175 catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
3176 catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);
3177
3178 compute_instance.insert_collection(mv.global_id_writes());
3179 }
3180 CatalogItem::ContinualTask(ct) => {
3181 let compute_instance =
3182 instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3183 self.instance_snapshot(ct.cluster_id)
3184 .expect("compute instance exists")
3185 });
3186 let global_id = ct.global_id();
3187
3188 let (optimized_plan, physical_plan, metainfo) =
3189 match cached_global_exprs.remove(&global_id) {
3190 Some(global_expressions)
3191 if global_expressions.optimizer_features
3192 == optimizer_config.features =>
3193 {
3194 debug!("global expression cache hit for {global_id:?}");
3195 (
3196 global_expressions.global_mir,
3197 global_expressions.physical_plan,
3198 global_expressions.dataflow_metainfos,
3199 )
3200 }
3201 Some(_) | None => {
3202 let debug_name = self
3203 .catalog()
3204 .resolve_full_name(entry.name(), None)
3205 .to_string();
3206 let (optimized_plan, physical_plan, metainfo) = self
3207 .optimize_create_continual_task(
3208 ct,
3209 global_id,
3210 self.owned_catalog(),
3211 debug_name,
3212 )?;
3213 uncached_expressions.insert(
3214 global_id,
3215 GlobalExpressions {
3216 global_mir: optimized_plan.clone(),
3217 physical_plan: physical_plan.clone(),
3218 dataflow_metainfos: metainfo.clone(),
3219 optimizer_features: OptimizerFeatures::from(
3220 self.catalog().system_config(),
3221 ),
3222 },
3223 );
3224 (optimized_plan, physical_plan, metainfo)
3225 }
3226 };
3227
3228 let catalog = self.catalog_mut();
3229 catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3230 catalog.set_physical_plan(ct.global_id(), physical_plan);
3231 catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3232
3233 compute_instance.insert_collection(ct.global_id());
3234 }
3235 _ => (),
3236 }
3237 }
3238
3239 Ok(uncached_expressions)
3240 }
3241
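/// Selects as-ofs for all pre-planned dataflows, updates the plans stored in the
/// catalog accordingly, and returns read holds at the chosen as-ofs that should be
/// kept alive until the corresponding dataflows have been created.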
3242 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3252 let mut catalog_ids = Vec::new();
3253 let mut dataflows = Vec::new();
3254 let mut read_policies = BTreeMap::new();
3255 for entry in self.catalog.entries() {
3256 let gid = match entry.item() {
3257 CatalogItem::Index(idx) => idx.global_id(),
3258 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3259 CatalogItem::ContinualTask(ct) => ct.global_id(),
3260 CatalogItem::Table(_)
3261 | CatalogItem::Source(_)
3262 | CatalogItem::Log(_)
3263 | CatalogItem::View(_)
3264 | CatalogItem::Sink(_)
3265 | CatalogItem::Type(_)
3266 | CatalogItem::Func(_)
3267 | CatalogItem::Secret(_)
3268 | CatalogItem::Connection(_) => continue,
3269 };
3270 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3271 catalog_ids.push(gid);
3272 dataflows.push(plan.clone());
3273
3274 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3275 read_policies.insert(gid, compaction_window.into());
3276 }
3277 }
3278 }
3279
3280 let read_ts = self.get_local_read_ts().await;
3281 let read_holds = as_of_selection::run(
3282 &mut dataflows,
3283 &read_policies,
3284 &*self.controller.storage_collections,
3285 read_ts,
3286 self.controller.read_only(),
3287 );
3288
3289 let catalog = self.catalog_mut();
3290 for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3291 catalog.set_physical_plan(id, plan);
3292 }
3293
3294 read_holds
3295 }
3296
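/// Runs the coordinator's main message loop until the command channel closes, batching
/// incoming messages and recording per-message handling metrics. A watchdog task warns
/// when a single message blocks the loop for too long.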
3297 fn serve(
3306 mut self,
3307 mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3308 mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3309 mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3310 group_commit_rx: appends::GroupCommitWaiter,
3311 ) -> LocalBoxFuture<'static, ()> {
3312 async move {
3313 let mut cluster_events = self.controller.events_stream();
3315 let last_message = Arc::new(Mutex::new(LastMessage {
3316 kind: "none",
3317 stmt: None,
3318 }));
3319
3320 let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3321 let idle_metric = self.metrics.queue_busy_seconds.clone();
3322 let last_message_watchdog = Arc::clone(&last_message);
3323
3324 spawn(|| "coord watchdog", async move {
3325 let mut interval = tokio::time::interval(Duration::from_secs(5));
3330 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3334
3335 let mut coord_stuck = false;
3337
3338 loop {
3339 interval.tick().await;
3340
3341 let duration = tokio::time::Duration::from_secs(30);
3343 let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3344 let Ok(maybe_permit) = timeout else {
3345 if !coord_stuck {
3347 let last_message = last_message_watchdog.lock().expect("poisoned");
3348 tracing::warn!(
3349 last_message_kind = %last_message.kind,
3350 last_message_sql = %last_message.stmt_to_string(),
3351 "coordinator stuck for {duration:?}",
3352 );
3353 }
3354 coord_stuck = true;
3355
3356 continue;
3357 };
3358
3359 if coord_stuck {
3361 tracing::info!("Coordinator became unstuck");
3362 }
3363 coord_stuck = false;
3364
3365 let Ok(permit) = maybe_permit else {
3367 break;
3368 };
3369
3370 permit.send(idle_metric.start_timer());
3371 }
3372 });
3373
3374 self.schedule_storage_usage_collection().await;
3375 self.spawn_privatelink_vpc_endpoints_watch_task();
3376 self.spawn_statement_logging_task();
3377 flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3378
3379 let warn_threshold = self
3381 .catalog()
3382 .system_config()
3383 .coord_slow_message_warn_threshold();
3384
3385 const MESSAGE_BATCH: usize = 64;
3387 let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3388 let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3389
3390 let message_batch = self.metrics.message_batch.clone();
3391
3392 loop {
3393 select! {
3397 biased;
3402
3403 _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3407 Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
3411 () = self.controller.ready() => {
3415 let controller = match self.controller.get_readiness() {
3419 Readiness::Storage => ControllerReadiness::Storage,
3420 Readiness::Compute => ControllerReadiness::Compute,
3421 Readiness::Metrics(_) => ControllerReadiness::Metrics,
3422 Readiness::Internal(_) => ControllerReadiness::Internal,
3423 Readiness::NotReady => unreachable!("just signaled as ready"),
3424 };
3425 messages.push(Message::ControllerReady { controller });
3426 }
3427 permit = group_commit_rx.ready() => {
3430 let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3436 PendingWriteTxn::User{span, ..} => Some(span),
3437 PendingWriteTxn::System{..} => None,
3438 });
3439 let span = match user_write_spans.exactly_one() {
3440 Ok(span) => span.clone(),
3441 Err(user_write_spans) => {
3442 let span = info_span!(parent: None, "group_commit_notify");
3443 for s in user_write_spans {
3444 span.follows_from(s);
3445 }
3446 span
3447 }
3448 };
3449 messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3450 },
3451 count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3455 if count == 0 {
3456 break;
3457 } else {
3458 messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
3459 }
3460 },
3461 Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3465 let mut pending_read_txns = vec![pending_read_txn];
3466 while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3467 pending_read_txns.push(pending_read_txn);
3468 }
3469 for (conn_id, pending_read_txn) in pending_read_txns {
3470 let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
3471 soft_assert_or_log!(
3472 prev.is_none(),
3473 "connections can not have multiple concurrent reads, prev: {prev:?}"
3474 )
3475 }
3476 messages.push(Message::LinearizeReads);
3477 }
3478 _ = self.advance_timelines_interval.tick() => {
3482 let span = info_span!(parent: None, "coord::advance_timelines_interval");
3483 span.follows_from(Span::current());
3484
3485 if self.controller.read_only() {
3490 messages.push(Message::AdvanceTimelines);
3491 } else {
3492 messages.push(Message::GroupCommitInitiate(span, None));
3493 }
3494 },
3495 _ = self.check_cluster_scheduling_policies_interval.tick() => {
3499 messages.push(Message::CheckSchedulingPolicies);
3500 },
3501
3502 _ = self.caught_up_check_interval.tick() => {
3506 self.maybe_check_caught_up().await;
3511
3512 continue;
3513 },
3514
3515 timer = idle_rx.recv() => {
3520 timer.expect("does not drop").observe_duration();
3521 self.metrics
3522 .message_handling
3523 .with_label_values(&["watchdog"])
3524 .observe(0.0);
3525 continue;
3526 }
3527 };
3528
3529 message_batch.observe(f64::cast_lossy(messages.len()));
3531
3532 for msg in messages.drain(..) {
3533 let msg_kind = msg.kind();
3536 let span = span!(
3537 target: "mz_adapter::coord::handle_message_loop",
3538 Level::INFO,
3539 "coord::handle_message",
3540 kind = msg_kind
3541 );
3542 let otel_context = span.context().span().span_context().clone();
3543
3544 *last_message.lock().expect("poisoned") = LastMessage {
3548 kind: msg_kind,
3549 stmt: match &msg {
3550 Message::Command(
3551 _,
3552 Command::Execute {
3553 portal_name,
3554 session,
3555 ..
3556 },
3557 ) => session
3558 .get_portal_unverified(portal_name)
3559 .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3560 _ => None,
3561 },
3562 };
3563
3564 let start = Instant::now();
3565 self.handle_message(msg).instrument(span).await;
3566 let duration = start.elapsed();
3567
3568 self.metrics
3569 .message_handling
3570 .with_label_values(&[msg_kind])
3571 .observe(duration.as_secs_f64());
3572
3573 if duration > warn_threshold {
3575 let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3576 tracing::error!(
3577 ?msg_kind,
3578 ?trace_id,
3579 ?duration,
3580 "very slow coordinator message"
3581 );
3582 }
3583 }
3584 }
3585 if let Some(catalog) = Arc::into_inner(self.catalog) {
3588 catalog.expire().await;
3589 }
3590 }
3591 .boxed_local()
3592 }
3593
3594 fn catalog(&self) -> &Catalog {
3596 &self.catalog
3597 }
3598
3599 fn owned_catalog(&self) -> Arc<Catalog> {
3602 Arc::clone(&self.catalog)
3603 }
3604
3605 fn optimizer_metrics(&self) -> OptimizerMetrics {
3608 self.optimizer_metrics.clone()
3609 }
3610
3611 fn catalog_mut(&mut self) -> &mut Catalog {
3613 Arc::make_mut(&mut self.catalog)
3621 }
3622
3623 fn connection_context(&self) -> &ConnectionContext {
3625 self.controller.connection_context()
3626 }
3627
3628 fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3630 &self.connection_context().secrets_reader
3631 }
3632
3633 #[allow(dead_code)]
3638 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3639 for meta in self.active_conns.values() {
3640 let _ = meta.notice_tx.send(notice.clone());
3641 }
3642 }
3643
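/// Returns a closure that broadcasts a notice to all sessions that were active when
/// this method was called; useful for delivering notices from outside the
/// coordinator's main loop.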
3644 pub(crate) fn broadcast_notice_tx(
3647 &self,
3648 ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3649 let senders: Vec<_> = self
3650 .active_conns
3651 .values()
3652 .map(|meta| meta.notice_tx.clone())
3653 .collect();
3654 Box::new(move |notice| {
3655 for tx in senders {
3656 let _ = tx.send(notice.clone());
3657 }
3658 })
3659 }
3660
3661 pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3662 &self.active_conns
3663 }
3664
3665 #[instrument(level = "debug")]
3666 pub(crate) fn retire_execution(
3667 &mut self,
3668 reason: StatementEndedExecutionReason,
3669 ctx_extra: ExecuteContextExtra,
3670 ) {
3671 if let Some(uuid) = ctx_extra.retire() {
3672 self.end_statement_execution(uuid, reason);
3673 }
3674 }
3675
3676 #[instrument(level = "debug")]
3678 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3679 let compute = self
3680 .instance_snapshot(instance)
3681 .expect("compute instance does not exist");
3682 DataflowBuilder::new(self.catalog().state(), compute)
3683 }
3684
3685 pub fn instance_snapshot(
3687 &self,
3688 id: ComputeInstanceId,
3689 ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3690 ComputeInstanceSnapshot::new(&self.controller, id)
3691 }
3692
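/// Creates a dataflow on the given compute instance and installs default read policies
/// for its exported indexes, terminating the process if creation fails.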
3693 pub(crate) async fn ship_dataflow(
3700 &mut self,
3701 dataflow: DataflowDescription<Plan>,
3702 instance: ComputeInstanceId,
3703 subscribe_target_replica: Option<ReplicaId>,
3704 ) {
3705 self.try_ship_dataflow(dataflow, instance, subscribe_target_replica)
3706 .await
3707 .unwrap_or_terminate("dataflow creation cannot fail");
3708 }
3709
3710 pub(crate) async fn try_ship_dataflow(
3713 &mut self,
3714 dataflow: DataflowDescription<Plan>,
3715 instance: ComputeInstanceId,
3716 subscribe_target_replica: Option<ReplicaId>,
3717 ) -> Result<(), DataflowCreationError> {
3718 let export_ids = dataflow.exported_index_ids().collect();
3721
3722 self.controller
3723 .compute
3724 .create_dataflow(instance, dataflow, subscribe_target_replica)?;
3725
3726 self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3727 .await;
3728
3729 Ok(())
3730 }
3731
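/// Instructs the compute controller to let the collection identified by `id` on
/// `instance` write out its results, terminating the process on failure.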
3732 pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
3736 self.controller
3737 .compute
3738 .allow_writes(instance, id)
3739 .unwrap_or_terminate("allow_writes cannot fail");
3740 }
3741
3742 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3744 &mut self,
3745 dataflow: DataflowDescription<Plan>,
3746 instance: ComputeInstanceId,
3747 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3748 ) {
3749 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3750 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3751 let ((), ()) =
3752 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3753 } else {
3754 self.ship_dataflow(dataflow, instance, None).await;
3755 }
3756 }
3757
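/// Installs a watch set on the given compute objects at timestamp `t` for the
/// specified connection; `state` is returned to the connection once the watch set
/// completes.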
3758 pub fn install_compute_watch_set(
3762 &mut self,
3763 conn_id: ConnectionId,
3764 objects: BTreeSet<GlobalId>,
3765 t: Timestamp,
3766 state: WatchSetResponse,
3767 ) {
3768 let ws_id = self.controller.install_compute_watch_set(objects, t);
3769 self.connection_watch_sets
3770 .entry(conn_id.clone())
3771 .or_default()
3772 .insert(ws_id);
3773 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3774 }
3775
3776 pub fn install_storage_watch_set(
3780 &mut self,
3781 conn_id: ConnectionId,
3782 objects: BTreeSet<GlobalId>,
3783 t: Timestamp,
3784 state: WatchSetResponse,
3785 ) {
3786 let ws_id = self.controller.install_storage_watch_set(objects, t);
3787 self.connection_watch_sets
3788 .entry(conn_id.clone())
3789 .or_default()
3790 .insert(ws_id);
3791 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3792 }
3793
3794 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3796 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3797 for ws_id in ws_ids {
3798 self.installed_watch_sets.remove(&ws_id);
3799 }
3800 }
3801 }
3802
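/// Serializes the coordinator's in-memory state (timelines, active connections, read
/// holds, pending peeks and reads, and controller state) to JSON for debugging.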
3803 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3807 let global_timelines: BTreeMap<_, _> = self
3813 .global_timelines
3814 .iter()
3815 .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3816 .collect();
3817 let active_conns: BTreeMap<_, _> = self
3818 .active_conns
3819 .iter()
3820 .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3821 .collect();
3822 let txn_read_holds: BTreeMap<_, _> = self
3823 .txn_read_holds
3824 .iter()
3825 .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3826 .collect();
3827 let pending_peeks: BTreeMap<_, _> = self
3828 .pending_peeks
3829 .iter()
3830 .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3831 .collect();
3832 let client_pending_peeks: BTreeMap<_, _> = self
3833 .client_pending_peeks
3834 .iter()
3835 .map(|(id, peek)| {
3836 let peek: BTreeMap<_, _> = peek
3837 .iter()
3838 .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3839 .collect();
3840 (id.to_string(), peek)
3841 })
3842 .collect();
3843 let pending_linearize_read_txns: BTreeMap<_, _> = self
3844 .pending_linearize_read_txns
3845 .iter()
3846 .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3847 .collect();
3848
3849 let map = serde_json::Map::from_iter([
3850 (
3851 "global_timelines".to_string(),
3852 serde_json::to_value(global_timelines)?,
3853 ),
3854 (
3855 "active_conns".to_string(),
3856 serde_json::to_value(active_conns)?,
3857 ),
3858 (
3859 "txn_read_holds".to_string(),
3860 serde_json::to_value(txn_read_holds)?,
3861 ),
3862 (
3863 "pending_peeks".to_string(),
3864 serde_json::to_value(pending_peeks)?,
3865 ),
3866 (
3867 "client_pending_peeks".to_string(),
3868 serde_json::to_value(client_pending_peeks)?,
3869 ),
3870 (
3871 "pending_linearize_read_txns".to_string(),
3872 serde_json::to_value(pending_linearize_read_txns)?,
3873 ),
3874 ("controller".to_string(), self.controller.dump().await?),
3875 ]);
3876 Ok(serde_json::Value::Object(map))
3877 }
3878
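/// Spawns a task that retracts rows of `mz_storage_usage_by_shard` older than
/// `retention_period`, sending the retractions back to the coordinator as a
/// `StorageUsagePrune` message.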
3879 async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
3893 let item_id = self
3894 .catalog()
3895 .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3896 let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3897 let read_ts = self.get_local_read_ts().await;
3898 let current_contents_fut = self
3899 .controller
3900 .storage_collections
3901 .snapshot(global_id, read_ts);
3902 let internal_cmd_tx = self.internal_cmd_tx.clone();
3903 spawn(|| "storage_usage_prune", async move {
3904 let mut current_contents = current_contents_fut
3905 .await
3906 .unwrap_or_terminate("cannot fail to fetch snapshot");
3907 differential_dataflow::consolidation::consolidate(&mut current_contents);
3908
3909 let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3910 let mut expired = Vec::new();
3911 for (row, diff) in current_contents {
3912 assert_eq!(
3913 diff, 1,
3914 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3915 );
3916 let collection_timestamp = row
3918 .unpack()
3919 .get(3)
3920 .expect("definition of mz_storage_by_shard changed")
3921 .unwrap_timestamptz();
3922 let collection_timestamp = collection_timestamp.timestamp_millis();
3923 let collection_timestamp: u128 = collection_timestamp
3924 .try_into()
3925 .expect("all collections happen after Jan 1 1970");
3926 if collection_timestamp < cutoff_ts {
3927 debug!("pruning storage event {row:?}");
3928 let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3929 expired.push(builtin_update);
3930 }
3931 }
3932
3933 let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3935 });
3936 }
3937
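/// Returns the current rate of credit consumption across all managed user cluster
/// replicas, based on the credits per hour of their billed sizes.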
3938 fn current_credit_consumption_rate(&self) -> Numeric {
3939 self.catalog()
3940 .user_cluster_replicas()
3941 .filter_map(|replica| match &replica.config.location {
3942 ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
3943 ReplicaLocation::Unmanaged(_) => None,
3944 })
3945 .map(|size| {
3946 self.catalog()
3947 .cluster_replica_sizes()
3948 .0
3949 .get(size)
3950 .expect("location size is validated against the cluster replica sizes")
3951 .credits_per_hour
3952 })
3953 .sum()
3954 }
3955}
3956
3957#[cfg(test)]
3958impl Coordinator {
3959 #[allow(dead_code)]
3960 async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
3961 let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
3969
3970 let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
3971 }
3972}
3973
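/// The most recent message handled by the coordinator, tracked so the watchdog task
/// and the panic handler can report what the coordinator was last working on.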
3974struct LastMessage {
3976 kind: &'static str,
3977 stmt: Option<Arc<Statement<Raw>>>,
3978}
3979
3980impl LastMessage {
3981 fn stmt_to_string(&self) -> Cow<'static, str> {
3983 self.stmt
3984 .as_ref()
3985 .map(|stmt| stmt.to_ast_string_redacted().into())
3986 .unwrap_or(Cow::Borrowed("<none>"))
3987 }
3988}
3989
3990impl fmt::Debug for LastMessage {
3991 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3992 f.debug_struct("LastMessage")
3993 .field("kind", &self.kind)
3994 .field("stmt", &self.stmt_to_string())
3995 .finish()
3996 }
3997}
3998
3999impl Drop for LastMessage {
4000 fn drop(&mut self) {
4001 if std::thread::panicking() {
4003 eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4005 }
4006 }
4007}
4008
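/// Creates and bootstraps a new coordinator: initializes timestamp oracles, opens the
/// catalog, starts the controller on a dedicated coordinator thread, runs `bootstrap`,
/// and returns a `Handle` and `Client` for interacting with the coordinator.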
4009pub fn serve(
4021 Config {
4022 controller_config,
4023 controller_envd_epoch,
4024 mut storage,
4025 audit_logs_iterator,
4026 timestamp_oracle_url,
4027 unsafe_mode,
4028 all_features,
4029 build_info,
4030 environment_id,
4031 metrics_registry,
4032 now,
4033 secrets_controller,
4034 cloud_resource_controller,
4035 cluster_replica_sizes,
4036 builtin_system_cluster_config,
4037 builtin_catalog_server_cluster_config,
4038 builtin_probe_cluster_config,
4039 builtin_support_cluster_config,
4040 builtin_analytics_cluster_config,
4041 system_parameter_defaults,
4042 availability_zones,
4043 storage_usage_client,
4044 storage_usage_collection_interval,
4045 storage_usage_retention_period,
4046 segment_client,
4047 egress_addresses,
4048 aws_account_id,
4049 aws_privatelink_availability_zones,
4050 connection_context,
4051 connection_limit_callback,
4052 remote_system_parameters,
4053 webhook_concurrency_limit,
4054 http_host_name,
4055 tracing_handle,
4056 read_only_controllers,
4057 caught_up_trigger: clusters_caught_up_trigger,
4058 helm_chart_version,
4059 license_key,
4060 external_login_password_mz_system,
4061 force_builtin_schema_migration,
4062 }: Config,
4063) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4064 async move {
4065 let coord_start = Instant::now();
4066 info!("startup: coordinator init: beginning");
4067 info!("startup: coordinator init: preamble beginning");
4068
4069 let _builtins = LazyLock::force(&BUILTINS_STATIC);
4073
4074 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4075 let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4076 let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4077 mpsc::unbounded_channel();
4078
4079 if !availability_zones.iter().all_unique() {
4081 coord_bail!("availability zones must be unique");
4082 }
4083
4084 let aws_principal_context = match (
4085 aws_account_id,
4086 connection_context.aws_external_id_prefix.clone(),
4087 ) {
4088 (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4089 aws_account_id,
4090 aws_external_id_prefix,
4091 }),
4092 _ => None,
4093 };
4094
4095 let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4096 .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4097
4098 info!(
4099 "startup: coordinator init: preamble complete in {:?}",
4100 coord_start.elapsed()
4101 );
4102 let oracle_init_start = Instant::now();
4103 info!("startup: coordinator init: timestamp oracle init beginning");
4104
4105 let pg_timestamp_oracle_config = timestamp_oracle_url
4106 .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
4107 let mut initial_timestamps =
4108 get_initial_oracle_timestamps(&pg_timestamp_oracle_config).await?;
4109
4110 initial_timestamps
4114 .entry(Timeline::EpochMilliseconds)
4115 .or_insert_with(mz_repr::Timestamp::minimum);
4116 let mut timestamp_oracles = BTreeMap::new();
4117 for (timeline, initial_timestamp) in initial_timestamps {
4118 Coordinator::ensure_timeline_state_with_initial_time(
4119 &timeline,
4120 initial_timestamp,
4121 now.clone(),
4122 pg_timestamp_oracle_config.clone(),
4123 &mut timestamp_oracles,
4124 read_only_controllers,
4125 )
4126 .await;
4127 }
4128
4129 let catalog_upper = storage.current_upper().await;
4133 let epoch_millis_oracle = ×tamp_oracles
4139 .get(&Timeline::EpochMilliseconds)
4140 .expect("inserted above")
4141 .oracle;
4142
4143 let mut boot_ts = if read_only_controllers {
4144 let read_ts = epoch_millis_oracle.read_ts().await;
4145 std::cmp::max(read_ts, catalog_upper)
4146 } else {
4147 epoch_millis_oracle.apply_write(catalog_upper).await;
4150 epoch_millis_oracle.write_ts().await.timestamp
4151 };
4152
4153 info!(
4154 "startup: coordinator init: timestamp oracle init complete in {:?}",
4155 oracle_init_start.elapsed()
4156 );
4157
4158 let catalog_open_start = Instant::now();
4159 info!("startup: coordinator init: catalog open beginning");
4160 let persist_client = controller_config
4161 .persist_clients
4162 .open(controller_config.persist_location.clone())
4163 .await
4164 .context("opening persist client")?;
4165 let builtin_item_migration_config =
4166 BuiltinItemMigrationConfig {
4167 persist_client: persist_client.clone(),
4168 read_only: read_only_controllers,
4169 force_migration: force_builtin_schema_migration,
4170 }
4171 ;
4172 let OpenCatalogResult {
4173 mut catalog,
4174 migrated_storage_collections_0dt,
4175 new_builtin_collections,
4176 builtin_table_updates,
4177 cached_global_exprs,
4178 uncached_local_exprs,
4179 } = Catalog::open(mz_catalog::config::Config {
4180 storage,
4181 metrics_registry: &metrics_registry,
4182 state: mz_catalog::config::StateConfig {
4183 unsafe_mode,
4184 all_features,
4185 build_info,
4186 deploy_generation: controller_config.deploy_generation,
4187 environment_id: environment_id.clone(),
4188 read_only: read_only_controllers,
4189 now: now.clone(),
4190 boot_ts: boot_ts.clone(),
4191 skip_migrations: false,
4192 cluster_replica_sizes,
4193 builtin_system_cluster_config,
4194 builtin_catalog_server_cluster_config,
4195 builtin_probe_cluster_config,
4196 builtin_support_cluster_config,
4197 builtin_analytics_cluster_config,
4198 system_parameter_defaults,
4199 remote_system_parameters,
4200 availability_zones,
4201 egress_addresses,
4202 aws_principal_context,
4203 aws_privatelink_availability_zones,
4204 connection_context,
4205 http_host_name,
4206 builtin_item_migration_config,
4207 persist_client: persist_client.clone(),
4208 enable_expression_cache_override: None,
4209 helm_chart_version,
4210 external_login_password_mz_system,
4211 license_key: license_key.clone(),
4212 },
4213 })
4214 .await?;
4215
4216 let catalog_upper = catalog.current_upper().await;
4219 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4220
4221 if !read_only_controllers {
4222 epoch_millis_oracle.apply_write(boot_ts).await;
4223 }
4224
4225 info!(
4226 "startup: coordinator init: catalog open complete in {:?}",
4227 catalog_open_start.elapsed()
4228 );
4229
4230 let coord_thread_start = Instant::now();
4231 info!("startup: coordinator init: coordinator thread start beginning");
4232
4233 let session_id = catalog.config().session_id;
4234 let start_instant = catalog.config().start_instant;
4235
4236 let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4240 let handle = TokioHandle::current();
4241
4242 let metrics = Metrics::register_into(&metrics_registry);
4243 let metrics_clone = metrics.clone();
4244 let optimizer_metrics = OptimizerMetrics::register_into(
4245 &metrics_registry,
4246 catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4247 );
4248 let segment_client_clone = segment_client.clone();
4249 let coord_now = now.clone();
4250 let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4251 let mut check_scheduling_policies_interval = tokio::time::interval(
4252 catalog
4253 .system_config()
4254 .cluster_check_scheduling_policies_interval(),
4255 );
4256 check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4257
4258 let clusters_caught_up_check_interval = if read_only_controllers {
4259 let dyncfgs = catalog.system_config().dyncfgs();
4260 let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4261
4262 let mut interval = tokio::time::interval(interval);
4263 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4264 interval
4265 } else {
4266 let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4274 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4275 interval
4276 };
4277
4278 let clusters_caught_up_check =
4279 clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
4280 trigger,
4281 exclude_collections: new_builtin_collections.into_iter().collect(),
4282 });
4283
4284 if let Some(config) = pg_timestamp_oracle_config.as_ref() {
4285 let pg_timestamp_oracle_params =
4288 flags::pg_timstamp_oracle_config(catalog.system_config());
4289 pg_timestamp_oracle_params.apply(config);
4290 }
4291
4292 let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4295 Arc::new(move |system_vars: &SystemVars| {
4296 let limit: u64 = system_vars.max_connections().cast_into();
4297 let superuser_reserved: u64 =
4298 system_vars.superuser_reserved_connections().cast_into();
4299
4300 let superuser_reserved = if superuser_reserved >= limit {
                    tracing::warn!(
                        "superuser_reserved ({superuser_reserved}) is greater than or equal to max connections ({limit})!"
                    );
                    limit
                } else {
                    superuser_reserved
                };

                (connection_limit_callback)(limit, superuser_reserved);
            });
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::MAX_CONNECTIONS,
            Arc::clone(&connection_limit_callback),
        );
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
            connection_limit_callback,
        );

        let (group_commit_tx, group_commit_rx) = appends::notifier();

        let parent_span = tracing::Span::current();
        let thread = thread::Builder::new()
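            // The coordinator keeps a lot of state on its stack; use a stack several
            // times larger than the default to avoid overflows.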
            .stack_size(3 * stack::STACK_SIZE)
            .name("coordinator".to_string())
            .spawn(move || {
                let span = info_span!(parent: parent_span, "coord::coordinator").entered();

                let controller = handle
                    .block_on({
                        catalog.initialize_controller(
                            controller_config,
                            controller_envd_epoch,
                            read_only_controllers,
                        )
                    })
                    .unwrap_or_terminate("failed to initialize storage_controller");
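                // Re-read the catalog upper now that the controller is initialized and
                // fold it into the boot timestamp before applying it as a write.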
                let catalog_upper = handle.block_on(catalog.current_upper());
                boot_ts = std::cmp::max(boot_ts, catalog_upper);
                if !read_only_controllers {
                    let epoch_millis_oracle = &timestamp_oracles
                        .get(&Timeline::EpochMilliseconds)
                        .expect("inserted above")
                        .oracle;
                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
                }

                let catalog = Arc::new(catalog);

                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
                let mut coord = Coordinator {
                    controller,
                    catalog,
                    internal_cmd_tx,
                    group_commit_tx,
                    strict_serializable_reads_tx,
                    global_timelines: timestamp_oracles,
                    transient_id_gen: Arc::new(TransientIdGen::new()),
                    active_conns: BTreeMap::new(),
                    txn_read_holds: Default::default(),
                    pending_peeks: BTreeMap::new(),
                    client_pending_peeks: BTreeMap::new(),
                    pending_linearize_read_txns: BTreeMap::new(),
                    serialized_ddl: LockedVecDeque::new(),
                    active_compute_sinks: BTreeMap::new(),
                    active_webhooks: BTreeMap::new(),
                    active_copies: BTreeMap::new(),
                    staged_cancellation: BTreeMap::new(),
                    introspection_subscribes: BTreeMap::new(),
                    write_locks: BTreeMap::new(),
                    deferred_write_ops: BTreeMap::new(),
                    pending_writes: Vec::new(),
                    advance_timelines_interval,
                    secrets_controller,
                    caching_secrets_reader,
                    cloud_resource_controller,
                    storage_usage_client,
                    storage_usage_collection_interval,
                    segment_client,
                    metrics,
                    optimizer_metrics,
                    tracing_handle,
                    statement_logging: StatementLogging::new(coord_now.clone()),
                    webhook_concurrency_limit,
                    pg_timestamp_oracle_config,
                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
                    cluster_scheduling_decisions: BTreeMap::new(),
                    caught_up_check_interval: clusters_caught_up_check_interval,
                    caught_up_check: clusters_caught_up_check,
                    installed_watch_sets: BTreeMap::new(),
                    connection_watch_sets: BTreeMap::new(),
                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
                    read_only_controllers,
                    buffered_builtin_table_updates: Some(Vec::new()),
                    license_key,
                    persist_client,
                };
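                // Run bootstrap on this thread and report the result back to the parent
                // task before entering the main serve loop.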
                let bootstrap = handle.block_on(async {
                    coord
                        .bootstrap(
                            boot_ts,
                            migrated_storage_collections_0dt,
                            builtin_table_updates,
                            cached_global_exprs,
                            uncached_local_exprs,
                            audit_logs_iterator,
                        )
                        .await?;
                    coord
                        .controller
                        .remove_orphaned_replicas(
                            coord.catalog().get_next_user_replica_id().await?,
                            coord.catalog().get_next_system_replica_id().await?,
                        )
                        .await
                        .map_err(AdapterError::Orchestrator)?;

                    if let Some(retention_period) = storage_usage_retention_period {
                        coord
                            .prune_storage_usage_events_on_startup(retention_period)
                            .await;
                    }

                    Ok(())
                });
                let ok = bootstrap.is_ok();
                drop(span);
                bootstrap_tx
                    .send(bootstrap)
                    .expect("bootstrap_rx is not dropped until it receives this message");
                if ok {
                    handle.block_on(coord.serve(
                        internal_cmd_rx,
                        strict_serializable_reads_rx,
                        cmd_rx,
                        group_commit_rx,
                    ));
                }
            })
            .expect("failed to create coordinator thread");
        match bootstrap_rx
            .await
            .expect("bootstrap_tx always sends a message or panics/halts")
        {
            Ok(()) => {
                info!(
                    "startup: coordinator init: coordinator thread start complete in {:?}",
                    coord_thread_start.elapsed()
                );
                info!(
                    "startup: coordinator init: complete in {:?}",
                    coord_start.elapsed()
                );
                let handle = Handle {
                    session_id,
                    start_instant,
                    _thread: thread.join_on_drop(),
                };
                let client = Client::new(
                    build_info,
                    cmd_tx.clone(),
                    metrics_clone,
                    now,
                    environment_id,
                    segment_client_clone,
                );
                Ok((handle, client))
            }
            Err(e) => Err(e),
        }
    }
    .boxed()
}

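/// Returns the initial timestamps to use for each timeline, as reported by the
/// postgres-backed timestamp oracle (if one is configured). Timelines unknown to
/// the oracle are simply absent from the returned map.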
async fn get_initial_oracle_timestamps(
    pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
    let mut initial_timestamps = BTreeMap::new();

    if let Some(pg_timestamp_oracle_config) = pg_timestamp_oracle_config {
        let postgres_oracle_timestamps =
            PostgresTimestampOracle::<NowFn>::get_all_timelines(pg_timestamp_oracle_config.clone())
                .await?;

        let debug_msg = || {
            postgres_oracle_timestamps
                .iter()
                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
                .join(", ")
        };
        info!(
            "current timestamps from the postgres-backed timestamp oracle: {}",
            debug_msg()
        );

        for (timeline, ts) in postgres_oracle_timestamps {
            let entry = initial_timestamps
                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));

            entry
                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
                .or_insert(ts);
        }
    } else {
        info!("no postgres url for postgres-backed timestamp oracle configured!");
    };

    let debug_msg = || {
        initial_timestamps
            .iter()
            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
            .join(", ")
    };
    info!("initial oracle timestamps: {}", debug_msg());

    Ok(initial_timestamps)
}

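/// Pulls system parameter values from the remote parameter frontend, if one is
/// configured. On first boot the sync may block; afterwards it is bounded by
/// `system_parameter_sync_timeout`, and a timeout yields `Ok(None)`.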
#[instrument]
pub async fn load_remote_system_parameters(
    storage: &mut Box<dyn OpenableDurableCatalogState>,
    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
    system_parameter_sync_timeout: Duration,
) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
        tracing::info!("parameter sync on boot: start sync");

        let mut params = SynchronizedParameters::new(SystemVars::default());
        let frontend_sync = async {
            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
            frontend.pull(&mut params);
            let ops = params
                .modified()
                .into_iter()
                .map(|param| {
                    let name = param.name;
                    let value = param.value;
                    tracing::info!(name, value, initial = true, "sync parameter");
                    (name, value)
                })
                .collect();
            tracing::info!("parameter sync on boot: end sync");
            Ok(Some(ops))
        };
        if !storage.has_system_config_synced_once().await? {
            frontend_sync.await
        } else {
            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
                Ok(ops) => Ok(ops),
                Err(TimeoutError::Inner(e)) => Err(e),
                Err(TimeoutError::DeadlineElapsed) => {
                    tracing::info!("parameter sync on boot: sync has timed out");
                    Ok(None)
                }
            }
        }
    } else {
        Ok(None)
    }
}

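/// Responses delivered to the coordinator when an installed watch set becomes ready.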
#[derive(Debug)]
pub enum WatchSetResponse {
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    AlterSinkReady(AlterSinkReadyContext),
}

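/// Context retained while an `ALTER SINK` waits for its dependencies to become
/// ready. Dropping the context without retiring it cancels the pending execute
/// request.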
#[derive(Debug)]
pub struct AlterSinkReadyContext {
    ctx: Option<ExecuteContext>,
    otel_ctx: OpenTelemetryContext,
    plan: AlterSinkPlan,
    plan_validity: PlanValidity,
    read_hold: ReadHolds<Timestamp>,
}

impl AlterSinkReadyContext {
    fn ctx(&mut self) -> &mut ExecuteContext {
        self.ctx.as_mut().expect("only cleared on drop")
    }

    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
        self.ctx
            .take()
            .expect("only cleared on drop")
            .retire(result);
    }
}

impl Drop for AlterSinkReadyContext {
    fn drop(&mut self) {
        if let Some(ctx) = self.ctx.take() {
            ctx.retire(Err(AdapterError::Canceled));
        }
    }
}

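/// A `VecDeque` paired with an async mutex. The owned guard handed out by
/// `try_lock_owned` lets exactly one caller at a time claim the right to process
/// queued items.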
#[derive(Debug)]
struct LockedVecDeque<T> {
    items: VecDeque<T>,
    lock: Arc<tokio::sync::Mutex<()>>,
}

impl<T> LockedVecDeque<T> {
    pub fn new() -> Self {
        Self {
            items: VecDeque::new(),
            lock: Arc::new(tokio::sync::Mutex::new(())),
        }
    }

    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
        Arc::clone(&self.lock).try_lock_owned()
    }

    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    pub fn push_back(&mut self, value: T) {
        self.items.push_back(value)
    }

    pub fn pop_front(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    pub fn remove(&mut self, index: usize) -> Option<T> {
        self.items.remove(index)
    }

    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
        self.items.iter()
    }
}

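/// A statement whose execution has been deferred, together with the client
/// context needed to finish it later.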
#[derive(Debug)]
struct DeferredPlanStatement {
    ctx: ExecuteContext,
    ps: PlanStatement,
}

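/// The unit of deferred work: either a raw statement that still needs planning,
/// or an already-planned statement with its resolved dependencies.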
#[derive(Debug)]
enum PlanStatement {
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
    },
}

#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    #[error("Access denied: missing IP address")]
    MissingIp,
}

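/// Returns `Ok(())` if `ip` is permitted by at least one of the given network
/// policy rules, and a `NetworkPolicyError` otherwise.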
pub(crate) fn validate_ip_with_policy_rules(
    ip: &IpAddr,
    rules: &Vec<NetworkPolicyRule>,
) -> Result<(), NetworkPolicyError> {
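    // A rule allows the connection if its address block contains the IP; otherwise
    // the address is denied.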
    if rules.iter().any(|r| r.address.0.contains(ip)) {
        Ok(())
    } else {
        Err(NetworkPolicyError::AddressDenied(ip.clone()))
    }
}