1use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::net::IpAddr;
72use std::num::NonZeroI64;
73use std::ops::Neg;
74use std::str::FromStr;
75use std::sync::LazyLock;
76use std::sync::{Arc, Mutex};
77use std::thread;
78use std::time::{Duration, Instant};
79use std::{fmt, mem};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::Itertools;
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::{
95 USER_ID_POOL_BATCH_SIZE, WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL,
96};
97use mz_auth::password::Password;
98use mz_build_info::BuildInfo;
99use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
100use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
101use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
102use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
103use mz_catalog::memory::objects::{
104 CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
105 DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
106};
107use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
108use mz_compute_client::as_of_selection;
109use mz_compute_client::controller::error::{
110 CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
111};
112use mz_compute_types::ComputeInstanceId;
113use mz_compute_types::dataflows::DataflowDescription;
114use mz_compute_types::plan::Plan;
115use mz_controller::clusters::{
116 ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
117};
118use mz_controller::{ControllerConfig, Readiness};
119use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
120use mz_dyncfg::ConfigUpdates;
121use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
122use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
123use mz_orchestrator::OfflineReason;
124use mz_ore::cast::{CastFrom, CastInto, CastLossy};
125use mz_ore::channel::trigger::Trigger;
126use mz_ore::future::TimeoutError;
127use mz_ore::metrics::MetricsRegistry;
128use mz_ore::now::{EpochMillis, NowFn};
129use mz_ore::task::{JoinHandle, spawn};
130use mz_ore::thread::JoinHandleExt;
131use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
132use mz_ore::url::SensitiveUrl;
133use mz_ore::{
134 assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
135};
136use mz_persist_client::PersistClient;
137use mz_persist_client::batch::ProtoBatch;
138use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
139use mz_repr::adt::numeric::Numeric;
140use mz_repr::explain::{ExplainConfig, ExplainFormat};
141use mz_repr::global_id::TransientIdGen;
142use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
143use mz_repr::role_id::RoleId;
144use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
145use mz_secrets::cache::CachingSecretsReader;
146use mz_secrets::{SecretsController, SecretsReader};
147use mz_sql::ast::{Raw, Statement};
148use mz_sql::catalog::{CatalogCluster, EnvironmentId};
149use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
150use mz_sql::optimizer_metrics::OptimizerMetrics;
151use mz_sql::plan::{
152 self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
153 NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
154};
155use mz_sql::session::user::User;
156use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
157use mz_sql_parser::ast::ExplainStage;
158use mz_sql_parser::ast::display::AstDisplay;
159use mz_storage_client::client::TableData;
160use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
161use mz_storage_types::connections::Connection as StorageConnection;
162use mz_storage_types::connections::ConnectionContext;
163use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
164use mz_storage_types::read_holds::ReadHold;
165use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
166use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
167use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
168use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
169use mz_transform::dataflow::DataflowMetainfo;
170use opentelemetry::trace::TraceContextExt;
171use serde::Serialize;
172use thiserror::Error;
173use timely::progress::{Antichain, Timestamp as _};
174use tokio::runtime::Handle as TokioHandle;
175use tokio::select;
176use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
177use tokio::time::{Interval, MissedTickBehavior};
178use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
179use tracing_opentelemetry::OpenTelemetrySpanExt;
180use uuid::Uuid;
181
182use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
183use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
184use crate::client::{Client, Handle};
185use crate::command::{Command, ExecuteResponse};
186use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
187use crate::coord::appends::{
188 BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
189};
190use crate::coord::caught_up::CaughtUpCheckContext;
191use crate::coord::cluster_scheduling::SchedulingDecision;
192use crate::coord::id_bundle::CollectionIdBundle;
193use crate::coord::introspection::IntrospectionSubscribe;
194use crate::coord::peek::PendingPeek;
195use crate::coord::statement_logging::StatementLogging;
196use crate::coord::timeline::{TimelineContext, TimelineState};
197use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
198use crate::coord::validity::PlanValidity;
199use crate::error::AdapterError;
200use crate::explain::insights::PlanInsightsContext;
201use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
202use crate::metrics::Metrics;
203use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
204use crate::optimize::{self, Optimize, OptimizerConfig};
205use crate::session::{EndTransactionAction, Session};
206use crate::statement_logging::{
207 StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
208};
209use crate::util::{ClientTransmitter, ResultExt, sort_topological};
210use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
211use crate::{AdapterNotice, ReadHolds, flags};
212
213pub(crate) mod appends;
214pub(crate) mod catalog_serving;
215pub(crate) mod cluster_scheduling;
216pub(crate) mod consistency;
217pub(crate) mod id_bundle;
218pub(crate) mod in_memory_oracle;
219pub(crate) mod peek;
220pub(crate) mod read_policy;
221pub(crate) mod read_then_write;
222pub(crate) mod sequencer;
223pub(crate) mod statement_logging;
224pub(crate) mod timeline;
225pub(crate) mod timestamp_selection;
226
227pub mod catalog_implications;
228mod caught_up;
229mod command_handler;
230mod ddl;
231pub(crate) mod group_sync;
232mod indexes;
233mod info_metrics;
234mod introspection;
235mod message_handler;
236mod privatelink_status;
237mod sql;
238mod validity;
239
/// A half-open range `next..upper` of IDs that can be handed out one by one
/// or in batches, in ascending order.
///
/// The pool is exhausted once `next == upper`; [`IdPool::refill`] installs a
/// new range.
#[derive(Debug)]
pub(crate) struct IdPool {
    /// The ID the next allocation will return.
    next: u64,
    /// Exclusive upper bound of the range held by the pool.
    upper: u64,
}

impl IdPool {
    /// Creates a pool that holds no IDs (`0..0`).
    pub fn empty() -> Self {
        Self { next: 0, upper: 0 }
    }

    /// Takes a single ID out of the pool.
    ///
    /// Returns `None` when the pool is exhausted.
    pub fn allocate(&mut self) -> Option<u64> {
        if self.next >= self.upper {
            return None;
        }
        let id = self.next;
        self.next = id + 1;
        Some(id)
    }

    /// Takes `n` consecutive IDs out of the pool.
    ///
    /// Returns `None`, leaving the pool untouched, when fewer than `n` IDs
    /// remain. Asking for zero IDs always succeeds with an empty vector.
    pub fn allocate_many(&mut self, n: u64) -> Option<Vec<u64>> {
        if n > self.remaining() {
            return None;
        }
        // `n <= upper - next`, so `first + n` cannot exceed `upper`.
        let first = self.next;
        let end = first + n;
        self.next = end;
        Some((first..end).collect())
    }

    /// Number of IDs that can still be allocated.
    pub fn remaining(&self) -> u64 {
        let Self { next, upper } = *self;
        upper - next
    }

    /// Replaces the contents of the pool with the range `next..upper`.
    ///
    /// # Panics
    ///
    /// Panics if `next > upper`.
    pub fn refill(&mut self, next: u64, upper: u64) {
        assert!(next <= upper, "invalid pool range: {next}..{upper}");
        *self = Self { next, upper };
    }
}
312
/// Internal message type of this module.
///
/// Every variant maps to a static string label through [`Message::kind`].
// NOTE(review): presumably these are the work items processed by the
// coordinator (see the `message_handler` submodule) — the consumer is not
// visible in this part of the file.
#[derive(Debug)]
pub enum Message {
    /// A [`Command`] together with an [`OpenTelemetryContext`].
    Command(OpenTelemetryContext, Command),
    /// Readiness notification; `controller` names which part is ready.
    ControllerReady {
        controller: ControllerReadiness,
    },
    /// See [`PurifiedStatementReady`].
    PurifiedStatementReady(PurifiedStatementReady),
    /// See [`CreateConnectionValidationReady`].
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    /// See [`AlterConnectionValidationReady`].
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    TryDeferred {
        /// The connection this message refers to.
        conn_id: ConnectionId,
        /// Optionally, a catalog item ID paired with an owned mutex guard.
        // NOTE(review): presumably a lock already acquired on behalf of the
        // deferred operation — confirm.
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    /// Carries a tracing span and an optional [`GroupCommitPermit`].
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    AdvanceTimelines,
    /// Wraps a [`ClusterEvent`] from the controller crate.
    ClusterEvent(ClusterEvent),
    CancelPendingPeeks {
        /// The connection this message refers to.
        conn_id: ConnectionId,
    },
    LinearizeReads,
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        /// One entry per batch: either a [`ProtoBatch`] or a `String` error.
        batches: Vec<Result<ProtoBatch, String>>,
    },
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    ArrangementSizesSchedule,
    ArrangementSizesSnapshot,
    ArrangementSizesPrune(Vec<BuiltinTableUpdate>),
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    // The `*StageReady` variants below each bundle a stage value with a
    // tracing span and, except for introspection subscribes, an
    // `ExecuteContext` — the same three inputs `Staged::message` receives.
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    /// Unlike the other stage messages, this one carries no `ExecuteContext`.
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    DrainStatementLog,
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    /// A list of `(static string, per-cluster decisions)` pairs.
    // NOTE(review): the string is presumably the name of the scheduling
    // policy that produced the decisions — confirm.
    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}
424
425impl Message {
426 pub const fn kind(&self) -> &'static str {
428 match self {
429 Message::Command(_, msg) => match msg {
430 Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
431 Command::Startup { .. } => "command-startup",
432 Command::Execute { .. } => "command-execute",
433 Command::Commit { .. } => "command-commit",
434 Command::CancelRequest { .. } => "command-cancel_request",
435 Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
436 Command::GetWebhook { .. } => "command-get_webhook",
437 Command::GetSystemVars { .. } => "command-get_system_vars",
438 Command::SetSystemVars { .. } => "command-set_system_vars",
439 Command::Terminate { .. } => "command-terminate",
440 Command::RetireExecute { .. } => "command-retire_execute",
441 Command::CheckConsistency { .. } => "command-check_consistency",
442 Command::Dump { .. } => "command-dump",
443 Command::AuthenticatePassword { .. } => "command-auth_check",
444 Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
445 Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
446 Command::CheckRoleCanLogin { .. } => "command-check_role_can_login",
447 Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
448 Command::GetOracle { .. } => "get-oracle",
449 Command::DetermineRealTimeRecentTimestamp { .. } => {
450 "determine-real-time-recent-timestamp"
451 }
452 Command::GetTransactionReadHoldsBundle { .. } => {
453 "get-transaction-read-holds-bundle"
454 }
455 Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
456 Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
457 Command::ExecuteSubscribe { .. } => "execute-subscribe",
458 Command::CopyToPreflight { .. } => "copy-to-preflight",
459 Command::ExecuteCopyTo { .. } => "execute-copy-to",
460 Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
461 Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
462 Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
463 Command::ExplainTimestamp { .. } => "explain-timestamp",
464 Command::FrontendStatementLogging(..) => "frontend-statement-logging",
465 Command::StartCopyFromStdin { .. } => "start-copy-from-stdin",
466 Command::InjectAuditEvents { .. } => "inject-audit-events",
467 },
468 Message::ControllerReady {
469 controller: ControllerReadiness::Compute,
470 } => "controller_ready(compute)",
471 Message::ControllerReady {
472 controller: ControllerReadiness::Storage,
473 } => "controller_ready(storage)",
474 Message::ControllerReady {
475 controller: ControllerReadiness::Metrics,
476 } => "controller_ready(metrics)",
477 Message::ControllerReady {
478 controller: ControllerReadiness::Internal,
479 } => "controller_ready(internal)",
480 Message::PurifiedStatementReady(_) => "purified_statement_ready",
481 Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
482 Message::TryDeferred { .. } => "try_deferred",
483 Message::GroupCommitInitiate(..) => "group_commit_initiate",
484 Message::AdvanceTimelines => "advance_timelines",
485 Message::ClusterEvent(_) => "cluster_event",
486 Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
487 Message::LinearizeReads => "linearize_reads",
488 Message::StagedBatches { .. } => "staged_batches",
489 Message::StorageUsageSchedule => "storage_usage_schedule",
490 Message::StorageUsageFetch => "storage_usage_fetch",
491 Message::StorageUsageUpdate(_) => "storage_usage_update",
492 Message::StorageUsagePrune(_) => "storage_usage_prune",
493 Message::ArrangementSizesSchedule => "arrangement_sizes_schedule",
494 Message::ArrangementSizesSnapshot => "arrangement_sizes_snapshot",
495 Message::ArrangementSizesPrune(_) => "arrangement_sizes_prune",
496 Message::RetireExecute { .. } => "retire_execute",
497 Message::ExecuteSingleStatementTransaction { .. } => {
498 "execute_single_statement_transaction"
499 }
500 Message::PeekStageReady { .. } => "peek_stage_ready",
501 Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
502 Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
503 Message::CreateViewStageReady { .. } => "create_view_stage_ready",
504 Message::CreateMaterializedViewStageReady { .. } => {
505 "create_materialized_view_stage_ready"
506 }
507 Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
508 Message::IntrospectionSubscribeStageReady { .. } => {
509 "introspection_subscribe_stage_ready"
510 }
511 Message::SecretStageReady { .. } => "secret_stage_ready",
512 Message::ClusterStageReady { .. } => "cluster_stage_ready",
513 Message::DrainStatementLog => "drain_statement_log",
514 Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
515 Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
516 Message::CheckSchedulingPolicies => "check_scheduling_policies",
517 Message::SchedulingDecisions { .. } => "scheduling_decision",
518 Message::DeferredStatementReady => "deferred_statement_ready",
519 }
520 }
521}
522
/// Names the controller part that a [`Message::ControllerReady`] refers to.
///
/// [`Message::kind`] renders the variants as `controller_ready(storage)`,
/// `controller_ready(compute)`, `controller_ready(metrics)` and
/// `controller_ready(internal)` respectively.
#[derive(Debug)]
pub enum ControllerReadiness {
    /// The storage part is ready.
    Storage,
    /// The compute part is ready.
    Compute,
    /// The metrics part is ready.
    Metrics,
    /// An internal part is ready.
    Internal,
}
535
/// The outcome of some piece of work, bundled with the state needed to pick
/// statement processing back up.
///
/// `Debug` is derived through `derivative`; the `ctx` field is left out of
/// the debug output.
// NOTE(review): the name suggests the work ran in the background, off the
// main coordinator task — the producer is not visible here.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    /// Execution context of the statement; omitted from `Debug` output.
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The produced value, or the error that occurred.
    pub result: Result<T, AdapterError>,
    /// Statement parameters.
    pub params: Params,
    /// Validity information for the plan this result belongs to.
    pub plan_validity: PlanValidity,
    /// The original statement, shared behind an `Arc`.
    pub original_stmt: Arc<Statement<Raw>>,
    /// OpenTelemetry context associated with this result.
    pub otel_ctx: OpenTelemetryContext,
}

/// A [`BackgroundWorkResult`] whose payload is a purified statement.
pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
549
/// The outcome of a validation step for a connection, bundled with the state
/// needed to continue processing.
///
/// `Debug` is derived through `derivative`; the `ctx` field is left out of
/// the debug output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    /// Execution context of the statement; omitted from `Debug` output.
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The validated value, or the error that occurred.
    pub result: Result<T, AdapterError>,
    /// IDs resolved for the statement.
    pub resolved_ids: ResolvedIds,
    /// Catalog item ID of the connection.
    pub connection_id: CatalogItemId,
    /// Global ID of the connection.
    pub connection_gid: GlobalId,
    /// Validity information for the plan this result belongs to.
    pub plan_validity: PlanValidity,
    /// OpenTelemetry context associated with this result.
    pub otel_ctx: OpenTelemetryContext,
}

