1use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::net::IpAddr;
72use std::num::NonZeroI64;
73use std::ops::Neg;
74use std::str::FromStr;
75use std::sync::LazyLock;
76use std::sync::{Arc, Mutex};
77use std::thread;
78use std::time::{Duration, Instant};
79use std::{fmt, mem};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::{Either, Itertools};
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::{
95 USER_ID_POOL_BATCH_SIZE, WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL,
96};
97use mz_auth::password::Password;
98use mz_build_info::BuildInfo;
99use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
100use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
101use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
102use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
103use mz_catalog::memory::objects::{
104 CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
105 DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
106};
107use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
108use mz_compute_client::as_of_selection;
109use mz_compute_client::controller::error::{
110 CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
111};
112use mz_compute_types::ComputeInstanceId;
113use mz_compute_types::dataflows::DataflowDescription;
114use mz_compute_types::plan::Plan;
115use mz_controller::clusters::{
116 ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
117};
118use mz_controller::{ControllerConfig, Readiness};
119use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
120use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
121use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
122use mz_orchestrator::OfflineReason;
123use mz_ore::cast::{CastFrom, CastInto, CastLossy};
124use mz_ore::channel::trigger::Trigger;
125use mz_ore::future::TimeoutError;
126use mz_ore::metrics::MetricsRegistry;
127use mz_ore::now::{EpochMillis, NowFn};
128use mz_ore::task::{JoinHandle, spawn};
129use mz_ore::thread::JoinHandleExt;
130use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
131use mz_ore::url::SensitiveUrl;
132use mz_ore::{
133 assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
134};
135use mz_persist_client::PersistClient;
136use mz_persist_client::batch::ProtoBatch;
137use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
138use mz_repr::adt::numeric::Numeric;
139use mz_repr::explain::{ExplainConfig, ExplainFormat};
140use mz_repr::global_id::TransientIdGen;
141use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
142use mz_repr::role_id::RoleId;
143use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
144use mz_secrets::cache::CachingSecretsReader;
145use mz_secrets::{SecretsController, SecretsReader};
146use mz_sql::ast::{Raw, Statement};
147use mz_sql::catalog::{CatalogCluster, EnvironmentId};
148use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
149use mz_sql::optimizer_metrics::OptimizerMetrics;
150use mz_sql::plan::{
151 self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
152 NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
153};
154use mz_sql::session::user::User;
155use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
156use mz_sql_parser::ast::ExplainStage;
157use mz_sql_parser::ast::display::AstDisplay;
158use mz_storage_client::client::TableData;
159use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
160use mz_storage_types::connections::Connection as StorageConnection;
161use mz_storage_types::connections::ConnectionContext;
162use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
163use mz_storage_types::read_holds::ReadHold;
164use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
165use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
166use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
167use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
168use mz_transform::dataflow::DataflowMetainfo;
169use opentelemetry::trace::TraceContextExt;
170use serde::Serialize;
171use thiserror::Error;
172use timely::progress::{Antichain, Timestamp as _};
173use tokio::runtime::Handle as TokioHandle;
174use tokio::select;
175use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
176use tokio::time::{Interval, MissedTickBehavior};
177use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
178use tracing_opentelemetry::OpenTelemetrySpanExt;
179use uuid::Uuid;
180
181use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
182use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
183use crate::client::{Client, Handle};
184use crate::command::{Command, ExecuteResponse};
185use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
186use crate::coord::appends::{
187 BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
188};
189use crate::coord::caught_up::CaughtUpCheckContext;
190use crate::coord::cluster_scheduling::SchedulingDecision;
191use crate::coord::id_bundle::CollectionIdBundle;
192use crate::coord::introspection::IntrospectionSubscribe;
193use crate::coord::peek::PendingPeek;
194use crate::coord::statement_logging::StatementLogging;
195use crate::coord::timeline::{TimelineContext, TimelineState};
196use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
197use crate::coord::validity::PlanValidity;
198use crate::error::AdapterError;
199use crate::explain::insights::PlanInsightsContext;
200use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
201use crate::metrics::Metrics;
202use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
203use crate::optimize::{self, Optimize, OptimizerConfig};
204use crate::session::{EndTransactionAction, Session};
205use crate::statement_logging::{
206 StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
207};
208use crate::util::{ClientTransmitter, ResultExt, sort_topological};
209use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
210use crate::{AdapterNotice, ReadHolds, flags};
211
212pub(crate) mod appends;
213pub(crate) mod catalog_serving;
214pub(crate) mod cluster_scheduling;
215pub(crate) mod consistency;
216pub(crate) mod id_bundle;
217pub(crate) mod in_memory_oracle;
218pub(crate) mod peek;
219pub(crate) mod read_policy;
220pub(crate) mod read_then_write;
221pub(crate) mod sequencer;
222pub(crate) mod statement_logging;
223pub(crate) mod timeline;
224pub(crate) mod timestamp_selection;
225
226pub mod catalog_implications;
227mod caught_up;
228mod command_handler;
229mod ddl;
230pub(crate) mod group_sync;
231mod indexes;
232mod introspection;
233mod message_handler;
234mod privatelink_status;
235mod sql;
236mod validity;
237
/// A pool of pre-reserved numeric IDs.
///
/// The IDs in the half-open range `[next, upper)` are available for
/// allocation. Once the range is exhausted, allocation returns `None` until
/// the pool is given a new range via [`IdPool::refill`].
#[derive(Debug)]
pub(crate) struct IdPool {
    /// The next ID that will be handed out.
    next: u64,
    /// Exclusive upper bound of the available range.
    upper: u64,
}

impl IdPool {
    /// Creates a pool with no IDs available.
    pub fn empty() -> Self {
        Self { next: 0, upper: 0 }
    }

    /// Allocates a single ID, or returns `None` if the pool is exhausted.
    pub fn allocate(&mut self) -> Option<u64> {
        if self.next >= self.upper {
            return None;
        }
        let id = self.next;
        self.next = id + 1;
        Some(id)
    }

    /// Allocates `n` consecutive IDs, returned in ascending order.
    ///
    /// Returns `None`, leaving the pool untouched, if fewer than `n` IDs
    /// remain. Requesting zero IDs always succeeds with an empty vector.
    pub fn allocate_many(&mut self, n: u64) -> Option<Vec<u64>> {
        if n > self.remaining() {
            return None;
        }
        // Cannot overflow: `n <= upper - next` implies `next + n <= upper`.
        let start = self.next;
        let end = start + n;
        self.next = end;
        Some((start..end).collect())
    }

    /// Returns the number of IDs still available in the pool.
    pub fn remaining(&self) -> u64 {
        // `next <= upper` holds by construction: `empty` and `refill`
        // establish it and allocation never advances `next` past `upper`.
        self.upper - self.next
    }

