1use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::net::IpAddr;
72use std::num::NonZeroI64;
73use std::ops::Neg;
74use std::str::FromStr;
75use std::sync::LazyLock;
76use std::sync::{Arc, Mutex};
77use std::thread;
78use std::time::{Duration, Instant};
79use std::{fmt, mem};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::{Either, Itertools};
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::{
95 USER_ID_POOL_BATCH_SIZE, WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL,
96};
97use mz_auth::password::Password;
98use mz_build_info::BuildInfo;
99use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
100use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
101use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
102use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
103use mz_catalog::memory::objects::{
104 CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
105 DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
106};
107use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
108use mz_compute_client::as_of_selection;
109use mz_compute_client::controller::error::{
110 CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
111};
112use mz_compute_types::ComputeInstanceId;
113use mz_compute_types::dataflows::DataflowDescription;
114use mz_compute_types::plan::Plan;
115use mz_controller::clusters::{
116 ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
117};
118use mz_controller::{ControllerConfig, Readiness};
119use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
120use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
121use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
122use mz_orchestrator::OfflineReason;
123use mz_ore::cast::{CastFrom, CastInto, CastLossy};
124use mz_ore::channel::trigger::Trigger;
125use mz_ore::future::TimeoutError;
126use mz_ore::metrics::MetricsRegistry;
127use mz_ore::now::{EpochMillis, NowFn};
128use mz_ore::task::{JoinHandle, spawn};
129use mz_ore::thread::JoinHandleExt;
130use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
131use mz_ore::url::SensitiveUrl;
132use mz_ore::{
133 assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
134};
135use mz_persist_client::PersistClient;
136use mz_persist_client::batch::ProtoBatch;
137use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
138use mz_repr::adt::numeric::Numeric;
139use mz_repr::explain::{ExplainConfig, ExplainFormat};
140use mz_repr::global_id::TransientIdGen;
141use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
142use mz_repr::role_id::RoleId;
143use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
144use mz_secrets::cache::CachingSecretsReader;
145use mz_secrets::{SecretsController, SecretsReader};
146use mz_sql::ast::{Raw, Statement};
147use mz_sql::catalog::{CatalogCluster, EnvironmentId};
148use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
149use mz_sql::optimizer_metrics::OptimizerMetrics;
150use mz_sql::plan::{
151 self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
152 NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
153};
154use mz_sql::session::user::User;
155use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
156use mz_sql_parser::ast::ExplainStage;
157use mz_sql_parser::ast::display::AstDisplay;
158use mz_storage_client::client::TableData;
159use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
160use mz_storage_types::connections::Connection as StorageConnection;
161use mz_storage_types::connections::ConnectionContext;
162use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
163use mz_storage_types::read_holds::ReadHold;
164use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
165use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
166use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
167use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
168use mz_transform::dataflow::DataflowMetainfo;
169use opentelemetry::trace::TraceContextExt;
170use serde::Serialize;
171use thiserror::Error;
172use timely::progress::{Antichain, Timestamp as _};
173use tokio::runtime::Handle as TokioHandle;
174use tokio::select;
175use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
176use tokio::time::{Interval, MissedTickBehavior};
177use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
178use tracing_opentelemetry::OpenTelemetrySpanExt;
179use uuid::Uuid;
180
181use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
182use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
183use crate::client::{Client, Handle};
184use crate::command::{Command, ExecuteResponse};
185use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
186use crate::coord::appends::{
187 BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
188};
189use crate::coord::caught_up::CaughtUpCheckContext;
190use crate::coord::cluster_scheduling::SchedulingDecision;
191use crate::coord::id_bundle::CollectionIdBundle;
192use crate::coord::introspection::IntrospectionSubscribe;
193use crate::coord::peek::PendingPeek;
194use crate::coord::statement_logging::StatementLogging;
195use crate::coord::timeline::{TimelineContext, TimelineState};
196use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
197use crate::coord::validity::PlanValidity;
198use crate::error::AdapterError;
199use crate::explain::insights::PlanInsightsContext;
200use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
201use crate::metrics::Metrics;
202use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
203use crate::optimize::{self, Optimize, OptimizerConfig};
204use crate::session::{EndTransactionAction, Session};
205use crate::statement_logging::{
206 StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
207};
208use crate::util::{ClientTransmitter, ResultExt, sort_topological};
209use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
210use crate::{AdapterNotice, ReadHolds, flags};
211
// Coordinator submodules. The `pub(crate)` modules are used elsewhere in the
// adapter crate; the private ones are implementation details of the
// coordinator itself.
pub(crate) mod appends;
pub(crate) mod catalog_serving;
pub(crate) mod cluster_scheduling;
pub(crate) mod consistency;
pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod read_policy;
pub(crate) mod read_then_write;
pub(crate) mod sequencer;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;

pub mod catalog_implications;
mod caught_up;
mod command_handler;
mod ddl;
pub(crate) mod group_sync;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
mod sql;
mod validity;
237
/// A pool of pre-allocated IDs, handed out in ascending order.
///
/// The pool covers the half-open range `next..upper` and is periodically
/// topped up via [`IdPool::refill`].
#[derive(Debug)]
pub(crate) struct IdPool {
    next: u64,
    upper: u64,
}

impl IdPool {
    /// Returns a pool that contains no allocatable IDs.
    pub fn empty() -> Self {
        IdPool { next: 0, upper: 0 }
    }

    /// Hands out the next ID, or `None` if the pool is exhausted.
    pub fn allocate(&mut self) -> Option<u64> {
        (self.next < self.upper).then(|| {
            let id = self.next;
            self.next += 1;
            id
        })
    }

    /// Hands out `n` consecutive IDs, or `None` if fewer than `n` remain.
    ///
    /// All-or-nothing: on `None`, no IDs are consumed from the pool.
    pub fn allocate_many(&mut self, n: u64) -> Option<Vec<u64>> {
        if n > self.remaining() {
            return None;
        }
        let start = self.next;
        self.next += n;
        Some((start..self.next).collect())
    }

    /// Reports how many IDs are still available for allocation.
    pub fn remaining(&self) -> u64 {
        self.upper - self.next
    }

    /// Replaces the pool's range with `next..upper`.
    ///
    /// # Panics
    ///
    /// Panics if `next > upper`.
    pub fn refill(&mut self, next: u64, upper: u64) {
        assert!(next <= upper, "invalid pool range: {next}..{upper}");
        self.next = next;
        self.upper = upper;
    }
}
310
/// Messages processed by the coordinator's main message loop.
#[derive(Debug)]
pub enum Message {
    /// A command from a client, with the tracing context it was submitted in.
    Command(OpenTelemetryContext, Command),
    /// One of the controllers (or a controller facet) has ready work.
    ControllerReady {
        controller: ControllerReadiness,
    },
    /// A statement purification background task completed.
    PurifiedStatementReady(PurifiedStatementReady),
    /// Background validation for a `CREATE CONNECTION` completed.
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    /// Background validation for an `ALTER CONNECTION` completed.
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    /// Retry a deferred operation for the given connection.
    TryDeferred {
        conn_id: ConnectionId,
        // NOTE(review): presumably a lock already acquired on the deferred
        // op's behalf for the named item — confirm against `coord::appends`.
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    /// Kick off a group commit, under the given span and optional permit.
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    /// A previously deferred statement is ready to be retried.
    DeferredStatementReady,
    /// Prompt to advance timelines.
    AdvanceTimelines,
    /// A cluster/replica status change reported by the controller.
    ClusterEvent(ClusterEvent),
    /// Cancel all pending peeks for the given connection.
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    /// Process pending linearized reads.
    LinearizeReads,
    /// Batches staged for appending to a table have been written.
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    // Storage-usage collection cycle: schedule -> fetch -> update -> prune.
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    // Arrangement-size collection cycle: schedule -> snapshot -> prune.
    ArrangementSizesSchedule,
    ArrangementSizesSnapshot,
    ArrangementSizesPrune(Vec<BuiltinTableUpdate>),
    /// A statement execution ended; record how it ended.
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    /// Execute a single statement in its own (implicit) transaction.
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    // The `*StageReady` messages below resume a staged, multi-step execution
    // (see the `Staged` trait) after a background task finished its stage.
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    // Introspection subscribes are internal, so there is no client context.
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    /// Drain buffered statement-log events.
    DrainStatementLog,
    /// VPC endpoint events observed for PrivateLink connections.
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    /// Periodic prompt to evaluate cluster scheduling policies.
    CheckSchedulingPolicies,

    /// Scheduling decisions, grouped by the (static) policy name that
    /// produced them.
    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}
422
impl Message {
    /// Returns a short, statically-known label for this message, used to tag
    /// metrics and logs in the coordinator's message loop.
    ///
    /// NOTE: these strings are emitted as metric/label values; changing one
    /// changes the observable telemetry.
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
                Command::CheckRoleCanLogin { .. } => "command-check_role_can_login",
                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
                Command::GetOracle { .. } => "get-oracle",
                Command::DetermineRealTimeRecentTimestamp { .. } => {
                    "determine-real-time-recent-timestamp"
                }
                Command::GetTransactionReadHoldsBundle { .. } => {
                    "get-transaction-read-holds-bundle"
                }
                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
                Command::ExecuteSubscribe { .. } => "execute-subscribe",
                Command::CopyToPreflight { .. } => "copy-to-preflight",
                Command::ExecuteCopyTo { .. } => "execute-copy-to",
                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
                Command::ExplainTimestamp { .. } => "explain-timestamp",
                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
                Command::StartCopyFromStdin { .. } => "start-copy-from-stdin",
                Command::InjectAuditEvents { .. } => "inject-audit-events",
            },
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::ArrangementSizesSchedule => "arrangement_sizes_schedule",
            Message::ArrangementSizesSnapshot => "arrangement_sizes_snapshot",
            Message::ArrangementSizesPrune(_) => "arrangement_sizes_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}
520
/// Identifies which controller (or controller facet) reported ready work.
#[derive(Debug)]
pub enum ControllerReadiness {
    /// The storage controller.
    Storage,
    /// The compute controller.
    Compute,
    /// Controller metrics are ready.
    Metrics,
    /// An internally-generated controller message is ready.
    Internal,
}

/// The result of a background task spawned off the coordinator's main loop,
/// bundled with everything needed to resume the originating statement.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    /// The execution context of the statement that spawned the work.
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The outcome of the background task.
    pub result: Result<T, AdapterError>,
    /// Parameters of the originating statement.
    pub params: Params,
    /// Conditions that must still hold for `result` to be usable.
    pub plan_validity: PlanValidity,
    /// The statement as originally submitted.
    pub original_stmt: Arc<Statement<Raw>>,
    /// Tracing context linking follow-up work to the original request.
    pub otel_ctx: OpenTelemetryContext,
}

pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;

/// The result of a background connection validation task.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    /// The execution context of the statement that requested validation.
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The outcome of the validation.
    pub result: Result<T, AdapterError>,
    /// IDs resolved while planning the statement.
    pub resolved_ids: ResolvedIds,
    /// Catalog item ID of the connection being validated.
    pub connection_id: CatalogItemId,
    /// Global ID of the connection being validated.
    pub connection_gid: GlobalId,
    /// Conditions that must still hold for `result` to be usable.
    pub plan_validity: PlanValidity,
    /// Tracing context linking follow-up work to the original request.
    pub otel_ctx: OpenTelemetryContext,
}

pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
pub type AlterConnectionValidationReady = ValidationReady<Connection>;
563
/// The stages of a staged peek (`SELECT` / `COPY TO`) execution.
#[derive(Debug)]
pub enum PeekStage {
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    Finish(PeekStageFinish),
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    CopyToPreflight(PeekStageCopyTo),
    CopyToDataflow(PeekStageCopyTo),
}

/// Context needed to execute a `COPY ... TO <url>`.
#[derive(Debug)]
pub struct CopyToContext {
    /// Description of the rows being copied out.
    pub desc: RelationDesc,
    /// Destination URI.
    pub uri: Uri,
    /// The (referenced, not yet inlined) storage connection to write with.
    pub connection: StorageConnection<ReferencedConnection>,
    /// Catalog ID of that connection.
    pub connection_id: CatalogItemId,
    /// Output format (e.g. CSV/Parquet variants of `S3SinkFormat`).
    pub format: S3SinkFormat,
    /// Maximum size of a single output file.
    pub max_file_size: u64,
    /// Number of output batches, if predetermined.
    pub output_batch_count: Option<u64>,
}

/// Peek stage: obtain a linearized read timestamp from the timestamp oracle.
#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    // `Either` distinguishes a plain peek from a `COPY TO` peek.
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

/// Peek stage: determine the real-time-recency timestamp, if requested.
#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    /// Read timestamp obtained from the timestamp oracle, if any.
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

/// Peek stage: pick the query timestamp and acquire read holds at it.
#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    /// Real-time-recency timestamp, if one was determined.
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

/// Peek stage: run the optimizer.
#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

/// Peek stage: issue the peek and return results to the client.
#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    /// Optimizer trace collected for plan-insights notices, if enabled.
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    /// When optimization completed (for statement logging/metrics).
    optimization_finished_at: EpochMillis,
}

/// Peek stage: ship a `COPY TO` dataflow (used for both preflight and
/// dataflow-creation steps).
#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

/// Peek stage: render an `EXPLAIN PLAN` response instead of executing.
#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

/// Peek stage: render an `EXPLAIN FILTER PUSHDOWN` response.
#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination,
    /// Per-source MFPs whose pushdown behavior is being explained.
    imports: BTreeMap<GlobalId, MapFilterProject>,
}
706
/// The stages of a staged `CREATE INDEX` execution.
#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}

/// `CREATE INDEX` stage: run the optimizer.
#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

/// `CREATE INDEX` stage: apply catalog changes and ship the dataflow.
#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}

/// `CREATE INDEX` stage: render an `EXPLAIN` response instead of executing.
#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    /// ID the explained index dataflow would export.
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

/// The stages of a staged `CREATE VIEW` execution.
#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}

/// `CREATE VIEW` stage: run the optimizer.
#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

/// `CREATE VIEW` stage: apply catalog changes.
#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}

/// `CREATE VIEW` stage: render an `EXPLAIN` response instead of executing.
#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}
782
/// The stages of a staged `EXPLAIN TIMESTAMP` execution.
#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

/// `EXPLAIN TIMESTAMP` stage: optimize the underlying query.
#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

/// `EXPLAIN TIMESTAMP` stage: determine the real-time-recency timestamp.
#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

/// `EXPLAIN TIMESTAMP` stage: produce the final explanation.
#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    /// Real-time-recency timestamp, if one was determined.
    real_time_recency_ts: Option<Timestamp>,
}
816
/// The stages of a staged `ALTER CLUSTER` execution.
#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

/// `ALTER CLUSTER` stage: apply the alteration.
#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

/// `ALTER CLUSTER` stage: wait for the reconfigured replicas to hydrate
/// before finalizing (with a deadline and a configured timeout action).
#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    workload_class: Option<String>,
    /// Deadline after which `on_timeout` is taken.
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

/// `ALTER CLUSTER` stage: commit the new configuration.
#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    workload_class: Option<String>,
}
847
/// The kind of `EXPLAIN` (if any) the current execution is performing.
#[derive(Debug)]
pub enum ExplainContext {
    /// Not an `EXPLAIN`: a regular execution.
    None,
    /// An `EXPLAIN` of the plan.
    Plan(ExplainPlanContext),
    /// A regular execution that also collects an optimizer trace, used to
    /// attach plan-insights notices.
    PlanInsightsNotice(OptimizerTrace),
    /// An `EXPLAIN FILTER PUSHDOWN`.
    Pushdown,
}
860
861impl ExplainContext {
862 pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
866 let optimizer_trace = match self {
867 ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
868 ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
869 _ => None,
870 };
871 optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
872 }
873
874 pub(crate) fn needs_cluster(&self) -> bool {
875 match self {
876 ExplainContext::None => true,
877 ExplainContext::Plan(..) => false,
878 ExplainContext::PlanInsightsNotice(..) => true,
879 ExplainContext::Pushdown => false,
880 }
881 }
882
883 pub(crate) fn needs_plan_insights(&self) -> bool {
884 matches!(
885 self,
886 ExplainContext::Plan(ExplainPlanContext {
887 stage: ExplainStage::PlanInsights,
888 ..
889 }) | ExplainContext::PlanInsightsNotice(_)
890 )
891 }
892}
893
/// Context for rendering an `EXPLAIN <PLAN>` response.
#[derive(Debug)]
pub struct ExplainPlanContext {
    /// Whether this is an `EXPLAIN BROKEN` (explain despite planning errors).
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    /// The optimizer stage whose plan is being explained.
    pub stage: ExplainStage,
    /// If set, explain a replan of this existing object instead.
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    /// Trace that collects the optimizer's intermediate plans.
    pub optimizer_trace: OptimizerTrace,
}

