use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroI64;
use std::ops::Neg;
use std::str::FromStr;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::Context;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
use http::Uri;
use ipnet::IpNet;
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
use mz_auth::password::Password;
use mz_build_info::BuildInfo;
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_controller::clusters::{ClusterConfig, ClusterEvent, ClusterStatus, ProcessId};
use mz_controller::{ControllerConfig, Readiness};
use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::OfflineReason;
use mz_ore::cast::{CastFrom, CastInto, CastLossy};
use mz_ore::channel::trigger::Trigger;
use mz_ore::future::TimeoutError;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::{JoinHandle, spawn};
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_ore::url::SensitiveUrl;
use mz_ore::{
    assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
};
use mz_persist_client::PersistClient;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
use mz_secrets::cache::CachingSecretsReader;
use mz_secrets::{SecretsController, SecretsReader};
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::{CatalogCluster, EnvironmentId};
use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::{
    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, NetworkPolicyRule,
    OnTimeoutAction, Params, QueryWhen,
};
use mz_sql::session::user::User;
use mz_sql::session::vars::SystemVars;
use mz_sql_parser::ast::ExplainStage;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
use mz_storage_types::sources::Timeline;
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_timestamp_oracle::WriteTimestamp;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use serde::Serialize;
use thiserror::Error;
use timely::progress::{Antichain, Timestamp as _};
use tokio::runtime::Handle as TokioHandle;
use tokio::select;
use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
use crate::client::{Client, Handle};
use crate::command::{Command, ExecuteResponse};
use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
use crate::coord::appends::{
    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
};
use crate::coord::caught_up::CaughtUpCheckContext;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::introspection::IntrospectionSubscribe;
use crate::coord::peek::PendingPeek;
use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::validity::PlanValidity;
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
use crate::metrics::Metrics;
use crate::optimize::dataflows::{
    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{AdapterNotice, ReadHolds, flags};

pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;

pub mod appends;
mod catalog_serving;
mod caught_up;
pub mod cluster_scheduling;
mod command_handler;
pub mod consistency;
mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
pub mod read_policy;
mod sequencer;
mod sql;
mod validity;

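/// Messages handled by the coordinator's main loop, sent either by clients
/// (via [`Command`]) or internally by background tasks.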
#[derive(Debug)]
pub enum Message {
    Command(OpenTelemetryContext, Command),
    ControllerReady {
        controller: ControllerReadiness,
    },
    PurifiedStatementReady(PurifiedStatementReady),
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    TryDeferred {
        conn_id: ConnectionId,
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    AdvanceTimelines,
    ClusterEvent(ClusterEvent),
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    LinearizeReads,
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    DrainStatementLog,
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}

impl Message {
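    /// Returns a string identifying the kind of [`Message`], suitable for use
    /// in logs and metrics.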
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
            },
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}

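/// The part of the controller that signaled readiness in a
/// [`Message::ControllerReady`].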
#[derive(Debug)]
pub enum ControllerReadiness {
    Storage,
    Compute,
    Metrics,
    Internal,
}

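/// The result of a unit of work that ran off the coordinator's main loop,
/// together with the context needed to resume sequencing the original
/// statement.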
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub params: Params,
    pub plan_validity: PlanValidity,
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}

pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;

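/// The result of validating a connection off the main loop, together with the
/// context needed to finish creating or altering it.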
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    pub connection_id: CatalogItemId,
    pub connection_gid: GlobalId,
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}

pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
pub type AlterConnectionValidationReady = ValidationReady<Connection>;

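/// The stages into which sequencing a peek (`SELECT` or `COPY ... TO`) is
/// split.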
#[derive(Debug)]
pub enum PeekStage {
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    Finish(PeekStageFinish),
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    CopyToPreflight(PeekStageCopyTo),
    CopyToDataflow(PeekStageCopyTo),
}

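/// Context for a `COPY ... TO` statement that writes its output to object
/// storage.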
#[derive(Debug)]
pub struct CopyToContext {
    pub desc: RelationDesc,
    pub uri: Uri,
    pub connection: StorageConnection<ReferencedConnection>,
    pub connection_id: CatalogItemId,
    pub format: S3SinkFormat,
    pub max_file_size: u64,
    pub output_batch_count: Option<u64>,
}

#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
}

#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    imports: BTreeMap<GlobalId, MapFilterProject>,
}

#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}

#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}

#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}

#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    real_time_recency_ts: Option<Timestamp>,
}

#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}

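/// Indicates whether a statement is executed normally or on behalf of some
/// flavor of `EXPLAIN`: a full plan explanation, a plan-insights notice, or a
/// filter-pushdown explanation.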
#[derive(Debug)]
pub enum ExplainContext {
    None,
    Plan(ExplainPlanContext),
    PlanInsightsNotice(OptimizerTrace),
    Pushdown,
}

impl ExplainContext {
    fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
        let optimizer_trace = match self {
            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
            _ => None,
        };
        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
    }

    fn needs_cluster(&self) -> bool {
        match self {
            ExplainContext::None => true,
            ExplainContext::Plan(..) => false,
            ExplainContext::PlanInsightsNotice(..) => true,
            ExplainContext::Pushdown => false,
        }
    }

    fn needs_plan_insights(&self) -> bool {
        matches!(
            self,
            ExplainContext::Plan(ExplainPlanContext {
                stage: ExplainStage::PlanInsights,
                ..
            }) | ExplainContext::PlanInsightsNotice(_)
        )
    }
}

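/// The configuration of an `EXPLAIN` plan request: the requested stage and
/// format, plus the optimizer trace that collects plan information.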
#[derive(Debug)]
pub struct ExplainPlanContext {
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    pub stage: ExplainStage,
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    pub optimizer_trace: OptimizerTrace,
}

#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    item_id: CatalogItemId,
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
}

#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}

#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}

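/// Selects the cluster a statement should run on: the built-in catalog server
/// cluster, the session's active cluster, or the cluster pinned by the
/// current transaction.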
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    CatalogServer,
    Active,
    Transaction(ClusterId),
}

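/// The result of executing one sequencing stage: either the next stage
/// (available immediately or awaited via a task handle) or a final response
/// for the client.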
pub(crate) enum StageResult<T> {
    Handle(JoinHandle<Result<T, AdapterError>>),
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    Immediate(T),
    Response(ExecuteResponse),
}

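/// A statement whose sequencing is split into multiple stages. Between stages
/// the coordinator re-checks the stage's [`PlanValidity`] and forwards the
/// next stage to itself as a [`Message`].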
pub(crate) trait Staged: Send {
    type Ctx: StagedContext;

    fn validity(&mut self) -> &mut PlanValidity;

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    fn cancel_enabled(&self) -> bool;
}

pub trait StagedContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    fn session(&self) -> Option<&Session>;
}

impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}

impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}

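/// Configuration for a [`Coordinator`].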
pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    pub read_only_controllers: bool,
    pub enable_0dt_deployment: bool,

    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
}

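/// Metadata the coordinator tracks about each active connection.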
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    secret_key: u32,
    connected_at: EpochMillis,
    user: User,
    application_name: String,
    uuid: Uuid,
    conn_id: ConnectionId,
    client_ip: Option<IpAddr>,

    drop_sinks: BTreeSet<GlobalId>,

    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    pending_cluster_alters: BTreeSet<ClusterId>,

    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    authenticated_role: RoleId,
}

impl ConnMeta {
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    pub fn user(&self) -> &User {
        &self.user
    }

    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}

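/// A transaction waiting to be finalized (committed or rolled back), along
/// with the context needed to return the result to the client.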
#[derive(Debug)]
pub struct PendingTxn {
    ctx: ExecuteContext,
    response: Result<PendingTxnResponse, AdapterError>,
    action: EndTransactionAction,
}

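/// The response to return for a [`PendingTxn`], including any session
/// parameters that changed when the transaction ended.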
#[derive(Debug)]
pub enum PendingTxnResponse {
    Committed {
        params: BTreeMap<&'static str, String>,
    },
    Rolledback {
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        match self {
            PendingTxnResponse::Committed { params }
            | PendingTxnResponse::Rolledback { params } => params.extend(p),
        }
    }
}

impl From<PendingTxnResponse> for ExecuteResponse {
    fn from(value: PendingTxnResponse) -> Self {
        match value {
            PendingTxnResponse::Committed { params } => {
                ExecuteResponse::TransactionCommitted { params }
            }
            PendingTxnResponse::Rolledback { params } => {
                ExecuteResponse::TransactionRolledBack { params }
            }
        }
    }
}

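/// A read that is waiting on the coordinator to linearize its timestamp
/// before it can complete.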
#[derive(Debug)]
pub struct PendingReadTxn {
    txn: PendingRead,
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    created: Instant,
    num_requeues: u64,
    otel_ctx: OpenTelemetryContext,
}

impl PendingReadTxn {
    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
        &self.timestamp_context
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}

#[derive(Debug)]
enum PendingRead {
    Read {
        txn: PendingTxn,
    },
    ReadThenWrite {
        ctx: ExecuteContext,
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}

impl PendingRead {
    #[instrument(level = "debug")]
    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
        match self {
            PendingRead::Read {
                txn:
                    PendingTxn {
                        mut ctx,
                        response,
                        action,
                    },
                ..
            } => {
                let changed = ctx.session_mut().vars_mut().end_transaction(action);
                let response = response.map(|mut r| {
                    r.extend_params(changed);
                    ExecuteResponse::from(r)
                });

                Some((ctx, response))
            }
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                let _ = tx.send(Some(ctx));
                None
            }
        }
    }

    fn label(&self) -> &'static str {
        match self {
            PendingRead::Read { .. } => "read",
            PendingRead::ReadThenWrite { .. } => "read_then_write",
        }
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        match self {
            PendingRead::Read { txn, .. } => txn.ctx,
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                let _ = tx.send(None);
                ctx
            }
        }
    }
}

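/// Statement-logging state that accompanies a statement through execution and
/// must be explicitly retired; dropping it without retiring it is reported as
/// a bug.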
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    statement_uuid: Option<StatementLoggingId>,
}

impl ExecuteContextExtra {
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }
    pub fn is_trivial(&self) -> bool {
        let Self { statement_uuid } = self;
        statement_uuid.is_none()
    }
    pub fn contents(&self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = self;
        *statement_uuid
    }
    #[must_use]
    fn retire(mut self) -> Option<StatementLoggingId> {
        let Self { statement_uuid } = &mut self;
        statement_uuid.take()
    }
}

impl Drop for ExecuteContextExtra {
    fn drop(&mut self) {
        let Self { statement_uuid } = &*self;
        if let Some(statement_uuid) = statement_uuid {
            soft_panic_or_log!(
                "execute context for statement {statement_uuid:?} dropped without being properly retired."
            );
        }
    }
}

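/// All state needed to execute a single statement: the transmitter for the
/// client response, the session, a sender for the coordinator's internal
/// command channel, and statement-logging state.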
#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}

impl std::ops::Deref for ExecuteContext {
    type Target = ExecuteContextInner;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}

impl std::ops::DerefMut for ExecuteContext {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}

#[derive(Debug)]
pub struct ExecuteContextInner {
    tx: ClientTransmitter<ExecuteResponse>,
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    session: Session,
    extra: ExecuteContextExtra,
}

impl ExecuteContext {
    pub fn session(&self) -> &Session {
        &self.session
    }

    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.session
    }

    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
        &self.tx
    }

    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
        &mut self.tx
    }

    pub fn from_parts(
        tx: ClientTransmitter<ExecuteResponse>,
        internal_cmd_tx: mpsc::UnboundedSender<Message>,
        session: Session,
        extra: ExecuteContextExtra,
    ) -> Self {
        Self {
            inner: ExecuteContextInner {
                tx,
                session,
                extra,
                internal_cmd_tx,
            }
            .into(),
        }
    }

    pub fn into_parts(
        self,
    ) -> (
        ClientTransmitter<ExecuteResponse>,
        mpsc::UnboundedSender<Message>,
        Session,
        ExecuteContextExtra,
    ) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        (tx, internal_cmd_tx, session, extra)
    }

    #[instrument(level = "debug")]
    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        let reason = if extra.is_trivial() {
            None
        } else {
            Some((&result).into())
        };
        tx.send(result, session);
        if let Some(reason) = reason {
            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
                otel_ctx: OpenTelemetryContext::obtain(),
                data: extra,
                reason,
            }) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        }
    }

    pub fn extra(&self) -> &ExecuteContextExtra {
        &self.extra
    }

    pub fn extra_mut(&mut self) -> &mut ExecuteContextExtra {
        &mut self.extra
    }
}

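/// The last known status of each process of each cluster replica, keyed by
/// cluster, replica, and process ID.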
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);

impl ClusterReplicaStatuses {
    pub(crate) fn new() -> ClusterReplicaStatuses {
        ClusterReplicaStatuses(BTreeMap::new())
    }

    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
        let prev = self.0.insert(cluster_id, BTreeMap::new());
        assert_eq!(
            prev, None,
            "cluster {cluster_id} statuses already initialized"
        );
    }

    pub(crate) fn initialize_cluster_replica_statuses(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        num_processes: usize,
        time: DateTime<Utc>,
    ) {
        tracing::info!(
            ?cluster_id,
            ?replica_id,
            ?time,
            "initializing cluster replica status"
        );
        let replica_statuses = self.0.entry(cluster_id).or_default();
        let process_statuses = (0..num_processes)
            .map(|process_id| {
                let status = ClusterReplicaProcessStatus {
                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
                    time: time.clone(),
                };
                (u64::cast_from(process_id), status)
            })
            .collect();
        let prev = replica_statuses.insert(replica_id, process_statuses);
        assert_none!(
            prev,
            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
        );
    }

    pub(crate) fn remove_cluster_statuses(
        &mut self,
        cluster_id: &ClusterId,
    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        let prev = self.0.remove(cluster_id);
        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
    }

    pub(crate) fn remove_cluster_replica_statuses(
        &mut self,
        cluster_id: &ClusterId,
        replica_id: &ReplicaId,
    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        let replica_statuses = self
            .0
            .get_mut(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
        let prev = replica_statuses.remove(replica_id);
        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(crate) fn ensure_cluster_status(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        process_id: ProcessId,
        status: ClusterReplicaProcessStatus,
    ) {
        let replica_statuses = self
            .0
            .get_mut(&cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
            .get_mut(&replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
        replica_statuses.insert(process_id, status);
    }

    pub fn get_cluster_replica_status(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> ClusterStatus {
        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
        Self::cluster_replica_status(process_status)
    }

    pub fn cluster_replica_status(
        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
    ) -> ClusterStatus {
        process_status
            .values()
            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
                (x, y) => {
                    let reason_x = match x {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    let reason_y = match y {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    ClusterStatus::Offline(reason_x.or(reason_y))
                }
            })
    }

    pub(crate) fn get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(crate) fn try_get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        self.try_get_cluster_statuses(cluster_id)
            .and_then(|statuses| statuses.get(&replica_id))
    }

    pub(crate) fn try_get_cluster_statuses(
        &self,
        cluster_id: ClusterId,
    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
        self.0.get(&cluster_id)
    }
}

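/// The coordinator's in-memory state. The coordinator owns the catalog,
/// drives the storage and compute controllers, and processes all client
/// commands on a single main loop.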
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    catalog: Arc<Catalog>,

    persist_client: PersistClient,

    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    group_commit_tx: appends::GroupCommitNotifier,

    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,

    transient_id_gen: Arc<TransientIdGen>,
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,

    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    pending_writes: Vec<PendingWriteTxn>,

    advance_timelines_interval: Interval,

    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    secrets_controller: Arc<dyn SecretsController>,
    caching_secrets_reader: CachingSecretsReader,

    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    storage_usage_client: StorageUsageClient,
    storage_usage_collection_interval: Duration,

    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    metrics: Metrics,
    optimizer_metrics: OptimizerMetrics,

    tracing_handle: TracingHandle,

    statement_logging: StatementLogging,

    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,

    check_cluster_scheduling_policies_interval: Interval,

    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    caught_up_check_interval: Interval,

    caught_up_check: Option<CaughtUpCheckContext>,

    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    cluster_replica_statuses: ClusterReplicaStatuses,

    read_only_controllers: bool,

    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    license_key: ValidatedLicenseKey,
}

impl Coordinator {
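    /// Initializes coordinator state from the catalog at startup: registers
    /// clusters and replicas with the controller, bootstraps storage
    /// collections and dataflow plans, installs read policies, and prepares
    /// builtin table updates (buffering them while in read-only mode).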
1809 #[instrument(name = "coord::bootstrap")]
1813 pub(crate) async fn bootstrap(
1814 &mut self,
1815 boot_ts: Timestamp,
1816 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1817 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1818 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1819 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1820 audit_logs_iterator: AuditLogIterator,
1821 ) -> Result<(), AdapterError> {
1822 let bootstrap_start = Instant::now();
1823 info!("startup: coordinator init: bootstrap beginning");
1824 info!("startup: coordinator init: bootstrap: preamble beginning");
1825
1826 let cluster_statuses: Vec<(_, Vec<_>)> = self
1829 .catalog()
1830 .clusters()
1831 .map(|cluster| {
1832 (
1833 cluster.id(),
1834 cluster
1835 .replicas()
1836 .map(|replica| {
1837 (replica.replica_id, replica.config.location.num_processes())
1838 })
1839 .collect(),
1840 )
1841 })
1842 .collect();
1843 let now = self.now_datetime();
1844 for (cluster_id, replica_statuses) in cluster_statuses {
1845 self.cluster_replica_statuses
1846 .initialize_cluster_statuses(cluster_id);
1847 for (replica_id, num_processes) in replica_statuses {
1848 self.cluster_replica_statuses
1849 .initialize_cluster_replica_statuses(
1850 cluster_id,
1851 replica_id,
1852 num_processes,
1853 now,
1854 );
1855 }
1856 }
1857
1858 let system_config = self.catalog().system_config();
1859
1860 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1862
1863 let compute_config = flags::compute_config(system_config);
1865 let storage_config = flags::storage_config(system_config);
1866 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1867 let dyncfg_updates = system_config.dyncfg_updates();
1868 self.controller.compute.update_configuration(compute_config);
1869 self.controller.storage.update_parameters(storage_config);
1870 self.controller
1871 .update_orchestrator_scheduling_config(scheduling_config);
1872 self.controller.update_configuration(dyncfg_updates);
1873
1874 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1875 Default::default();
1876
1877 let enable_worker_core_affinity =
1878 self.catalog().system_config().enable_worker_core_affinity();
1879 for instance in self.catalog.clusters() {
1880 self.controller.create_cluster(
1881 instance.id,
1882 ClusterConfig {
1883 arranged_logs: instance.log_indexes.clone(),
1884 workload_class: instance.config.workload_class.clone(),
1885 },
1886 )?;
1887 for replica in instance.replicas() {
1888 let role = instance.role();
1889 self.controller.create_replica(
1890 instance.id,
1891 replica.replica_id,
1892 instance.name.clone(),
1893 replica.name.clone(),
1894 role,
1895 replica.config.clone(),
1896 enable_worker_core_affinity,
1897 )?;
1898 }
1899 }
1900
1901 info!(
1902 "startup: coordinator init: bootstrap: preamble complete in {:?}",
1903 bootstrap_start.elapsed()
1904 );
1905
1906 let init_storage_collections_start = Instant::now();
1907 info!("startup: coordinator init: bootstrap: storage collections init beginning");
1908 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
1909 .await;
1910 info!(
1911 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
1912 init_storage_collections_start.elapsed()
1913 );
1914
1915 self.controller.start_compute_introspection_sink();
1920
1921 let optimize_dataflows_start = Instant::now();
1922 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
1923 let entries: Vec<_> = self.catalog().entries().cloned().collect();
1924 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
1925 info!(
1926 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
1927 optimize_dataflows_start.elapsed()
1928 );
1929
1930 let _fut = self.catalog().update_expression_cache(
1932 uncached_local_exprs.into_iter().collect(),
1933 uncached_global_exps.into_iter().collect(),
1934 );
1935
1936 let bootstrap_as_ofs_start = Instant::now();
1940 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
1941 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
1942 info!(
1943 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
1944 bootstrap_as_ofs_start.elapsed()
1945 );
1946
1947 let postamble_start = Instant::now();
1948 info!("startup: coordinator init: bootstrap: postamble beginning");
1949
1950 let logs: BTreeSet<_> = BUILTINS::logs()
1951 .map(|log| self.catalog().resolve_builtin_log(log))
1952 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
1953 .collect();
1954
1955 let mut privatelink_connections = BTreeMap::new();
1956
1957 for entry in &entries {
1958 debug!(
1959 "coordinator init: installing {} {}",
1960 entry.item().typ(),
1961 entry.id()
1962 );
1963 let mut policy = entry.item().initial_logical_compaction_window();
1964 match entry.item() {
1965 CatalogItem::Source(source) => {
1971 if source.custom_logical_compaction_window.is_none() {
1973 if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
1974 source.data_source
1975 {
1976 policy = Some(
1977 self.catalog()
1978 .get_entry(&ingestion_id)
1979 .source()
1980 .expect("must be source")
1981 .custom_logical_compaction_window
1982 .unwrap_or_default(),
1983 );
1984 }
1985 }
1986 policies_to_set
1987 .entry(policy.expect("sources have a compaction window"))
1988 .or_insert_with(Default::default)
1989 .storage_ids
1990 .insert(source.global_id());
1991 }
1992 CatalogItem::Table(table) => {
1993 policies_to_set
1994 .entry(policy.expect("tables have a compaction window"))
1995 .or_insert_with(Default::default)
1996 .storage_ids
1997 .extend(table.global_ids());
1998 }
1999 CatalogItem::Index(idx) => {
2000 let policy_entry = policies_to_set
2001 .entry(policy.expect("indexes have a compaction window"))
2002 .or_insert_with(Default::default);
2003
2004 if logs.contains(&idx.on) {
2005 policy_entry
2006 .compute_ids
2007 .entry(idx.cluster_id)
2008 .or_insert_with(BTreeSet::new)
2009 .insert(idx.global_id());
2010 } else {
2011 let df_desc = self
2012 .catalog()
2013 .try_get_physical_plan(&idx.global_id())
2014 .expect("added in `bootstrap_dataflow_plans`")
2015 .clone();
2016
2017 let df_meta = self
2018 .catalog()
2019 .try_get_dataflow_metainfo(&idx.global_id())
2020 .expect("added in `bootstrap_dataflow_plans`");
2021
2022 if self.catalog().state().system_config().enable_mz_notices() {
2023 self.catalog().state().pack_optimizer_notices(
2025 &mut builtin_table_updates,
2026 df_meta.optimizer_notices.iter(),
2027 Diff::ONE,
2028 );
2029 }
2030
2031 policy_entry
2034 .compute_ids
2035 .entry(idx.cluster_id)
2036 .or_insert_with(Default::default)
2037 .extend(df_desc.export_ids());
2038
2039 self.controller
2040 .compute
2041 .create_dataflow(idx.cluster_id, df_desc, None)
2042 .unwrap_or_terminate("cannot fail to create dataflows");
2043 }
2044 }
2045 CatalogItem::View(_) => (),
2046 CatalogItem::MaterializedView(mview) => {
2047 policies_to_set
2048 .entry(policy.expect("materialized views have a compaction window"))
2049 .or_insert_with(Default::default)
2050 .storage_ids
2051 .insert(mview.global_id());
2052
2053 let mut df_desc = self
2054 .catalog()
2055 .try_get_physical_plan(&mview.global_id())
2056 .expect("added in `bootstrap_dataflow_plans`")
2057 .clone();
2058
2059 if let Some(initial_as_of) = mview.initial_as_of.clone() {
2060 df_desc.set_initial_as_of(initial_as_of);
2061 }
2062
2063 let until = mview
2065 .refresh_schedule
2066 .as_ref()
2067 .and_then(|s| s.last_refresh())
2068 .and_then(|r| r.try_step_forward());
2069 if let Some(until) = until {
2070 df_desc.until.meet_assign(&Antichain::from_elem(until));
2071 }
2072
2073 let df_meta = self
2074 .catalog()
2075 .try_get_dataflow_metainfo(&mview.global_id())
2076 .expect("added in `bootstrap_dataflow_plans`");
2077
2078 if self.catalog().state().system_config().enable_mz_notices() {
2079 self.catalog().state().pack_optimizer_notices(
2081 &mut builtin_table_updates,
2082 df_meta.optimizer_notices.iter(),
2083 Diff::ONE,
2084 );
2085 }
2086
2087 self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2088 }
2089 CatalogItem::Sink(sink) => {
2090 policies_to_set
2091 .entry(CompactionWindow::Default)
2092 .or_insert_with(Default::default)
2093 .storage_ids
2094 .insert(sink.global_id());
2095 }
2096 CatalogItem::Connection(catalog_connection) => {
2097 if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2098 privatelink_connections.insert(
2099 entry.id(),
2100 VpcEndpointConfig {
2101 aws_service_name: conn.service_name.clone(),
2102 availability_zone_ids: conn.availability_zones.clone(),
2103 },
2104 );
2105 }
2106 }
2107 CatalogItem::ContinualTask(ct) => {
2108 policies_to_set
2109 .entry(policy.expect("continual tasks have a compaction window"))
2110 .or_insert_with(Default::default)
2111 .storage_ids
2112 .insert(ct.global_id());
2113
2114 let mut df_desc = self
2115 .catalog()
2116 .try_get_physical_plan(&ct.global_id())
2117 .expect("added in `bootstrap_dataflow_plans`")
2118 .clone();
2119
2120 if let Some(initial_as_of) = ct.initial_as_of.clone() {
2121 df_desc.set_initial_as_of(initial_as_of);
2122 }
2123
2124 let df_meta = self
2125 .catalog()
2126 .try_get_dataflow_metainfo(&ct.global_id())
2127 .expect("added in `bootstrap_dataflow_plans`");
2128
2129 if self.catalog().state().system_config().enable_mz_notices() {
2130 self.catalog().state().pack_optimizer_notices(
2132 &mut builtin_table_updates,
2133 df_meta.optimizer_notices.iter(),
2134 Diff::ONE,
2135 );
2136 }
2137
2138 self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2139 }
2140 CatalogItem::Log(_)
2142 | CatalogItem::Type(_)
2143 | CatalogItem::Func(_)
2144 | CatalogItem::Secret(_) => {}
2145 }
2146 }
2147
2148 if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2149 let existing_vpc_endpoints = cloud_resource_controller
2151 .list_vpc_endpoints()
2152 .await
2153 .context("list vpc endpoints")?;
2154 let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2155 let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2156 let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2157 for id in vpc_endpoints_to_remove {
2158 cloud_resource_controller
2159 .delete_vpc_endpoint(*id)
2160 .await
2161 .context("deleting extraneous vpc endpoint")?;
2162 }
2163
2164 for (id, spec) in privatelink_connections {
2166 cloud_resource_controller
2167 .ensure_vpc_endpoint(id, spec)
2168 .await
2169 .context("ensuring vpc endpoint")?;
2170 }
2171 }
2172
2173 drop(dataflow_read_holds);
2176 for (cw, policies) in policies_to_set {
2178 self.initialize_read_policies(&policies, cw).await;
2179 }
2180
2181 builtin_table_updates.extend(
2183 self.catalog().state().resolve_builtin_table_updates(
2184 self.catalog().state().pack_all_replica_size_updates(),
2185 ),
2186 );
2187
2188 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2189 let migrated_updates_fut = if self.controller.read_only() {
2195 let min_timestamp = Timestamp::minimum();
2196 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2197 .extract_if(.., |update| {
2198 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2199 migrated_storage_collections_0dt.contains(&update.id)
2200 && self
2201 .controller
2202 .storage_collections
2203 .collection_frontiers(gid)
2204 .expect("all tables are registered")
2205 .write_frontier
2206 .elements()
2207 == &[min_timestamp]
2208 })
2209 .collect();
2210 if migrated_builtin_table_updates.is_empty() {
2211 futures::future::ready(()).boxed()
2212 } else {
2213 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2215 for update in migrated_builtin_table_updates {
2216 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2217 grouped_appends.entry(gid).or_default().push(update.data);
2218 }
2219 info!(
2220 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2221 grouped_appends.keys().collect::<Vec<_>>()
2222 );
2223
2224 let mut all_appends = Vec::with_capacity(grouped_appends.len());
2226 for (item_id, table_data) in grouped_appends.into_iter() {
2227 let mut all_rows = Vec::new();
2228 let mut all_data = Vec::new();
2229 for data in table_data {
2230 match data {
2231 TableData::Rows(rows) => all_rows.extend(rows),
2232 TableData::Batches(_) => all_data.push(data),
2233 }
2234 }
2235 differential_dataflow::consolidation::consolidate(&mut all_rows);
2236 all_data.push(TableData::Rows(all_rows));
2237
2238 all_appends.push((item_id, all_data));
2240 }
2241
2242 let fut = self
2243 .controller
2244 .storage
2245 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2246 .expect("cannot fail to append");
2247 async {
2248 fut.await
2249 .expect("One-shot shouldn't be dropped during bootstrap")
2250 .unwrap_or_terminate("cannot fail to append")
2251 }
2252 .boxed()
2253 }
2254 } else {
2255 futures::future::ready(()).boxed()
2256 };
2257
2258 info!(
2259 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2260 postamble_start.elapsed()
2261 );
2262
2263 let builtin_update_start = Instant::now();
2264 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2265
2266 if self.controller.read_only() {
2267 info!(
2268 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2269 );
2270
2271 let audit_join_start = Instant::now();
2273 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2274 let audit_log_updates: Vec<_> = audit_logs_iterator
2275 .map(|(audit_log, ts)| StateUpdate {
2276 kind: StateUpdateKind::AuditLog(audit_log),
2277 ts,
2278 diff: StateDiff::Addition,
2279 })
2280 .collect();
2281 let audit_log_builtin_table_updates = self
2282 .catalog()
2283 .state()
2284 .generate_builtin_table_updates(audit_log_updates);
2285 builtin_table_updates.extend(audit_log_builtin_table_updates);
2286 info!(
2287 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2288 audit_join_start.elapsed()
2289 );
2290 self.buffered_builtin_table_updates
2291 .as_mut()
2292 .expect("in read-only mode")
2293 .append(&mut builtin_table_updates);
2294 } else {
2295 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2296 .await;
2297 };
2298 info!(
2299 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2300 builtin_update_start.elapsed()
2301 );
2302
2303 let cleanup_secrets_start = Instant::now();
2304 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2305 {
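            // Destructure `self` so individual fields can be moved into the background task
            // below, which deletes secrets known to the secrets controller but no longer
            // present in the catalog.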
2309 let Self {
2312 secrets_controller,
2313 catalog,
2314 ..
2315 } = self;
2316
2317 let next_user_item_id = catalog.get_next_user_item_id().await?;
2318 let next_system_item_id = catalog.get_next_system_item_id().await?;
2319 let read_only = self.controller.read_only();
2320 let catalog_ids: BTreeSet<CatalogItemId> =
2325 catalog.entries().map(|entry| entry.id()).collect();
2326 let secrets_controller = Arc::clone(secrets_controller);
2327
2328 spawn(|| "cleanup-orphaned-secrets", async move {
2329 if read_only {
2330 info!(
2331 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2332 );
2333 return;
2334 }
2335 info!("coordinator init: cleaning up orphaned secrets");
2336
2337 match secrets_controller.list().await {
2338 Ok(controller_secrets) => {
2339 let controller_secrets: BTreeSet<CatalogItemId> =
2340 controller_secrets.into_iter().collect();
2341 let orphaned = controller_secrets.difference(&catalog_ids);
2342 for id in orphaned {
2343 let id_too_large = match id {
2344 CatalogItemId::System(id) => *id >= next_system_item_id,
2345 CatalogItemId::User(id) => *id >= next_user_item_id,
2346 CatalogItemId::IntrospectionSourceIndex(_)
2347 | CatalogItemId::Transient(_) => false,
2348 };
2349 if id_too_large {
2350 info!(
2351 %next_user_item_id, %next_system_item_id,
2352 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2353 );
2354 } else {
2355 info!("coordinator init: deleting orphaned secret {id}");
2356 fail_point!("orphan_secrets");
2357 if let Err(e) = secrets_controller.delete(*id).await {
2358 warn!(
2359                                        "failed to drop orphaned secret {id}: {}",
2360 e
2361 );
2362 }
2363 }
2364 }
2365 }
2366 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2367 }
2368 });
2369 }
2370 info!(
2371 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2372 cleanup_secrets_start.elapsed()
2373 );
2374
2375 let final_steps_start = Instant::now();
2377 info!(
2378 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2379 );
2380 migrated_updates_fut
2381 .instrument(info_span!("coord::bootstrap::final"))
2382 .await;
2383
2384 debug!(
2385 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2386 );
2387 self.controller.initialization_complete();
2389
2390 self.bootstrap_introspection_subscribes().await;
2392
2393 info!(
2394 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2395 final_steps_start.elapsed()
2396 );
2397
2398 info!(
2399 "startup: coordinator init: bootstrap complete in {:?}",
2400 bootstrap_start.elapsed()
2401 );
2402 Ok(())
2403 }
2404
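    /// Prepares builtin and user tables for normal operation: advances every table to the
    /// current write timestamp, retracts the existing contents of (most) system tables,
    /// reconciles `mz_audit_events` with the durable audit log, and finally appends the
    /// accumulated builtin table updates.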
2405 #[allow(clippy::async_yields_async)]
2410 #[instrument]
2411 async fn bootstrap_tables(
2412 &mut self,
2413 entries: &[CatalogEntry],
2414 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2415 audit_logs_iterator: AuditLogIterator,
2416 ) {
2417 struct TableMetadata<'a> {
2419 id: CatalogItemId,
2420 name: &'a QualifiedItemName,
2421 table: &'a Table,
2422 }
2423
2424 let table_metas: Vec<_> = entries
2426 .into_iter()
2427 .filter_map(|entry| {
2428 entry.table().map(|table| TableMetadata {
2429 id: entry.id(),
2430 name: entry.name(),
2431 table,
2432 })
2433 })
2434 .collect();
2435
2436 debug!("coordinator init: advancing all tables to current timestamp");
2438 let WriteTimestamp {
2439 timestamp: write_ts,
2440 advance_to,
2441 } = self.get_local_write_ts().await;
2442 let appends = table_metas
2443 .iter()
2444 .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2445 .collect();
2446 let table_fence_rx = self
2450 .controller
2451 .storage
2452 .append_table(write_ts.clone(), advance_to, appends)
2453 .expect("invalid updates");
2454
2455 self.apply_local_write(write_ts).await;
2456
2457 debug!("coordinator init: resetting system tables");
2459 let read_ts = self.get_local_read_ts().await;
2460
2461 let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2464 .catalog()
2465 .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2466 .into();
2467 let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2468 meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2469 && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2470 };
2471
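        // For every system table other than mz_storage_usage_by_shard (whose history is
        // retained) and mz_audit_events (reconciled incrementally below), spawn a task that
        // snapshots the current contents and produces a batch of retractions.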
2472 let mut retraction_tasks = Vec::new();
2473 let mut system_tables: Vec<_> = table_metas
2474 .iter()
2475 .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2476 .collect();
2477
2478 let (audit_events_idx, _) = system_tables
2480 .iter()
2481 .find_position(|table| {
2482 table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2483 })
2484 .expect("mz_audit_events must exist");
2485 let audit_events = system_tables.remove(audit_events_idx);
2486 let audit_log_task = self.bootstrap_audit_log_table(
2487 audit_events.id,
2488 audit_events.name,
2489 audit_events.table,
2490 audit_logs_iterator,
2491 read_ts,
2492 );
2493
2494 for system_table in system_tables {
2495 let table_id = system_table.id;
2496 let full_name = self.catalog().resolve_full_name(system_table.name, None);
2497 debug!("coordinator init: resetting system table {full_name} ({table_id})");
2498
2499 let snapshot_fut = self
2501 .controller
2502 .storage_collections
2503 .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2504 let batch_fut = self
2505 .controller
2506 .storage_collections
2507 .create_update_builder(system_table.table.global_id_writes());
2508
2509 let task = spawn(|| format!("snapshot-{table_id}"), async move {
2510 let mut batch = batch_fut
2512 .await
2513 .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2514 tracing::info!(?table_id, "starting snapshot");
2515 let mut snapshot_cursor = snapshot_fut
2517 .await
2518 .unwrap_or_terminate("cannot fail to snapshot");
2519
2520 while let Some(values) = snapshot_cursor.next().await {
2522 for ((key, _val), _t, d) in values {
2523 let key = key.expect("builtin table had errors");
2524 let d_invert = d.neg();
2525 batch.add(&key, &(), &d_invert).await;
2526 }
2527 }
2528 tracing::info!(?table_id, "finished snapshot");
2529
2530 let batch = batch.finish().await;
2531 BuiltinTableUpdate::batch(table_id, batch)
2532 });
2533 retraction_tasks.push(task);
2534 }
2535
2536 let retractions_res = futures::future::join_all(retraction_tasks).await;
2537 for retractions in retractions_res {
2538 let retractions = retractions.expect("cannot fail to fetch snapshot");
2539 builtin_table_updates.push(retractions);
2540 }
2541
2542 let audit_join_start = Instant::now();
2543 info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2544 let audit_log_updates = audit_log_task
2545 .await
2546 .expect("cannot fail to fetch audit log updates");
2547 let audit_log_builtin_table_updates = self
2548 .catalog()
2549 .state()
2550 .generate_builtin_table_updates(audit_log_updates);
2551 builtin_table_updates.extend(audit_log_builtin_table_updates);
2552 info!(
2553 "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2554 audit_join_start.elapsed()
2555 );
2556
2557 table_fence_rx
2559 .await
2560 .expect("One-shot shouldn't be dropped during bootstrap")
2561 .unwrap_or_terminate("cannot fail to append");
2562
2563 info!("coordinator init: sending builtin table updates");
2564 let (_builtin_updates_fut, write_ts) = self
2565 .builtin_table_update()
2566 .execute(builtin_table_updates)
2567 .await;
2568 info!(?write_ts, "our write ts");
2569 if let Some(write_ts) = write_ts {
2570 self.apply_local_write(write_ts).await;
2571 }
2572 }
2573
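    /// Spawns a task that snapshots `mz_audit_events` and returns state updates for all
    /// durable audit log events newer than the newest event already present in the table.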
2574 #[instrument]
2578 fn bootstrap_audit_log_table<'a>(
2579 &mut self,
2580 table_id: CatalogItemId,
2581 name: &'a QualifiedItemName,
2582 table: &'a Table,
2583 audit_logs_iterator: AuditLogIterator,
2584 read_ts: Timestamp,
2585 ) -> JoinHandle<Vec<StateUpdate>> {
2586 let full_name = self.catalog().resolve_full_name(name, None);
2587 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2588 let current_contents_fut = self
2589 .controller
2590 .storage_collections
2591 .snapshot(table.global_id_writes(), read_ts);
2592 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2593 let current_contents = current_contents_fut
2594 .await
2595 .unwrap_or_terminate("cannot fail to fetch snapshot");
2596 let contents_len = current_contents.len();
2597 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2598
2599 let max_table_id = current_contents
2601 .into_iter()
2602 .filter(|(_, diff)| *diff == 1)
2603 .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2604 .sorted()
2605 .rev()
2606 .next();
2607
2608 audit_logs_iterator
2610 .take_while(|(audit_log, _)| match max_table_id {
2611 Some(id) => audit_log.event.sortable_id() > id,
2612 None => true,
2613 })
2614 .map(|(audit_log, ts)| StateUpdate {
2615 kind: StateUpdateKind::AuditLog(audit_log),
2616 ts,
2617 diff: StateDiff::Addition,
2618 })
2619 .collect::<Vec<_>>()
2620 })
2621 }
2622
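    /// Registers all storage collections required by catalog items (sources, tables,
    /// materialized views, continual tasks, and sink progress collections) with the storage
    /// controller, creating any that do not yet exist. Builtin continual tasks introduced in
    /// this version are created separately by `bootstrap_builtin_continual_tasks`, since they
    /// first need an initial as-of.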
2623 #[instrument]
2636 async fn bootstrap_storage_collections(
2637 &mut self,
2638 migrated_storage_collections: &BTreeSet<CatalogItemId>,
2639 ) {
2640 let catalog = self.catalog();
2641 let source_status_collection_id = catalog
2642 .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2643 let source_status_collection_id = catalog
2644 .get_entry(&source_status_collection_id)
2645 .latest_global_id();
2646
2647 let source_desc =
2648 |data_source: &DataSourceDesc, desc: &RelationDesc, timeline: &Timeline| {
2649 let (data_source, status_collection_id) = match data_source.clone() {
2650 DataSourceDesc::Ingestion {
2652 ingestion_desc:
2653 mz_sql::plan::Ingestion {
2654 desc,
2655 progress_subsource,
2656 },
2657 cluster_id,
2658 } => {
2659 let desc = desc.into_inline_connection(catalog.state());
2660 let progress_subsource =
2663 catalog.get_entry(&progress_subsource).latest_global_id();
2664 let ingestion = mz_storage_types::sources::IngestionDescription::new(
2665 desc,
2666 cluster_id,
2667 progress_subsource,
2668 );
2669
2670 (
2671 DataSource::Ingestion(ingestion.clone()),
2672 Some(source_status_collection_id),
2673 )
2674 }
2675 DataSourceDesc::IngestionExport {
2676 ingestion_id,
2677 external_reference: _,
2678 details,
2679 data_config,
2680 } => {
2681 let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2684 (
2685 DataSource::IngestionExport {
2686 ingestion_id,
2687 details,
2688 data_config: data_config.into_inline_connection(catalog.state()),
2689 },
2690 Some(source_status_collection_id),
2691 )
2692 }
2693 DataSourceDesc::Webhook { .. } => {
2694 (DataSource::Webhook, Some(source_status_collection_id))
2695 }
2696 DataSourceDesc::Progress => (DataSource::Progress, None),
2697 DataSourceDesc::Introspection(introspection) => {
2698 (DataSource::Introspection(introspection), None)
2699 }
2700 };
2701 CollectionDescription {
2702 desc: desc.clone(),
2703 data_source,
2704 since: None,
2705 status_collection_id,
2706 timeline: Some(timeline.clone()),
2707 }
2708 };
2709
2710 let mut compute_collections = vec![];
2711 let mut collections = vec![];
2712 let mut new_builtin_continual_tasks = vec![];
2713 for entry in catalog.entries() {
2714 match entry.item() {
2715 CatalogItem::Source(source) => {
2716 collections.push((
2717 source.global_id(),
2718 source_desc(&source.data_source, &source.desc, &source.timeline),
2719 ));
2720 }
2721 CatalogItem::Table(table) => {
2722 match &table.data_source {
2723 TableDataSource::TableWrites { defaults: _ } => {
2724 let versions: BTreeMap<_, _> = table
2725 .collection_descs()
2726 .map(|(gid, version, desc)| (version, (gid, desc)))
2727 .collect();
2728 let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2729 let next_version = version.bump();
2730 let primary_collection =
2731 versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2732 let collection_desc = CollectionDescription::for_table(
2733 desc.clone(),
2734 primary_collection,
2735 );
2736
2737 (*gid, collection_desc)
2738 });
2739 collections.extend(collection_descs);
2740 }
2741 TableDataSource::DataSource {
2742 desc: data_source_desc,
2743 timeline,
2744 } => {
2745 soft_assert_eq_or_log!(table.collections.len(), 1);
2747 let collection_descs =
2748 table.collection_descs().map(|(gid, _version, desc)| {
2749 (gid, source_desc(data_source_desc, &desc, timeline))
2750 });
2751 collections.extend(collection_descs);
2752 }
2753 };
2754 }
2755 CatalogItem::MaterializedView(mv) => {
2756 let collection_desc = CollectionDescription {
2757 desc: mv.desc.clone(),
2758 data_source: DataSource::Other,
2759 since: mv.initial_as_of.clone(),
2760 status_collection_id: None,
2761 timeline: None,
2762 };
2763 compute_collections.push((mv.global_id(), mv.desc.clone()));
2764 collections.push((mv.global_id(), collection_desc));
2765 }
2766 CatalogItem::ContinualTask(ct) => {
2767 let collection_desc = CollectionDescription {
2768 desc: ct.desc.clone(),
2769 data_source: DataSource::Other,
2770 since: ct.initial_as_of.clone(),
2771 status_collection_id: None,
2772 timeline: None,
2773 };
2774 if ct.global_id().is_system() && collection_desc.since.is_none() {
2775 new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2779 } else {
2780 compute_collections.push((ct.global_id(), ct.desc.clone()));
2781 collections.push((ct.global_id(), collection_desc));
2782 }
2783 }
2784 CatalogItem::Sink(sink) => {
2785 let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2786 let from_desc = storage_sink_from_entry
2787 .desc(&self.catalog().resolve_full_name(
2788 storage_sink_from_entry.name(),
2789 storage_sink_from_entry.conn_id(),
2790 ))
2791 .expect("sinks can only be built on items with descs")
2792 .into_owned();
2793 let collection_desc = CollectionDescription {
2794 desc: KAFKA_PROGRESS_DESC.clone(),
2796 data_source: DataSource::Sink {
2797 desc: ExportDescription {
2798 sink: StorageSinkDesc {
2799 from: sink.from,
2800 from_desc,
2801 connection: sink
2802 .connection
2803 .clone()
2804 .into_inline_connection(self.catalog().state()),
2805 envelope: sink.envelope,
2806 as_of: Antichain::from_elem(Timestamp::minimum()),
2807 with_snapshot: sink.with_snapshot,
2808 version: sink.version,
2809 from_storage_metadata: (),
2810 to_storage_metadata: (),
2811 },
2812 instance_id: sink.cluster_id,
2813 },
2814 },
2815 since: None,
2816 status_collection_id: None,
2817 timeline: None,
2818 };
2819 collections.push((sink.global_id, collection_desc));
2820 }
2821 _ => (),
2822 }
2823 }
2824
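        // In read-only mode we must not allocate a new write timestamp, so collections are
        // registered at the current read timestamp; otherwise we use a fresh write timestamp
        // and apply it once the collections have been created.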
2825 let register_ts = if self.controller.read_only() {
2826 self.get_local_read_ts().await
2827 } else {
2828 self.get_local_write_ts().await.timestamp
2831 };
2832
2833 let storage_metadata = self.catalog.state().storage_metadata();
2834 let migrated_storage_collections = migrated_storage_collections
2835 .into_iter()
2836 .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2837 .collect();
2838
2839 self.controller
2844 .storage
2845 .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2846 .await
2847 .unwrap_or_terminate("cannot fail to evolve collections");
2848
2849 self.controller
2850 .storage
2851 .create_collections_for_bootstrap(
2852 storage_metadata,
2853 Some(register_ts),
2854 collections,
2855 &migrated_storage_collections,
2856 )
2857 .await
2858 .unwrap_or_terminate("cannot fail to create collections");
2859
2860 self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2861 .await;
2862
2863 if !self.controller.read_only() {
2864 self.apply_local_write(register_ts).await;
2865 }
2866 }
2867
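    /// Creates the storage collections for builtin continual tasks that are new in this
    /// version. Each task's dataflow is optimized first so that an initial `since` can be
    /// derived from read holds on the dataflow's inputs.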
2868 async fn bootstrap_builtin_continual_tasks(
2875 &mut self,
2876 mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
2878 ) {
2879 for (id, collection) in &mut collections {
2880 let entry = self.catalog.get_entry_by_global_id(id);
2881 let ct = match &entry.item {
2882 CatalogItem::ContinualTask(ct) => ct.clone(),
2883 _ => unreachable!("only called with continual task builtins"),
2884 };
2885 let debug_name = self
2886 .catalog()
2887 .resolve_full_name(entry.name(), None)
2888 .to_string();
2889 let (_optimized_plan, physical_plan, _metainfo) = self
2890 .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
2891 .expect("builtin CT should optimize successfully");
2892
2893 let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
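            // Exclude the task's own output collection from the import bundle: it is only
            // being created now, and the initial as-of only needs read holds on the inputs.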
2895 id_bundle.storage_ids.remove(id);
2897 let read_holds = self.acquire_read_holds(&id_bundle);
2898 let as_of = read_holds.least_valid_read();
2899
2900 collection.since = Some(as_of.clone());
2901 }
2902 self.controller
2903 .storage
2904 .create_collections(self.catalog.state().storage_metadata(), None, collections)
2905 .await
2906 .unwrap_or_terminate("cannot fail to create collections");
2907 }
2908
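    /// Optimizes the dataflows of all indexes, materialized views, and continual tasks and
    /// stores the resulting optimized plans, physical plans, and dataflow metainfo in the
    /// in-memory catalog. Cached global expressions are reused when their optimizer features
    /// match the current configuration; expressions that had to be re-optimized are returned
    /// so they can be written back to the expression cache.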
2909 #[instrument]
2920 fn bootstrap_dataflow_plans(
2921 &mut self,
2922 ordered_catalog_entries: &[CatalogEntry],
2923 mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2924 ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
2925 let mut instance_snapshots = BTreeMap::new();
2931 let mut uncached_expressions = BTreeMap::new();
2932
2933 let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
2934
2935 for entry in ordered_catalog_entries {
2936 match entry.item() {
2937 CatalogItem::Index(idx) => {
2938 let compute_instance =
2940 instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
2941 self.instance_snapshot(idx.cluster_id)
2942 .expect("compute instance exists")
2943 });
2944 let global_id = idx.global_id();
2945
2946 if compute_instance.contains_collection(&global_id) {
2949 continue;
2950 }
2951
2952 let (optimized_plan, physical_plan, metainfo) =
2953 match cached_global_exprs.remove(&global_id) {
2954 Some(global_expressions)
2955 if global_expressions.optimizer_features
2956 == optimizer_config.features =>
2957 {
2958 debug!("global expression cache hit for {global_id:?}");
2959 (
2960 global_expressions.global_mir,
2961 global_expressions.physical_plan,
2962 global_expressions.dataflow_metainfos,
2963 )
2964 }
2965 Some(_) | None => {
2966 let (optimized_plan, global_lir_plan) = {
2967 let mut optimizer = optimize::index::Optimizer::new(
2969 self.owned_catalog(),
2970 compute_instance.clone(),
2971 global_id,
2972 optimizer_config.clone(),
2973 self.optimizer_metrics(),
2974 );
2975
2976 let index_plan = optimize::index::Index::new(
2978 entry.name().clone(),
2979 idx.on,
2980 idx.keys.to_vec(),
2981 );
2982 let global_mir_plan = optimizer.optimize(index_plan)?;
2983 let optimized_plan = global_mir_plan.df_desc().clone();
2984
2985 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
2987
2988 (optimized_plan, global_lir_plan)
2989 };
2990
2991 let (physical_plan, metainfo) = global_lir_plan.unapply();
2992 let metainfo = {
2993 let notice_ids =
2995 std::iter::repeat_with(|| self.allocate_transient_id())
2996 .map(|(_item_id, gid)| gid)
2997 .take(metainfo.optimizer_notices.len())
2998 .collect::<Vec<_>>();
2999 self.catalog().render_notices(
3001 metainfo,
3002 notice_ids,
3003 Some(idx.global_id()),
3004 )
3005 };
3006 uncached_expressions.insert(
3007 global_id,
3008 GlobalExpressions {
3009 global_mir: optimized_plan.clone(),
3010 physical_plan: physical_plan.clone(),
3011 dataflow_metainfos: metainfo.clone(),
3012 optimizer_features: OptimizerFeatures::from(
3013 self.catalog().system_config(),
3014 ),
3015 },
3016 );
3017 (optimized_plan, physical_plan, metainfo)
3018 }
3019 };
3020
3021 let catalog = self.catalog_mut();
3022 catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3023 catalog.set_physical_plan(idx.global_id(), physical_plan);
3024 catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3025
3026 compute_instance.insert_collection(idx.global_id());
3027 }
3028 CatalogItem::MaterializedView(mv) => {
3029 let compute_instance =
3031 instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3032 self.instance_snapshot(mv.cluster_id)
3033 .expect("compute instance exists")
3034 });
3035 let global_id = mv.global_id();
3036
3037 let (optimized_plan, physical_plan, metainfo) =
3038 match cached_global_exprs.remove(&global_id) {
3039 Some(global_expressions)
3040 if global_expressions.optimizer_features
3041 == optimizer_config.features =>
3042 {
3043 debug!("global expression cache hit for {global_id:?}");
3044 (
3045 global_expressions.global_mir,
3046 global_expressions.physical_plan,
3047 global_expressions.dataflow_metainfos,
3048 )
3049 }
3050 Some(_) | None => {
3051 let (_, internal_view_id) = self.allocate_transient_id();
3052 let debug_name = self
3053 .catalog()
3054 .resolve_full_name(entry.name(), None)
3055 .to_string();
3056 let force_non_monotonic = Default::default();
3057
3058 let (optimized_plan, global_lir_plan) = {
3059 let mut optimizer = optimize::materialized_view::Optimizer::new(
3061 self.owned_catalog().as_optimizer_catalog(),
3062 compute_instance.clone(),
3063 global_id,
3064 internal_view_id,
3065 mv.desc.iter_names().cloned().collect(),
3066 mv.non_null_assertions.clone(),
3067 mv.refresh_schedule.clone(),
3068 debug_name,
3069 optimizer_config.clone(),
3070 self.optimizer_metrics(),
3071 force_non_monotonic,
3072 );
3073
3074 let global_mir_plan =
3076 optimizer.optimize(mv.optimized_expr.as_ref().clone())?;
3077 let optimized_plan = global_mir_plan.df_desc().clone();
3078
3079 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3081
3082 (optimized_plan, global_lir_plan)
3083 };
3084
3085 let (physical_plan, metainfo) = global_lir_plan.unapply();
3086 let metainfo = {
3087 let notice_ids =
3089 std::iter::repeat_with(|| self.allocate_transient_id())
3090 .map(|(_item_id, global_id)| global_id)
3091 .take(metainfo.optimizer_notices.len())
3092 .collect::<Vec<_>>();
3093 self.catalog().render_notices(
3095 metainfo,
3096 notice_ids,
3097 Some(mv.global_id()),
3098 )
3099 };
3100 uncached_expressions.insert(
3101 global_id,
3102 GlobalExpressions {
3103 global_mir: optimized_plan.clone(),
3104 physical_plan: physical_plan.clone(),
3105 dataflow_metainfos: metainfo.clone(),
3106 optimizer_features: OptimizerFeatures::from(
3107 self.catalog().system_config(),
3108 ),
3109 },
3110 );
3111 (optimized_plan, physical_plan, metainfo)
3112 }
3113 };
3114
3115 let catalog = self.catalog_mut();
3116 catalog.set_optimized_plan(mv.global_id(), optimized_plan);
3117 catalog.set_physical_plan(mv.global_id(), physical_plan);
3118 catalog.set_dataflow_metainfo(mv.global_id(), metainfo);
3119
3120 compute_instance.insert_collection(mv.global_id());
3121 }
3122 CatalogItem::ContinualTask(ct) => {
3123 let compute_instance =
3124 instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3125 self.instance_snapshot(ct.cluster_id)
3126 .expect("compute instance exists")
3127 });
3128 let global_id = ct.global_id();
3129
3130 let (optimized_plan, physical_plan, metainfo) =
3131 match cached_global_exprs.remove(&global_id) {
3132 Some(global_expressions)
3133 if global_expressions.optimizer_features
3134 == optimizer_config.features =>
3135 {
3136 debug!("global expression cache hit for {global_id:?}");
3137 (
3138 global_expressions.global_mir,
3139 global_expressions.physical_plan,
3140 global_expressions.dataflow_metainfos,
3141 )
3142 }
3143 Some(_) | None => {
3144 let debug_name = self
3145 .catalog()
3146 .resolve_full_name(entry.name(), None)
3147 .to_string();
3148 let (optimized_plan, physical_plan, metainfo) = self
3149 .optimize_create_continual_task(
3150 ct,
3151 global_id,
3152 self.owned_catalog(),
3153 debug_name,
3154 )?;
3155 uncached_expressions.insert(
3156 global_id,
3157 GlobalExpressions {
3158 global_mir: optimized_plan.clone(),
3159 physical_plan: physical_plan.clone(),
3160 dataflow_metainfos: metainfo.clone(),
3161 optimizer_features: OptimizerFeatures::from(
3162 self.catalog().system_config(),
3163 ),
3164 },
3165 );
3166 (optimized_plan, physical_plan, metainfo)
3167 }
3168 };
3169
3170 let catalog = self.catalog_mut();
3171 catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3172 catalog.set_physical_plan(ct.global_id(), physical_plan);
3173 catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3174
3175 compute_instance.insert_collection(ct.global_id());
3176 }
3177 _ => (),
3178 }
3179 }
3180
3181 Ok(uncached_expressions)
3182 }
3183
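    /// Selects initial as-ofs for all pre-existing dataflow plans, updates the plans stored
    /// in the catalog accordingly, and returns read holds that keep the chosen as-ofs valid
    /// until the dataflows have been shipped.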
3184 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3194 let mut catalog_ids = Vec::new();
3195 let mut dataflows = Vec::new();
3196 let mut read_policies = BTreeMap::new();
3197 for entry in self.catalog.entries() {
3198 let gid = match entry.item() {
3199 CatalogItem::Index(idx) => idx.global_id(),
3200 CatalogItem::MaterializedView(mv) => mv.global_id(),
3201 CatalogItem::ContinualTask(ct) => ct.global_id(),
3202 CatalogItem::Table(_)
3203 | CatalogItem::Source(_)
3204 | CatalogItem::Log(_)
3205 | CatalogItem::View(_)
3206 | CatalogItem::Sink(_)
3207 | CatalogItem::Type(_)
3208 | CatalogItem::Func(_)
3209 | CatalogItem::Secret(_)
3210 | CatalogItem::Connection(_) => continue,
3211 };
3212 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3213 catalog_ids.push(gid);
3214 dataflows.push(plan.clone());
3215
3216 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3217 read_policies.insert(gid, compaction_window.into());
3218 }
3219 }
3220 }
3221
3222 let read_ts = self.get_local_read_ts().await;
3223 let read_holds = as_of_selection::run(
3224 &mut dataflows,
3225 &read_policies,
3226 &*self.controller.storage_collections,
3227 read_ts,
3228 self.controller.read_only(),
3229 );
3230
3231 let catalog = self.catalog_mut();
3232 for (id, plan) in catalog_ids.into_iter().zip(dataflows) {
3233 catalog.set_physical_plan(id, plan);
3234 }
3235
3236 read_holds
3237 }
3238
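    /// Runs the coordinator's main event loop. Messages from the internal command channel,
    /// the controller, group commits, strict serializable reads, and client commands are
    /// batched and handled one at a time, with per-message metrics and a warning for messages
    /// that exceed the configured slow-message threshold. The loop exits when the client
    /// command channel closes.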
3239 fn serve(
3248 mut self,
3249 mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3250 mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3251 mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3252 group_commit_rx: appends::GroupCommitWaiter,
3253 ) -> LocalBoxFuture<'static, ()> {
3254 async move {
3255 let mut cluster_events = self.controller.events_stream();
3257 let last_message = Arc::new(Mutex::new(LastMessage {
3258 kind: "none",
3259 stmt: None,
3260 }));
3261
3262 let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3263 let idle_metric = self.metrics.queue_busy_seconds.with_label_values(&[]);
3264 let last_message_watchdog = Arc::clone(&last_message);
3265
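            // Watchdog task: every few seconds, try to hand the main loop a permit carrying
            // an idle-time timer; if the loop does not pick it up within 30 seconds, log the
            // last message it was handling.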
3266 spawn(|| "coord watchdog", async move {
3267 let mut interval = tokio::time::interval(Duration::from_secs(5));
3272 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3276
3277 let mut coord_stuck = false;
3279
3280 loop {
3281 interval.tick().await;
3282
3283 let duration = tokio::time::Duration::from_secs(30);
3285 let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3286 let Ok(maybe_permit) = timeout else {
3287 if !coord_stuck {
3289 let last_message = last_message_watchdog.lock().expect("poisoned");
3290 tracing::warn!(
3291 last_message_kind = %last_message.kind,
3292 last_message_sql = %last_message.stmt_to_string(),
3293 "coordinator stuck for {duration:?}",
3294 );
3295 }
3296 coord_stuck = true;
3297
3298 continue;
3299 };
3300
3301 if coord_stuck {
3303 tracing::info!("Coordinator became unstuck");
3304 }
3305 coord_stuck = false;
3306
3307 let Ok(permit) = maybe_permit else {
3309 break;
3310 };
3311
3312 permit.send(idle_metric.start_timer());
3313 }
3314 });
3315
3316 self.schedule_storage_usage_collection().await;
3317 self.spawn_privatelink_vpc_endpoints_watch_task();
3318 self.spawn_statement_logging_task();
3319 flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3320
3321 let warn_threshold = self
3323 .catalog()
3324 .system_config()
3325 .coord_slow_message_warn_threshold();
3326
3327 const MESSAGE_BATCH: usize = 64;
3329 let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3330 let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3331
3332 let message_batch = self.metrics
3333 .message_batch
3334 .with_label_values(&[]);
3335
3336 loop {
3337 select! {
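                    // `biased` makes this select poll the branches in the order they are
                    // written, so internal commands and controller readiness take precedence
                    // over new client commands.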
3341 biased;
3346
3347 _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3351 Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
3355 () = self.controller.ready() => {
3359 let controller = match self.controller.get_readiness() {
3363 Readiness::Storage => ControllerReadiness::Storage,
3364 Readiness::Compute => ControllerReadiness::Compute,
3365 Readiness::Metrics(_) => ControllerReadiness::Metrics,
3366 Readiness::Internal(_) => ControllerReadiness::Internal,
3367 Readiness::NotReady => unreachable!("just signaled as ready"),
3368 };
3369 messages.push(Message::ControllerReady { controller });
3370 }
3371 permit = group_commit_rx.ready() => {
3374 let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3380 PendingWriteTxn::User{span, ..} => Some(span),
3381 PendingWriteTxn::System{..} => None,
3382 });
3383 let span = match user_write_spans.exactly_one() {
3384 Ok(span) => span.clone(),
3385 Err(user_write_spans) => {
3386 let span = info_span!(parent: None, "group_commit_notify");
3387 for s in user_write_spans {
3388 span.follows_from(s);
3389 }
3390 span
3391 }
3392 };
3393 messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3394 },
3395 count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3399 if count == 0 {
3400 break;
3401 } else {
3402 messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
3403 }
3404 },
3405 Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3409 let mut pending_read_txns = vec![pending_read_txn];
3410 while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3411 pending_read_txns.push(pending_read_txn);
3412 }
3413 for (conn_id, pending_read_txn) in pending_read_txns {
3414 let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
3415 soft_assert_or_log!(
3416 prev.is_none(),
3417 "connections can not have multiple concurrent reads, prev: {prev:?}"
3418 )
3419 }
3420 messages.push(Message::LinearizeReads);
3421 }
3422 _ = self.advance_timelines_interval.tick() => {
3426 let span = info_span!(parent: None, "coord::advance_timelines_interval");
3427 span.follows_from(Span::current());
3428
3429 if self.controller.read_only() {
3434 messages.push(Message::AdvanceTimelines);
3435 } else {
3436 messages.push(Message::GroupCommitInitiate(span, None));
3437 }
3438 },
3439 _ = self.check_cluster_scheduling_policies_interval.tick() => {
3443 messages.push(Message::CheckSchedulingPolicies);
3444 },
3445
3446 _ = self.caught_up_check_interval.tick() => {
3450 self.maybe_check_caught_up().await;
3455
3456 continue;
3457 },
3458
3459 timer = idle_rx.recv() => {
3464 timer.expect("does not drop").observe_duration();
3465 self.metrics
3466 .message_handling
3467 .with_label_values(&["watchdog"])
3468 .observe(0.0);
3469 continue;
3470 }
3471 };
3472
3473 message_batch.observe(f64::cast_lossy(messages.len()));
3475
3476 for msg in messages.drain(..) {
3477 let msg_kind = msg.kind();
3480 let span = span!(
3481 target: "mz_adapter::coord::handle_message_loop",
3482 Level::INFO,
3483 "coord::handle_message",
3484 kind = msg_kind
3485 );
3486 let otel_context = span.context().span().span_context().clone();
3487
3488 *last_message.lock().expect("poisoned") = LastMessage {
3492 kind: msg_kind,
3493 stmt: match &msg {
3494 Message::Command(
3495 _,
3496 Command::Execute {
3497 portal_name,
3498 session,
3499 ..
3500 },
3501 ) => session
3502 .get_portal_unverified(portal_name)
3503 .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3504 _ => None,
3505 },
3506 };
3507
3508 let start = Instant::now();
3509 self.handle_message(msg).instrument(span).await;
3510 let duration = start.elapsed();
3511
3512 self.metrics
3513 .message_handling
3514 .with_label_values(&[msg_kind])
3515 .observe(duration.as_secs_f64());
3516
3517 if duration > warn_threshold {
3519 let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3520 tracing::error!(
3521 ?msg_kind,
3522 ?trace_id,
3523 ?duration,
3524 "very slow coordinator message"
3525 );
3526 }
3527 }
3528 }
3529 if let Some(catalog) = Arc::into_inner(self.catalog) {
3532 catalog.expire().await;
3533 }
3534 }
3535 .boxed_local()
3536 }
3537
3538 fn catalog(&self) -> &Catalog {
3540 &self.catalog
3541 }
3542
3543 fn owned_catalog(&self) -> Arc<Catalog> {
3546 Arc::clone(&self.catalog)
3547 }
3548
3549 fn optimizer_metrics(&self) -> OptimizerMetrics {
3552 self.optimizer_metrics.clone()
3553 }
3554
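    /// Obtains a mutable reference to the catalog. If the catalog `Arc` is shared, this
    /// clones the entire catalog state (copy-on-write via `Arc::make_mut`), which can be
    /// expensive.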
3555 fn catalog_mut(&mut self) -> &mut Catalog {
3557 Arc::make_mut(&mut self.catalog)
3565 }
3566
3567 fn connection_context(&self) -> &ConnectionContext {
3569 self.controller.connection_context()
3570 }
3571
3572 fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3574 &self.connection_context().secrets_reader
3575 }
3576
3577 #[allow(dead_code)]
3582 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3583 for meta in self.active_conns.values() {
3584 let _ = meta.notice_tx.send(notice.clone());
3585 }
3586 }
3587
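    /// Returns a closure that broadcasts a notice to all sessions that were active at the
    /// time this method was called.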
3588 pub(crate) fn broadcast_notice_tx(
3591 &self,
3592 ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3593 let senders: Vec<_> = self
3594 .active_conns
3595 .values()
3596 .map(|meta| meta.notice_tx.clone())
3597 .collect();
3598 Box::new(move |notice| {
3599 for tx in senders {
3600 let _ = tx.send(notice.clone());
3601 }
3602 })
3603 }
3604
3605 pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3606 &self.active_conns
3607 }
3608
3609 #[instrument(level = "debug")]
3610 pub(crate) fn retire_execution(
3611 &mut self,
3612 reason: StatementEndedExecutionReason,
3613 ctx_extra: ExecuteContextExtra,
3614 ) {
3615 if let Some(uuid) = ctx_extra.retire() {
3616 self.end_statement_execution(uuid, reason);
3617 }
3618 }
3619
3620 #[instrument(level = "debug")]
3622 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3623 let compute = self
3624 .instance_snapshot(instance)
3625 .expect("compute instance does not exist");
3626 DataflowBuilder::new(self.catalog().state(), compute)
3627 }
3628
3629 pub fn instance_snapshot(
3631 &self,
3632 id: ComputeInstanceId,
3633 ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3634 ComputeInstanceSnapshot::new(&self.controller, id)
3635 }
3636
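    /// Ships the given dataflow description to the specified compute instance and installs
    /// the default compaction window as the read policy for the indexes it exports.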
3637 pub(crate) async fn ship_dataflow(
3640 &mut self,
3641 dataflow: DataflowDescription<Plan>,
3642 instance: ComputeInstanceId,
3643 subscribe_target_replica: Option<ReplicaId>,
3644 ) {
3645 let export_ids = dataflow.exported_index_ids().collect();
3648
3649 self.controller
3650 .compute
3651 .create_dataflow(instance, dataflow, subscribe_target_replica)
3652 .unwrap_or_terminate("dataflow creation cannot fail");
3653
3654 self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3655 .await;
3656 }
3657
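    /// Like [`Self::ship_dataflow`], but also awaits the builtin table append that records
    /// the dataflow's optimizer notices, if one was provided, running both concurrently.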
3658 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3660 &mut self,
3661 dataflow: DataflowDescription<Plan>,
3662 instance: ComputeInstanceId,
3663 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3664 ) {
3665 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3666 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3667 let ((), ()) =
3668 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3669 } else {
3670 self.ship_dataflow(dataflow, instance, None).await;
3671 }
3672 }
3673
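    /// Installs a watch set in the compute controller for the given objects at time `t` and
    /// associates it with `conn_id`, so the pending response can be delivered once the
    /// objects reach `t` (or discarded if the connection's watch sets are cancelled).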
3674 pub fn install_compute_watch_set(
3678 &mut self,
3679 conn_id: ConnectionId,
3680 objects: BTreeSet<GlobalId>,
3681 t: Timestamp,
3682 state: WatchSetResponse,
3683 ) {
3684 let ws_id = self.controller.install_compute_watch_set(objects, t);
3685 self.connection_watch_sets
3686 .entry(conn_id.clone())
3687 .or_default()
3688 .insert(ws_id);
3689 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3690 }
3691
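    /// Like [`Self::install_compute_watch_set`], but for storage collections.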
3692 pub fn install_storage_watch_set(
3696 &mut self,
3697 conn_id: ConnectionId,
3698 objects: BTreeSet<GlobalId>,
3699 t: Timestamp,
3700 state: WatchSetResponse,
3701 ) {
3702 let ws_id = self.controller.install_storage_watch_set(objects, t);
3703 self.connection_watch_sets
3704 .entry(conn_id.clone())
3705 .or_default()
3706 .insert(ws_id);
3707 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3708 }
3709
3710 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3712 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3713 for ws_id in ws_ids {
3714 self.installed_watch_sets.remove(&ws_id);
3715 }
3716 }
3717 }
3718
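    /// Returns a JSON dump of the coordinator's in-memory state (timelines, active
    /// connections, read holds, pending peeks and reads) along with the controller's own
    /// dump, for debugging.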
3719 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
3723 let global_timelines: BTreeMap<_, _> = self
3729 .global_timelines
3730 .iter()
3731 .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3732 .collect();
3733 let active_conns: BTreeMap<_, _> = self
3734 .active_conns
3735 .iter()
3736 .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3737 .collect();
3738 let txn_read_holds: BTreeMap<_, _> = self
3739 .txn_read_holds
3740 .iter()
3741 .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3742 .collect();
3743 let pending_peeks: BTreeMap<_, _> = self
3744 .pending_peeks
3745 .iter()
3746 .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3747 .collect();
3748 let client_pending_peeks: BTreeMap<_, _> = self
3749 .client_pending_peeks
3750 .iter()
3751 .map(|(id, peek)| {
3752 let peek: BTreeMap<_, _> = peek
3753 .iter()
3754 .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3755 .collect();
3756 (id.to_string(), peek)
3757 })
3758 .collect();
3759 let pending_linearize_read_txns: BTreeMap<_, _> = self
3760 .pending_linearize_read_txns
3761 .iter()
3762 .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3763 .collect();
3764
3765 let map = serde_json::Map::from_iter([
3766 (
3767 "global_timelines".to_string(),
3768 serde_json::to_value(global_timelines)?,
3769 ),
3770 (
3771 "active_conns".to_string(),
3772 serde_json::to_value(active_conns)?,
3773 ),
3774 (
3775 "txn_read_holds".to_string(),
3776 serde_json::to_value(txn_read_holds)?,
3777 ),
3778 (
3779 "pending_peeks".to_string(),
3780 serde_json::to_value(pending_peeks)?,
3781 ),
3782 (
3783 "client_pending_peeks".to_string(),
3784 serde_json::to_value(client_pending_peeks)?,
3785 ),
3786 (
3787 "pending_linearize_read_txns".to_string(),
3788 serde_json::to_value(pending_linearize_read_txns)?,
3789 ),
3790 ("controller".to_string(), self.controller.dump().await?),
3791 ]);
3792 Ok(serde_json::Value::Object(map))
3793 }
3794
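    /// Spawns a task that snapshots `mz_storage_usage_by_shard` and sends retractions for
    /// all events older than `retention_period` back to the coordinator as a
    /// `Message::StorageUsagePrune`.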
3795 async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
3809 let item_id = self
3810 .catalog()
3811 .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3812 let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3813 let read_ts = self.get_local_read_ts().await;
3814 let current_contents_fut = self
3815 .controller
3816 .storage_collections
3817 .snapshot(global_id, read_ts);
3818 let internal_cmd_tx = self.internal_cmd_tx.clone();
3819 spawn(|| "storage_usage_prune", async move {
3820 let mut current_contents = current_contents_fut
3821 .await
3822 .unwrap_or_terminate("cannot fail to fetch snapshot");
3823 differential_dataflow::consolidation::consolidate(&mut current_contents);
3824
3825 let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3826 let mut expired = Vec::new();
3827 for (row, diff) in current_contents {
3828 assert_eq!(
3829 diff, 1,
3830 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3831 );
3832 let collection_timestamp = row
3834 .unpack()
3835 .get(3)
3836                    .expect("definition of mz_storage_usage_by_shard changed")
3837 .unwrap_timestamptz();
3838 let collection_timestamp = collection_timestamp.timestamp_millis();
3839 let collection_timestamp: u128 = collection_timestamp
3840 .try_into()
3841 .expect("all collections happen after Jan 1 1970");
3842 if collection_timestamp < cutoff_ts {
3843 debug!("pruning storage event {row:?}");
3844 let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3845 expired.push(builtin_update);
3846 }
3847 }
3848
3849 let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3851 });
3852 }
3853}
3854
3855#[cfg(test)]
3856impl Coordinator {
3857 #[allow(dead_code)]
3858 async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
3859 let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
3867
3868 let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
3869 }
3870}
3871
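/// The most recent message handled by the coordinator's main loop, retained so that it can be
/// reported by the watchdog if the loop gets stuck and dumped if the coordinator panics.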
3872struct LastMessage {
3874 kind: &'static str,
3875 stmt: Option<Arc<Statement<Raw>>>,
3876}
3877
3878impl LastMessage {
3879 fn stmt_to_string(&self) -> Cow<'static, str> {
3881 self.stmt
3882 .as_ref()
3883 .map(|stmt| stmt.to_ast_string_redacted().into())
3884 .unwrap_or(Cow::Borrowed("<none>"))
3885 }
3886}
3887
3888impl fmt::Debug for LastMessage {
3889 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3890 f.debug_struct("LastMessage")
3891 .field("kind", &self.kind)
3892 .field("stmt", &self.stmt_to_string())
3893 .finish()
3894 }
3895}
3896
3897impl Drop for LastMessage {
3898 fn drop(&mut self) {
3899 if std::thread::panicking() {
3901 eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
3903 }
3904 }
3905}
3906
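/// Creates and boots a coordinator on a dedicated thread: initializes the timestamp oracles,
/// opens the catalog, initializes the controller, runs bootstrap, and then hands control to
/// the coordinator's message loop. Returns a `Handle` owning the coordinator thread and a
/// `Client` for submitting commands to it.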
3907pub fn serve(
3919 Config {
3920 controller_config,
3921 controller_envd_epoch,
3922 mut storage,
3923 audit_logs_iterator,
3924 timestamp_oracle_url,
3925 unsafe_mode,
3926 all_features,
3927 build_info,
3928 environment_id,
3929 metrics_registry,
3930 now,
3931 secrets_controller,
3932 cloud_resource_controller,
3933 cluster_replica_sizes,
3934 builtin_system_cluster_config,
3935 builtin_catalog_server_cluster_config,
3936 builtin_probe_cluster_config,
3937 builtin_support_cluster_config,
3938 builtin_analytics_cluster_config,
3939 system_parameter_defaults,
3940 availability_zones,
3941 storage_usage_client,
3942 storage_usage_collection_interval,
3943 storage_usage_retention_period,
3944 segment_client,
3945 egress_addresses,
3946 aws_account_id,
3947 aws_privatelink_availability_zones,
3948 connection_context,
3949 connection_limit_callback,
3950 remote_system_parameters,
3951 webhook_concurrency_limit,
3952 http_host_name,
3953 tracing_handle,
3954 read_only_controllers,
3955 enable_0dt_deployment,
3956 caught_up_trigger: clusters_caught_up_trigger,
3957 helm_chart_version,
3958 license_key,
3959 external_login_password_mz_system,
3960 }: Config,
3961) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
3962 async move {
3963 let coord_start = Instant::now();
3964 info!("startup: coordinator init: beginning");
3965 info!("startup: coordinator init: preamble beginning");
3966
3967 let _builtins = LazyLock::force(&BUILTINS_STATIC);
3971
3972 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
3973 let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
3974 let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
3975 mpsc::unbounded_channel();
3976
3977 if !availability_zones.iter().all_unique() {
3979 coord_bail!("availability zones must be unique");
3980 }
3981
3982 let aws_principal_context = match (
3983 aws_account_id,
3984 connection_context.aws_external_id_prefix.clone(),
3985 ) {
3986 (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
3987 aws_account_id,
3988 aws_external_id_prefix,
3989 }),
3990 _ => None,
3991 };
3992
3993 let aws_privatelink_availability_zones = aws_privatelink_availability_zones
3994 .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
3995
3996 info!(
3997 "startup: coordinator init: preamble complete in {:?}",
3998 coord_start.elapsed()
3999 );
4000 let oracle_init_start = Instant::now();
4001 info!("startup: coordinator init: timestamp oracle init beginning");
4002
4003 let pg_timestamp_oracle_config = timestamp_oracle_url
4004 .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
4005 let mut initial_timestamps =
4006 get_initial_oracle_timestamps(&pg_timestamp_oracle_config).await?;
4007
4008 initial_timestamps
4012 .entry(Timeline::EpochMilliseconds)
4013 .or_insert_with(mz_repr::Timestamp::minimum);
4014 let mut timestamp_oracles = BTreeMap::new();
4015 for (timeline, initial_timestamp) in initial_timestamps {
4016 Coordinator::ensure_timeline_state_with_initial_time(
4017 &timeline,
4018 initial_timestamp,
4019 now.clone(),
4020 pg_timestamp_oracle_config.clone(),
4021 &mut timestamp_oracles,
4022 read_only_controllers,
4023 )
4024 .await;
4025 }
4026
4027 let catalog_upper = storage.current_upper().await;
4031 let epoch_millis_oracle = ×tamp_oracles
4037 .get(&Timeline::EpochMilliseconds)
4038 .expect("inserted above")
4039 .oracle;
4040
4041 let mut boot_ts = if read_only_controllers {
4042 let read_ts = epoch_millis_oracle.read_ts().await;
4043 std::cmp::max(read_ts, catalog_upper)
4044 } else {
4045 epoch_millis_oracle.apply_write(catalog_upper).await;
4048 epoch_millis_oracle.write_ts().await.timestamp
4049 };
4050
4051 info!(
4052 "startup: coordinator init: timestamp oracle init complete in {:?}",
4053 oracle_init_start.elapsed()
4054 );
4055
4056 let catalog_open_start = Instant::now();
4057 info!("startup: coordinator init: catalog open beginning");
4058 let persist_client = controller_config
4059 .persist_clients
4060 .open(controller_config.persist_location.clone())
4061 .await
4062 .context("opening persist client")?;
4063 let builtin_item_migration_config =
4064 BuiltinItemMigrationConfig {
4065 persist_client: persist_client.clone(),
4066 read_only: read_only_controllers,
4067            };
4069 let OpenCatalogResult {
4070 mut catalog,
4071 migrated_storage_collections_0dt,
4072 new_builtin_collections,
4073 builtin_table_updates,
4074 cached_global_exprs,
4075 uncached_local_exprs,
4076 } = Catalog::open(mz_catalog::config::Config {
4077 storage,
4078 metrics_registry: &metrics_registry,
4079 state: mz_catalog::config::StateConfig {
4080 unsafe_mode,
4081 all_features,
4082 build_info,
4083 environment_id: environment_id.clone(),
4084 read_only: read_only_controllers,
4085 now: now.clone(),
4086 boot_ts: boot_ts.clone(),
4087 skip_migrations: false,
4088 cluster_replica_sizes,
4089 builtin_system_cluster_config,
4090 builtin_catalog_server_cluster_config,
4091 builtin_probe_cluster_config,
4092 builtin_support_cluster_config,
4093 builtin_analytics_cluster_config,
4094 system_parameter_defaults,
4095 remote_system_parameters,
4096 availability_zones,
4097 egress_addresses,
4098 aws_principal_context,
4099 aws_privatelink_availability_zones,
4100 connection_context,
4101 http_host_name,
4102 builtin_item_migration_config,
4103 persist_client: persist_client.clone(),
4104 enable_expression_cache_override: None,
4105 enable_0dt_deployment,
4106 helm_chart_version,
4107 external_login_password_mz_system,
4108 license_key: license_key.clone(),
4109 },
4110 })
4111 .await?;
4112
4113 let catalog_upper = catalog.current_upper().await;
4116 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4117
4118 if !read_only_controllers {
4119 epoch_millis_oracle.apply_write(boot_ts).await;
4120 }
4121
4122 info!(
4123 "startup: coordinator init: catalog open complete in {:?}",
4124 catalog_open_start.elapsed()
4125 );
4126
4127 let coord_thread_start = Instant::now();
4128 info!("startup: coordinator init: coordinator thread start beginning");
4129
4130 let session_id = catalog.config().session_id;
4131 let start_instant = catalog.config().start_instant;
4132
4133 let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4137 let handle = TokioHandle::current();
4138
4139 let metrics = Metrics::register_into(&metrics_registry);
4140 let metrics_clone = metrics.clone();
4141 let optimizer_metrics = OptimizerMetrics::register_into(
4142 &metrics_registry,
4143 catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4144 );
4145 let segment_client_clone = segment_client.clone();
4146 let coord_now = now.clone();
4147 let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4148 let mut check_scheduling_policies_interval = tokio::time::interval(
4149 catalog
4150 .system_config()
4151 .cluster_check_scheduling_policies_interval(),
4152 );
4153 check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4154
4155 let clusters_caught_up_check_interval = if read_only_controllers {
4156 let dyncfgs = catalog.system_config().dyncfgs();
4157 let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4158
4159 let mut interval = tokio::time::interval(interval);
4160 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4161 interval
4162 } else {
4163 let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4171 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4172 interval
4173 };
4174
4175 let clusters_caught_up_check =
4176 clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
4177 trigger,
4178 exclude_collections: new_builtin_collections.into_iter().collect(),
4179 });
4180
4181 if let Some(config) = pg_timestamp_oracle_config.as_ref() {
4182 let pg_timestamp_oracle_params =
4185 flags::pg_timstamp_oracle_config(catalog.system_config());
4186 pg_timestamp_oracle_params.apply(config);
4187 }
4188
4189 let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4192 Arc::new(move |system_vars: &SystemVars| {
4193 let limit: u64 = system_vars.max_connections().cast_into();
4194 let superuser_reserved: u64 =
4195 system_vars.superuser_reserved_connections().cast_into();
4196
4197 let superuser_reserved = if superuser_reserved >= limit {
4202 tracing::warn!(
4203 "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
4204 );
4205 limit
4206 } else {
4207 superuser_reserved
4208 };
4209
4210 (connection_limit_callback)(limit, superuser_reserved);
4211 });
4212 catalog.system_config_mut().register_callback(
4213 &mz_sql::session::vars::MAX_CONNECTIONS,
4214 Arc::clone(&connection_limit_callback),
4215 );
4216 catalog.system_config_mut().register_callback(
4217 &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4218 connection_limit_callback,
4219 );
4220
4221 let (group_commit_tx, group_commit_rx) = appends::notifier();
4222
4223 let parent_span = tracing::Span::current();
4224 let thread = thread::Builder::new()
4225 .stack_size(3 * stack::STACK_SIZE)
4229 .name("coordinator".to_string())
4230 .spawn(move || {
4231 let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4232
4233 let controller = handle
4234 .block_on({
4235 catalog.initialize_controller(
4236 controller_config,
4237 controller_envd_epoch,
4238 read_only_controllers,
4239 )
4240 })
4241 .unwrap_or_terminate("failed to initialize storage_controller");
4242 let catalog_upper = handle.block_on(catalog.current_upper());
4245 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4246 if !read_only_controllers {
4247 let epoch_millis_oracle = ×tamp_oracles
4248 .get(&Timeline::EpochMilliseconds)
4249 .expect("inserted above")
4250 .oracle;
4251 handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4252 }
4253
4254 let catalog = Arc::new(catalog);
4255
4256 let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4257 let mut coord = Coordinator {
4258 controller,
4259 catalog,
4260 internal_cmd_tx,
4261 group_commit_tx,
4262 strict_serializable_reads_tx,
4263 global_timelines: timestamp_oracles,
4264 transient_id_gen: Arc::new(TransientIdGen::new()),
4265 active_conns: BTreeMap::new(),
4266 txn_read_holds: Default::default(),
4267 pending_peeks: BTreeMap::new(),
4268 client_pending_peeks: BTreeMap::new(),
4269 pending_linearize_read_txns: BTreeMap::new(),
4270 serialized_ddl: LockedVecDeque::new(),
4271 active_compute_sinks: BTreeMap::new(),
4272 active_webhooks: BTreeMap::new(),
4273 active_copies: BTreeMap::new(),
4274 staged_cancellation: BTreeMap::new(),
4275 introspection_subscribes: BTreeMap::new(),
4276 write_locks: BTreeMap::new(),
4277 deferred_write_ops: BTreeMap::new(),
4278 pending_writes: Vec::new(),
4279 advance_timelines_interval,
4280 secrets_controller,
4281 caching_secrets_reader,
4282 cloud_resource_controller,
4283 storage_usage_client,
4284 storage_usage_collection_interval,
4285 segment_client,
4286 metrics,
4287 optimizer_metrics,
4288 tracing_handle,
4289 statement_logging: StatementLogging::new(coord_now.clone()),
4290 webhook_concurrency_limit,
4291 pg_timestamp_oracle_config,
4292 check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4293 cluster_scheduling_decisions: BTreeMap::new(),
4294 caught_up_check_interval: clusters_caught_up_check_interval,
4295 caught_up_check: clusters_caught_up_check,
4296 installed_watch_sets: BTreeMap::new(),
4297 connection_watch_sets: BTreeMap::new(),
4298 cluster_replica_statuses: ClusterReplicaStatuses::new(),
4299 read_only_controllers,
4300 buffered_builtin_table_updates: Some(Vec::new()),
4301 license_key,
4302 persist_client,
4303 };
4304 let bootstrap = handle.block_on(async {
4305 coord
4306 .bootstrap(
4307 boot_ts,
4308 migrated_storage_collections_0dt,
4309 builtin_table_updates,
4310 cached_global_exprs,
4311 uncached_local_exprs,
4312 audit_logs_iterator,
4313 )
4314 .await?;
4315 coord
4316 .controller
4317 .remove_orphaned_replicas(
4318 coord.catalog().get_next_user_replica_id().await?,
4319 coord.catalog().get_next_system_replica_id().await?,
4320 )
4321 .await
4322 .map_err(AdapterError::Orchestrator)?;
4323
4324 if let Some(retention_period) = storage_usage_retention_period {
4325 coord
4326 .prune_storage_usage_events_on_startup(retention_period)
4327 .await;
4328 }
4329
4330 Ok(())
4331 });
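// Editorial note: the bootstrap result is handed back to the async caller via
// `bootstrap_tx` below; the coordinator only enters its serve loop when
// bootstrap succeeded, otherwise the thread exits and the error surfaces
// through `bootstrap_rx`.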
4332 let ok = bootstrap.is_ok();
4333 drop(span);
4334 bootstrap_tx
4335 .send(bootstrap)
4336 .expect("bootstrap_rx is not dropped until it receives this message");
4337 if ok {
4338 handle.block_on(coord.serve(
4339 internal_cmd_rx,
4340 strict_serializable_reads_rx,
4341 cmd_rx,
4342 group_commit_rx,
4343 ));
4344 }
4345 })
4346 .expect("failed to create coordinator thread");
4347 match bootstrap_rx
4348 .await
4349 .expect("bootstrap_tx always sends a message or panics/halts")
4350 {
4351 Ok(()) => {
4352 info!(
4353 "startup: coordinator init: coordinator thread start complete in {:?}",
4354 coord_thread_start.elapsed()
4355 );
4356 info!(
4357 "startup: coordinator init: complete in {:?}",
4358 coord_start.elapsed()
4359 );
4360 let handle = Handle {
4361 session_id,
4362 start_instant,
4363 _thread: thread.join_on_drop(),
4364 };
4365 let client = Client::new(
4366 build_info,
4367 cmd_tx.clone(),
4368 metrics_clone,
4369 now,
4370 environment_id,
4371 segment_client_clone,
4372 );
4373 Ok((handle, client))
4374 }
4375 Err(e) => Err(e),
4376 }
4377 }
4378 .boxed()
4379}
4380
4381async fn get_initial_oracle_timestamps(
4395 pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
4396) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4397 let mut initial_timestamps = BTreeMap::new();
4398
4399 if let Some(pg_timestamp_oracle_config) = pg_timestamp_oracle_config {
4400 let postgres_oracle_timestamps =
4401 PostgresTimestampOracle::<NowFn>::get_all_timelines(pg_timestamp_oracle_config.clone())
4402 .await?;
4403
4404 let debug_msg = || {
4405 postgres_oracle_timestamps
4406 .iter()
4407 .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4408 .join(", ")
4409 };
4410 info!(
4411 "current timestamps from the postgres-backed timestamp oracle: {}",
4412 debug_msg()
4413 );
4414
4415 for (timeline, ts) in postgres_oracle_timestamps {
4416 let entry = initial_timestamps
4417 .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4418
4419 entry
4420 .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4421 .or_insert(ts);
4422 }
4423 } else {
4424 info!("no postgres url for postgres-backed timestamp oracle configured!");
4425 };
4426
4427 let debug_msg = || {
4428 initial_timestamps
4429 .iter()
4430 .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4431 .join(", ")
4432 };
4433 info!("initial oracle timestamps: {}", debug_msg());
4434
4435 Ok(initial_timestamps)
4436}
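// Editorial sketch (assumption, not part of the original source): a caller
// would typically fold these initial timestamps into its own per-timeline
// state, keeping the maximum of the oracle value and what it already knows.
// `pg_config` and `known_timestamps` are hypothetical local bindings:
//
//     for (timeline, ts) in get_initial_oracle_timestamps(&pg_config).await? {
//         known_timestamps
//             .entry(timeline)
//             .and_modify(|t| *t = std::cmp::max(*t, ts))
//             .or_insert(ts);
//     }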
4437
4438#[instrument]
4439pub async fn load_remote_system_parameters(
4440 storage: &mut Box<dyn OpenableDurableCatalogState>,
4441 system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4442 system_parameter_sync_timeout: Duration,
4443) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4444 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4445 tracing::info!("parameter sync on boot: start sync");
4446
4447 let mut params = SynchronizedParameters::new(SystemVars::default());
4487 let frontend_sync = async {
4488 let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4489 frontend.pull(&mut params);
4490 let ops = params
4491 .modified()
4492 .into_iter()
4493 .map(|param| {
4494 let name = param.name;
4495 let value = param.value;
4496 tracing::info!(name, value, initial = true, "sync parameter");
4497 (name, value)
4498 })
4499 .collect();
4500 tracing::info!("parameter sync on boot: end sync");
4501 Ok(Some(ops))
4502 };
4503 if !storage.has_system_config_synced_once().await? {
4504 frontend_sync.await
4505 } else {
4506 match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4507 Ok(ops) => Ok(ops),
4508 Err(TimeoutError::Inner(e)) => Err(e),
4509 Err(TimeoutError::DeadlineElapsed) => {
4510 tracing::info!("parameter sync on boot: sync has timed out");
4511 Ok(None)
4512 }
4513 }
4514 }
4515 } else {
4516 Ok(None)
4517 }
4518}
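// Editorial note: the sync timeout only applies once the system config has been
// synced at least once before. On the very first boot the frontend sync is
// awaited without a deadline, so the environment does not start with parameters
// it has never pulled; on later boots a timeout falls back to `Ok(None)`,
// presumably leaving the previously synced values in effect.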
4519
4520#[derive(Debug)]
4521pub enum WatchSetResponse {
4522 StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
4523 AlterSinkReady(AlterSinkReadyContext),
4524}
4525
4526#[derive(Debug)]
4527pub struct AlterSinkReadyContext {
4528 ctx: Option<ExecuteContext>,
4529 otel_ctx: OpenTelemetryContext,
4530 plan: AlterSinkPlan,
4531 plan_validity: PlanValidity,
4532 resolved_ids: ResolvedIds,
4533 read_hold: ReadHolds<Timestamp>,
4534}
4535
4536impl AlterSinkReadyContext {
4537 fn ctx(&mut self) -> &mut ExecuteContext {
4538 self.ctx.as_mut().expect("only cleared on drop")
4539 }
4540
4541 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4542 self.ctx
4543 .take()
4544 .expect("only cleared on drop")
4545 .retire(result);
4546 }
4547}
4548
4549impl Drop for AlterSinkReadyContext {
4550 fn drop(&mut self) {
4551 if let Some(ctx) = self.ctx.take() {
4552 ctx.retire(Err(AdapterError::Canceled));
4553 }
4554 }
4555}
4556
4557#[derive(Debug)]
4560struct LockedVecDeque<T> {
4561 items: VecDeque<T>,
4562 lock: Arc<tokio::sync::Mutex<()>>,
4563}
4564
4565impl<T> LockedVecDeque<T> {
4566 pub fn new() -> Self {
4567 Self {
4568 items: VecDeque::new(),
4569 lock: Arc::new(tokio::sync::Mutex::new(())),
4570 }
4571 }
4572
4573 pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
4574 Arc::clone(&self.lock).try_lock_owned()
4575 }
4576
4577 pub fn is_empty(&self) -> bool {
4578 self.items.is_empty()
4579 }
4580
4581 pub fn push_back(&mut self, value: T) {
4582 self.items.push_back(value)
4583 }
4584
4585 pub fn pop_front(&mut self) -> Option<T> {
4586 self.items.pop_front()
4587 }
4588
4589 pub fn remove(&mut self, index: usize) -> Option<T> {
4590 self.items.remove(index)
4591 }
4592
4593 pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
4594 self.items.iter()
4595 }
4596}
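// Editorial sketch (assumption, not part of the original source): the intended
// usage pattern, with `work` standing in for a hypothetical queued item:
//
//     let mut queue = LockedVecDeque::new();
//     match queue.try_lock_owned() {
//         // Lock acquired: nothing is queued ahead of us; run immediately and
//         // hold the guard for the duration of the operation.
//         Ok(_guard) => { /* execute now */ }
//         // Lock busy: defer the work; the current holder drains the queue
//         // with `pop_front` once it releases the guard.
//         Err(_) => queue.push_back(work),
//     }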
4597
4598#[derive(Debug)]
4599struct DeferredPlanStatement {
4600 ctx: ExecuteContext,
4601 ps: PlanStatement,
4602}
4603
4604#[derive(Debug)]
4605enum PlanStatement {
4606 Statement {
4607 stmt: Arc<Statement<Raw>>,
4608 params: Params,
4609 },
4610 Plan {
4611 plan: mz_sql::plan::Plan,
4612 resolved_ids: ResolvedIds,
4613 },
4614}
4615
4616#[derive(Debug, Error)]
4617pub enum NetworkPolicyError {
4618 #[error("Access denied for address {0}")]
4619 AddressDenied(IpAddr),
4620 #[error("Access denied missing IP address")]
4621 MissingIp,
4622}
4623
4624pub(crate) fn validate_ip_with_policy_rules(
4625 ip: &IpAddr,
4626 rules: &[NetworkPolicyRule],
4627) -> Result<(), NetworkPolicyError> {
4628 if rules.iter().any(|r| r.address.0.contains(ip)) {
4631 Ok(())
4632 } else {
4633 Err(NetworkPolicyError::AddressDenied(*ip))
4634 }
4635}
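// Editorial sketch (assumption, not part of the original source): with a policy
// whose only rule covers 10.0.0.0/8, an in-range address is accepted and any
// other address is rejected with `AddressDenied`. `rules` is assumed to hold
// exactly that one rule:
//
//     let ip: IpAddr = "10.1.2.3".parse().unwrap();
//     assert!(validate_ip_with_policy_rules(&ip, &rules).is_ok());
//
//     let ip: IpAddr = "192.168.0.1".parse().unwrap();
//     assert!(matches!(
//         validate_ip_with_policy_rules(&ip, &rules),
//         Err(NetworkPolicyError::AddressDenied(_))
//     ));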