    /// Replaces the available range with `[next, upper)`.
    ///
    /// Any IDs left over from the previous range are discarded.
    ///
    /// # Panics
    ///
    /// Panics if `next > upper`.
    pub fn refill(&mut self, next: u64, upper: u64) {
        assert!(next <= upper, "invalid pool range: {next}..{upper}");
        *self = Self { next, upper };
    }
}
310
311#[derive(Debug)]
312pub enum Message {
313 Command(OpenTelemetryContext, Command),
314 ControllerReady {
315 controller: ControllerReadiness,
316 },
317 PurifiedStatementReady(PurifiedStatementReady),
318 CreateConnectionValidationReady(CreateConnectionValidationReady),
319 AlterConnectionValidationReady(AlterConnectionValidationReady),
320 TryDeferred {
321 conn_id: ConnectionId,
323 acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
333 },
334 GroupCommitInitiate(Span, Option<GroupCommitPermit>),
336 DeferredStatementReady,
337 AdvanceTimelines,
338 ClusterEvent(ClusterEvent),
339 CancelPendingPeeks {
340 conn_id: ConnectionId,
341 },
342 LinearizeReads,
343 StagedBatches {
344 conn_id: ConnectionId,
345 table_id: CatalogItemId,
346 batches: Vec<Result<ProtoBatch, String>>,
347 },
348 StorageUsageSchedule,
349 StorageUsageFetch,
350 StorageUsageUpdate(ShardsUsageReferenced),
351 StorageUsagePrune(Vec<BuiltinTableUpdate>),
352 ArrangementSizesSchedule,
353 ArrangementSizesSnapshot,
354 ArrangementSizesPrune(Vec<BuiltinTableUpdate>),
355 RetireExecute {
358 data: ExecuteContextExtra,
359 otel_ctx: OpenTelemetryContext,
360 reason: StatementEndedExecutionReason,
361 },
362 ExecuteSingleStatementTransaction {
363 ctx: ExecuteContext,
364 otel_ctx: OpenTelemetryContext,
365 stmt: Arc<Statement<Raw>>,
366 params: mz_sql::plan::Params,
367 },
368 PeekStageReady {
369 ctx: ExecuteContext,
370 span: Span,
371 stage: PeekStage,
372 },
373 CreateIndexStageReady {
374 ctx: ExecuteContext,
375 span: Span,
376 stage: CreateIndexStage,
377 },
378 CreateViewStageReady {
379 ctx: ExecuteContext,
380 span: Span,
381 stage: CreateViewStage,
382 },
383 CreateMaterializedViewStageReady {
384 ctx: ExecuteContext,
385 span: Span,
386 stage: CreateMaterializedViewStage,
387 },
388 SubscribeStageReady {
389 ctx: ExecuteContext,
390 span: Span,
391 stage: SubscribeStage,
392 },
393 IntrospectionSubscribeStageReady {
394 span: Span,
395 stage: IntrospectionSubscribeStage,
396 },
397 SecretStageReady {
398 ctx: ExecuteContext,
399 span: Span,
400 stage: SecretStage,
401 },
402 ClusterStageReady {
403 ctx: ExecuteContext,
404 span: Span,
405 stage: ClusterStage,
406 },
407 ExplainTimestampStageReady {
408 ctx: ExecuteContext,
409 span: Span,
410 stage: ExplainTimestampStage,
411 },
412 DrainStatementLog,
413 PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
414 CheckSchedulingPolicies,
415
416 SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
421}
422
423impl Message {
424 pub const fn kind(&self) -> &'static str {
426 match self {
427 Message::Command(_, msg) => match msg {
428 Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
429 Command::Startup { .. } => "command-startup",
430 Command::Execute { .. } => "command-execute",
431 Command::Commit { .. } => "command-commit",
432 Command::CancelRequest { .. } => "command-cancel_request",
433 Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
434 Command::GetWebhook { .. } => "command-get_webhook",
435 Command::GetSystemVars { .. } => "command-get_system_vars",
436 Command::SetSystemVars { .. } => "command-set_system_vars",
437 Command::Terminate { .. } => "command-terminate",
438 Command::RetireExecute { .. } => "command-retire_execute",
439 Command::CheckConsistency { .. } => "command-check_consistency",
440 Command::Dump { .. } => "command-dump",
441 Command::AuthenticatePassword { .. } => "command-auth_check",
442 Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
443 Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
444 Command::CheckRoleCanLogin { .. } => "command-check_role_can_login",
445 Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
446 Command::GetOracle { .. } => "get-oracle",
447 Command::DetermineRealTimeRecentTimestamp { .. } => {
448 "determine-real-time-recent-timestamp"
449 }
450 Command::GetTransactionReadHoldsBundle { .. } => {
451 "get-transaction-read-holds-bundle"
452 }
453 Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
454 Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
455 Command::ExecuteSubscribe { .. } => "execute-subscribe",
456 Command::CopyToPreflight { .. } => "copy-to-preflight",
457 Command::ExecuteCopyTo { .. } => "execute-copy-to",
458 Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
459 Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
460 Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
461 Command::ExplainTimestamp { .. } => "explain-timestamp",
462 Command::FrontendStatementLogging(..) => "frontend-statement-logging",
463 Command::StartCopyFromStdin { .. } => "start-copy-from-stdin",
464 Command::InjectAuditEvents { .. } => "inject-audit-events",
465 },
466 Message::ControllerReady {
467 controller: ControllerReadiness::Compute,
468 } => "controller_ready(compute)",
469 Message::ControllerReady {
470 controller: ControllerReadiness::Storage,
471 } => "controller_ready(storage)",
472 Message::ControllerReady {
473 controller: ControllerReadiness::Metrics,
474 } => "controller_ready(metrics)",
475 Message::ControllerReady {
476 controller: ControllerReadiness::Internal,
477 } => "controller_ready(internal)",
478 Message::PurifiedStatementReady(_) => "purified_statement_ready",
479 Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
480 Message::TryDeferred { .. } => "try_deferred",
481 Message::GroupCommitInitiate(..) => "group_commit_initiate",
482 Message::AdvanceTimelines => "advance_timelines",
483 Message::ClusterEvent(_) => "cluster_event",
484 Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
485 Message::LinearizeReads => "linearize_reads",
486 Message::StagedBatches { .. } => "staged_batches",
487 Message::StorageUsageSchedule => "storage_usage_schedule",
488 Message::StorageUsageFetch => "storage_usage_fetch",
489 Message::StorageUsageUpdate(_) => "storage_usage_update",
490 Message::StorageUsagePrune(_) => "storage_usage_prune",
491 Message::ArrangementSizesSchedule => "arrangement_sizes_schedule",
492 Message::ArrangementSizesSnapshot => "arrangement_sizes_snapshot",
493 Message::ArrangementSizesPrune(_) => "arrangement_sizes_prune",
494 Message::RetireExecute { .. } => "retire_execute",
495 Message::ExecuteSingleStatementTransaction { .. } => {
496 "execute_single_statement_transaction"
497 }
498 Message::PeekStageReady { .. } => "peek_stage_ready",
499 Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
500 Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
501 Message::CreateViewStageReady { .. } => "create_view_stage_ready",
502 Message::CreateMaterializedViewStageReady { .. } => {
503 "create_materialized_view_stage_ready"
504 }
505 Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
506 Message::IntrospectionSubscribeStageReady { .. } => {
507 "introspection_subscribe_stage_ready"
508 }
509 Message::SecretStageReady { .. } => "secret_stage_ready",
510 Message::ClusterStageReady { .. } => "cluster_stage_ready",
511 Message::DrainStatementLog => "drain_statement_log",
512 Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
513 Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
514 Message::CheckSchedulingPolicies => "check_scheduling_policies",
515 Message::SchedulingDecisions { .. } => "scheduling_decision",
516 Message::DeferredStatementReady => "deferred_statement_ready",
517 }
518 }
519}
520
/// Identifies which controller component a [`Message::ControllerReady`]
/// refers to.
#[derive(Debug)]
pub enum ControllerReadiness {
    /// The storage component is ready.
    Storage,
    /// The compute component is ready.
    Compute,
    /// The metrics component is ready.
    Metrics,
    /// An internal component is ready.
    Internal,
}
533
534#[derive(Derivative)]
535#[derivative(Debug)]
536pub struct BackgroundWorkResult<T> {
537 #[derivative(Debug = "ignore")]
538 pub ctx: ExecuteContext,
539 pub result: Result<T, AdapterError>,
540 pub params: Params,
541 pub plan_validity: PlanValidity,
542 pub original_stmt: Arc<Statement<Raw>>,
543 pub otel_ctx: OpenTelemetryContext,
544}
545
546pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
547
548#[derive(Derivative)]
549#[derivative(Debug)]
550pub struct ValidationReady<T> {
551 #[derivative(Debug = "ignore")]
552 pub ctx: ExecuteContext,
553 pub result: Result<T, AdapterError>,
554 pub resolved_ids: ResolvedIds,
555 pub connection_id: CatalogItemId,
556 pub connection_gid: GlobalId,
557 pub plan_validity: PlanValidity,
558 pub otel_ctx: OpenTelemetryContext,
559}
560
561pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
562pub type AlterConnectionValidationReady = ValidationReady<Connection>;
563
564#[derive(Debug)]
565pub enum PeekStage {
566 LinearizeTimestamp(PeekStageLinearizeTimestamp),
568 RealTimeRecency(PeekStageRealTimeRecency),
569 TimestampReadHold(PeekStageTimestampReadHold),
570 Optimize(PeekStageOptimize),
571 Finish(PeekStageFinish),
573 ExplainPlan(PeekStageExplainPlan),
575 ExplainPushdown(PeekStageExplainPushdown),
576 CopyToPreflight(PeekStageCopyTo),
578 CopyToDataflow(PeekStageCopyTo),
580}
581
582#[derive(Debug)]
583pub struct CopyToContext {
584 pub desc: RelationDesc,
586 pub uri: Uri,
588 pub connection: StorageConnection<ReferencedConnection>,
590 pub connection_id: CatalogItemId,
592 pub format: S3SinkFormat,
594 pub max_file_size: u64,
596 pub output_batch_count: Option<u64>,
601}
602
603#[derive(Debug)]
604pub struct PeekStageLinearizeTimestamp {
605 validity: PlanValidity,
606 plan: mz_sql::plan::SelectPlan,
607 max_query_result_size: Option<u64>,
608 source_ids: BTreeSet<GlobalId>,
609 target_replica: Option<ReplicaId>,
610 timeline_context: TimelineContext,
611 optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
612 explain_ctx: ExplainContext,
615}
616
617#[derive(Debug)]
618pub struct PeekStageRealTimeRecency {
619 validity: PlanValidity,
620 plan: mz_sql::plan::SelectPlan,
621 max_query_result_size: Option<u64>,
622 source_ids: BTreeSet<GlobalId>,
623 target_replica: Option<ReplicaId>,
624 timeline_context: TimelineContext,
625 oracle_read_ts: Option<Timestamp>,
626 optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
627 explain_ctx: ExplainContext,
630}
631
632#[derive(Debug)]
633pub struct PeekStageTimestampReadHold {
634 validity: PlanValidity,
635 plan: mz_sql::plan::SelectPlan,
636 max_query_result_size: Option<u64>,
637 source_ids: BTreeSet<GlobalId>,
638 target_replica: Option<ReplicaId>,
639 timeline_context: TimelineContext,
640 oracle_read_ts: Option<Timestamp>,
641 real_time_recency_ts: Option<mz_repr::Timestamp>,
642 optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
643 explain_ctx: ExplainContext,
646}
647
648#[derive(Debug)]
649pub struct PeekStageOptimize {
650 validity: PlanValidity,
651 plan: mz_sql::plan::SelectPlan,
652 max_query_result_size: Option<u64>,
653 source_ids: BTreeSet<GlobalId>,
654 id_bundle: CollectionIdBundle,
655 target_replica: Option<ReplicaId>,
656 determination: TimestampDetermination,
657 optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
658 explain_ctx: ExplainContext,
661}
662
663#[derive(Debug)]
664pub struct PeekStageFinish {
665 validity: PlanValidity,
666 plan: mz_sql::plan::SelectPlan,
667 max_query_result_size: Option<u64>,
668 id_bundle: CollectionIdBundle,
669 target_replica: Option<ReplicaId>,
670 source_ids: BTreeSet<GlobalId>,
671 determination: TimestampDetermination,
672 cluster_id: ComputeInstanceId,
673 finishing: RowSetFinishing,
674 plan_insights_optimizer_trace: Option<OptimizerTrace>,
677 insights_ctx: Option<Box<PlanInsightsContext>>,
678 global_lir_plan: optimize::peek::GlobalLirPlan,
679 optimization_finished_at: EpochMillis,
680}
681
682#[derive(Debug)]
683pub struct PeekStageCopyTo {
684 validity: PlanValidity,
685 optimizer: optimize::copy_to::Optimizer,
686 global_lir_plan: optimize::copy_to::GlobalLirPlan,
687 optimization_finished_at: EpochMillis,
688 source_ids: BTreeSet<GlobalId>,
689}
690
691#[derive(Debug)]
692pub struct PeekStageExplainPlan {
693 validity: PlanValidity,
694 optimizer: optimize::peek::Optimizer,
695 df_meta: DataflowMetainfo,
696 explain_ctx: ExplainPlanContext,
697 insights_ctx: Option<Box<PlanInsightsContext>>,
698}
699
700#[derive(Debug)]
701pub struct PeekStageExplainPushdown {
702 validity: PlanValidity,
703 determination: TimestampDetermination,
704 imports: BTreeMap<GlobalId, MapFilterProject>,
705}
706
707#[derive(Debug)]
708pub enum CreateIndexStage {
709 Optimize(CreateIndexOptimize),
710 Finish(CreateIndexFinish),
711 Explain(CreateIndexExplain),
712}
713
714#[derive(Debug)]
715pub struct CreateIndexOptimize {
716 validity: PlanValidity,
717 plan: plan::CreateIndexPlan,
718 resolved_ids: ResolvedIds,
719 explain_ctx: ExplainContext,
722}
723
724#[derive(Debug)]
725pub struct CreateIndexFinish {
726 validity: PlanValidity,
727 item_id: CatalogItemId,
728 global_id: GlobalId,
729 plan: plan::CreateIndexPlan,
730 resolved_ids: ResolvedIds,
731 global_mir_plan: optimize::index::GlobalMirPlan,
732 global_lir_plan: optimize::index::GlobalLirPlan,
733 optimizer_features: OptimizerFeatures,
734}
735
736#[derive(Debug)]
737pub struct CreateIndexExplain {
738 validity: PlanValidity,
739 exported_index_id: GlobalId,
740 plan: plan::CreateIndexPlan,
741 df_meta: DataflowMetainfo,
742 explain_ctx: ExplainPlanContext,
743}
744
745#[derive(Debug)]
746pub enum CreateViewStage {
747 Optimize(CreateViewOptimize),
748 Finish(CreateViewFinish),
749 Explain(CreateViewExplain),
750}
751
752#[derive(Debug)]
753pub struct CreateViewOptimize {
754 validity: PlanValidity,
755 plan: plan::CreateViewPlan,
756 resolved_ids: ResolvedIds,
757 explain_ctx: ExplainContext,
760}
761
762#[derive(Debug)]
763pub struct CreateViewFinish {
764 validity: PlanValidity,
765 item_id: CatalogItemId,
767 global_id: GlobalId,
769 plan: plan::CreateViewPlan,
770 resolved_ids: ResolvedIds,
772 optimized_expr: OptimizedMirRelationExpr,
773}
774
775#[derive(Debug)]
776pub struct CreateViewExplain {
777 validity: PlanValidity,
778 id: GlobalId,
779 plan: plan::CreateViewPlan,
780 explain_ctx: ExplainPlanContext,
781}
782
783#[derive(Debug)]
784pub enum ExplainTimestampStage {
785 Optimize(ExplainTimestampOptimize),
786 RealTimeRecency(ExplainTimestampRealTimeRecency),
787 Finish(ExplainTimestampFinish),
788}
789
790#[derive(Debug)]
791pub struct ExplainTimestampOptimize {
792 validity: PlanValidity,
793 plan: plan::ExplainTimestampPlan,
794 cluster_id: ClusterId,
795}
796
797#[derive(Debug)]
798pub struct ExplainTimestampRealTimeRecency {
799 validity: PlanValidity,
800 format: ExplainFormat,
801 optimized_plan: OptimizedMirRelationExpr,
802 cluster_id: ClusterId,
803 when: QueryWhen,
804}
805
806#[derive(Debug)]
807pub struct ExplainTimestampFinish {
808 validity: PlanValidity,
809 format: ExplainFormat,
810 optimized_plan: OptimizedMirRelationExpr,
811 cluster_id: ClusterId,
812 source_ids: BTreeSet<GlobalId>,
813 when: QueryWhen,
814 real_time_recency_ts: Option<Timestamp>,
815}
816
817#[derive(Debug)]
818pub enum ClusterStage {
819 Alter(AlterCluster),
820 WaitForHydrated(AlterClusterWaitForHydrated),
821 Finalize(AlterClusterFinalize),
822}
823
824#[derive(Debug)]
825pub struct AlterCluster {
826 validity: PlanValidity,
827 plan: plan::AlterClusterPlan,
828}
829
830#[derive(Debug)]
831pub struct AlterClusterWaitForHydrated {
832 validity: PlanValidity,
833 plan: plan::AlterClusterPlan,
834 new_config: ClusterVariantManaged,
835 workload_class: Option<String>,
836 timeout_time: Instant,
837 on_timeout: OnTimeoutAction,
838}
839
840#[derive(Debug)]
841pub struct AlterClusterFinalize {
842 validity: PlanValidity,
843 plan: plan::AlterClusterPlan,
844 new_config: ClusterVariantManaged,
845 workload_class: Option<String>,
846}
847
848#[derive(Debug)]
849pub enum ExplainContext {
850 None,
852 Plan(ExplainPlanContext),
854 PlanInsightsNotice(OptimizerTrace),
857 Pushdown,
859}
860
861impl ExplainContext {
862 pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
866 let optimizer_trace = match self {
867 ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
868 ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
869 _ => None,
870 };
871 optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
872 }
873
874 pub(crate) fn needs_cluster(&self) -> bool {
875 match self {
876 ExplainContext::None => true,
877 ExplainContext::Plan(..) => false,
878 ExplainContext::PlanInsightsNotice(..) => true,
879 ExplainContext::Pushdown => false,
880 }
881 }
882
883 pub(crate) fn needs_plan_insights(&self) -> bool {
884 matches!(
885 self,
886 ExplainContext::Plan(ExplainPlanContext {
887 stage: ExplainStage::PlanInsights,
888 ..
889 }) | ExplainContext::PlanInsightsNotice(_)
890 )
891 }
892}
893
894#[derive(Debug)]
895pub struct ExplainPlanContext {
896 pub broken: bool,
901 pub config: ExplainConfig,
902 pub format: ExplainFormat,
903 pub stage: ExplainStage,
904 pub replan: Option<GlobalId>,
905 pub desc: Option<RelationDesc>,
906 pub optimizer_trace: OptimizerTrace,
907}
908
909#[derive(Debug)]
910pub enum CreateMaterializedViewStage {
911 Optimize(CreateMaterializedViewOptimize),
912 Finish(CreateMaterializedViewFinish),
913 Explain(CreateMaterializedViewExplain),
914}
915
916#[derive(Debug)]
917pub struct CreateMaterializedViewOptimize {
918 validity: PlanValidity,
919 plan: plan::CreateMaterializedViewPlan,
920 resolved_ids: ResolvedIds,
921 explain_ctx: ExplainContext,
924}
925
926#[derive(Debug)]
927pub struct CreateMaterializedViewFinish {
928 item_id: CatalogItemId,
930 global_id: GlobalId,
932 validity: PlanValidity,
933 plan: plan::CreateMaterializedViewPlan,
934 resolved_ids: ResolvedIds,
935 local_mir_plan: optimize::materialized_view::LocalMirPlan,
936 global_mir_plan: optimize::materialized_view::GlobalMirPlan,
937 global_lir_plan: optimize::materialized_view::GlobalLirPlan,
938 optimizer_features: OptimizerFeatures,
939}
940
941#[derive(Debug)]
942pub struct CreateMaterializedViewExplain {
943 global_id: GlobalId,
944 validity: PlanValidity,
945 plan: plan::CreateMaterializedViewPlan,
946 df_meta: DataflowMetainfo,
947 explain_ctx: ExplainPlanContext,
948}
949
950#[derive(Debug)]
951pub enum SubscribeStage {
952 OptimizeMir(SubscribeOptimizeMir),
953 TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
954 Finish(SubscribeFinish),
955 Explain(SubscribeExplain),
956}
957
958#[derive(Debug)]
959pub struct SubscribeOptimizeMir {
960 validity: PlanValidity,
961 plan: plan::SubscribePlan,
962 timeline: TimelineContext,
963 dependency_ids: BTreeSet<GlobalId>,
964 cluster_id: ComputeInstanceId,
965 replica_id: Option<ReplicaId>,
966 explain_ctx: ExplainContext,
969}
970
971#[derive(Debug)]
972pub struct SubscribeTimestampOptimizeLir {
973 validity: PlanValidity,
974 plan: plan::SubscribePlan,
975 timeline: TimelineContext,
976 optimizer: optimize::subscribe::Optimizer,
977 global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
978 dependency_ids: BTreeSet<GlobalId>,
979 replica_id: Option<ReplicaId>,
980 explain_ctx: ExplainContext,
983}
984
985#[derive(Debug)]
986pub struct SubscribeFinish {
987 validity: PlanValidity,
988 cluster_id: ComputeInstanceId,
989 replica_id: Option<ReplicaId>,
990 plan: plan::SubscribePlan,
991 global_lir_plan: optimize::subscribe::GlobalLirPlan,
992 dependency_ids: BTreeSet<GlobalId>,
993}
994
995#[derive(Debug)]
996pub struct SubscribeExplain {
997 validity: PlanValidity,
998 optimizer: optimize::subscribe::Optimizer,
999 df_meta: DataflowMetainfo,
1000 cluster_id: ComputeInstanceId,
1001 explain_ctx: ExplainPlanContext,
1002}
1003
1004#[derive(Debug)]
1005pub enum IntrospectionSubscribeStage {
1006 OptimizeMir(IntrospectionSubscribeOptimizeMir),
1007 TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
1008 Finish(IntrospectionSubscribeFinish),
1009}
1010
1011#[derive(Debug)]
1012pub struct IntrospectionSubscribeOptimizeMir {
1013 validity: PlanValidity,
1014 plan: plan::SubscribePlan,
1015 subscribe_id: GlobalId,
1016 cluster_id: ComputeInstanceId,
1017 replica_id: ReplicaId,
1018}
1019
1020#[derive(Debug)]
1021pub struct IntrospectionSubscribeTimestampOptimizeLir {
1022 validity: PlanValidity,
1023 optimizer: optimize::subscribe::Optimizer,
1024 global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
1025 cluster_id: ComputeInstanceId,
1026 replica_id: ReplicaId,
1027}
1028
1029#[derive(Debug)]
1030pub struct IntrospectionSubscribeFinish {
1031 validity: PlanValidity,
1032 global_lir_plan: optimize::subscribe::GlobalLirPlan,
1033 read_holds: ReadHolds,
1034 cluster_id: ComputeInstanceId,
1035 replica_id: ReplicaId,
1036}
1037
1038#[derive(Debug)]
1039pub enum SecretStage {
1040 CreateEnsure(CreateSecretEnsure),
1041 CreateFinish(CreateSecretFinish),
1042 RotateKeysEnsure(RotateKeysSecretEnsure),
1043 RotateKeysFinish(RotateKeysSecretFinish),
1044 Alter(AlterSecret),
1045}
1046
1047#[derive(Debug)]
1048pub struct CreateSecretEnsure {
1049 validity: PlanValidity,
1050 plan: plan::CreateSecretPlan,
1051}
1052
1053#[derive(Debug)]
1054pub struct CreateSecretFinish {
1055 validity: PlanValidity,
1056 item_id: CatalogItemId,
1057 global_id: GlobalId,
1058 plan: plan::CreateSecretPlan,
1059}
1060
1061#[derive(Debug)]
1062pub struct RotateKeysSecretEnsure {
1063 validity: PlanValidity,
1064 id: CatalogItemId,
1065}
1066
1067#[derive(Debug)]
1068pub struct RotateKeysSecretFinish {
1069 validity: PlanValidity,
1070 ops: Vec<crate::catalog::Op>,
1071}
1072
1073#[derive(Debug)]
1074pub struct AlterSecret {
1075 validity: PlanValidity,
1076 plan: plan::AlterSecretPlan,
1077}
1078
1079#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1084pub enum TargetCluster {
1085 CatalogServer,
1087 Active,
1089 Transaction(ClusterId),
1091}
1092
1093pub(crate) enum StageResult<T> {
1095 Handle(JoinHandle<Result<T, AdapterError>>),
1097 HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
1099 Immediate(T),
1101 Response(ExecuteResponse),
1103}
1104
1105pub(crate) trait Staged: Send {
1107 type Ctx: StagedContext;
1108
1109 fn validity(&mut self) -> &mut PlanValidity;
1110
1111 async fn stage(
1113 self,
1114 coord: &mut Coordinator,
1115 ctx: &mut Self::Ctx,
1116 ) -> Result<StageResult<Box<Self>>, AdapterError>;
1117
1118 fn message(self, ctx: Self::Ctx, span: Span) -> Message;
1120
1121 fn cancel_enabled(&self) -> bool;
1123}
1124
1125pub trait StagedContext {
1126 fn retire(self, result: Result<ExecuteResponse, AdapterError>);
1127 fn session(&self) -> Option<&Session>;
1128}
1129
1130impl StagedContext for ExecuteContext {
1131 fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1132 self.retire(result);
1133 }
1134
1135 fn session(&self) -> Option<&Session> {
1136 Some(self.session())
1137 }
1138}
1139
1140impl StagedContext for () {
1141 fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}
1142
1143 fn session(&self) -> Option<&Session> {
1144 None
1145 }
1146}
1147
1148pub struct Config {
1150 pub controller_config: ControllerConfig,
1151 pub controller_envd_epoch: NonZeroI64,
1152 pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
1153 pub audit_logs_iterator: AuditLogIterator,
1154 pub timestamp_oracle_url: Option<SensitiveUrl>,
1155 pub unsafe_mode: bool,
1156 pub all_features: bool,
1157 pub build_info: &'static BuildInfo,
1158 pub environment_id: EnvironmentId,
1159 pub metrics_registry: MetricsRegistry,
1160 pub now: NowFn,
1161 pub secrets_controller: Arc<dyn SecretsController>,
1162 pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1163 pub availability_zones: Vec<String>,
1164 pub cluster_replica_sizes: ClusterReplicaSizeMap,
1165 pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
1166 pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
1167 pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
1168 pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
1169 pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
1170 pub system_parameter_defaults: BTreeMap<String, String>,
1171 pub storage_usage_client: StorageUsageClient,
1172 pub storage_usage_collection_interval: Duration,
1173 pub storage_usage_retention_period: Option<Duration>,
1174 pub segment_client: Option<mz_segment::Client>,
1175 pub egress_addresses: Vec<IpNet>,
1176 pub remote_system_parameters: Option<BTreeMap<String, String>>,
1177 pub aws_account_id: Option<String>,
1178 pub aws_privatelink_availability_zones: Option<Vec<String>>,
1179 pub connection_context: ConnectionContext,
1180 pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
1181 pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
1182 pub http_host_name: Option<String>,
1183 pub tracing_handle: TracingHandle,
1184 pub read_only_controllers: bool,
1188
1189 pub caught_up_trigger: Option<Trigger>,
1193
1194 pub helm_chart_version: Option<String>,
1195 pub license_key: ValidatedLicenseKey,
1196 pub external_login_password_mz_system: Option<Password>,
1197 pub force_builtin_schema_migration: Option<String>,
1198}
1199
1200#[derive(Debug, Serialize)]
1202pub struct ConnMeta {
1203 secret_key: u32,
1208 connected_at: EpochMillis,
1210 user: User,
1211 application_name: String,
1212 uuid: Uuid,
1213 conn_id: ConnectionId,
1214 client_ip: Option<IpAddr>,
1215
1216 drop_sinks: BTreeSet<GlobalId>,
1219
1220 #[serde(skip)]
1222 deferred_lock: Option<OwnedMutexGuard<()>>,
1223
1224 pending_cluster_alters: BTreeSet<ClusterId>,
1227
1228 #[serde(skip)]
1230 notice_tx: mpsc::UnboundedSender<AdapterNotice>,
1231
1232 authenticated_role: RoleId,
1236}
1237
1238impl ConnMeta {
1239 pub fn conn_id(&self) -> &ConnectionId {
1240 &self.conn_id
1241 }
1242
1243 pub fn user(&self) -> &User {
1244 &self.user
1245 }
1246
1247 pub fn application_name(&self) -> &str {
1248 &self.application_name
1249 }
1250
1251 pub fn authenticated_role_id(&self) -> &RoleId {
1252 &self.authenticated_role
1253 }
1254
1255 pub fn uuid(&self) -> Uuid {
1256 self.uuid
1257 }
1258
1259 pub fn client_ip(&self) -> Option<IpAddr> {
1260 self.client_ip
1261 }
1262
1263 pub fn connected_at(&self) -> EpochMillis {
1264 self.connected_at
1265 }
1266}
1267
1268#[derive(Debug)]
1269pub struct PendingTxn {
1271 ctx: ExecuteContext,
1273 response: Result<PendingTxnResponse, AdapterError>,
1275 action: EndTransactionAction,
1277}
1278
/// The response associated with a [`PendingTxn`].
///
/// Converts into the matching `ExecuteResponse` variant through `From`.
#[derive(Debug)]
pub enum PendingTxnResponse {
    /// The transaction is committed.
    Committed {
        /// Named parameters and their values; forwarded unchanged to
        /// `ExecuteResponse::TransactionCommitted`.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction is rolled back.
    Rolledback {
        /// Named parameters and their values; forwarded unchanged to
        /// `ExecuteResponse::TransactionRolledBack`.
        params: BTreeMap<&'static str, String>,
    },
}
1293
1294impl PendingTxnResponse {
1295 pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1296 match self {
1297 PendingTxnResponse::Committed { params }
1298 | PendingTxnResponse::Rolledback { params } => params.extend(p),
1299 }
1300 }
1301}
1302
1303impl From<PendingTxnResponse> for ExecuteResponse {
1304 fn from(value: PendingTxnResponse) -> Self {
1305 match value {
1306 PendingTxnResponse::Committed { params } => {
1307 ExecuteResponse::TransactionCommitted { params }
1308 }
1309 PendingTxnResponse::Rolledback { params } => {
1310 ExecuteResponse::TransactionRolledBack { params }
1311 }
1312 }
1313 }
1314}
1315
1316#[derive(Debug)]
1317pub struct PendingReadTxn {
1319 txn: PendingRead,
1321 timestamp_context: TimestampContext,
1323 created: Instant,
1325 num_requeues: u64,
1329 otel_ctx: OpenTelemetryContext,
1331}
1332
1333impl PendingReadTxn {
1334 pub fn timestamp_context(&self) -> &TimestampContext {
1336 &self.timestamp_context
1337 }
1338
1339 pub(crate) fn take_context(self) -> ExecuteContext {
1340 self.txn.take_context()
1341 }
1342}
1343
1344#[derive(Debug)]
1345enum PendingRead {
1347 Read {
1348 txn: PendingTxn,
1350 },
1351 ReadThenWrite {
1352 ctx: ExecuteContext,
1354 tx: oneshot::Sender<Option<ExecuteContext>>,
1357 },
1358}
1359
1360impl PendingRead {
1361 #[instrument(level = "debug")]
1366 pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1367 match self {
1368 PendingRead::Read {
1369 txn:
1370 PendingTxn {
1371 mut ctx,
1372 response,
1373 action,
1374 },
1375 ..
1376 } => {
1377 let changed = ctx.session_mut().vars_mut().end_transaction(action);
1378 let response = response.map(|mut r| {
1380 r.extend_params(changed);
1381 ExecuteResponse::from(r)
1382 });
1383
1384 Some((ctx, response))
1385 }
1386 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1387 let _ = tx.send(Some(ctx));
1389 None
1390 }
1391 }
1392 }
1393
1394 fn label(&self) -> &'static str {
1395 match self {
1396 PendingRead::Read { .. } => "read",
1397 PendingRead::ReadThenWrite { .. } => "read_then_write",
1398 }
1399 }
1400
1401 pub(crate) fn take_context(self) -> ExecuteContext {
1402 match self {
1403 PendingRead::Read { txn, .. } => txn.ctx,
1404 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1405 let _ = tx.send(None);
1408 ctx
1409 }
1410 }
1411 }
1412}
1413
1414#[derive(Debug, Default)]
1424#[must_use]
1425pub struct ExecuteContextExtra {
1426 statement_uuid: Option<StatementLoggingId>,
1427}
1428
1429impl ExecuteContextExtra {
1430 pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1431 Self { statement_uuid }
1432 }
1433 pub fn is_trivial(&self) -> bool {
1434 self.statement_uuid.is_none()
1435 }
1436 pub fn contents(&self) -> Option<StatementLoggingId> {
1437 self.statement_uuid
1438 }
1439 #[must_use]
1443 pub(crate) fn retire(self) -> Option<StatementLoggingId> {
1444 self.statement_uuid
1445 }
1446}
1447
1448#[derive(Debug)]
1458#[must_use]
1459pub struct ExecuteContextGuard {
1460 extra: ExecuteContextExtra,
1461 coordinator_tx: mpsc::UnboundedSender<Message>,
1466}
1467
1468impl Default for ExecuteContextGuard {
1469 fn default() -> Self {
1470 let (tx, _rx) = mpsc::unbounded_channel();
1474 Self {
1475 extra: ExecuteContextExtra::default(),
1476 coordinator_tx: tx,
1477 }
1478 }
1479}
1480
1481impl ExecuteContextGuard {
1482 pub(crate) fn new(
1483 statement_uuid: Option<StatementLoggingId>,
1484 coordinator_tx: mpsc::UnboundedSender<Message>,
1485 ) -> Self {
1486 Self {
1487 extra: ExecuteContextExtra::new(statement_uuid),
1488 coordinator_tx,
1489 }
1490 }
1491 pub fn is_trivial(&self) -> bool {
1492 self.extra.is_trivial()
1493 }
1494 pub fn contents(&self) -> Option<StatementLoggingId> {
1495 self.extra.contents()
1496 }
1497 pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
1504 std::mem::take(&mut self.extra)
1506 }
1507}
1508
1509impl Drop for ExecuteContextGuard {
1510 fn drop(&mut self) {
1511 if let Some(statement_uuid) = self.extra.statement_uuid.take() {
1512 let msg = Message::RetireExecute {
1515 data: ExecuteContextExtra {
1516 statement_uuid: Some(statement_uuid),
1517 },
1518 otel_ctx: OpenTelemetryContext::obtain(),
1519 reason: StatementEndedExecutionReason::Aborted,
1520 };
1521 let _ = self.coordinator_tx.send(msg);
1524 }
1525 }
1526}
1527
1528#[derive(Debug)]
1540pub struct ExecuteContext {
1541 inner: Box<ExecuteContextInner>,
1542}
1543
1544impl std::ops::Deref for ExecuteContext {
1545 type Target = ExecuteContextInner;
1546 fn deref(&self) -> &Self::Target {
1547 &*self.inner
1548 }
1549}
1550
1551impl std::ops::DerefMut for ExecuteContext {
1552 fn deref_mut(&mut self) -> &mut Self::Target {
1553 &mut *self.inner
1554 }
1555}
1556
1557#[derive(Debug)]
1558pub struct ExecuteContextInner {
1559 tx: ClientTransmitter<ExecuteResponse>,
1560 internal_cmd_tx: mpsc::UnboundedSender<Message>,
1561 session: Session,
1562 extra: ExecuteContextGuard,
1563}
1564
1565impl ExecuteContext {
1566 pub fn session(&self) -> &Session {
1567 &self.session
1568 }
1569
1570 pub fn session_mut(&mut self) -> &mut Session {
1571 &mut self.session
1572 }
1573
1574 pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1575 &self.tx
1576 }
1577
1578 pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1579 &mut self.tx
1580 }
1581
1582 pub fn from_parts(
1583 tx: ClientTransmitter<ExecuteResponse>,
1584 internal_cmd_tx: mpsc::UnboundedSender<Message>,
1585 session: Session,
1586 extra: ExecuteContextGuard,
1587 ) -> Self {
1588 Self {
1589 inner: ExecuteContextInner {
1590 tx,
1591 session,
1592 extra,
1593 internal_cmd_tx,
1594 }
1595 .into(),
1596 }
1597 }
1598
1599 pub fn into_parts(
1608 self,
1609 ) -> (
1610 ClientTransmitter<ExecuteResponse>,
1611 mpsc::UnboundedSender<Message>,
1612 Session,
1613 ExecuteContextGuard,
1614 ) {
1615 let ExecuteContextInner {
1616 tx,
1617 internal_cmd_tx,
1618 session,
1619 extra,
1620 } = *self.inner;
1621 (tx, internal_cmd_tx, session, extra)
1622 }
1623
1624 #[instrument(level = "debug")]
1626 pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1627 let ExecuteContextInner {
1628 tx,
1629 internal_cmd_tx,
1630 session,
1631 extra,
1632 } = *self.inner;
1633 let reason = if extra.is_trivial() {
1634 None
1635 } else {
1636 Some((&result).into())
1637 };
1638 tx.send(result, session);
1639 if let Some(reason) = reason {
1640 let extra = extra.defuse();
1642 if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1643 otel_ctx: OpenTelemetryContext::obtain(),
1644 data: extra,
1645 reason,
1646 }) {
1647 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1648 }
1649 }
1650 }
1651
1652 pub fn extra(&self) -> &ExecuteContextGuard {
1653 &self.extra
1654 }
1655
1656 pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
1657 &mut self.extra
1658 }
1659}
1660
1661#[derive(Debug)]
1662struct ClusterReplicaStatuses(
1663 BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
1664);
1665
1666impl ClusterReplicaStatuses {
1667 pub(crate) fn new() -> ClusterReplicaStatuses {
1668 ClusterReplicaStatuses(BTreeMap::new())
1669 }
1670
1671 pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1675 let prev = self.0.insert(cluster_id, BTreeMap::new());
1676 assert_eq!(
1677 prev, None,
1678 "cluster {cluster_id} statuses already initialized"
1679 );
1680 }
1681
1682 pub(crate) fn initialize_cluster_replica_statuses(
1686 &mut self,
1687 cluster_id: ClusterId,
1688 replica_id: ReplicaId,
1689 num_processes: usize,
1690 time: DateTime<Utc>,
1691 ) {
1692 tracing::info!(
1693 ?cluster_id,
1694 ?replica_id,
1695 ?time,
1696 "initializing cluster replica status"
1697 );
1698 let replica_statuses = self.0.entry(cluster_id).or_default();
1699 let process_statuses = (0..num_processes)
1700 .map(|process_id| {
1701 let status = ClusterReplicaProcessStatus {
1702 status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1703 time: time.clone(),
1704 };
1705 (u64::cast_from(process_id), status)
1706 })
1707 .collect();
1708 let prev = replica_statuses.insert(replica_id, process_statuses);
1709 assert_none!(
1710 prev,
1711 "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1712 );
1713 }
1714
1715 pub(crate) fn remove_cluster_statuses(
1719 &mut self,
1720 cluster_id: &ClusterId,
1721 ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1722 let prev = self.0.remove(cluster_id);
1723 prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1724 }
1725
1726 pub(crate) fn remove_cluster_replica_statuses(
1730 &mut self,
1731 cluster_id: &ClusterId,
1732 replica_id: &ReplicaId,
1733 ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1734 let replica_statuses = self
1735 .0
1736 .get_mut(cluster_id)
1737 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1738 let prev = replica_statuses.remove(replica_id);
1739 prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1740 }
1741
1742 pub(crate) fn ensure_cluster_status(
1746 &mut self,
1747 cluster_id: ClusterId,
1748 replica_id: ReplicaId,
1749 process_id: ProcessId,
1750 status: ClusterReplicaProcessStatus,
1751 ) {
1752 let replica_statuses = self
1753 .0
1754 .get_mut(&cluster_id)
1755 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1756 .get_mut(&replica_id)
1757 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1758 replica_statuses.insert(process_id, status);
1759 }
1760
1761 pub fn get_cluster_replica_status(
1765 &self,
1766 cluster_id: ClusterId,
1767 replica_id: ReplicaId,
1768 ) -> ClusterStatus {
1769 let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1770 Self::cluster_replica_status(process_status)
1771 }
1772
1773 pub fn cluster_replica_status(
1775 process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1776 ) -> ClusterStatus {
1777 process_status
1778 .values()
1779 .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1780 (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1781 (x, y) => {
1782 let reason_x = match x {
1783 ClusterStatus::Offline(reason) => reason,
1784 ClusterStatus::Online => None,
1785 };
1786 let reason_y = match y {
1787 ClusterStatus::Offline(reason) => reason,
1788 ClusterStatus::Online => None,
1789 };
1790 ClusterStatus::Offline(reason_x.or(reason_y))
1792 }
1793 })
1794 }
1795
1796 pub(crate) fn get_cluster_replica_statuses(
1800 &self,
1801 cluster_id: ClusterId,
1802 replica_id: ReplicaId,
1803 ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1804 self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1805 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1806 }
1807
1808 pub(crate) fn try_get_cluster_replica_statuses(
1810 &self,
1811 cluster_id: ClusterId,
1812 replica_id: ReplicaId,
1813 ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1814 self.try_get_cluster_statuses(cluster_id)
1815 .and_then(|statuses| statuses.get(&replica_id))
1816 }
1817
1818 pub(crate) fn try_get_cluster_statuses(
1820 &self,
1821 cluster_id: ClusterId,
1822 ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1823 self.0.get(&cluster_id)
1824 }
1825}
1826
1827#[derive(Derivative)]
1829#[derivative(Debug)]
1830pub struct Coordinator {
1831 #[derivative(Debug = "ignore")]
1833 controller: mz_controller::Controller,
1834 catalog: Arc<Catalog>,
1842
1843 persist_client: PersistClient,
1846
1847 internal_cmd_tx: mpsc::UnboundedSender<Message>,
1849 group_commit_tx: appends::GroupCommitNotifier,
1851
1852 strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,
1854
1855 global_timelines: BTreeMap<Timeline, TimelineState>,
1858
1859 transient_id_gen: Arc<TransientIdGen>,
1861 active_conns: BTreeMap<ConnectionId, ConnMeta>,
1864
1865 txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds>,
1869
1870 pending_peeks: BTreeMap<Uuid, PendingPeek>,
1874 client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
1876
1877 pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
1879
1880 active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
1882 active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
1884 active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,
1887
1888 connection_cancel_watches: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
1896 introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,
1898
1899 write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
1901 deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,
1903
1904 pending_writes: Vec<PendingWriteTxn>,
1906
1907 advance_timelines_interval: Interval,
1917
1918 serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
1927
1928 secrets_controller: Arc<dyn SecretsController>,
1931 caching_secrets_reader: CachingSecretsReader,
1933
1934 cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
1937
1938 storage_usage_client: StorageUsageClient,
1940 storage_usage_collection_interval: Duration,
1942
1943 #[derivative(Debug = "ignore")]
1945 segment_client: Option<mz_segment::Client>,
1946
1947 metrics: Metrics,
1949 optimizer_metrics: OptimizerMetrics,
1951
1952 tracing_handle: TracingHandle,
1954
1955 statement_logging: StatementLogging,
1957
1958 webhook_concurrency_limit: WebhookConcurrencyLimiter,
1960
1961 timestamp_oracle_config: Option<TimestampOracleConfig>,
1964
1965 check_cluster_scheduling_policies_interval: Interval,
1967
1968 cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,
1972
1973 caught_up_check_interval: Interval,
1976
1977 caught_up_check: Option<CaughtUpCheckContext>,
1980
1981 installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,
1983
1984 connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,
1986
1987 cluster_replica_statuses: ClusterReplicaStatuses,
1989
1990 read_only_controllers: bool,
1994
1995 buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,
2003
2004 license_key: ValidatedLicenseKey,
2005
2006 user_id_pool: IdPool,
2008}
2009
2010impl Coordinator {
2011 #[instrument(name = "coord::bootstrap")]
2015 pub(crate) async fn bootstrap(
2016 &mut self,
2017 boot_ts: Timestamp,
2018 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
2019 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2020 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2021 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
2022 audit_logs_iterator: AuditLogIterator,
2023 ) -> Result<(), AdapterError> {
2024 let bootstrap_start = Instant::now();
2025 info!("startup: coordinator init: bootstrap beginning");
2026 info!("startup: coordinator init: bootstrap: preamble beginning");
2027
2028 let cluster_statuses: Vec<(_, Vec<_>)> = self
2031 .catalog()
2032 .clusters()
2033 .map(|cluster| {
2034 (
2035 cluster.id(),
2036 cluster
2037 .replicas()
2038 .map(|replica| {
2039 (replica.replica_id, replica.config.location.num_processes())
2040 })
2041 .collect(),
2042 )
2043 })
2044 .collect();
2045 let now = self.now_datetime();
2046 for (cluster_id, replica_statuses) in cluster_statuses {
2047 self.cluster_replica_statuses
2048 .initialize_cluster_statuses(cluster_id);
2049 for (replica_id, num_processes) in replica_statuses {
2050 self.cluster_replica_statuses
2051 .initialize_cluster_replica_statuses(
2052 cluster_id,
2053 replica_id,
2054 num_processes,
2055 now,
2056 );
2057 }
2058 }
2059
2060 let system_config = self.catalog().system_config();
2061
2062 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
2064
2065 let compute_config = flags::compute_config(system_config);
2067 let storage_config = flags::storage_config(system_config);
2068 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
2069 let dyncfg_updates = system_config.dyncfg_updates();
2070 self.controller.compute.update_configuration(compute_config);
2071 self.controller.storage.update_parameters(storage_config);
2072 self.controller
2073 .update_orchestrator_scheduling_config(scheduling_config);
2074 self.controller.update_configuration(dyncfg_updates);
2075
2076 let enforce_credit_limit_at_bootstrap = !matches!(
2081 self.license_key.expiration_behavior,
2082 ExpirationBehavior::DisableClusterCreation,
2083 );
2084 if enforce_credit_limit_at_bootstrap {
2085 self.validate_resource_limit_numeric(
2086 Numeric::zero(),
2087 self.current_credit_consumption_rate(),
2088 |system_vars| {
2089 self.license_key
2090 .max_credit_consumption_rate()
2091 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
2092 },
2093 "cluster replica",
2094 MAX_CREDIT_CONSUMPTION_RATE.name(),
2095 )?;
2096 }
2097
2098 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
2099 Default::default();
2100
2101 let enable_worker_core_affinity =
2102 self.catalog().system_config().enable_worker_core_affinity();
2103 let enable_storage_introspection_logs = self
2104 .catalog()
2105 .system_config()
2106 .enable_storage_introspection_logs();
2107 for instance in self.catalog.clusters() {
2108 self.controller.create_cluster(
2109 instance.id,
2110 ClusterConfig {
2111 arranged_logs: instance.log_indexes.clone(),
2112 workload_class: instance.config.workload_class.clone(),
2113 },
2114 )?;
2115 for replica in instance.replicas() {
2116 let role = instance.role();
2117 self.controller.create_replica(
2118 instance.id,
2119 replica.replica_id,
2120 instance.name.clone(),
2121 replica.name.clone(),
2122 role,
2123 replica.config.clone(),
2124 enable_worker_core_affinity,
2125 enable_storage_introspection_logs,
2126 )?;
2127 }
2128 }
2129
2130 info!(
2131 "startup: coordinator init: bootstrap: preamble complete in {:?}",
2132 bootstrap_start.elapsed()
2133 );
2134
2135 let init_storage_collections_start = Instant::now();
2136 info!("startup: coordinator init: bootstrap: storage collections init beginning");
2137 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2138 .await;
2139 info!(
2140 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2141 init_storage_collections_start.elapsed()
2142 );
2143
2144 self.controller.start_compute_introspection_sink();
2149
2150 let sorting_start = Instant::now();
2151 info!("startup: coordinator init: bootstrap: sorting catalog entries");
2152 let entries = self.bootstrap_sort_catalog_entries();
2153 info!(
2154 "startup: coordinator init: bootstrap: sorting catalog entries complete in {:?}",
2155 sorting_start.elapsed()
2156 );
2157
2158 let optimize_dataflows_start = Instant::now();
2159 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2160 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2161 info!(
2162 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2163 optimize_dataflows_start.elapsed()
2164 );
2165
2166 let _fut = self.catalog().update_expression_cache(
2168 uncached_local_exprs.into_iter().collect(),
2169 uncached_global_exps.into_iter().collect(),
2170 Default::default(),
2171 );
2172
2173 let bootstrap_as_ofs_start = Instant::now();
2177 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2178 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2179 info!(
2180 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2181 bootstrap_as_ofs_start.elapsed()
2182 );
2183
2184 let postamble_start = Instant::now();
2185 info!("startup: coordinator init: bootstrap: postamble beginning");
2186
2187 let logs: BTreeSet<_> = BUILTINS::logs()
2188 .map(|log| self.catalog().resolve_builtin_log(log))
2189 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2190 .collect();
2191
2192 let mut privatelink_connections = BTreeMap::new();
2193
2194 for entry in &entries {
2195 debug!(
2196 "coordinator init: installing {} {}",
2197 entry.item().typ(),
2198 entry.id()
2199 );
2200 let mut policy = entry.item().initial_logical_compaction_window();
2201 match entry.item() {
2202 CatalogItem::Source(source) => {
2208 if source.custom_logical_compaction_window.is_none() {
2210 if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2211 source.data_source
2212 {
2213 policy = Some(
2214 self.catalog()
2215 .get_entry(&ingestion_id)
2216 .source()
2217 .expect("must be source")
2218 .custom_logical_compaction_window
2219 .unwrap_or_default(),
2220 );
2221 }
2222 }
2223 policies_to_set
2224 .entry(policy.expect("sources have a compaction window"))
2225 .or_insert_with(Default::default)
2226 .storage_ids
2227 .insert(source.global_id());
2228 }
2229 CatalogItem::Table(table) => {
2230 policies_to_set
2231 .entry(policy.expect("tables have a compaction window"))
2232 .or_insert_with(Default::default)
2233 .storage_ids
2234 .extend(table.global_ids());
2235 }
2236 CatalogItem::Index(idx) => {
2237 let policy_entry = policies_to_set
2238 .entry(policy.expect("indexes have a compaction window"))
2239 .or_insert_with(Default::default);
2240
2241 if logs.contains(&idx.on) {
2242 policy_entry
2243 .compute_ids
2244 .entry(idx.cluster_id)
2245 .or_insert_with(BTreeSet::new)
2246 .insert(idx.global_id());
2247 } else {
2248 let df_desc = self
2249 .catalog()
2250 .try_get_physical_plan(&idx.global_id())
2251 .expect("added in `bootstrap_dataflow_plans`")
2252 .clone();
2253
2254 let df_meta = self
2255 .catalog()
2256 .try_get_dataflow_metainfo(&idx.global_id())
2257 .expect("added in `bootstrap_dataflow_plans`");
2258
2259 if self.catalog().state().system_config().enable_mz_notices() {
2260 self.catalog().state().pack_optimizer_notices(
2262 &mut builtin_table_updates,
2263 df_meta.optimizer_notices.iter(),
2264 Diff::ONE,
2265 );
2266 }
2267
2268 policy_entry
2271 .compute_ids
2272 .entry(idx.cluster_id)
2273 .or_insert_with(Default::default)
2274 .extend(df_desc.export_ids());
2275
2276 self.controller
2277 .compute
2278 .create_dataflow(idx.cluster_id, df_desc, None)
2279 .unwrap_or_terminate("cannot fail to create dataflows");
2280 }
2281 }
2282 CatalogItem::View(_) => (),
2283 CatalogItem::MaterializedView(mview) => {
2284 policies_to_set
2285 .entry(policy.expect("materialized views have a compaction window"))
2286 .or_insert_with(Default::default)
2287 .storage_ids
2288 .insert(mview.global_id_writes());
2289
2290 let mut df_desc = self
2291 .catalog()
2292 .try_get_physical_plan(&mview.global_id_writes())
2293 .expect("added in `bootstrap_dataflow_plans`")
2294 .clone();
2295
2296 if let Some(initial_as_of) = mview.initial_as_of.clone() {
2297 df_desc.set_initial_as_of(initial_as_of);
2298 }
2299
2300 let until = mview
2302 .refresh_schedule
2303 .as_ref()
2304 .and_then(|s| s.last_refresh())
2305 .and_then(|r| r.try_step_forward());
2306 if let Some(until) = until {
2307 df_desc.until.meet_assign(&Antichain::from_elem(until));
2308 }
2309
2310 let df_meta = self
2311 .catalog()
2312 .try_get_dataflow_metainfo(&mview.global_id_writes())
2313 .expect("added in `bootstrap_dataflow_plans`");
2314
2315 if self.catalog().state().system_config().enable_mz_notices() {
2316 self.catalog().state().pack_optimizer_notices(
2318 &mut builtin_table_updates,
2319 df_meta.optimizer_notices.iter(),
2320 Diff::ONE,
2321 );
2322 }
2323
2324 self.ship_dataflow(df_desc, mview.cluster_id, mview.target_replica)
2325 .await;
2326
2327 if mview.replacement_target.is_none() {
2330 self.allow_writes(mview.cluster_id, mview.global_id_writes());
2331 }
2332 }
2333 CatalogItem::Sink(sink) => {
2334 policies_to_set
2335 .entry(CompactionWindow::Default)
2336 .or_insert_with(Default::default)
2337 .storage_ids
2338 .insert(sink.global_id());
2339 }
2340 CatalogItem::Connection(catalog_connection) => {
2341 if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2342 privatelink_connections.insert(
2343 entry.id(),
2344 VpcEndpointConfig {
2345 aws_service_name: conn.service_name.clone(),
2346 availability_zone_ids: conn.availability_zones.clone(),
2347 },
2348 );
2349 }
2350 }
2351 CatalogItem::Log(_)
2353 | CatalogItem::Type(_)
2354 | CatalogItem::Func(_)
2355 | CatalogItem::Secret(_) => {}
2356 }
2357 }
2358
2359 if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2360 let existing_vpc_endpoints = cloud_resource_controller
2362 .list_vpc_endpoints()
2363 .await
2364 .context("list vpc endpoints")?;
2365 let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2366 let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2367 let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2368 for id in vpc_endpoints_to_remove {
2369 cloud_resource_controller
2370 .delete_vpc_endpoint(*id)
2371 .await
2372 .context("deleting extraneous vpc endpoint")?;
2373 }
2374
2375 for (id, spec) in privatelink_connections {
2377 cloud_resource_controller
2378 .ensure_vpc_endpoint(id, spec)
2379 .await
2380 .context("ensuring vpc endpoint")?;
2381 }
2382 }
2383
2384 drop(dataflow_read_holds);
2387 for (cw, policies) in policies_to_set {
2389 self.initialize_read_policies(&policies, cw).await;
2390 }
2391
2392 builtin_table_updates.extend(
2394 self.catalog().state().resolve_builtin_table_updates(
2395 self.catalog().state().pack_all_replica_size_updates(),
2396 ),
2397 );
2398
2399 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2400 let migrated_updates_fut = if self.controller.read_only() {
2406 let min_timestamp = Timestamp::minimum();
2407 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2408 .extract_if(.., |update| {
2409 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2410 migrated_storage_collections_0dt.contains(&update.id)
2411 && self
2412 .controller
2413 .storage_collections
2414 .collection_frontiers(gid)
2415 .expect("all tables are registered")
2416 .write_frontier
2417 .elements()
2418 == &[min_timestamp]
2419 })
2420 .collect();
2421 if migrated_builtin_table_updates.is_empty() {
2422 futures::future::ready(()).boxed()
2423 } else {
2424 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2426 for update in migrated_builtin_table_updates {
2427 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2428 grouped_appends.entry(gid).or_default().push(update.data);
2429 }
2430 info!(
2431 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2432 grouped_appends.keys().collect::<Vec<_>>()
2433 );
2434
2435 let mut all_appends = Vec::with_capacity(grouped_appends.len());
2437 for (item_id, table_data) in grouped_appends.into_iter() {
2438 let mut all_rows = Vec::new();
2439 let mut all_data = Vec::new();
2440 for data in table_data {
2441 match data {
2442 TableData::Rows(rows) => all_rows.extend(rows),
2443 TableData::Batches(_) => all_data.push(data),
2444 }
2445 }
2446 differential_dataflow::consolidation::consolidate(&mut all_rows);
2447 all_data.push(TableData::Rows(all_rows));
2448
2449 all_appends.push((item_id, all_data));
2451 }
2452
2453 let fut = self
2454 .controller
2455 .storage
2456 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2457 .expect("cannot fail to append");
2458 async {
2459 fut.await
2460 .expect("One-shot shouldn't be dropped during bootstrap")
2461 .unwrap_or_terminate("cannot fail to append")
2462 }
2463 .boxed()
2464 }
2465 } else {
2466 futures::future::ready(()).boxed()
2467 };
2468
2469 info!(
2470 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2471 postamble_start.elapsed()
2472 );
2473
2474 let builtin_update_start = Instant::now();
2475 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2476
2477 if self.controller.read_only() {
2478 info!(
2479 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2480 );
2481
2482 let audit_join_start = Instant::now();
2484 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2485 let audit_log_updates: Vec<_> = audit_logs_iterator
2486 .map(|(audit_log, ts)| StateUpdate {
2487 kind: StateUpdateKind::AuditLog(audit_log),
2488 ts,
2489 diff: StateDiff::Addition,
2490 })
2491 .collect();
2492 let audit_log_builtin_table_updates = self
2493 .catalog()
2494 .state()
2495 .generate_builtin_table_updates(audit_log_updates);
2496 builtin_table_updates.extend(audit_log_builtin_table_updates);
2497 info!(
2498 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2499 audit_join_start.elapsed()
2500 );
2501 self.buffered_builtin_table_updates
2502 .as_mut()
2503 .expect("in read-only mode")
2504 .append(&mut builtin_table_updates);
2505 } else {
2506 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2507 .await;
2508 };
2509 info!(
2510 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2511 builtin_update_start.elapsed()
2512 );
2513
2514 let cleanup_secrets_start = Instant::now();
2515 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2516 {
2520 let Self {
2523 secrets_controller,
2524 catalog,
2525 ..
2526 } = self;
2527
2528 let next_user_item_id = catalog.get_next_user_item_id().await?;
2529 let next_system_item_id = catalog.get_next_system_item_id().await?;
2530 let read_only = self.controller.read_only();
2531 let catalog_ids: BTreeSet<CatalogItemId> =
2536 catalog.entries().map(|entry| entry.id()).collect();
2537 let secrets_controller = Arc::clone(secrets_controller);
2538
2539 spawn(|| "cleanup-orphaned-secrets", async move {
2540 if read_only {
2541 info!(
2542 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2543 );
2544 return;
2545 }
2546 info!("coordinator init: cleaning up orphaned secrets");
2547
2548 match secrets_controller.list().await {
2549 Ok(controller_secrets) => {
2550 let controller_secrets: BTreeSet<CatalogItemId> =
2551 controller_secrets.into_iter().collect();
2552 let orphaned = controller_secrets.difference(&catalog_ids);
2553 for id in orphaned {
2554 let id_too_large = match id {
2555 CatalogItemId::System(id) => *id >= next_system_item_id,
2556 CatalogItemId::User(id) => *id >= next_user_item_id,
2557 CatalogItemId::IntrospectionSourceIndex(_)
2558 | CatalogItemId::Transient(_) => false,
2559 };
2560 if id_too_large {
2561 info!(
2562 %next_user_item_id, %next_system_item_id,
2563 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2564 );
2565 } else {
2566 info!("coordinator init: deleting orphaned secret {id}");
2567 fail_point!("orphan_secrets");
2568 if let Err(e) = secrets_controller.delete(*id).await {
2569 warn!(
2570 "Dropping orphaned secret has encountered an error: {}",
2571 e
2572 );
2573 }
2574 }
2575 }
2576 }
2577 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2578 }
2579 });
2580 }
2581 info!(
2582 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2583 cleanup_secrets_start.elapsed()
2584 );
2585
2586 let final_steps_start = Instant::now();
2588 info!(
2589 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2590 );
2591 migrated_updates_fut
2592 .instrument(info_span!("coord::bootstrap::final"))
2593 .await;
2594
2595 debug!(
2596 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2597 );
2598 self.controller.initialization_complete();
2600
2601 self.bootstrap_introspection_subscribes().await;
2603
2604 info!(
2605 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2606 final_steps_start.elapsed()
2607 );
2608
2609 info!(
2610 "startup: coordinator init: bootstrap complete in {:?}",
2611 bootstrap_start.elapsed()
2612 );
2613 Ok(())
2614 }
2615
2616 #[allow(clippy::async_yields_async)]
2621 #[instrument]
2622 async fn bootstrap_tables(
2623 &mut self,
2624 entries: &[CatalogEntry],
2625 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2626 audit_logs_iterator: AuditLogIterator,
2627 ) {
2628 struct TableMetadata<'a> {
2630 id: CatalogItemId,
2631 name: &'a QualifiedItemName,
2632 table: &'a Table,
2633 }
2634
2635 let table_metas: Vec<_> = entries
2637 .into_iter()
2638 .filter_map(|entry| {
2639 entry.table().map(|table| TableMetadata {
2640 id: entry.id(),
2641 name: entry.name(),
2642 table,
2643 })
2644 })
2645 .collect();
2646
2647 debug!("coordinator init: advancing all tables to current timestamp");
2649 let WriteTimestamp {
2650 timestamp: write_ts,
2651 advance_to,
2652 } = self.get_local_write_ts().await;
2653 let appends = table_metas
2654 .iter()
2655 .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2656 .collect();
2657 let table_fence_rx = self
2661 .controller
2662 .storage
2663 .append_table(write_ts.clone(), advance_to, appends)
2664 .expect("invalid updates");
2665
2666 self.apply_local_write(write_ts).await;
2667
2668 debug!("coordinator init: resetting system tables");
2670 let read_ts = self.get_local_read_ts().await;
2671
2672 let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2675 .catalog()
2676 .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2677 .into();
2678 let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2679 meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2680 && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2681 };
2682
2683 let mut retraction_tasks = Vec::new();
2684 let mut system_tables: Vec<_> = table_metas
2685 .iter()
2686 .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2687 .collect();
2688
2689 let (audit_events_idx, _) = system_tables
2691 .iter()
2692 .find_position(|table| {
2693 table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2694 })
2695 .expect("mz_audit_events must exist");
2696 let audit_events = system_tables.remove(audit_events_idx);
2697 let audit_log_task = self.bootstrap_audit_log_table(
2698 audit_events.id,
2699 audit_events.name,
2700 audit_events.table,
2701 audit_logs_iterator,
2702 read_ts,
2703 );
2704
2705 for system_table in system_tables {
2706 let table_id = system_table.id;
2707 let full_name = self.catalog().resolve_full_name(system_table.name, None);
2708 debug!("coordinator init: resetting system table {full_name} ({table_id})");
2709
2710 let snapshot_fut = self
2712 .controller
2713 .storage_collections
2714 .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2715 let batch_fut = self
2716 .controller
2717 .storage_collections
2718 .create_update_builder(system_table.table.global_id_writes());
2719
2720 let task = spawn(|| format!("snapshot-{table_id}"), async move {
2721 let mut batch = batch_fut
2723 .await
2724 .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2725 tracing::info!(?table_id, "starting snapshot");
2726 let mut snapshot_cursor = snapshot_fut
2728 .await
2729 .unwrap_or_terminate("cannot fail to snapshot");
2730
2731 while let Some(values) = snapshot_cursor.next().await {
2733 for (key, _t, d) in values {
2734 let d_invert = d.neg();
2735 batch.add(&key, &(), &d_invert).await;
2736 }
2737 }
2738 tracing::info!(?table_id, "finished snapshot");
2739
2740 let batch = batch.finish().await;
2741 BuiltinTableUpdate::batch(table_id, batch)
2742 });
2743 retraction_tasks.push(task);
2744 }
2745
2746 let retractions_res = futures::future::join_all(retraction_tasks).await;
2747 for retractions in retractions_res {
2748 builtin_table_updates.push(retractions);
2749 }
2750
2751 let audit_join_start = Instant::now();
2752 info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2753 let audit_log_updates = audit_log_task.await;
2754 let audit_log_builtin_table_updates = self
2755 .catalog()
2756 .state()
2757 .generate_builtin_table_updates(audit_log_updates);
2758 builtin_table_updates.extend(audit_log_builtin_table_updates);
2759 info!(
2760 "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2761 audit_join_start.elapsed()
2762 );
2763
2764 table_fence_rx
2766 .await
2767 .expect("One-shot shouldn't be dropped during bootstrap")
2768 .unwrap_or_terminate("cannot fail to append");
2769
2770 info!("coordinator init: sending builtin table updates");
2771 let (_builtin_updates_fut, write_ts) = self
2772 .builtin_table_update()
2773 .execute(builtin_table_updates)
2774 .await;
2775 info!(?write_ts, "our write ts");
2776 if let Some(write_ts) = write_ts {
2777 self.apply_local_write(write_ts).await;
2778 }
2779 }
2780
2781 #[instrument]
2785 fn bootstrap_audit_log_table<'a>(
2786 &self,
2787 table_id: CatalogItemId,
2788 name: &'a QualifiedItemName,
2789 table: &'a Table,
2790 audit_logs_iterator: AuditLogIterator,
2791 read_ts: Timestamp,
2792 ) -> JoinHandle<Vec<StateUpdate>> {
2793 let full_name = self.catalog().resolve_full_name(name, None);
2794 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2795 let current_contents_fut = self
2796 .controller
2797 .storage_collections
2798 .snapshot(table.global_id_writes(), read_ts);
2799 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2800 let current_contents = current_contents_fut
2801 .await
2802 .unwrap_or_terminate("cannot fail to fetch snapshot");
2803 let contents_len = current_contents.len();
2804 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2805
2806 let max_table_id = current_contents
2808 .into_iter()
2809 .filter(|(_, diff)| *diff == 1)
2810 .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2811 .sorted()
2812 .rev()
2813 .next();
2814
2815 audit_logs_iterator
2817 .take_while(|(audit_log, _)| match max_table_id {
2818 Some(id) => audit_log.event.sortable_id() > id,
2819 None => true,
2820 })
2821 .map(|(audit_log, ts)| StateUpdate {
2822 kind: StateUpdateKind::AuditLog(audit_log),
2823 ts,
2824 diff: StateDiff::Addition,
2825 })
2826 .collect::<Vec<_>>()
2827 })
2828 }
2829
/// Creates the storage collections for all catalog items that are backed by
/// one: sources, tables, materialized views and sinks.
///
/// Collections are handed to the storage controller in batches. A collection
/// only becomes part of a batch once none of its transitive dependencies are
/// still waiting to be created.
///
/// `migrated_storage_collections` names catalog items; the global IDs of those
/// items are passed along to `create_collections_for_bootstrap`.
#[instrument]
async fn bootstrap_storage_collections(
    &mut self,
    migrated_storage_collections: &BTreeSet<CatalogItemId>,
) {
    let catalog = self.catalog();

    // Translates a catalog `DataSourceDesc` into the `CollectionDescription`
    // for the collection `object_id`.
    let source_desc = |object_id: GlobalId,
                       data_source: &DataSourceDesc,
                       desc: &RelationDesc,
                       timeline: &Timeline| {
        let data_source = match data_source.clone() {
            DataSourceDesc::Ingestion { desc, cluster_id } => {
                let desc = desc.into_inline_connection(catalog.state());
                let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
                DataSource::Ingestion(ingestion)
            }
            DataSourceDesc::OldSyntaxIngestion {
                desc,
                progress_subsource,
                data_config,
                details,
                cluster_id,
            } => {
                let desc = desc.into_inline_connection(catalog.state());
                let data_config = data_config.into_inline_connection(catalog.state());
                // The ingestion is created against the progress subsource's
                // collection, and `object_id` itself is registered as one of
                // the ingestion's source exports below.
                let progress_subsource =
                    catalog.get_entry(&progress_subsource).latest_global_id();
                let mut ingestion =
                    IngestionDescription::new(desc, cluster_id, progress_subsource);
                let legacy_export = SourceExport {
                    storage_metadata: (),
                    data_config,
                    details,
                };
                ingestion.source_exports.insert(object_id, legacy_export);

                DataSource::Ingestion(ingestion)
            }
            DataSourceDesc::IngestionExport {
                ingestion_id,
                external_reference: _,
                details,
                data_config,
            } => {
                // Resolve the catalog item id of the ingestion to its latest
                // global id.
                let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();

                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config: data_config.into_inline_connection(catalog.state()),
                }
            }
            DataSourceDesc::Webhook { .. } => DataSource::Webhook,
            DataSourceDesc::Progress => DataSource::Progress,
            DataSourceDesc::Introspection(introspection) => {
                DataSource::Introspection(introspection)
            }
            DataSourceDesc::Catalog => DataSource::Other,
        };
        CollectionDescription {
            desc: desc.clone(),
            data_source,
            since: None,
            timeline: Some(timeline.clone()),
            primary: None,
        }
    };

    // `collections` gathers `(GlobalId, CollectionDescription)` pairs for
    // everything that needs a storage collection. `compute_collections`
    // additionally records materialized views for the nullability evolution
    // call below.
    let mut compute_collections = vec![];
    let mut collections = vec![];
    for entry in catalog.entries() {
        match entry.item() {
            CatalogItem::Source(source) => {
                collections.push((
                    source.global_id(),
                    source_desc(
                        source.global_id(),
                        &source.data_source,
                        &source.desc,
                        &source.timeline,
                    ),
                ));
            }
            CatalogItem::Table(table) => {
                match &table.data_source {
                    TableDataSource::TableWrites { defaults: _ } => {
                        // Index the table's collections by version so each
                        // version can look up its successor.
                        let versions: BTreeMap<_, _> = table
                            .collection_descs()
                            .map(|(gid, version, desc)| (version, (gid, desc)))
                            .collect();
                        let collection_descs = versions.iter().map(|(version, (gid, desc))| {
                            // A version's `primary` is the collection of the
                            // next version, if there is one.
                            let next_version = version.bump();
                            let primary_collection =
                                versions.get(&next_version).map(|(gid, _desc)| gid).copied();
                            let mut collection_desc =
                                CollectionDescription::for_table(desc.clone());
                            collection_desc.primary = primary_collection;

                            (*gid, collection_desc)
                        });
                        collections.extend(collection_descs);
                    }
                    TableDataSource::DataSource {
                        desc: data_source_desc,
                        timeline,
                    } => {
                        // Tables backed by a data source are expected to
                        // have exactly one collection.
                        soft_assert_eq_or_log!(table.collections.len(), 1);
                        let collection_descs =
                            table.collection_descs().map(|(gid, _version, desc)| {
                                (
                                    gid,
                                    source_desc(
                                        entry.latest_global_id(),
                                        data_source_desc,
                                        &desc,
                                        timeline,
                                    ),
                                )
                            });
                        collections.extend(collection_descs);
                    }
                };
            }
            CatalogItem::MaterializedView(mv) => {
                let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
                    let collection_desc =
                        CollectionDescription::for_other(desc, mv.initial_as_of.clone());
                    (gid, collection_desc)
                });

                collections.extend(collection_descs);
                compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
            }
            CatalogItem::Sink(sink) => {
                let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
                let from_desc = storage_sink_from_entry
                    .relation_desc()
                    .expect("sinks can only be built on items with descs")
                    .into_owned();
                let collection_desc = CollectionDescription {
                    // NOTE(review): the Kafka progress desc is used for
                    // every sink here — confirm that is intended for all
                    // sink connection types.
                    desc: KAFKA_PROGRESS_DESC.clone(),
                    data_source: DataSource::Sink {
                        desc: ExportDescription {
                            sink: StorageSinkDesc {
                                from: sink.from,
                                from_desc,
                                connection: sink
                                    .connection
                                    .clone()
                                    .into_inline_connection(self.catalog().state()),
                                envelope: sink.envelope,
                                as_of: Antichain::from_elem(Timestamp::minimum()),
                                with_snapshot: sink.with_snapshot,
                                version: sink.version,
                                from_storage_metadata: (),
                                to_storage_metadata: (),
                                commit_interval: sink.commit_interval,
                            },
                            instance_id: sink.cluster_id,
                        },
                    },
                    since: None,
                    timeline: None,
                    primary: None,
                };
                collections.push((sink.global_id, collection_desc));
            }
            // These item kinds contribute no storage collection here.
            CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::Index(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_) => (),
        }
    }

    // In read-only mode no write timestamp is taken; the current read
    // timestamp is used as the registration timestamp instead.
    let register_ts = if self.controller.read_only() {
        self.get_local_read_ts().await
    } else {
        self.get_local_write_ts().await.timestamp
    };

    let storage_metadata = self.catalog.state().storage_metadata();
    // Expand the migrated catalog items into all of their global IDs.
    let migrated_storage_collections = migrated_storage_collections
        .into_iter()
        .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
        .collect();

    // Runs before any collection is created below.
    self.controller
        .storage
        .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
        .await
        .unwrap_or_terminate("cannot fail to evolve collections");

    // Collections that still have to be created, keyed by global ID.
    let mut pending: BTreeMap<_, _> = collections.into_iter().collect();

    // For every pending collection: the global IDs of its transitive
    // dependencies that are themselves pending collections.
    let transitive_dep_gids: BTreeMap<_, _> = pending
        .keys()
        .map(|gid| {
            let entry = self.catalog.get_entry_by_global_id(gid);
            let item_id = entry.id();
            let deps = self.catalog.state().transitive_uses(item_id);
            let dep_gids: BTreeSet<_> = deps
                // Drop the item itself from its own dependency set.
                .filter(|dep_id| *dep_id != item_id)
                .map(|dep_id| self.catalog.get_entry(&dep_id).latest_global_id())
                // Only dependencies that are storage collections matter.
                .filter(|dep_gid| pending.contains_key(dep_gid))
                .collect();
            (*gid, dep_gids)
        })
        .collect();

    while !pending.is_empty() {
        // A collection is ready once none of its dependencies are pending.
        let ready_gids: BTreeSet<_> = pending
            .keys()
            .filter(|gid| {
                let mut deps = transitive_dep_gids[gid].iter();
                !deps.any(|dep_gid| pending.contains_key(dep_gid))
            })
            .copied()
            .collect();
        let mut ready: Vec<_> = pending
            .extract_if(.., |gid, _| ready_gids.contains(gid))
            .collect();

        for (gid, collection) in &mut ready {
            // Only system collections without an explicit `since` get a
            // derived one.
            if !gid.is_system() || collection.since.is_some() {
                continue;
            }

            // The derived `since` is the join of the `since` frontiers of
            // all dependencies, which were created in an earlier batch.
            let mut derived_since = Antichain::from_elem(Timestamp::MIN);
            for dep_gid in &transitive_dep_gids[gid] {
                let (since, _) = self
                    .controller
                    .storage
                    .collection_frontiers(*dep_gid)
                    .expect("previously registered");
                derived_since.join_assign(&since);
            }
            collection.since = Some(derived_since);
        }

        if ready.is_empty() {
            // Nothing became ready although collections remain, which
            // means the dependencies form a cycle. Report it and create
            // everything that is left in one go so the loop terminates.
            soft_panic_or_log!(
                "cycle in storage collections: {:?}",
                pending.keys().collect::<Vec<_>>(),
            );
            ready = mem::take(&mut pending).into_iter().collect();
        }

        self.controller
            .storage
            .create_collections_for_bootstrap(
                storage_metadata,
                Some(register_ts),
                ready,
                &migrated_storage_collections,
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");
    }

    // `register_ts` is only a write timestamp outside of read-only mode.
    if !self.controller.read_only() {
        self.apply_local_write(register_ts).await;
    }
}
3145
3146 fn bootstrap_sort_catalog_entries(&self) -> Vec<CatalogEntry> {
3153 let mut indexes_on = BTreeMap::<_, Vec<_>>::new();
3154 let mut non_indexes = Vec::new();
3155 for entry in self.catalog().entries().cloned() {
3156 if let Some(index) = entry.index() {
3157 let on = self.catalog().get_entry_by_global_id(&index.on);
3158 indexes_on.entry(on.id()).or_default().push(entry);
3159 } else {
3160 non_indexes.push(entry);
3161 }
3162 }
3163
3164 let key_fn = |entry: &CatalogEntry| entry.id;
3165 let dependencies_fn = |entry: &CatalogEntry| entry.uses();
3166 sort_topological(&mut non_indexes, key_fn, dependencies_fn);
3167
3168 let mut result = Vec::new();
3169 for entry in non_indexes {
3170 let id = entry.id();
3171 result.push(entry);
3172 if let Some(mut indexes) = indexes_on.remove(&id) {
3173 result.append(&mut indexes);
3174 }
3175 }
3176
3177 soft_assert_or_log!(
3178 indexes_on.is_empty(),
3179 "indexes with missing dependencies: {indexes_on:?}",
3180 );
3181
3182 result
3183 }
3184
/// Installs optimized plan, physical plan and dataflow metainfo in the
/// catalog for every index and materialized view in `ordered_catalog_entries`.
///
/// For each item the plans are taken from `cached_global_exprs` when there is
/// a cached entry whose optimizer features equal the ones currently configured
/// for the item's cluster. Otherwise the item is optimized from scratch.
///
/// Returns the expressions that had to be computed because they were not
/// usable from the cache, keyed by global ID.
///
/// # Errors
///
/// Propagates any error returned by the optimizer.
#[instrument]
fn bootstrap_dataflow_plans(
    &mut self,
    ordered_catalog_entries: &[CatalogEntry],
    mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
    // One compute instance snapshot per cluster, created on first use and
    // updated with every collection planned below.
    let mut instance_snapshots = BTreeMap::new();
    let mut uncached_expressions = BTreeMap::new();

    // System-wide optimizer configuration with the cluster's feature
    // overrides applied on top.
    let optimizer_config = |catalog: &Catalog, cluster_id| {
        let system_config = catalog.system_config();
        let overrides = catalog.get_cluster(cluster_id).config.features();
        OptimizerConfig::from(system_config).override_from(&overrides)
    };

    for entry in ordered_catalog_entries {
        match entry.item() {
            CatalogItem::Index(idx) => {
                let compute_instance =
                    instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
                        self.instance_snapshot(idx.cluster_id)
                            .expect("compute instance exists")
                    });
                let global_id = idx.global_id();

                // Nothing to plan if the instance snapshot already knows
                // this collection.
                if compute_instance.contains_collection(&global_id) {
                    continue;
                }

                let optimizer_config = optimizer_config(&self.catalog, idx.cluster_id);

                let (optimized_plan, physical_plan, metainfo) =
                    match cached_global_exprs.remove(&global_id) {
                        // Cached plans are only reused when they were
                        // produced with the same optimizer features.
                        Some(global_expressions)
                            if global_expressions.optimizer_features
                                == optimizer_config.features =>
                        {
                            debug!("global expression cache hit for {global_id:?}");
                            (
                                global_expressions.global_mir,
                                global_expressions.physical_plan,
                                global_expressions.dataflow_metainfos,
                            )
                        }
                        // Cache miss, or cached with different features:
                        // run the optimizer.
                        Some(_) | None => {
                            let (optimized_plan, global_lir_plan) = {
                                let mut optimizer = optimize::index::Optimizer::new(
                                    self.owned_catalog(),
                                    compute_instance.clone(),
                                    global_id,
                                    optimizer_config.clone(),
                                    self.optimizer_metrics(),
                                );

                                let index_plan = optimize::index::Index::new(
                                    entry.name().clone(),
                                    idx.on,
                                    idx.keys.to_vec(),
                                );
                                // First optimizer stage; its dataflow
                                // description is kept as the optimized plan.
                                let global_mir_plan = optimizer.optimize(index_plan)?;
                                let optimized_plan = global_mir_plan.df_desc().clone();

                                // Second optimizer stage.
                                let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                (optimized_plan, global_lir_plan)
                            };

                            let (physical_plan, metainfo) = global_lir_plan.unapply();
                            let metainfo = {
                                // One fresh transient ID per optimizer
                                // notice.
                                let notice_ids =
                                    std::iter::repeat_with(|| self.allocate_transient_id())
                                        .map(|(_item_id, gid)| gid)
                                        .take(metainfo.optimizer_notices.len())
                                        .collect::<Vec<_>>();
                                self.catalog().render_notices(
                                    metainfo,
                                    notice_ids,
                                    Some(idx.global_id()),
                                )
                            };
                            // Remember the freshly computed expressions so
                            // they can be returned to the caller.
                            uncached_expressions.insert(
                                global_id,
                                GlobalExpressions {
                                    global_mir: optimized_plan.clone(),
                                    physical_plan: physical_plan.clone(),
                                    dataflow_metainfos: metainfo.clone(),
                                    optimizer_features: optimizer_config.features.clone(),
                                },
                            );
                            (optimized_plan, physical_plan, metainfo)
                        }
                    };

                let catalog = self.catalog_mut();
                catalog.set_optimized_plan(idx.global_id(), optimized_plan);
                catalog.set_physical_plan(idx.global_id(), physical_plan);
                catalog.set_dataflow_metainfo(idx.global_id(), metainfo);

                // Make the index visible to items planned later on the
                // same cluster.
                compute_instance.insert_collection(idx.global_id());
            }
            CatalogItem::MaterializedView(mv) => {
                let compute_instance =
                    instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
                        self.instance_snapshot(mv.cluster_id)
                            .expect("compute instance exists")
                    });
                let global_id = mv.global_id_writes();

                let optimizer_config = optimizer_config(&self.catalog, mv.cluster_id);

                let (optimized_plan, physical_plan, metainfo) = match cached_global_exprs
                    .remove(&global_id)
                {
                    // Cached plans are only reused when they were produced
                    // with the same optimizer features.
                    Some(global_expressions)
                        if global_expressions.optimizer_features
                            == optimizer_config.features =>
                    {
                        debug!("global expression cache hit for {global_id:?}");
                        (
                            global_expressions.global_mir,
                            global_expressions.physical_plan,
                            global_expressions.dataflow_metainfos,
                        )
                    }
                    // Cache miss, or cached with different features: run
                    // the optimizer.
                    Some(_) | None => {
                        let (_, internal_view_id) = self.allocate_transient_id();
                        let debug_name = self
                            .catalog()
                            .resolve_full_name(entry.name(), None)
                            .to_string();

                        let (optimized_plan, global_lir_plan) = {
                            let mut optimizer = optimize::materialized_view::Optimizer::new(
                                self.owned_catalog().as_optimizer_catalog(),
                                compute_instance.clone(),
                                global_id,
                                internal_view_id,
                                mv.desc.latest().iter_names().cloned().collect(),
                                mv.non_null_assertions.clone(),
                                mv.refresh_schedule.clone(),
                                debug_name,
                                optimizer_config.clone(),
                                self.optimizer_metrics(),
                            );

                            let typ = infer_sql_type_for_catalog(
                                &mv.raw_expr,
                                &mv.locally_optimized_expr.as_ref().clone(),
                            );
                            // First optimizer stage; its dataflow
                            // description is kept as the optimized plan.
                            let global_mir_plan = optimizer
                                .optimize((mv.locally_optimized_expr.as_ref().clone(), typ))?;
                            let optimized_plan = global_mir_plan.df_desc().clone();

                            // Second optimizer stage.
                            let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                            (optimized_plan, global_lir_plan)
                        };

                        let (physical_plan, metainfo) = global_lir_plan.unapply();
                        let metainfo = {
                            // One fresh transient ID per optimizer notice.
                            let notice_ids =
                                std::iter::repeat_with(|| self.allocate_transient_id())
                                    .map(|(_item_id, global_id)| global_id)
                                    .take(metainfo.optimizer_notices.len())
                                    .collect::<Vec<_>>();
                            self.catalog().render_notices(
                                metainfo,
                                notice_ids,
                                Some(mv.global_id_writes()),
                            )
                        };
                        // Remember the freshly computed expressions so they
                        // can be returned to the caller.
                        uncached_expressions.insert(
                            global_id,
                            GlobalExpressions {
                                global_mir: optimized_plan.clone(),
                                physical_plan: physical_plan.clone(),
                                dataflow_metainfos: metainfo.clone(),
                                optimizer_features: optimizer_config.features.clone(),
                            },
                        );
                        (optimized_plan, physical_plan, metainfo)
                    }
                };

                let catalog = self.catalog_mut();
                catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
                catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
                catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);

                // Make the materialized view visible to items planned later
                // on the same cluster.
                compute_instance.insert_collection(mv.global_id_writes());
            }
            // No dataflow plans are produced for the remaining item kinds.
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_) => (),
        }
    }

    Ok(uncached_expressions)
}
3420
3421 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold> {
3431 let mut catalog_ids = Vec::new();
3432 let mut dataflows = Vec::new();
3433 let mut read_policies = BTreeMap::new();
3434 for entry in self.catalog.entries() {
3435 let gid = match entry.item() {
3436 CatalogItem::Index(idx) => idx.global_id(),
3437 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3438 CatalogItem::Table(_)
3439 | CatalogItem::Source(_)
3440 | CatalogItem::Log(_)
3441 | CatalogItem::View(_)
3442 | CatalogItem::Sink(_)
3443 | CatalogItem::Type(_)
3444 | CatalogItem::Func(_)
3445 | CatalogItem::Secret(_)
3446 | CatalogItem::Connection(_) => continue,
3447 };
3448 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3449 catalog_ids.push(gid);
3450 dataflows.push(plan.clone());
3451
3452 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3453 read_policies.insert(gid, compaction_window.into());
3454 }
3455 }
3456 }
3457
3458 let read_ts = self.get_local_read_ts().await;
3459 let read_holds = as_of_selection::run(
3460 &mut dataflows,
3461 &read_policies,
3462 &*self.controller.storage_collections,
3463 read_ts,
3464 self.controller.read_only(),
3465 );
3466
3467 let catalog = self.catalog_mut();
3468 for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3469 catalog.set_physical_plan(id, plan);
3470 }
3471
3472 read_holds
3473 }
3474
/// Runs the coordinator's main loop until the command channel reports that
/// no more commands will arrive.
///
/// Each iteration waits on a set of event sources, turns whatever became
/// ready into `Message`s, and then handles those messages one after another.
/// A separate watchdog task logs a warning when the loop does not pick up its
/// probe for 30 seconds.
///
/// * `internal_cmd_rx` - messages the coordinator sends to itself.
/// * `strict_serializable_reads_rx` - read transactions waiting to be
///   linearized, keyed by connection.
/// * `cmd_rx` - external commands together with their tracing context.
/// * `group_commit_rx` - signals that a group commit should be initiated.
///
/// When the loop ends, the catalog is expired if this coordinator holds the
/// only reference to it.
fn serve(
    mut self,
    mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
    mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
    mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
    group_commit_rx: appends::GroupCommitWaiter,
) -> LocalBoxFuture<'static, ()> {
    async move {
        let mut cluster_events = self.controller.events_stream();
        // Shared with the watchdog so it can report what the coordinator
        // was last working on.
        let last_message = Arc::new(Mutex::new(LastMessage {
            kind: "none",
            stmt: None,
        }));

        // Capacity-1 channel over which the watchdog probes the main loop.
        let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
        let idle_metric = self.metrics.queue_busy_seconds.clone();
        let last_message_watchdog = Arc::clone(&last_message);

        spawn(|| "coord watchdog", async move {
            // Probe every 5 seconds; a missed tick delays the following
            // ticks instead of firing them in a burst.
            let mut interval = tokio::time::interval(Duration::from_secs(5));
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            // Whether the "stuck" warning has already been emitted for the
            // current stall.
            let mut coord_stuck = false;

            loop {
                interval.tick().await;

                // Wait for a free slot in the channel. A slot only frees up
                // once the main loop has received the previous probe.
                let duration = tokio::time::Duration::from_secs(30);
                let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
                let Ok(maybe_permit) = timeout else {
                    // Warn only once per stall.
                    if !coord_stuck {
                        let last_message = last_message_watchdog.lock().expect("poisoned");
                        tracing::warn!(
                            last_message_kind = %last_message.kind,
                            last_message_sql = %last_message.stmt_to_string(),
                            "coordinator stuck for {duration:?}",
                        );
                    }
                    coord_stuck = true;

                    continue;
                };

                if coord_stuck {
                    tracing::info!("Coordinator became unstuck");
                }
                coord_stuck = false;

                // `reserve` fails once the receiving side is gone, which
                // ends the watchdog.
                let Ok(permit) = maybe_permit else {
                    break;
                };

                // The timer is observed by the main loop when it receives
                // the probe.
                permit.send(idle_metric.start_timer());
            }
        });

        self.schedule_storage_usage_collection().await;
        self.schedule_arrangement_sizes_collection().await;
        self.spawn_privatelink_vpc_endpoints_watch_task();
        self.spawn_statement_logging_task();
        flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);

        // Read once, before the loop starts; not re-read per message.
        let warn_threshold = self
            .catalog()
            .system_config()
            .coord_slow_message_warn_threshold();

        // Upper bound on how many messages are pulled from a channel at once.
        const MESSAGE_BATCH: usize = 64;
        let mut messages = Vec::with_capacity(MESSAGE_BATCH);
        let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);

        let message_batch = self.metrics.message_batch.clone();

        loop {
            // NOTE(review): presumably `tokio::select!`; with `biased` the
            // branches are polled in the order written, so branch order
            // acts as a priority order — confirm against the file's imports.
            select! {
                biased;

                // Messages the coordinator sent to itself.
                _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
                // Events from the controller's event stream.
                Some(event) = cluster_events.next() => {
                    messages.push(Message::ClusterEvent(event))
                },
                // The controller has work for the coordinator to process.
                () = self.controller.ready() => {
                    let controller = match self.controller.get_readiness() {
                        Readiness::Storage => ControllerReadiness::Storage,
                        Readiness::Compute => ControllerReadiness::Compute,
                        Readiness::Metrics(_) => ControllerReadiness::Metrics,
                        Readiness::Internal(_) => ControllerReadiness::Internal,
                        Readiness::NotReady => unreachable!("just signaled as ready"),
                    };
                    messages.push(Message::ControllerReady { controller });
                }
                // A group commit was requested.
                permit = group_commit_rx.ready() => {
                    let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
                        PendingWriteTxn::User{span, ..} => Some(span),
                        PendingWriteTxn::System{..} => None,
                    });
                    // With exactly one pending user write its span is reused
                    // directly. Otherwise a new root span is created that
                    // follows from all of them.
                    let span = match user_write_spans.exactly_one() {
                        Ok(span) => span.clone(),
                        Err(user_write_spans) => {
                            let span = info_span!(parent: None, "group_commit_notify");
                            for s in user_write_spans {
                                span.follows_from(s);
                            }
                            span
                        }
                    };
                    messages.push(Message::GroupCommitInitiate(span, Some(permit)));
                },
                // External commands.
                count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
                    // NOTE(review): a count of zero is taken to mean the
                    // command channel is closed — confirm against the
                    // `recv_many` documentation.
                    if count == 0 {
                        break;
                    } else {
                        messages.extend(cmd_messages.drain(..).map(
                            |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
                        ));
                    }
                },
                // Reads waiting to be linearized.
                Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
                    // Drain everything that is immediately available so all
                    // of it is covered by a single `LinearizeReads` message.
                    let mut pending_read_txns = vec![pending_read_txn];
                    while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
                        pending_read_txns.push(pending_read_txn);
                    }
                    for (conn_id, pending_read_txn) in pending_read_txns {
                        let prev = self
                            .pending_linearize_read_txns
                            .insert(conn_id, pending_read_txn);
                        soft_assert_or_log!(
                            prev.is_none(),
                            "connections can not have multiple concurrent reads, prev: {prev:?}"
                        )
                    }
                    messages.push(Message::LinearizeReads);
                }
                // Periodic timeline advancement.
                _ = self.advance_timelines_interval.tick() => {
                    let span = info_span!(parent: None, "coord::advance_timelines_interval");
                    span.follows_from(Span::current());

                    // In read-only mode timelines are advanced directly;
                    // otherwise the tick initiates a group commit.
                    if self.controller.read_only() {
                        messages.push(Message::AdvanceTimelines);
                    } else {
                        messages.push(Message::GroupCommitInitiate(span, None));
                    }
                },
                // Periodic check of the cluster scheduling policies.
                _ = self.check_cluster_scheduling_policies_interval.tick() => {
                    messages.push(Message::CheckSchedulingPolicies);
                },

                // Periodic caught-up check; handled inline, produces no
                // message.
                _ = self.caught_up_check_interval.tick() => {
                    self.maybe_check_caught_up().await;

                    continue;
                },

                // Probe from the watchdog task.
                timer = idle_rx.recv() => {
                    // Records the time since the watchdog sent the probe.
                    timer.expect("does not drop").observe_duration();
                    self.metrics
                        .message_handling
                        .with_label_values(&["watchdog"])
                        .observe(0.0);
                    continue;
                }
            };

            message_batch.observe(f64::cast_lossy(messages.len()));

            for msg in messages.drain(..) {
                let msg_kind = msg.kind();
                let span = span!(
                    target: "mz_adapter::coord::handle_message_loop",
                    Level::INFO,
                    "coord::handle_message",
                    kind = msg_kind
                );
                let otel_context = span.context().span().span_context().clone();

                // Record what is about to be processed so the watchdog can
                // report it should this message stall the loop.
                *last_message.lock().expect("poisoned") = LastMessage {
                    kind: msg_kind,
                    stmt: match &msg {
                        Message::Command(
                            _,
                            Command::Execute {
                                portal_name,
                                session,
                                ..
                            },
                        ) => session
                            .get_portal_unverified(portal_name)
                            .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
                        _ => None,
                    },
                };

                let start = Instant::now();
                self.handle_message(msg).instrument(span).await;
                let duration = start.elapsed();

                self.metrics
                    .message_handling
                    .with_label_values(&[msg_kind])
                    .observe(duration.as_secs_f64());

                // Log messages that took longer than the configured
                // threshold, with a trace id when one is available.
                if duration > warn_threshold {
                    let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
                    tracing::error!(
                        ?msg_kind,
                        ?trace_id,
                        ?duration,
                        "very slow coordinator message"
                    );
                }
            }
        }
        // Expire the catalog only if no other reference to it remains.
        if let Some(catalog) = Arc::into_inner(self.catalog) {
            catalog.expire().await;
        }
    }
    .boxed_local()
}
3778
3779 fn catalog(&self) -> &Catalog {
3781 &self.catalog
3782 }
3783
3784 fn owned_catalog(&self) -> Arc<Catalog> {
3787 Arc::clone(&self.catalog)
3788 }
3789
3790 fn optimizer_metrics(&self) -> OptimizerMetrics {
3793 self.optimizer_metrics.clone()
3794 }
3795
3796 fn catalog_mut(&mut self) -> &mut Catalog {
3798 Arc::make_mut(&mut self.catalog)
3806 }
3807
3808 async fn refill_user_id_pool(&mut self, min_count: u64) -> Result<(), AdapterError> {
3813 let batch_size = USER_ID_POOL_BATCH_SIZE.get(self.catalog().system_config().dyncfgs());
3814 let to_allocate = min_count.max(u64::from(batch_size));
3815 let id_ts = self.get_catalog_write_ts().await;
3816 let ids = self.catalog().allocate_user_ids(to_allocate, id_ts).await?;
3817 if let (Some((first_id, _)), Some((last_id, _))) = (ids.first(), ids.last()) {
3818 let start = match first_id {
3819 CatalogItemId::User(id) => *id,
3820 other => {
3821 return Err(AdapterError::Internal(format!(
3822 "expected User CatalogItemId, got {other:?}"
3823 )));
3824 }
3825 };
3826 let end = match last_id {
3827 CatalogItemId::User(id) => *id + 1, other => {
3829 return Err(AdapterError::Internal(format!(
3830 "expected User CatalogItemId, got {other:?}"
3831 )));
3832 }
3833 };
3834 self.user_id_pool.refill(start, end);
3835 } else {
3836 return Err(AdapterError::Internal(
3837 "catalog returned no user IDs".into(),
3838 ));
3839 }
3840 Ok(())
3841 }
3842
3843 async fn allocate_user_id(&mut self) -> Result<(CatalogItemId, GlobalId), AdapterError> {
3845 if let Some(id) = self.user_id_pool.allocate() {
3846 return Ok((CatalogItemId::User(id), GlobalId::User(id)));
3847 }
3848 self.refill_user_id_pool(1).await?;
3849 let id = self.user_id_pool.allocate().expect("ID pool just refilled");
3850 Ok((CatalogItemId::User(id), GlobalId::User(id)))
3851 }
3852
3853 async fn allocate_user_ids(
3855 &mut self,
3856 count: u64,
3857 ) -> Result<Vec<(CatalogItemId, GlobalId)>, AdapterError> {
3858 if self.user_id_pool.remaining() < count {
3859 self.refill_user_id_pool(count).await?;
3860 }
3861 let raw_ids = self
3862 .user_id_pool
3863 .allocate_many(count)
3864 .expect("pool has enough IDs after refill");
3865 Ok(raw_ids
3866 .into_iter()
3867 .map(|id| (CatalogItemId::User(id), GlobalId::User(id)))
3868 .collect())
3869 }
3870
3871 fn connection_context(&self) -> &ConnectionContext {
3873 self.controller.connection_context()
3874 }
3875
3876 fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3878 &self.connection_context().secrets_reader
3879 }
3880
3881 #[allow(dead_code)]
3886 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3887 for meta in self.active_conns.values() {
3888 let _ = meta.notice_tx.send(notice.clone());
3889 }
3890 }
3891
3892 pub(crate) fn broadcast_notice_tx(
3895 &self,
3896 ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3897 let senders: Vec<_> = self
3898 .active_conns
3899 .values()
3900 .map(|meta| meta.notice_tx.clone())
3901 .collect();
3902 Box::new(move |notice| {
3903 for tx in senders {
3904 let _ = tx.send(notice.clone());
3905 }
3906 })
3907 }
3908
3909 pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3910 &self.active_conns
3911 }
3912
3913 #[instrument(level = "debug")]
3914 pub(crate) fn retire_execution(
3915 &mut self,
3916 reason: StatementEndedExecutionReason,
3917 ctx_extra: ExecuteContextExtra,
3918 ) {
3919 if let Some(uuid) = ctx_extra.retire() {
3920 self.end_statement_execution(uuid, reason);
3921 }
3922 }
3923
3924 #[instrument(level = "debug")]
3926 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3927 let compute = self
3928 .instance_snapshot(instance)
3929 .expect("compute instance does not exist");
3930 DataflowBuilder::new(self.catalog().state(), compute)
3931 }
3932
3933 pub fn instance_snapshot(
3935 &self,
3936 id: ComputeInstanceId,
3937 ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3938 ComputeInstanceSnapshot::new(&self.controller, id)
3939 }
3940
3941 pub(crate) async fn ship_dataflow(
3948 &mut self,
3949 dataflow: DataflowDescription<Plan>,
3950 instance: ComputeInstanceId,
3951 target_replica: Option<ReplicaId>,
3952 ) {
3953 self.try_ship_dataflow(dataflow, instance, target_replica)
3954 .await
3955 .unwrap_or_terminate("dataflow creation cannot fail");
3956 }
3957
3958 pub(crate) async fn try_ship_dataflow(
3961 &mut self,
3962 dataflow: DataflowDescription<Plan>,
3963 instance: ComputeInstanceId,
3964 target_replica: Option<ReplicaId>,
3965 ) -> Result<(), DataflowCreationError> {
3966 let export_ids = dataflow.exported_index_ids().collect();
3969
3970 self.controller
3971 .compute
3972 .create_dataflow(instance, dataflow, target_replica)?;
3973
3974 self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3975 .await;
3976
3977 Ok(())
3978 }
3979
3980 pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
3984 self.controller
3985 .compute
3986 .allow_writes(instance, id)
3987 .unwrap_or_terminate("allow_writes cannot fail");
3988 }
3989
3990 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3992 &mut self,
3993 dataflow: DataflowDescription<Plan>,
3994 instance: ComputeInstanceId,
3995 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3996 target_replica: Option<ReplicaId>,
3997 ) {
3998 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3999 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, target_replica);
4000 let ((), ()) =
4001 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
4002 } else {
4003 self.ship_dataflow(dataflow, instance, target_replica).await;
4004 }
4005 }
4006
4007 pub fn install_compute_watch_set(
4011 &mut self,
4012 conn_id: ConnectionId,
4013 objects: BTreeSet<GlobalId>,
4014 t: Timestamp,
4015 state: WatchSetResponse,
4016 ) -> Result<(), CollectionLookupError> {
4017 let ws_id = self.controller.install_compute_watch_set(objects, t)?;
4018 self.connection_watch_sets
4019 .entry(conn_id.clone())
4020 .or_default()
4021 .insert(ws_id);
4022 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4023 Ok(())
4024 }
4025
4026 pub fn install_storage_watch_set(
4030 &mut self,
4031 conn_id: ConnectionId,
4032 objects: BTreeSet<GlobalId>,
4033 t: Timestamp,
4034 state: WatchSetResponse,
4035 ) -> Result<(), CollectionMissing> {
4036 let ws_id = self.controller.install_storage_watch_set(objects, t)?;
4037 self.connection_watch_sets
4038 .entry(conn_id.clone())
4039 .or_default()
4040 .insert(ws_id);
4041 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4042 Ok(())
4043 }
4044
4045 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
4047 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
4048 for ws_id in ws_ids {
4049 self.installed_watch_sets.remove(&ws_id);
4050 }
4051 }
4052 }
4053
4054 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
4058 let global_timelines: BTreeMap<_, _> = self
4064 .global_timelines
4065 .iter()
4066 .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
4067 .collect();
4068 let active_conns: BTreeMap<_, _> = self
4069 .active_conns
4070 .iter()
4071 .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
4072 .collect();
4073 let txn_read_holds: BTreeMap<_, _> = self
4074 .txn_read_holds
4075 .iter()
4076 .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
4077 .collect();
4078 let pending_peeks: BTreeMap<_, _> = self
4079 .pending_peeks
4080 .iter()
4081 .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
4082 .collect();
4083 let client_pending_peeks: BTreeMap<_, _> = self
4084 .client_pending_peeks
4085 .iter()
4086 .map(|(id, peek)| {
4087 let peek: BTreeMap<_, _> = peek
4088 .iter()
4089 .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
4090 .collect();
4091 (id.to_string(), peek)
4092 })
4093 .collect();
4094 let pending_linearize_read_txns: BTreeMap<_, _> = self
4095 .pending_linearize_read_txns
4096 .iter()
4097 .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
4098 .collect();
4099
4100 Ok(serde_json::json!({
4101 "global_timelines": global_timelines,
4102 "active_conns": active_conns,
4103 "txn_read_holds": txn_read_holds,
4104 "pending_peeks": pending_peeks,
4105 "client_pending_peeks": client_pending_peeks,
4106 "pending_linearize_read_txns": pending_linearize_read_txns,
4107 "controller": self.controller.dump().await?,
4108 }))
4109 }
4110
4111 async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
4125 let item_id = self
4126 .catalog()
4127 .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
4128 let global_id = self.catalog.get_entry(&item_id).latest_global_id();
4129 let read_ts = self.get_local_read_ts().await;
4130 let current_contents_fut = self
4131 .controller
4132 .storage_collections
4133 .snapshot(global_id, read_ts);
4134 let internal_cmd_tx = self.internal_cmd_tx.clone();
4135 spawn(|| "storage_usage_prune", async move {
4136 let mut current_contents = current_contents_fut
4137 .await
4138 .unwrap_or_terminate("cannot fail to fetch snapshot");
4139 differential_dataflow::consolidation::consolidate(&mut current_contents);
4140
4141 let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4142 let mut expired = Vec::new();
4143 for (row, diff) in current_contents {
4144 assert_eq!(
4145 diff, 1,
4146 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4147 );
4148 let collection_timestamp = row
4150 .unpack()
4151 .get(3)
4152 .expect("definition of mz_storage_by_shard changed")
4153 .unwrap_timestamptz();
4154 let collection_timestamp = collection_timestamp.timestamp_millis();
4155 let collection_timestamp: u128 = collection_timestamp
4156 .try_into()
4157 .expect("all collections happen after Jan 1 1970");
4158 if collection_timestamp < cutoff_ts {
4159 debug!("pruning storage event {row:?}");
4160 let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
4161 expired.push(builtin_update);
4162 }
4163 }
4164
4165 let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
4167 });
4168 }
4169
4170 async fn prune_arrangement_sizes_history_on_startup(&self) {
4179 if self.controller.read_only() {
4181 return;
4182 }
4183
4184 let retention_period = mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_HISTORY_RETENTION_PERIOD
4185 .get(self.catalog().system_config().dyncfgs());
4186 let item_id = self
4187 .catalog()
4188 .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);
4189 let global_id = self.catalog.get_entry(&item_id).latest_global_id();
4190 let read_ts = self.get_local_read_ts().await;
4191 let current_contents_fut = self
4192 .controller
4193 .storage_collections
4194 .snapshot(global_id, read_ts);
4195 let internal_cmd_tx = self.internal_cmd_tx.clone();
4196 spawn(|| "arrangement_sizes_history_prune", async move {
4197 let mut current_contents = current_contents_fut
4198 .await
4199 .unwrap_or_terminate("cannot fail to fetch snapshot");
4200 differential_dataflow::consolidation::consolidate(&mut current_contents);
4201
4202 let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4203 let expired =
4204 arrangement_sizes_expired_retractions(current_contents, cutoff_ts, item_id);
4205
4206 let _ = internal_cmd_tx.send(Message::ArrangementSizesPrune(expired));
4210 });
4211 }
4212
4213 fn current_credit_consumption_rate(&self) -> Numeric {
4214 self.catalog()
4215 .user_cluster_replicas()
4216 .filter_map(|replica| match &replica.config.location {
4217 ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
4218 ReplicaLocation::Unmanaged(_) => None,
4219 })
4220 .map(|size| {
4221 self.catalog()
4222 .cluster_replica_sizes()
4223 .0
4224 .get(size)
4225 .expect("location size is validated against the cluster replica sizes")
4226 .credits_per_hour
4227 })
4228 .sum()
4229 }
4230}
4231
4232fn arrangement_sizes_expired_retractions(
4240 rows: impl IntoIterator<Item = (mz_repr::Row, i64)>,
4241 cutoff_ts: u128,
4242 item_id: CatalogItemId,
4243) -> Vec<BuiltinTableUpdate> {
4244 let mut expired = Vec::new();
4245 for (row, diff) in rows {
4246 assert_eq!(
4247 diff, 1,
4248 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4249 );
4250 let collection_timestamp = row
4251 .unpack()
4252 .get(3)
4253 .expect("definition of mz_object_arrangement_size_history changed")
4254 .unwrap_timestamptz()
4255 .timestamp_millis();
4256 let collection_timestamp: u128 = collection_timestamp
4257 .try_into()
4258 .expect("all collections happen after Jan 1 1970");
4259 if collection_timestamp < cutoff_ts {
4260 expired.push(BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE));
4261 }
4262 }
4263 expired
4264}
4265
#[cfg(test)]
impl Coordinator {
    /// Compile-time check on the return type of `ship_dataflow`.
    ///
    /// The `let _: () = …` binding only type-checks while `ship_dataflow` resolves to
    /// `()`, so this stops compiling if it ever starts returning something else, such
    /// as a `Result`. The method is not meant to be called, hence `dead_code`.
    #[allow(dead_code)]
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        // Any valid instance ID will do; only the types matter here.
        let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");