/// The stages of a staged `CREATE MATERIALIZED VIEW` execution.
#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

/// `CREATE MATERIALIZED VIEW` stage: run the optimizer.
#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

/// `CREATE MATERIALIZED VIEW` stage: apply catalog changes and ship the
/// dataflow.
#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    item_id: CatalogItemId,
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}

/// `CREATE MATERIALIZED VIEW` stage: render an `EXPLAIN` response instead of
/// executing.
#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
949
/// The stages of a staged `SUBSCRIBE` execution.
#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
    Explain(SubscribeExplain),
}

/// `SUBSCRIBE` stage: run MIR optimization.
#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}

/// `SUBSCRIBE` stage: determine the as-of timestamp and run LIR optimization.
#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    /// MIR plan whose as-of timestamp has not been resolved yet.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}

/// `SUBSCRIBE` stage: ship the dataflow and start the sink.
#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

/// `SUBSCRIBE` stage: render an `EXPLAIN` response instead of executing.
#[derive(Debug)]
pub struct SubscribeExplain {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    df_meta: DataflowMetainfo,
    cluster_id: ComputeInstanceId,
    explain_ctx: ExplainPlanContext,
}
1003
/// The stages of an internal introspection subscribe (no client involved).
#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

/// Introspection subscribe stage: run MIR optimization.
#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

/// Introspection subscribe stage: resolve the timestamp and run LIR
/// optimization.
#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    /// MIR plan whose as-of timestamp has not been resolved yet.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

/// Introspection subscribe stage: ship the dataflow.
#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    /// Read holds keeping the subscribe's inputs readable at its as-of.
    read_holds: ReadHolds,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1037
/// The stages of staged secret DDL (`CREATE`/`ALTER SECRET`, key rotation).
#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

/// `CREATE SECRET` stage: ensure the secret exists in the secrets controller.
#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

/// `CREATE SECRET` stage: apply the catalog changes.
#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

/// Key-rotation stage: ensure the rotated key material is in place.
#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

/// Key-rotation stage: apply the resulting catalog ops.
#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}

/// `ALTER SECRET` stage: update the secret's value.
#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}
1078
/// The cluster a statement should execute on.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The built-in catalog server cluster.
    CatalogServer,
    /// The session's active cluster.
    Active,
    /// The cluster pinned by the current transaction.
    Transaction(ClusterId),
}

/// What a [`Staged`] stage produced: either more work (possibly running in a
/// background task) or a final response.
pub(crate) enum StageResult<T> {
    /// A task computing the next stage.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task computing the final response; the execution then retires.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage, available immediately.
    Immediate(T),
    /// The final response; the execution then retires.
    Response(ExecuteResponse),
}
1104
/// One step of a multi-stage statement execution.
///
/// The coordinator drives a `Staged` value by calling [`Staged::stage`] and,
/// when a stage completes in the background, re-enqueueing it via the
/// [`Message`] produced by [`Staged::message`].
pub(crate) trait Staged: Send {
    /// The context type threaded through the stages.
    type Ctx: StagedContext;

    /// Returns a mutable reference to this stage's plan-validity state.
    fn validity(&mut self) -> &mut PlanValidity;

    /// Runs this stage, returning either the next stage (possibly computed in
    /// a background task) or a final response.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Wraps this stage in the coordinator [`Message`] that resumes it.
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether cancellation applies to this stage.
    fn cancel_enabled(&self) -> bool;
}

/// The context a [`Staged`] execution runs in and eventually retires into.
pub trait StagedContext {
    /// Consumes the context with the final result of the execution.
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    /// The session associated with this context, if there is one.
    fn session(&self) -> Option<&Session>;
}

impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}

// The unit context is used for staged work with no waiting client: results
// are discarded and there is no session.
impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}
1147
/// Configuration for constructing the coordinator.
pub struct Config {
    // Controller and durable-catalog bootstrap.
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    // Feature gates.
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    pub now: NowFn,
    // Secrets and cloud resources.
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    // Cluster/replica provisioning.
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    // Storage-usage metering.
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    // AWS/PrivateLink integration.
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    /// Called with (current connection count, connection limit) on changes.
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    /// Whether the controllers start in read-only (0dt deployment) mode.
    pub read_only_controllers: bool,

    /// Trigger fired once the read-only deployment has caught up, if any.
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}
1199
/// Metadata the coordinator tracks about each active connection.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Secret the client must present to issue cancellation requests for
    /// this connection.
    secret_key: u32,
    /// Wall-clock time at which the connection was established.
    connected_at: EpochMillis,
    /// The user this connection is associated with.
    user: User,
    /// Client-reported application name.
    application_name: String,
    /// Globally unique identifier for the connection.
    uuid: Uuid,
    /// Connection id, unique among live connections.
    conn_id: ConnectionId,
    /// IP address the client connected from, if known.
    client_ip: Option<IpAddr>,

    /// Sinks to drop when the connection terminates.
    drop_sinks: BTreeSet<GlobalId>,

    /// Lock guard held while this connection has deferred work pending.
    /// (NOTE(review): precise locking protocol lives elsewhere — confirm.)
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Clusters with alterations initiated by this connection that have not
    /// yet completed.
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Channel on which to send notices to the client.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role the connection authenticated as; may differ from the current
    /// role (e.g. after SET ROLE — TODO confirm).
    authenticated_role: RoleId,
}
1237
1238impl ConnMeta {
1239 pub fn conn_id(&self) -> &ConnectionId {
1240 &self.conn_id
1241 }
1242
1243 pub fn user(&self) -> &User {
1244 &self.user
1245 }
1246
1247 pub fn application_name(&self) -> &str {
1248 &self.application_name
1249 }
1250
1251 pub fn authenticated_role_id(&self) -> &RoleId {
1252 &self.authenticated_role
1253 }
1254
1255 pub fn uuid(&self) -> Uuid {
1256 self.uuid
1257 }
1258
1259 pub fn client_ip(&self) -> Option<IpAddr> {
1260 self.client_ip
1261 }
1262
1263 pub fn connected_at(&self) -> EpochMillis {
1264 self.connected_at
1265 }
1266}
1267
/// A transaction whose completion (commit or rollback) is pending.
#[derive(Debug)]
pub struct PendingTxn {
    /// The execute context of the statement that ends the transaction.
    ctx: ExecuteContext,
    /// The response to return to the client once finished.
    response: Result<PendingTxnResponse, AdapterError>,
    /// Whether to commit or roll back.
    action: EndTransactionAction,
}
1278
/// The outcome of a finished transaction, carrying any session parameter
/// changes to report to the client.
#[derive(Debug)]
pub enum PendingTxnResponse {
    /// The transaction committed.
    Committed {
        /// Session parameters that changed as a result.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction rolled back.
    Rolledback {
        /// Session parameters that changed as a result.
        params: BTreeMap<&'static str, String>,
    },
}
1293
1294impl PendingTxnResponse {
1295 pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1296 match self {
1297 PendingTxnResponse::Committed { params }
1298 | PendingTxnResponse::Rolledback { params } => params.extend(p),
1299 }
1300 }
1301}
1302
1303impl From<PendingTxnResponse> for ExecuteResponse {
1304 fn from(value: PendingTxnResponse) -> Self {
1305 match value {
1306 PendingTxnResponse::Committed { params } => {
1307 ExecuteResponse::TransactionCommitted { params }
1308 }
1309 PendingTxnResponse::Rolledback { params } => {
1310 ExecuteResponse::TransactionRolledBack { params }
1311 }
1312 }
1313 }
1314}
1315
/// A read transaction waiting to be sequenced with other strict serializable
/// reads before it can complete.
#[derive(Debug)]
pub struct PendingReadTxn {
    /// The inner pending read.
    txn: PendingRead,
    /// The timestamp context of the read.
    timestamp_context: TimestampContext,
    /// When the read was enqueued; presumably used for latency accounting —
    /// TODO confirm at call sites.
    created: Instant,
    /// Number of times this read has been requeued.
    num_requeues: u64,
    /// Tracing context for re-entering the originating span.
    otel_ctx: OpenTelemetryContext,
}
1332
impl PendingReadTxn {
    /// The timestamp context of the pending read.
    pub fn timestamp_context(&self) -> &TimestampContext {
        &self.timestamp_context
    }

    /// Extracts the inner execute context, abandoning the pending read
    /// (see [`PendingRead::take_context`]).
    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}
1343
/// The inner state of a [`PendingReadTxn`].
#[derive(Debug)]
enum PendingRead {
    /// A plain read that finishes a transaction.
    Read {
        /// The transaction to finish once the read is linearized.
        txn: PendingTxn,
    },
    /// The read half of a read-then-write statement.
    ReadThenWrite {
        /// The execute context to hand to the write half.
        ctx: ExecuteContext,
        /// Channel on which to return the context. Sending `None` signals
        /// the receiver that the operation was abandoned.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}
1359
impl PendingRead {
    /// Completes the pending read.
    ///
    /// For a plain read: ends the session's transaction and returns the
    /// context plus its response (with any changed session parameters folded
    /// in) for the caller to retire. For a read-then-write: forwards the
    /// context to the waiting write half and returns `None`.
    #[instrument(level = "debug")]
    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
        match self {
            PendingRead::Read {
                txn:
                    PendingTxn {
                        mut ctx,
                        response,
                        action,
                    },
                ..
            } => {
                // Ending the transaction may change session variables; fold
                // those changes into the response's parameters.
                let changed = ctx.session_mut().vars_mut().end_transaction(action);
                let response = response.map(|mut r| {
                    r.extend_params(changed);
                    ExecuteResponse::from(r)
                });

                Some((ctx, response))
            }
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // It's okay if the caller has hung up.
                let _ = tx.send(Some(ctx));
                None
            }
        }
    }