/// A [`ValidationReady`] carrying a [`CreateConnectionPlan`].
pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
/// A [`ValidationReady`] carrying a catalog [`Connection`].
pub type AlterConnectionValidationReady = ValidationReady<Connection>;
565
/// The stages of a peek; each variant wraps the state carried by that stage.
///
/// Delivered through [`Message::PeekStageReady`].
#[derive(Debug)]
pub enum PeekStage {
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    Finish(PeekStageFinish),
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    // Both copy-to variants carry the same payload type.
    CopyToPreflight(PeekStageCopyTo),
    CopyToDataflow(PeekStageCopyTo),
}
583
/// Parameters describing a copy-to operation.
// NOTE(review): field meanings below are read off the names and types only;
// the consumer is not visible in this part of the file — confirm.
#[derive(Debug)]
pub struct CopyToContext {
    /// Relation description (presumably of the data being copied).
    pub desc: RelationDesc,
    /// Target URI.
    pub uri: Uri,
    /// Storage connection, in its referenced (not inlined) form.
    pub connection: StorageConnection<ReferencedConnection>,
    /// Catalog item ID of the connection.
    pub connection_id: CatalogItemId,
    /// Output format, expressed as an S3 sink format.
    pub format: S3SinkFormat,
    /// Maximum file size (presumably in bytes — TODO confirm unit).
    pub max_file_size: u64,
    /// Optional number of output batches.
    pub output_batch_count: Option<u64>,
}
604
/// State carried by [`PeekStage::LinearizeTimestamp`].
#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    /// The `SELECT` plan being peeked.
    plan: mz_sql::plan::SelectPlan,
    /// Optional cap on the query result size.
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    /// Optional replica the peek is targeted at.
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    optimizer: optimize::PeekOptimizer,
    explain_ctx: ExplainContext,
}
618
/// State carried by [`PeekStage::RealTimeRecency`].
///
/// Holds the same fields as [`PeekStageLinearizeTimestamp`] plus
/// `oracle_read_ts`.
#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    /// Optional timestamp (presumably read from the timestamp oracle).
    oracle_read_ts: Option<Timestamp>,
    optimizer: optimize::PeekOptimizer,
    explain_ctx: ExplainContext,
}
633
/// State carried by [`PeekStage::TimestampReadHold`].
///
/// Holds the same fields as [`PeekStageRealTimeRecency`] plus
/// `real_time_recency_ts`.
#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    // `mz_repr::Timestamp` is the same type as the imported `Timestamp`.
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: optimize::PeekOptimizer,
    explain_ctx: ExplainContext,
}
649
/// State carried by [`PeekStage::Optimize`].
///
/// Compared to [`PeekStageTimestampReadHold`], the timeline context and the
/// individual timestamps are gone; an `id_bundle` and a timestamp
/// `determination` are carried instead.
#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination,
    optimizer: optimize::PeekOptimizer,
    explain_ctx: ExplainContext,
}
664
/// State carried by [`PeekStage::Finish`].
///
/// Unlike the earlier peek stages this one holds no optimizer; it carries the
/// resulting `global_lir_plan` along with the cluster, the row-set finishing
/// and optional plan-insights data.
#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    /// Optional optimizer trace (presumably kept for plan insights).
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    /// Optional, boxed plan-insights context.
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    /// Epoch-millisecond timestamp (presumably when optimization finished).
    optimization_finished_at: EpochMillis,
}
683
/// State carried by both [`PeekStage::CopyToPreflight`] and
/// [`PeekStage::CopyToDataflow`].
#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    /// The copy-to optimizer (a different type than the peek optimizer).
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    /// Epoch-millisecond timestamp (presumably when optimization finished).
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}
692
/// State carried by [`PeekStage::ExplainPlan`].
#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    /// Metadata about the dataflow.
    df_meta: DataflowMetainfo,
    /// Always a concrete plan context here, not the broader `ExplainContext`.
    explain_ctx: ExplainPlanContext,
    /// Optional, boxed plan-insights context.
    insights_ctx: Option<Box<PlanInsightsContext>>,
}
701
/// State carried by [`PeekStage::ExplainPushdown`].
#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination,
    /// A `MapFilterProject` per imported collection, keyed by global ID.
    imports: BTreeMap<GlobalId, MapFilterProject>,
}
708
/// The stages of creating an index; each variant wraps that stage's state.
///
/// Delivered through [`Message::CreateIndexStageReady`].
#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}
715
/// State carried by [`CreateIndexStage::Optimize`].
#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}
725
/// State carried by [`CreateIndexStage::Finish`].
///
/// Besides the plan it holds the IDs of the index and both the MIR and LIR
/// forms of the optimized plan.
#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    /// Catalog item ID (presumably of the index being created).
    item_id: CatalogItemId,
    /// Global ID (presumably of the index being created).
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}
737
/// State carried by [`CreateIndexStage::Explain`].
#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    /// Global ID of the exported index.
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    /// Metadata about the dataflow.
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
746
/// The stages of creating a view; each variant wraps that stage's state.
///
/// Delivered through [`Message::CreateViewStageReady`].
#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}
753
/// State carried by [`CreateViewStage::Optimize`].
#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}
763
/// State carried by [`CreateViewStage::Finish`].
#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    /// Catalog item ID (presumably of the view being created).
    item_id: CatalogItemId,
    /// Global ID (presumably of the view being created).
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    /// The view's expression in optimized MIR form.
    optimized_expr: OptimizedMirRelationExpr,
}
776
/// State carried by [`CreateViewStage::Explain`].
#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    /// Global ID (presumably of the view being explained).
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}
784
/// The stages of an explain-timestamp operation; each variant wraps that
/// stage's state.
///
/// Delivered through [`Message::ExplainTimestampStageReady`].
#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}
791
/// State carried by [`ExplainTimestampStage::Optimize`].
#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}
798
/// State carried by [`ExplainTimestampStage::RealTimeRecency`].
///
/// The plan of the previous stage is replaced here by its pieces: output
/// `format`, the `optimized_plan` and the `when` clause.
#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}
807
/// State carried by [`ExplainTimestampStage::Finish`].
///
/// Holds the same fields as [`ExplainTimestampRealTimeRecency`] plus
/// `source_ids` and `real_time_recency_ts`.
#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    /// Optional timestamp (presumably from a real-time-recency lookup).
    real_time_recency_ts: Option<Timestamp>,
}
818
/// The stages of altering a cluster; each variant wraps that stage's state.
///
/// Delivered through [`Message::ClusterStageReady`].
#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}
825
/// State carried by [`ClusterStage::Alter`].
#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}
831
/// State carried by [`ClusterStage::WaitForHydrated`].
///
/// Holds the same fields as [`AlterClusterFinalize`] plus the timeout
/// information.
#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    /// The new managed-cluster configuration.
    new_config: ClusterVariantManaged,
    workload_class: Option<String>,
    /// Instant at which the wait times out.
    timeout_time: Instant,
    /// What to do when the timeout is reached.
    on_timeout: OnTimeoutAction,
}
841
/// State carried by [`ClusterStage::Finalize`].
#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    /// The new managed-cluster configuration.
    new_config: ClusterVariantManaged,
    workload_class: Option<String>,
}
849
/// Describes whether, and in which way, an operation is being explained.
///
/// The predicates `dispatch_guard`, `needs_cluster` and `needs_plan_insights`
/// on this type derive their answers from the variant.
#[derive(Debug)]
pub enum ExplainContext {
    /// No explain is involved. `needs_cluster` returns `true`.
    None,
    /// An explain of a plan, with its full context. `needs_cluster` returns
    /// `false`.
    Plan(ExplainPlanContext),
    /// Carries only an optimizer trace. `needs_cluster` and
    /// `needs_plan_insights` both return `true`.
    PlanInsightsNotice(OptimizerTrace),
    /// A pushdown explain. `needs_cluster` returns `false`.
    Pushdown,
}
862
863impl ExplainContext {
864 pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
868 let optimizer_trace = match self {
869 ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
870 ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
871 _ => None,
872 };
873 optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
874 }
875
876 pub(crate) fn needs_cluster(&self) -> bool {
877 match self {
878 ExplainContext::None => true,
879 ExplainContext::Plan(..) => false,
880 ExplainContext::PlanInsightsNotice(..) => true,
881 ExplainContext::Pushdown => false,
882 }
883 }
884
885 pub(crate) fn needs_plan_insights(&self) -> bool {
886 matches!(
887 self,
888 ExplainContext::Plan(ExplainPlanContext {
889 stage: ExplainStage::PlanInsights,
890 ..
891 }) | ExplainContext::PlanInsightsNotice(_)
892 )
893 }
894}
895
/// The context carried by [`ExplainContext::Plan`].
#[derive(Debug)]
pub struct ExplainPlanContext {
    // NOTE(review): presumably marks an explain run in "broken" mode, i.e.
    // one that tolerates failing optimizer stages — confirm against callers.
    pub broken: bool,
    /// Explain configuration.
    pub config: ExplainConfig,
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// The stage to explain. `ExplainStage::PlanInsights` here makes
    /// `ExplainContext::needs_plan_insights` return `true`.
    pub stage: ExplainStage,
    /// Optional global ID (presumably of an existing object to re-plan).
    pub replan: Option<GlobalId>,
    /// Optional relation description.
    pub desc: Option<RelationDesc>,
    /// The optimizer trace; `ExplainContext::dispatch_guard` obtains its
    /// guard from this field.
    pub optimizer_trace: OptimizerTrace,
}
910
/// The stages of creating a materialized view; each variant wraps that
/// stage's state.
///
/// Delivered through [`Message::CreateMaterializedViewStageReady`].
#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}
917
/// State carried by [`CreateMaterializedViewStage::Optimize`].
#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}
927
/// State carried by [`CreateMaterializedViewStage::Finish`].
///
/// Besides the plan it holds the IDs of the view and the local MIR, global
/// MIR and global LIR forms of the optimized plan.
#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    /// Catalog item ID (presumably of the materialized view being created).
    item_id: CatalogItemId,
    /// Global ID (presumably of the materialized view being created).
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}
942
/// State carried by [`CreateMaterializedViewStage::Explain`].
#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    /// Global ID (presumably of the materialized view being explained).
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    /// Metadata about the dataflow.
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
951
/// The stages of a subscribe; each variant wraps that stage's state.
///
/// Delivered through [`Message::SubscribeStageReady`].
#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
    Explain(SubscribeExplain),
}
959
/// State carried by [`SubscribeStage::OptimizeMir`].
#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    /// Global IDs the subscribe depends on.
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    /// Optional replica; compare the introspection subscribe stages, where a
    /// replica is always given.
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}
972
/// State carried by [`SubscribeStage::TimestampOptimizeLir`].
///
/// Compared to [`SubscribeOptimizeMir`], the `cluster_id` is replaced by the
/// optimizer and the global MIR plan it produced.
#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    /// Global MIR plan in its `Unresolved` state.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}
986
/// State carried by [`SubscribeStage::Finish`].
#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    /// Global IDs the subscribe depends on.
    dependency_ids: BTreeSet<GlobalId>,
}
996
/// State carried by [`SubscribeStage::Explain`].
#[derive(Debug)]
pub struct SubscribeExplain {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    /// Metadata about the dataflow.
    df_meta: DataflowMetainfo,
    cluster_id: ComputeInstanceId,
    explain_ctx: ExplainPlanContext,
}
1005
/// The stages of an introspection subscribe; each variant wraps that stage's
/// state.
///
/// Delivered through [`Message::IntrospectionSubscribeStageReady`], which
/// carries no `ExecuteContext`.
#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}
1012
/// State carried by [`IntrospectionSubscribeStage::OptimizeMir`].
#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    /// Global ID of the subscribe.
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    /// The replica; not optional, unlike in the regular subscribe stages.
    replica_id: ReplicaId,
}
1021
/// State carried by [`IntrospectionSubscribeStage::TimestampOptimizeLir`].
#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    /// Global MIR plan in its `Unresolved` state.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1030
/// State carried by [`IntrospectionSubscribeStage::Finish`].
#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    /// Read holds carried along with the plan.
    read_holds: ReadHolds,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1039
/// The stages of secret-related operations (create, rotate keys, alter);
/// each variant wraps that stage's state.
///
/// Delivered through [`Message::SecretStageReady`].
#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}
1048
/// State carried by [`SecretStage::CreateEnsure`].
#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}
1054
/// State carried by [`SecretStage::CreateFinish`].
///
/// Holds the same fields as [`CreateSecretEnsure`] plus the two IDs.
#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    /// Catalog item ID (presumably of the secret being created).
    item_id: CatalogItemId,
    /// Global ID (presumably of the secret being created).
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}
1062
/// State carried by [`SecretStage::RotateKeysEnsure`].
#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    /// Catalog item ID the key rotation applies to.
    id: CatalogItemId,
}
1068
/// State carried by [`SecretStage::RotateKeysFinish`].
#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    /// Catalog operations (presumably the ones that complete the rotation).
    ops: Vec<crate::catalog::Op>,
}
1074
/// State carried by [`SecretStage::Alter`].
#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}
1080
/// Selects which cluster something should target.
// NOTE(review): variant meanings are read off their names; the code that
// resolves a `TargetCluster` is not visible in this part of the file.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// Presumably the builtin catalog server cluster.
    CatalogServer,
    /// Presumably the currently active cluster.
    Active,
    /// A specific cluster, identified by ID (presumably one already chosen
    /// by the enclosing transaction).
    Transaction(ClusterId),
}
1094
/// What running one stage of a [`Staged`] operation yields.
///
/// `T` is the type of the follow-up stage value; `Staged::stage` uses
/// `Box<Self>` for it.
pub(crate) enum StageResult<T> {
    /// A task handle that resolves to the next stage value or an error.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task handle that resolves to a final response or an error.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage value, available right away.
    Immediate(T),
    /// A final response, available right away.
    Response(ExecuteResponse),
}
1106
/// An operation that is executed as a sequence of stages.
///
/// Each call to [`Staged::stage`] consumes the current stage and reports what
/// comes next as a [`StageResult`].
pub(crate) trait Staged: Send {
    /// Context type threaded through the stages.
    type Ctx: StagedContext;

    /// Mutable access to the [`PlanValidity`] held by this stage.
    fn validity(&mut self) -> &mut PlanValidity;

    /// Runs this stage, consuming it.
    ///
    /// On success returns a [`StageResult`] whose stage payload is a boxed
    /// `Self`; on failure returns an [`AdapterError`].
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Packs this stage, its context and a tracing span into a [`Message`].
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    // NOTE(review): presumably reports whether this stage may be interrupted
    // by a cancellation request — confirm against the driver of `Staged`.
    fn cancel_enabled(&self) -> bool;
}
1126
/// The context a [`Staged`] operation runs with (`Staged::Ctx`).
pub trait StagedContext {
    /// Consumes the context, handing it the final `result` of the operation.
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    /// The session associated with this context, if it has one.
    fn session(&self) -> Option<&Session>;
}
1131
/// [`StagedContext`] for an [`ExecuteContext`]: results are forwarded to the
/// context and a session is always available.
impl StagedContext for ExecuteContext {
    /// Forwards `result` to `self.retire`.
    // NOTE(review): this presumably resolves to an inherent
    // `ExecuteContext::retire` (inherent methods take precedence over trait
    // methods); that method is defined outside this part of the file.
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    /// Always returns `Some`, wrapping what `self.session()` yields.
    // NOTE(review): as above, presumably the inherent
    // `ExecuteContext::session` — confirm.
    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}
1141
/// [`StagedContext`] for the unit type: there is nothing to retire and no
/// session.
impl StagedContext for () {
    /// Does nothing; the result is discarded.
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    /// Always returns `None`.
    fn session(&self) -> Option<&Session> {
        None
    }
}
1149
/// A bundle of configuration values and handles.
// NOTE(review): presumably everything needed to start the coordinator; the
// code consuming `Config` is not visible in this part of the file. Field
// comments below restate what the declared types show.
pub struct Config {
    pub controller_config: ControllerConfig,
    /// A non-zero epoch value handed to the controller.
    pub controller_envd_epoch: NonZeroI64,
    /// Handle to the durable catalog state.
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    /// Optional URL of the timestamp oracle; wrapped as a sensitive URL.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    /// Function used to obtain the current time.
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    /// Optional controller for cloud resources.
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    // Bootstrap configuration for each of the builtin clusters.
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Default values for system parameters, as name/value strings.
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    /// Optional retention period for storage usage data.
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    /// Optional system parameters from a remote source, as name/value
    /// strings.
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    /// Callback taking two `u64` values and returning nothing.
    // NOTE(review): presumably (limit, current count) or similar — confirm
    // the argument meaning at the call site.
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    pub read_only_controllers: bool,

    /// Optional trigger.
    // NOTE(review): presumably fired once the deployment has caught up —
    // confirm.
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    /// An already validated license key.
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}
1201
/// Metadata the coordinator keeps about a single client connection.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Secret key associated with the connection.
    // NOTE(review): presumably used to authorize cancellation requests — confirm.
    secret_key: u32,
    /// Time at which the connection was established, in epoch milliseconds.
    connected_at: EpochMillis,
    /// The user of the connection.
    user: User,
    /// The application name reported for the connection.
    application_name: String,
    /// UUID of the connection.
    uuid: Uuid,
    /// Identifier of the connection.
    conn_id: ConnectionId,
    /// IP address of the client, when known.
    client_ip: Option<IpAddr>,

    /// Global ids of sinks tied to this connection.
    // NOTE(review): presumably dropped when the connection ends — confirm.
    drop_sinks: BTreeSet<GlobalId>,

    /// An optionally held owned mutex guard; excluded from serialization.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Clusters with an alter operation pending for this connection.
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Sender for notices addressed to this connection; excluded from serialization.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role the connection authenticated as.
    authenticated_role: RoleId,
}
1239
/// Read-only accessors for [`ConnMeta`].
impl ConnMeta {
    /// Returns the connection's id.
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    /// Returns the connection's user.
    pub fn user(&self) -> &User {
        &self.user
    }

    /// Returns the connection's application name.
    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    /// Returns the id of the role the connection authenticated as.
    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    /// Returns the connection's UUID.
    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    /// Returns the client's IP address, when known.
    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    /// Returns the time the connection was established, in epoch milliseconds.
    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}