        let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
    }
}
4282
/// Summary of the message the coordinator most recently started handling.
struct LastMessage {
    /// Kind label of the message.
    kind: &'static str,
    /// The statement associated with the message, if there is one.
    stmt: Option<Arc<Statement<Raw>>>,
}
4288
4289impl LastMessage {
4290 fn stmt_to_string(&self) -> Cow<'static, str> {
4292 self.stmt
4293 .as_ref()
4294 .map(|stmt| stmt.to_ast_string_redacted().into())
4295 .unwrap_or(Cow::Borrowed("<none>"))
4296 }
4297}
4298
4299impl fmt::Debug for LastMessage {
4300 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4301 f.debug_struct("LastMessage")
4302 .field("kind", &self.kind)
4303 .field("stmt", &self.stmt_to_string())
4304 .finish()
4305 }
4306}
4307
4308impl Drop for LastMessage {
4309 fn drop(&mut self) {
4310 if std::thread::panicking() {
4312 eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4314 }
4315 }
4316}
4317
4318pub fn serve(
4330 Config {
4331 controller_config,
4332 controller_envd_epoch,
4333 mut storage,
4334 audit_logs_iterator,
4335 timestamp_oracle_url,
4336 unsafe_mode,
4337 all_features,
4338 build_info,
4339 environment_id,
4340 metrics_registry,
4341 now,
4342 secrets_controller,
4343 cloud_resource_controller,
4344 cluster_replica_sizes,
4345 builtin_system_cluster_config,
4346 builtin_catalog_server_cluster_config,
4347 builtin_probe_cluster_config,
4348 builtin_support_cluster_config,
4349 builtin_analytics_cluster_config,
4350 system_parameter_defaults,
4351 availability_zones,
4352 storage_usage_client,
4353 storage_usage_collection_interval,
4354 storage_usage_retention_period,
4355 segment_client,
4356 egress_addresses,
4357 aws_account_id,
4358 aws_privatelink_availability_zones,
4359 connection_context,
4360 connection_limit_callback,
4361 remote_system_parameters,
4362 webhook_concurrency_limit,
4363 http_host_name,
4364 tracing_handle,
4365 read_only_controllers,
4366 caught_up_trigger: clusters_caught_up_trigger,
4367 helm_chart_version,
4368 license_key,
4369 external_login_password_mz_system,
4370 force_builtin_schema_migration,
4371 }: Config,
4372) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4373 async move {
4374 let coord_start = Instant::now();
4375 info!("startup: coordinator init: beginning");
4376 info!("startup: coordinator init: preamble beginning");
4377
4378 let _builtins = LazyLock::force(&BUILTINS_STATIC);
4382
4383 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4384 let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4385 let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4386 mpsc::unbounded_channel();
4387
4388 if !availability_zones.iter().all_unique() {
4390 coord_bail!("availability zones must be unique");
4391 }
4392
4393 let aws_principal_context = match (
4394 aws_account_id,
4395 connection_context.aws_external_id_prefix.clone(),
4396 ) {
4397 (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4398 aws_account_id,
4399 aws_external_id_prefix,
4400 }),
4401 _ => None,
4402 };
4403
4404 let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4405 .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4406
4407 info!(
4408 "startup: coordinator init: preamble complete in {:?}",
4409 coord_start.elapsed()
4410 );
4411 let oracle_init_start = Instant::now();
4412 info!("startup: coordinator init: timestamp oracle init beginning");
4413
4414 let timestamp_oracle_config = timestamp_oracle_url
4415 .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
4416 .transpose()?;
4417 let mut initial_timestamps =
4418 get_initial_oracle_timestamps(×tamp_oracle_config).await?;
4419
4420 initial_timestamps
4424 .entry(Timeline::EpochMilliseconds)
4425 .or_insert_with(mz_repr::Timestamp::minimum);
4426 let mut timestamp_oracles = BTreeMap::new();
4427 for (timeline, initial_timestamp) in initial_timestamps {
4428 Coordinator::ensure_timeline_state_with_initial_time(
4429 &timeline,
4430 initial_timestamp,
4431 now.clone(),
4432 timestamp_oracle_config.clone(),
4433 &mut timestamp_oracles,
4434 read_only_controllers,
4435 )
4436 .await;
4437 }
4438
4439 let catalog_upper = storage.current_upper().await;
4443 let epoch_millis_oracle = ×tamp_oracles
4449 .get(&Timeline::EpochMilliseconds)
4450 .expect("inserted above")
4451 .oracle;
4452
4453 let mut boot_ts = if read_only_controllers {
4454 let read_ts = epoch_millis_oracle.read_ts().await;
4455 std::cmp::max(read_ts, catalog_upper)
4456 } else {
4457 epoch_millis_oracle.apply_write(catalog_upper).await;
4460 epoch_millis_oracle.write_ts().await.timestamp
4461 };
4462
4463 info!(
4464 "startup: coordinator init: timestamp oracle init complete in {:?}",
4465 oracle_init_start.elapsed()
4466 );
4467
4468 let catalog_open_start = Instant::now();
4469 info!("startup: coordinator init: catalog open beginning");
4470 let persist_client = controller_config
4471 .persist_clients
4472 .open(controller_config.persist_location.clone())
4473 .await
4474 .context("opening persist client")?;
4475 let builtin_item_migration_config =
4476 BuiltinItemMigrationConfig {
4477 persist_client: persist_client.clone(),
4478 read_only: read_only_controllers,
4479 force_migration: force_builtin_schema_migration,
4480 }
4481 ;
4482 let OpenCatalogResult {
4483 mut catalog,
4484 migrated_storage_collections_0dt,
4485 new_builtin_collections,
4486 builtin_table_updates,
4487 cached_global_exprs,
4488 uncached_local_exprs,
4489 } = Catalog::open(mz_catalog::config::Config {
4490 storage,
4491 metrics_registry: &metrics_registry,
4492 state: mz_catalog::config::StateConfig {
4493 unsafe_mode,
4494 all_features,
4495 build_info,
4496 environment_id: environment_id.clone(),
4497 read_only: read_only_controllers,
4498 now: now.clone(),
4499 boot_ts: boot_ts.clone(),
4500 skip_migrations: false,
4501 cluster_replica_sizes,
4502 builtin_system_cluster_config,
4503 builtin_catalog_server_cluster_config,
4504 builtin_probe_cluster_config,
4505 builtin_support_cluster_config,
4506 builtin_analytics_cluster_config,
4507 system_parameter_defaults,
4508 remote_system_parameters,
4509 availability_zones,
4510 egress_addresses,
4511 aws_principal_context,
4512 aws_privatelink_availability_zones,
4513 connection_context,
4514 http_host_name,
4515 builtin_item_migration_config,
4516 persist_client: persist_client.clone(),
4517 enable_expression_cache_override: None,
4518 helm_chart_version,
4519 external_login_password_mz_system,
4520 license_key: license_key.clone(),
4521 },
4522 })
4523 .await?;
4524
4525 let catalog_upper = catalog.current_upper().await;
4528 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4529
4530 if !read_only_controllers {
4531 epoch_millis_oracle.apply_write(boot_ts).await;
4532 }
4533
4534 info!(
4535 "startup: coordinator init: catalog open complete in {:?}",
4536 catalog_open_start.elapsed()
4537 );
4538
4539 let coord_thread_start = Instant::now();
4540 info!("startup: coordinator init: coordinator thread start beginning");
4541
4542 let session_id = catalog.config().session_id;
4543 let start_instant = catalog.config().start_instant;
4544
4545 let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4549 let handle = TokioHandle::current();
4550
4551 let metrics = Metrics::register_into(&metrics_registry);
4552 let metrics_clone = metrics.clone();
4553 let optimizer_metrics = OptimizerMetrics::register_into(
4554 &metrics_registry,
4555 catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4556 );
4557 let segment_client_clone = segment_client.clone();
4558 let coord_now = now.clone();
4559 let advance_timelines_interval =
4560 tokio::time::interval(catalog.system_config().default_timestamp_interval());
4561 let mut check_scheduling_policies_interval = tokio::time::interval(
4562 catalog
4563 .system_config()
4564 .cluster_check_scheduling_policies_interval(),
4565 );
4566 check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4567
4568 let clusters_caught_up_check_interval = if read_only_controllers {
4569 let dyncfgs = catalog.system_config().dyncfgs();
4570 let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);
4571
4572 let mut interval = tokio::time::interval(interval);
4573 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4574 interval
4575 } else {
4576 let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
4584 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
4585 interval
4586 };
4587
4588 let clusters_caught_up_check =
4589 clusters_caught_up_trigger.map(|trigger| {
4590 let mut exclude_collections: BTreeSet<GlobalId> =
4591 new_builtin_collections.into_iter().collect();
4592
4593 let mut todo: Vec<_> = migrated_storage_collections_0dt
4603 .iter()
4604 .filter(|id| {
4605 catalog.state().get_entry(id).is_materialized_view()
4606 })
4607 .copied()
4608 .collect();
4609 while let Some(item_id) = todo.pop() {
4610 let entry = catalog.state().get_entry(&item_id);
4611 exclude_collections.extend(entry.global_ids());
4612 todo.extend_from_slice(entry.used_by());
4613 }
4614
4615 CaughtUpCheckContext {
4616 trigger,
4617 exclude_collections,
4618 }
4619 });
4620
4621 if let Some(TimestampOracleConfig::Postgres(pg_config)) =
4622 timestamp_oracle_config.as_ref()
4623 {
4624 let pg_timestamp_oracle_params =
4627 flags::timestamp_oracle_config(catalog.system_config());
4628 pg_timestamp_oracle_params.apply(pg_config);
4629 }
4630
4631 let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
4634 Arc::new(move |system_vars: &SystemVars| {
4635 let limit: u64 = system_vars.max_connections().cast_into();
4636 let superuser_reserved: u64 =
4637 system_vars.superuser_reserved_connections().cast_into();
4638
4639 let superuser_reserved = if superuser_reserved >= limit {
4644 tracing::warn!(
4645 "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
4646 );
4647 limit
4648 } else {
4649 superuser_reserved
4650 };
4651
4652 (connection_limit_callback)(limit, superuser_reserved);
4653 });
4654 catalog.system_config_mut().register_callback(
4655 &mz_sql::session::vars::MAX_CONNECTIONS,
4656 Arc::clone(&connection_limit_callback),
4657 );
4658 catalog.system_config_mut().register_callback(
4659 &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
4660 connection_limit_callback,
4661 );
4662
4663 let (group_commit_tx, group_commit_rx) = appends::notifier();
4664
4665 let parent_span = tracing::Span::current();
4666 let thread = thread::Builder::new()
4667 .stack_size(3 * stack::STACK_SIZE)
4671 .name("coordinator".to_string())
4672 .spawn(move || {
4673 let span = info_span!(parent: parent_span, "coord::coordinator").entered();
4674
4675 let controller = handle
4676 .block_on({
4677 catalog.initialize_controller(
4678 controller_config,
4679 controller_envd_epoch,
4680 read_only_controllers,
4681 )
4682 })
4683 .unwrap_or_terminate("failed to initialize storage_controller");
4684 let catalog_upper = handle.block_on(catalog.current_upper());
4687 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4688 if !read_only_controllers {
4689 let epoch_millis_oracle = ×tamp_oracles
4690 .get(&Timeline::EpochMilliseconds)
4691 .expect("inserted above")
4692 .oracle;
4693 handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
4694 }
4695
4696 let catalog = Arc::new(catalog);
4697
4698 let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
4699 let mut coord = Coordinator {
4700 controller,
4701 catalog,
4702 internal_cmd_tx,
4703 group_commit_tx,
4704 strict_serializable_reads_tx,
4705 global_timelines: timestamp_oracles,
4706 transient_id_gen: Arc::new(TransientIdGen::new()),
4707 active_conns: BTreeMap::new(),
4708 txn_read_holds: Default::default(),
4709 pending_peeks: BTreeMap::new(),
4710 client_pending_peeks: BTreeMap::new(),
4711 pending_linearize_read_txns: BTreeMap::new(),
4712 serialized_ddl: LockedVecDeque::new(),
4713 active_compute_sinks: BTreeMap::new(),
4714 active_webhooks: BTreeMap::new(),
4715 active_copies: BTreeMap::new(),
4716 connection_cancel_watches: BTreeMap::new(),
4717 introspection_subscribes: BTreeMap::new(),
4718 write_locks: BTreeMap::new(),
4719 deferred_write_ops: BTreeMap::new(),
4720 pending_writes: Vec::new(),
4721 advance_timelines_interval,
4722 secrets_controller,
4723 caching_secrets_reader,
4724 cloud_resource_controller,
4725 storage_usage_client,
4726 storage_usage_collection_interval,
4727 segment_client,
4728 metrics,
4729 optimizer_metrics,
4730 tracing_handle,
4731 statement_logging: StatementLogging::new(coord_now.clone()),
4732 webhook_concurrency_limit,
4733 timestamp_oracle_config,
4734 check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
4735 cluster_scheduling_decisions: BTreeMap::new(),
4736 caught_up_check_interval: clusters_caught_up_check_interval,
4737 caught_up_check: clusters_caught_up_check,
4738 installed_watch_sets: BTreeMap::new(),
4739 connection_watch_sets: BTreeMap::new(),
4740 cluster_replica_statuses: ClusterReplicaStatuses::new(),
4741 read_only_controllers,
4742 buffered_builtin_table_updates: Some(Vec::new()),
4743 license_key,
4744 user_id_pool: IdPool::empty(),
4745 persist_client,
4746 };
4747 let bootstrap = handle.block_on(async {
4748 coord
4749 .bootstrap(
4750 boot_ts,
4751 migrated_storage_collections_0dt,
4752 builtin_table_updates,
4753 cached_global_exprs,
4754 uncached_local_exprs,
4755 audit_logs_iterator,
4756 )
4757 .await?;
4758 coord
4759 .controller
4760 .remove_orphaned_replicas(
4761 coord.catalog().get_next_user_replica_id().await?,
4762 coord.catalog().get_next_system_replica_id().await?,
4763 )
4764 .await
4765 .map_err(AdapterError::Orchestrator)?;
4766
4767 if let Some(retention_period) = storage_usage_retention_period {
4768 coord
4769 .prune_storage_usage_events_on_startup(retention_period)
4770 .await;
4771 }
4772
4773 coord.prune_arrangement_sizes_history_on_startup().await;
4774
4775 Ok(())
4776 });
4777 let ok = bootstrap.is_ok();
4778 drop(span);
4779 bootstrap_tx
4780 .send(bootstrap)
4781 .expect("bootstrap_rx is not dropped until it receives this message");
4782 if ok {
4783 handle.block_on(coord.serve(
4784 internal_cmd_rx,
4785 strict_serializable_reads_rx,
4786 cmd_rx,
4787 group_commit_rx,
4788 ));
4789 }
4790 })
4791 .expect("failed to create coordinator thread");
4792 match bootstrap_rx
4793 .await
4794 .expect("bootstrap_tx always sends a message or panics/halts")
4795 {
4796 Ok(()) => {
4797 info!(
4798 "startup: coordinator init: coordinator thread start complete in {:?}",
4799 coord_thread_start.elapsed()
4800 );
4801 info!(
4802 "startup: coordinator init: complete in {:?}",
4803 coord_start.elapsed()
4804 );
4805 let handle = Handle {
4806 session_id,
4807 start_instant,
4808 _thread: thread.join_on_drop(),
4809 };
4810 let client = Client::new(
4811 build_info,
4812 cmd_tx,
4813 metrics_clone,
4814 now,
4815 environment_id,
4816 segment_client_clone,
4817 );
4818 Ok((handle, client))
4819 }
4820 Err(e) => Err(e),
4821 }
4822 }
4823 .boxed()
4824}
4825
4826async fn get_initial_oracle_timestamps(
4840 timestamp_oracle_config: &Option<TimestampOracleConfig>,
4841) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4842 let mut initial_timestamps = BTreeMap::new();
4843
4844 if let Some(config) = timestamp_oracle_config {
4845 let oracle_timestamps = config.get_all_timelines().await?;
4846
4847 let debug_msg = || {
4848 oracle_timestamps
4849 .iter()
4850 .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4851 .join(", ")
4852 };
4853 info!(
4854 "current timestamps from the timestamp oracle: {}",
4855 debug_msg()
4856 );
4857
4858 for (timeline, ts) in oracle_timestamps {
4859 let entry = initial_timestamps
4860 .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4861
4862 entry
4863 .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4864 .or_insert(ts);
4865 }
4866 } else {
4867 info!("no timestamp oracle configured!");
4868 };
4869
4870 let debug_msg = || {
4871 initial_timestamps
4872 .iter()
4873 .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4874 .join(", ")
4875 };
4876 info!("initial oracle timestamps: {}", debug_msg());
4877
4878 Ok(initial_timestamps)
4879}
4880
4881#[instrument]
4882pub async fn load_remote_system_parameters(
4883 storage: &mut Box<dyn OpenableDurableCatalogState>,
4884 system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4885 system_parameter_sync_timeout: Duration,
4886) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4887 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4888 tracing::info!("parameter sync on boot: start sync");
4889
4890 let mut params = SynchronizedParameters::new(SystemVars::default());
4930 let frontend_sync = async {
4931 let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4932 frontend.pull(&mut params);
4933 let ops = params
4934 .modified()
4935 .into_iter()
4936 .map(|param| {
4937 let name = param.name;
4938 let value = param.value;
4939 tracing::info!(name, value, initial = true, "sync parameter");
4940 (name, value)
4941 })
4942 .collect();
4943 tracing::info!("parameter sync on boot: end sync");
4944 Ok(Some(ops))
4945 };
4946 if !storage.has_system_config_synced_once().await? {
4947 frontend_sync.await
4948 } else {
4949 match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4950 Ok(ops) => Ok(ops),
4951 Err(TimeoutError::Inner(e)) => Err(e),
4952 Err(TimeoutError::DeadlineElapsed) => {
4953 tracing::info!("parameter sync on boot: sync has timed out");
4954 Ok(None)
4955 }
4956 }
4957 }
4958 } else {
4959 Ok(None)
4960 }
4961}
4962
/// The responses associated with a watch set, one variant per kind of waiter.
///
/// NOTE(review): the description of each variant is derived from its name and
/// payload types only — confirm against the code that installs watch sets.
#[derive(Debug)]
pub enum WatchSetResponse {
    /// Carries a statement logging id together with a statement lifecycle
    /// event.
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    /// Carries the context needed to continue an alter-sink operation.
    AlterSinkReady(AlterSinkReadyContext),
    /// Carries the context needed to continue an alter-materialized-view
    /// operation.
    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
}
4969
4970#[derive(Debug)]
4971pub struct AlterSinkReadyContext {
4972 ctx: Option<ExecuteContext>,
4973 otel_ctx: OpenTelemetryContext,
4974 plan: AlterSinkPlan,
4975 plan_validity: PlanValidity,
4976 read_hold: ReadHolds,
4977}
4978
4979impl AlterSinkReadyContext {
4980 fn ctx(&mut self) -> &mut ExecuteContext {
4981 self.ctx.as_mut().expect("only cleared on drop")
4982 }
4983
4984 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4985 self.ctx
4986 .take()
4987 .expect("only cleared on drop")
4988 .retire(result);
4989 }
4990}
4991
4992impl Drop for AlterSinkReadyContext {
4993 fn drop(&mut self) {
4994 if let Some(ctx) = self.ctx.take() {
4995 ctx.retire(Err(AdapterError::Canceled));
4996 }
4997 }
4998}
4999
5000#[derive(Debug)]
5001pub struct AlterMaterializedViewReadyContext {
5002 ctx: Option<ExecuteContext>,
5003 otel_ctx: OpenTelemetryContext,
5004 plan: plan::AlterMaterializedViewApplyReplacementPlan,
5005 plan_validity: PlanValidity,
5006}
5007
5008impl AlterMaterializedViewReadyContext {
5009 fn ctx(&mut self) -> &mut ExecuteContext {
5010 self.ctx.as_mut().expect("only cleared on drop")
5011 }
5012
5013 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
5014 self.ctx
5015 .take()
5016 .expect("only cleared on drop")
5017 .retire(result);
5018 }
5019}
5020
5021impl Drop for AlterMaterializedViewReadyContext {
5022 fn drop(&mut self) {
5023 if let Some(ctx) = self.ctx.take() {
5024 ctx.retire(Err(AdapterError::Canceled));
5025 }
5026 }
5027}
5028
5029#[derive(Debug)]
5032struct LockedVecDeque<T> {
5033 items: VecDeque<T>,
5034 lock: Arc<tokio::sync::Mutex<()>>,
5035}
5036
5037impl<T> LockedVecDeque<T> {
5038 pub fn new() -> Self {
5039 Self {
5040 items: VecDeque::new(),
5041 lock: Arc::new(tokio::sync::Mutex::new(())),
5042 }
5043 }
5044
5045 pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
5046 Arc::clone(&self.lock).try_lock_owned()
5047 }
5048
5049 pub fn is_empty(&self) -> bool {
5050 self.items.is_empty()
5051 }
5052
5053 pub fn push_back(&mut self, value: T) {
5054 self.items.push_back(value)
5055 }
5056
5057 pub fn pop_front(&mut self) -> Option<T> {
5058 self.items.pop_front()
5059 }
5060
5061 pub fn remove(&mut self, index: usize) -> Option<T> {
5062 self.items.remove(index)
5063 }
5064
5065 pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
5066 self.items.iter()
5067 }
5068}
5069
/// Pairs an [`ExecuteContext`] with the [`PlanStatement`] it belongs to.
///
/// NOTE(review): the name suggests this represents work whose execution was
/// postponed — confirm against the code that queues these values.
#[derive(Debug)]
struct DeferredPlanStatement {
    /// The execute context associated with the statement or plan.
    ctx: ExecuteContext,
    /// The statement or plan itself.
    ps: PlanStatement,
}
5075
/// Either a raw statement or a `mz_sql` plan, each with its accompanying data.
#[derive(Debug)]
enum PlanStatement {
    /// A raw (`Statement<Raw>`) statement together with its parameters.
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    /// A `mz_sql::plan::Plan` together with two sets of resolved ids.
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
        // NOTE(review): presumably the ids resolved for SQL-implemented
        // functions the plan uses — confirm with the producer of this value.
        sql_impl_resolved_ids: ResolvedIds,
    },
}
5088
/// Errors reported when a connection is checked against network policy rules.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// The given address was denied; the address is included in the message.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    /// No IP address was available to check.
    #[error("Access denied missing IP address")]
    MissingIp,
}
5096
5097pub(crate) fn validate_ip_with_policy_rules(
5098 ip: &IpAddr,
5099 rules: &Vec<NetworkPolicyRule>,
5100) -> Result<(), NetworkPolicyError> {
5101 if rules.iter().any(|r| r.address.0.contains(ip)) {
5104 Ok(())
5105 } else {
5106 Err(NetworkPolicyError::AddressDenied(ip.clone()))
5107 }
5108}
5109
5110pub(crate) fn infer_sql_type_for_catalog(
5111 hir_expr: &HirRelationExpr,
5112 mir_expr: &MirRelationExpr,
5113) -> SqlRelationType {
5114 let mut typ = hir_expr.top_level_typ();
5115 typ.backport_nullability_and_keys(&mir_expr.typ());
5116 typ
5117}
5118
/// Unit tests pinning the allocation behavior of `IdPool`.
#[cfg(test)]
mod id_pool_tests {
    use super::IdPool;