    /// A short static label for this read's kind, for logging/metrics.
    /// (NOTE(review): consumers not visible in this chunk.)
    fn label(&self) -> &'static str {
        match self {
            PendingRead::Read { .. } => "read",
            PendingRead::ReadThenWrite { .. } => "read_then_write",
        }
    }

    /// Extracts the execute context, abandoning the read. In the
    /// read-then-write case, notifies the write half with `None` so it knows
    /// no write will follow.
    pub(crate) fn take_context(self) -> ExecuteContext {
        match self {
            PendingRead::Read { txn, .. } => txn.ctx,
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // It's okay if the caller has hung up.
                let _ = tx.send(None);
                ctx
            }
        }
    }
}
1413
/// Statement-logging bookkeeping carried through a statement's execution.
/// Nontrivial (i.e. `statement_uuid` is `Some`) only when the statement is
/// being logged, in which case its end-of-execution must eventually be
/// recorded.
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    statement_uuid: Option<StatementLoggingId>,
}
1428
impl ExecuteContextExtra {
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }
    /// Whether there is no statement-logging state to account for.
    pub fn is_trivial(&self) -> bool {
        self.statement_uuid.is_none()
    }
    /// Returns the statement logging id, if any, without consuming it.
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
    /// Consumes `self` and yields the statement logging id; the caller takes
    /// over the responsibility of recording the statement's end.
    #[must_use]
    pub(crate) fn retire(self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
}
1447
/// A guard around [`ExecuteContextExtra`]: if dropped while still holding
/// statement-logging state (i.e. never defused), its `Drop` impl reports the
/// statement as aborted over `coordinator_tx`.
#[derive(Debug)]
#[must_use]
pub struct ExecuteContextGuard {
    /// The guarded statement-logging state.
    extra: ExecuteContextExtra,
    /// Channel to the coordinator, used by the `Drop` impl.
    coordinator_tx: mpsc::UnboundedSender<Message>,
}
1467
1468impl Default for ExecuteContextGuard {
1469 fn default() -> Self {
1470 let (tx, _rx) = mpsc::unbounded_channel();
1474 Self {
1475 extra: ExecuteContextExtra::default(),
1476 coordinator_tx: tx,
1477 }
1478 }
1479}
1480
impl ExecuteContextGuard {
    /// Creates a guard that, if dropped without being defused, reports the
    /// statement as aborted to the coordinator via `coordinator_tx`.
    pub(crate) fn new(
        statement_uuid: Option<StatementLoggingId>,
        coordinator_tx: mpsc::UnboundedSender<Message>,
    ) -> Self {
        Self {
            extra: ExecuteContextExtra::new(statement_uuid),
            coordinator_tx,
        }
    }
    /// Whether there is no statement-logging state being tracked.
    pub fn is_trivial(&self) -> bool {
        self.extra.is_trivial()
    }
    /// Returns the tracked statement logging id, if any.
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.extra.contents()
    }
    /// Takes the inner `ExecuteContextExtra` out of the guard, leaving a
    /// trivial one behind so that `Drop` sends no abort message. The caller
    /// becomes responsible for retiring the returned extra.
    pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
        std::mem::take(&mut self.extra)
    }
}
1508
impl Drop for ExecuteContextGuard {
    /// If the guard still holds statement-logging state (it was never
    /// defused), report the statement's execution as aborted so logging is
    /// not left dangling. A failed send is ignored — presumably the
    /// coordinator can be gone during shutdown (TODO confirm).
    fn drop(&mut self) {
        if let Some(statement_uuid) = self.extra.statement_uuid.take() {
            let msg = Message::RetireExecute {
                data: ExecuteContextExtra {
                    statement_uuid: Some(statement_uuid),
                },
                otel_ctx: OpenTelemetryContext::obtain(),
                reason: StatementEndedExecutionReason::Aborted,
            };
            let _ = self.coordinator_tx.send(msg);
        }
    }
}
1527
/// The state carried alongside a statement while the coordinator executes it:
/// the client transmitter, the session, and statement-logging state. Boxed,
/// presumably to keep the by-value type pointer-sized — TODO confirm.
#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}
1543
1544impl std::ops::Deref for ExecuteContext {
1545 type Target = ExecuteContextInner;
1546 fn deref(&self) -> &Self::Target {
1547 &*self.inner
1548 }
1549}
1550
1551impl std::ops::DerefMut for ExecuteContext {
1552 fn deref_mut(&mut self) -> &mut Self::Target {
1553 &mut *self.inner
1554 }
1555}
1556
/// The boxed payload of an [`ExecuteContext`].
#[derive(Debug)]
pub struct ExecuteContextInner {
    /// Transmitter for the statement's response.
    tx: ClientTransmitter<ExecuteResponse>,
    /// Channel for internal coordinator messages (used when retiring).
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// The client session executing the statement.
    session: Session,
    /// Statement-logging guard; see [`ExecuteContextGuard`].
    extra: ExecuteContextGuard,
}
1564
impl ExecuteContext {
    pub fn session(&self) -> &Session {
        &self.session
    }

    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.session
    }

    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
        &self.tx
    }

    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
        &mut self.tx
    }

    /// Assembles a context from its constituent parts.
    pub fn from_parts(
        tx: ClientTransmitter<ExecuteResponse>,
        internal_cmd_tx: mpsc::UnboundedSender<Message>,
        session: Session,
        extra: ExecuteContextGuard,
    ) -> Self {
        Self {
            inner: ExecuteContextInner {
                tx,
                session,
                extra,
                internal_cmd_tx,
            }
            .into(),
        }
    }

    /// Decomposes the context into its constituent parts. The caller takes
    /// over responsibility for the returned guard.
    pub fn into_parts(
        self,
    ) -> (
        ClientTransmitter<ExecuteResponse>,
        mpsc::UnboundedSender<Message>,
        Session,
        ExecuteContextGuard,
    ) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        (tx, internal_cmd_tx, session, extra)
    }

    /// Sends `result` to the client and, if the statement was being logged,
    /// notifies the coordinator so the statement's end of execution is
    /// recorded.
    #[instrument(level = "debug")]
    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        // Derive the logging reason from the result before `result` is moved
        // into the client send below.
        let reason = if extra.is_trivial() {
            None
        } else {
            Some((&result).into())
        };
        tx.send(result, session);
        if let Some(reason) = reason {
            // Defuse the guard: we are retiring explicitly, so its Drop impl
            // must not also report an abort.
            let extra = extra.defuse();
            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
                otel_ctx: OpenTelemetryContext::obtain(),
                data: extra,
                reason,
            }) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        }
    }

    pub fn extra(&self) -> &ExecuteContextGuard {
        &self.extra
    }

    pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
        &mut self.extra
    }
}
1660
/// The last-known status of every process of every cluster replica, keyed
/// cluster id -> replica id -> process id.
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);
1665
1666impl ClusterReplicaStatuses {
1667 pub(crate) fn new() -> ClusterReplicaStatuses {
1668 ClusterReplicaStatuses(BTreeMap::new())
1669 }
1670
1671 pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1675 let prev = self.0.insert(cluster_id, BTreeMap::new());
1676 assert_eq!(
1677 prev, None,
1678 "cluster {cluster_id} statuses already initialized"
1679 );
1680 }
1681
1682 pub(crate) fn initialize_cluster_replica_statuses(
1686 &mut self,
1687 cluster_id: ClusterId,
1688 replica_id: ReplicaId,
1689 num_processes: usize,
1690 time: DateTime<Utc>,
1691 ) {
1692 tracing::info!(
1693 ?cluster_id,
1694 ?replica_id,
1695 ?time,
1696 "initializing cluster replica status"
1697 );
1698 let replica_statuses = self.0.entry(cluster_id).or_default();
1699 let process_statuses = (0..num_processes)
1700 .map(|process_id| {
1701 let status = ClusterReplicaProcessStatus {
1702 status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1703 time: time.clone(),
1704 };
1705 (u64::cast_from(process_id), status)
1706 })
1707 .collect();
1708 let prev = replica_statuses.insert(replica_id, process_statuses);
1709 assert_none!(
1710 prev,
1711 "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1712 );
1713 }
1714
1715 pub(crate) fn remove_cluster_statuses(
1719 &mut self,
1720 cluster_id: &ClusterId,
1721 ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1722 let prev = self.0.remove(cluster_id);
1723 prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1724 }
1725
1726 pub(crate) fn remove_cluster_replica_statuses(
1730 &mut self,
1731 cluster_id: &ClusterId,
1732 replica_id: &ReplicaId,
1733 ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1734 let replica_statuses = self
1735 .0
1736 .get_mut(cluster_id)
1737 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1738 let prev = replica_statuses.remove(replica_id);
1739 prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1740 }
1741
1742 pub(crate) fn ensure_cluster_status(
1746 &mut self,
1747 cluster_id: ClusterId,
1748 replica_id: ReplicaId,
1749 process_id: ProcessId,
1750 status: ClusterReplicaProcessStatus,
1751 ) {
1752 let replica_statuses = self
1753 .0
1754 .get_mut(&cluster_id)
1755 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1756 .get_mut(&replica_id)
1757 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1758 replica_statuses.insert(process_id, status);
1759 }
1760
1761 pub fn get_cluster_replica_status(
1765 &self,
1766 cluster_id: ClusterId,
1767 replica_id: ReplicaId,
1768 ) -> ClusterStatus {
1769 let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1770 Self::cluster_replica_status(process_status)
1771 }
1772
1773 pub fn cluster_replica_status(
1775 process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1776 ) -> ClusterStatus {
1777 process_status
1778 .values()
1779 .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1780 (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1781 (x, y) => {
1782 let reason_x = match x {
1783 ClusterStatus::Offline(reason) => reason,
1784 ClusterStatus::Online => None,
1785 };
1786 let reason_y = match y {
1787 ClusterStatus::Offline(reason) => reason,
1788 ClusterStatus::Online => None,
1789 };
1790 ClusterStatus::Offline(reason_x.or(reason_y))
1792 }
1793 })
1794 }
1795
1796 pub(crate) fn get_cluster_replica_statuses(
1800 &self,
1801 cluster_id: ClusterId,
1802 replica_id: ReplicaId,
1803 ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1804 self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1805 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1806 }
1807
1808 pub(crate) fn try_get_cluster_replica_statuses(
1810 &self,
1811 cluster_id: ClusterId,
1812 replica_id: ReplicaId,
1813 ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1814 self.try_get_cluster_statuses(cluster_id)
1815 .and_then(|statuses| statuses.get(&replica_id))
1816 }
1817
1818 pub(crate) fn try_get_cluster_statuses(
1820 &self,
1821 cluster_id: ClusterId,
1822 ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1823 self.0.get(&cluster_id)
1824 }
1825}
1826
/// The coordinator: glues together the catalog, the controllers, and the
/// per-connection execution state.
///
/// NOTE(review): field purposes below are inferred from names and types where
/// original documentation is not visible in this chunk — confirm against
/// usage sites.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    /// Handle to the storage/compute controller stack.
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    /// The in-memory catalog.
    catalog: Arc<Catalog>,

    /// Client for the persist layer.
    persist_client: PersistClient,

    /// Channel for submitting internal commands to the coordinator loop.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// Notifier used to trigger group commits.
    group_commit_tx: appends::GroupCommitNotifier,

    /// Channel on which strict serializable reads are enqueued.
    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    /// Per-timeline timestamping state.
    global_timelines: BTreeMap<Timeline, TimelineState>,

    /// Generator for transient (per-statement) ids.
    transient_id_gen: Arc<TransientIdGen>,
    /// Metadata for every active connection.
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    /// Read holds taken on behalf of open transactions.
    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds>,

    /// Peeks awaiting responses, by peek uuid.
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// Index of pending peeks by the connection that issued them.
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    /// Reads waiting to be linearized, by connection.
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    /// Active compute sinks (e.g. SUBSCRIBEs — TODO confirm), by id.
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    /// Invalidators for active webhook appenders.
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    /// In-flight COPY FROM operations, by connection.
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    /// Cancellation signals for staged operations, by connection.
    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    /// Active introspection subscribes, by id.
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    /// Per-item locks serializing writes.
    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    /// Write operations deferred (e.g. on lock contention — TODO confirm),
    /// by connection.
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    /// Writes waiting for the next group commit.
    pending_writes: Vec<PendingWriteTxn>,

    /// Interval at which timelines are advanced.
    advance_timelines_interval: Interval,

    /// Queue of DDL statements executed serially.
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    /// Controller for storing and retrieving secrets.
    secrets_controller: Arc<dyn SecretsController>,
    /// Caching wrapper over the secrets reader.
    caching_secrets_reader: CachingSecretsReader,

    /// Controller for cloud resources, when running in a cloud context.
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Client for recording storage usage.
    storage_usage_client: StorageUsageClient,
    /// How often storage usage is collected.
    storage_usage_collection_interval: Duration,

    /// Optional telemetry client.
    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    /// Coordinator metrics.
    metrics: Metrics,
    /// Optimizer metrics.
    optimizer_metrics: OptimizerMetrics,

    /// Handle for dynamically reconfiguring tracing.
    tracing_handle: TracingHandle,

    /// State for statement logging.
    statement_logging: StatementLogging,

    /// Limits concurrent webhook appends.
    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    /// Timestamp oracle configuration, if an external oracle is in use.
    timestamp_oracle_config: Option<TimestampOracleConfig>,

    /// Interval at which cluster scheduling policies are re-evaluated.
    check_cluster_scheduling_policies_interval: Interval,

    /// Latest scheduling decisions, per cluster and policy name.
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    /// Interval for the 0dt caught-up check.
    caught_up_check_interval: Interval,

    /// State for an in-progress caught-up check, if any.
    caught_up_check: Option<CaughtUpCheckContext>,

    /// Installed frontier watch sets, by id.
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    /// Watch sets indexed by owning connection.
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    /// Last-known statuses of cluster replica processes.
    cluster_replica_statuses: ClusterReplicaStatuses,

    /// Whether the controllers are running in read-only mode.
    read_only_controllers: bool,

    /// Builtin table updates buffered while in read-only mode; `Some` only
    /// in read-only mode.
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    /// The validated license key in effect.
    license_key: ValidatedLicenseKey,

    /// Pool of reusable user ids.
    user_id_pool: IdPool,
}
2004
2005impl Coordinator {
2006 #[instrument(name = "coord::bootstrap")]
2010 pub(crate) async fn bootstrap(
2011 &mut self,
2012 boot_ts: Timestamp,
2013 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
2014 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2015 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2016 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
2017 audit_logs_iterator: AuditLogIterator,
2018 ) -> Result<(), AdapterError> {
2019 let bootstrap_start = Instant::now();
2020 info!("startup: coordinator init: bootstrap beginning");
2021 info!("startup: coordinator init: bootstrap: preamble beginning");
2022
2023 let cluster_statuses: Vec<(_, Vec<_>)> = self
2026 .catalog()
2027 .clusters()
2028 .map(|cluster| {
2029 (
2030 cluster.id(),
2031 cluster
2032 .replicas()
2033 .map(|replica| {
2034 (replica.replica_id, replica.config.location.num_processes())
2035 })
2036 .collect(),
2037 )
2038 })
2039 .collect();
2040 let now = self.now_datetime();
2041 for (cluster_id, replica_statuses) in cluster_statuses {
2042 self.cluster_replica_statuses
2043 .initialize_cluster_statuses(cluster_id);
2044 for (replica_id, num_processes) in replica_statuses {
2045 self.cluster_replica_statuses
2046 .initialize_cluster_replica_statuses(
2047 cluster_id,
2048 replica_id,
2049 num_processes,
2050 now,
2051 );
2052 }
2053 }
2054
2055 let system_config = self.catalog().system_config();
2056
2057 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
2059
2060 let compute_config = flags::compute_config(system_config);
2062 let storage_config = flags::storage_config(system_config);
2063 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
2064 let dyncfg_updates = system_config.dyncfg_updates();
2065 self.controller.compute.update_configuration(compute_config);
2066 self.controller.storage.update_parameters(storage_config);
2067 self.controller
2068 .update_orchestrator_scheduling_config(scheduling_config);
2069 self.controller.update_configuration(dyncfg_updates);
2070
2071 let enforce_credit_limit_at_bootstrap = !matches!(
2076 self.license_key.expiration_behavior,
2077 ExpirationBehavior::DisableClusterCreation,
2078 );
2079 if enforce_credit_limit_at_bootstrap {
2080 self.validate_resource_limit_numeric(
2081 Numeric::zero(),
2082 self.current_credit_consumption_rate(),
2083 |system_vars| {
2084 self.license_key
2085 .max_credit_consumption_rate()
2086 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
2087 },
2088 "cluster replica",
2089 MAX_CREDIT_CONSUMPTION_RATE.name(),
2090 )?;
2091 }
2092
2093 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
2094 Default::default();
2095
2096 let enable_worker_core_affinity =
2097 self.catalog().system_config().enable_worker_core_affinity();
2098 let enable_storage_introspection_logs = self
2099 .catalog()
2100 .system_config()
2101 .enable_storage_introspection_logs();
2102 for instance in self.catalog.clusters() {
2103 self.controller.create_cluster(
2104 instance.id,
2105 ClusterConfig {
2106 arranged_logs: instance.log_indexes.clone(),
2107 workload_class: instance.config.workload_class.clone(),
2108 },
2109 )?;
2110 for replica in instance.replicas() {
2111 let role = instance.role();
2112 self.controller.create_replica(
2113 instance.id,
2114 replica.replica_id,
2115 instance.name.clone(),
2116 replica.name.clone(),
2117 role,
2118 replica.config.clone(),
2119 enable_worker_core_affinity,
2120 enable_storage_introspection_logs,
2121 )?;
2122 }
2123 }
2124
2125 info!(
2126 "startup: coordinator init: bootstrap: preamble complete in {:?}",
2127 bootstrap_start.elapsed()
2128 );
2129
2130 let init_storage_collections_start = Instant::now();
2131 info!("startup: coordinator init: bootstrap: storage collections init beginning");
2132 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2133 .await;
2134 info!(
2135 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2136 init_storage_collections_start.elapsed()
2137 );
2138
2139 self.controller.start_compute_introspection_sink();
2144
2145 let sorting_start = Instant::now();
2146 info!("startup: coordinator init: bootstrap: sorting catalog entries");
2147 let entries = self.bootstrap_sort_catalog_entries();
2148 info!(
2149 "startup: coordinator init: bootstrap: sorting catalog entries complete in {:?}",
2150 sorting_start.elapsed()
2151 );
2152
2153 let optimize_dataflows_start = Instant::now();
2154 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2155 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2156 info!(
2157 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2158 optimize_dataflows_start.elapsed()
2159 );
2160
2161 let _fut = self.catalog().update_expression_cache(
2163 uncached_local_exprs.into_iter().collect(),
2164 uncached_global_exps.into_iter().collect(),
2165 Default::default(),
2166 );
2167
2168 let bootstrap_as_ofs_start = Instant::now();
2172 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2173 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2174 info!(
2175 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2176 bootstrap_as_ofs_start.elapsed()
2177 );
2178
2179 let postamble_start = Instant::now();
2180 info!("startup: coordinator init: bootstrap: postamble beginning");
2181
2182 let logs: BTreeSet<_> = BUILTINS::logs()
2183 .map(|log| self.catalog().resolve_builtin_log(log))
2184 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2185 .collect();
2186
2187 let mut privatelink_connections = BTreeMap::new();
2188
2189 for entry in &entries {
2190 debug!(
2191 "coordinator init: installing {} {}",
2192 entry.item().typ(),
2193 entry.id()
2194 );
2195 let mut policy = entry.item().initial_logical_compaction_window();
2196 match entry.item() {
2197 CatalogItem::Source(source) => {
2203 if source.custom_logical_compaction_window.is_none() {
2205 if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2206 source.data_source
2207 {
2208 policy = Some(
2209 self.catalog()
2210 .get_entry(&ingestion_id)
2211 .source()
2212 .expect("must be source")
2213 .custom_logical_compaction_window
2214 .unwrap_or_default(),
2215 );
2216 }
2217 }
2218 policies_to_set
2219 .entry(policy.expect("sources have a compaction window"))
2220 .or_insert_with(Default::default)
2221 .storage_ids
2222 .insert(source.global_id());
2223 }
2224 CatalogItem::Table(table) => {
2225 policies_to_set
2226 .entry(policy.expect("tables have a compaction window"))
2227 .or_insert_with(Default::default)
2228 .storage_ids
2229 .extend(table.global_ids());
2230 }
2231 CatalogItem::Index(idx) => {
2232 let policy_entry = policies_to_set
2233 .entry(policy.expect("indexes have a compaction window"))
2234 .or_insert_with(Default::default);
2235
2236 if logs.contains(&idx.on) {
2237 policy_entry
2238 .compute_ids
2239 .entry(idx.cluster_id)
2240 .or_insert_with(BTreeSet::new)
2241 .insert(idx.global_id());
2242 } else {
2243 let df_desc = self
2244 .catalog()
2245 .try_get_physical_plan(&idx.global_id())
2246 .expect("added in `bootstrap_dataflow_plans`")
2247 .clone();
2248
2249 let df_meta = self
2250 .catalog()
2251 .try_get_dataflow_metainfo(&idx.global_id())
2252 .expect("added in `bootstrap_dataflow_plans`");
2253
2254 if self.catalog().state().system_config().enable_mz_notices() {
2255 self.catalog().state().pack_optimizer_notices(
2257 &mut builtin_table_updates,
2258 df_meta.optimizer_notices.iter(),
2259 Diff::ONE,
2260 );
2261 }
2262
2263 policy_entry
2266 .compute_ids
2267 .entry(idx.cluster_id)
2268 .or_insert_with(Default::default)
2269 .extend(df_desc.export_ids());
2270
2271 self.controller
2272 .compute
2273 .create_dataflow(idx.cluster_id, df_desc, None)
2274 .unwrap_or_terminate("cannot fail to create dataflows");
2275 }
2276 }
2277 CatalogItem::View(_) => (),
2278 CatalogItem::MaterializedView(mview) => {
2279 policies_to_set
2280 .entry(policy.expect("materialized views have a compaction window"))
2281 .or_insert_with(Default::default)
2282 .storage_ids
2283 .insert(mview.global_id_writes());
2284
2285 let mut df_desc = self
2286 .catalog()
2287 .try_get_physical_plan(&mview.global_id_writes())
2288 .expect("added in `bootstrap_dataflow_plans`")
2289 .clone();
2290
2291 if let Some(initial_as_of) = mview.initial_as_of.clone() {
2292 df_desc.set_initial_as_of(initial_as_of);
2293 }
2294
2295 let until = mview
2297 .refresh_schedule
2298 .as_ref()
2299 .and_then(|s| s.last_refresh())
2300 .and_then(|r| r.try_step_forward());
2301 if let Some(until) = until {
2302 df_desc.until.meet_assign(&Antichain::from_elem(until));
2303 }
2304
2305 let df_meta = self
2306 .catalog()
2307 .try_get_dataflow_metainfo(&mview.global_id_writes())
2308 .expect("added in `bootstrap_dataflow_plans`");
2309
2310 if self.catalog().state().system_config().enable_mz_notices() {
2311 self.catalog().state().pack_optimizer_notices(
2313 &mut builtin_table_updates,
2314 df_meta.optimizer_notices.iter(),
2315 Diff::ONE,
2316 );
2317 }
2318
2319 self.ship_dataflow(df_desc, mview.cluster_id, mview.target_replica)
2320 .await;
2321
2322 if mview.replacement_target.is_none() {
2325 self.allow_writes(mview.cluster_id, mview.global_id_writes());
2326 }
2327 }
2328 CatalogItem::Sink(sink) => {
2329 policies_to_set
2330 .entry(CompactionWindow::Default)
2331 .or_insert_with(Default::default)
2332 .storage_ids
2333 .insert(sink.global_id());
2334 }
2335 CatalogItem::Connection(catalog_connection) => {
2336 if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2337 privatelink_connections.insert(
2338 entry.id(),
2339 VpcEndpointConfig {
2340 aws_service_name: conn.service_name.clone(),
2341 availability_zone_ids: conn.availability_zones.clone(),
2342 },
2343 );
2344 }
2345 }
2346 CatalogItem::Log(_)
2348 | CatalogItem::Type(_)
2349 | CatalogItem::Func(_)
2350 | CatalogItem::Secret(_) => {}
2351 }
2352 }
2353
2354 if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2355 let existing_vpc_endpoints = cloud_resource_controller
2357 .list_vpc_endpoints()
2358 .await
2359 .context("list vpc endpoints")?;
2360 let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2361 let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2362 let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2363 for id in vpc_endpoints_to_remove {
2364 cloud_resource_controller
2365 .delete_vpc_endpoint(*id)
2366 .await
2367 .context("deleting extraneous vpc endpoint")?;
2368 }
2369
2370 for (id, spec) in privatelink_connections {
2372 cloud_resource_controller
2373 .ensure_vpc_endpoint(id, spec)
2374 .await
2375 .context("ensuring vpc endpoint")?;
2376 }
2377 }
2378
2379 drop(dataflow_read_holds);
2382 for (cw, policies) in policies_to_set {
2384 self.initialize_read_policies(&policies, cw).await;
2385 }
2386
2387 builtin_table_updates.extend(
2389 self.catalog().state().resolve_builtin_table_updates(
2390 self.catalog().state().pack_all_replica_size_updates(),
2391 ),
2392 );
2393
2394 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2395 let migrated_updates_fut = if self.controller.read_only() {
2401 let min_timestamp = Timestamp::minimum();
2402 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2403 .extract_if(.., |update| {
2404 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2405 migrated_storage_collections_0dt.contains(&update.id)
2406 && self
2407 .controller
2408 .storage_collections
2409 .collection_frontiers(gid)
2410 .expect("all tables are registered")
2411 .write_frontier
2412 .elements()
2413 == &[min_timestamp]
2414 })
2415 .collect();
2416 if migrated_builtin_table_updates.is_empty() {
2417 futures::future::ready(()).boxed()
2418 } else {
2419 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2421 for update in migrated_builtin_table_updates {
2422 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2423 grouped_appends.entry(gid).or_default().push(update.data);
2424 }
2425 info!(
2426 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2427 grouped_appends.keys().collect::<Vec<_>>()
2428 );
2429
2430 let mut all_appends = Vec::with_capacity(grouped_appends.len());
2432 for (item_id, table_data) in grouped_appends.into_iter() {
2433 let mut all_rows = Vec::new();
2434 let mut all_data = Vec::new();
2435 for data in table_data {
2436 match data {
2437 TableData::Rows(rows) => all_rows.extend(rows),
2438 TableData::Batches(_) => all_data.push(data),
2439 }
2440 }
2441 differential_dataflow::consolidation::consolidate(&mut all_rows);
2442 all_data.push(TableData::Rows(all_rows));
2443
2444 all_appends.push((item_id, all_data));
2446 }
2447
2448 let fut = self
2449 .controller
2450 .storage
2451 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2452 .expect("cannot fail to append");
2453 async {
2454 fut.await
2455 .expect("One-shot shouldn't be dropped during bootstrap")
2456 .unwrap_or_terminate("cannot fail to append")
2457 }
2458 .boxed()
2459 }
2460 } else {
2461 futures::future::ready(()).boxed()
2462 };
2463
2464 info!(
2465 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2466 postamble_start.elapsed()
2467 );
2468
2469 let builtin_update_start = Instant::now();
2470 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2471
2472 if self.controller.read_only() {
2473 info!(
2474 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2475 );
2476
2477 let audit_join_start = Instant::now();
2479 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2480 let audit_log_updates: Vec<_> = audit_logs_iterator
2481 .map(|(audit_log, ts)| StateUpdate {
2482 kind: StateUpdateKind::AuditLog(audit_log),
2483 ts,
2484 diff: StateDiff::Addition,
2485 })
2486 .collect();
2487 let audit_log_builtin_table_updates = self
2488 .catalog()
2489 .state()
2490 .generate_builtin_table_updates(audit_log_updates);
2491 builtin_table_updates.extend(audit_log_builtin_table_updates);
2492 info!(
2493 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2494 audit_join_start.elapsed()
2495 );
2496 self.buffered_builtin_table_updates
2497 .as_mut()
2498 .expect("in read-only mode")
2499 .append(&mut builtin_table_updates);
2500 } else {
2501 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2502 .await;
2503 };
2504 info!(
2505 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2506 builtin_update_start.elapsed()
2507 );
2508
2509 let cleanup_secrets_start = Instant::now();
2510 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2511 {
2515 let Self {
2518 secrets_controller,
2519 catalog,
2520 ..
2521 } = self;
2522
2523 let next_user_item_id = catalog.get_next_user_item_id().await?;
2524 let next_system_item_id = catalog.get_next_system_item_id().await?;
2525 let read_only = self.controller.read_only();
2526 let catalog_ids: BTreeSet<CatalogItemId> =
2531 catalog.entries().map(|entry| entry.id()).collect();
2532 let secrets_controller = Arc::clone(secrets_controller);
2533
2534 spawn(|| "cleanup-orphaned-secrets", async move {
2535 if read_only {
2536 info!(
2537 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2538 );
2539 return;
2540 }
2541 info!("coordinator init: cleaning up orphaned secrets");
2542
2543 match secrets_controller.list().await {
2544 Ok(controller_secrets) => {
2545 let controller_secrets: BTreeSet<CatalogItemId> =
2546 controller_secrets.into_iter().collect();
2547 let orphaned = controller_secrets.difference(&catalog_ids);
2548 for id in orphaned {
2549 let id_too_large = match id {
2550 CatalogItemId::System(id) => *id >= next_system_item_id,
2551 CatalogItemId::User(id) => *id >= next_user_item_id,
2552 CatalogItemId::IntrospectionSourceIndex(_)
2553 | CatalogItemId::Transient(_) => false,
2554 };
2555 if id_too_large {
2556 info!(
2557 %next_user_item_id, %next_system_item_id,
2558 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2559 );
2560 } else {
2561 info!("coordinator init: deleting orphaned secret {id}");
2562 fail_point!("orphan_secrets");
2563 if let Err(e) = secrets_controller.delete(*id).await {
2564 warn!(
2565 "Dropping orphaned secret has encountered an error: {}",
2566 e
2567 );
2568 }
2569 }
2570 }
2571 }
2572 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2573 }
2574 });
2575 }
2576 info!(
2577 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2578 cleanup_secrets_start.elapsed()
2579 );
2580
2581 let final_steps_start = Instant::now();
2583 info!(
2584 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2585 );
2586 migrated_updates_fut
2587 .instrument(info_span!("coord::bootstrap::final"))
2588 .await;
2589
2590 debug!(
2591 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2592 );
2593 self.controller.initialization_complete();
2595
2596 self.bootstrap_introspection_subscribes().await;
2598
2599 info!(
2600 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2601 final_steps_start.elapsed()
2602 );
2603
2604 info!(
2605 "startup: coordinator init: bootstrap complete in {:?}",
2606 bootstrap_start.elapsed()
2607 );
2608 Ok(())
2609 }
2610
    /// Initializes all tables during bootstrap: advances every table to the
    /// current write timestamp, resets the contents of system tables (by
    /// retracting their current snapshots), reconciles the audit log table,
    /// and finally writes out all accumulated builtin table updates.
    ///
    /// `mz_storage_usage_by_shard` is exempt from the reset, and
    /// `mz_audit_events` is reconciled incrementally via
    /// `bootstrap_audit_log_table` rather than reset.
    #[allow(clippy::async_yields_async)]
    #[instrument]
    async fn bootstrap_tables(
        &mut self,
        entries: &[CatalogEntry],
        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
        audit_logs_iterator: AuditLogIterator,
    ) {
        /// Borrowed view of the catalog metadata needed per table.
        struct TableMetadata<'a> {
            id: CatalogItemId,
            name: &'a QualifiedItemName,
            table: &'a Table,
        }

        // Collect metadata for every table entry in the catalog.
        let table_metas: Vec<_> = entries
            .into_iter()
            .filter_map(|entry| {
                entry.table().map(|table| TableMetadata {
                    id: entry.id(),
                    name: entry.name(),
                    table,
                })
            })
            .collect();

        debug!("coordinator init: advancing all tables to current timestamp");
        let WriteTimestamp {
            timestamp: write_ts,
            advance_to,
        } = self.get_local_write_ts().await;
        // Empty appends: this only advances each table's upper to `advance_to`.
        let appends = table_metas
            .iter()
            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
            .collect();
        // Keep the receiver; we await the fence only after retractions below.
        let table_fence_rx = self
            .controller
            .storage
            .append_table(write_ts.clone(), advance_to, appends)
            .expect("invalid updates");

        self.apply_local_write(write_ts).await;

        debug!("coordinator init: resetting system tables");
        let read_ts = self.get_local_read_ts().await;

        // `mz_storage_usage_by_shard` must not be reset; identify it by name
        // and schema so we can exclude it from the retraction pass.
        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
            .catalog()
            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
            .into();
        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
        };

        let mut retraction_tasks = Vec::new();
        let mut system_tables: Vec<_> = table_metas
            .iter()
            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
            .collect();

        // `mz_audit_events` is reconciled incrementally, not reset: pull it
        // out of the reset list and hand it to its dedicated task.
        let (audit_events_idx, _) = system_tables
            .iter()
            .find_position(|table| {
                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
            })
            .expect("mz_audit_events must exist");
        let audit_events = system_tables.remove(audit_events_idx);
        let audit_log_task = self.bootstrap_audit_log_table(
            audit_events.id,
            audit_events.name,
            audit_events.table,
            audit_logs_iterator,
            read_ts,
        );

        // Spawn one task per system table that snapshots its contents and
        // builds a batch of retractions (negated diffs) to empty it.
        for system_table in system_tables {
            let table_id = system_table.id;
            let full_name = self.catalog().resolve_full_name(system_table.name, None);
            debug!("coordinator init: resetting system table {full_name} ({table_id})");

            let snapshot_fut = self
                .controller
                .storage_collections
                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
            let batch_fut = self
                .controller
                .storage_collections
                .create_update_builder(system_table.table.global_id_writes());

            let task = spawn(|| format!("snapshot-{table_id}"), async move {
                let mut batch = batch_fut
                    .await
                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
                tracing::info!(?table_id, "starting snapshot");
                let mut snapshot_cursor = snapshot_fut
                    .await
                    .unwrap_or_terminate("cannot fail to snapshot");

                // Negate every diff so the batch retracts the current contents.
                while let Some(values) = snapshot_cursor.next().await {
                    for (key, _t, d) in values {
                        let d_invert = d.neg();
                        batch.add(&key, &(), &d_invert).await;
                    }
                }
                tracing::info!(?table_id, "finished snapshot");

                let batch = batch.finish().await;
                BuiltinTableUpdate::batch(table_id, batch)
            });
            retraction_tasks.push(task);
        }

        let retractions_res = futures::future::join_all(retraction_tasks).await;
        for retractions in retractions_res {
            builtin_table_updates.push(retractions);
        }

        let audit_join_start = Instant::now();
        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
        let audit_log_updates = audit_log_task.await;
        let audit_log_builtin_table_updates = self
            .catalog()
            .state()
            .generate_builtin_table_updates(audit_log_updates);
        builtin_table_updates.extend(audit_log_builtin_table_updates);
        info!(
            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
            audit_join_start.elapsed()
        );

        // Wait for the earlier table-advancement append to land before
        // issuing the builtin table updates.
        table_fence_rx
            .await
            .expect("One-shot shouldn't be dropped during bootstrap")
            .unwrap_or_terminate("cannot fail to append");

        info!("coordinator init: sending builtin table updates");
        let (_builtin_updates_fut, write_ts) = self
            .builtin_table_update()
            .execute(builtin_table_updates)
            .await;
        info!(?write_ts, "our write ts");
        if let Some(write_ts) = write_ts {
            self.apply_local_write(write_ts).await;
        }
    }