1269
/// A transaction whose end has been requested but whose response has not yet
/// been delivered.
#[derive(Debug)]
pub struct PendingTxn {
    /// The execution context of the statement ending the transaction.
    ctx: ExecuteContext,
    /// The response to deliver, or the error to report instead.
    response: Result<PendingTxnResponse, AdapterError>,
    /// How the transaction is being ended; passed to the session's
    /// `end_transaction` when the pending read is finished.
    action: EndTransactionAction,
}
1280
/// The response for a pending transaction; converts into the corresponding
/// [`ExecuteResponse`] variant.
#[derive(Debug)]
pub enum PendingTxnResponse {
    /// The transaction committed.
    Committed {
        /// Parameter name/value pairs attached to the response.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction was rolled back.
    Rolledback {
        /// Parameter name/value pairs attached to the response.
        params: BTreeMap<&'static str, String>,
    },
}
1295
1296impl PendingTxnResponse {
1297 pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1298 match self {
1299 PendingTxnResponse::Committed { params }
1300 | PendingTxnResponse::Rolledback { params } => params.extend(p),
1301 }
1302 }
1303}
1304
1305impl From<PendingTxnResponse> for ExecuteResponse {
1306 fn from(value: PendingTxnResponse) -> Self {
1307 match value {
1308 PendingTxnResponse::Committed { params } => {
1309 ExecuteResponse::TransactionCommitted { params }
1310 }
1311 PendingTxnResponse::Rolledback { params } => {
1312 ExecuteResponse::TransactionRolledBack { params }
1313 }
1314 }
1315 }
1316}
1317
/// A pending read together with bookkeeping about when and at which timestamp
/// context it was created.
#[derive(Debug)]
pub struct PendingReadTxn {
    /// The pending read itself.
    txn: PendingRead,
    /// The timestamp context of the read.
    timestamp_context: TimestampContext,
    /// When this value was created.
    created: Instant,
    /// Counter of requeues.
    // NOTE(review): presumably incremented each time the read is put back on
    // the queue — confirm against the code that updates it.
    num_requeues: u64,
    /// OpenTelemetry context carried along with the read.
    otel_ctx: OpenTelemetryContext,
}
1334
1335impl PendingReadTxn {
1336 pub fn timestamp_context(&self) -> &TimestampContext {
1338 &self.timestamp_context
1339 }
1340
1341 pub(crate) fn take_context(self) -> ExecuteContext {
1342 self.txn.take_context()
1343 }
1344}
1345
/// The two kinds of pending read.
#[derive(Debug)]
enum PendingRead {
    /// A read whose transaction end is pending.
    Read {
        /// The pending transaction.
        txn: PendingTxn,
    },
    /// A read that is followed by a write.
    ReadThenWrite {
        /// The execution context of the statement.
        ctx: ExecuteContext,
        /// Channel over which the context is handed back (`Some`) when the
        /// read finishes, or `None` is sent when the context is taken away.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}
1361
1362impl PendingRead {
1363 #[instrument(level = "debug")]
1368 pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1369 match self {
1370 PendingRead::Read {
1371 txn:
1372 PendingTxn {
1373 mut ctx,
1374 response,
1375 action,
1376 },
1377 ..
1378 } => {
1379 let changed = ctx.session_mut().vars_mut().end_transaction(action);
1380 let response = response.map(|mut r| {
1382 r.extend_params(changed);
1383 ExecuteResponse::from(r)
1384 });
1385
1386 Some((ctx, response))
1387 }
1388 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1389 let _ = tx.send(Some(ctx));
1391 None
1392 }
1393 }
1394 }
1395
1396 fn label(&self) -> &'static str {
1397 match self {
1398 PendingRead::Read { .. } => "read",
1399 PendingRead::ReadThenWrite { .. } => "read_then_write",
1400 }
1401 }
1402
1403 pub(crate) fn take_context(self) -> ExecuteContext {
1404 match self {
1405 PendingRead::Read { txn, .. } => txn.ctx,
1406 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1407 let _ = tx.send(None);
1410 ctx
1411 }
1412 }
1413 }
1414}
1415
/// Extra per-execution data: an optional statement logging id.
///
/// Marked `#[must_use]` so that values are not silently discarded.
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    /// The statement logging id, if one is stored.
    statement_uuid: Option<StatementLoggingId>,
}
1430
1431impl ExecuteContextExtra {
1432 pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1433 Self { statement_uuid }
1434 }
1435 pub fn is_trivial(&self) -> bool {
1436 self.statement_uuid.is_none()
1437 }
1438 pub fn contents(&self) -> Option<StatementLoggingId> {
1439 self.statement_uuid
1440 }
1441 #[must_use]
1445 pub(crate) fn retire(self) -> Option<StatementLoggingId> {
1446 self.statement_uuid
1447 }
1448}
1449
/// Wraps an [`ExecuteContextExtra`] together with a sender to the coordinator.
///
/// If the guard is dropped while it still holds a statement logging id, its
/// `Drop` implementation sends a `Message::RetireExecute` with the `Aborted`
/// reason over `coordinator_tx`. `defuse` removes the contents so that nothing
/// is sent on drop.
#[derive(Debug)]
#[must_use]
pub struct ExecuteContextGuard {
    /// The guarded data.
    extra: ExecuteContextExtra,
    /// Channel used by `Drop` to send the retirement message.
    coordinator_tx: mpsc::UnboundedSender<Message>,
}
1469
1470impl Default for ExecuteContextGuard {
1471 fn default() -> Self {
1472 let (tx, _rx) = mpsc::unbounded_channel();
1476 Self {
1477 extra: ExecuteContextExtra::default(),
1478 coordinator_tx: tx,
1479 }
1480 }
1481}
1482
1483impl ExecuteContextGuard {
1484 pub(crate) fn new(
1485 statement_uuid: Option<StatementLoggingId>,
1486 coordinator_tx: mpsc::UnboundedSender<Message>,
1487 ) -> Self {
1488 Self {
1489 extra: ExecuteContextExtra::new(statement_uuid),
1490 coordinator_tx,
1491 }
1492 }
1493 pub fn is_trivial(&self) -> bool {
1494 self.extra.is_trivial()
1495 }
1496 pub fn contents(&self) -> Option<StatementLoggingId> {
1497 self.extra.contents()
1498 }
1499 pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
1506 std::mem::take(&mut self.extra)
1508 }
1509}
1510
1511impl Drop for ExecuteContextGuard {
1512 fn drop(&mut self) {
1513 if let Some(statement_uuid) = self.extra.statement_uuid.take() {
1514 let msg = Message::RetireExecute {
1517 data: ExecuteContextExtra {
1518 statement_uuid: Some(statement_uuid),
1519 },
1520 otel_ctx: OpenTelemetryContext::obtain(),
1521 reason: StatementEndedExecutionReason::Aborted,
1522 };
1523 let _ = self.coordinator_tx.send(msg);
1526 }
1527 }
1528}
1529
/// Context of a statement being executed.
///
/// The actual state lives in a boxed [`ExecuteContextInner`], so this handle
/// is a single pointer wide.
#[derive(Debug)]
pub struct ExecuteContext {
    /// Heap-allocated state of the context.
    inner: Box<ExecuteContextInner>,
}
1545
1546impl std::ops::Deref for ExecuteContext {
1547 type Target = ExecuteContextInner;
1548 fn deref(&self) -> &Self::Target {
1549 &*self.inner
1550 }
1551}
1552
1553impl std::ops::DerefMut for ExecuteContext {
1554 fn deref_mut(&mut self) -> &mut Self::Target {
1555 &mut *self.inner
1556 }
1557}
1558
/// The state behind an [`ExecuteContext`].
#[derive(Debug)]
pub struct ExecuteContextInner {
    /// Transmitter used to send the final [`ExecuteResponse`] to the client.
    tx: ClientTransmitter<ExecuteResponse>,
    /// Sender for messages to the coordinator.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// The session the statement executes in.
    session: Session,
    /// Guard around the statement logging data for this execution.
    extra: ExecuteContextGuard,
}
1566
1567impl ExecuteContext {
1568 pub fn session(&self) -> &Session {
1569 &self.session
1570 }
1571
1572 pub fn session_mut(&mut self) -> &mut Session {
1573 &mut self.session
1574 }
1575
1576 pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1577 &self.tx
1578 }
1579
1580 pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1581 &mut self.tx
1582 }
1583
1584 pub fn from_parts(
1585 tx: ClientTransmitter<ExecuteResponse>,
1586 internal_cmd_tx: mpsc::UnboundedSender<Message>,
1587 session: Session,
1588 extra: ExecuteContextGuard,
1589 ) -> Self {
1590 Self {
1591 inner: ExecuteContextInner {
1592 tx,
1593 session,
1594 extra,
1595 internal_cmd_tx,
1596 }
1597 .into(),
1598 }
1599 }
1600
1601 pub fn into_parts(
1610 self,
1611 ) -> (
1612 ClientTransmitter<ExecuteResponse>,
1613 mpsc::UnboundedSender<Message>,
1614 Session,
1615 ExecuteContextGuard,
1616 ) {
1617 let ExecuteContextInner {
1618 tx,
1619 internal_cmd_tx,
1620 session,
1621 extra,
1622 } = *self.inner;
1623 (tx, internal_cmd_tx, session, extra)
1624 }
1625
1626 #[instrument(level = "debug")]
1628 pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1629 let ExecuteContextInner {
1630 tx,
1631 internal_cmd_tx,
1632 session,
1633 extra,
1634 } = *self.inner;
1635 let reason = if extra.is_trivial() {
1636 None
1637 } else {
1638 Some((&result).into())
1639 };
1640 tx.send(result, session);
1641 if let Some(reason) = reason {
1642 let extra = extra.defuse();
1644 if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1645 otel_ctx: OpenTelemetryContext::obtain(),
1646 data: extra,
1647 reason,
1648 }) {
1649 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1650 }
1651 }
1652 }
1653
1654 pub fn extra(&self) -> &ExecuteContextGuard {
1655 &self.extra
1656 }
1657
1658 pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
1659 &mut self.extra
1660 }
1661}
1662
/// Process statuses, nested as cluster id → replica id → process id → status.
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);
1667
1668impl ClusterReplicaStatuses {
1669 pub(crate) fn new() -> ClusterReplicaStatuses {
1670 ClusterReplicaStatuses(BTreeMap::new())
1671 }
1672
1673 pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1677 let prev = self.0.insert(cluster_id, BTreeMap::new());
1678 assert_eq!(
1679 prev, None,
1680 "cluster {cluster_id} statuses already initialized"
1681 );
1682 }
1683
1684 pub(crate) fn initialize_cluster_replica_statuses(
1688 &mut self,
1689 cluster_id: ClusterId,
1690 replica_id: ReplicaId,
1691 num_processes: usize,
1692 time: DateTime<Utc>,
1693 ) {
1694 tracing::info!(
1695 ?cluster_id,
1696 ?replica_id,
1697 ?time,
1698 "initializing cluster replica status"
1699 );
1700 let replica_statuses = self.0.entry(cluster_id).or_default();
1701 let process_statuses = (0..num_processes)
1702 .map(|process_id| {
1703 let status = ClusterReplicaProcessStatus {
1704 status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1705 time: time.clone(),
1706 };
1707 (u64::cast_from(process_id), status)
1708 })
1709 .collect();
1710 let prev = replica_statuses.insert(replica_id, process_statuses);
1711 assert_none!(
1712 prev,
1713 "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1714 );
1715 }
1716
1717 pub(crate) fn remove_cluster_statuses(
1721 &mut self,
1722 cluster_id: &ClusterId,
1723 ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1724 let prev = self.0.remove(cluster_id);
1725 prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1726 }
1727
1728 pub(crate) fn remove_cluster_replica_statuses(
1732 &mut self,
1733 cluster_id: &ClusterId,
1734 replica_id: &ReplicaId,
1735 ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1736 let replica_statuses = self
1737 .0
1738 .get_mut(cluster_id)
1739 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1740 let prev = replica_statuses.remove(replica_id);
1741 prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1742 }
1743
1744 pub(crate) fn ensure_cluster_status(
1748 &mut self,
1749 cluster_id: ClusterId,
1750 replica_id: ReplicaId,
1751 process_id: ProcessId,
1752 status: ClusterReplicaProcessStatus,
1753 ) {
1754 let replica_statuses = self
1755 .0
1756 .get_mut(&cluster_id)
1757 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1758 .get_mut(&replica_id)
1759 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1760 replica_statuses.insert(process_id, status);
1761 }
1762
1763 pub fn get_cluster_replica_status(
1767 &self,
1768 cluster_id: ClusterId,
1769 replica_id: ReplicaId,
1770 ) -> ClusterStatus {
1771 let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1772 Self::cluster_replica_status(process_status)
1773 }
1774
1775 pub fn cluster_replica_status(
1777 process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1778 ) -> ClusterStatus {
1779 process_status
1780 .values()
1781 .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1782 (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1783 (x, y) => {
1784 let reason_x = match x {
1785 ClusterStatus::Offline(reason) => reason,
1786 ClusterStatus::Online => None,
1787 };
1788 let reason_y = match y {
1789 ClusterStatus::Offline(reason) => reason,
1790 ClusterStatus::Online => None,
1791 };
1792 ClusterStatus::Offline(reason_x.or(reason_y))
1794 }
1795 })
1796 }
1797
1798 pub(crate) fn get_cluster_replica_statuses(
1802 &self,
1803 cluster_id: ClusterId,
1804 replica_id: ReplicaId,
1805 ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1806 self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1807 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1808 }
1809
1810 pub(crate) fn try_get_cluster_replica_statuses(
1812 &self,
1813 cluster_id: ClusterId,
1814 replica_id: ReplicaId,
1815 ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1816 self.try_get_cluster_statuses(cluster_id)
1817 .and_then(|statuses| statuses.get(&replica_id))
1818 }
1819
1820 pub(crate) fn try_get_cluster_statuses(
1822 &self,
1823 cluster_id: ClusterId,
1824 ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1825 self.0.get(&cluster_id)
1826 }
1827}
1828
/// The coordinator's state: controller and catalog handles, channels, and the
/// maps tracking per-connection and per-object bookkeeping.
///
/// `Debug` is derived via `derivative`; fields marked `Debug = "ignore"` are
/// left out of the debug output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    /// The controller (omitted from `Debug` output).
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    /// Shared handle to the catalog.
    catalog: Arc<Catalog>,

    /// Client for persist.
    persist_client: PersistClient,

    /// Sender for messages addressed to the coordinator itself.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// Notifier for group commits.
    group_commit_tx: appends::GroupCommitNotifier,

    /// Sender for pending reads, tagged with their connection id.
    // NOTE(review): per the field name these are strict-serializable reads
    // awaiting linearization — confirm against the receiver.
    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    /// Per-timeline state.
    global_timelines: BTreeMap<Timeline, TimelineState>,

    /// Shared generator for transient ids.
    transient_id_gen: Arc<TransientIdGen>,
    /// Metadata for connections, keyed by connection id.
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    /// Read holds kept per connection.
    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds>,

    /// Pending peeks, keyed by UUID.
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// Per connection: peek UUIDs mapped to a cluster id.
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    /// Pending read transactions, keyed by connection id.
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    /// Active compute sinks, keyed by global id.
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    /// Webhook appender invalidators, keyed by catalog item id.
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    /// Active `COPY FROM` operations, keyed by connection id.
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    /// A `watch` channel pair of `bool` per connection.
    // NOTE(review): presumably flipped to signal cancellation — confirm.
    connection_cancel_watches: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    /// Introspection subscribes, keyed by global id.
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    /// One shared async mutex per catalog item id.
    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    /// Deferred operations, keyed by connection id.
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    /// Write transactions that are pending.
    pending_writes: Vec<PendingWriteTxn>,

    /// Interval driving timeline advancement.
    advance_timelines_interval: Interval,

    /// Queue of deferred plan statements.
    // NOTE(review): per the field name this serializes DDL — confirm.
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    /// Controller for secrets.
    secrets_controller: Arc<dyn SecretsController>,
    /// Caching reader for secrets.
    caching_secrets_reader: CachingSecretsReader,

    /// Cloud resource controller, if one is available.
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Client for collecting storage usage.
    storage_usage_client: StorageUsageClient,
    /// Interval at which storage usage is collected.
    storage_usage_collection_interval: Duration,

    /// Segment client, if configured (omitted from `Debug` output).
    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    /// Coordinator metrics.
    metrics: Metrics,
    /// Optimizer metrics.
    optimizer_metrics: OptimizerMetrics,

    /// Handle to the tracing configuration.
    tracing_handle: TracingHandle,

    /// State for statement logging.
    statement_logging: StatementLogging,

    /// Limiter for webhook concurrency.
    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    /// Timestamp oracle configuration, if any.
    timestamp_oracle_config: Option<TimestampOracleConfig>,

    /// Interval for checking cluster scheduling policies.
    check_cluster_scheduling_policies_interval: Interval,

    /// Per cluster: scheduling decisions keyed by a static string.
    // NOTE(review): the key is presumably the policy name — confirm.
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    /// Interval for the caught-up check.
    caught_up_check_interval: Interval,

    /// Context for the caught-up check, when one is active.
    caught_up_check: Option<CaughtUpCheckContext>,

    /// Metrics registry for catalog info metrics.
    catalog_info_metrics_registry: MetricsRegistry,

    /// Installed watch sets with their owning connection and response.
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    /// Watch set ids grouped by connection.
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    /// Process statuses of all cluster replicas.
    cluster_replica_statuses: ClusterReplicaStatuses,

    /// Whether the controllers are in read-only mode.
    read_only_controllers: bool,

    /// Builtin table updates buffered for later; bootstrap expects this to be
    /// `Some` when the controller is read-only.
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    /// The validated license key.
    license_key: ValidatedLicenseKey,

    /// Pool of user ids.
    user_id_pool: IdPool,
}
2015
2016impl Coordinator {
2017 pub(crate) fn push_replica_dyncfg_overrides(&mut self) {
2023 let replica_overrides = self
2026 .catalog()
2027 .state()
2028 .scoped_system_parameters()
2029 .replica
2030 .clone();
2031
2032 let dyncfgs = self.catalog().system_config().dyncfgs();
2033 let mut instance_overrides: BTreeMap<
2034 ComputeInstanceId,
2035 BTreeMap<ReplicaId, ConfigUpdates>,
2036 > = BTreeMap::new();
2037 for cluster in self.catalog().clusters() {
2038 for replica in cluster.replicas() {
2039 let Some(values) = replica_overrides.get(&replica.replica_id) else {
2040 continue;
2041 };
2042 let mut updates = ConfigUpdates::default();
2043 for (name, value) in values {
2044 let Some(entry) = dyncfgs.entry(name) else {
2045 continue;
2048 };
2049 match entry.parse_val(value) {
2050 Ok(val) => updates.add_dynamic(name, val),
2051 Err(e) => {
2052 tracing::warn!(%name, %value, "cannot parse scoped override: {e}")
2053 }
2054 }
2055 }
2056 if !updates.updates.is_empty() {
2057 instance_overrides
2058 .entry(cluster.id)
2059 .or_default()
2060 .insert(replica.replica_id, updates);
2061 }
2062 }
2063 }
2064
2065 self.controller
2076 .compute
2077 .update_replica_dyncfg_overrides(instance_overrides);
2078 let compute_config = crate::flags::compute_config(self.catalog().system_config());
2084 self.controller.compute.update_configuration(compute_config);
2085 }
2086
2087 #[instrument(name = "coord::bootstrap")]
2091 pub(crate) async fn bootstrap(
2092 &mut self,
2093 boot_ts: Timestamp,
2094 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
2095 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2096 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2097 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
2098 audit_logs_iterator: AuditLogIterator,
2099 ) -> Result<(), AdapterError> {
2100 let bootstrap_start = Instant::now();
2101 info!("startup: coordinator init: bootstrap beginning");
2102 info!("startup: coordinator init: bootstrap: preamble beginning");
2103
2104 let cluster_statuses: Vec<(_, Vec<_>)> = self
2107 .catalog()
2108 .clusters()
2109 .map(|cluster| {
2110 (
2111 cluster.id(),
2112 cluster
2113 .replicas()
2114 .map(|replica| {
2115 (replica.replica_id, replica.config.location.num_processes())
2116 })
2117 .collect(),
2118 )
2119 })
2120 .collect();
2121 let now = self.now_datetime();
2122 for (cluster_id, replica_statuses) in cluster_statuses {
2123 self.cluster_replica_statuses
2124 .initialize_cluster_statuses(cluster_id);
2125 for (replica_id, num_processes) in replica_statuses {
2126 self.cluster_replica_statuses
2127 .initialize_cluster_replica_statuses(
2128 cluster_id,
2129 replica_id,
2130 num_processes,
2131 now,
2132 );
2133 }
2134 }
2135
2136 let system_config = self.catalog().system_config();
2137
2138 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
2140
2141 let compute_config = flags::compute_config(system_config);
2143 let storage_config = flags::storage_config(system_config);
2144 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
2145 let dyncfg_updates = system_config.dyncfg_updates();
2146 self.controller.compute.update_configuration(compute_config);
2147 self.controller.storage.update_parameters(storage_config);
2148 self.controller
2149 .update_orchestrator_scheduling_config(scheduling_config);
2150 self.controller.update_configuration(dyncfg_updates);
2151
2152 let enforce_credit_limit_at_bootstrap = !matches!(
2157 self.license_key.expiration_behavior,
2158 ExpirationBehavior::DisableClusterCreation,
2159 );
2160 if enforce_credit_limit_at_bootstrap {
2161 self.validate_resource_limit_numeric(
2162 Numeric::zero(),
2163 self.current_credit_consumption_rate(),
2164 |system_vars| {
2165 self.license_key
2166 .max_credit_consumption_rate()
2167 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
2168 },
2169 "cluster replica",
2170 MAX_CREDIT_CONSUMPTION_RATE.name(),
2171 )?;
2172 }
2173
2174 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
2175 Default::default();
2176
2177 let enable_worker_core_affinity =
2178 self.catalog().system_config().enable_worker_core_affinity();
2179 let enable_storage_introspection_logs = self
2180 .catalog()
2181 .system_config()
2182 .enable_storage_introspection_logs();
2183 for instance in self.catalog.clusters() {
2184 self.controller.create_cluster(
2185 instance.id,
2186 ClusterConfig {
2187 arranged_logs: instance.log_indexes.clone(),
2188 workload_class: instance.config.workload_class.clone(),
2189 },
2190 )?;
2191 for replica in instance.replicas() {
2192 let role = instance.role();
2193 self.controller.create_replica(
2194 instance.id,
2195 replica.replica_id,
2196 instance.name.clone(),
2197 replica.name.clone(),
2198 role,
2199 replica.config.clone(),
2200 enable_worker_core_affinity,
2201 enable_storage_introspection_logs,
2202 )?;
2203 }
2204 }
2205
2206 self.push_replica_dyncfg_overrides();
2218
2219 info!(
2220 "startup: coordinator init: bootstrap: preamble complete in {:?}",
2221 bootstrap_start.elapsed()
2222 );
2223
2224 let init_storage_collections_start = Instant::now();
2225 info!("startup: coordinator init: bootstrap: storage collections init beginning");
2226 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2227 .await;
2228 info!(
2229 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2230 init_storage_collections_start.elapsed()
2231 );
2232
2233 self.controller.start_compute_introspection_sink();
2238
2239 let sorting_start = Instant::now();
2240 info!("startup: coordinator init: bootstrap: sorting catalog entries");
2241 let entries = self.bootstrap_sort_catalog_entries();
2242 info!(
2243 "startup: coordinator init: bootstrap: sorting catalog entries complete in {:?}",
2244 sorting_start.elapsed()
2245 );
2246
2247 let optimize_dataflows_start = Instant::now();
2248 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2249 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2250 info!(
2251 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2252 optimize_dataflows_start.elapsed()
2253 );
2254
2255 let _fut = self.catalog().update_expression_cache(
2257 uncached_local_exprs.into_iter().collect(),
2258 uncached_global_exps.into_iter().collect(),
2259 Default::default(),
2260 );
2261
2262 let bootstrap_as_ofs_start = Instant::now();
2266 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2267 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2268 info!(
2269 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2270 bootstrap_as_ofs_start.elapsed()
2271 );
2272
2273 let postamble_start = Instant::now();
2274 info!("startup: coordinator init: bootstrap: postamble beginning");
2275
2276 let logs: BTreeSet<_> = BUILTINS::logs()
2277 .map(|log| self.catalog().resolve_builtin_log(log))
2278 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2279 .collect();
2280
2281 let mut privatelink_connections = BTreeMap::new();
2282
2283 for entry in &entries {
2284 debug!(
2285 "coordinator init: installing {} {}",
2286 entry.item().typ(),
2287 entry.id()
2288 );
2289 let mut policy = entry.item().initial_logical_compaction_window();
2290 match entry.item() {
2291 CatalogItem::Source(source) => {
2297 if source.custom_logical_compaction_window.is_none() {
2299 if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2300 source.data_source
2301 {
2302 policy = Some(
2303 self.catalog()
2304 .get_entry(&ingestion_id)
2305 .source()
2306 .expect("must be source")
2307 .custom_logical_compaction_window
2308 .unwrap_or_default(),
2309 );
2310 }
2311 }
2312 policies_to_set
2313 .entry(policy.expect("sources have a compaction window"))
2314 .or_insert_with(Default::default)
2315 .storage_ids
2316 .insert(source.global_id());
2317 }
2318 CatalogItem::Table(table) => {
2319 policies_to_set
2320 .entry(policy.expect("tables have a compaction window"))
2321 .or_insert_with(Default::default)
2322 .storage_ids
2323 .extend(table.global_ids());
2324 }
2325 CatalogItem::Index(idx) => {
2326 let policy_entry = policies_to_set
2327 .entry(policy.expect("indexes have a compaction window"))
2328 .or_insert_with(Default::default);
2329
2330 if logs.contains(&idx.on) {
2331 policy_entry
2332 .compute_ids
2333 .entry(idx.cluster_id)
2334 .or_insert_with(BTreeSet::new)
2335 .insert(idx.global_id());
2336 } else {
2337 let df_desc = self
2338 .catalog()
2339 .try_get_physical_plan(&idx.global_id())
2340 .expect("added in `bootstrap_dataflow_plans`")
2341 .clone();
2342
2343 let df_meta = self
2344 .catalog()
2345 .try_get_dataflow_metainfo(&idx.global_id())
2346 .expect("added in `bootstrap_dataflow_plans`");
2347
2348 if self.catalog().state().system_config().enable_mz_notices() {
2349 self.catalog().state().pack_optimizer_notices(
2351 &mut builtin_table_updates,
2352 df_meta.optimizer_notices.iter(),
2353 Diff::ONE,
2354 );
2355 }
2356
2357 policy_entry
2360 .compute_ids
2361 .entry(idx.cluster_id)
2362 .or_insert_with(Default::default)
2363 .extend(df_desc.export_ids());
2364
2365 self.controller
2366 .compute
2367 .create_dataflow(idx.cluster_id, df_desc, None)
2368 .unwrap_or_terminate("cannot fail to create dataflows");
2369 }
2370 }
2371 CatalogItem::View(_) => (),
2372 CatalogItem::MaterializedView(mview) => {
2373 policies_to_set
2374 .entry(policy.expect("materialized views have a compaction window"))
2375 .or_insert_with(Default::default)
2376 .storage_ids
2377 .insert(mview.global_id_writes());
2378
2379 let mut df_desc = self
2380 .catalog()
2381 .try_get_physical_plan(&mview.global_id_writes())
2382 .expect("added in `bootstrap_dataflow_plans`")
2383 .clone();
2384
2385 if let Some(initial_as_of) = mview.initial_as_of.clone() {
2386 df_desc.set_initial_as_of(initial_as_of);
2387 }
2388
2389 let until = mview
2391 .refresh_schedule
2392 .as_ref()
2393 .and_then(|s| s.last_refresh())
2394 .and_then(|r| r.try_step_forward());
2395 if let Some(until) = until {
2396 df_desc.until.meet_assign(&Antichain::from_elem(until));
2397 }
2398
2399 let df_meta = self
2400 .catalog()
2401 .try_get_dataflow_metainfo(&mview.global_id_writes())
2402 .expect("added in `bootstrap_dataflow_plans`");
2403
2404 if self.catalog().state().system_config().enable_mz_notices() {
2405 self.catalog().state().pack_optimizer_notices(
2407 &mut builtin_table_updates,
2408 df_meta.optimizer_notices.iter(),
2409 Diff::ONE,
2410 );
2411 }
2412
2413 self.ship_dataflow(df_desc, mview.cluster_id, mview.target_replica)
2414 .await;
2415
2416 if mview.replacement_target.is_none() {
2419 self.allow_writes(mview.cluster_id, mview.global_id_writes());
2420 }
2421 }
2422 CatalogItem::Sink(sink) => {
2423 policies_to_set
2424 .entry(CompactionWindow::Default)
2425 .or_insert_with(Default::default)
2426 .storage_ids
2427 .insert(sink.global_id());
2428 }
2429 CatalogItem::Connection(catalog_connection) => {
2430 if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2431 privatelink_connections.insert(
2432 entry.id(),
2433 VpcEndpointConfig {
2434 aws_service_name: conn.service_name.clone(),
2435 availability_zone_ids: conn.availability_zones.clone(),
2436 },
2437 );
2438 }
2439 }
2440 CatalogItem::Log(_)
2442 | CatalogItem::Type(_)
2443 | CatalogItem::Func(_)
2444 | CatalogItem::Secret(_) => {}
2445 }
2446 }
2447
2448 if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2449 let existing_vpc_endpoints = cloud_resource_controller
2451 .list_vpc_endpoints()
2452 .await
2453 .context("list vpc endpoints")?;
2454 let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2455 let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2456 let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2457 for id in vpc_endpoints_to_remove {
2458 cloud_resource_controller
2459 .delete_vpc_endpoint(*id)
2460 .await
2461 .context("deleting extraneous vpc endpoint")?;
2462 }
2463
2464 for (id, spec) in privatelink_connections {
2466 cloud_resource_controller
2467 .ensure_vpc_endpoint(id, spec)
2468 .await
2469 .context("ensuring vpc endpoint")?;
2470 }
2471 }
2472
2473 drop(dataflow_read_holds);
2476 for (cw, policies) in policies_to_set {
2478 self.initialize_read_policies(&policies, cw).await;
2479 }
2480
2481 builtin_table_updates.extend(
2483 self.catalog().state().resolve_builtin_table_updates(
2484 self.catalog().state().pack_all_replica_size_updates(),
2485 ),
2486 );
2487
2488 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2489 let migrated_updates_fut = if self.controller.read_only() {
2495 let min_timestamp = Timestamp::minimum();
2496 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2497 .extract_if(.., |update| {
2498 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2499 migrated_storage_collections_0dt.contains(&update.id)
2500 && self
2501 .controller
2502 .storage_collections
2503 .collection_frontiers(gid)
2504 .expect("all tables are registered")
2505 .write_frontier
2506 .elements()
2507 == &[min_timestamp]
2508 })
2509 .collect();
2510 if migrated_builtin_table_updates.is_empty() {
2511 futures::future::ready(()).boxed()
2512 } else {
2513 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2515 for update in migrated_builtin_table_updates {
2516 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2517 grouped_appends.entry(gid).or_default().push(update.data);
2518 }
2519 info!(
2520 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2521 grouped_appends.keys().collect::<Vec<_>>()
2522 );
2523
2524 let mut all_appends = Vec::with_capacity(grouped_appends.len());
2526 for (item_id, table_data) in grouped_appends.into_iter() {
2527 let mut all_rows = Vec::new();
2528 let mut all_data = Vec::new();
2529 for data in table_data {
2530 match data {
2531 TableData::Rows(rows) => all_rows.extend(rows),
2532 TableData::Batches(_) => all_data.push(data),
2533 }
2534 }
2535 differential_dataflow::consolidation::consolidate(&mut all_rows);
2536 all_data.push(TableData::Rows(all_rows));
2537
2538 all_appends.push((item_id, all_data));
2540 }
2541
2542 let fut = self
2543 .controller
2544 .storage
2545 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2546 .expect("cannot fail to append");
2547 async {
2548 fut.await
2549 .expect("One-shot shouldn't be dropped during bootstrap")
2550 .unwrap_or_terminate("cannot fail to append")
2551 }
2552 .boxed()
2553 }
2554 } else {
2555 futures::future::ready(()).boxed()
2556 };
2557
2558 info!(
2559 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2560 postamble_start.elapsed()
2561 );
2562
2563 let builtin_update_start = Instant::now();
2564 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2565
2566 if self.controller.read_only() {
2567 info!(
2568 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2569 );
2570
2571 let audit_join_start = Instant::now();
2573 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2574 let audit_log_updates: Vec<_> = audit_logs_iterator
2575 .map(|(audit_log, ts)| StateUpdate {
2576 kind: StateUpdateKind::AuditLog(audit_log),
2577 ts,
2578 diff: StateDiff::Addition,
2579 })
2580 .collect();
2581 let audit_log_builtin_table_updates = self
2582 .catalog()
2583 .state()
2584 .generate_builtin_table_updates(audit_log_updates);
2585 builtin_table_updates.extend(audit_log_builtin_table_updates);
2586 info!(
2587 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2588 audit_join_start.elapsed()
2589 );
2590 self.buffered_builtin_table_updates
2591 .as_mut()
2592 .expect("in read-only mode")
2593 .append(&mut builtin_table_updates);
2594 } else {
2595 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2596 .await;
2597 };
2598 info!(
2599 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2600 builtin_update_start.elapsed()
2601 );
2602
2603 let cleanup_secrets_start = Instant::now();
2604 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2605 {
2609 let Self {
2612 secrets_controller,
2613 catalog,
2614 ..
2615 } = self;
2616
2617 let next_user_item_id = catalog.get_next_user_item_id().await?;
2618 let next_system_item_id = catalog.get_next_system_item_id().await?;
2619 let read_only = self.controller.read_only();
2620 let catalog_ids: BTreeSet<CatalogItemId> =
2625 catalog.entries().map(|entry| entry.id()).collect();
2626 let secrets_controller = Arc::clone(secrets_controller);
2627
2628 spawn(|| "cleanup-orphaned-secrets", async move {
2629 if read_only {
2630 info!(
2631 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2632 );
2633 return;
2634 }
2635 info!("coordinator init: cleaning up orphaned secrets");
2636
2637 match secrets_controller.list().await {
2638 Ok(controller_secrets) => {
2639 let controller_secrets: BTreeSet<CatalogItemId> =
2640 controller_secrets.into_iter().collect();
2641 let orphaned = controller_secrets.difference(&catalog_ids);
2642 for id in orphaned {
2643 let id_too_large = match id {
2644 CatalogItemId::System(id) => *id >= next_system_item_id,
2645 CatalogItemId::User(id) => *id >= next_user_item_id,
2646 CatalogItemId::IntrospectionSourceIndex(_)
2647 | CatalogItemId::Transient(_) => false,
2648 };
2649 if id_too_large {
2650 info!(
2651 %next_user_item_id, %next_system_item_id,
2652 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2653 );
2654 } else {
2655 info!("coordinator init: deleting orphaned secret {id}");
2656 fail_point!("orphan_secrets");
2657 if let Err(e) = secrets_controller.delete(*id).await {
2658 warn!(
2659 "Dropping orphaned secret has encountered an error: {}",
2660 e
2661 );
2662 }
2663 }
2664 }
2665 }
2666 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2667 }
2668 });
2669 }
2670 info!(
2671 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2672 cleanup_secrets_start.elapsed()
2673 );
2674
2675 let final_steps_start = Instant::now();
2677 info!(
2678 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2679 );
2680 migrated_updates_fut
2681 .instrument(info_span!("coord::bootstrap::final"))
2682 .await;
2683
2684 debug!(
2685 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2686 );
2687 self.controller.initialization_complete();
2689
2690 self.bootstrap_introspection_subscribes().await;
2692
2693 info!(
2694 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2695 final_steps_start.elapsed()
2696 );
2697
2698 info!(
2699 "startup: coordinator init: bootstrap complete in {:?}",
2700 bootstrap_start.elapsed()
2701 );
2702 Ok(())
2703 }
2704
2705 #[allow(clippy::async_yields_async)]
2710 #[instrument]
2711 async fn bootstrap_tables(
2712 &mut self,
2713 entries: &[CatalogEntry],
2714 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2715 audit_logs_iterator: AuditLogIterator,
2716 ) {
2717 struct TableMetadata<'a> {
2719 id: CatalogItemId,
2720 name: &'a QualifiedItemName,
2721 table: &'a Table,
2722 }
2723
2724 let table_metas: Vec<_> = entries
2726 .into_iter()
2727 .filter_map(|entry| {
2728 entry.table().map(|table| TableMetadata {
2729 id: entry.id(),
2730 name: entry.name(),
2731 table,
2732 })
2733 })
2734 .collect();
2735
2736 debug!("coordinator init: advancing all tables to current timestamp");
2738 let WriteTimestamp {
2739 timestamp: write_ts,
2740 advance_to,
2741 } = self.get_local_write_ts().await;
2742 let appends = table_metas
2743 .iter()
2744 .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2745 .collect();
2746 let table_fence_rx = self
2750 .controller
2751 .storage
2752 .append_table(write_ts.clone(), advance_to, appends)
2753 .expect("invalid updates");
2754
2755 self.apply_local_write(write_ts).await;
2756
2757 debug!("coordinator init: resetting system tables");
2759 let read_ts = self.get_local_read_ts().await;
2760
2761 let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2764 .catalog()
2765 .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2766 .into();
2767 let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2768 meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2769 && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2770 };
2771
2772 let mut retraction_tasks = Vec::new();
2773 let mut system_tables: Vec<_> = table_metas
2774 .iter()
2775 .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2776 .collect();
2777
2778 let (audit_events_idx, _) = system_tables
2780 .iter()
2781 .find_position(|table| {
2782 table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2783 })
2784 .expect("mz_audit_events must exist");
2785 let audit_events = system_tables.remove(audit_events_idx);
2786 let audit_log_task = self.bootstrap_audit_log_table(
2787 audit_events.id,
2788 audit_events.name,
2789 audit_events.table,
2790 audit_logs_iterator,
2791 read_ts,
2792 );
2793
2794 for system_table in system_tables {
2795 let table_id = system_table.id;
2796 let full_name = self.catalog().resolve_full_name(system_table.name, None);
2797 debug!("coordinator init: resetting system table {full_name} ({table_id})");
2798
2799 let snapshot_fut = self
2801 .controller
2802 .storage_collections
2803 .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2804 let batch_fut = self
2805 .controller
2806 .storage_collections
2807 .create_update_builder(system_table.table.global_id_writes());
2808
2809 let task = spawn(|| format!("snapshot-{table_id}"), async move {
2810 let mut batch = batch_fut
2812 .await
2813 .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2814 tracing::info!(?table_id, "starting snapshot");
2815 let mut snapshot_cursor = snapshot_fut
2817 .await
2818 .unwrap_or_terminate("cannot fail to snapshot");
2819
2820 while let Some(values) = snapshot_cursor.next().await {
2822 for (key, _t, d) in values {
2823 let d_invert = d.neg();
2824 batch.add(&key, &(), &d_invert).await;
2825 }
2826 }
2827 tracing::info!(?table_id, "finished snapshot");
2828
2829 let batch = batch.finish().await;
2830 BuiltinTableUpdate::batch(table_id, batch)
2831 });
2832 retraction_tasks.push(task);
2833 }
2834
2835 let retractions_res = futures::future::join_all(retraction_tasks).await;
2836 for retractions in retractions_res {
2837 builtin_table_updates.push(retractions);
2838 }
2839
2840 let audit_join_start = Instant::now();
2841 info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2842 let audit_log_updates = audit_log_task.await;
2843 let audit_log_builtin_table_updates = self
2844 .catalog()
2845 .state()
2846 .generate_builtin_table_updates(audit_log_updates);
2847 builtin_table_updates.extend(audit_log_builtin_table_updates);
2848 info!(
2849 "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2850 audit_join_start.elapsed()
2851 );
2852
2853 table_fence_rx
2855 .await
2856 .expect("One-shot shouldn't be dropped during bootstrap")
2857 .unwrap_or_terminate("cannot fail to append");
2858
2859 info!("coordinator init: sending builtin table updates");
2860 let (_builtin_updates_fut, write_ts) = self
2861 .builtin_table_update()
2862 .execute(builtin_table_updates)
2863 .await;
2864 info!(?write_ts, "our write ts");
2865 if let Some(write_ts) = write_ts {
2866 self.apply_local_write(write_ts).await;
2867 }
2868 }
2869
2870 #[instrument]
2874 fn bootstrap_audit_log_table<'a>(
2875 &self,
2876 table_id: CatalogItemId,
2877 name: &'a QualifiedItemName,
2878 table: &'a Table,
2879 audit_logs_iterator: AuditLogIterator,
2880 read_ts: Timestamp,
2881 ) -> JoinHandle<Vec<StateUpdate>> {
2882 let full_name = self.catalog().resolve_full_name(name, None);
2883 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2884 let current_contents_fut = self
2885 .controller
2886 .storage_collections
2887 .snapshot(table.global_id_writes(), read_ts);
2888 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2889 let current_contents = current_contents_fut
2890 .await
2891 .unwrap_or_terminate("cannot fail to fetch snapshot");
2892 let contents_len = current_contents.len();
2893 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2894
2895 let max_table_id = current_contents
2897 .into_iter()
2898 .filter(|(_, diff)| *diff == 1)
2899 .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2900 .sorted()
2901 .rev()
2902 .next();
2903
2904 audit_logs_iterator
2906 .take_while(|(audit_log, _)| match max_table_id {
2907 Some(id) => audit_log.event.sortable_id() > id,
2908 None => true,
2909 })
2910 .map(|(audit_log, ts)| StateUpdate {
2911 kind: StateUpdateKind::AuditLog(audit_log),
2912 ts,
2913 diff: StateDiff::Addition,
2914 })
2915 .collect::<Vec<_>>()
2916 })
2917 }
2918
/// Creates the storage collections for all catalog items during bootstrap.
///
/// Builds a `CollectionDescription` for every source, table, materialized view, and sink
/// in the catalog, asks the storage controller to evolve nullability for the materialized
/// view collections, and then creates the collections in dependency order: a collection
/// is only created once none of its transitive dependencies is still pending.
///
/// `migrated_storage_collections` names catalog items; it is expanded to all of their
/// global IDs and passed through to `create_collections_for_bootstrap`.
#[instrument]
async fn bootstrap_storage_collections(
    &mut self,
    migrated_storage_collections: &BTreeSet<CatalogItemId>,
) {
    let catalog = self.catalog();

    // Translates a catalog `DataSourceDesc` into a `CollectionDescription` for the storage
    // controller, inlining connections from the catalog state where needed.
    let source_desc = |object_id: GlobalId,
                       data_source: &DataSourceDesc,
                       desc: &RelationDesc,
                       timeline: &Timeline| {
        let data_source = match data_source.clone() {
            DataSourceDesc::Ingestion { desc, cluster_id } => {
                let desc = desc.into_inline_connection(catalog.state());
                let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
                DataSource::Ingestion(ingestion)
            }
            DataSourceDesc::OldSyntaxIngestion {
                desc,
                progress_subsource,
                data_config,
                details,
                cluster_id,
            } => {
                let desc = desc.into_inline_connection(catalog.state());
                let data_config = data_config.into_inline_connection(catalog.state());
                // Unlike the `Ingestion` arm, the third argument to
                // `IngestionDescription::new` is the progress subsource's latest global
                // ID rather than `object_id`.
                let progress_subsource =
                    catalog.get_entry(&progress_subsource).latest_global_id();
                let mut ingestion =
                    IngestionDescription::new(desc, cluster_id, progress_subsource);
                // The object itself is added as a source export of the ingestion.
                let legacy_export = SourceExport {
                    storage_metadata: (),
                    data_config,
                    details,
                };
                ingestion.source_exports.insert(object_id, legacy_export);

                DataSource::Ingestion(ingestion)
            }
            DataSourceDesc::IngestionExport {
                ingestion_id,
                external_reference: _,
                details,
                data_config,
            } => {
                // Resolve the ingestion's catalog item ID to its latest global ID.
                let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();

                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config: data_config.into_inline_connection(catalog.state()),
                }
            }
            DataSourceDesc::Webhook { .. } => DataSource::Webhook,
            DataSourceDesc::Progress => DataSource::Progress,
            DataSourceDesc::Introspection(introspection) => {
                DataSource::Introspection(introspection)
            }
            DataSourceDesc::Catalog => DataSource::Other,
        };
        CollectionDescription {
            desc: desc.clone(),
            data_source,
            since: None,
            timeline: Some(timeline.clone()),
            primary: None,
        }
    };

    // `compute_collections` holds (global ID, latest desc) for each materialized view and
    // is handed to `evolve_nullability_for_bootstrap`; `collections` holds every
    // (global ID, description) pair to be created.
    let mut compute_collections = vec![];
    let mut collections = vec![];
    for entry in catalog.entries() {
        match entry.item() {
            CatalogItem::Source(source) => {
                collections.push((
                    source.global_id(),
                    source_desc(
                        source.global_id(),
                        &source.data_source,
                        &source.desc,
                        &source.timeline,
                    ),
                ));
            }
            CatalogItem::Table(table) => {
                match &table.data_source {
                    TableDataSource::TableWrites { defaults: _ } => {
                        // Index the table's collections by version so each version can
                        // look up its successor.
                        let versions: BTreeMap<_, _> = table
                            .collection_descs()
                            .map(|(gid, version, desc)| (version, (gid, desc)))
                            .collect();
                        let collection_descs = versions.iter().map(|(version, (gid, desc))| {
                            // `primary` is the collection of the next version, if one
                            // exists; the newest version has no primary.
                            let next_version = version.bump();
                            let primary_collection =
                                versions.get(&next_version).map(|(gid, _desc)| gid).copied();
                            let mut collection_desc =
                                CollectionDescription::for_table(desc.clone());
                            collection_desc.primary = primary_collection;

                            (*gid, collection_desc)
                        });
                        collections.extend(collection_descs);
                    }
                    TableDataSource::DataSource {
                        desc: data_source_desc,
                        timeline,
                    } => {
                        // Tables backed by a data source are expected to have exactly one
                        // collection; a violation is only logged / soft-asserted.
                        soft_assert_eq_or_log!(table.collections.len(), 1);
                        let collection_descs =
                            table.collection_descs().map(|(gid, _version, desc)| {
                                (
                                    gid,
                                    source_desc(
                                        entry.latest_global_id(),
                                        data_source_desc,
                                        &desc,
                                        timeline,
                                    ),
                                )
                            });
                        collections.extend(collection_descs);
                    }
                };
            }
            CatalogItem::MaterializedView(mv) => {
                let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
                    let collection_desc =
                        CollectionDescription::for_other(desc, mv.initial_as_of.clone());
                    (gid, collection_desc)
                });

                collections.extend(collection_descs);
                compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
            }
            CatalogItem::Sink(sink) => {
                // The sink description needs the relation description of the item the
                // sink reads from.
                let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
                let from_desc = storage_sink_from_entry
                    .relation_desc()
                    .expect("sinks can only be built on items with descs")
                    .into_owned();
                let collection_desc = CollectionDescription {
                    // The sink's own collection is described by `KAFKA_PROGRESS_DESC`.
                    desc: KAFKA_PROGRESS_DESC.clone(),
                    data_source: DataSource::Sink {
                        desc: ExportDescription {
                            sink: StorageSinkDesc {
                                from: sink.from,
                                from_desc,
                                connection: sink
                                    .connection
                                    .clone()
                                    .into_inline_connection(self.catalog().state()),
                                envelope: sink.envelope,
                                as_of: Antichain::from_elem(Timestamp::minimum()),
                                with_snapshot: sink.with_snapshot,
                                version: sink.version,
                                from_storage_metadata: (),
                                to_storage_metadata: (),
                                commit_interval: sink.commit_interval,
                            },
                            instance_id: sink.cluster_id,
                        },
                    },
                    since: None,
                    timeline: None,
                    primary: None,
                };
                collections.push((sink.global_id, collection_desc));
            }
            // These item kinds contribute no storage collections here.
            CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::Index(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_) => (),
        }
    }

    // In read-only mode register at a read timestamp; otherwise take a write timestamp.
    // NOTE(review): presumably because allocating a write timestamp is not allowed while
    // read-only — confirm.
    let register_ts = if self.controller.read_only() {
        self.get_local_read_ts().await
    } else {
        self.get_local_write_ts().await.timestamp
    };

    let storage_metadata = self.catalog.state().storage_metadata();
    // Expand the migrated catalog item IDs to all of their global IDs.
    let migrated_storage_collections = migrated_storage_collections
        .into_iter()
        .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
        .collect();

    // Runs before any collection is created below.
    self.controller
        .storage
        .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
        .await
        .unwrap_or_terminate("cannot fail to evolve collections");

    // Collections that still have to be created, keyed by global ID.
    let mut pending: BTreeMap<_, _> = collections.into_iter().collect();

    // For each pending collection: the latest global IDs of its transitive dependencies
    // (excluding itself) that are themselves created by this function.
    let transitive_dep_gids: BTreeMap<_, _> = pending
        .keys()
        .map(|gid| {
            let entry = self.catalog.get_entry_by_global_id(gid);
            let item_id = entry.id();
            let deps = self.catalog.state().transitive_uses(item_id);
            let dep_gids: BTreeSet<_> = deps
                .filter(|dep_id| *dep_id != item_id)
                .map(|dep_id| self.catalog.get_entry(&dep_id).latest_global_id())
                .filter(|dep_gid| pending.contains_key(dep_gid))
                .collect();
            (*gid, dep_gids)
        })
        .collect();

    // Create collections in rounds; each round handles everything whose dependencies have
    // all been created in earlier rounds.
    while !pending.is_empty() {
        // A collection is ready when none of its dependencies is still pending.
        let ready_gids: BTreeSet<_> = pending
            .keys()
            .filter(|gid| {
                let mut deps = transitive_dep_gids[gid].iter();
                !deps.any(|dep_gid| pending.contains_key(dep_gid))
            })
            .copied()
            .collect();
        let mut ready: Vec<_> = pending
            .extract_if(.., |gid, _| ready_gids.contains(gid))
            .collect();

        for (gid, collection) in &mut ready {
            // Only system collections without an explicit since get a derived one.
            if !gid.is_system() || collection.since.is_some() {
                continue;
            }

            // The derived since is the join of the sinces of all dependencies, which have
            // been registered in earlier rounds.
            let mut derived_since = Antichain::from_elem(Timestamp::MIN);
            for dep_gid in &transitive_dep_gids[gid] {
                let (since, _) = self
                    .controller
                    .storage
                    .collection_frontiers(*dep_gid)
                    .expect("previously registered");
                derived_since.join_assign(&since);
            }
            collection.since = Some(derived_since);
        }

        // Nothing is ready although collections remain: the dependencies form a cycle.
        // Report it and create everything that is left in one go, which also empties
        // `pending` and thereby ends the loop.
        if ready.is_empty() {
            soft_panic_or_log!(
                "cycle in storage collections: {:?}",
                pending.keys().collect::<Vec<_>>(),
            );
            ready = mem::take(&mut pending).into_iter().collect();
        }

        self.controller
            .storage
            .create_collections_for_bootstrap(
                storage_metadata,
                Some(register_ts),
                ready,
                &migrated_storage_collections,
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");
    }

    // `register_ts` is only applied as a local write when it was obtained as a write
    // timestamp above.
    if !self.controller.read_only() {
        self.apply_local_write(register_ts).await;
    }
}
3234
3235 fn bootstrap_sort_catalog_entries(&self) -> Vec<CatalogEntry> {
3242 let mut indexes_on = BTreeMap::<_, Vec<_>>::new();
3243 let mut non_indexes = Vec::new();
3244 for entry in self.catalog().entries().cloned() {
3245 if let Some(index) = entry.index() {
3246 let on = self.catalog().get_entry_by_global_id(&index.on);
3247 indexes_on.entry(on.id()).or_default().push(entry);
3248 } else {
3249 non_indexes.push(entry);
3250 }
3251 }
3252
3253 let key_fn = |entry: &CatalogEntry| entry.id;
3254 let dependencies_fn = |entry: &CatalogEntry| entry.uses();
3255 sort_topological(&mut non_indexes, key_fn, dependencies_fn);
3256
3257 let mut result = Vec::new();
3258 for entry in non_indexes {
3259 let id = entry.id();
3260 result.push(entry);
3261 if let Some(mut indexes) = indexes_on.remove(&id) {
3262 result.append(&mut indexes);
3263 }
3264 }
3265
3266 soft_assert_or_log!(
3267 indexes_on.is_empty(),
3268 "indexes with missing dependencies: {indexes_on:?}",
3269 );
3270
3271 result
3272 }
3273
/// Installs the optimized plan, physical plan, and dataflow metainfo in the catalog for
/// every index and materialized view in `ordered_catalog_entries`.
///
/// For each object, an entry in `cached_global_exprs` under its global ID is reused when
/// its optimizer features equal the ones currently configured for the object's cluster;
/// otherwise the object is optimized from scratch.
///
/// Returns the expressions that were freshly optimized (i.e. not served from
/// `cached_global_exprs`), keyed by global ID.
/// NOTE(review): presumably the caller uses these to refresh the expression cache —
/// confirm.
///
/// # Errors
///
/// Returns an error if either optimization stage fails for any index or materialized
/// view.
#[instrument]
fn bootstrap_dataflow_plans(
    &mut self,
    ordered_catalog_entries: &[CatalogEntry],
    mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
    // One snapshot per cluster, created on first use and updated as objects are planned.
    let mut instance_snapshots = BTreeMap::new();
    // Everything that had to be optimized rather than taken from the cache.
    let mut uncached_expressions = BTreeMap::new();

    // System optimizer configuration with the cluster's feature overrides applied.
    let optimizer_config = |catalog: &Catalog, cluster_id| {
        let system_config = catalog.system_config();
        let overrides = catalog.get_cluster(cluster_id).config.features();
        OptimizerConfig::from(system_config).override_from(&overrides)
    };

    for entry in ordered_catalog_entries {
        match entry.item() {
            CatalogItem::Index(idx) => {
                let compute_instance =
                    instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
                        self.instance_snapshot(idx.cluster_id)
                            .expect("compute instance exists")
                    });
                let global_id = idx.global_id();

                // Indexes the instance snapshot already contains are skipped entirely.
                if compute_instance.contains_collection(&global_id) {
                    continue;
                }

                let optimizer_config = optimizer_config(&self.catalog, idx.cluster_id);

                let (optimized_plan, physical_plan, metainfo) =
                    match cached_global_exprs.remove(&global_id) {
                        // Cache hit: only valid if it was produced with the same
                        // optimizer features.
                        Some(global_expressions)
                            if global_expressions.optimizer_features
                                == optimizer_config.features =>
                        {
                            debug!("global expression cache hit for {global_id:?}");
                            (
                                global_expressions.global_mir,
                                global_expressions.physical_plan,
                                global_expressions.dataflow_metainfos,
                            )
                        }
                        // Cache miss, or a cached entry with different optimizer features.
                        Some(_) | None => {
                            let (optimized_plan, global_lir_plan) = {
                                let mut optimizer = optimize::index::Optimizer::new(
                                    self.owned_catalog(),
                                    compute_instance.clone(),
                                    global_id,
                                    optimizer_config.clone(),
                                    self.optimizer_metrics(),
                                );

                                // First optimization stage; its dataflow description is
                                // kept as the optimized plan.
                                let index_plan = optimize::index::Index::new(
                                    entry.name().clone(),
                                    idx.on,
                                    idx.keys.to_vec(),
                                );
                                let global_mir_plan = optimizer.optimize(index_plan)?;
                                let optimized_plan = global_mir_plan.df_desc().clone();

                                // Second optimization stage, producing the plan that is
                                // split into physical plan and metainfo below.
                                let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                (optimized_plan, global_lir_plan)
                            };

                            let (physical_plan, metainfo) = global_lir_plan.unapply();
                            let metainfo = {
                                // Allocate one transient global ID per optimizer notice.
                                let notice_ids =
                                    std::iter::repeat_with(|| self.allocate_transient_id())
                                        .map(|(_item_id, gid)| gid)
                                        .take(metainfo.optimizer_notices.len())
                                        .collect::<Vec<_>>();
                                self.catalog().render_notices(
                                    metainfo,
                                    notice_ids,
                                    Some(idx.global_id()),
                                )
                            };
                            uncached_expressions.insert(
                                global_id,
                                GlobalExpressions {
                                    global_mir: optimized_plan.clone(),
                                    physical_plan: physical_plan.clone(),
                                    dataflow_metainfos: metainfo.clone(),
                                    optimizer_features: optimizer_config.features.clone(),
                                },
                            );
                            (optimized_plan, physical_plan, metainfo)
                        }
                    };

                let catalog = self.catalog_mut();
                catalog.set_optimized_plan(idx.global_id(), optimized_plan);
                catalog.set_physical_plan(idx.global_id(), physical_plan);
                catalog.set_dataflow_metainfo(idx.global_id(), metainfo);

                // Record the index in the snapshot; clones of the snapshot are handed to
                // the optimizers of later entries on the same cluster.
                compute_instance.insert_collection(idx.global_id());
            }
            CatalogItem::MaterializedView(mv) => {
                let compute_instance =
                    instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
                        self.instance_snapshot(mv.cluster_id)
                            .expect("compute instance exists")
                    });
                let global_id = mv.global_id_writes();

                let optimizer_config = optimizer_config(&self.catalog, mv.cluster_id);

                let (optimized_plan, physical_plan, metainfo) = match cached_global_exprs
                    .remove(&global_id)
                {
                    // Cache hit: only valid if it was produced with the same optimizer
                    // features.
                    Some(global_expressions)
                        if global_expressions.optimizer_features
                            == optimizer_config.features =>
                    {
                        debug!("global expression cache hit for {global_id:?}");
                        (
                            global_expressions.global_mir,
                            global_expressions.physical_plan,
                            global_expressions.dataflow_metainfos,
                        )
                    }
                    // Cache miss, or a cached entry with different optimizer features.
                    Some(_) | None => {
                        let (_, internal_view_id) = self.allocate_transient_id();
                        let debug_name = self
                            .catalog()
                            .resolve_full_name(entry.name(), None)
                            .to_string();

                        let (optimized_plan, global_lir_plan) = {
                            let mut optimizer = optimize::materialized_view::Optimizer::new(
                                self.owned_catalog().as_optimizer_catalog(),
                                compute_instance.clone(),
                                global_id,
                                internal_view_id,
                                mv.desc.latest().iter_names().cloned().collect(),
                                mv.non_null_assertions.clone(),
                                mv.refresh_schedule.clone(),
                                debug_name,
                                optimizer_config.clone(),
                                self.optimizer_metrics(),
                            );

                            // First optimization stage, fed with the locally optimized
                            // expression and its inferred type.
                            let typ = infer_sql_type_for_catalog(
                                &mv.raw_expr,
                                &mv.locally_optimized_expr.as_ref().clone(),
                            );
                            let global_mir_plan = optimizer
                                .optimize((mv.locally_optimized_expr.as_ref().clone(), typ))?;
                            let optimized_plan = global_mir_plan.df_desc().clone();

                            // Second optimization stage, producing the plan that is split
                            // into physical plan and metainfo below.
                            let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                            (optimized_plan, global_lir_plan)
                        };

                        let (physical_plan, metainfo) = global_lir_plan.unapply();
                        let metainfo = {
                            // Allocate one transient global ID per optimizer notice.
                            let notice_ids =
                                std::iter::repeat_with(|| self.allocate_transient_id())
                                    .map(|(_item_id, global_id)| global_id)
                                    .take(metainfo.optimizer_notices.len())
                                    .collect::<Vec<_>>();
                            self.catalog().render_notices(
                                metainfo,
                                notice_ids,
                                Some(mv.global_id_writes()),
                            )
                        };
                        uncached_expressions.insert(
                            global_id,
                            GlobalExpressions {
                                global_mir: optimized_plan.clone(),
                                physical_plan: physical_plan.clone(),
                                dataflow_metainfos: metainfo.clone(),
                                optimizer_features: optimizer_config.features.clone(),
                            },
                        );
                        (optimized_plan, physical_plan, metainfo)
                    }
                };

                let catalog = self.catalog_mut();
                catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
                catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
                catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);

                // Record the materialized view in the snapshot; clones of the snapshot
                // are handed to the optimizers of later entries on the same cluster.
                compute_instance.insert_collection(mv.global_id_writes());
            }
            // No dataflow plans are produced for these item kinds.
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_) => (),
        }
    }

    Ok(uncached_expressions)
}
3509
3510 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold> {
3520 let mut catalog_ids = Vec::new();
3521 let mut dataflows = Vec::new();
3522 let mut read_policies = BTreeMap::new();
3523 for entry in self.catalog.entries() {
3524 let gid = match entry.item() {
3525 CatalogItem::Index(idx) => idx.global_id(),
3526 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3527 CatalogItem::Table(_)
3528 | CatalogItem::Source(_)
3529 | CatalogItem::Log(_)
3530 | CatalogItem::View(_)
3531 | CatalogItem::Sink(_)
3532 | CatalogItem::Type(_)
3533 | CatalogItem::Func(_)
3534 | CatalogItem::Secret(_)
3535 | CatalogItem::Connection(_) => continue,
3536 };
3537 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3538 catalog_ids.push(gid);
3539 dataflows.push(plan.clone());
3540
3541 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3542 read_policies.insert(gid, compaction_window.into());
3543 }
3544 }
3545 }
3546
3547 let read_ts = self.get_local_read_ts().await;
3548 let read_holds = as_of_selection::run(
3549 &mut dataflows,
3550 &read_policies,
3551 &*self.controller.storage_collections,
3552 read_ts,
3553 self.controller.read_only(),
3554 );
3555
3556 let catalog = self.catalog_mut();
3557 for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3558 catalog.set_physical_plan(id, plan);
3559 }
3560
3561 read_holds
3562 }
3563
3564 fn serve(
3573 mut self,
3574 mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3575 mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3576 mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3577 group_commit_rx: appends::GroupCommitWaiter,
3578 ) -> LocalBoxFuture<'static, ()> {
3579 async move {
3580 let mut cluster_events = self.controller.events_stream();
3582 let last_message = Arc::new(Mutex::new(LastMessage {
3583 kind: "none",
3584 stmt: None,
3585 }));
3586
3587 let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3588 let idle_metric = self.metrics.queue_busy_seconds.clone();
3589 let last_message_watchdog = Arc::clone(&last_message);
3590
3591 spawn(|| "coord watchdog", async move {
3592 let mut interval = tokio::time::interval(Duration::from_secs(5));
3597 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3601
3602 let mut coord_stuck = false;
3604
3605 loop {
3606 interval.tick().await;
3607
3608 let duration = tokio::time::Duration::from_secs(30);
3610 let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3611 let Ok(maybe_permit) = timeout else {
3612 if !coord_stuck {
3614 let last_message = last_message_watchdog.lock().expect("poisoned");
3615 tracing::warn!(
3616 last_message_kind = %last_message.kind,
3617 last_message_sql = %last_message.stmt_to_string(),
3618 "coordinator stuck for {duration:?}",
3619 );
3620 }
3621 coord_stuck = true;
3622
3623 continue;
3624 };
3625
3626 if coord_stuck {
3628 tracing::info!("Coordinator became unstuck");
3629 }
3630 coord_stuck = false;
3631
3632 let Ok(permit) = maybe_permit else {
3634 break;
3635 };
3636
3637 permit.send(idle_metric.start_timer());
3638 }
3639 });
3640
3641 self.schedule_storage_usage_collection().await;
3642 self.schedule_arrangement_sizes_collection().await;
3643 self.spawn_privatelink_vpc_endpoints_watch_task();
3644 self.spawn_statement_logging_task();
3645 self.spawn_catalog_info_metrics_task();
3646 flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3647
3648 let warn_threshold = self
3650 .catalog()
3651 .system_config()
3652 .coord_slow_message_warn_threshold();
3653
3654 const MESSAGE_BATCH: usize = 64;
3656 let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3657 let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3658
3659 let message_batch = self.metrics.message_batch.clone();
3660
3661 loop {
3662 select! {
3666 biased;
3671
3672 _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3676 Some(event) = cluster_events.next() => {
3680 messages.push(Message::ClusterEvent(event))
3681 },
3682 () = self.controller.ready() => {
3686 let controller = match self.controller.get_readiness() {
3690 Readiness::Storage => ControllerReadiness::Storage,
3691 Readiness::Compute => ControllerReadiness::Compute,
3692 Readiness::Metrics(_) => ControllerReadiness::Metrics,
3693 Readiness::Internal(_) => ControllerReadiness::Internal,
3694 Readiness::NotReady => unreachable!("just signaled as ready"),
3695 };
3696 messages.push(Message::ControllerReady { controller });
3697 }
3698 permit = group_commit_rx.ready() => {
3701 let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3707 PendingWriteTxn::User{span, ..} => Some(span),
3708 PendingWriteTxn::System{..} => None,
3709 });
3710 let span = match user_write_spans.exactly_one() {
3711 Ok(span) => span.clone(),
3712 Err(user_write_spans) => {
3713 let span = info_span!(parent: None, "group_commit_notify");
3714 for s in user_write_spans {
3715 span.follows_from(s);
3716 }
3717 span
3718 }
3719 };
3720 messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3721 },
3722 count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3726 if count == 0 {
3727 break;
3728 } else {
3729 messages.extend(cmd_messages.drain(..).map(
3730 |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
3731 ));
3732 }
3733 },
3734 Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3738 let mut pending_read_txns = vec![pending_read_txn];
3739 while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3740 pending_read_txns.push(pending_read_txn);
3741 }
3742 for (conn_id, pending_read_txn) in pending_read_txns {
3743 let prev = self
3744 .pending_linearize_read_txns
3745 .insert(conn_id, pending_read_txn);
3746 soft_assert_or_log!(
3747 prev.is_none(),
3748 "connections can not have multiple concurrent reads, prev: {prev:?}"
3749 )
3750 }
3751 messages.push(Message::LinearizeReads);
3752 }
3753 _ = self.advance_timelines_interval.tick() => {
3757 let span = info_span!(parent: None, "coord::advance_timelines_interval");
3758 span.follows_from(Span::current());
3759
3760 if self.controller.read_only() {
3765 messages.push(Message::AdvanceTimelines);
3766 } else {
3767 messages.push(Message::GroupCommitInitiate(span, None));
3768 }
3769 },
3770 _ = self.check_cluster_scheduling_policies_interval.tick() => {
3774 messages.push(Message::CheckSchedulingPolicies);
3775 },
3776
3777 _ = self.caught_up_check_interval.tick() => {
3781 self.maybe_check_caught_up().await;
3786
3787 continue;
3788 },
3789
3790 timer = idle_rx.recv() => {
3795 timer.expect("does not drop").observe_duration();
3796 self.metrics
3797 .message_handling
3798 .with_label_values(&["watchdog"])
3799 .observe(0.0);
3800 continue;
3801 }
3802 };
3803
3804 message_batch.observe(f64::cast_lossy(messages.len()));
3806
3807 for msg in messages.drain(..) {
3808 let msg_kind = msg.kind();
3811 let span = span!(
3812 target: "mz_adapter::coord::handle_message_loop",
3813 Level::INFO,
3814 "coord::handle_message",
3815 kind = msg_kind
3816 );
3817 let otel_context = span.context().span().span_context().clone();
3818
3819 *last_message.lock().expect("poisoned") = LastMessage {
3823 kind: msg_kind,
3824 stmt: match &msg {
3825 Message::Command(
3826 _,
3827 Command::Execute {
3828 portal_name,
3829 session,
3830 ..
3831 },
3832 ) => session
3833 .get_portal_unverified(portal_name)
3834 .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3835 _ => None,
3836 },
3837 };
3838
3839 let start = Instant::now();
3840 self.handle_message(msg).instrument(span).await;
3841 let duration = start.elapsed();
3842
3843 self.metrics
3844 .message_handling
3845 .with_label_values(&[msg_kind])
3846 .observe(duration.as_secs_f64());
3847
3848 if duration > warn_threshold {
3850 let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3851 tracing::error!(
3852 ?msg_kind,
3853 ?trace_id,
3854 ?duration,
3855 "very slow coordinator message"
3856 );
3857 }
3858 }
3859 }
3860 if let Some(catalog) = Arc::into_inner(self.catalog) {
3863 catalog.expire().await;
3864 }
3865 }
3866 .boxed_local()
3867 }
3868
3869 fn catalog(&self) -> &Catalog {
3871 &self.catalog
3872 }
3873
3874 fn owned_catalog(&self) -> Arc<Catalog> {
3877 Arc::clone(&self.catalog)
3878 }
3879
3880 fn optimizer_metrics(&self) -> OptimizerMetrics {
3883 self.optimizer_metrics.clone()
3884 }
3885
3886 fn catalog_mut(&mut self) -> &mut Catalog {
3888 Arc::make_mut(&mut self.catalog)
3896 }
3897
3898 async fn refill_user_id_pool(&mut self, min_count: u64) -> Result<(), AdapterError> {
3903 let batch_size = USER_ID_POOL_BATCH_SIZE.get(self.catalog().system_config().dyncfgs());
3904 let to_allocate = min_count.max(u64::from(batch_size));
3905 let id_ts = self.get_catalog_write_ts().await;
3906 let ids = self.catalog().allocate_user_ids(to_allocate, id_ts).await?;
3907 if let (Some((first_id, _)), Some((last_id, _))) = (ids.first(), ids.last()) {
3908 let start = match first_id {
3909 CatalogItemId::User(id) => *id,
3910 other => {
3911 return Err(AdapterError::Internal(format!(
3912 "expected User CatalogItemId, got {other:?}"
3913 )));
3914 }
3915 };
3916 let end = match last_id {
3917 CatalogItemId::User(id) => *id + 1, other => {
3919 return Err(AdapterError::Internal(format!(
3920 "expected User CatalogItemId, got {other:?}"
3921 )));
3922 }
3923 };
3924 self.user_id_pool.refill(start, end);
3925 } else {
3926 return Err(AdapterError::Internal(
3927 "catalog returned no user IDs".into(),
3928 ));
3929 }
3930 Ok(())
3931 }
3932
3933 async fn allocate_user_id(&mut self) -> Result<(CatalogItemId, GlobalId), AdapterError> {
3935 if let Some(id) = self.user_id_pool.allocate() {
3936 return Ok((CatalogItemId::User(id), GlobalId::User(id)));
3937 }
3938 self.refill_user_id_pool(1).await?;
3939 let id = self.user_id_pool.allocate().expect("ID pool just refilled");
3940 Ok((CatalogItemId::User(id), GlobalId::User(id)))
3941 }
3942
3943 async fn allocate_user_ids(
3945 &mut self,
3946 count: u64,
3947 ) -> Result<Vec<(CatalogItemId, GlobalId)>, AdapterError> {
3948 if self.user_id_pool.remaining() < count {
3949 self.refill_user_id_pool(count).await?;
3950 }
3951 let raw_ids = self
3952 .user_id_pool
3953 .allocate_many(count)
3954 .expect("pool has enough IDs after refill");
3955 Ok(raw_ids
3956 .into_iter()
3957 .map(|id| (CatalogItemId::User(id), GlobalId::User(id)))
3958 .collect())
3959 }
3960
3961 fn connection_context(&self) -> &ConnectionContext {
3963 self.controller.connection_context()
3964 }
3965
3966 fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3968 &self.connection_context().secrets_reader
3969 }
3970
3971 #[allow(dead_code)]
3976 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3977 for meta in self.active_conns.values() {
3978 let _ = meta.notice_tx.send(notice.clone());
3979 }
3980 }
3981
3982 pub(crate) fn broadcast_notice_tx(
3985 &self,
3986 ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3987 let senders: Vec<_> = self
3988 .active_conns
3989 .values()
3990 .map(|meta| meta.notice_tx.clone())
3991 .collect();
3992 Box::new(move |notice| {
3993 for tx in senders {
3994 let _ = tx.send(notice.clone());
3995 }
3996 })
3997 }
3998
3999 pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
4000 &self.active_conns
4001 }
4002
4003 #[instrument(level = "debug")]
4004 pub(crate) fn retire_execution(
4005 &mut self,
4006 reason: StatementEndedExecutionReason,
4007 ctx_extra: ExecuteContextExtra,
4008 ) {
4009 if let Some(uuid) = ctx_extra.retire() {
4010 self.end_statement_execution(uuid, reason);
4011 }
4012 }
4013
4014 #[instrument(level = "debug")]
4016 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
4017 let compute = self
4018 .instance_snapshot(instance)
4019 .expect("compute instance does not exist");
4020 DataflowBuilder::new(self.catalog().state(), compute)
4021 }
4022
4023 pub fn instance_snapshot(
4025 &self,
4026 id: ComputeInstanceId,
4027 ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
4028 ComputeInstanceSnapshot::new(&self.controller, id)
4029 }
4030
4031 pub(crate) async fn ship_dataflow(
4038 &mut self,
4039 dataflow: DataflowDescription<Plan>,
4040 instance: ComputeInstanceId,
4041 target_replica: Option<ReplicaId>,
4042 ) {
4043 self.try_ship_dataflow(dataflow, instance, target_replica)
4044 .await
4045 .unwrap_or_terminate("dataflow creation cannot fail");
4046 }
4047
4048 pub(crate) async fn try_ship_dataflow(
4051 &mut self,
4052 dataflow: DataflowDescription<Plan>,
4053 instance: ComputeInstanceId,
4054 target_replica: Option<ReplicaId>,
4055 ) -> Result<(), DataflowCreationError> {
4056 let export_ids = dataflow.exported_index_ids().collect();
4059
4060 self.controller
4061 .compute
4062 .create_dataflow(instance, dataflow, target_replica)?;
4063
4064 self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
4065 .await;
4066
4067 Ok(())
4068 }
4069
4070 pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
4074 self.controller
4075 .compute
4076 .allow_writes(instance, id)
4077 .unwrap_or_terminate("allow_writes cannot fail");
4078 }
4079
4080 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
4082 &mut self,
4083 dataflow: DataflowDescription<Plan>,
4084 instance: ComputeInstanceId,
4085 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
4086 target_replica: Option<ReplicaId>,
4087 ) {
4088 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
4089 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, target_replica);
4090 let ((), ()) =
4091 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
4092 } else {
4093 self.ship_dataflow(dataflow, instance, target_replica).await;
4094 }
4095 }
4096
4097 pub fn install_compute_watch_set(
4101 &mut self,
4102 conn_id: ConnectionId,
4103 objects: BTreeSet<GlobalId>,
4104 t: Timestamp,
4105 state: WatchSetResponse,
4106 ) -> Result<(), CollectionLookupError> {
4107 let ws_id = self.controller.install_compute_watch_set(objects, t)?;
4108 self.connection_watch_sets
4109 .entry(conn_id.clone())
4110 .or_default()
4111 .insert(ws_id);
4112 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4113 Ok(())
4114 }
4115
4116 pub fn install_storage_watch_set(
4120 &mut self,
4121 conn_id: ConnectionId,
4122 objects: BTreeSet<GlobalId>,
4123 t: Timestamp,
4124 state: WatchSetResponse,
4125 ) -> Result<(), CollectionMissing> {
4126 let ws_id = self.controller.install_storage_watch_set(objects, t)?;
4127 self.connection_watch_sets
4128 .entry(conn_id.clone())
4129 .or_default()
4130 .insert(ws_id);
4131 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4132 Ok(())
4133 }
4134
4135 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
4137 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
4138 for ws_id in ws_ids {
4139 self.installed_watch_sets.remove(&ws_id);
4140 }
4141 }
4142 }
4143
4144 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
4148 let global_timelines: BTreeMap<_, _> = self
4154 .global_timelines
4155 .iter()
4156 .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
4157 .collect();
4158 let active_conns: BTreeMap<_, _> = self
4159 .active_conns
4160 .iter()
4161 .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
4162 .collect();
4163 let txn_read_holds: BTreeMap<_, _> = self
4164 .txn_read_holds
4165 .iter()
4166 .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
4167 .collect();
4168 let pending_peeks: BTreeMap<_, _> = self
4169 .pending_peeks
4170 .iter()
4171 .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
4172 .collect();
4173 let client_pending_peeks: BTreeMap<_, _> = self
4174 .client_pending_peeks
4175 .iter()
4176 .map(|(id, peek)| {
4177 let peek: BTreeMap<_, _> = peek
4178 .iter()
4179 .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
4180 .collect();
4181 (id.to_string(), peek)
4182 })
4183 .collect();
4184 let pending_linearize_read_txns: BTreeMap<_, _> = self
4185 .pending_linearize_read_txns
4186 .iter()
4187 .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
4188 .collect();
4189
4190 Ok(serde_json::json!({
4191 "global_timelines": global_timelines,
4192 "active_conns": active_conns,
4193 "txn_read_holds": txn_read_holds,
4194 "pending_peeks": pending_peeks,
4195 "client_pending_peeks": client_pending_peeks,
4196 "pending_linearize_read_txns": pending_linearize_read_txns,
4197 "controller": self.controller.dump().await?,
4198 }))
4199 }
4200
4201 async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
4215 let item_id = self
4216 .catalog()
4217 .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
4218 let global_id = self.catalog.get_entry(&item_id).latest_global_id();
4219 let read_ts = self.get_local_read_ts().await;
4220 let current_contents_fut = self
4221 .controller
4222 .storage_collections
4223 .snapshot(global_id, read_ts);
4224 let internal_cmd_tx = self.internal_cmd_tx.clone();
4225 spawn(|| "storage_usage_prune", async move {
4226 let mut current_contents = current_contents_fut
4227 .await
4228 .unwrap_or_terminate("cannot fail to fetch snapshot");
4229 differential_dataflow::consolidation::consolidate(&mut current_contents);
4230
4231 let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4232 let mut expired = Vec::new();
4233 for (row, diff) in current_contents {
4234 assert_eq!(
4235 diff, 1,
4236 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4237 );
4238 let collection_timestamp = row
4240 .unpack()
4241 .get(3)
4242 .expect("definition of mz_storage_by_shard changed")
4243 .unwrap_timestamptz();
4244 let collection_timestamp = collection_timestamp.timestamp_millis();
4245 let collection_timestamp: u128 = collection_timestamp
4246 .try_into()
4247 .expect("all collections happen after Jan 1 1970");
4248 if collection_timestamp < cutoff_ts {
4249 debug!("pruning storage event {row:?}");
4250 let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
4251 expired.push(builtin_update);
4252 }
4253 }
4254
4255 let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
4257 });
4258 }
4259
4260 async fn prune_arrangement_sizes_history_on_startup(&self) {
4269 if self.controller.read_only() {
4271 return;
4272 }
4273
4274 let retention_period = mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_HISTORY_RETENTION_PERIOD
4275 .get(self.catalog().system_config().dyncfgs());
4276 let item_id = self
4277 .catalog()
4278 .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);
4279 let global_id = self.catalog.get_entry(&item_id).latest_global_id();
4280 let read_ts = self.get_local_read_ts().await;
4281 let current_contents_fut = self
4282 .controller
4283 .storage_collections
4284 .snapshot(global_id, read_ts);
4285 let internal_cmd_tx = self.internal_cmd_tx.clone();
4286 spawn(|| "arrangement_sizes_history_prune", async move {
4287 let mut current_contents = current_contents_fut
4288 .await
4289 .unwrap_or_terminate("cannot fail to fetch snapshot");
4290 differential_dataflow::consolidation::consolidate(&mut current_contents);
4291
4292 let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4293 let expired =
4294 arrangement_sizes_expired_retractions(current_contents, cutoff_ts, item_id);
4295
4296 let _ = internal_cmd_tx.send(Message::ArrangementSizesPrune(expired));
4300 });
4301 }
4302
4303 fn current_credit_consumption_rate(&self) -> Numeric {
4304 self.catalog()
4305 .user_cluster_replicas()
4306 .filter_map(|replica| match &replica.config.location {
4307 ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
4308 ReplicaLocation::Unmanaged(_) => None,
4309 })
4310 .map(|size| {
4311 self.catalog()
4312 .cluster_replica_sizes()
4313 .0
4314 .get(size)
4315 .expect("location size is validated against the cluster replica sizes")
4316 .credits_per_hour
4317 })
4318 .sum()
4319 }
4320}
4321
4322fn arrangement_sizes_expired_retractions(
4330 rows: impl IntoIterator<Item = (mz_repr::Row, i64)>,
4331 cutoff_ts: u128,
4332 item_id: CatalogItemId,
4333) -> Vec<BuiltinTableUpdate> {
4334 let mut expired = Vec::new();
4335 for (row, diff) in rows {
4336 assert_eq!(
4337 diff, 1,
4338 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4339 );
4340 let collection_timestamp = row
4341 .unpack()
4342 .get(3)
4343 .expect("definition of mz_object_arrangement_size_history changed")
4344 .unwrap_timestamptz()
4345 .timestamp_millis();
4346 let collection_timestamp: u128 = collection_timestamp
4347 .try_into()
4348 .expect("all collections happen after Jan 1 1970");
4349 if collection_timestamp < cutoff_ts {
4350 expired.push(BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE));
4351 }
4352 }
4353 expired
4354}
4355
#[cfg(test)]
impl Coordinator {
    /// Compile-time check that awaiting `ship_dataflow` yields `()`, i.e. that it has no
    /// error value a caller would need to handle.
    // NOTE(review): never called (hence `dead_code`); it presumably exists only so that it
    // stops compiling if `ship_dataflow` gains a return value — confirm.
    #[allow(dead_code)]
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        let instance = ComputeInstanceId::user(1).expect("1 is a valid ID");