    /// An empty pool reports nothing remaining and refuses every allocation.
    #[mz_ore::test]
    fn test_empty_pool() {
        let mut ids = IdPool::empty();
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
        assert_eq!(ids.allocate_many(1), None);
    }

    /// After `refill(10, 13)` exactly the ids 10, 11 and 12 are handed out, in
    /// order, one at a time.
    #[mz_ore::test]
    fn test_allocate_single() {
        let mut ids = IdPool::empty();
        ids.refill(10, 13);
        assert_eq!(ids.remaining(), 3);
        for expected in 10..13 {
            assert_eq!(ids.allocate(), Some(expected));
        }
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
    }

    /// Batch allocation hands out consecutive ids and is all-or-nothing.
    #[mz_ore::test]
    fn test_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(100, 105);

        let first_batch = ids.allocate_many(3);
        assert_eq!(first_batch, Some(vec![100, 101, 102]));
        assert_eq!(ids.remaining(), 2);

        // Only two ids are left, so a request for three must fail ...
        let too_many = ids.allocate_many(3);
        assert_eq!(too_many, None);

        // ... without consuming anything: both leftovers are still available.
        let rest = ids.allocate_many(2);
        assert_eq!(rest, Some(vec![103, 104]));
        assert_eq!(ids.remaining(), 0);
    }

    /// Requesting zero ids succeeds with an empty batch and consumes nothing.
    #[mz_ore::test]
    fn test_allocate_many_zero() {
        let mut ids = IdPool::empty();
        ids.refill(1, 5);
        let batch = ids.allocate_many(0);
        assert_eq!(batch, Some(vec![]));
        assert_eq!(ids.remaining(), 4);
    }

    /// A refill replaces whatever was left of the previous range.
    #[mz_ore::test]
    fn test_refill_resets_pool() {
        let mut ids = IdPool::empty();
        ids.refill(0, 2);
        assert_eq!(ids.allocate(), Some(0));

        // Id 1 from the first range is never handed out after this refill.
        ids.refill(50, 52);
        for expected in 50..52 {
            assert_eq!(ids.allocate(), Some(expected));
        }
        assert_eq!(ids.allocate(), None);
    }

    /// Single and batch allocations draw from the same sequence.
    #[mz_ore::test]
    fn test_mixed_allocate_and_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(0, 10);

        let first = ids.allocate();
        assert_eq!(first, Some(0));
        let batch = ids.allocate_many(3);
        assert_eq!(batch, Some(vec![1, 2, 3]));
        let next = ids.allocate();
        assert_eq!(next, Some(4));

        assert_eq!(ids.remaining(), 5);
    }

    /// Refilling with a start beyond the end is expected to panic.
    #[mz_ore::test]
    #[should_panic(expected = "invalid pool range")]
    fn test_refill_invalid_range_panics() {
        let mut ids = IdPool::empty();
        ids.refill(10, 5);
    }
}
5193
/// Unit tests for `arrangement_sizes_expired_retractions`.
#[cfg(test)]
mod arrangement_sizes_pruner_tests {
    use mz_repr::catalog_item_id::CatalogItemId;
    use mz_repr::{Datum, Row};