2775
2776 #[instrument]
2780 fn bootstrap_audit_log_table<'a>(
2781 &self,
2782 table_id: CatalogItemId,
2783 name: &'a QualifiedItemName,
2784 table: &'a Table,
2785 audit_logs_iterator: AuditLogIterator,
2786 read_ts: Timestamp,
2787 ) -> JoinHandle<Vec<StateUpdate>> {
2788 let full_name = self.catalog().resolve_full_name(name, None);
2789 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2790 let current_contents_fut = self
2791 .controller
2792 .storage_collections
2793 .snapshot(table.global_id_writes(), read_ts);
2794 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2795 let current_contents = current_contents_fut
2796 .await
2797 .unwrap_or_terminate("cannot fail to fetch snapshot");
2798 let contents_len = current_contents.len();
2799 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2800
2801 let max_table_id = current_contents
2803 .into_iter()
2804 .filter(|(_, diff)| *diff == 1)
2805 .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2806 .sorted()
2807 .rev()
2808 .next();
2809
2810 audit_logs_iterator
2812 .take_while(|(audit_log, _)| match max_table_id {
2813 Some(id) => audit_log.event.sortable_id() > id,
2814 None => true,
2815 })
2816 .map(|(audit_log, ts)| StateUpdate {
2817 kind: StateUpdateKind::AuditLog(audit_log),
2818 ts,
2819 diff: StateDiff::Addition,
2820 })
2821 .collect::<Vec<_>>()
2822 })
2823 }
2824
    /// Registers all catalog-described storage collections (sources, tables,
    /// materialized views, sinks) with the storage controller.
    ///
    /// Collections are registered in dependency order: a collection is only
    /// created once all of its (transitive) dependencies among the pending
    /// collections have been created, and system collections derive their
    /// initial `since` from the frontiers of their dependencies.
    ///
    /// `migrated_storage_collections` identifies items whose collections were
    /// migrated and need special handling in
    /// `create_collections_for_bootstrap`.
    #[instrument]
    async fn bootstrap_storage_collections(
        &mut self,
        migrated_storage_collections: &BTreeSet<CatalogItemId>,
    ) {
        let catalog = self.catalog();

        // Translates a catalog `DataSourceDesc` into a storage-controller
        // `CollectionDescription`, inlining connection details from the
        // catalog state.
        let source_desc = |object_id: GlobalId,
                           data_source: &DataSourceDesc,
                           desc: &RelationDesc,
                           timeline: &Timeline| {
            let data_source = match data_source.clone() {
                DataSourceDesc::Ingestion { desc, cluster_id } => {
                    let desc = desc.into_inline_connection(catalog.state());
                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
                    DataSource::Ingestion(ingestion)
                }
                DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    progress_subsource,
                    data_config,
                    details,
                    cluster_id,
                } => {
                    let desc = desc.into_inline_connection(catalog.state());
                    let data_config = data_config.into_inline_connection(catalog.state());
                    // Old-syntax ingestions are keyed by their progress
                    // subsource; the object itself becomes a legacy export.
                    let progress_subsource =
                        catalog.get_entry(&progress_subsource).latest_global_id();
                    let mut ingestion =
                        IngestionDescription::new(desc, cluster_id, progress_subsource);
                    let legacy_export = SourceExport {
                        storage_metadata: (),
                        data_config,
                        details,
                    };
                    ingestion.source_exports.insert(object_id, legacy_export);

                    DataSource::Ingestion(ingestion)
                }
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference: _,
                    details,
                    data_config,
                } => {
                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();

                    DataSource::IngestionExport {
                        ingestion_id,
                        details,
                        data_config: data_config.into_inline_connection(catalog.state()),
                    }
                }
                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
                DataSourceDesc::Progress => DataSource::Progress,
                DataSourceDesc::Introspection(introspection) => {
                    DataSource::Introspection(introspection)
                }
                DataSourceDesc::Catalog => DataSource::Other,
            };
            CollectionDescription {
                desc: desc.clone(),
                data_source,
                since: None,
                timeline: Some(timeline.clone()),
                primary: None,
            }
        };

        // Gather collection descriptions for every storage-backed item.
        let mut compute_collections = vec![];
        let mut collections = vec![];
        for entry in catalog.entries() {
            match entry.item() {
                CatalogItem::Source(source) => {
                    collections.push((
                        source.global_id(),
                        source_desc(
                            source.global_id(),
                            &source.data_source,
                            &source.desc,
                            &source.timeline,
                        ),
                    ));
                }
                CatalogItem::Table(table) => {
                    match &table.data_source {
                        TableDataSource::TableWrites { defaults: _ } => {
                            // A table has one collection per schema version;
                            // each version's `primary` points at the next
                            // version's collection, if any.
                            let versions: BTreeMap<_, _> = table
                                .collection_descs()
                                .map(|(gid, version, desc)| (version, (gid, desc)))
                                .collect();
                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
                                let next_version = version.bump();
                                let primary_collection =
                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
                                let mut collection_desc =
                                    CollectionDescription::for_table(desc.clone());
                                collection_desc.primary = primary_collection;

                                (*gid, collection_desc)
                            });
                            collections.extend(collection_descs);
                        }
                        TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => {
                            soft_assert_eq_or_log!(table.collections.len(), 1);
                            let collection_descs =
                                table.collection_descs().map(|(gid, _version, desc)| {
                                    (
                                        gid,
                                        source_desc(
                                            entry.latest_global_id(),
                                            data_source_desc,
                                            &desc,
                                            timeline,
                                        ),
                                    )
                                });
                            collections.extend(collection_descs);
                        }
                    };
                }
                CatalogItem::MaterializedView(mv) => {
                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
                        let collection_desc =
                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
                        (gid, collection_desc)
                    });

                    collections.extend(collection_descs);
                    // MVs are compute-maintained; track them separately for
                    // the nullability evolution step below.
                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
                }
                CatalogItem::Sink(sink) => {
                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
                    let from_desc = storage_sink_from_entry
                        .relation_desc()
                        .expect("sinks can only be built on items with descs")
                        .into_owned();
                    // A sink's collection tracks its progress, hence the
                    // Kafka progress desc rather than the sink's own schema.
                    let collection_desc = CollectionDescription {
                        desc: KAFKA_PROGRESS_DESC.clone(),
                        data_source: DataSource::Sink {
                            desc: ExportDescription {
                                sink: StorageSinkDesc {
                                    from: sink.from,
                                    from_desc,
                                    connection: sink
                                        .connection
                                        .clone()
                                        .into_inline_connection(self.catalog().state()),
                                    envelope: sink.envelope,
                                    as_of: Antichain::from_elem(Timestamp::minimum()),
                                    with_snapshot: sink.with_snapshot,
                                    version: sink.version,
                                    from_storage_metadata: (),
                                    to_storage_metadata: (),
                                    commit_interval: sink.commit_interval,
                                },
                                instance_id: sink.cluster_id,
                            },
                        },
                        since: None,
                        timeline: None,
                        primary: None,
                    };
                    collections.push((sink.global_id, collection_desc));
                }
                CatalogItem::Log(_)
                | CatalogItem::View(_)
                | CatalogItem::Index(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_)
                | CatalogItem::Connection(_) => (),
            }
        }

        // In read-only mode we must not mint a write timestamp.
        let register_ts = if self.controller.read_only() {
            self.get_local_read_ts().await
        } else {
            self.get_local_write_ts().await.timestamp
        };

        let storage_metadata = self.catalog.state().storage_metadata();
        // Expand migrated item IDs into all of their global (collection) IDs.
        let migrated_storage_collections = migrated_storage_collections
            .into_iter()
            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
            .collect();

        self.controller
            .storage
            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
            .await
            .unwrap_or_terminate("cannot fail to evolve collections");

        let mut pending: BTreeMap<_, _> = collections.into_iter().collect();

        // For each pending collection, its transitive dependencies that are
        // themselves pending (non-pending deps are already registered and
        // impose no ordering constraint).
        let transitive_dep_gids: BTreeMap<_, _> = pending
            .keys()
            .map(|gid| {
                let entry = self.catalog.get_entry_by_global_id(gid);
                let item_id = entry.id();
                let deps = self.catalog.state().transitive_uses(item_id);
                let dep_gids: BTreeSet<_> = deps
                    .filter(|dep_id| *dep_id != item_id)
                    .map(|dep_id| self.catalog.get_entry(&dep_id).latest_global_id())
                    .filter(|dep_gid| pending.contains_key(dep_gid))
                    .collect();
                (*gid, dep_gids)
            })
            .collect();

        // Register collections in waves: each wave contains the collections
        // whose pending dependencies have all been registered already.
        while !pending.is_empty() {
            let ready_gids: BTreeSet<_> = pending
                .keys()
                .filter(|gid| {
                    let mut deps = transitive_dep_gids[gid].iter();
                    !deps.any(|dep_gid| pending.contains_key(dep_gid))
                })
                .copied()
                .collect();
            let mut ready: Vec<_> = pending
                .extract_if(.., |gid, _| ready_gids.contains(gid))
                .collect();

            // System collections without an explicit since derive one from
            // the joined sinces of their (already registered) dependencies.
            for (gid, collection) in &mut ready {
                if !gid.is_system() || collection.since.is_some() {
                    continue;
                }

                let mut derived_since = Antichain::from_elem(Timestamp::MIN);
                for dep_gid in &transitive_dep_gids[gid] {
                    let (since, _) = self
                        .controller
                        .storage
                        .collection_frontiers(*dep_gid)
                        .expect("previously registered");
                    derived_since.join_assign(&since);
                }
                collection.since = Some(derived_since);
            }

            // No progress means a dependency cycle; log and force the rest
            // through in one batch rather than looping forever.
            if ready.is_empty() {
                soft_panic_or_log!(
                    "cycle in storage collections: {:?}",
                    pending.keys().collect::<Vec<_>>(),
                );
                ready = mem::take(&mut pending).into_iter().collect();
            }

            self.controller
                .storage
                .create_collections_for_bootstrap(
                    storage_metadata,
                    Some(register_ts),
                    ready,
                    &migrated_storage_collections,
                )
                .await
                .unwrap_or_terminate("cannot fail to create collections");
        }

        if !self.controller.read_only() {
            self.apply_local_write(register_ts).await;
        }
    }