        // The unit pattern only type-checks while the awaited value is `()`.
        let () = self.ship_dataflow(dataflow, instance, None).await;
    }
}
4372
/// Describes the message the coordinator most recently started handling.
///
/// Its `Drop` implementation prints this value to stderr when the thread is panicking.
struct LastMessage {
    /// The kind of the message, as a static label.
    kind: &'static str,
    /// The SQL statement associated with the message, if there is one.
    stmt: Option<Arc<Statement<Raw>>>,
}
4378
4379impl LastMessage {
4380 fn stmt_to_string(&self) -> Cow<'static, str> {
4382 self.stmt
4383 .as_ref()
4384 .map(|stmt| stmt.to_ast_string_redacted().into())
4385 .unwrap_or(Cow::Borrowed("<none>"))
4386 }
4387}
4388
4389impl fmt::Debug for LastMessage {
4390 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4391 f.debug_struct("LastMessage")
4392 .field("kind", &self.kind)
4393 .field("stmt", &self.stmt_to_string())
4394 .finish()
4395 }
4396}
4397
4398impl Drop for LastMessage {
4399 fn drop(&mut self) {
4400 if std::thread::panicking() {
4402 eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4404 }
4405 }
4406}
4407
4408pub fn serve(
4420 Config {
4421 controller_config,
4422 controller_envd_epoch,
4423 mut storage,
4424 audit_logs_iterator,
4425 timestamp_oracle_url,
4426 unsafe_mode,
4427 all_features,
4428 build_info,
4429 environment_id,
4430 metrics_registry,
4431 now,
4432 secrets_controller,
4433 cloud_resource_controller,
4434 cluster_replica_sizes,
4435 builtin_system_cluster_config,
4436 builtin_catalog_server_cluster_config,
4437 builtin_probe_cluster_config,
4438 builtin_support_cluster_config,
4439 builtin_analytics_cluster_config,
4440 system_parameter_defaults,
4441 availability_zones,
4442 storage_usage_client,
4443 storage_usage_collection_interval,
4444 storage_usage_retention_period,
4445 segment_client,
4446 egress_addresses,
4447 aws_account_id,
4448 aws_privatelink_availability_zones,
4449 connection_context,
4450 connection_limit_callback,
4451 remote_system_parameters,
4452 webhook_concurrency_limit,
4453 http_host_name,
4454 tracing_handle,
4455 read_only_controllers,
4456 caught_up_trigger: clusters_caught_up_trigger,
4457 helm_chart_version,
4458 license_key,
4459 external_login_password_mz_system,
4460 force_builtin_schema_migration,
4461 }: Config,
4462) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4463 async move {
4464 let coord_start = Instant::now();
4465 info!("startup: coordinator init: beginning");
4466 info!("startup: coordinator init: preamble beginning");
4467
4468 let _builtins = LazyLock::force(&BUILTINS_STATIC);
4472
4473 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4474 let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4475 let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4476 mpsc::unbounded_channel();
4477
4478 if !availability_zones.iter().all_unique() {
4480 coord_bail!("availability zones must be unique");
4481 }
4482
4483 let aws_principal_context = match (
4484 aws_account_id,
4485 connection_context.aws_external_id_prefix.clone(),
4486 ) {
4487 (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4488 aws_account_id,
4489 aws_external_id_prefix,
4490 }),
4491 _ => None,
4492 };
4493
4494 let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4495 .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4496
4497 info!(
4498 "startup: coordinator init: preamble complete in {:?}",
4499 coord_start.elapsed()
4500 );
4501 let oracle_init_start = Instant::now();
4502 info!("startup: coordinator init: timestamp oracle init beginning");
4503
4504 let timestamp_oracle_config = timestamp_oracle_url
4505 .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
4506 .transpose()?;
4507 let mut initial_timestamps =
4508 get_initial_oracle_timestamps(×tamp_oracle_config).await?;
4509
4510 initial_timestamps
4514 .entry(Timeline::EpochMilliseconds)
4515 .or_insert_with(mz_repr::Timestamp::minimum);
4516 let mut timestamp_oracles = BTreeMap::new();
4517 for (timeline, initial_timestamp) in initial_timestamps {
4518 Coordinator::ensure_timeline_state_with_initial_time(
4519 &timeline,
4520 initial_timestamp,
4521 now.clone(),
4522 timestamp_oracle_config.clone(),
4523 &mut timestamp_oracles,
4524 read_only_controllers,
4525 )
4526 .await;
4527 }
4528
4529 let catalog_upper = storage.current_upper().await;
4533 let epoch_millis_oracle = ×tamp_oracles
4539 .get(&Timeline::EpochMilliseconds)
4540 .expect("inserted above")
4541 .oracle;
4542
4543 let mut boot_ts = if read_only_controllers {
4544 let read_ts = epoch_millis_oracle.read_ts().await;
4545 std::cmp::max(read_ts, catalog_upper)
4546 } else {
4547 epoch_millis_oracle.apply_write(catalog_upper).await;
4550 epoch_millis_oracle.write_ts().await.timestamp
4551 };
4552
4553 info!(
4554 "startup: coordinator init: timestamp oracle init complete in {:?}",
4555 oracle_init_start.elapsed()
4556 );
4557
4558 let catalog_open_start = Instant::now();
4559 info!("startup: coordinator init: catalog open beginning");
4560 let persist_client = controller_config
4561 .persist_clients
4562 .open(controller_config.persist_location.clone())
4563 .await
4564 .context("opening persist client")?;
4565 let builtin_item_migration_config =
4566 BuiltinItemMigrationConfig {
4567 persist_client: persist_client.clone(),
4568 read_only: read_only_controllers,
4569 force_migration: force_builtin_schema_migration,
4570 }
4571 ;
4572 let OpenCatalogResult {
4573 mut catalog,
4574 migrated_storage_collections_0dt,
4575 new_builtin_collections,
4576 builtin_table_updates,
4577 cached_global_exprs,
4578 uncached_local_exprs,
4579 } = Catalog::open(mz_catalog::config::Config {
4580 storage,
4581 metrics_registry: &metrics_registry,
4582 state: mz_catalog::config::StateConfig {
4583 unsafe_mode,
4584 all_features,
4585 build_info,
4586 environment_id: environment_id.clone(),
4587 read_only: read_only_controllers,
4588 now: now.clone(),
4589 boot_ts: boot_ts.clone(),
4590 skip_migrations: false,
4591 cluster_replica_sizes,
4592 builtin_system_cluster_config,
4593 builtin_catalog_server_cluster_config,
4594 builtin_probe_cluster_config,
4595 builtin_support_cluster_config,
4596 builtin_analytics_cluster_config,
4597 system_parameter_defaults,
4598 remote_system_parameters,
4599 availability_zones,
4600 egress_addresses,
4601 aws_principal_context,
4602 aws_privatelink_availability_zones,
4603 connection_context,
4604 http_host_name,
4605 builtin_item_migration_config,
4606 persist_client: persist_client.clone(),
4607 enable_expression_cache_override: None,
4608 helm_chart_version,
4609 external_login_password_mz_system,
4610 license_key: license_key.clone(),
4611 },
4612 })
4613 .await?;
4614
4615 let catalog_upper = catalog.current_upper().await;
4618 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4619
4620 if !read_only_controllers {
4621 epoch_millis_oracle.apply_write(boot_ts).await;
4622 }
4623
4624 info!(
4625 "startup: coordinator init: catalog open complete in {:?}",
4626 catalog_open_start.elapsed()
4627 );
4628
4629 let coord_thread_start = Instant::now();
4630 info!("startup: coordinator init: coordinator thread start beginning");
4631
4632 let session_id = catalog.config().session_id;
4633 let start_instant = catalog.config().start_instant;
4634
4635 let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4639 let handle = TokioHandle::current();
4640
4641 let metrics = Metrics::register_into(&metrics_registry);
4642 let metrics_clone = metrics.clone();
4643 let optimizer_metrics = OptimizerMetrics::register_into(
4644 &metrics_registry,
4645 catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4646 );
4647 let segment_client_clone = segment_client.clone();
4648 let coord_now = now.clone();
4649 let advance_timelines_interval =
4650 tokio::time::interval(catalog.system_config().default_timestamp_interval());
4651 let mut check_scheduling_policies_interval = tokio::time::interval(
4652 catalog
4653 .system_config()
4654 .cluster_check_scheduling_policies_interval(),
4655 );
4656 check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4657
4658 let clusters_caught_up_check_interval = if read_only_controllers {
4659 let dyncfgs = catalog.system_config().dyncfgs();
4660 let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4661
4662 let mut interval = tokio::time::interval(interval);
4663 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4664 interval
4665 } else {
4666 let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4674 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4675 interval
4676 };
4677
4678 let clusters_caught_up_check =
4679 clusters_caught_up_trigger.map(|trigger| {
4680 let mut exclude_collections: BTreeSet<GlobalId> =
4681 new_builtin_collections.into_iter().collect();
4682
4683 let mut todo: Vec<_> = migrated_storage_collections_0dt
4693 .iter()
4694 .filter(|id| {
4695 catalog.state().get_entry(id).is_materialized_view()
4696 })
4697 .copied()
4698 .collect();
4699 while let Some(item_id) = todo.pop() {
4700 let entry = catalog.state().get_entry(&item_id);
4701 exclude_collections.extend(entry.global_ids());
4702 todo.extend_from_slice(entry.used_by());
4703 }
4704
4705 CaughtUpCheckContext {
4706 trigger,
4707 exclude_collections,
4708 }
4709 });
4710
4711 if let Some(TimestampOracleConfig::Postgres(pg_config)) =
4712 timestamp_oracle_config.as_ref()
4713 {
4714 let pg_timestamp_oracle_params =
4717 flags::timestamp_oracle_config(catalog.system_config());
4718 pg_timestamp_oracle_params.apply(pg_config);
4719 }
4720
4721 let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4724 Arc::new(move |system_vars: &SystemVars| {
4725 let limit: u64 = system_vars.max_connections().cast_into();
4726 let superuser_reserved: u64 =
4727 system_vars.superuser_reserved_connections().cast_into();
4728
4729 let superuser_reserved = if superuser_reserved >= limit {
4734 tracing::warn!(
4735 "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
4736 );
4737 limit
4738 } else {
4739 superuser_reserved
4740 };
4741
4742 (connection_limit_callback)(limit, superuser_reserved);
4743 });
4744 catalog.system_config_mut().register_callback(
4745 &mz_sql::session::vars::MAX_CONNECTIONS,
4746 Arc::clone(&connection_limit_callback),
4747 );
4748 catalog.system_config_mut().register_callback(
4749 &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4750 connection_limit_callback,
4751 );
4752
4753 let (group_commit_tx, group_commit_rx) = appends::notifier();
4754
4755 let parent_span = tracing::Span::current();
4756 let thread = thread::Builder::new()
4757 .stack_size(3 * stack::STACK_SIZE)
4761 .name("coordinator".to_string())
4762 .spawn(move || {
4763 let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4764
4765 let controller = handle
4766 .block_on({
4767 catalog.initialize_controller(
4768 controller_config,
4769 controller_envd_epoch,
4770 read_only_controllers,
4771 )
4772 })
4773 .unwrap_or_terminate("failed to initialize storage_controller");
4774 let catalog_upper = handle.block_on(catalog.current_upper());
4777 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4778 if !read_only_controllers {
4779 let epoch_millis_oracle = ×tamp_oracles
4780 .get(&Timeline::EpochMilliseconds)
4781 .expect("inserted above")
4782 .oracle;
4783 handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4784 }
4785
4786 let catalog = Arc::new(catalog);
4787
4788 let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4789 let mut coord = Coordinator {
4790 controller,
4791 catalog,
4792 internal_cmd_tx,
4793 group_commit_tx,
4794 strict_serializable_reads_tx,
4795 global_timelines: timestamp_oracles,
4796 transient_id_gen: Arc::new(TransientIdGen::new()),
4797 active_conns: BTreeMap::new(),
4798 txn_read_holds: Default::default(),
4799 pending_peeks: BTreeMap::new(),
4800 client_pending_peeks: BTreeMap::new(),
4801 pending_linearize_read_txns: BTreeMap::new(),
4802 serialized_ddl: LockedVecDeque::new(),
4803 active_compute_sinks: BTreeMap::new(),
4804 active_webhooks: BTreeMap::new(),
4805 active_copies: BTreeMap::new(),
4806 connection_cancel_watches: BTreeMap::new(),
4807 introspection_subscribes: BTreeMap::new(),
4808 write_locks: BTreeMap::new(),
4809 deferred_write_ops: BTreeMap::new(),
4810 pending_writes: Vec::new(),
4811 advance_timelines_interval,
4812 secrets_controller,
4813 caching_secrets_reader,
4814 cloud_resource_controller,
4815 storage_usage_client,
4816 storage_usage_collection_interval,
4817 segment_client,
4818 metrics,
4819 catalog_info_metrics_registry: metrics_registry.clone(),
4820 optimizer_metrics,
4821 tracing_handle,
4822 statement_logging: StatementLogging::new(coord_now.clone()),
4823 webhook_concurrency_limit,
4824 timestamp_oracle_config,
4825 check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4826 cluster_scheduling_decisions: BTreeMap::new(),
4827 caught_up_check_interval: clusters_caught_up_check_interval,
4828 caught_up_check: clusters_caught_up_check,
4829 installed_watch_sets: BTreeMap::new(),
4830 connection_watch_sets: BTreeMap::new(),
4831 cluster_replica_statuses: ClusterReplicaStatuses::new(),
4832 read_only_controllers,
4833 buffered_builtin_table_updates: Some(Vec::new()),
4834 license_key,
4835 user_id_pool: IdPool::empty(),
4836 persist_client,
4837 };
4838 let bootstrap = handle.block_on(async {
4839 coord
4840 .bootstrap(
4841 boot_ts,
4842 migrated_storage_collections_0dt,
4843 builtin_table_updates,
4844 cached_global_exprs,
4845 uncached_local_exprs,
4846 audit_logs_iterator,
4847 )
4848 .await?;
4849 coord
4850 .controller
4851 .remove_orphaned_replicas(
4852 coord.catalog().get_next_user_replica_id().await?,
4853 coord.catalog().get_next_system_replica_id().await?,
4854 )
4855 .await
4856 .map_err(AdapterError::Orchestrator)?;
4857
4858 if let Some(retention_period) = storage_usage_retention_period {
4859 coord
4860 .prune_storage_usage_events_on_startup(retention_period)
4861 .await;
4862 }
4863
4864 coord.prune_arrangement_sizes_history_on_startup().await;
4865
4866 Ok(())
4867 });
4868 let ok = bootstrap.is_ok();
4869 drop(span);
4870 bootstrap_tx
4871 .send(bootstrap)
4872 .expect("bootstrap_rx is not dropped until it receives this message");
4873 if ok {
4874 handle.block_on(coord.serve(
4875 internal_cmd_rx,
4876 strict_serializable_reads_rx,
4877 cmd_rx,
4878 group_commit_rx,
4879 ));
4880 }
4881 })
4882 .expect("failed to create coordinator thread");
4883 match bootstrap_rx
4884 .await
4885 .expect("bootstrap_tx always sends a message or panics/halts")
4886 {
4887 Ok(()) => {
4888 info!(
4889 "startup: coordinator init: coordinator thread start complete in {:?}",
4890 coord_thread_start.elapsed()
4891 );
4892 info!(
4893 "startup: coordinator init: complete in {:?}",
4894 coord_start.elapsed()
4895 );
4896 let handle = Handle {
4897 session_id,
4898 start_instant,
4899 _thread: thread.join_on_drop(),
4900 };
4901 let client = Client::new(
4902 build_info,
4903 cmd_tx,
4904 metrics_clone,
4905 now,
4906 environment_id,
4907 segment_client_clone,
4908 );
4909 Ok((handle, client))
4910 }
4911 Err(e) => Err(e),
4912 }
4913 }
4914 .boxed()
4915}
4916
4917async fn get_initial_oracle_timestamps(
4931 timestamp_oracle_config: &Option<TimestampOracleConfig>,
4932) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4933 let mut initial_timestamps = BTreeMap::new();
4934
4935 if let Some(config) = timestamp_oracle_config {
4936 let oracle_timestamps = config.get_all_timelines().await?;
4937
4938 let debug_msg = || {
4939 oracle_timestamps
4940 .iter()
4941 .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4942 .join(", ")
4943 };
4944 info!(
4945 "current timestamps from the timestamp oracle: {}",
4946 debug_msg()
4947 );
4948
4949 for (timeline, ts) in oracle_timestamps {
4950 let entry = initial_timestamps
4951 .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4952
4953 entry
4954 .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4955 .or_insert(ts);
4956 }
4957 } else {
4958 info!("no timestamp oracle configured!");
4959 };
4960
4961 let debug_msg = || {
4962 initial_timestamps
4963 .iter()
4964 .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4965 .join(", ")
4966 };
4967 info!("initial oracle timestamps: {}", debug_msg());
4968
4969 Ok(initial_timestamps)
4970}
4971
4972#[instrument]
4973pub async fn load_remote_system_parameters(
4974 storage: &mut Box<dyn OpenableDurableCatalogState>,
4975 system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4976 system_parameter_sync_timeout: Duration,
4977) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4978 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4979 tracing::info!("parameter sync on boot: start sync");
4980
4981 let mut params = SynchronizedParameters::new(SystemVars::default());
5021 let frontend_sync = async {
5022 let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
5023 frontend.pull(&mut params);
5024 let ops = params
5025 .modified()
5026 .into_iter()
5027 .map(|param| {
5028 let name = param.name;
5029 let value = param.value;
5030 tracing::info!(name, value, initial = true, "sync parameter");
5031 (name, value)
5032 })
5033 .collect();
5034 tracing::info!("parameter sync on boot: end sync");
5035 Ok(Some(ops))
5036 };
5037 if !storage.has_system_config_synced_once().await? {
5038 frontend_sync.await
5039 } else {
5040 match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
5041 Ok(ops) => Ok(ops),
5042 Err(TimeoutError::Inner(e)) => Err(e),
5043 Err(TimeoutError::DeadlineElapsed) => {
5044 tracing::info!("parameter sync on boot: sync has timed out");
5045 Ok(None)
5046 }
5047 }
5048 }
5049 } else {
5050 Ok(None)
5051 }
5052}
5053
/// The payloads a watch-set response can carry.
///
/// NOTE(review): presumably each variant is delivered once whatever the
/// corresponding watch set was waiting for has happened. Confirm against the
/// code that installs and fires watch sets.
#[derive(Debug)]
pub enum WatchSetResponse {
    /// Carries a statement logging id together with a statement lifecycle event.
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    /// Carries an [`AlterSinkReadyContext`].
    AlterSinkReady(AlterSinkReadyContext),
    /// Carries an [`AlterMaterializedViewReadyContext`].
    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
}
5060
/// State carried by [`WatchSetResponse::AlterSinkReady`].
///
/// Dropping this value without calling `retire` retires the held
/// [`ExecuteContext`] with [`AdapterError::Canceled`].
#[derive(Debug)]
pub struct AlterSinkReadyContext {
    /// The execution context to retire. Held in an `Option` so that `retire`
    /// and the `Drop` impl can move it out; it stays `Some` until one of them
    /// runs.
    ctx: Option<ExecuteContext>,
    /// NOTE(review): presumably the tracing context of the originating
    /// request — confirm against the code that builds this value.
    otel_ctx: OpenTelemetryContext,
    /// The alter-sink plan this context belongs to.
    plan: AlterSinkPlan,
    /// NOTE(review): presumably used to re-check that `plan` is still valid
    /// before it is applied — confirm.
    plan_validity: PlanValidity,
    /// Read holds kept alongside the plan.
    /// NOTE(review): which collections these cover is not visible here.
    read_hold: ReadHolds,
}
5069
5070impl AlterSinkReadyContext {
5071 fn ctx(&mut self) -> &mut ExecuteContext {
5072 self.ctx.as_mut().expect("only cleared on drop")
5073 }
5074
5075 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
5076 self.ctx
5077 .take()
5078 .expect("only cleared on drop")
5079 .retire(result);
5080 }
5081}
5082
5083impl Drop for AlterSinkReadyContext {
5084 fn drop(&mut self) {
5085 if let Some(ctx) = self.ctx.take() {
5086 ctx.retire(Err(AdapterError::Canceled));
5087 }
5088 }
5089}
5090
/// State carried by [`WatchSetResponse::AlterMaterializedViewReady`].
///
/// Dropping this value without calling `retire` retires the held
/// [`ExecuteContext`] with [`AdapterError::Canceled`].
#[derive(Debug)]
pub struct AlterMaterializedViewReadyContext {
    /// The execution context to retire. Held in an `Option` so that `retire`
    /// and the `Drop` impl can move it out; it stays `Some` until one of them
    /// runs.
    ctx: Option<ExecuteContext>,
    /// NOTE(review): presumably the tracing context of the originating
    /// request — confirm against the code that builds this value.
    otel_ctx: OpenTelemetryContext,
    /// The apply-replacement plan this context belongs to.
    plan: plan::AlterMaterializedViewApplyReplacementPlan,
    /// NOTE(review): presumably used to re-check that `plan` is still valid
    /// before it is applied — confirm.
    plan_validity: PlanValidity,
}
5098
5099impl AlterMaterializedViewReadyContext {
5100 fn ctx(&mut self) -> &mut ExecuteContext {
5101 self.ctx.as_mut().expect("only cleared on drop")
5102 }
5103
5104 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
5105 self.ctx
5106 .take()
5107 .expect("only cleared on drop")
5108 .retire(result);
5109 }
5110}
5111
5112impl Drop for AlterMaterializedViewReadyContext {
5113 fn drop(&mut self) {
5114 if let Some(ctx) = self.ctx.take() {
5115 ctx.retire(Err(AdapterError::Canceled));
5116 }
5117 }
5118}
5119
/// A [`VecDeque`] bundled with a shareable async lock.
///
/// The lock wraps no data of its own (`Mutex<()>`), and none of the queue
/// methods on this type acquire it; callers obtain a guard explicitly through
/// `try_lock_owned`.
#[derive(Debug)]
struct LockedVecDeque<T> {
    /// The queued items.
    items: VecDeque<T>,
    /// The lock associated with the queue. Kept behind an `Arc` so an owned
    /// guard can be handed out.
    lock: Arc<tokio::sync::Mutex<()>>,
}
5127
5128impl<T> LockedVecDeque<T> {
5129 pub fn new() -> Self {
5130 Self {
5131 items: VecDeque::new(),
5132 lock: Arc::new(tokio::sync::Mutex::new(())),
5133 }
5134 }
5135
5136 pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
5137 Arc::clone(&self.lock).try_lock_owned()
5138 }
5139
5140 pub fn is_empty(&self) -> bool {
5141 self.items.is_empty()
5142 }
5143
5144 pub fn push_back(&mut self, value: T) {
5145 self.items.push_back(value)
5146 }
5147
5148 pub fn pop_front(&mut self) -> Option<T> {
5149 self.items.pop_front()
5150 }
5151
5152 pub fn remove(&mut self, index: usize) -> Option<T> {
5153 self.items.remove(index)
5154 }
5155
5156 pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
5157 self.items.iter()
5158 }
5159}
5160
/// A [`PlanStatement`] paired with the [`ExecuteContext`] it belongs to.
///
/// NOTE(review): the name suggests these are queued for later execution —
/// confirm where they are enqueued and drained.
#[derive(Debug)]
struct DeferredPlanStatement {
    /// The execution context associated with the statement or plan.
    ctx: ExecuteContext,
    /// The statement or plan itself.
    ps: PlanStatement,
}
5166
/// Either a raw statement with its parameters, or a plan with the ids it
/// resolved.
#[derive(Debug)]
enum PlanStatement {
    /// A `Statement<Raw>` together with its parameters.
    Statement {
        /// The shared statement.
        stmt: Arc<Statement<Raw>>,
        /// The parameters accompanying `stmt`.
        params: Params,
    },
    /// A plan together with its resolved ids.
    Plan {
        /// The plan.
        plan: mz_sql::plan::Plan,
        /// Ids resolved for `plan`.
        resolved_ids: ResolvedIds,
        /// NOTE(review): a second set of resolved ids; how it differs from
        /// `resolved_ids` is not visible here — confirm at the construction
        /// site.
        sql_impl_resolved_ids: ResolvedIds,
    },
}
5179
/// Reasons a connection can be rejected by network policy.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// The given address was denied.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    /// No IP address was available to check.
    #[error("Access denied missing IP address")]
    MissingIp,
}
5187
5188pub(crate) fn validate_ip_with_policy_rules(
5189 ip: &IpAddr,
5190 rules: &Vec<NetworkPolicyRule>,
5191) -> Result<(), NetworkPolicyError> {
5192 if rules.iter().any(|r| r.address.0.contains(ip)) {
5195 Ok(())
5196 } else {
5197 Err(NetworkPolicyError::AddressDenied(ip.clone()))
5198 }
5199}
5200
5201pub(crate) fn infer_sql_type_for_catalog(
5202 hir_expr: &HirRelationExpr,
5203 mir_expr: &MirRelationExpr,
5204) -> SqlRelationType {
5205 let mut typ = hir_expr.top_level_typ();
5206 typ.backport_nullability_and_keys(&mir_expr.typ());
5207 typ
5208}
5209
#[cfg(test)]
mod id_pool_tests {
    use super::IdPool;