    use super::arrangement_sizes_expired_retractions;

    /// Builds a four-column row (two strings, an `Int64`, a `TimestampTz`)
    /// whose timestamp column is derived from `ts_ms`.
    ///
    /// Panics if `ts_ms` is negative or does not fit in a `TimestampTz`.
    fn history_row(ts_ms: i64) -> Row {
        let epoch_ms = ts_ms.try_into().expect("non-negative");
        let datetime = mz_ore::now::to_datetime(epoch_ms);
        let columns = [
            Datum::String("r1"),
            Datum::String("u1"),
            Datum::Int64(123),
            Datum::TimestampTz(datetime.try_into().expect("fits in TimestampTz")),
        ];
        Row::pack_slice(&columns)
    }

    /// The fixed catalog item id handed to the function under test.
    fn item_id() -> CatalogItemId {
        CatalogItemId::User(42)
    }

    /// No input rows means no retractions.
    #[mz_ore::test]
    fn empty_input_produces_no_retractions() {
        let no_rows = Vec::new();
        let retractions = arrangement_sizes_expired_retractions(no_rows, 1_000, item_id());
        assert!(retractions.is_empty());
    }

    /// With a cutoff of 1_000, only two of the four rows (timestamps 100, 500,
    /// 1_000 and 5_000) are expected to be retracted.
    #[mz_ore::test]
    fn retracts_only_rows_strictly_before_cutoff() {
        let cutoff = 1_000;
        let rows: Vec<_> = [100, 500, 1_000, 5_000]
            .into_iter()
            .map(|ts_ms| (history_row(ts_ms), 1))
            .collect();
        let retractions = arrangement_sizes_expired_retractions(rows, cutoff, item_id());
        assert_eq!(retractions.len(), 2);
    }

    /// A negative diff in the input is expected to trip the function's panic.
    #[mz_ore::test]
    #[should_panic(expected = "consolidated contents should not contain retractions")]
    fn retraction_in_input_panics() {
        let retracted_row = (history_row(100), -1);
        let rows = vec![retracted_row];
        let _ = arrangement_sizes_expired_retractions(rows, 1_000, item_id());
    }
}