3140
3141 fn bootstrap_sort_catalog_entries(&self) -> Vec<CatalogEntry> {
3148 let mut indexes_on = BTreeMap::<_, Vec<_>>::new();
3149 let mut non_indexes = Vec::new();
3150 for entry in self.catalog().entries().cloned() {
3151 if let Some(index) = entry.index() {
3152 let on = self.catalog().get_entry_by_global_id(&index.on);
3153 indexes_on.entry(on.id()).or_default().push(entry);
3154 } else {
3155 non_indexes.push(entry);
3156 }
3157 }
3158
3159 let key_fn = |entry: &CatalogEntry| entry.id;
3160 let dependencies_fn = |entry: &CatalogEntry| entry.uses();
3161 sort_topological(&mut non_indexes, key_fn, dependencies_fn);
3162
3163 let mut result = Vec::new();
3164 for entry in non_indexes {
3165 let id = entry.id();
3166 result.push(entry);
3167 if let Some(mut indexes) = indexes_on.remove(&id) {
3168 result.append(&mut indexes);
3169 }
3170 }
3171
3172 soft_assert_or_log!(
3173 indexes_on.is_empty(),
3174 "indexes with missing dependencies: {indexes_on:?}",
3175 );
3176
3177 result
3178 }
3179
    /// (Re)optimizes and records dataflow plans for every index and
    /// materialized view in `ordered_catalog_entries` (which must be in
    /// dependency order, since each optimization may consult plans of earlier
    /// entries via the instance snapshots).
    ///
    /// Cached plans from `cached_global_exprs` are reused when their
    /// optimizer features match the current configuration; otherwise the
    /// entry is optimized from scratch and recorded in the returned map of
    /// newly computed (uncached) expressions.
    #[instrument]
    fn bootstrap_dataflow_plans(
        &mut self,
        ordered_catalog_entries: &[CatalogEntry],
        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
        // One compute instance snapshot per cluster, created lazily and kept
        // up to date as collections are inserted below.
        let mut instance_snapshots = BTreeMap::new();
        let mut uncached_expressions = BTreeMap::new();

        // Per-cluster optimizer config: system defaults overridden by the
        // cluster's feature flags.
        let optimizer_config = |catalog: &Catalog, cluster_id| {
            let system_config = catalog.system_config();
            let overrides = catalog.get_cluster(cluster_id).config.features();
            OptimizerConfig::from(system_config).override_from(&overrides)
        };

        for entry in ordered_catalog_entries {
            match entry.item() {
                CatalogItem::Index(idx) => {
                    let compute_instance =
                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(idx.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = idx.global_id();

                    // Skip indexes whose collection already exists on the
                    // instance (already planned).
                    if compute_instance.contains_collection(&global_id) {
                        continue;
                    }

                    let optimizer_config = optimizer_config(&self.catalog, idx.cluster_id);

                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            // Cache hit: only usable if the cached plans were
                            // produced under the same optimizer features.
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            // Cache miss or stale features: optimize from scratch.
                            Some(_) | None => {
                                let (optimized_plan, global_lir_plan) = {
                                    let mut optimizer = optimize::index::Optimizer::new(
                                        self.owned_catalog(),
                                        compute_instance.clone(),
                                        global_id,
                                        optimizer_config.clone(),
                                        self.optimizer_metrics(),
                                    );

                                    // MIR optimization, then LIR lowering.
                                    let index_plan = optimize::index::Index::new(
                                        entry.name().clone(),
                                        idx.on,
                                        idx.keys.to_vec(),
                                    );
                                    let global_mir_plan = optimizer.optimize(index_plan)?;
                                    let optimized_plan = global_mir_plan.df_desc().clone();

                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                    (optimized_plan, global_lir_plan)
                                };

                                let (physical_plan, metainfo) = global_lir_plan.unapply();
                                // Render optimizer notices under fresh transient IDs.
                                let metainfo = {
                                    let notice_ids =
                                        std::iter::repeat_with(|| self.allocate_transient_id())
                                            .map(|(_item_id, gid)| gid)
                                            .take(metainfo.optimizer_notices.len())
                                            .collect::<Vec<_>>();
                                    self.catalog().render_notices(
                                        metainfo,
                                        notice_ids,
                                        Some(idx.global_id()),
                                    )
                                };
                                // Record for insertion into the expression cache.
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features: optimizer_config.features.clone(),
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
                    catalog.set_physical_plan(idx.global_id(), physical_plan);
                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);

                    compute_instance.insert_collection(idx.global_id());
                }
                CatalogItem::MaterializedView(mv) => {
                    let compute_instance =
                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(mv.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = mv.global_id_writes();

                    let optimizer_config = optimizer_config(&self.catalog, mv.cluster_id);

                    let (optimized_plan, physical_plan, metainfo) = match cached_global_exprs
                        .remove(&global_id)
                    {
                        // Cache hit under matching optimizer features.
                        Some(global_expressions)
                            if global_expressions.optimizer_features
                                == optimizer_config.features =>
                        {
                            debug!("global expression cache hit for {global_id:?}");
                            (
                                global_expressions.global_mir,
                                global_expressions.physical_plan,
                                global_expressions.dataflow_metainfos,
                            )
                        }
                        // Cache miss or stale features: optimize from scratch.
                        Some(_) | None => {
                            let (_, internal_view_id) = self.allocate_transient_id();
                            let debug_name = self
                                .catalog()
                                .resolve_full_name(entry.name(), None)
                                .to_string();

                            let (optimized_plan, global_lir_plan) = {
                                let mut optimizer = optimize::materialized_view::Optimizer::new(
                                    self.owned_catalog().as_optimizer_catalog(),
                                    compute_instance.clone(),
                                    global_id,
                                    internal_view_id,
                                    mv.desc.latest().iter_names().cloned().collect(),
                                    mv.non_null_assertions.clone(),
                                    mv.refresh_schedule.clone(),
                                    debug_name,
                                    optimizer_config.clone(),
                                    self.optimizer_metrics(),
                                );

                                let typ = infer_sql_type_for_catalog(
                                    &mv.raw_expr,
                                    &mv.locally_optimized_expr.as_ref().clone(),
                                );
                                let global_mir_plan = optimizer
                                    .optimize((mv.locally_optimized_expr.as_ref().clone(), typ))?;
                                let optimized_plan = global_mir_plan.df_desc().clone();

                                let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                (optimized_plan, global_lir_plan)
                            };

                            let (physical_plan, metainfo) = global_lir_plan.unapply();
                            // Render optimizer notices under fresh transient IDs.
                            let metainfo = {
                                let notice_ids =
                                    std::iter::repeat_with(|| self.allocate_transient_id())
                                        .map(|(_item_id, global_id)| global_id)
                                        .take(metainfo.optimizer_notices.len())
                                        .collect::<Vec<_>>();
                                self.catalog().render_notices(
                                    metainfo,
                                    notice_ids,
                                    Some(mv.global_id_writes()),
                                )
                            };
                            // Record for insertion into the expression cache.
                            uncached_expressions.insert(
                                global_id,
                                GlobalExpressions {
                                    global_mir: optimized_plan.clone(),
                                    physical_plan: physical_plan.clone(),
                                    dataflow_metainfos: metainfo.clone(),
                                    optimizer_features: optimizer_config.features.clone(),
                                },
                            );
                            (optimized_plan, physical_plan, metainfo)
                        }
                    };

                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);

                    compute_instance.insert_collection(mv.global_id_writes());
                }
                CatalogItem::Table(_)
                | CatalogItem::Source(_)
                | CatalogItem::Log(_)
                | CatalogItem::View(_)
                | CatalogItem::Sink(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_)
                | CatalogItem::Connection(_) => (),
            }
        }

        Ok(uncached_expressions)
    }
3415
3416 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold> {
3426 let mut catalog_ids = Vec::new();
3427 let mut dataflows = Vec::new();
3428 let mut read_policies = BTreeMap::new();
3429 for entry in self.catalog.entries() {
3430 let gid = match entry.item() {
3431 CatalogItem::Index(idx) => idx.global_id(),
3432 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3433 CatalogItem::Table(_)
3434 | CatalogItem::Source(_)
3435 | CatalogItem::Log(_)
3436 | CatalogItem::View(_)
3437 | CatalogItem::Sink(_)
3438 | CatalogItem::Type(_)
3439 | CatalogItem::Func(_)
3440 | CatalogItem::Secret(_)
3441 | CatalogItem::Connection(_) => continue,
3442 };
3443 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3444 catalog_ids.push(gid);
3445 dataflows.push(plan.clone());
3446
3447 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3448 read_policies.insert(gid, compaction_window.into());
3449 }
3450 }
3451 }
3452
3453 let read_ts = self.get_local_read_ts().await;
3454 let read_holds = as_of_selection::run(
3455 &mut dataflows,
3456 &read_policies,
3457 &*self.controller.storage_collections,
3458 read_ts,
3459 self.controller.read_only(),
3460 );
3461
3462 let catalog = self.catalog_mut();
3463 for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3464 catalog.set_physical_plan(id, plan);
3465 }
3466
3467 read_holds
3468 }
3469
    /// Runs the coordinator's main message loop until `cmd_rx` closes, then
    /// expires the catalog if this is the last handle to it.
    ///
    /// Messages arrive from several channels, drained in batches and handled
    /// one at a time on this single task. The returned future is boxed
    /// locally (`LocalBoxFuture`), so the loop must be polled on the thread
    /// that created it.
    ///
    /// Inputs:
    /// - `internal_cmd_rx`: messages the coordinator sends to itself.
    /// - `strict_serializable_reads_rx`: pending strict-serializable reads.
    /// - `cmd_rx`: external client commands; the loop exits when it closes.
    /// - `group_commit_rx`: permits indicating a group commit may proceed.
    fn serve(
        mut self,
        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
        group_commit_rx: appends::GroupCommitWaiter,
    ) -> LocalBoxFuture<'static, ()> {
        async move {
            let mut cluster_events = self.controller.events_stream();
            // Shared record of the most recent message, read by the watchdog
            // when the loop appears stuck.
            let last_message = Arc::new(Mutex::new(LastMessage {
                kind: "none",
                stmt: None,
            }));

            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
            let idle_metric = self.metrics.queue_busy_seconds.clone();
            let last_message_watchdog = Arc::clone(&last_message);

            // Watchdog task: periodically tries to hand the loop an idle
            // timer; if the loop doesn't accept one within the timeout, the
            // coordinator is considered stuck and we log what it was doing.
            spawn(|| "coord watchdog", async move {
                let mut interval = tokio::time::interval(Duration::from_secs(5));
                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

                let mut coord_stuck = false;

                loop {
                    interval.tick().await;

                    let duration = tokio::time::Duration::from_secs(30);
                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
                    let Ok(maybe_permit) = timeout else {
                        // Timed out: the loop hasn't picked up the idle
                        // message. Only warn on the transition to stuck.
                        if !coord_stuck {
                            let last_message = last_message_watchdog.lock().expect("poisoned");
                            tracing::warn!(
                                last_message_kind = %last_message.kind,
                                last_message_sql = %last_message.stmt_to_string(),
                                "coordinator stuck for {duration:?}",
                            );
                        }
                        coord_stuck = true;

                        continue;
                    };

                    if coord_stuck {
                        tracing::info!("Coordinator became unstuck");
                    }
                    coord_stuck = false;

                    // Channel closed: the main loop has exited.
                    let Ok(permit) = maybe_permit else {
                        break;
                    };

                    permit.send(idle_metric.start_timer());
                }
            });

            self.schedule_storage_usage_collection().await;
            self.schedule_arrangement_sizes_collection().await;
            self.spawn_privatelink_vpc_endpoints_watch_task();
            self.spawn_statement_logging_task();
            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);

            let warn_threshold = self
                .catalog()
                .system_config()
                .coord_slow_message_warn_threshold();

            // Messages are drained from the channels in batches of up to this
            // size before being handled.
            const MESSAGE_BATCH: usize = 64;
            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);

            let message_batch = self.metrics.message_batch.clone();

            loop {
                // `biased` makes arm order the priority order: internal
                // commands first, client commands and timers later.
                select! {
                    biased;

                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
                    Some(event) = cluster_events.next() => {
                        messages.push(Message::ClusterEvent(event))
                    },
                    () = self.controller.ready() => {
                        let controller = match self.controller.get_readiness() {
                            Readiness::Storage => ControllerReadiness::Storage,
                            Readiness::Compute => ControllerReadiness::Compute,
                            Readiness::Metrics(_) => ControllerReadiness::Metrics,
                            Readiness::Internal(_) => ControllerReadiness::Internal,
                            Readiness::NotReady => unreachable!("just signaled as ready"),
                        };
                        messages.push(Message::ControllerReady { controller });
                    }
                    permit = group_commit_rx.ready() => {
                        // Attribute the group commit to the single pending
                        // user write's span if there is exactly one;
                        // otherwise create a new span that follows from all
                        // of them.
                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
                            PendingWriteTxn::User{span, ..} => Some(span),
                            PendingWriteTxn::System{..} => None,
                        });
                        let span = match user_write_spans.exactly_one() {
                            Ok(span) => span.clone(),
                            Err(user_write_spans) => {
                                let span = info_span!(parent: None, "group_commit_notify");
                                for s in user_write_spans {
                                    span.follows_from(s);
                                }
                                span
                            }
                        };
                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
                    },
                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
                        // A zero count means the command channel closed:
                        // shut down the loop.
                        if count == 0 {
                            break;
                        } else {
                            messages.extend(cmd_messages.drain(..).map(
                                |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
                            ));
                        }
                    },
                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
                        // Drain any additional queued reads so they're all
                        // linearized together.
                        let mut pending_read_txns = vec![pending_read_txn];
                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
                            pending_read_txns.push(pending_read_txn);
                        }
                        for (conn_id, pending_read_txn) in pending_read_txns {
                            let prev = self
                                .pending_linearize_read_txns
                                .insert(conn_id, pending_read_txn);
                            soft_assert_or_log!(
                                prev.is_none(),
                                "connections can not have multiple concurrent reads, prev: {prev:?}"
                            )
                        }
                        messages.push(Message::LinearizeReads);
                    }
                    _ = self.advance_timelines_interval.tick() => {
                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
                        span.follows_from(Span::current());

                        // In read-only mode timelines are advanced directly;
                        // otherwise a group commit does the advancing.
                        if self.controller.read_only() {
                            messages.push(Message::AdvanceTimelines);
                        } else {
                            messages.push(Message::GroupCommitInitiate(span, None));
                        }
                    },
                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
                        messages.push(Message::CheckSchedulingPolicies);
                    },

                    _ = self.caught_up_check_interval.tick() => {
                        self.maybe_check_caught_up().await;

                        continue;
                    },

                    timer = idle_rx.recv() => {
                        // Idle timer from the watchdog: record how long the
                        // loop sat idle, then go back to waiting.
                        timer.expect("does not drop").observe_duration();
                        self.metrics
                            .message_handling
                            .with_label_values(&["watchdog"])
                            .observe(0.0);
                        continue;
                    }
                };

                message_batch.observe(f64::cast_lossy(messages.len()));

                // Handle the drained batch one message at a time, timing and
                // tracing each.
                for msg in messages.drain(..) {
                    let msg_kind = msg.kind();
                    let span = span!(
                        target: "mz_adapter::coord::handle_message_loop",
                        Level::INFO,
                        "coord::handle_message",
                        kind = msg_kind
                    );
                    let otel_context = span.context().span().span_context().clone();

                    // Record what we're about to do, for the watchdog's
                    // stuck report.
                    *last_message.lock().expect("poisoned") = LastMessage {
                        kind: msg_kind,
                        stmt: match &msg {
                            Message::Command(
                                _,
                                Command::Execute {
                                    portal_name,
                                    session,
                                    ..
                                },
                            ) => session
                                .get_portal_unverified(portal_name)
                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
                            _ => None,
                        },
                    };

                    let start = Instant::now();
                    self.handle_message(msg).instrument(span).await;
                    let duration = start.elapsed();

                    self.metrics
                        .message_handling
                        .with_label_values(&[msg_kind])
                        .observe(duration.as_secs_f64());

                    // Flag messages that exceed the configured threshold.
                    if duration > warn_threshold {
                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
                        tracing::error!(
                            ?msg_kind,
                            ?trace_id,
                            ?duration,
                            "very slow coordinator message"
                        );
                    }
                }
            }
            // Expire the catalog only if no other handles remain.
            if let Some(catalog) = Arc::into_inner(self.catalog) {
                catalog.expire().await;
            }
        }
        .boxed_local()
    }