    /// An empty pool reports nothing remaining and refuses every request.
    #[mz_ore::test]
    fn test_empty_pool() {
        let mut ids = IdPool::empty();
        assert_eq!(ids.remaining(), 0);
        assert!(ids.allocate().is_none());
        assert!(ids.allocate_many(1).is_none());
    }

    /// Single allocations hand out the refilled range in order and stop at
    /// its exclusive upper bound.
    #[mz_ore::test]
    fn test_allocate_single() {
        let mut ids = IdPool::empty();
        ids.refill(10, 13);
        assert_eq!(ids.remaining(), 3);
        for expected in 10..13 {
            assert_eq!(ids.allocate(), Some(expected));
        }
        assert_eq!(ids.remaining(), 0);
        assert!(ids.allocate().is_none());
    }

    /// Batch allocations are all-or-nothing.
    #[mz_ore::test]
    fn test_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(100, 105);
        assert_eq!(ids.allocate_many(3), Some(vec![100, 101, 102]));
        assert_eq!(ids.remaining(), 2);
        // Only two ids are left, so a request for three is refused...
        assert!(ids.allocate_many(3).is_none());
        // ...and the refused request did not consume the two that remain.
        assert_eq!(ids.allocate_many(2), Some(vec![103, 104]));
        assert_eq!(ids.remaining(), 0);
    }

    /// Asking for zero ids succeeds with an empty batch and consumes nothing.
    #[mz_ore::test]
    fn test_allocate_many_zero() {
        let mut ids = IdPool::empty();
        ids.refill(1, 5);
        assert_eq!(ids.allocate_many(0), Some(Vec::new()));
        assert_eq!(ids.remaining(), 4);
    }

    /// Refilling replaces whatever range was left over.
    #[mz_ore::test]
    fn test_refill_resets_pool() {
        let mut ids = IdPool::empty();
        ids.refill(0, 2);
        assert_eq!(ids.allocate(), Some(0));
        // Id 1 is still unallocated here; the refill discards it.
        ids.refill(50, 52);
        for expected in 50..52 {
            assert_eq!(ids.allocate(), Some(expected));
        }
        assert!(ids.allocate().is_none());
    }

    /// Single and batch allocations draw from the same sequence.
    #[mz_ore::test]
    fn test_mixed_allocate_and_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(0, 10);
        assert_eq!(ids.allocate(), Some(0));
        assert_eq!(ids.allocate_many(3), Some(vec![1, 2, 3]));
        assert_eq!(ids.allocate(), Some(4));
        assert_eq!(ids.remaining(), 5);
    }

    /// A range whose start lies past its end is rejected with a panic.
    #[mz_ore::test]
    #[should_panic(expected = "invalid pool range")]
    fn test_refill_invalid_range_panics() {
        let mut ids = IdPool::empty();
        ids.refill(10, 5);
    }
}
5284
#[cfg(test)]
mod arrangement_sizes_pruner_tests {
    use mz_repr::catalog_item_id::CatalogItemId;
    use mz_repr::{Datum, Row};