3773
    /// Returns a shared reference to the in-memory catalog.
    fn catalog(&self) -> &Catalog {
        &self.catalog
    }
3778
    /// Returns an owned handle to the in-memory catalog (a cheap `Arc` clone;
    /// no catalog data is copied).
    fn owned_catalog(&self) -> Arc<Catalog> {
        Arc::clone(&self.catalog)
    }
3784
    /// Returns a clone of the optimizer metrics handle.
    fn optimizer_metrics(&self) -> OptimizerMetrics {
        self.optimizer_metrics.clone()
    }
3790
    /// Returns a mutable reference to the in-memory catalog.
    ///
    /// Note: `Arc::make_mut` has copy-on-write semantics — if any other
    /// `Arc<Catalog>` handle is alive at this point, the entire catalog is
    /// cloned and mutations are not visible through those older handles.
    fn catalog_mut(&mut self) -> &mut Catalog {
        Arc::make_mut(&mut self.catalog)
    }
3802
3803 async fn refill_user_id_pool(&mut self, min_count: u64) -> Result<(), AdapterError> {
3808 let batch_size = USER_ID_POOL_BATCH_SIZE.get(self.catalog().system_config().dyncfgs());
3809 let to_allocate = min_count.max(u64::from(batch_size));
3810 let id_ts = self.get_catalog_write_ts().await;
3811 let ids = self.catalog().allocate_user_ids(to_allocate, id_ts).await?;
3812 if let (Some((first_id, _)), Some((last_id, _))) = (ids.first(), ids.last()) {
3813 let start = match first_id {
3814 CatalogItemId::User(id) => *id,
3815 other => {
3816 return Err(AdapterError::Internal(format!(
3817 "expected User CatalogItemId, got {other:?}"
3818 )));
3819 }
3820 };
3821 let end = match last_id {
3822 CatalogItemId::User(id) => *id + 1, other => {
3824 return Err(AdapterError::Internal(format!(
3825 "expected User CatalogItemId, got {other:?}"
3826 )));
3827 }
3828 };
3829 self.user_id_pool.refill(start, end);
3830 } else {
3831 return Err(AdapterError::Internal(
3832 "catalog returned no user IDs".into(),
3833 ));
3834 }
3835 Ok(())
3836 }
3837
3838 async fn allocate_user_id(&mut self) -> Result<(CatalogItemId, GlobalId), AdapterError> {
3840 if let Some(id) = self.user_id_pool.allocate() {
3841 return Ok((CatalogItemId::User(id), GlobalId::User(id)));
3842 }
3843 self.refill_user_id_pool(1).await?;
3844 let id = self.user_id_pool.allocate().expect("ID pool just refilled");
3845 Ok((CatalogItemId::User(id), GlobalId::User(id)))
3846 }
3847
3848 async fn allocate_user_ids(
3850 &mut self,
3851 count: u64,
3852 ) -> Result<Vec<(CatalogItemId, GlobalId)>, AdapterError> {
3853 if self.user_id_pool.remaining() < count {
3854 self.refill_user_id_pool(count).await?;
3855 }
3856 let raw_ids = self
3857 .user_id_pool
3858 .allocate_many(count)
3859 .expect("pool has enough IDs after refill");
3860 Ok(raw_ids
3861 .into_iter()
3862 .map(|id| (CatalogItemId::User(id), GlobalId::User(id)))
3863 .collect())
3864 }
3865
    /// Returns the connection context installed on the controller.
    fn connection_context(&self) -> &ConnectionContext {
        self.controller.connection_context()
    }
3870
    /// Returns the secrets reader from the controller's connection context.
    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
        &self.connection_context().secrets_reader
    }
3875
3876 #[allow(dead_code)]
3881 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3882 for meta in self.active_conns.values() {
3883 let _ = meta.notice_tx.send(notice.clone());
3884 }
3885 }
3886
3887 pub(crate) fn broadcast_notice_tx(
3890 &self,
3891 ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3892 let senders: Vec<_> = self
3893 .active_conns
3894 .values()
3895 .map(|meta| meta.notice_tx.clone())
3896 .collect();
3897 Box::new(move |notice| {
3898 for tx in senders {
3899 let _ = tx.send(notice.clone());
3900 }
3901 })
3902 }
3903
    /// Returns metadata about all currently active connections, keyed by
    /// connection ID.
    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
        &self.active_conns
    }
3907
3908 #[instrument(level = "debug")]
3909 pub(crate) fn retire_execution(
3910 &mut self,
3911 reason: StatementEndedExecutionReason,
3912 ctx_extra: ExecuteContextExtra,
3913 ) {
3914 if let Some(uuid) = ctx_extra.retire() {
3915 self.end_statement_execution(uuid, reason);
3916 }
3917 }
3918
3919 #[instrument(level = "debug")]
3921 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3922 let compute = self
3923 .instance_snapshot(instance)
3924 .expect("compute instance does not exist");
3925 DataflowBuilder::new(self.catalog().state(), compute)
3926 }
3927
    /// Captures a point-in-time snapshot of compute instance `id`, or an
    /// [`InstanceMissing`] error if no such instance exists.
    pub fn instance_snapshot(
        &self,
        id: ComputeInstanceId,
    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
        ComputeInstanceSnapshot::new(&self.controller, id)
    }
3935
3936 pub(crate) async fn ship_dataflow(
3943 &mut self,
3944 dataflow: DataflowDescription<Plan>,
3945 instance: ComputeInstanceId,
3946 target_replica: Option<ReplicaId>,
3947 ) {
3948 self.try_ship_dataflow(dataflow, instance, target_replica)
3949 .await
3950 .unwrap_or_terminate("dataflow creation cannot fail");
3951 }
3952
3953 pub(crate) async fn try_ship_dataflow(
3956 &mut self,
3957 dataflow: DataflowDescription<Plan>,
3958 instance: ComputeInstanceId,
3959 target_replica: Option<ReplicaId>,
3960 ) -> Result<(), DataflowCreationError> {
3961 let export_ids = dataflow.exported_index_ids().collect();
3964
3965 self.controller
3966 .compute
3967 .create_dataflow(instance, dataflow, target_replica)?;
3968
3969 self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3970 .await;
3971
3972 Ok(())
3973 }
3974
3975 pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
3979 self.controller
3980 .compute
3981 .allow_writes(instance, id)
3982 .unwrap_or_terminate("allow_writes cannot fail");
3983 }
3984
3985 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3987 &mut self,
3988 dataflow: DataflowDescription<Plan>,
3989 instance: ComputeInstanceId,
3990 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3991 target_replica: Option<ReplicaId>,
3992 ) {
3993 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3994 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, target_replica);
3995 let ((), ()) =
3996 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3997 } else {
3998 self.ship_dataflow(dataflow, instance, target_replica).await;
3999 }
4000 }
4001
4002 pub fn install_compute_watch_set(
4006 &mut self,
4007 conn_id: ConnectionId,
4008 objects: BTreeSet<GlobalId>,
4009 t: Timestamp,
4010 state: WatchSetResponse,
4011 ) -> Result<(), CollectionLookupError> {
4012 let ws_id = self.controller.install_compute_watch_set(objects, t)?;
4013 self.connection_watch_sets
4014 .entry(conn_id.clone())
4015 .or_default()
4016 .insert(ws_id);
4017 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4018 Ok(())
4019 }
4020
4021 pub fn install_storage_watch_set(
4025 &mut self,
4026 conn_id: ConnectionId,
4027 objects: BTreeSet<GlobalId>,
4028 t: Timestamp,
4029 state: WatchSetResponse,
4030 ) -> Result<(), CollectionMissing> {
4031 let ws_id = self.controller.install_storage_watch_set(objects, t)?;
4032 self.connection_watch_sets
4033 .entry(conn_id.clone())
4034 .or_default()
4035 .insert(ws_id);
4036 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4037 Ok(())
4038 }
4039
4040 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
4042 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
4043 for ws_id in ws_ids {
4044 self.installed_watch_sets.remove(&ws_id);
4045 }
4046 }
4047 }
4048
    /// Serializes a debugging snapshot of the coordinator's in-memory state
    /// (timelines, connections, read holds, peeks, pending linearized reads,
    /// and the controller) as JSON.
    ///
    /// Values are rendered via `Debug` formatting, so the output is intended
    /// for human inspection, not programmatic consumption.
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Each map below is keyed by the string form of its identifier and
        // holds the `Debug` rendering of the associated state.
        let global_timelines: BTreeMap<_, _> = self
            .global_timelines
            .iter()
            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
            .collect();
        let active_conns: BTreeMap<_, _> = self
            .active_conns
            .iter()
            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
            .collect();
        let txn_read_holds: BTreeMap<_, _> = self
            .txn_read_holds
            .iter()
            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
            .collect();
        let pending_peeks: BTreeMap<_, _> = self
            .pending_peeks
            .iter()
            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
            .collect();
        // Nested map: connection ID -> (peek UUID -> storage ID).
        let client_pending_peeks: BTreeMap<_, _> = self
            .client_pending_peeks
            .iter()
            .map(|(id, peek)| {
                let peek: BTreeMap<_, _> = peek
                    .iter()
                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
                    .collect();
                (id.to_string(), peek)
            })
            .collect();
        let pending_linearize_read_txns: BTreeMap<_, _> = self
            .pending_linearize_read_txns
            .iter()
            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
            .collect();

        Ok(serde_json::json!({
            "global_timelines": global_timelines,
            "active_conns": active_conns,
            "txn_read_holds": txn_read_holds,
            "pending_peeks": pending_peeks,
            "client_pending_peeks": client_pending_peeks,
            "pending_linearize_read_txns": pending_linearize_read_txns,
            "controller": self.controller.dump().await?,
        }))
    }
4105
    /// Spawns a background task that computes retractions for entries of
    /// `mz_storage_usage_by_shard` older than `retention_period`.
    ///
    /// The snapshot is taken at the current local read timestamp. The computed
    /// retractions are not applied here; they are sent back to the coordinator
    /// as a `Message::StorageUsagePrune` so they go through the normal write
    /// path.
    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
        let item_id = self
            .catalog()
            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
        let read_ts = self.get_local_read_ts().await;
        let current_contents_fut = self
            .controller
            .storage_collections
            .snapshot(global_id, read_ts);
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        // Do the (potentially large) snapshot scan off the coordinator task.
        spawn(|| "storage_usage_prune", async move {
            let mut current_contents = current_contents_fut
                .await
                .unwrap_or_terminate("cannot fail to fetch snapshot");
            differential_dataflow::consolidation::consolidate(&mut current_contents);

            // Events collected before this cutoff (milliseconds since the Unix
            // epoch) are retracted.
            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
            let mut expired = Vec::new();
            for (row, diff) in current_contents {
                assert_eq!(
                    diff, 1,
                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
                );
                // Column 3 holds the collection timestamp of the usage event.
                let collection_timestamp = row
                    .unpack()
                    .get(3)
                    .expect("definition of mz_storage_by_shard changed")
                    .unwrap_timestamptz();
                let collection_timestamp = collection_timestamp.timestamp_millis();
                let collection_timestamp: u128 = collection_timestamp
                    .try_into()
                    .expect("all collections happen after Jan 1 1970");
                if collection_timestamp < cutoff_ts {
                    debug!("pruning storage event {row:?}");
                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
                    expired.push(builtin_update);
                }
            }

            // The receiver may already be gone (e.g. during shutdown); a
            // failed send is fine.
            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
        });
    }
4164
    /// Spawns a background task that computes retractions for entries of
    /// `mz_object_arrangement_size_history` older than the configured
    /// retention period.
    ///
    /// No-op in read-only mode, since retractions could not be written anyway.
    /// Like storage-usage pruning, the retractions are sent back to the
    /// coordinator (`Message::ArrangementSizesPrune`) rather than applied
    /// directly.
    async fn prune_arrangement_sizes_history_on_startup(&self) {
        if self.controller.read_only() {
            return;
        }

        let retention_period = mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_HISTORY_RETENTION_PERIOD
            .get(self.catalog().system_config().dyncfgs());
        let item_id = self
            .catalog()
            .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);
        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
        let read_ts = self.get_local_read_ts().await;
        let current_contents_fut = self
            .controller
            .storage_collections
            .snapshot(global_id, read_ts);
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        // Do the (potentially large) snapshot scan off the coordinator task.
        spawn(|| "arrangement_sizes_history_prune", async move {
            let mut current_contents = current_contents_fut
                .await
                .unwrap_or_terminate("cannot fail to fetch snapshot");
            differential_dataflow::consolidation::consolidate(&mut current_contents);

            // Entries collected before this cutoff (milliseconds since the
            // Unix epoch) are retracted.
            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
            let expired =
                arrangement_sizes_expired_retractions(current_contents, cutoff_ts, item_id);

            // The receiver may already be gone (e.g. during shutdown); a
            // failed send is fine.
            let _ = internal_cmd_tx.send(Message::ArrangementSizesPrune(expired));
        });
    }
4207
4208 fn current_credit_consumption_rate(&self) -> Numeric {
4209 self.catalog()
4210 .user_cluster_replicas()
4211 .filter_map(|replica| match &replica.config.location {
4212 ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
4213 ReplicaLocation::Unmanaged(_) => None,
4214 })
4215 .map(|size| {
4216 self.catalog()
4217 .cluster_replica_sizes()
4218 .0
4219 .get(size)
4220 .expect("location size is validated against the cluster replica sizes")
4221 .credits_per_hour
4222 })
4223 .sum()
4224 }
4225}
4226
4227fn arrangement_sizes_expired_retractions(
4235 rows: impl IntoIterator<Item = (mz_repr::Row, i64)>,
4236 cutoff_ts: u128,
4237 item_id: CatalogItemId,
4238) -> Vec<BuiltinTableUpdate> {
4239 let mut expired = Vec::new();
4240 for (row, diff) in rows {
4241 assert_eq!(
4242 diff, 1,
4243 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4244 );
4245 let collection_timestamp = row
4246 .unpack()
4247 .get(3)
4248 .expect("definition of mz_object_arrangement_size_history changed")
4249 .unwrap_timestamptz()
4250 .timestamp_millis();
4251 let collection_timestamp: u128 = collection_timestamp
4252 .try_into()
4253 .expect("all collections happen after Jan 1 1970");
4254 if collection_timestamp < cutoff_ts {
4255 expired.push(BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE));
4256 }
4257 }
4258 expired
4259}
4260
/// Test-only helpers for [`Coordinator`].
#[cfg(test)]
impl Coordinator {
    /// Verifies (by not panicking) that shipping `dataflow` on a default user
    /// cluster succeeds.
    #[allow(dead_code)]
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        let instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
        self.ship_dataflow(dataflow, instance, None).await;
    }
}
4277
/// The most recent message processed by the coordinator's main loop, retained
/// for diagnostics: it is dumped to stderr if the coordinator panics (see the
/// `Drop` impl) and exposed to the watchdog via a shared mutex.
struct LastMessage {
    /// Static name of the message variant.
    kind: &'static str,
    /// The SQL statement being executed, if the message was an `Execute`
    /// command with a resolvable portal.
    stmt: Option<Arc<Statement<Raw>>>,
}
4283
4284impl LastMessage {
4285 fn stmt_to_string(&self) -> Cow<'static, str> {
4287 self.stmt
4288 .as_ref()
4289 .map(|stmt| stmt.to_ast_string_redacted().into())
4290 .unwrap_or(Cow::Borrowed("<none>"))
4291 }
4292}
4293
4294impl fmt::Debug for LastMessage {
4295 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4296 f.debug_struct("LastMessage")
4297 .field("kind", &self.kind)
4298 .field("stmt", &self.stmt_to_string())
4299 .finish()
4300 }
4301}
4302
4303impl Drop for LastMessage {
4304 fn drop(&mut self) {
4305 if std::thread::panicking() {
4307 eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4309 }
4310 }
4311}
4312
/// Serves the coordinator: initializes timestamp oracles, opens the catalog,
/// spawns the coordinator on a dedicated OS thread, runs bootstrap there, and
/// returns a `Handle` and `Client` once bootstrap has completed.
///
/// The returned future resolves only after bootstrap finishes (successfully or
/// not); the coordinator's message loop keeps running on its own thread.
pub fn serve(
    Config {
        controller_config,
        controller_envd_epoch,
        mut storage,
        audit_logs_iterator,
        timestamp_oracle_url,
        unsafe_mode,
        all_features,
        build_info,
        environment_id,
        metrics_registry,
        now,
        secrets_controller,
        cloud_resource_controller,
        cluster_replica_sizes,
        builtin_system_cluster_config,
        builtin_catalog_server_cluster_config,
        builtin_probe_cluster_config,
        builtin_support_cluster_config,
        builtin_analytics_cluster_config,
        system_parameter_defaults,
        availability_zones,
        storage_usage_client,
        storage_usage_collection_interval,
        storage_usage_retention_period,
        segment_client,
        egress_addresses,
        aws_account_id,
        aws_privatelink_availability_zones,
        connection_context,
        connection_limit_callback,
        remote_system_parameters,
        webhook_concurrency_limit,
        http_host_name,
        tracing_handle,
        read_only_controllers,
        caught_up_trigger: clusters_caught_up_trigger,
        helm_chart_version,
        license_key,
        external_login_password_mz_system,
        force_builtin_schema_migration,
    }: Config,
) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
    async move {
        let coord_start = Instant::now();
        info!("startup: coordinator init: beginning");
        info!("startup: coordinator init: preamble beginning");

        // Force initialization of the static builtin definitions up front.
        let _builtins = LazyLock::force(&BUILTINS_STATIC);

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
            mpsc::unbounded_channel();

        if !availability_zones.iter().all_unique() {
            coord_bail!("availability zones must be unique");
        }

        // An AWS principal context exists only when both an account ID and an
        // external ID prefix are configured.
        let aws_principal_context = match (
            aws_account_id,
            connection_context.aws_external_id_prefix.clone(),
        ) {
            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
                aws_account_id,
                aws_external_id_prefix,
            }),
            _ => None,
        };

        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));

        info!(
            "startup: coordinator init: preamble complete in {:?}",
            coord_start.elapsed()
        );
        let oracle_init_start = Instant::now();
        info!("startup: coordinator init: timestamp oracle init beginning");

        let timestamp_oracle_config = timestamp_oracle_url
            .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
            .transpose()?;
        let mut initial_timestamps =
            get_initial_oracle_timestamps(&timestamp_oracle_config).await?;

        // The EpochMilliseconds timeline must always exist, even on first boot
        // when the durable oracle has no state yet.
        initial_timestamps
            .entry(Timeline::EpochMilliseconds)
            .or_insert_with(mz_repr::Timestamp::minimum);
        let mut timestamp_oracles = BTreeMap::new();
        for (timeline, initial_timestamp) in initial_timestamps {
            Coordinator::ensure_timeline_state_with_initial_time(
                &timeline,
                initial_timestamp,
                now.clone(),
                timestamp_oracle_config.clone(),
                &mut timestamp_oracles,
                read_only_controllers,
            )
            .await;
        }

        let catalog_upper = storage.current_upper().await;
        let epoch_millis_oracle = &timestamp_oracles
            .get(&Timeline::EpochMilliseconds)
            .expect("inserted above")
            .oracle;

        // Choose a boot timestamp that is at least the catalog upper. In
        // read-only mode we must not write to the oracle, so we only read.
        let mut boot_ts = if read_only_controllers {
            let read_ts = epoch_millis_oracle.read_ts().await;
            std::cmp::max(read_ts, catalog_upper)
        } else {
            epoch_millis_oracle.apply_write(catalog_upper).await;
            epoch_millis_oracle.write_ts().await.timestamp
        };

        info!(
            "startup: coordinator init: timestamp oracle init complete in {:?}",
            oracle_init_start.elapsed()
        );

        let catalog_open_start = Instant::now();
        info!("startup: coordinator init: catalog open beginning");
        let persist_client = controller_config
            .persist_clients
            .open(controller_config.persist_location.clone())
            .await
            .context("opening persist client")?;
        let builtin_item_migration_config = BuiltinItemMigrationConfig {
            persist_client: persist_client.clone(),
            read_only: read_only_controllers,
            force_migration: force_builtin_schema_migration,
        };
        let OpenCatalogResult {
            mut catalog,
            migrated_storage_collections_0dt,
            new_builtin_collections,
            builtin_table_updates,
            cached_global_exprs,
            uncached_local_exprs,
        } = Catalog::open(mz_catalog::config::Config {
            storage,
            metrics_registry: &metrics_registry,
            state: mz_catalog::config::StateConfig {
                unsafe_mode,
                all_features,
                build_info,
                environment_id: environment_id.clone(),
                read_only: read_only_controllers,
                now: now.clone(),
                boot_ts: boot_ts.clone(),
                skip_migrations: false,
                cluster_replica_sizes,
                builtin_system_cluster_config,
                builtin_catalog_server_cluster_config,
                builtin_probe_cluster_config,
                builtin_support_cluster_config,
                builtin_analytics_cluster_config,
                system_parameter_defaults,
                remote_system_parameters,
                availability_zones,
                egress_addresses,
                aws_principal_context,
                aws_privatelink_availability_zones,
                connection_context,
                http_host_name,
                builtin_item_migration_config,
                persist_client: persist_client.clone(),
                enable_expression_cache_override: None,
                helm_chart_version,
                external_login_password_mz_system,
                license_key: license_key.clone(),
            },
        })
        .await?;

        // Opening the catalog may have advanced its upper (e.g. migrations);
        // fold that back into the boot timestamp.
        let catalog_upper = catalog.current_upper().await;
        boot_ts = std::cmp::max(boot_ts, catalog_upper);

        if !read_only_controllers {
            epoch_millis_oracle.apply_write(boot_ts).await;
        }

        info!(
            "startup: coordinator init: catalog open complete in {:?}",
            catalog_open_start.elapsed()
        );

        let coord_thread_start = Instant::now();
        info!("startup: coordinator init: coordinator thread start beginning");

        let session_id = catalog.config().session_id;
        let start_instant = catalog.config().start_instant;

        // The coordinator thread reports bootstrap success/failure over this
        // one-shot channel.
        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
        let handle = TokioHandle::current();

        let metrics = Metrics::register_into(&metrics_registry);
        let metrics_clone = metrics.clone();
        let optimizer_metrics = OptimizerMetrics::register_into(
            &metrics_registry,
            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
        );
        let segment_client_clone = segment_client.clone();
        let coord_now = now.clone();
        let advance_timelines_interval =
            tokio::time::interval(catalog.system_config().default_timestamp_interval());
        let mut check_scheduling_policies_interval = tokio::time::interval(
            catalog
                .system_config()
                .cluster_check_scheduling_policies_interval(),
        );
        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // In read-only (0dt) mode, check cluster caught-up status at the
        // configured cadence; otherwise use a long interval so the check is
        // effectively dormant.
        let clusters_caught_up_check_interval = if read_only_controllers {
            let dyncfgs = catalog.system_config().dyncfgs();
            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);

            let mut interval = tokio::time::interval(interval);
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        } else {
            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        };

        let clusters_caught_up_check =
            clusters_caught_up_trigger.map(|trigger| {
                // New builtin collections cannot be expected to catch up.
                let mut exclude_collections: BTreeSet<GlobalId> =
                    new_builtin_collections.into_iter().collect();

                // Also exclude migrated materialized views and everything
                // downstream of them, transitively.
                let mut todo: Vec<_> = migrated_storage_collections_0dt
                    .iter()
                    .filter(|id| {
                        catalog.state().get_entry(id).is_materialized_view()
                    })
                    .copied()
                    .collect();
                while let Some(item_id) = todo.pop() {
                    let entry = catalog.state().get_entry(&item_id);
                    exclude_collections.extend(entry.global_ids());
                    todo.extend_from_slice(entry.used_by());
                }

                CaughtUpCheckContext {
                    trigger,
                    exclude_collections,
                }
            });

        if let Some(TimestampOracleConfig::Postgres(pg_config)) =
            timestamp_oracle_config.as_ref()
        {
            // Apply dynamic flag values to the Postgres timestamp oracle.
            let pg_timestamp_oracle_params =
                flags::timestamp_oracle_config(catalog.system_config());
            pg_timestamp_oracle_params.apply(pg_config);
        }

        // Wrap the caller's connection-limit callback so it is re-invoked with
        // recomputed limits whenever either relevant system var changes.
        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
            Arc::new(move |system_vars: &SystemVars| {
                let limit: u64 = system_vars.max_connections().cast_into();
                let superuser_reserved: u64 =
                    system_vars.superuser_reserved_connections().cast_into();

                // Clamp the reservation to the limit so non-superusers are not
                // locked out by a misconfiguration.
                let superuser_reserved = if superuser_reserved >= limit {
                    tracing::warn!(
                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
                    );
                    limit
                } else {
                    superuser_reserved
                };

                (connection_limit_callback)(limit, superuser_reserved);
            });
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::MAX_CONNECTIONS,
            Arc::clone(&connection_limit_callback),
        );
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
            connection_limit_callback,
        );

        let (group_commit_tx, group_commit_rx) = appends::notifier();

        let parent_span = tracing::Span::current();
        // The coordinator runs on a dedicated OS thread with an enlarged
        // stack (deeply recursive planning/optimization code runs here).
        let thread = thread::Builder::new()
            .stack_size(3 * stack::STACK_SIZE)
            .name("coordinator".to_string())
            .spawn(move || {
                let span = info_span!(parent: parent_span, "coord::coordinator").entered();

                let controller = handle
                    .block_on({
                        catalog.initialize_controller(
                            controller_config,
                            controller_envd_epoch,
                            read_only_controllers,
                        )
                    })
                    .unwrap_or_terminate("failed to initialize storage_controller");
                // Controller init may have advanced the catalog upper again.
                let catalog_upper = handle.block_on(catalog.current_upper());
                boot_ts = std::cmp::max(boot_ts, catalog_upper);
                if !read_only_controllers {
                    let epoch_millis_oracle = &timestamp_oracles
                        .get(&Timeline::EpochMilliseconds)
                        .expect("inserted above")
                        .oracle;
                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
                }

                let catalog = Arc::new(catalog);

                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
                let mut coord = Coordinator {
                    controller,
                    catalog,
                    internal_cmd_tx,
                    group_commit_tx,
                    strict_serializable_reads_tx,
                    global_timelines: timestamp_oracles,
                    transient_id_gen: Arc::new(TransientIdGen::new()),
                    active_conns: BTreeMap::new(),
                    txn_read_holds: Default::default(),
                    pending_peeks: BTreeMap::new(),
                    client_pending_peeks: BTreeMap::new(),
                    pending_linearize_read_txns: BTreeMap::new(),
                    serialized_ddl: LockedVecDeque::new(),
                    active_compute_sinks: BTreeMap::new(),
                    active_webhooks: BTreeMap::new(),
                    active_copies: BTreeMap::new(),
                    staged_cancellation: BTreeMap::new(),
                    introspection_subscribes: BTreeMap::new(),
                    write_locks: BTreeMap::new(),
                    deferred_write_ops: BTreeMap::new(),
                    pending_writes: Vec::new(),
                    advance_timelines_interval,
                    secrets_controller,
                    caching_secrets_reader,
                    cloud_resource_controller,
                    storage_usage_client,
                    storage_usage_collection_interval,
                    segment_client,
                    metrics,
                    optimizer_metrics,
                    tracing_handle,
                    statement_logging: StatementLogging::new(coord_now.clone()),
                    webhook_concurrency_limit,
                    timestamp_oracle_config,
                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
                    cluster_scheduling_decisions: BTreeMap::new(),
                    caught_up_check_interval: clusters_caught_up_check_interval,
                    caught_up_check: clusters_caught_up_check,
                    installed_watch_sets: BTreeMap::new(),
                    connection_watch_sets: BTreeMap::new(),
                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
                    read_only_controllers,
                    buffered_builtin_table_updates: Some(Vec::new()),
                    license_key,
                    user_id_pool: IdPool::empty(),
                    persist_client,
                };
                // Bootstrap coordinator state, then clean up any replicas the
                // orchestrator knows about that the catalog does not.
                let bootstrap = handle.block_on(async {
                    coord
                        .bootstrap(
                            boot_ts,
                            migrated_storage_collections_0dt,
                            builtin_table_updates,
                            cached_global_exprs,
                            uncached_local_exprs,
                            audit_logs_iterator,
                        )
                        .await?;
                    coord
                        .controller
                        .remove_orphaned_replicas(
                            coord.catalog().get_next_user_replica_id().await?,
                            coord.catalog().get_next_system_replica_id().await?,
                        )
                        .await
                        .map_err(AdapterError::Orchestrator)?;

                    if let Some(retention_period) = storage_usage_retention_period {
                        coord
                            .prune_storage_usage_events_on_startup(retention_period)
                            .await;
                    }

                    coord.prune_arrangement_sizes_history_on_startup().await;

                    Ok(())
                });
                let ok = bootstrap.is_ok();
                drop(span);
                bootstrap_tx
                    .send(bootstrap)
                    .expect("bootstrap_rx is not dropped until it receives this message");
                // Only enter the serve loop if bootstrap succeeded; on failure
                // the thread exits and the error is reported via bootstrap_rx.
                if ok {
                    handle.block_on(coord.serve(
                        internal_cmd_rx,
                        strict_serializable_reads_rx,
                        cmd_rx,
                        group_commit_rx,
                    ));
                }
            })
            .expect("failed to create coordinator thread");
        match bootstrap_rx
            .await
            .expect("bootstrap_tx always sends a message or panics/halts")
        {
            Ok(()) => {
                info!(
                    "startup: coordinator init: coordinator thread start complete in {:?}",
                    coord_thread_start.elapsed()
                );
                info!(
                    "startup: coordinator init: complete in {:?}",
                    coord_start.elapsed()
                );
                let handle = Handle {
                    session_id,
                    start_instant,
                    _thread: thread.join_on_drop(),
                };
                let client = Client::new(
                    build_info,
                    cmd_tx,
                    metrics_clone,
                    now,
                    environment_id,
                    segment_client_clone,
                );
                Ok((handle, client))
            }
            Err(e) => Err(e),
        }
    }
    .boxed()
}
4820
4821async fn get_initial_oracle_timestamps(
4835 timestamp_oracle_config: &Option<TimestampOracleConfig>,
4836) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4837 let mut initial_timestamps = BTreeMap::new();
4838
4839 if let Some(config) = timestamp_oracle_config {
4840 let oracle_timestamps = config.get_all_timelines().await?;
4841
4842 let debug_msg = || {
4843 oracle_timestamps
4844 .iter()
4845 .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4846 .join(", ")
4847 };
4848 info!(
4849 "current timestamps from the timestamp oracle: {}",
4850 debug_msg()
4851 );
4852
4853 for (timeline, ts) in oracle_timestamps {
4854 let entry = initial_timestamps
4855 .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4856
4857 entry
4858 .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4859 .or_insert(ts);
4860 }
4861 } else {
4862 info!("no timestamp oracle configured!");
4863 };
4864
4865 let debug_msg = || {
4866 initial_timestamps
4867 .iter()
4868 .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4869 .join(", ")
4870 };
4871 info!("initial oracle timestamps: {}", debug_msg());
4872
4873 Ok(initial_timestamps)
4874}
4875
/// Fetches system parameter values from the remote parameter frontend, if one
/// is configured, returning the modified `(name, value)` pairs.
///
/// On first boot (no successful sync recorded in `storage`) the sync is
/// awaited without a timeout; on subsequent boots it is bounded by
/// `system_parameter_sync_timeout`, and `Ok(None)` is returned if the deadline
/// elapses. Returns `Ok(None)` as well when no sync config is provided.
#[instrument]
pub async fn load_remote_system_parameters(
    storage: &mut Box<dyn OpenableDurableCatalogState>,
    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
    system_parameter_sync_timeout: Duration,
) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
        tracing::info!("parameter sync on boot: start sync");

        let mut params = SynchronizedParameters::new(SystemVars::default());
        let frontend_sync = async {
            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
            frontend.pull(&mut params);
            // Only parameters that differ from the defaults are returned.
            let ops = params
                .modified()
                .into_iter()
                .map(|param| {
                    let name = param.name;
                    let value = param.value;
                    tracing::info!(name, value, initial = true, "sync parameter");
                    (name, value)
                })
                .collect();
            tracing::info!("parameter sync on boot: end sync");
            Ok(Some(ops))
        };
        if !storage.has_system_config_synced_once().await? {
            // First boot: wait for the sync without a timeout.
            frontend_sync.await
        } else {
            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
                Ok(ops) => Ok(ops),
                Err(TimeoutError::Inner(e)) => Err(e),
                Err(TimeoutError::DeadlineElapsed) => {
                    // Timing out is non-fatal: boot proceeds with the
                    // previously synced values.
                    tracing::info!("parameter sync on boot: sync has timed out");
                    Ok(None)
                }
            }
        }
    } else {
        Ok(None)
    }
}
4957
/// Responses delivered when a watch set completes — NOTE(review): inferred
/// from the type name; confirm against the code that produces these.
#[derive(Debug)]
pub enum WatchSetResponse {
    /// The dependencies of a logged statement are ready; carries the
    /// statement logging id and the lifecycle event to record.
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    /// An in-progress `ALTER SINK` is ready to proceed.
    AlterSinkReady(AlterSinkReadyContext),
    /// An in-progress materialized-view replacement is ready to proceed.
    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
}
4964
/// State carried while an `ALTER SINK` waits to become ready.
#[derive(Debug)]
pub struct AlterSinkReadyContext {
    /// Execute context of the in-flight statement. Held in an `Option` so it
    /// can be taken exactly once, either by `retire` or by the `Drop` impl.
    ctx: Option<ExecuteContext>,
    /// OpenTelemetry context carried along for tracing continuity.
    otel_ctx: OpenTelemetryContext,
    /// The `ALTER SINK` plan to apply once ready.
    plan: AlterSinkPlan,
    /// Validity state to re-check before applying the plan —
    /// NOTE(review): inferred from the type name; confirm semantics.
    plan_validity: PlanValidity,
    /// Read holds kept alive while waiting — presumably on the sink's
    /// inputs; verify against the code that constructs this context.
    read_hold: ReadHolds,
}
4973
4974impl AlterSinkReadyContext {
4975 fn ctx(&mut self) -> &mut ExecuteContext {
4976 self.ctx.as_mut().expect("only cleared on drop")
4977 }
4978
4979 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4980 self.ctx
4981 .take()
4982 .expect("only cleared on drop")
4983 .retire(result);
4984 }
4985}
4986
4987impl Drop for AlterSinkReadyContext {
4988 fn drop(&mut self) {
4989 if let Some(ctx) = self.ctx.take() {
4990 ctx.retire(Err(AdapterError::Canceled));
4991 }
4992 }
4993}
4994
/// State carried while an `ALTER MATERIALIZED VIEW` replacement waits to
/// become ready.
#[derive(Debug)]
pub struct AlterMaterializedViewReadyContext {
    /// Execute context of the in-flight statement. Held in an `Option` so it
    /// can be taken exactly once, either by `retire` or by the `Drop` impl.
    ctx: Option<ExecuteContext>,
    /// OpenTelemetry context carried along for tracing continuity.
    otel_ctx: OpenTelemetryContext,
    /// The replacement plan to apply once ready.
    plan: plan::AlterMaterializedViewApplyReplacementPlan,
    /// Validity state to re-check before applying the plan —
    /// NOTE(review): inferred from the type name; confirm semantics.
    plan_validity: PlanValidity,
}
5002
5003impl AlterMaterializedViewReadyContext {
5004 fn ctx(&mut self) -> &mut ExecuteContext {
5005 self.ctx.as_mut().expect("only cleared on drop")
5006 }
5007
5008 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
5009 self.ctx
5010 .take()
5011 .expect("only cleared on drop")
5012 .retire(result);
5013 }
5014}
5015
5016impl Drop for AlterMaterializedViewReadyContext {
5017 fn drop(&mut self) {
5018 if let Some(ctx) = self.ctx.take() {
5019 ctx.retire(Err(AdapterError::Canceled));
5020 }
5021 }
5022}
5023
/// A `VecDeque` bundled with a shared async lock.
///
/// Note that the lock does not guard `items` itself (all mutation goes
/// through `&mut self`); it is a separate `Mutex<()>` handle acquirable via
/// `try_lock_owned` — presumably used to serialize processing of the queue's
/// contents across tasks; confirm against usage sites.
#[derive(Debug)]
struct LockedVecDeque<T> {
    items: VecDeque<T>,
    lock: Arc<tokio::sync::Mutex<()>>,
}
5031
5032impl<T> LockedVecDeque<T> {
5033 pub fn new() -> Self {
5034 Self {
5035 items: VecDeque::new(),
5036 lock: Arc::new(tokio::sync::Mutex::new(())),
5037 }
5038 }
5039
5040 pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
5041 Arc::clone(&self.lock).try_lock_owned()
5042 }
5043
5044 pub fn is_empty(&self) -> bool {
5045 self.items.is_empty()
5046 }
5047
5048 pub fn push_back(&mut self, value: T) {
5049 self.items.push_back(value)
5050 }
5051
5052 pub fn pop_front(&mut self) -> Option<T> {
5053 self.items.pop_front()
5054 }
5055
5056 pub fn remove(&mut self, index: usize) -> Option<T> {
5057 self.items.remove(index)
5058 }
5059
5060 pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
5061 self.items.iter()
5062 }
5063}
5064
/// A statement (or plan) whose execution has been deferred, together with the
/// execute context needed to resume it later.
#[derive(Debug)]
struct DeferredPlanStatement {
    /// The execute context of the deferred statement.
    ctx: ExecuteContext,
    /// The statement or plan to (re)execute.
    ps: PlanStatement,
}
5070
/// A statement captured at one of two stages of planning.
#[derive(Debug)]
enum PlanStatement {
    /// A raw parsed statement plus its bound parameters; not yet planned.
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    /// An already-planned statement together with the ids it resolved.
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
    },
}
5082
/// Errors produced when validating a client IP address against a network
/// policy.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// The address matched none of the policy's rules.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    /// No IP address was available to validate.
    #[error("Access denied missing IP address")]
    MissingIp,
}
5090
5091pub(crate) fn validate_ip_with_policy_rules(
5092 ip: &IpAddr,
5093 rules: &Vec<NetworkPolicyRule>,
5094) -> Result<(), NetworkPolicyError> {
5095 if rules.iter().any(|r| r.address.0.contains(ip)) {
5098 Ok(())
5099 } else {
5100 Err(NetworkPolicyError::AddressDenied(ip.clone()))
5101 }
5102}
5103
5104pub(crate) fn infer_sql_type_for_catalog(
5105 hir_expr: &HirRelationExpr,
5106 mir_expr: &MirRelationExpr,
5107) -> SqlRelationType {
5108 let mut typ = hir_expr.top_level_typ();
5109 typ.backport_nullability_and_keys(&mir_expr.typ());
5110 typ
5111}
5112
#[cfg(test)]
mod id_pool_tests {
    //! Unit tests for `IdPool`. The assertions below show that `refill(lo, hi)`
    //! installs the half-open range `[lo, hi)` (e.g. `refill(10, 13)` yields
    //! exactly 3 ids: 10, 11, 12) and that ids are handed out in ascending
    //! order.