    use super::arrangement_sizes_expired_retractions;

    /// Builds a four-column row whose last column is a `TimestampTz` derived
    /// from `ts_ms`; the other three columns are fixed filler values.
    fn history_row(ts_ms: i64) -> Row {
        let millis = ts_ms.try_into().expect("non-negative");
        let timestamp = mz_ore::now::to_datetime(millis)
            .try_into()
            .expect("fits in TimestampTz");
        Row::pack_slice(&[
            Datum::String("r1"),
            Datum::String("u1"),
            Datum::Int64(123),
            Datum::TimestampTz(timestamp),
        ])
    }

    /// An arbitrary user item id shared by all tests in this module.
    fn item_id() -> CatalogItemId {
        CatalogItemId::User(42)
    }

    #[mz_ore::test]
    fn empty_input_produces_no_retractions() {
        let retractions = arrangement_sizes_expired_retractions(Vec::new(), 1_000, item_id());
        assert!(retractions.is_empty());
    }

    #[mz_ore::test]
    fn retracts_only_rows_strictly_before_cutoff() {
        let rows = vec![
            // Before the 1_000 cutoff: expected to be retracted.
            (history_row(100), 1),
            (history_row(500), 1),
            // At or after the cutoff: expected to be kept.
            (history_row(1_000), 1),
            (history_row(5_000), 1),
        ];
        let retractions = arrangement_sizes_expired_retractions(rows, 1_000, item_id());
        assert_eq!(retractions.len(), 2);
    }

    #[mz_ore::test]
    #[should_panic(expected = "consolidated contents should not contain retractions")]
    fn retraction_in_input_panics() {
        // A negative diff in the input is what triggers the expected panic.
        let rows = vec![(history_row(100), -1)];
        let _ = arrangement_sizes_expired_retractions(rows, 1_000, item_id());
    }
}
5336}