    use super::IdPool;

    // An empty pool has nothing to hand out.
    #[mz_ore::test]
    fn test_empty_pool() {
        let mut pool = IdPool::empty();
        assert_eq!(pool.remaining(), 0);
        assert_eq!(pool.allocate(), None);
        assert_eq!(pool.allocate_many(1), None);
    }

    // Single allocations consume the range in order and then return `None`.
    #[mz_ore::test]
    fn test_allocate_single() {
        let mut pool = IdPool::empty();
        pool.refill(10, 13);
        assert_eq!(pool.remaining(), 3);
        assert_eq!(pool.allocate(), Some(10));
        assert_eq!(pool.allocate(), Some(11));
        assert_eq!(pool.allocate(), Some(12));
        assert_eq!(pool.remaining(), 0);
        assert_eq!(pool.allocate(), None);
    }

    // `allocate_many` is all-or-nothing: an oversized request returns `None`
    // without consuming any ids.
    #[mz_ore::test]
    fn test_allocate_many() {
        let mut pool = IdPool::empty();
        pool.refill(100, 105);
        assert_eq!(pool.allocate_many(3), Some(vec![100, 101, 102]));
        assert_eq!(pool.remaining(), 2);
        assert_eq!(pool.allocate_many(3), None);
        assert_eq!(pool.allocate_many(2), Some(vec![103, 104]));
        assert_eq!(pool.remaining(), 0);
    }

    // A zero-sized request succeeds with an empty vec and consumes nothing.
    #[mz_ore::test]
    fn test_allocate_many_zero() {
        let mut pool = IdPool::empty();
        pool.refill(1, 5);
        assert_eq!(pool.allocate_many(0), Some(vec![]));
        assert_eq!(pool.remaining(), 4);
    }

    // `refill` replaces any previous range rather than appending to it.
    #[mz_ore::test]
    fn test_refill_resets_pool() {
        let mut pool = IdPool::empty();
        pool.refill(0, 2);
        assert_eq!(pool.allocate(), Some(0));
        pool.refill(50, 52);
        assert_eq!(pool.allocate(), Some(50));
        assert_eq!(pool.allocate(), Some(51));
        assert_eq!(pool.allocate(), None);
    }

    // `allocate` and `allocate_many` draw from the same underlying range.
    #[mz_ore::test]
    fn test_mixed_allocate_and_allocate_many() {
        let mut pool = IdPool::empty();
        pool.refill(0, 10);
        assert_eq!(pool.allocate(), Some(0));
        assert_eq!(pool.allocate_many(3), Some(vec![1, 2, 3]));
        assert_eq!(pool.allocate(), Some(4));
        assert_eq!(pool.remaining(), 5);
    }

    // Refilling with an inverted range (lo > hi) panics.
    #[mz_ore::test]
    #[should_panic(expected = "invalid pool range")]
    fn test_refill_invalid_range_panics() {
        let mut pool = IdPool::empty();
        pool.refill(10, 5);
    }
}
5187
#[cfg(test)]
mod arrangement_sizes_pruner_tests {
    //! Unit tests for `arrangement_sizes_expired_retractions`, which computes
    //! retractions for history rows strictly older than a cutoff timestamp.

    use mz_repr::catalog_item_id::CatalogItemId;
    use mz_repr::{Datum, Row};

    use super::arrangement_sizes_expired_retractions;

    // Builds a history row whose final column is a `TimestampTz` at `ts_ms`
    // milliseconds since the Unix epoch; the other columns are fixed filler.
    fn history_row(ts_ms: i64) -> Row {
        let dt = mz_ore::now::to_datetime(ts_ms.try_into().expect("non-negative"));
        Row::pack_slice(&[
            Datum::String("r1"),
            Datum::String("u1"),
            Datum::Int64(123),
            Datum::TimestampTz(dt.try_into().expect("fits in TimestampTz")),
        ])
    }

    // Arbitrary user item id used as the retraction target in all tests.
    fn item_id() -> CatalogItemId {
        CatalogItemId::User(42)
    }

    #[mz_ore::test]
    fn empty_input_produces_no_retractions() {
        let out = arrangement_sizes_expired_retractions(Vec::new(), 1_000, item_id());
        assert!(out.is_empty());
    }

    // With cutoff 1_000, only the rows at 100 and 500 are retracted; the rows
    // at exactly 1_000 and after are kept — i.e. the cutoff is exclusive.
    #[mz_ore::test]
    fn retracts_only_rows_strictly_before_cutoff() {
        let rows = vec![
            (history_row(100), 1),
            (history_row(500), 1),
            (history_row(1_000), 1), (history_row(5_000), 1),
        ];
        let out = arrangement_sizes_expired_retractions(rows, 1_000, item_id());
        assert_eq!(out.len(), 2);
    }

    // Input is expected to be consolidated; a negative diff is a bug upstream
    // and must panic.
    #[mz_ore::test]
    #[should_panic(expected = "consolidated contents should not contain retractions")]
    fn retraction_in_input_panics() {
        let rows = vec![(history_row(100), -1)];
        let _ = arrangement_sizes_expired_retractions(rows, 1_000, item_id());
    }
}