1use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::fmt;
72use std::net::IpAddr;
73use std::num::NonZeroI64;
74use std::ops::Neg;
75use std::str::FromStr;
76use std::sync::LazyLock;
77use std::sync::{Arc, Mutex};
78use std::thread;
79use std::time::{Duration, Instant};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::{Either, Itertools};
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::{
95 USER_ID_POOL_BATCH_SIZE, WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL,
96};
97use mz_auth::password::Password;
98use mz_build_info::BuildInfo;
99use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
100use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
101use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
102use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
103use mz_catalog::memory::objects::{
104 CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
105 DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
106};
107use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
108use mz_compute_client::as_of_selection;
109use mz_compute_client::controller::error::{
110 CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
111};
112use mz_compute_types::ComputeInstanceId;
113use mz_compute_types::dataflows::DataflowDescription;
114use mz_compute_types::plan::Plan;
115use mz_controller::clusters::{
116 ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
117};
118use mz_controller::{ControllerConfig, Readiness};
119use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
120use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
121use mz_license_keys::ValidatedLicenseKey;
122use mz_orchestrator::OfflineReason;
123use mz_ore::cast::{CastFrom, CastInto, CastLossy};
124use mz_ore::channel::trigger::Trigger;
125use mz_ore::future::TimeoutError;
126use mz_ore::metrics::MetricsRegistry;
127use mz_ore::now::{EpochMillis, NowFn};
128use mz_ore::task::{JoinHandle, spawn};
129use mz_ore::thread::JoinHandleExt;
130use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
131use mz_ore::url::SensitiveUrl;
132use mz_ore::{assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, stack};
133use mz_persist_client::PersistClient;
134use mz_persist_client::batch::ProtoBatch;
135use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
136use mz_repr::adt::numeric::Numeric;
137use mz_repr::explain::{ExplainConfig, ExplainFormat};
138use mz_repr::global_id::TransientIdGen;
139use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
140use mz_repr::role_id::RoleId;
141use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
142use mz_secrets::cache::CachingSecretsReader;
143use mz_secrets::{SecretsController, SecretsReader};
144use mz_sql::ast::{Raw, Statement};
145use mz_sql::catalog::{CatalogCluster, EnvironmentId};
146use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
147use mz_sql::optimizer_metrics::OptimizerMetrics;
148use mz_sql::plan::{
149 self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
150 NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
151};
152use mz_sql::session::user::User;
153use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
154use mz_sql_parser::ast::ExplainStage;
155use mz_sql_parser::ast::display::AstDisplay;
156use mz_storage_client::client::TableData;
157use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
158use mz_storage_types::connections::Connection as StorageConnection;
159use mz_storage_types::connections::ConnectionContext;
160use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
161use mz_storage_types::read_holds::ReadHold;
162use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
163use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
164use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
165use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
166use mz_transform::dataflow::DataflowMetainfo;
167use opentelemetry::trace::TraceContextExt;
168use serde::Serialize;
169use thiserror::Error;
170use timely::progress::{Antichain, Timestamp as _};
171use tokio::runtime::Handle as TokioHandle;
172use tokio::select;
173use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
174use tokio::time::{Interval, MissedTickBehavior};
175use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
176use tracing_opentelemetry::OpenTelemetrySpanExt;
177use uuid::Uuid;
178
179use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
180use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
181use crate::client::{Client, Handle};
182use crate::command::{Command, ExecuteResponse};
183use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
184use crate::coord::appends::{
185 BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
186};
187use crate::coord::caught_up::CaughtUpCheckContext;
188use crate::coord::cluster_scheduling::SchedulingDecision;
189use crate::coord::id_bundle::CollectionIdBundle;
190use crate::coord::introspection::IntrospectionSubscribe;
191use crate::coord::peek::PendingPeek;
192use crate::coord::statement_logging::StatementLogging;
193use crate::coord::timeline::{TimelineContext, TimelineState};
194use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
195use crate::coord::validity::PlanValidity;
196use crate::error::AdapterError;
197use crate::explain::insights::PlanInsightsContext;
198use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
199use crate::metrics::Metrics;
200use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
201use crate::optimize::{self, Optimize, OptimizerConfig};
202use crate::session::{EndTransactionAction, Session};
203use crate::statement_logging::{
204 StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
205};
206use crate::util::{ClientTransmitter, ResultExt, sort_topological};
207use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
208use crate::{AdapterNotice, ReadHolds, flags};
209
210pub(crate) mod appends;
211pub(crate) mod catalog_serving;
212pub(crate) mod cluster_scheduling;
213pub(crate) mod consistency;
214pub(crate) mod id_bundle;
215pub(crate) mod in_memory_oracle;
216pub(crate) mod peek;
217pub(crate) mod read_policy;
218pub(crate) mod sequencer;
219pub(crate) mod statement_logging;
220pub(crate) mod timeline;
221pub(crate) mod timestamp_selection;
222
223pub mod catalog_implications;
224mod caught_up;
225mod command_handler;
226mod ddl;
227mod indexes;
228mod introspection;
229mod message_handler;
230mod privatelink_status;
231mod sql;
232mod validity;
233
/// A pool of pre-allocated `u64` IDs, handed out in increasing order.
///
/// IDs are served from the half-open range `next..upper`. Once the range is
/// exhausted, allocation yields `None` until [`IdPool::refill`] installs a new
/// range.
#[derive(Debug)]
pub(crate) struct IdPool {
    // The next ID to hand out.
    next: u64,
    // Exclusive upper bound of the currently available range.
    upper: u64,
}

impl IdPool {
    /// Creates a pool with no available IDs; call [`IdPool::refill`] before
    /// allocating.
    pub fn empty() -> Self {
        Self { next: 0, upper: 0 }
    }

    /// Hands out the next ID, or `None` if the pool is exhausted.
    pub fn allocate(&mut self) -> Option<u64> {
        match self.remaining() {
            0 => None,
            _ => {
                let id = self.next;
                self.next += 1;
                Some(id)
            }
        }
    }

    /// Hands out `n` consecutive IDs, or `None` if fewer than `n` remain.
    /// All-or-nothing: a partial batch is never returned.
    pub fn allocate_many(&mut self, n: u64) -> Option<Vec<u64>> {
        if n > self.remaining() {
            return None;
        }
        let start = self.next;
        self.next = start + n;
        Some((start..self.next).collect())
    }

    /// Returns the number of IDs still available.
    pub fn remaining(&self) -> u64 {
        self.upper - self.next
    }

    /// Replaces the available range with `next..upper`.
    ///
    /// # Panics
    ///
    /// Panics if `next > upper`.
    pub fn refill(&mut self, next: u64, upper: u64) {
        assert!(next <= upper, "invalid pool range: {next}..{upper}");
        self.next = next;
        self.upper = upper;
    }
}
306
/// Messages processed by the coordinator's main message loop.
#[derive(Debug)]
pub enum Message {
    /// An externally submitted [`Command`], with the tracing context under
    /// which it was sent.
    Command(OpenTelemetryContext, Command),
    /// A controller has work ready to be processed.
    ControllerReady {
        /// Which controller (or controller message source) is ready.
        controller: ControllerReadiness,
    },
    /// A statement finished purification in the background.
    PurifiedStatementReady(PurifiedStatementReady),
    /// A `CREATE CONNECTION` finished background validation.
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    /// An `ALTER CONNECTION` finished background validation.
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    /// Retry a deferred operation for the given connection.
    TryDeferred {
        /// The connection whose deferred operation should be retried.
        conn_id: ConnectionId,
        /// A lock already acquired on the operation's behalf, if any.
        /// NOTE(review): presumably keyed by the locked catalog item — confirm
        /// against `appends::DeferredOp`.
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    /// Initiate a group commit, carrying the initiating span and an optional
    /// permit limiting concurrent group commits.
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    /// A previously deferred statement is ready to run again.
    DeferredStatementReady,
    /// Periodic request to advance timelines.
    AdvanceTimelines,
    /// A cluster event reported by the controller.
    ClusterEvent(ClusterEvent),
    /// Cancel all pending peeks for the given connection.
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    /// Process pending linearized read transactions.
    LinearizeReads,
    /// Batches staged for a table write have completed (successfully or not).
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    /// Schedule the next storage-usage collection.
    StorageUsageSchedule,
    /// Fetch current storage usage.
    StorageUsageFetch,
    /// Record fetched storage usage.
    StorageUsageUpdate(ShardsUsageReferenced),
    /// Prune old storage-usage records via the given builtin table updates.
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    /// Retire an execution, recording why it ended.
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    /// Execute a single statement as an implicit transaction.
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    /// The next stage of a peek is ready to run.
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    /// The next stage of a `CREATE INDEX` is ready to run.
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    /// The next stage of a `CREATE VIEW` is ready to run.
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    /// The next stage of a `CREATE MATERIALIZED VIEW` is ready to run.
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    /// The next stage of a `SUBSCRIBE` is ready to run.
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    /// The next stage of an introspection subscribe is ready to run.
    /// Note: carries no `ExecuteContext` — there is no client to respond to.
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    /// The next stage of a secret operation is ready to run.
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    /// The next stage of an `ALTER CLUSTER` is ready to run.
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    /// The next stage of an `EXPLAIN TIMESTAMP` is ready to run.
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    /// Drain buffered statement-log events.
    DrainStatementLog,
    /// VPC endpoint events observed for PrivateLink connections.
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    /// Run the cluster scheduling policies.
    CheckSchedulingPolicies,

    /// Scheduling decisions, grouped by a static label.
    /// NOTE(review): the `&'static str` is presumably the policy name —
    /// confirm against `cluster_scheduling`.
    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}
415
impl Message {
    /// Returns a short static label identifying the kind of this message,
    /// suitable for metrics and diagnostics.
    pub const fn kind(&self) -> &'static str {
        match self {
            // Commands are labeled by their inner `Command` variant.
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
                Command::CheckRoleCanLogin { .. } => "command-check_role_can_login",
                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
                Command::GetOracle { .. } => "get-oracle",
                Command::DetermineRealTimeRecentTimestamp { .. } => {
                    "determine-real-time-recent-timestamp"
                }
                Command::GetTransactionReadHoldsBundle { .. } => {
                    "get-transaction-read-holds-bundle"
                }
                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
                Command::ExecuteSubscribe { .. } => "execute-subscribe",
                Command::CopyToPreflight { .. } => "copy-to-preflight",
                Command::ExecuteCopyTo { .. } => "execute-copy-to",
                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
                Command::ExplainTimestamp { .. } => "explain-timestamp",
                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
                Command::StartCopyFromStdin { .. } => "start-copy-from-stdin",
                Command::InjectAuditEvents { .. } => "inject-audit-events",
            },
            // Controller readiness is labeled per controller.
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}
510
/// Identifies which controller (or controller message source) has work ready
/// for the coordinator to process.
#[derive(Debug)]
pub enum ControllerReadiness {
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// Controller metrics are ready to be processed.
    Metrics,
    /// An internally generated message is ready.
    Internal,
}
523
/// The result of work performed off the coordinator's main loop, together with
/// the state required to resume sequencing the originating statement.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    // Excluded from `Debug` output via derivative.
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The outcome of the background work.
    pub result: Result<T, AdapterError>,
    pub params: Params,
    /// Constraints that must still hold for the plan to remain valid.
    pub plan_validity: PlanValidity,
    /// The statement as originally submitted.
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}
535
/// The result of purifying a statement in the background.
pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
537
/// The result of validating a connection in the background, together with the
/// state required to finish sequencing the statement.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    // Excluded from `Debug` output via derivative.
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The validation outcome.
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    /// Catalog item ID of the connection being validated.
    pub connection_id: CatalogItemId,
    /// Global ID of the connection being validated.
    pub connection_gid: GlobalId,
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}
550
/// The result of background validation for a `CREATE CONNECTION`.
pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
/// The result of background validation for an `ALTER CONNECTION`.
pub type AlterConnectionValidationReady = ValidationReady<Connection>;
553
/// The stages of sequencing a peek (used both for regular `SELECT`s and for
/// `COPY TO` — see the `Either` optimizers in the stage structs).
#[derive(Debug)]
pub enum PeekStage {
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    Finish(PeekStageFinish),
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    CopyToPreflight(PeekStageCopyTo),
    CopyToDataflow(PeekStageCopyTo),
}
571
/// Context carried through peek sequencing for a `COPY TO` statement.
#[derive(Debug)]
pub struct CopyToContext {
    /// Description of the relation being copied out.
    pub desc: RelationDesc,
    /// Destination URI for the copy.
    pub uri: Uri,
    /// The (referenced, not yet inlined) storage connection to use.
    pub connection: StorageConnection<ReferencedConnection>,
    /// Catalog item ID of that connection.
    pub connection_id: CatalogItemId,
    /// Output format (an S3 sink format).
    pub format: S3SinkFormat,
    /// Maximum size of each emitted file, in bytes.
    pub max_file_size: u64,
    /// Number of output batches, if already determined.
    /// NOTE(review): presumably filled in during sequencing — confirm.
    pub output_batch_count: Option<u64>,
}
592
/// State for the `LinearizeTimestamp` stage of peek sequencing.
#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    // `Left` for a regular peek, `Right` for a `COPY TO` peek.
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}
606
/// State for the `RealTimeRecency` stage of peek sequencing.
#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    // Read timestamp obtained from the timestamp oracle, if any.
    oracle_read_ts: Option<Timestamp>,
    // `Left` for a regular peek, `Right` for a `COPY TO` peek.
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}
621
/// State for the `TimestampReadHold` stage of peek sequencing.
#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    // Read timestamp obtained from the timestamp oracle, if any.
    oracle_read_ts: Option<Timestamp>,
    // Timestamp computed by the real-time-recency stage, if it ran.
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    // `Left` for a regular peek, `Right` for a `COPY TO` peek.
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}
637
/// State for the `Optimize` stage of peek sequencing.
#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    // The timestamp chosen for this peek.
    determination: TimestampDetermination<mz_repr::Timestamp>,
    // `Left` for a regular peek, `Right` for a `COPY TO` peek.
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}
652
/// State for the `Finish` stage of peek sequencing: the optimized plan is
/// ready to be executed.
#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    cluster_id: ComputeInstanceId,
    // Row-set finishing (ordering/limit/projection) applied to results.
    finishing: RowSetFinishing,
    // Optimizer trace used to produce plan-insights notices, if requested.
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    // Wall-clock time (ms since epoch) at which optimization finished.
    optimization_finished_at: EpochMillis,
}
671
/// State for the `CopyToPreflight`/`CopyToDataflow` stages of peek sequencing.
#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    // Wall-clock time (ms since epoch) at which optimization finished.
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}
680
/// State for the `ExplainPlan` stage of peek sequencing (`EXPLAIN` of a
/// `SELECT`).
#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}
689
/// State for the `ExplainPushdown` stage of peek sequencing: reports, per
/// imported collection, the `MapFilterProject` eligible for pushdown.
#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    imports: BTreeMap<GlobalId, MapFilterProject>,
}
696
/// The stages of sequencing a `CREATE INDEX`.
#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}
703
/// State for the `Optimize` stage of `CREATE INDEX` sequencing.
#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}
713
/// State for the `Finish` stage of `CREATE INDEX` sequencing: optimized plans
/// ready to be installed.
#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}
725
/// State for the `Explain` stage of `CREATE INDEX` sequencing.
#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
734
/// The stages of sequencing a `CREATE VIEW`.
#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}
741
/// State for the `Optimize` stage of `CREATE VIEW` sequencing.
#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}
751
/// State for the `Finish` stage of `CREATE VIEW` sequencing.
#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    // The view's definition after optimization.
    optimized_expr: OptimizedMirRelationExpr,
}
764
/// State for the `Explain` stage of `CREATE VIEW` sequencing.
#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}
772
/// The stages of sequencing an `EXPLAIN TIMESTAMP`.
#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}
779
/// State for the `Optimize` stage of `EXPLAIN TIMESTAMP` sequencing.
#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}
786
/// State for the `RealTimeRecency` stage of `EXPLAIN TIMESTAMP` sequencing.
#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}
795
/// State for the `Finish` stage of `EXPLAIN TIMESTAMP` sequencing.
#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    // Timestamp computed by the real-time-recency stage, if it ran.
    real_time_recency_ts: Option<Timestamp>,
}
806
/// The stages of sequencing an `ALTER CLUSTER`.
#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    // Waits for replicas of the new configuration to hydrate before
    // finalizing (see `AlterClusterWaitForHydrated`).
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}
813
/// State for the `Alter` stage of `ALTER CLUSTER` sequencing.
#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}
819
/// State for the `WaitForHydrated` stage of `ALTER CLUSTER` sequencing.
#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    // The managed-cluster configuration being switched to.
    new_config: ClusterVariantManaged,
    // Deadline after which `on_timeout` determines what happens.
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}
828
/// State for the `Finalize` stage of `ALTER CLUSTER` sequencing.
#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    // The managed-cluster configuration being switched to.
    new_config: ClusterVariantManaged,
}
835
/// The kind of `EXPLAIN` (if any) under which a statement is being sequenced.
#[derive(Debug)]
pub enum ExplainContext {
    /// Not an `EXPLAIN`: a regular statement.
    None,
    /// An `EXPLAIN` of a plan, with its context.
    Plan(ExplainPlanContext),
    /// A regular statement that also produces plan-insights notices; carries
    /// the optimizer trace used to compute them.
    PlanInsightsNotice(OptimizerTrace),
    /// An `EXPLAIN` of filter pushdown.
    Pushdown,
}
848
849impl ExplainContext {
850 pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
854 let optimizer_trace = match self {
855 ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
856 ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
857 _ => None,
858 };
859 optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
860 }
861
862 pub(crate) fn needs_cluster(&self) -> bool {
863 match self {
864 ExplainContext::None => true,
865 ExplainContext::Plan(..) => false,
866 ExplainContext::PlanInsightsNotice(..) => true,
867 ExplainContext::Pushdown => false,
868 }
869 }
870
871 pub(crate) fn needs_plan_insights(&self) -> bool {
872 matches!(
873 self,
874 ExplainContext::Plan(ExplainPlanContext {
875 stage: ExplainStage::PlanInsights,
876 ..
877 }) | ExplainContext::PlanInsightsNotice(_)
878 )
879 }
880}
881
/// Context for sequencing a statement under an `EXPLAIN` of its plan.
#[derive(Debug)]
pub struct ExplainPlanContext {
    // NOTE(review): presumably indicates `EXPLAIN BROKEN` (explain despite
    // optimizer errors) — confirm.
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    /// Which optimizer stage to explain.
    pub stage: ExplainStage,
    /// If set, replan the item with this global ID instead.
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    /// Trace that collects plans across optimizer stages.
    pub optimizer_trace: OptimizerTrace,
}
896
/// The stages of sequencing a `CREATE MATERIALIZED VIEW`.
#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}
903
/// State for the `Optimize` stage of `CREATE MATERIALIZED VIEW` sequencing.
#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}
913
/// State for the `Finish` stage of `CREATE MATERIALIZED VIEW` sequencing:
/// optimized plans ready to be installed.
#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    item_id: CatalogItemId,
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}
928
/// State for the `Explain` stage of `CREATE MATERIALIZED VIEW` sequencing.
#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
937
/// The stages of sequencing a `SUBSCRIBE`.
#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
    Explain(SubscribeExplain),
}
945
/// State for the `OptimizeMir` stage of `SUBSCRIBE` sequencing.
#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}
958
/// State for the `TimestampOptimizeLir` stage of `SUBSCRIBE` sequencing.
#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    // MIR plan whose as-of timestamp has not yet been resolved.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}
972
/// State for the `Finish` stage of `SUBSCRIBE` sequencing.
#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}
982
/// State for the `Explain` stage of `SUBSCRIBE` sequencing.
#[derive(Debug)]
pub struct SubscribeExplain {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    df_meta: DataflowMetainfo,
    cluster_id: ComputeInstanceId,
    explain_ctx: ExplainPlanContext,
}
991
/// The stages of sequencing an introspection subscribe (no client attached —
/// compare [`SubscribeStage`]).
#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}
998
/// State for the `OptimizeMir` stage of introspection-subscribe sequencing.
#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    // Unlike user subscribes, a specific replica is always targeted.
    replica_id: ReplicaId,
}
1007
/// State for the `TimestampOptimizeLir` stage of introspection-subscribe
/// sequencing.
#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    // MIR plan whose as-of timestamp has not yet been resolved.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1016
/// State for the `Finish` stage of introspection-subscribe sequencing.
#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    // Read holds keeping the subscribed collections readable at the as-of.
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1025
/// The stages of sequencing secret-related statements (`CREATE SECRET`,
/// `ALTER SECRET`, and key rotation).
#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}
1034
/// State for the `CreateEnsure` stage of `CREATE SECRET` sequencing.
#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}
1040
/// State for the `CreateFinish` stage of `CREATE SECRET` sequencing.
#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}
1048
/// State for the `RotateKeysEnsure` stage of secret key rotation.
#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}
1054
/// State for the `RotateKeysFinish` stage of secret key rotation.
#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    // Catalog operations to apply when finishing the rotation.
    ops: Vec<crate::catalog::Op>,
}
1060
/// State for the `Alter` stage of `ALTER SECRET` sequencing.
#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}
1066
/// The cluster a statement should be executed against.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The catalog server cluster.
    CatalogServer,
    /// The session's currently active cluster.
    Active,
    /// The cluster pinned by the current transaction.
    Transaction(ClusterId),
}
1080
/// The result of executing one stage of a sequencing pipeline.
pub(crate) enum StageResult<T> {
    /// A task was spawned that will produce the next stage.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task was spawned that will produce the final response.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage is available immediately.
    Immediate(T),
    /// Sequencing is complete; this is the final response.
    Response(ExecuteResponse),
}
1092
/// A single stage of a multi-stage sequencing pipeline that runs on the
/// coordinator, producing either the next stage or a final response.
pub(crate) trait Staged: Send {
    /// The context threaded through the stages (e.g. [`ExecuteContext`]).
    type Ctx: StagedContext;

    /// Returns a mutable reference to this stage's [`PlanValidity`].
    fn validity(&mut self) -> &mut PlanValidity;

    /// Executes this stage, yielding the next stage (possibly via a spawned
    /// task) or a final response.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Wraps this stage into the [`Message`] that resumes sequencing on the
    /// coordinator's main loop.
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether this stage may be canceled.
    fn cancel_enabled(&self) -> bool;
}
1112
/// The context a [`Staged`] stage carries between executions: something that
/// can be retired with a final result and may expose a session.
pub trait StagedContext {
    /// Consumes the context, finishing its statement with `result`.
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    /// Returns the session driving the stages, if there is one.
    fn session(&self) -> Option<&Session>;
}
1117
impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        // Not recursion: this resolves to the inherent
        // `ExecuteContext::retire`, which takes precedence over the trait
        // method.
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}
1127
// Context for stages with no client to respond to (used e.g. by
// introspection subscribes, whose `Message` variant carries no
// `ExecuteContext`): retiring simply drops the result.
impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}
1135
/// Configuration for a [`Coordinator`], supplied once at startup.
pub struct Config {
    pub controller_config: ControllerConfig,
    /// Fencing epoch handed to the controller; nonzero by construction.
    /// NOTE(review): presumably used to fence out zombie processes — confirm.
    pub controller_envd_epoch: NonZeroI64,
    /// The durable catalog state.
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    /// Iterator over audit-log events stored in the durable catalog.
    pub audit_logs_iterator: AuditLogIterator,
    /// Connection URL of the timestamp oracle, if one is configured.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    /// Source of the current wall-clock time.
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    /// Controller for cloud resources such as VPC endpoints, if running in an
    /// environment that has one.
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    /// The set of known cluster replica sizes and their allocations.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Bootstrap configuration for each builtin cluster.
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Default values for system parameters.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Client used to record storage usage.
    pub storage_usage_client: StorageUsageClient,
    /// How often storage usage is collected.
    pub storage_usage_collection_interval: Duration,
    /// How long storage usage records are retained, if bounded.
    pub storage_usage_retention_period: Option<Duration>,
    /// Telemetry client, if telemetry is enabled.
    pub segment_client: Option<mz_segment::Client>,
    /// Networks from which egress traffic may originate.
    pub egress_addresses: Vec<IpNet>,
    /// System parameter values fetched from a remote frontend, if configured.
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    /// Callback invoked with connection-count information.
    /// NOTE(review): the meaning of the two `u64` arguments (current count vs
    /// limit?) is not visible here — confirm against callers.
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    /// Limits the number of concurrently executing webhook requests.
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    /// Whether the controllers should start in read-only mode.
    pub read_only_controllers: bool,

    /// Trigger to fire once a read-only deployment has caught up, if any.
    /// NOTE(review): presumably part of zero-downtime (0dt) deployment —
    /// confirm.
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    /// If set, overrides the login password of the `mz_system` user.
    pub external_login_password_mz_system: Option<Password>,
    /// If set, forces a builtin schema migration.
    /// NOTE(review): semantics of the string value not visible here — confirm.
    pub force_builtin_schema_migration: Option<String>,
}
1187
/// Metadata the coordinator tracks for each active client connection.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Secret a client must present to cancel work on this connection.
    /// NOTE(review): presumably the pgwire cancellation secret — confirm.
    secret_key: u32,
    /// Wall-clock time (epoch milliseconds) at which the client connected.
    connected_at: EpochMillis,
    /// The user the connection authenticated as.
    user: User,
    /// The connection's `application_name`.
    application_name: String,
    /// Globally unique identifier for this connection.
    uuid: Uuid,
    /// Connection ID, unique among currently-active connections.
    conn_id: ConnectionId,
    /// The client's IP address, if known.
    client_ip: Option<IpAddr>,

    /// Sinks to tear down when the connection terminates.
    /// NOTE(review): inferred from the name — confirm which sinks end up here.
    drop_sinks: BTreeSet<GlobalId>,

    // Lock guards are not serializable, hence the skip.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Clusters with in-flight `ALTER`s issued by this connection.
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Channel on which notices are delivered to this connection's session.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role the connection originally authenticated as (the session's
    /// current role may differ).
    authenticated_role: RoleId,
}
1225
impl ConnMeta {
    /// The connection's ID.
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    /// The user the connection authenticated as.
    pub fn user(&self) -> &User {
        &self.user
    }

    /// The connection's `application_name`.
    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    /// The role the connection originally authenticated as.
    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    /// The connection's globally unique identifier.
    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    /// The client's IP address, if known.
    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    /// When the client connected, in epoch milliseconds.
    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}
1255
/// A transaction waiting to be finished (committed or rolled back), e.g.
/// pending linearization of its reads.
#[derive(Debug)]
pub struct PendingTxn {
    /// The execution context through which the client is answered.
    ctx: ExecuteContext,
    /// The response to deliver to the client once the transaction finishes.
    response: Result<PendingTxnResponse, AdapterError>,
    /// Whether to commit or abort the session's transaction state.
    action: EndTransactionAction,
}
1266
/// The outcome of a [`PendingTxn`], convertible into an [`ExecuteResponse`].
#[derive(Debug)]
pub enum PendingTxnResponse {
    /// The transaction committed.
    Committed {
        /// Session parameters that changed, to be reported to the client.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction rolled back.
    Rolledback {
        /// Session parameters that changed, to be reported to the client.
        params: BTreeMap<&'static str, String>,
    },
}
1281
1282impl PendingTxnResponse {
1283 pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1284 match self {
1285 PendingTxnResponse::Committed { params }
1286 | PendingTxnResponse::Rolledback { params } => params.extend(p),
1287 }
1288 }
1289}
1290
1291impl From<PendingTxnResponse> for ExecuteResponse {
1292 fn from(value: PendingTxnResponse) -> Self {
1293 match value {
1294 PendingTxnResponse::Committed { params } => {
1295 ExecuteResponse::TransactionCommitted { params }
1296 }
1297 PendingTxnResponse::Rolledback { params } => {
1298 ExecuteResponse::TransactionRolledBack { params }
1299 }
1300 }
1301 }
1302}
1303
/// A read transaction queued for linearization (see the coordinator's
/// `strict_serializable_reads_tx` channel).
#[derive(Debug)]
pub struct PendingReadTxn {
    /// The pending operation itself.
    txn: PendingRead,
    /// The timestamp context of the read.
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    /// When the pending read was created.
    /// NOTE(review): presumably used for queue-latency metrics — confirm.
    created: Instant,
    /// How many times this read has been requeued.
    num_requeues: u64,
    /// OpenTelemetry context, carried across the queueing boundary so tracing
    /// stays connected.
    otel_ctx: OpenTelemetryContext,
}
1320
impl PendingReadTxn {
    /// The timestamp context of the pending read transaction.
    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
        &self.timestamp_context
    }

    /// Consumes the pending read, extracting its execute context; a pending
    /// read-then-write is cancelled in the process (see
    /// `PendingRead::take_context`).
    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}
1331
/// The kind of operation waiting on a linearized read.
#[derive(Debug)]
enum PendingRead {
    /// A plain read at the end of a transaction.
    Read {
        /// The transaction to finish once the read is linearized.
        txn: PendingTxn,
    },
    /// The read half of a read-then-write operation.
    ReadThenWrite {
        /// The context of the in-progress operation.
        ctx: ExecuteContext,
        /// Channel used to hand the context back to the write half; sending
        /// `None` cancels the write.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}
1347
1348impl PendingRead {
1349 #[instrument(level = "debug")]
1354 pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1355 match self {
1356 PendingRead::Read {
1357 txn:
1358 PendingTxn {
1359 mut ctx,
1360 response,
1361 action,
1362 },
1363 ..
1364 } => {
1365 let changed = ctx.session_mut().vars_mut().end_transaction(action);
1366 let response = response.map(|mut r| {
1368 r.extend_params(changed);
1369 ExecuteResponse::from(r)
1370 });
1371
1372 Some((ctx, response))
1373 }
1374 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1375 let _ = tx.send(Some(ctx));
1377 None
1378 }
1379 }
1380 }
1381
1382 fn label(&self) -> &'static str {
1383 match self {
1384 PendingRead::Read { .. } => "read",
1385 PendingRead::ReadThenWrite { .. } => "read_then_write",
1386 }
1387 }
1388
1389 pub(crate) fn take_context(self) -> ExecuteContext {
1390 match self {
1391 PendingRead::Read { txn, .. } => txn.ctx,
1392 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1393 let _ = tx.send(None);
1396 ctx
1397 }
1398 }
1399 }
1400}
1401
/// Statement-logging state that must be explicitly retired: callers are
/// expected to report how the statement's execution ended, hence
/// `#[must_use]`.
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    /// The ID under which this statement's execution is logged, if it is
    /// logged at all.
    statement_uuid: Option<StatementLoggingId>,
}
1416
impl ExecuteContextExtra {
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }
    /// Whether there is no statement-logging state to retire.
    pub fn is_trivial(&self) -> bool {
        self.statement_uuid.is_none()
    }
    /// The statement-logging ID, if any, without retiring it.
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
    /// Consumes `self`, yielding the statement-logging ID to report; the
    /// caller takes responsibility for logging the execution's end.
    #[must_use]
    pub(crate) fn retire(self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
}
1435
/// A drop guard around [`ExecuteContextExtra`]: if dropped while still
/// holding a statement-logging ID, it reports the execution as aborted to the
/// coordinator (see the `Drop` impl), so statement logging is never silently
/// lost.
#[derive(Debug)]
#[must_use]
pub struct ExecuteContextGuard {
    extra: ExecuteContextExtra,
    /// Channel to the coordinator, used to send `Message::RetireExecute` on
    /// drop.
    coordinator_tx: mpsc::UnboundedSender<Message>,
}
1455
impl Default for ExecuteContextGuard {
    fn default() -> Self {
        // A default guard carries no statement-logging ID, so its `Drop` impl
        // never sends anything; the channel is a dummy whose receiver is
        // dropped immediately.
        let (tx, _rx) = mpsc::unbounded_channel();
        Self {
            extra: ExecuteContextExtra::default(),
            coordinator_tx: tx,
        }
    }
}
1468
1469impl ExecuteContextGuard {
1470 pub(crate) fn new(
1471 statement_uuid: Option<StatementLoggingId>,
1472 coordinator_tx: mpsc::UnboundedSender<Message>,
1473 ) -> Self {
1474 Self {
1475 extra: ExecuteContextExtra::new(statement_uuid),
1476 coordinator_tx,
1477 }
1478 }
1479 pub fn is_trivial(&self) -> bool {
1480 self.extra.is_trivial()
1481 }
1482 pub fn contents(&self) -> Option<StatementLoggingId> {
1483 self.extra.contents()
1484 }
1485 pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
1492 std::mem::take(&mut self.extra)
1494 }
1495}
1496
1497impl Drop for ExecuteContextGuard {
1498 fn drop(&mut self) {
1499 if let Some(statement_uuid) = self.extra.statement_uuid.take() {
1500 let msg = Message::RetireExecute {
1503 data: ExecuteContextExtra {
1504 statement_uuid: Some(statement_uuid),
1505 },
1506 otel_ctx: OpenTelemetryContext::obtain(),
1507 reason: StatementEndedExecutionReason::Aborted,
1508 };
1509 let _ = self.coordinator_tx.send(msg);
1512 }
1513 }
1514}
1515
/// Everything needed to execute a single statement: the session, the channel
/// on which to send the response, and statement-logging state.
///
/// Boxed so the (large) inner state stays cheap to move around.
#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}
1531
// Transparent read access to the boxed inner state.
impl std::ops::Deref for ExecuteContext {
    type Target = ExecuteContextInner;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}
1538
// Transparent mutable access to the boxed inner state.
impl std::ops::DerefMut for ExecuteContext {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}
1544
/// The state of an [`ExecuteContext`]; see there for details.
#[derive(Debug)]
pub struct ExecuteContextInner {
    /// Channel on which the statement's response is sent to the client.
    tx: ClientTransmitter<ExecuteResponse>,
    /// Channel to the coordinator's internal command loop, used to report
    /// statement retirement for statement logging.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// The session that issued the statement.
    session: Session,
    /// Statement-logging guard; reports an aborted execution if the context
    /// is dropped without being retired.
    extra: ExecuteContextGuard,
}
1552
impl ExecuteContext {
    /// The session that issued the statement.
    pub fn session(&self) -> &Session {
        &self.session
    }

    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.session
    }

    /// The channel on which the statement's response will be sent.
    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
        &self.tx
    }

    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
        &mut self.tx
    }

    /// Assembles an `ExecuteContext` from its constituent parts.
    pub fn from_parts(
        tx: ClientTransmitter<ExecuteResponse>,
        internal_cmd_tx: mpsc::UnboundedSender<Message>,
        session: Session,
        extra: ExecuteContextGuard,
    ) -> Self {
        Self {
            inner: ExecuteContextInner {
                tx,
                session,
                extra,
                internal_cmd_tx,
            }
            .into(),
        }
    }

    /// Disassembles the context into its constituent parts. The caller
    /// becomes responsible for the returned guard: it must either be defused
    /// and retired, or its `Drop` impl will report the execution as aborted.
    pub fn into_parts(
        self,
    ) -> (
        ClientTransmitter<ExecuteResponse>,
        mpsc::UnboundedSender<Message>,
        Session,
        ExecuteContextGuard,
    ) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        (tx, internal_cmd_tx, session, extra)
    }

    /// Retires the execution: sends `result` to the client and, if the
    /// statement was being logged, reports its end-of-execution reason to the
    /// coordinator. Consuming the context defuses the guard, so no spurious
    /// abort is reported later.
    #[instrument(level = "debug")]
    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        // Derive the logging reason from the result before `result` is moved
        // into the response channel; `None` means this execution was never
        // logged, so there is nothing to retire.
        let reason = if extra.is_trivial() {
            None
        } else {
            Some((&result).into())
        };
        tx.send(result, session);
        if let Some(reason) = reason {
            let extra = extra.defuse();
            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
                otel_ctx: OpenTelemetryContext::obtain(),
                data: extra,
                reason,
            }) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        }
    }

    /// The statement-logging guard for this execution.
    pub fn extra(&self) -> &ExecuteContextGuard {
        &self.extra
    }

    pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
        &mut self.extra
    }
}
1648
/// The last known status of every process of every cluster replica, keyed as
/// cluster → replica → process.
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);
1653
1654impl ClusterReplicaStatuses {
1655 pub(crate) fn new() -> ClusterReplicaStatuses {
1656 ClusterReplicaStatuses(BTreeMap::new())
1657 }
1658
1659 pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1663 let prev = self.0.insert(cluster_id, BTreeMap::new());
1664 assert_eq!(
1665 prev, None,
1666 "cluster {cluster_id} statuses already initialized"
1667 );
1668 }
1669
1670 pub(crate) fn initialize_cluster_replica_statuses(
1674 &mut self,
1675 cluster_id: ClusterId,
1676 replica_id: ReplicaId,
1677 num_processes: usize,
1678 time: DateTime<Utc>,
1679 ) {
1680 tracing::info!(
1681 ?cluster_id,
1682 ?replica_id,
1683 ?time,
1684 "initializing cluster replica status"
1685 );
1686 let replica_statuses = self.0.entry(cluster_id).or_default();
1687 let process_statuses = (0..num_processes)
1688 .map(|process_id| {
1689 let status = ClusterReplicaProcessStatus {
1690 status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1691 time: time.clone(),
1692 };
1693 (u64::cast_from(process_id), status)
1694 })
1695 .collect();
1696 let prev = replica_statuses.insert(replica_id, process_statuses);
1697 assert_none!(
1698 prev,
1699 "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1700 );
1701 }
1702
1703 pub(crate) fn remove_cluster_statuses(
1707 &mut self,
1708 cluster_id: &ClusterId,
1709 ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1710 let prev = self.0.remove(cluster_id);
1711 prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1712 }
1713
1714 pub(crate) fn remove_cluster_replica_statuses(
1718 &mut self,
1719 cluster_id: &ClusterId,
1720 replica_id: &ReplicaId,
1721 ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1722 let replica_statuses = self
1723 .0
1724 .get_mut(cluster_id)
1725 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1726 let prev = replica_statuses.remove(replica_id);
1727 prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1728 }
1729
1730 pub(crate) fn ensure_cluster_status(
1734 &mut self,
1735 cluster_id: ClusterId,
1736 replica_id: ReplicaId,
1737 process_id: ProcessId,
1738 status: ClusterReplicaProcessStatus,
1739 ) {
1740 let replica_statuses = self
1741 .0
1742 .get_mut(&cluster_id)
1743 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1744 .get_mut(&replica_id)
1745 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1746 replica_statuses.insert(process_id, status);
1747 }
1748
1749 pub fn get_cluster_replica_status(
1753 &self,
1754 cluster_id: ClusterId,
1755 replica_id: ReplicaId,
1756 ) -> ClusterStatus {
1757 let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1758 Self::cluster_replica_status(process_status)
1759 }
1760
1761 pub fn cluster_replica_status(
1763 process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1764 ) -> ClusterStatus {
1765 process_status
1766 .values()
1767 .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1768 (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1769 (x, y) => {
1770 let reason_x = match x {
1771 ClusterStatus::Offline(reason) => reason,
1772 ClusterStatus::Online => None,
1773 };
1774 let reason_y = match y {
1775 ClusterStatus::Offline(reason) => reason,
1776 ClusterStatus::Online => None,
1777 };
1778 ClusterStatus::Offline(reason_x.or(reason_y))
1780 }
1781 })
1782 }
1783
1784 pub(crate) fn get_cluster_replica_statuses(
1788 &self,
1789 cluster_id: ClusterId,
1790 replica_id: ReplicaId,
1791 ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1792 self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1793 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1794 }
1795
1796 pub(crate) fn try_get_cluster_replica_statuses(
1798 &self,
1799 cluster_id: ClusterId,
1800 replica_id: ReplicaId,
1801 ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1802 self.try_get_cluster_statuses(cluster_id)
1803 .and_then(|statuses| statuses.get(&replica_id))
1804 }
1805
1806 pub(crate) fn try_get_cluster_statuses(
1808 &self,
1809 cluster_id: ClusterId,
1810 ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1811 self.0.get(&cluster_id)
1812 }
1813}
1814
/// The coordinator: serializes client requests through its message loop and
/// glues sessions, the catalog, and the storage/compute controllers together.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    /// The controller for the storage and compute layers.
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    /// The in-memory catalog.
    catalog: Arc<Catalog>,

    /// A persist client. NOTE(review): specific usage not visible here.
    persist_client: PersistClient,

    /// Channel for sending internal messages to the coordinator's own loop.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// Notifier for triggering group commits.
    group_commit_tx: appends::GroupCommitNotifier,

    /// Channel for queueing strict-serializable reads awaiting linearization.
    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    /// State of each installed timeline.
    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,

    /// Generator of transient `GlobalId`s.
    transient_id_gen: Arc<TransientIdGen>,
    /// Metadata for each active connection.
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    /// Read holds taken on behalf of ongoing transactions, per connection.
    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,

    /// In-flight peeks, keyed by UUID.
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// Per-connection index of pending peeks, mapping each peek's UUID to the
    /// cluster servicing it.
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    /// Pending strict-serializable reads awaiting linearization, keyed by
    /// connection.
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    /// Active compute sinks, keyed by `GlobalId`.
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    /// Invalidators for active webhook appenders, keyed by webhook source.
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    /// In-progress COPY FROM statements, keyed by connection.
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    /// Per-connection watch channels used to signal cancellation of staged
    /// executions.
    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    /// Coordinator-internal introspection subscribes.
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    /// Per-object locks serializing writes, keyed by catalog item.
    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    /// Write operations deferred until their locks become available, keyed by
    /// connection.
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    /// Writes awaiting the next group commit.
    pending_writes: Vec<PendingWriteTxn>,

    /// Interval at which timelines are advanced.
    advance_timelines_interval: Interval,

    /// Queue of DDL plans/statements executed serially.
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    /// Controller for secrets (create/delete/list).
    secrets_controller: Arc<dyn SecretsController>,
    /// A caching wrapper around the secrets reader.
    caching_secrets_reader: CachingSecretsReader,

    /// Controller for cloud resources (e.g. VPC endpoints), if available.
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Client used to record storage usage.
    storage_usage_client: StorageUsageClient,
    /// How often storage usage is collected.
    storage_usage_collection_interval: Duration,

    /// Telemetry client, if telemetry is enabled.
    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    /// Coordinator metrics.
    metrics: Metrics,
    /// Optimizer metrics.
    optimizer_metrics: OptimizerMetrics,

    /// Handle for reconfiguring tracing at runtime.
    tracing_handle: TracingHandle,

    /// State for statement logging.
    statement_logging: StatementLogging,

    /// Limits the number of concurrently executing webhook requests.
    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    /// Configuration of the timestamp oracle, if one is configured.
    timestamp_oracle_config: Option<TimestampOracleConfig>,

    /// Interval at which cluster scheduling policies are re-evaluated.
    check_cluster_scheduling_policies_interval: Interval,

    /// Latest scheduling decision per cluster, keyed by policy name.
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    /// Interval at which the caught-up check runs.
    /// NOTE(review): presumably part of 0dt deployment — confirm.
    caught_up_check_interval: Interval,

    /// State of the in-progress caught-up check, if one is active.
    caught_up_check: Option<CaughtUpCheckContext>,

    /// Installed watch sets, mapping each watch set to the connection that
    /// installed it and the response to deliver.
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    /// Reverse index of watch sets per connection.
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    /// Last known status of every cluster replica process.
    cluster_replica_statuses: ClusterReplicaStatuses,

    /// Whether the controllers are running in read-only mode.
    read_only_controllers: bool,

    /// Builtin table updates buffered while in read-only mode (`Some` only in
    /// read-only mode; see `bootstrap`).
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    /// The validated license key for this environment.
    license_key: ValidatedLicenseKey,

    /// Pool of user IDs available for allocation.
    user_id_pool: IdPool,
}
1992
1993impl Coordinator {
1994 #[instrument(name = "coord::bootstrap")]
1998 pub(crate) async fn bootstrap(
1999 &mut self,
2000 boot_ts: Timestamp,
2001 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
2002 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2003 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2004 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
2005 audit_logs_iterator: AuditLogIterator,
2006 ) -> Result<(), AdapterError> {
2007 let bootstrap_start = Instant::now();
2008 info!("startup: coordinator init: bootstrap beginning");
2009 info!("startup: coordinator init: bootstrap: preamble beginning");
2010
2011 let cluster_statuses: Vec<(_, Vec<_>)> = self
2014 .catalog()
2015 .clusters()
2016 .map(|cluster| {
2017 (
2018 cluster.id(),
2019 cluster
2020 .replicas()
2021 .map(|replica| {
2022 (replica.replica_id, replica.config.location.num_processes())
2023 })
2024 .collect(),
2025 )
2026 })
2027 .collect();
2028 let now = self.now_datetime();
2029 for (cluster_id, replica_statuses) in cluster_statuses {
2030 self.cluster_replica_statuses
2031 .initialize_cluster_statuses(cluster_id);
2032 for (replica_id, num_processes) in replica_statuses {
2033 self.cluster_replica_statuses
2034 .initialize_cluster_replica_statuses(
2035 cluster_id,
2036 replica_id,
2037 num_processes,
2038 now,
2039 );
2040 }
2041 }
2042
2043 let system_config = self.catalog().system_config();
2044
2045 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
2047
2048 let compute_config = flags::compute_config(system_config);
2050 let storage_config = flags::storage_config(system_config);
2051 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
2052 let dyncfg_updates = system_config.dyncfg_updates();
2053 self.controller.compute.update_configuration(compute_config);
2054 self.controller.storage.update_parameters(storage_config);
2055 self.controller
2056 .update_orchestrator_scheduling_config(scheduling_config);
2057 self.controller.update_configuration(dyncfg_updates);
2058
2059 self.validate_resource_limit_numeric(
2060 Numeric::zero(),
2061 self.current_credit_consumption_rate(),
2062 |system_vars| {
2063 self.license_key
2064 .max_credit_consumption_rate()
2065 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
2066 },
2067 "cluster replica",
2068 MAX_CREDIT_CONSUMPTION_RATE.name(),
2069 )?;
2070
2071 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
2072 Default::default();
2073
2074 let enable_worker_core_affinity =
2075 self.catalog().system_config().enable_worker_core_affinity();
2076 for instance in self.catalog.clusters() {
2077 self.controller.create_cluster(
2078 instance.id,
2079 ClusterConfig {
2080 arranged_logs: instance.log_indexes.clone(),
2081 workload_class: instance.config.workload_class.clone(),
2082 },
2083 )?;
2084 for replica in instance.replicas() {
2085 let role = instance.role();
2086 self.controller.create_replica(
2087 instance.id,
2088 replica.replica_id,
2089 instance.name.clone(),
2090 replica.name.clone(),
2091 role,
2092 replica.config.clone(),
2093 enable_worker_core_affinity,
2094 )?;
2095 }
2096 }
2097
2098 info!(
2099 "startup: coordinator init: bootstrap: preamble complete in {:?}",
2100 bootstrap_start.elapsed()
2101 );
2102
2103 let init_storage_collections_start = Instant::now();
2104 info!("startup: coordinator init: bootstrap: storage collections init beginning");
2105 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2106 .await;
2107 info!(
2108 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2109 init_storage_collections_start.elapsed()
2110 );
2111
2112 self.controller.start_compute_introspection_sink();
2117
2118 let sorting_start = Instant::now();
2119 info!("startup: coordinator init: bootstrap: sorting catalog entries");
2120 let entries = self.bootstrap_sort_catalog_entries();
2121 info!(
2122 "startup: coordinator init: bootstrap: sorting catalog entries complete in {:?}",
2123 sorting_start.elapsed()
2124 );
2125
2126 let optimize_dataflows_start = Instant::now();
2127 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2128 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2129 info!(
2130 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2131 optimize_dataflows_start.elapsed()
2132 );
2133
2134 let _fut = self.catalog().update_expression_cache(
2136 uncached_local_exprs.into_iter().collect(),
2137 uncached_global_exps.into_iter().collect(),
2138 Default::default(),
2139 );
2140
2141 let bootstrap_as_ofs_start = Instant::now();
2145 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2146 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2147 info!(
2148 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2149 bootstrap_as_ofs_start.elapsed()
2150 );
2151
2152 let postamble_start = Instant::now();
2153 info!("startup: coordinator init: bootstrap: postamble beginning");
2154
2155 let logs: BTreeSet<_> = BUILTINS::logs()
2156 .map(|log| self.catalog().resolve_builtin_log(log))
2157 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2158 .collect();
2159
2160 let mut privatelink_connections = BTreeMap::new();
2161
2162 for entry in &entries {
2163 debug!(
2164 "coordinator init: installing {} {}",
2165 entry.item().typ(),
2166 entry.id()
2167 );
2168 let mut policy = entry.item().initial_logical_compaction_window();
2169 match entry.item() {
2170 CatalogItem::Source(source) => {
2176 if source.custom_logical_compaction_window.is_none() {
2178 if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2179 source.data_source
2180 {
2181 policy = Some(
2182 self.catalog()
2183 .get_entry(&ingestion_id)
2184 .source()
2185 .expect("must be source")
2186 .custom_logical_compaction_window
2187 .unwrap_or_default(),
2188 );
2189 }
2190 }
2191 policies_to_set
2192 .entry(policy.expect("sources have a compaction window"))
2193 .or_insert_with(Default::default)
2194 .storage_ids
2195 .insert(source.global_id());
2196 }
2197 CatalogItem::Table(table) => {
2198 policies_to_set
2199 .entry(policy.expect("tables have a compaction window"))
2200 .or_insert_with(Default::default)
2201 .storage_ids
2202 .extend(table.global_ids());
2203 }
2204 CatalogItem::Index(idx) => {
2205 let policy_entry = policies_to_set
2206 .entry(policy.expect("indexes have a compaction window"))
2207 .or_insert_with(Default::default);
2208
2209 if logs.contains(&idx.on) {
2210 policy_entry
2211 .compute_ids
2212 .entry(idx.cluster_id)
2213 .or_insert_with(BTreeSet::new)
2214 .insert(idx.global_id());
2215 } else {
2216 let df_desc = self
2217 .catalog()
2218 .try_get_physical_plan(&idx.global_id())
2219 .expect("added in `bootstrap_dataflow_plans`")
2220 .clone();
2221
2222 let df_meta = self
2223 .catalog()
2224 .try_get_dataflow_metainfo(&idx.global_id())
2225 .expect("added in `bootstrap_dataflow_plans`");
2226
2227 if self.catalog().state().system_config().enable_mz_notices() {
2228 self.catalog().state().pack_optimizer_notices(
2230 &mut builtin_table_updates,
2231 df_meta.optimizer_notices.iter(),
2232 Diff::ONE,
2233 );
2234 }
2235
2236 policy_entry
2239 .compute_ids
2240 .entry(idx.cluster_id)
2241 .or_insert_with(Default::default)
2242 .extend(df_desc.export_ids());
2243
2244 self.controller
2245 .compute
2246 .create_dataflow(idx.cluster_id, df_desc, None)
2247 .unwrap_or_terminate("cannot fail to create dataflows");
2248 }
2249 }
2250 CatalogItem::View(_) => (),
2251 CatalogItem::MaterializedView(mview) => {
2252 policies_to_set
2253 .entry(policy.expect("materialized views have a compaction window"))
2254 .or_insert_with(Default::default)
2255 .storage_ids
2256 .insert(mview.global_id_writes());
2257
2258 let mut df_desc = self
2259 .catalog()
2260 .try_get_physical_plan(&mview.global_id_writes())
2261 .expect("added in `bootstrap_dataflow_plans`")
2262 .clone();
2263
2264 if let Some(initial_as_of) = mview.initial_as_of.clone() {
2265 df_desc.set_initial_as_of(initial_as_of);
2266 }
2267
2268 let until = mview
2270 .refresh_schedule
2271 .as_ref()
2272 .and_then(|s| s.last_refresh())
2273 .and_then(|r| r.try_step_forward());
2274 if let Some(until) = until {
2275 df_desc.until.meet_assign(&Antichain::from_elem(until));
2276 }
2277
2278 let df_meta = self
2279 .catalog()
2280 .try_get_dataflow_metainfo(&mview.global_id_writes())
2281 .expect("added in `bootstrap_dataflow_plans`");
2282
2283 if self.catalog().state().system_config().enable_mz_notices() {
2284 self.catalog().state().pack_optimizer_notices(
2286 &mut builtin_table_updates,
2287 df_meta.optimizer_notices.iter(),
2288 Diff::ONE,
2289 );
2290 }
2291
2292 self.ship_dataflow(df_desc, mview.cluster_id, mview.target_replica)
2293 .await;
2294
2295 if mview.replacement_target.is_none() {
2298 self.allow_writes(mview.cluster_id, mview.global_id_writes());
2299 }
2300 }
2301 CatalogItem::Sink(sink) => {
2302 policies_to_set
2303 .entry(CompactionWindow::Default)
2304 .or_insert_with(Default::default)
2305 .storage_ids
2306 .insert(sink.global_id());
2307 }
2308 CatalogItem::Connection(catalog_connection) => {
2309 if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2310 privatelink_connections.insert(
2311 entry.id(),
2312 VpcEndpointConfig {
2313 aws_service_name: conn.service_name.clone(),
2314 availability_zone_ids: conn.availability_zones.clone(),
2315 },
2316 );
2317 }
2318 }
2319 CatalogItem::ContinualTask(ct) => {
2320 policies_to_set
2321 .entry(policy.expect("continual tasks have a compaction window"))
2322 .or_insert_with(Default::default)
2323 .storage_ids
2324 .insert(ct.global_id());
2325
2326 let mut df_desc = self
2327 .catalog()
2328 .try_get_physical_plan(&ct.global_id())
2329 .expect("added in `bootstrap_dataflow_plans`")
2330 .clone();
2331
2332 if let Some(initial_as_of) = ct.initial_as_of.clone() {
2333 df_desc.set_initial_as_of(initial_as_of);
2334 }
2335
2336 let df_meta = self
2337 .catalog()
2338 .try_get_dataflow_metainfo(&ct.global_id())
2339 .expect("added in `bootstrap_dataflow_plans`");
2340
2341 if self.catalog().state().system_config().enable_mz_notices() {
2342 self.catalog().state().pack_optimizer_notices(
2344 &mut builtin_table_updates,
2345 df_meta.optimizer_notices.iter(),
2346 Diff::ONE,
2347 );
2348 }
2349
2350 self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2351 self.allow_writes(ct.cluster_id, ct.global_id());
2352 }
2353 CatalogItem::Log(_)
2355 | CatalogItem::Type(_)
2356 | CatalogItem::Func(_)
2357 | CatalogItem::Secret(_) => {}
2358 }
2359 }
2360
2361 if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2362 let existing_vpc_endpoints = cloud_resource_controller
2364 .list_vpc_endpoints()
2365 .await
2366 .context("list vpc endpoints")?;
2367 let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2368 let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2369 let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2370 for id in vpc_endpoints_to_remove {
2371 cloud_resource_controller
2372 .delete_vpc_endpoint(*id)
2373 .await
2374 .context("deleting extraneous vpc endpoint")?;
2375 }
2376
2377 for (id, spec) in privatelink_connections {
2379 cloud_resource_controller
2380 .ensure_vpc_endpoint(id, spec)
2381 .await
2382 .context("ensuring vpc endpoint")?;
2383 }
2384 }
2385
2386 drop(dataflow_read_holds);
2389 for (cw, policies) in policies_to_set {
2391 self.initialize_read_policies(&policies, cw).await;
2392 }
2393
2394 builtin_table_updates.extend(
2396 self.catalog().state().resolve_builtin_table_updates(
2397 self.catalog().state().pack_all_replica_size_updates(),
2398 ),
2399 );
2400
2401 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2402 let migrated_updates_fut = if self.controller.read_only() {
2408 let min_timestamp = Timestamp::minimum();
2409 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2410 .extract_if(.., |update| {
2411 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2412 migrated_storage_collections_0dt.contains(&update.id)
2413 && self
2414 .controller
2415 .storage_collections
2416 .collection_frontiers(gid)
2417 .expect("all tables are registered")
2418 .write_frontier
2419 .elements()
2420 == &[min_timestamp]
2421 })
2422 .collect();
2423 if migrated_builtin_table_updates.is_empty() {
2424 futures::future::ready(()).boxed()
2425 } else {
2426 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2428 for update in migrated_builtin_table_updates {
2429 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2430 grouped_appends.entry(gid).or_default().push(update.data);
2431 }
2432 info!(
2433 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2434 grouped_appends.keys().collect::<Vec<_>>()
2435 );
2436
2437 let mut all_appends = Vec::with_capacity(grouped_appends.len());
2439 for (item_id, table_data) in grouped_appends.into_iter() {
2440 let mut all_rows = Vec::new();
2441 let mut all_data = Vec::new();
2442 for data in table_data {
2443 match data {
2444 TableData::Rows(rows) => all_rows.extend(rows),
2445 TableData::Batches(_) => all_data.push(data),
2446 }
2447 }
2448 differential_dataflow::consolidation::consolidate(&mut all_rows);
2449 all_data.push(TableData::Rows(all_rows));
2450
2451 all_appends.push((item_id, all_data));
2453 }
2454
2455 let fut = self
2456 .controller
2457 .storage
2458 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2459 .expect("cannot fail to append");
2460 async {
2461 fut.await
2462 .expect("One-shot shouldn't be dropped during bootstrap")
2463 .unwrap_or_terminate("cannot fail to append")
2464 }
2465 .boxed()
2466 }
2467 } else {
2468 futures::future::ready(()).boxed()
2469 };
2470
2471 info!(
2472 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2473 postamble_start.elapsed()
2474 );
2475
2476 let builtin_update_start = Instant::now();
2477 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2478
2479 if self.controller.read_only() {
2480 info!(
2481 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2482 );
2483
2484 let audit_join_start = Instant::now();
2486 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2487 let audit_log_updates: Vec<_> = audit_logs_iterator
2488 .map(|(audit_log, ts)| StateUpdate {
2489 kind: StateUpdateKind::AuditLog(audit_log),
2490 ts,
2491 diff: StateDiff::Addition,
2492 })
2493 .collect();
2494 let audit_log_builtin_table_updates = self
2495 .catalog()
2496 .state()
2497 .generate_builtin_table_updates(audit_log_updates);
2498 builtin_table_updates.extend(audit_log_builtin_table_updates);
2499 info!(
2500 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2501 audit_join_start.elapsed()
2502 );
2503 self.buffered_builtin_table_updates
2504 .as_mut()
2505 .expect("in read-only mode")
2506 .append(&mut builtin_table_updates);
2507 } else {
2508 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2509 .await;
2510 };
2511 info!(
2512 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2513 builtin_update_start.elapsed()
2514 );
2515
2516 let cleanup_secrets_start = Instant::now();
2517 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2518 {
2522 let Self {
2525 secrets_controller,
2526 catalog,
2527 ..
2528 } = self;
2529
2530 let next_user_item_id = catalog.get_next_user_item_id().await?;
2531 let next_system_item_id = catalog.get_next_system_item_id().await?;
2532 let read_only = self.controller.read_only();
2533 let catalog_ids: BTreeSet<CatalogItemId> =
2538 catalog.entries().map(|entry| entry.id()).collect();
2539 let secrets_controller = Arc::clone(secrets_controller);
2540
2541 spawn(|| "cleanup-orphaned-secrets", async move {
2542 if read_only {
2543 info!(
2544 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2545 );
2546 return;
2547 }
2548 info!("coordinator init: cleaning up orphaned secrets");
2549
2550 match secrets_controller.list().await {
2551 Ok(controller_secrets) => {
2552 let controller_secrets: BTreeSet<CatalogItemId> =
2553 controller_secrets.into_iter().collect();
2554 let orphaned = controller_secrets.difference(&catalog_ids);
2555 for id in orphaned {
2556 let id_too_large = match id {
2557 CatalogItemId::System(id) => *id >= next_system_item_id,
2558 CatalogItemId::User(id) => *id >= next_user_item_id,
2559 CatalogItemId::IntrospectionSourceIndex(_)
2560 | CatalogItemId::Transient(_) => false,
2561 };
2562 if id_too_large {
2563 info!(
2564 %next_user_item_id, %next_system_item_id,
2565 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2566 );
2567 } else {
2568 info!("coordinator init: deleting orphaned secret {id}");
2569 fail_point!("orphan_secrets");
2570 if let Err(e) = secrets_controller.delete(*id).await {
2571 warn!(
2572 "Dropping orphaned secret has encountered an error: {}",
2573 e
2574 );
2575 }
2576 }
2577 }
2578 }
2579 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2580 }
2581 });
2582 }
2583 info!(
2584 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2585 cleanup_secrets_start.elapsed()
2586 );
2587
2588 let final_steps_start = Instant::now();
2590 info!(
2591 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2592 );
2593 migrated_updates_fut
2594 .instrument(info_span!("coord::bootstrap::final"))
2595 .await;
2596
2597 debug!(
2598 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2599 );
2600 self.controller.initialization_complete();
2602
2603 self.bootstrap_introspection_subscribes().await;
2605
2606 info!(
2607 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2608 final_steps_start.elapsed()
2609 );
2610
2611 info!(
2612 "startup: coordinator init: bootstrap complete in {:?}",
2613 bootstrap_start.elapsed()
2614 );
2615 Ok(())
2616 }
2617
    /// Initializes the contents of the builtin/system tables during bootstrap.
    ///
    /// Steps, in order:
    /// 1. Append empty updates to advance every table to the current write
    ///    timestamp. This append also acts as a fence against concurrent
    ///    writers from other processes.
    /// 2. Snapshot and retract the current contents of every system table,
    ///    except `mz_storage_usage_by_shard` (kept across restarts) and
    ///    `mz_audit_events` (reconciled incrementally instead of reset).
    /// 3. Append the accumulated `builtin_table_updates` plus the audit log
    ///    updates, and apply the resulting write timestamp locally.
    #[allow(clippy::async_yields_async)]
    #[instrument]
    async fn bootstrap_tables(
        &mut self,
        entries: &[CatalogEntry],
        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
        audit_logs_iterator: AuditLogIterator,
    ) {
        // Minimal per-table metadata needed below.
        struct TableMetadata<'a> {
            id: CatalogItemId,
            name: &'a QualifiedItemName,
            table: &'a Table,
        }

        // Narrow the catalog entries down to just the tables.
        let table_metas: Vec<_> = entries
            .into_iter()
            .filter_map(|entry| {
                entry.table().map(|table| TableMetadata {
                    id: entry.id(),
                    name: entry.name(),
                    table,
                })
            })
            .collect();

        debug!("coordinator init: advancing all tables to current timestamp");
        let WriteTimestamp {
            timestamp: write_ts,
            advance_to,
        } = self.get_local_write_ts().await;
        // Empty appends: they carry no data but advance each table's upper.
        let appends = table_metas
            .iter()
            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
            .collect();
        // The returned one-shot resolves once the append lands; we await it
        // further below, just before issuing the real updates.
        let table_fence_rx = self
            .controller
            .storage
            .append_table(write_ts.clone(), advance_to, appends)
            .expect("invalid updates");

        self.apply_local_write(write_ts).await;

        debug!("coordinator init: resetting system tables");
        let read_ts = self.get_local_read_ts().await;

        // `mz_storage_usage_by_shard` must survive restarts, so it is excluded
        // from the reset below. Identify it by name *and* schema.
        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
            .catalog()
            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
            .into();
        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
        };

        let mut retraction_tasks = Vec::new();
        let mut system_tables: Vec<_> = table_metas
            .iter()
            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
            .collect();

        // `mz_audit_events` is not reset; instead we reconcile it against the
        // durable audit log in a dedicated task.
        let (audit_events_idx, _) = system_tables
            .iter()
            .find_position(|table| {
                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
            })
            .expect("mz_audit_events must exist");
        let audit_events = system_tables.remove(audit_events_idx);
        let audit_log_task = self.bootstrap_audit_log_table(
            audit_events.id,
            audit_events.name,
            audit_events.table,
            audit_logs_iterator,
            read_ts,
        );

        // For every remaining system table, spawn a task that snapshots its
        // current contents and builds a batch retracting all of it.
        for system_table in system_tables {
            let table_id = system_table.id;
            let full_name = self.catalog().resolve_full_name(system_table.name, None);
            debug!("coordinator init: resetting system table {full_name} ({table_id})");

            // Futures are created here (outside the task) so they capture the
            // controller handles; they are awaited inside the task.
            let snapshot_fut = self
                .controller
                .storage_collections
                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
            let batch_fut = self
                .controller
                .storage_collections
                .create_update_builder(system_table.table.global_id_writes());

            let task = spawn(|| format!("snapshot-{table_id}"), async move {
                let mut batch = batch_fut
                    .await
                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
                tracing::info!(?table_id, "starting snapshot");
                let mut snapshot_cursor = snapshot_fut
                    .await
                    .unwrap_or_terminate("cannot fail to snapshot");

                // Re-emit every row in the snapshot with its diff negated,
                // i.e. retract the table's entire current contents.
                while let Some(values) = snapshot_cursor.next().await {
                    for (key, _t, d) in values {
                        let d_invert = d.neg();
                        batch.add(&key, &(), &d_invert).await;
                    }
                }
                tracing::info!(?table_id, "finished snapshot");

                let batch = batch.finish().await;
                BuiltinTableUpdate::batch(table_id, batch)
            });
            retraction_tasks.push(task);
        }

        let retractions_res = futures::future::join_all(retraction_tasks).await;
        for retractions in retractions_res {
            builtin_table_updates.push(retractions);
        }

        let audit_join_start = Instant::now();
        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
        let audit_log_updates = audit_log_task.await;
        let audit_log_builtin_table_updates = self
            .catalog()
            .state()
            .generate_builtin_table_updates(audit_log_updates);
        builtin_table_updates.extend(audit_log_builtin_table_updates);
        info!(
            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
            audit_join_start.elapsed()
        );

        // Wait for the fence append from the top of this function to land
        // before issuing the real updates.
        table_fence_rx
            .await
            .expect("One-shot shouldn't be dropped during bootstrap")
            .unwrap_or_terminate("cannot fail to append");

        info!("coordinator init: sending builtin table updates");
        let (_builtin_updates_fut, write_ts) = self
            .builtin_table_update()
            .execute(builtin_table_updates)
            .await;
        info!(?write_ts, "our write ts");
        if let Some(write_ts) = write_ts {
            self.apply_local_write(write_ts).await;
        }
    }
2782
2783 #[instrument]
2787 fn bootstrap_audit_log_table<'a>(
2788 &self,
2789 table_id: CatalogItemId,
2790 name: &'a QualifiedItemName,
2791 table: &'a Table,
2792 audit_logs_iterator: AuditLogIterator,
2793 read_ts: Timestamp,
2794 ) -> JoinHandle<Vec<StateUpdate>> {
2795 let full_name = self.catalog().resolve_full_name(name, None);
2796 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2797 let current_contents_fut = self
2798 .controller
2799 .storage_collections
2800 .snapshot(table.global_id_writes(), read_ts);
2801 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2802 let current_contents = current_contents_fut
2803 .await
2804 .unwrap_or_terminate("cannot fail to fetch snapshot");
2805 let contents_len = current_contents.len();
2806 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2807
2808 let max_table_id = current_contents
2810 .into_iter()
2811 .filter(|(_, diff)| *diff == 1)
2812 .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2813 .sorted()
2814 .rev()
2815 .next();
2816
2817 audit_logs_iterator
2819 .take_while(|(audit_log, _)| match max_table_id {
2820 Some(id) => audit_log.event.sortable_id() > id,
2821 None => true,
2822 })
2823 .map(|(audit_log, ts)| StateUpdate {
2824 kind: StateUpdateKind::AuditLog(audit_log),
2825 ts,
2826 diff: StateDiff::Addition,
2827 })
2828 .collect::<Vec<_>>()
2829 })
2830 }
2831
    /// Registers all catalog-known storage collections (sources, tables,
    /// materialized views, continual tasks, and sink progress collections)
    /// with the storage controller.
    ///
    /// System-owned MV/CT collections without an explicit `since` are
    /// registered in a second pass, after their dependencies, so their initial
    /// `since` can be derived from the dependencies' frontiers.
    ///
    /// `migrated_storage_collections` identifies collections that were
    /// migrated by builtin migrations and need special handling during
    /// creation.
    #[instrument]
    async fn bootstrap_storage_collections(
        &mut self,
        migrated_storage_collections: &BTreeSet<CatalogItemId>,
    ) {
        let catalog = self.catalog();

        // Builds a storage `CollectionDescription` for a source-like object,
        // inlining any connection details from the catalog.
        let source_desc = |object_id: GlobalId,
                           data_source: &DataSourceDesc,
                           desc: &RelationDesc,
                           timeline: &Timeline| {
            let data_source = match data_source.clone() {
                DataSourceDesc::Ingestion { desc, cluster_id } => {
                    let desc = desc.into_inline_connection(catalog.state());
                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
                    DataSource::Ingestion(ingestion)
                }
                DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    progress_subsource,
                    data_config,
                    details,
                    cluster_id,
                } => {
                    let desc = desc.into_inline_connection(catalog.state());
                    let data_config = data_config.into_inline_connection(catalog.state());
                    // Old-syntax ingestions are keyed by their progress
                    // subsource and export the primary source's data via a
                    // "legacy" source export registered under `object_id`.
                    let progress_subsource =
                        catalog.get_entry(&progress_subsource).latest_global_id();
                    let mut ingestion =
                        IngestionDescription::new(desc, cluster_id, progress_subsource);
                    let legacy_export = SourceExport {
                        storage_metadata: (),
                        data_config,
                        details,
                    };
                    ingestion.source_exports.insert(object_id, legacy_export);

                    DataSource::Ingestion(ingestion)
                }
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference: _,
                    details,
                    data_config,
                } => {
                    // Resolve the parent ingestion's item ID to its latest
                    // global ID.
                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();

                    DataSource::IngestionExport {
                        ingestion_id,
                        details,
                        data_config: data_config.into_inline_connection(catalog.state()),
                    }
                }
                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
                DataSourceDesc::Progress => DataSource::Progress,
                DataSourceDesc::Introspection(introspection) => {
                    DataSource::Introspection(introspection)
                }
                DataSourceDesc::Catalog => DataSource::Other,
            };
            CollectionDescription {
                desc: desc.clone(),
                data_source,
                since: None,
                timeline: Some(timeline.clone()),
                primary: None,
            }
        };

        // Gather descriptions for every storage collection, plus the latest
        // relation descs of compute-maintained collections (MVs, CTs) for the
        // nullability-evolution step below.
        let mut compute_collections = vec![];
        let mut collections = vec![];
        for entry in catalog.entries() {
            match entry.item() {
                CatalogItem::Source(source) => {
                    collections.push((
                        source.global_id(),
                        source_desc(
                            source.global_id(),
                            &source.data_source,
                            &source.desc,
                            &source.timeline,
                        ),
                    ));
                }
                CatalogItem::Table(table) => {
                    match &table.data_source {
                        TableDataSource::TableWrites { defaults: _ } => {
                            // A table has one collection per schema version;
                            // each version's "primary" pointer is the
                            // collection of the next version, if any.
                            let versions: BTreeMap<_, _> = table
                                .collection_descs()
                                .map(|(gid, version, desc)| (version, (gid, desc)))
                                .collect();
                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
                                let next_version = version.bump();
                                let primary_collection =
                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
                                let mut collection_desc =
                                    CollectionDescription::for_table(desc.clone());
                                collection_desc.primary = primary_collection;

                                (*gid, collection_desc)
                            });
                            collections.extend(collection_descs);
                        }
                        TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => {
                            // Source-fed tables are expected to have exactly
                            // one collection.
                            soft_assert_eq_or_log!(table.collections.len(), 1);
                            let collection_descs =
                                table.collection_descs().map(|(gid, _version, desc)| {
                                    (
                                        gid,
                                        source_desc(
                                            entry.latest_global_id(),
                                            data_source_desc,
                                            &desc,
                                            timeline,
                                        ),
                                    )
                                });
                            collections.extend(collection_descs);
                        }
                    };
                }
                CatalogItem::MaterializedView(mv) => {
                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
                        let collection_desc =
                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
                        (gid, collection_desc)
                    });

                    collections.extend(collection_descs);
                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
                }
                CatalogItem::ContinualTask(ct) => {
                    let collection_desc =
                        CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
                    compute_collections.push((ct.global_id(), ct.desc.clone()));
                    collections.push((ct.global_id(), collection_desc));
                }
                CatalogItem::Sink(sink) => {
                    // Sinks register a progress collection with the storage
                    // controller; the sink description itself is inlined here.
                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
                    let from_desc = storage_sink_from_entry
                        .relation_desc()
                        .expect("sinks can only be built on items with descs")
                        .into_owned();
                    let collection_desc = CollectionDescription {
                        desc: KAFKA_PROGRESS_DESC.clone(),
                        data_source: DataSource::Sink {
                            desc: ExportDescription {
                                sink: StorageSinkDesc {
                                    from: sink.from,
                                    from_desc,
                                    connection: sink
                                        .connection
                                        .clone()
                                        .into_inline_connection(self.catalog().state()),
                                    envelope: sink.envelope,
                                    as_of: Antichain::from_elem(Timestamp::minimum()),
                                    with_snapshot: sink.with_snapshot,
                                    version: sink.version,
                                    from_storage_metadata: (),
                                    to_storage_metadata: (),
                                    commit_interval: sink.commit_interval,
                                },
                                instance_id: sink.cluster_id,
                            },
                        },
                        since: None,
                        timeline: None,
                        primary: None,
                    };
                    collections.push((sink.global_id, collection_desc));
                }
                // These item kinds have no storage collections.
                CatalogItem::Log(_)
                | CatalogItem::View(_)
                | CatalogItem::Index(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_)
                | CatalogItem::Connection(_) => (),
            }
        }

        // In read-only mode we must not mint a new write timestamp; register
        // at the current read timestamp instead. The write timestamp is
        // applied locally at the end of this function (write mode only).
        let register_ts = if self.controller.read_only() {
            self.get_local_read_ts().await
        } else {
            self.get_local_write_ts().await.timestamp
        };

        // Pull out system-owned MV/CT collections with no explicit `since`:
        // they are registered in a second pass, once their dependencies'
        // frontiers are available to derive an initial `since` from.
        let mut derived_builtin_storage_collections: Vec<_> = collections
            .extract_if(.., |(id, c)| {
                if !id.is_system() || c.since.is_some() {
                    return false;
                }

                use CatalogItem::*;
                match &self.catalog.get_entry_by_global_id(id).item {
                    Log(_) | View(_) | Index(_) | Type(_) | Func(_) | Secret(_) | Connection(_) => {
                        false
                    }
                    Table(_) | Source(_) => false,
                    MaterializedView(_) | ContinualTask(_) => true,
                    // NOTE(review): presumably no system sinks exist, so this
                    // arm should be unreachable — confirm.
                    Sink(_) => unimplemented!(),
                }
            })
            .collect();

        let storage_metadata = self.catalog.state().storage_metadata();
        // Expand migrated item IDs into all of their global (collection) IDs.
        let migrated_storage_collections = migrated_storage_collections
            .into_iter()
            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
            .collect();

        // Let the storage controller evolve persisted schemas (nullability)
        // of compute-maintained collections before creating collections.
        self.controller
            .storage
            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
            .await
            .unwrap_or_terminate("cannot fail to evolve collections");

        // First pass: everything except the derived-since collections.
        self.controller
            .storage
            .create_collections_for_bootstrap(
                storage_metadata,
                Some(register_ts),
                collections,
                &migrated_storage_collections,
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        // Derive each remaining collection's `since` as the join of the
        // sinces of its transitive dependencies' collections.
        for (gid, collection) in &mut derived_builtin_storage_collections {
            let entry = self.catalog.get_entry_by_global_id(gid);
            let mut derived_since = Antichain::from_elem(Timestamp::MIN);
            for dep_id in self.catalog.state().transitive_uses(entry.id()) {
                let entry = self.catalog.state().get_entry(&dep_id);
                let dep_gid = entry.latest_global_id();

                // Dependencies without storage collections (e.g. views) have
                // no frontiers and are skipped.
                if let Ok((since, _)) = self.controller.storage.collection_frontiers(dep_gid) {
                    derived_since.join_assign(&since);
                }
            }
            collection.since = Some(derived_since);
        }
        // Second pass: the derived-since collections.
        self.controller
            .storage
            .create_collections_for_bootstrap(
                storage_metadata,
                Some(register_ts),
                derived_builtin_storage_collections,
                &migrated_storage_collections,
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        if !self.controller.read_only() {
            self.apply_local_write(register_ts).await;
        }
    }
3139
3140 fn bootstrap_sort_catalog_entries(&self) -> Vec<CatalogEntry> {
3147 let mut indexes_on = BTreeMap::<_, Vec<_>>::new();
3148 let mut non_indexes = Vec::new();
3149 for entry in self.catalog().entries().cloned() {
3150 if let Some(index) = entry.index() {
3151 let on = self.catalog().get_entry_by_global_id(&index.on);
3152 indexes_on.entry(on.id()).or_default().push(entry);
3153 } else {
3154 non_indexes.push(entry);
3155 }
3156 }
3157
3158 let key_fn = |entry: &CatalogEntry| entry.id;
3159 let dependencies_fn = |entry: &CatalogEntry| entry.uses();
3160 sort_topological(&mut non_indexes, key_fn, dependencies_fn);
3161
3162 let mut result = Vec::new();
3163 for entry in non_indexes {
3164 let id = entry.id();
3165 result.push(entry);
3166 if let Some(mut indexes) = indexes_on.remove(&id) {
3167 result.append(&mut indexes);
3168 }
3169 }
3170
3171 soft_assert_or_log!(
3172 indexes_on.is_empty(),
3173 "indexes with missing dependencies: {indexes_on:?}",
3174 );
3175
3176 result
3177 }
3178
    /// Optimizes and records dataflow plans (optimized MIR, physical LIR, and
    /// metainfo) for every index, materialized view, and continual task in
    /// the catalog.
    ///
    /// `ordered_catalog_entries` is expected to be pre-sorted so dependencies
    /// come before dependents (see `bootstrap_sort_catalog_entries`) — TODO
    /// confirm the exact ordering contract.
    ///
    /// `cached_global_exprs` provides previously computed plans by global ID;
    /// a cached plan is reused only if it was produced with the same
    /// optimizer features as the current configuration. Plans that had to be
    /// recomputed are returned so the caller can refresh the expression
    /// cache.
    #[instrument]
    fn bootstrap_dataflow_plans(
        &mut self,
        ordered_catalog_entries: &[CatalogEntry],
        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
        // Compute instance snapshots, created lazily per cluster and reused
        // across entries on the same cluster.
        let mut instance_snapshots = BTreeMap::new();
        // Plans computed from scratch (cache misses), to be returned.
        let mut uncached_expressions = BTreeMap::new();

        // Effective optimizer configuration for a cluster: system defaults
        // overridden by the cluster's feature flags.
        let optimizer_config = |catalog: &Catalog, cluster_id| {
            let system_config = catalog.system_config();
            let overrides = catalog.get_cluster(cluster_id).config.features();
            OptimizerConfig::from(system_config).override_from(&overrides)
        };

        for entry in ordered_catalog_entries {
            match entry.item() {
                CatalogItem::Index(idx) => {
                    let compute_instance =
                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(idx.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = idx.global_id();

                    // Skip indexes we have already planned on this instance.
                    if compute_instance.contains_collection(&global_id) {
                        continue;
                    }

                    let optimizer_config = optimizer_config(&self.catalog, idx.cluster_id);

                    // Reuse the cached plans only if the optimizer features
                    // match; otherwise run the full optimization pipeline.
                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                // MIR optimization, then LIR lowering.
                                let (optimized_plan, global_lir_plan) = {
                                    let mut optimizer = optimize::index::Optimizer::new(
                                        self.owned_catalog(),
                                        compute_instance.clone(),
                                        global_id,
                                        optimizer_config.clone(),
                                        self.optimizer_metrics(),
                                    );

                                    let index_plan = optimize::index::Index::new(
                                        entry.name().clone(),
                                        idx.on,
                                        idx.keys.to_vec(),
                                    );
                                    let global_mir_plan = optimizer.optimize(index_plan)?;
                                    let optimized_plan = global_mir_plan.df_desc().clone();

                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                    (optimized_plan, global_lir_plan)
                                };

                                let (physical_plan, metainfo) = global_lir_plan.unapply();
                                // Allocate a fresh transient ID per optimizer
                                // notice and render the notices.
                                let metainfo = {
                                    let notice_ids =
                                        std::iter::repeat_with(|| self.allocate_transient_id())
                                            .map(|(_item_id, gid)| gid)
                                            .take(metainfo.optimizer_notices.len())
                                            .collect::<Vec<_>>();
                                    self.catalog().render_notices(
                                        metainfo,
                                        notice_ids,
                                        Some(idx.global_id()),
                                    )
                                };
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features: optimizer_config.features.clone(),
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    // Record the plans in the catalog's in-memory state.
                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
                    catalog.set_physical_plan(idx.global_id(), physical_plan);
                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);

                    compute_instance.insert_collection(idx.global_id());
                }
                CatalogItem::MaterializedView(mv) => {
                    let compute_instance =
                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(mv.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = mv.global_id_writes();

                    let optimizer_config = optimizer_config(&self.catalog, mv.cluster_id);

                    // Same cache-or-optimize structure as the index arm.
                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let (_, internal_view_id) = self.allocate_transient_id();
                                let debug_name = self
                                    .catalog()
                                    .resolve_full_name(entry.name(), None)
                                    .to_string();
                                let force_non_monotonic = Default::default();

                                let (optimized_plan, global_lir_plan) = {
                                    let mut optimizer = optimize::materialized_view::Optimizer::new(
                                        self.owned_catalog().as_optimizer_catalog(),
                                        compute_instance.clone(),
                                        global_id,
                                        internal_view_id,
                                        mv.desc.latest().iter_names().cloned().collect(),
                                        mv.non_null_assertions.clone(),
                                        mv.refresh_schedule.clone(),
                                        debug_name,
                                        optimizer_config.clone(),
                                        self.optimizer_metrics(),
                                        force_non_monotonic,
                                    );

                                    let typ = infer_sql_type_for_catalog(
                                        &mv.raw_expr,
                                        &mv.optimized_expr.as_ref().clone(),
                                    );
                                    let global_mir_plan = optimizer
                                        .optimize((mv.optimized_expr.as_ref().clone(), typ))?;
                                    let optimized_plan = global_mir_plan.df_desc().clone();

                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                    (optimized_plan, global_lir_plan)
                                };

                                let (physical_plan, metainfo) = global_lir_plan.unapply();
                                let metainfo = {
                                    let notice_ids =
                                        std::iter::repeat_with(|| self.allocate_transient_id())
                                            .map(|(_item_id, global_id)| global_id)
                                            .take(metainfo.optimizer_notices.len())
                                            .collect::<Vec<_>>();
                                    self.catalog().render_notices(
                                        metainfo,
                                        notice_ids,
                                        Some(mv.global_id_writes()),
                                    )
                                };
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features: optimizer_config.features.clone(),
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);

                    compute_instance.insert_collection(mv.global_id_writes());
                }
                CatalogItem::ContinualTask(ct) => {
                    let compute_instance =
                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(ct.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = ct.global_id();

                    let optimizer_config = optimizer_config(&self.catalog, ct.cluster_id);

                    // Same cache-or-optimize structure; the actual optimization
                    // is delegated to `optimize_create_continual_task`.
                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let debug_name = self
                                    .catalog()
                                    .resolve_full_name(entry.name(), None)
                                    .to_string();
                                let (optimized_plan, physical_plan, metainfo, optimizer_features) =
                                    self.optimize_create_continual_task(
                                        ct,
                                        global_id,
                                        self.owned_catalog(),
                                        debug_name,
                                    )?;
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features,
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
                    catalog.set_physical_plan(ct.global_id(), physical_plan);
                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);

                    compute_instance.insert_collection(ct.global_id());
                }
                // These item kinds have no compute dataflows to plan.
                CatalogItem::Table(_)
                | CatalogItem::Source(_)
                | CatalogItem::Log(_)
                | CatalogItem::View(_)
                | CatalogItem::Sink(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_)
                | CatalogItem::Connection(_) => (),
            }
        }

        Ok(uncached_expressions)
    }
3470
3471 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3481 let mut catalog_ids = Vec::new();
3482 let mut dataflows = Vec::new();
3483 let mut read_policies = BTreeMap::new();
3484 for entry in self.catalog.entries() {
3485 let gid = match entry.item() {
3486 CatalogItem::Index(idx) => idx.global_id(),
3487 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3488 CatalogItem::ContinualTask(ct) => ct.global_id(),
3489 CatalogItem::Table(_)
3490 | CatalogItem::Source(_)
3491 | CatalogItem::Log(_)
3492 | CatalogItem::View(_)
3493 | CatalogItem::Sink(_)
3494 | CatalogItem::Type(_)
3495 | CatalogItem::Func(_)
3496 | CatalogItem::Secret(_)
3497 | CatalogItem::Connection(_) => continue,
3498 };
3499 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3500 catalog_ids.push(gid);
3501 dataflows.push(plan.clone());
3502
3503 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3504 read_policies.insert(gid, compaction_window.into());
3505 }
3506 }
3507 }
3508
3509 let read_ts = self.get_local_read_ts().await;
3510 let read_holds = as_of_selection::run(
3511 &mut dataflows,
3512 &read_policies,
3513 &*self.controller.storage_collections,
3514 read_ts,
3515 self.controller.read_only(),
3516 );
3517
3518 let catalog = self.catalog_mut();
3519 for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3520 catalog.set_physical_plan(id, plan);
3521 }
3522
3523 read_holds
3524 }
3525
3526 fn serve(
3535 mut self,
3536 mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3537 mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3538 mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3539 group_commit_rx: appends::GroupCommitWaiter,
3540 ) -> LocalBoxFuture<'static, ()> {
3541 async move {
3542 let mut cluster_events = self.controller.events_stream();
3544 let last_message = Arc::new(Mutex::new(LastMessage {
3545 kind: "none",
3546 stmt: None,
3547 }));
3548
3549 let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3550 let idle_metric = self.metrics.queue_busy_seconds.clone();
3551 let last_message_watchdog = Arc::clone(&last_message);
3552
3553 spawn(|| "coord watchdog", async move {
3554 let mut interval = tokio::time::interval(Duration::from_secs(5));
3559 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3563
3564 let mut coord_stuck = false;
3566
3567 loop {
3568 interval.tick().await;
3569
3570 let duration = tokio::time::Duration::from_secs(30);
3572 let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3573 let Ok(maybe_permit) = timeout else {
3574 if !coord_stuck {
3576 let last_message = last_message_watchdog.lock().expect("poisoned");
3577 tracing::warn!(
3578 last_message_kind = %last_message.kind,
3579 last_message_sql = %last_message.stmt_to_string(),
3580 "coordinator stuck for {duration:?}",
3581 );
3582 }
3583 coord_stuck = true;
3584
3585 continue;
3586 };
3587
3588 if coord_stuck {
3590 tracing::info!("Coordinator became unstuck");
3591 }
3592 coord_stuck = false;
3593
3594 let Ok(permit) = maybe_permit else {
3596 break;
3597 };
3598
3599 permit.send(idle_metric.start_timer());
3600 }
3601 });
3602
3603 self.schedule_storage_usage_collection().await;
3604 self.spawn_privatelink_vpc_endpoints_watch_task();
3605 self.spawn_statement_logging_task();
3606 flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3607
3608 let warn_threshold = self
3610 .catalog()
3611 .system_config()
3612 .coord_slow_message_warn_threshold();
3613
3614 const MESSAGE_BATCH: usize = 64;
3616 let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3617 let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3618
3619 let message_batch = self.metrics.message_batch.clone();
3620
3621 loop {
3622 select! {
3626 biased;
3631
3632 _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3636 Some(event) = cluster_events.next() => {
3640 messages.push(Message::ClusterEvent(event))
3641 },
3642 () = self.controller.ready() => {
3646 let controller = match self.controller.get_readiness() {
3650 Readiness::Storage => ControllerReadiness::Storage,
3651 Readiness::Compute => ControllerReadiness::Compute,
3652 Readiness::Metrics(_) => ControllerReadiness::Metrics,
3653 Readiness::Internal(_) => ControllerReadiness::Internal,
3654 Readiness::NotReady => unreachable!("just signaled as ready"),
3655 };
3656 messages.push(Message::ControllerReady { controller });
3657 }
3658 permit = group_commit_rx.ready() => {
3661 let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3667 PendingWriteTxn::User{span, ..} => Some(span),
3668 PendingWriteTxn::System{..} => None,
3669 });
3670 let span = match user_write_spans.exactly_one() {
3671 Ok(span) => span.clone(),
3672 Err(user_write_spans) => {
3673 let span = info_span!(parent: None, "group_commit_notify");
3674 for s in user_write_spans {
3675 span.follows_from(s);
3676 }
3677 span
3678 }
3679 };
3680 messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3681 },
3682 count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3686 if count == 0 {
3687 break;
3688 } else {
3689 messages.extend(cmd_messages.drain(..).map(
3690 |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
3691 ));
3692 }
3693 },
3694 Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3698 let mut pending_read_txns = vec![pending_read_txn];
3699 while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3700 pending_read_txns.push(pending_read_txn);
3701 }
3702 for (conn_id, pending_read_txn) in pending_read_txns {
3703 let prev = self
3704 .pending_linearize_read_txns
3705 .insert(conn_id, pending_read_txn);
3706 soft_assert_or_log!(
3707 prev.is_none(),
3708 "connections can not have multiple concurrent reads, prev: {prev:?}"
3709 )
3710 }
3711 messages.push(Message::LinearizeReads);
3712 }
3713 _ = self.advance_timelines_interval.tick() => {
3717 let span = info_span!(parent: None, "coord::advance_timelines_interval");
3718 span.follows_from(Span::current());
3719
3720 if self.controller.read_only() {
3725 messages.push(Message::AdvanceTimelines);
3726 } else {
3727 messages.push(Message::GroupCommitInitiate(span, None));
3728 }
3729 },
3730 _ = self.check_cluster_scheduling_policies_interval.tick() => {
3734 messages.push(Message::CheckSchedulingPolicies);
3735 },
3736
3737 _ = self.caught_up_check_interval.tick() => {
3741 self.maybe_check_caught_up().await;
3746
3747 continue;
3748 },
3749
3750 timer = idle_rx.recv() => {
3755 timer.expect("does not drop").observe_duration();
3756 self.metrics
3757 .message_handling
3758 .with_label_values(&["watchdog"])
3759 .observe(0.0);
3760 continue;
3761 }
3762 };
3763
3764 message_batch.observe(f64::cast_lossy(messages.len()));
3766
3767 for msg in messages.drain(..) {
3768 let msg_kind = msg.kind();
3771 let span = span!(
3772 target: "mz_adapter::coord::handle_message_loop",
3773 Level::INFO,
3774 "coord::handle_message",
3775 kind = msg_kind
3776 );
3777 let otel_context = span.context().span().span_context().clone();
3778
3779 *last_message.lock().expect("poisoned") = LastMessage {
3783 kind: msg_kind,
3784 stmt: match &msg {
3785 Message::Command(
3786 _,
3787 Command::Execute {
3788 portal_name,
3789 session,
3790 ..
3791 },
3792 ) => session
3793 .get_portal_unverified(portal_name)
3794 .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3795 _ => None,
3796 },
3797 };
3798
3799 let start = Instant::now();
3800 self.handle_message(msg).instrument(span).await;
3801 let duration = start.elapsed();
3802
3803 self.metrics
3804 .message_handling
3805 .with_label_values(&[msg_kind])
3806 .observe(duration.as_secs_f64());
3807
3808 if duration > warn_threshold {
3810 let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3811 tracing::error!(
3812 ?msg_kind,
3813 ?trace_id,
3814 ?duration,
3815 "very slow coordinator message"
3816 );
3817 }
3818 }
3819 }
3820 if let Some(catalog) = Arc::into_inner(self.catalog) {
3823 catalog.expire().await;
3824 }
3825 }
3826 .boxed_local()
3827 }
3828
3829 fn catalog(&self) -> &Catalog {
3831 &self.catalog
3832 }
3833
/// Returns an owned, reference-counted handle to the catalog.
///
/// Cloning the `Arc` is cheap; the catalog itself is not copied.
fn owned_catalog(&self) -> Arc<Catalog> {
    Arc::clone(&self.catalog)
}
3839
/// Returns a clone of the optimizer metrics registry handle.
fn optimizer_metrics(&self) -> OptimizerMetrics {
    self.optimizer_metrics.clone()
}
3845
/// Returns a mutable reference to the catalog.
///
/// Per `Arc::make_mut` semantics, if any other `Arc` handles to the
/// catalog exist (e.g. from `owned_catalog`), the catalog is cloned
/// first so the mutation does not affect those holders.
fn catalog_mut(&mut self) -> &mut Catalog {
    Arc::make_mut(&mut self.catalog)
}
3857
3858 async fn refill_user_id_pool(&mut self, min_count: u64) -> Result<(), AdapterError> {
3863 let batch_size = USER_ID_POOL_BATCH_SIZE.get(self.catalog().system_config().dyncfgs());
3864 let to_allocate = min_count.max(u64::from(batch_size));
3865 let id_ts = self.get_catalog_write_ts().await;
3866 let ids = self.catalog().allocate_user_ids(to_allocate, id_ts).await?;
3867 if let (Some((first_id, _)), Some((last_id, _))) = (ids.first(), ids.last()) {
3868 let start = match first_id {
3869 CatalogItemId::User(id) => *id,
3870 other => {
3871 return Err(AdapterError::Internal(format!(
3872 "expected User CatalogItemId, got {other:?}"
3873 )));
3874 }
3875 };
3876 let end = match last_id {
3877 CatalogItemId::User(id) => *id + 1, other => {
3879 return Err(AdapterError::Internal(format!(
3880 "expected User CatalogItemId, got {other:?}"
3881 )));
3882 }
3883 };
3884 self.user_id_pool.refill(start, end);
3885 } else {
3886 return Err(AdapterError::Internal(
3887 "catalog returned no user IDs".into(),
3888 ));
3889 }
3890 Ok(())
3891 }
3892
3893 async fn allocate_user_id(&mut self) -> Result<(CatalogItemId, GlobalId), AdapterError> {
3895 if let Some(id) = self.user_id_pool.allocate() {
3896 return Ok((CatalogItemId::User(id), GlobalId::User(id)));
3897 }
3898 self.refill_user_id_pool(1).await?;
3899 let id = self.user_id_pool.allocate().expect("ID pool just refilled");
3900 Ok((CatalogItemId::User(id), GlobalId::User(id)))
3901 }
3902
3903 async fn allocate_user_ids(
3905 &mut self,
3906 count: u64,
3907 ) -> Result<Vec<(CatalogItemId, GlobalId)>, AdapterError> {
3908 if self.user_id_pool.remaining() < count {
3909 self.refill_user_id_pool(count).await?;
3910 }
3911 let raw_ids = self
3912 .user_id_pool
3913 .allocate_many(count)
3914 .expect("pool has enough IDs after refill");
3915 Ok(raw_ids
3916 .into_iter()
3917 .map(|id| (CatalogItemId::User(id), GlobalId::User(id)))
3918 .collect())
3919 }
3920
/// Returns the connection context held by the controller.
fn connection_context(&self) -> &ConnectionContext {
    self.controller.connection_context()
}
3925
/// Returns the secrets reader from the controller's connection context.
fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
    &self.connection_context().secrets_reader
}
3930
3931 #[allow(dead_code)]
3936 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3937 for meta in self.active_conns.values() {
3938 let _ = meta.notice_tx.send(notice.clone());
3939 }
3940 }
3941
3942 pub(crate) fn broadcast_notice_tx(
3945 &self,
3946 ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3947 let senders: Vec<_> = self
3948 .active_conns
3949 .values()
3950 .map(|meta| meta.notice_tx.clone())
3951 .collect();
3952 Box::new(move |notice| {
3953 for tx in senders {
3954 let _ = tx.send(notice.clone());
3955 }
3956 })
3957 }
3958
/// Returns the map of currently active connections and their metadata.
pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
    &self.active_conns
}
3962
3963 #[instrument(level = "debug")]
3964 pub(crate) fn retire_execution(
3965 &mut self,
3966 reason: StatementEndedExecutionReason,
3967 ctx_extra: ExecuteContextExtra,
3968 ) {
3969 if let Some(uuid) = ctx_extra.retire() {
3970 self.end_statement_execution(uuid, reason);
3971 }
3972 }
3973
3974 #[instrument(level = "debug")]
3976 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3977 let compute = self
3978 .instance_snapshot(instance)
3979 .expect("compute instance does not exist");
3980 DataflowBuilder::new(self.catalog().state(), compute)
3981 }
3982
/// Captures a snapshot of the given compute instance from the controller.
///
/// # Errors
///
/// Returns `InstanceMissing` if no such instance exists.
pub fn instance_snapshot(
    &self,
    id: ComputeInstanceId,
) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
    ComputeInstanceSnapshot::new(&self.controller, id)
}
3990
/// Ships `dataflow` to `instance`, halting the process if creation fails.
///
/// Infallible wrapper around [`Self::try_ship_dataflow`] for call sites
/// where dataflow creation is expected to always succeed.
pub(crate) async fn ship_dataflow(
    &mut self,
    dataflow: DataflowDescription<Plan>,
    instance: ComputeInstanceId,
    target_replica: Option<ReplicaId>,
) {
    self.try_ship_dataflow(dataflow, instance, target_replica)
        .await
        .unwrap_or_terminate("dataflow creation cannot fail");
}
4007
/// Ships `dataflow` to `instance` and installs default read policies for
/// the indexes it exports.
///
/// # Errors
///
/// Returns a `DataflowCreationError` if the controller rejects the
/// dataflow (e.g. unknown instance or collection).
pub(crate) async fn try_ship_dataflow(
    &mut self,
    dataflow: DataflowDescription<Plan>,
    instance: ComputeInstanceId,
    target_replica: Option<ReplicaId>,
) -> Result<(), DataflowCreationError> {
    // Collect the exported index IDs before `dataflow` is moved into the
    // controller below.
    let export_ids = dataflow.exported_index_ids().collect();

    self.controller
        .compute
        .create_dataflow(instance, dataflow, target_replica)?;

    self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
        .await;

    Ok(())
}
4029
/// Allows writes to the given compute collection, halting the process if
/// the controller rejects the request.
pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
    self.controller
        .compute
        .allow_writes(instance, id)
        .unwrap_or_terminate("allow_writes cannot fail");
}
4039
4040 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
4042 &mut self,
4043 dataflow: DataflowDescription<Plan>,
4044 instance: ComputeInstanceId,
4045 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
4046 target_replica: Option<ReplicaId>,
4047 ) {
4048 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
4049 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, target_replica);
4050 let ((), ()) =
4051 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
4052 } else {
4053 self.ship_dataflow(dataflow, instance, target_replica).await;
4054 }
4055 }
4056
4057 pub fn install_compute_watch_set(
4061 &mut self,
4062 conn_id: ConnectionId,
4063 objects: BTreeSet<GlobalId>,
4064 t: Timestamp,
4065 state: WatchSetResponse,
4066 ) -> Result<(), CollectionLookupError> {
4067 let ws_id = self.controller.install_compute_watch_set(objects, t)?;
4068 self.connection_watch_sets
4069 .entry(conn_id.clone())
4070 .or_default()
4071 .insert(ws_id);
4072 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4073 Ok(())
4074 }
4075
4076 pub fn install_storage_watch_set(
4080 &mut self,
4081 conn_id: ConnectionId,
4082 objects: BTreeSet<GlobalId>,
4083 t: Timestamp,
4084 state: WatchSetResponse,
4085 ) -> Result<(), CollectionMissing> {
4086 let ws_id = self.controller.install_storage_watch_set(objects, t)?;
4087 self.connection_watch_sets
4088 .entry(conn_id.clone())
4089 .or_default()
4090 .insert(ws_id);
4091 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4092 Ok(())
4093 }
4094
4095 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
4097 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
4098 for ws_id in ws_ids {
4099 self.installed_watch_sets.remove(&ws_id);
4100 }
4101 }
4102 }
4103
4104 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
4108 let global_timelines: BTreeMap<_, _> = self
4114 .global_timelines
4115 .iter()
4116 .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
4117 .collect();
4118 let active_conns: BTreeMap<_, _> = self
4119 .active_conns
4120 .iter()
4121 .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
4122 .collect();
4123 let txn_read_holds: BTreeMap<_, _> = self
4124 .txn_read_holds
4125 .iter()
4126 .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
4127 .collect();
4128 let pending_peeks: BTreeMap<_, _> = self
4129 .pending_peeks
4130 .iter()
4131 .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
4132 .collect();
4133 let client_pending_peeks: BTreeMap<_, _> = self
4134 .client_pending_peeks
4135 .iter()
4136 .map(|(id, peek)| {
4137 let peek: BTreeMap<_, _> = peek
4138 .iter()
4139 .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
4140 .collect();
4141 (id.to_string(), peek)
4142 })
4143 .collect();
4144 let pending_linearize_read_txns: BTreeMap<_, _> = self
4145 .pending_linearize_read_txns
4146 .iter()
4147 .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
4148 .collect();
4149
4150 Ok(serde_json::json!({
4151 "global_timelines": global_timelines,
4152 "active_conns": active_conns,
4153 "txn_read_holds": txn_read_holds,
4154 "pending_peeks": pending_peeks,
4155 "client_pending_peeks": client_pending_peeks,
4156 "pending_linearize_read_txns": pending_linearize_read_txns,
4157 "controller": self.controller.dump().await?,
4158 }))
4159 }
4160
/// Prunes entries of `mz_storage_usage_by_shard` that are older than
/// `retention_period`.
///
/// Captures a snapshot of the builtin table at the current local read
/// timestamp, then scans it on a background task and sends the resulting
/// retractions back to the coordinator as `Message::StorageUsagePrune`.
async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
    let item_id = self
        .catalog()
        .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
    let global_id = self.catalog.get_entry(&item_id).latest_global_id();
    let read_ts = self.get_local_read_ts().await;
    // Construct the snapshot future now (while we hold `&self`) but await
    // it on the spawned task so startup is not blocked on the scan.
    let current_contents_fut = self
        .controller
        .storage_collections
        .snapshot(global_id, read_ts);
    let internal_cmd_tx = self.internal_cmd_tx.clone();
    spawn(|| "storage_usage_prune", async move {
        let mut current_contents = current_contents_fut
            .await
            .unwrap_or_terminate("cannot fail to fetch snapshot");
        differential_dataflow::consolidation::consolidate(&mut current_contents);

        // Events collected before this cutoff (milliseconds since epoch,
        // widened to u128 so the subtraction cannot underflow surprisingly)
        // are expired.
        let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
        let mut expired = Vec::new();
        for (row, diff) in current_contents {
            assert_eq!(
                diff, 1,
                "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
            );
            // Column index 3 is assumed to be the event's collection
            // timestamp; the `expect` trips if the table definition changes.
            let collection_timestamp = row
                .unpack()
                .get(3)
                .expect("definition of mz_storage_by_shard changed")
                .unwrap_timestamptz();
            let collection_timestamp = collection_timestamp.timestamp_millis();
            let collection_timestamp: u128 = collection_timestamp
                .try_into()
                .expect("all collections happen after Jan 1 1970");
            if collection_timestamp < cutoff_ts {
                debug!("pruning storage event {row:?}");
                // Emit a retraction (negative diff) for the expired row.
                let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
                expired.push(builtin_update);
            }
        }

        // Best-effort send: if the coordinator is shutting down, the
        // retractions are simply dropped.
        let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
    });
}
4219
4220 fn current_credit_consumption_rate(&self) -> Numeric {
4221 self.catalog()
4222 .user_cluster_replicas()
4223 .filter_map(|replica| match &replica.config.location {
4224 ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
4225 ReplicaLocation::Unmanaged(_) => None,
4226 })
4227 .map(|size| {
4228 self.catalog()
4229 .cluster_replica_sizes()
4230 .0
4231 .get(size)
4232 .expect("location size is validated against the cluster replica sizes")
4233 .credits_per_hour
4234 })
4235 .sum()
4236 }
4237}
4238
#[cfg(test)]
impl Coordinator {
    #[allow(dead_code)]
    /// Compile-time check that `ship_dataflow` is infallible: the
    /// `let _: ()` binding fails to type-check if its return type ever
    /// becomes a `Result`.
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");

        let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
    }
}
4255
/// Record of the most recent message the coordinator handled, dumped for
/// diagnostics when the coordinator appears stuck or panics.
struct LastMessage {
    // Static message-kind label (from `Message::kind`).
    kind: &'static str,
    // The SQL statement being executed, if the message was an Execute command.
    stmt: Option<Arc<Statement<Raw>>>,
}
4261
4262impl LastMessage {
4263 fn stmt_to_string(&self) -> Cow<'static, str> {
4265 self.stmt
4266 .as_ref()
4267 .map(|stmt| stmt.to_ast_string_redacted().into())
4268 .unwrap_or(Cow::Borrowed("<none>"))
4269 }
4270}
4271
impl fmt::Debug for LastMessage {
    // Renders the statement redacted (via `stmt_to_string`) rather than
    // with the AST's own `Debug`, so the dump never leaks literals.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("LastMessage")
            .field("kind", &self.kind)
            .field("stmt", &self.stmt_to_string())
            .finish()
    }
}
4280
4281impl Drop for LastMessage {
4282 fn drop(&mut self) {
4283 if std::thread::panicking() {
4285 eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4287 }
4288 }
4289}
4290
/// Constructs and starts the coordinator.
///
/// Initializes timestamp oracles, opens the durable catalog, then spawns a
/// dedicated "coordinator" OS thread that initializes the controller, runs
/// bootstrap, and finally enters the coordinator's main serve loop. The
/// returned future resolves once bootstrap completes, yielding a `Handle`
/// (which owns the coordinator thread) and a `Client` for issuing commands.
///
/// NOTE(review): the relative order of oracle init, catalog open, and
/// controller init below is load-bearing — `boot_ts` is repeatedly advanced
/// to the maximum of the oracle read timestamp and the catalog upper at
/// each stage. Do not reorder these phases.
pub fn serve(
    Config {
        controller_config,
        controller_envd_epoch,
        mut storage,
        audit_logs_iterator,
        timestamp_oracle_url,
        unsafe_mode,
        all_features,
        build_info,
        environment_id,
        metrics_registry,
        now,
        secrets_controller,
        cloud_resource_controller,
        cluster_replica_sizes,
        builtin_system_cluster_config,
        builtin_catalog_server_cluster_config,
        builtin_probe_cluster_config,
        builtin_support_cluster_config,
        builtin_analytics_cluster_config,
        system_parameter_defaults,
        availability_zones,
        storage_usage_client,
        storage_usage_collection_interval,
        storage_usage_retention_period,
        segment_client,
        egress_addresses,
        aws_account_id,
        aws_privatelink_availability_zones,
        connection_context,
        connection_limit_callback,
        remote_system_parameters,
        webhook_concurrency_limit,
        http_host_name,
        tracing_handle,
        read_only_controllers,
        caught_up_trigger: clusters_caught_up_trigger,
        helm_chart_version,
        license_key,
        external_login_password_mz_system,
        force_builtin_schema_migration,
    }: Config,
) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
    async move {
        let coord_start = Instant::now();
        info!("startup: coordinator init: beginning");
        info!("startup: coordinator init: preamble beginning");

        // Force initialization of the static builtin definitions up front.
        let _builtins = LazyLock::force(&BUILTINS_STATIC);

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
            mpsc::unbounded_channel();

        if !availability_zones.iter().all_unique() {
            coord_bail!("availability zones must be unique");
        }

        // An AWS principal context exists only when both the account ID and
        // the external ID prefix are configured.
        let aws_principal_context = match (
            aws_account_id,
            connection_context.aws_external_id_prefix.clone(),
        ) {
            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
                aws_account_id,
                aws_external_id_prefix,
            }),
            _ => None,
        };

        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));

        info!(
            "startup: coordinator init: preamble complete in {:?}",
            coord_start.elapsed()
        );
        let oracle_init_start = Instant::now();
        info!("startup: coordinator init: timestamp oracle init beginning");

        let timestamp_oracle_config = timestamp_oracle_url
            .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
            .transpose()?;
        let mut initial_timestamps =
            get_initial_oracle_timestamps(&timestamp_oracle_config).await?;

        // The EpochMilliseconds timeline must always exist, even on first boot
        // when the oracle has no recorded timestamps.
        initial_timestamps
            .entry(Timeline::EpochMilliseconds)
            .or_insert_with(mz_repr::Timestamp::minimum);
        let mut timestamp_oracles = BTreeMap::new();
        for (timeline, initial_timestamp) in initial_timestamps {
            Coordinator::ensure_timeline_state_with_initial_time(
                &timeline,
                initial_timestamp,
                now.clone(),
                timestamp_oracle_config.clone(),
                &mut timestamp_oracles,
                read_only_controllers,
            )
            .await;
        }

        let catalog_upper = storage.current_upper().await;
        let epoch_millis_oracle = &timestamp_oracles
            .get(&Timeline::EpochMilliseconds)
            .expect("inserted above")
            .oracle;

        // Pick a boot timestamp that is ahead of both the oracle and the
        // catalog upper. In read-only mode we must not write to the oracle.
        let mut boot_ts = if read_only_controllers {
            let read_ts = epoch_millis_oracle.read_ts().await;
            std::cmp::max(read_ts, catalog_upper)
        } else {
            epoch_millis_oracle.apply_write(catalog_upper).await;
            epoch_millis_oracle.write_ts().await.timestamp
        };

        info!(
            "startup: coordinator init: timestamp oracle init complete in {:?}",
            oracle_init_start.elapsed()
        );

        let catalog_open_start = Instant::now();
        info!("startup: coordinator init: catalog open beginning");
        let persist_client = controller_config
            .persist_clients
            .open(controller_config.persist_location.clone())
            .await
            .context("opening persist client")?;
        let builtin_item_migration_config = BuiltinItemMigrationConfig {
            persist_client: persist_client.clone(),
            read_only: read_only_controllers,
            force_migration: force_builtin_schema_migration,
        };
        let OpenCatalogResult {
            mut catalog,
            migrated_storage_collections_0dt,
            new_builtin_collections,
            builtin_table_updates,
            cached_global_exprs,
            uncached_local_exprs,
        } = Catalog::open(mz_catalog::config::Config {
            storage,
            metrics_registry: &metrics_registry,
            state: mz_catalog::config::StateConfig {
                unsafe_mode,
                all_features,
                build_info,
                environment_id: environment_id.clone(),
                read_only: read_only_controllers,
                now: now.clone(),
                boot_ts: boot_ts.clone(),
                skip_migrations: false,
                cluster_replica_sizes,
                builtin_system_cluster_config,
                builtin_catalog_server_cluster_config,
                builtin_probe_cluster_config,
                builtin_support_cluster_config,
                builtin_analytics_cluster_config,
                system_parameter_defaults,
                remote_system_parameters,
                availability_zones,
                egress_addresses,
                aws_principal_context,
                aws_privatelink_availability_zones,
                connection_context,
                http_host_name,
                builtin_item_migration_config,
                persist_client: persist_client.clone(),
                enable_expression_cache_override: None,
                helm_chart_version,
                external_login_password_mz_system,
                license_key: license_key.clone(),
            },
        })
        .await?;

        // Opening the catalog may have written to it, advancing its upper;
        // fold that back into the boot timestamp.
        let catalog_upper = catalog.current_upper().await;
        boot_ts = std::cmp::max(boot_ts, catalog_upper);

        if !read_only_controllers {
            epoch_millis_oracle.apply_write(boot_ts).await;
        }

        info!(
            "startup: coordinator init: catalog open complete in {:?}",
            catalog_open_start.elapsed()
        );

        let coord_thread_start = Instant::now();
        info!("startup: coordinator init: coordinator thread start beginning");

        let session_id = catalog.config().session_id;
        let start_instant = catalog.config().start_instant;

        // The coordinator thread reports bootstrap success/failure over this
        // one-shot channel before entering its serve loop.
        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
        let handle = TokioHandle::current();

        let metrics = Metrics::register_into(&metrics_registry);
        let metrics_clone = metrics.clone();
        let optimizer_metrics = OptimizerMetrics::register_into(
            &metrics_registry,
            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
        );
        let segment_client_clone = segment_client.clone();
        let coord_now = now.clone();
        let advance_timelines_interval =
            tokio::time::interval(catalog.system_config().default_timestamp_interval());
        let mut check_scheduling_policies_interval = tokio::time::interval(
            catalog
                .system_config()
                .cluster_check_scheduling_policies_interval(),
        );
        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // The caught-up check only matters during 0dt read-only deployments;
        // otherwise tick rarely (hourly) so the branch stays effectively idle.
        let clusters_caught_up_check_interval = if read_only_controllers {
            let dyncfgs = catalog.system_config().dyncfgs();
            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);

            let mut interval = tokio::time::interval(interval);
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        } else {
            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        };

        let clusters_caught_up_check =
            clusters_caught_up_trigger.map(|trigger| {
                let mut exclude_collections: BTreeSet<GlobalId> =
                    new_builtin_collections.into_iter().collect();

                // Also exclude migrated materialized views and, transitively,
                // everything that depends on them.
                let mut todo: Vec<_> = migrated_storage_collections_0dt
                    .iter()
                    .filter(|id| {
                        catalog.state().get_entry(id).is_materialized_view()
                    })
                    .copied()
                    .collect();
                while let Some(item_id) = todo.pop() {
                    let entry = catalog.state().get_entry(&item_id);
                    exclude_collections.extend(entry.global_ids());
                    todo.extend_from_slice(entry.used_by());
                }

                CaughtUpCheckContext {
                    trigger,
                    exclude_collections,
                }
            });

        if let Some(TimestampOracleConfig::Postgres(pg_config)) =
            timestamp_oracle_config.as_ref()
        {
            // Apply system-var tuning to the Postgres-backed oracle.
            let pg_timestamp_oracle_params =
                flags::timestamp_oracle_config(catalog.system_config());
            pg_timestamp_oracle_params.apply(pg_config);
        }

        // Wrap the caller-provided limit callback so it is re-invoked with the
        // current values whenever either system variable changes.
        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
            Arc::new(move |system_vars: &SystemVars| {
                let limit: u64 = system_vars.max_connections().cast_into();
                let superuser_reserved: u64 =
                    system_vars.superuser_reserved_connections().cast_into();

                // Clamp the reservation so it never exceeds the total limit.
                let superuser_reserved = if superuser_reserved >= limit {
                    tracing::warn!(
                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
                    );
                    limit
                } else {
                    superuser_reserved
                };

                (connection_limit_callback)(limit, superuser_reserved);
            });
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::MAX_CONNECTIONS,
            Arc::clone(&connection_limit_callback),
        );
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
            connection_limit_callback,
        );

        let (group_commit_tx, group_commit_rx) = appends::notifier();

        let parent_span = tracing::Span::current();
        let thread = thread::Builder::new()
            // The coordinator runs planning/optimization code that needs a
            // larger-than-default stack.
            .stack_size(3 * stack::STACK_SIZE)
            .name("coordinator".to_string())
            .spawn(move || {
                let span = info_span!(parent: parent_span, "coord::coordinator").entered();

                let controller = handle
                    .block_on({
                        catalog.initialize_controller(
                            controller_config,
                            controller_envd_epoch,
                            read_only_controllers,
                        )
                    })
                    .unwrap_or_terminate("failed to initialize storage_controller");
                // Controller init may have advanced the catalog upper again.
                let catalog_upper = handle.block_on(catalog.current_upper());
                boot_ts = std::cmp::max(boot_ts, catalog_upper);
                if !read_only_controllers {
                    let epoch_millis_oracle = &timestamp_oracles
                        .get(&Timeline::EpochMilliseconds)
                        .expect("inserted above")
                        .oracle;
                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
                }

                let catalog = Arc::new(catalog);

                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
                let mut coord = Coordinator {
                    controller,
                    catalog,
                    internal_cmd_tx,
                    group_commit_tx,
                    strict_serializable_reads_tx,
                    global_timelines: timestamp_oracles,
                    transient_id_gen: Arc::new(TransientIdGen::new()),
                    active_conns: BTreeMap::new(),
                    txn_read_holds: Default::default(),
                    pending_peeks: BTreeMap::new(),
                    client_pending_peeks: BTreeMap::new(),
                    pending_linearize_read_txns: BTreeMap::new(),
                    serialized_ddl: LockedVecDeque::new(),
                    active_compute_sinks: BTreeMap::new(),
                    active_webhooks: BTreeMap::new(),
                    active_copies: BTreeMap::new(),
                    staged_cancellation: BTreeMap::new(),
                    introspection_subscribes: BTreeMap::new(),
                    write_locks: BTreeMap::new(),
                    deferred_write_ops: BTreeMap::new(),
                    pending_writes: Vec::new(),
                    advance_timelines_interval,
                    secrets_controller,
                    caching_secrets_reader,
                    cloud_resource_controller,
                    storage_usage_client,
                    storage_usage_collection_interval,
                    segment_client,
                    metrics,
                    optimizer_metrics,
                    tracing_handle,
                    statement_logging: StatementLogging::new(coord_now.clone()),
                    webhook_concurrency_limit,
                    timestamp_oracle_config,
                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
                    cluster_scheduling_decisions: BTreeMap::new(),
                    caught_up_check_interval: clusters_caught_up_check_interval,
                    caught_up_check: clusters_caught_up_check,
                    installed_watch_sets: BTreeMap::new(),
                    connection_watch_sets: BTreeMap::new(),
                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
                    read_only_controllers,
                    buffered_builtin_table_updates: Some(Vec::new()),
                    license_key,
                    user_id_pool: IdPool::empty(),
                    persist_client,
                };
                let bootstrap = handle.block_on(async {
                    coord
                        .bootstrap(
                            boot_ts,
                            migrated_storage_collections_0dt,
                            builtin_table_updates,
                            cached_global_exprs,
                            uncached_local_exprs,
                            audit_logs_iterator,
                        )
                        .await?;
                    coord
                        .controller
                        .remove_orphaned_replicas(
                            coord.catalog().get_next_user_replica_id().await?,
                            coord.catalog().get_next_system_replica_id().await?,
                        )
                        .await
                        .map_err(AdapterError::Orchestrator)?;

                    if let Some(retention_period) = storage_usage_retention_period {
                        coord
                            .prune_storage_usage_events_on_startup(retention_period)
                            .await;
                    }

                    Ok(())
                });
                let ok = bootstrap.is_ok();
                drop(span);
                bootstrap_tx
                    .send(bootstrap)
                    .expect("bootstrap_rx is not dropped until it receives this message");
                // Only enter the serve loop when bootstrap succeeded; on
                // failure the thread exits after reporting the error.
                if ok {
                    handle.block_on(coord.serve(
                        internal_cmd_rx,
                        strict_serializable_reads_rx,
                        cmd_rx,
                        group_commit_rx,
                    ));
                }
            })
            .expect("failed to create coordinator thread");
        match bootstrap_rx
            .await
            .expect("bootstrap_tx always sends a message or panics/halts")
        {
            Ok(()) => {
                info!(
                    "startup: coordinator init: coordinator thread start complete in {:?}",
                    coord_thread_start.elapsed()
                );
                info!(
                    "startup: coordinator init: complete in {:?}",
                    coord_start.elapsed()
                );
                let handle = Handle {
                    session_id,
                    start_instant,
                    _thread: thread.join_on_drop(),
                };
                let client = Client::new(
                    build_info,
                    cmd_tx,
                    metrics_clone,
                    now,
                    environment_id,
                    segment_client_clone,
                );
                Ok((handle, client))
            }
            Err(e) => Err(e),
        }
    }
    .boxed()
}
4796
4797async fn get_initial_oracle_timestamps(
4811 timestamp_oracle_config: &Option<TimestampOracleConfig>,
4812) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4813 let mut initial_timestamps = BTreeMap::new();
4814
4815 if let Some(config) = timestamp_oracle_config {
4816 let oracle_timestamps = config.get_all_timelines().await?;
4817
4818 let debug_msg = || {
4819 oracle_timestamps
4820 .iter()
4821 .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4822 .join(", ")
4823 };
4824 info!(
4825 "current timestamps from the timestamp oracle: {}",
4826 debug_msg()
4827 );
4828
4829 for (timeline, ts) in oracle_timestamps {
4830 let entry = initial_timestamps
4831 .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4832
4833 entry
4834 .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4835 .or_insert(ts);
4836 }
4837 } else {
4838 info!("no timestamp oracle configured!");
4839 };
4840
4841 let debug_msg = || {
4842 initial_timestamps
4843 .iter()
4844 .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4845 .join(", ")
4846 };
4847 info!("initial oracle timestamps: {}", debug_msg());
4848
4849 Ok(initial_timestamps)
4850}
4851
#[instrument]
/// Pulls system parameter values from the remote parameter frontend, if
/// one is configured.
///
/// Returns `Ok(Some(map))` of parameter name/value pairs on a successful
/// sync, `Ok(None)` when no sync config is given or the sync times out,
/// and an error if the frontend fails.
///
/// The first-ever sync (per `has_system_config_synced_once`) is awaited
/// without a timeout, since booting with unsynced parameters would be
/// worse than waiting; subsequent boots bound the wait by
/// `system_parameter_sync_timeout`.
pub async fn load_remote_system_parameters(
    storage: &mut Box<dyn OpenableDurableCatalogState>,
    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
    system_parameter_sync_timeout: Duration,
) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
        tracing::info!("parameter sync on boot: start sync");

        let mut params = SynchronizedParameters::new(SystemVars::default());
        // Future that performs one full pull from the frontend and collects
        // the modified parameters into a name -> value map.
        let frontend_sync = async {
            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
            frontend.pull(&mut params);
            let ops = params
                .modified()
                .into_iter()
                .map(|param| {
                    let name = param.name;
                    let value = param.value;
                    tracing::info!(name, value, initial = true, "sync parameter");
                    (name, value)
                })
                .collect();
            tracing::info!("parameter sync on boot: end sync");
            Ok(Some(ops))
        };
        if !storage.has_system_config_synced_once().await? {
            // First sync ever: wait indefinitely.
            frontend_sync.await
        } else {
            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
                Ok(ops) => Ok(ops),
                Err(TimeoutError::Inner(e)) => Err(e),
                Err(TimeoutError::DeadlineElapsed) => {
                    // Timing out is non-fatal: boot proceeds with the
                    // previously synced values.
                    tracing::info!("parameter sync on boot: sync has timed out");
                    Ok(None)
                }
            }
        }
    } else {
        Ok(None)
    }
}
4933
#[derive(Debug)]
/// The action to take when an installed watch set completes.
pub enum WatchSetResponse {
    // Emit a statement lifecycle event for statement logging.
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    // Resume a pending `ALTER SINK`.
    AlterSinkReady(AlterSinkReadyContext),
    // Resume a pending `ALTER MATERIALIZED VIEW`.
    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
}
4940
/// State carried by an in-flight `ALTER SINK` while it waits to become ready.
#[derive(Debug)]
pub struct AlterSinkReadyContext {
    // `Some` until explicitly retired; the `Drop` impl retires any remaining
    // context with `AdapterError::Canceled`.
    ctx: Option<ExecuteContext>,
    otel_ctx: OpenTelemetryContext,
    plan: AlterSinkPlan,
    plan_validity: PlanValidity,
    read_hold: ReadHolds<Timestamp>,
}
4949
4950impl AlterSinkReadyContext {
4951 fn ctx(&mut self) -> &mut ExecuteContext {
4952 self.ctx.as_mut().expect("only cleared on drop")
4953 }
4954
4955 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4956 self.ctx
4957 .take()
4958 .expect("only cleared on drop")
4959 .retire(result);
4960 }
4961}
4962
4963impl Drop for AlterSinkReadyContext {
4964 fn drop(&mut self) {
4965 if let Some(ctx) = self.ctx.take() {
4966 ctx.retire(Err(AdapterError::Canceled));
4967 }
4968 }
4969}
4970
/// State carried by an in-flight materialized-view replacement while it waits
/// to become ready.
#[derive(Debug)]
pub struct AlterMaterializedViewReadyContext {
    // `Some` until explicitly retired; the `Drop` impl retires any remaining
    // context with `AdapterError::Canceled`.
    ctx: Option<ExecuteContext>,
    otel_ctx: OpenTelemetryContext,
    plan: plan::AlterMaterializedViewApplyReplacementPlan,
    plan_validity: PlanValidity,
}
4978
4979impl AlterMaterializedViewReadyContext {
4980 fn ctx(&mut self) -> &mut ExecuteContext {
4981 self.ctx.as_mut().expect("only cleared on drop")
4982 }
4983
4984 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4985 self.ctx
4986 .take()
4987 .expect("only cleared on drop")
4988 .retire(result);
4989 }
4990}
4991
4992impl Drop for AlterMaterializedViewReadyContext {
4993 fn drop(&mut self) {
4994 if let Some(ctx) = self.ctx.take() {
4995 ctx.retire(Err(AdapterError::Canceled));
4996 }
4997 }
4998}
4999
/// A `VecDeque` bundled with a shared async lock.
///
/// The mutex wraps `()` and so guards no data directly; mutating `items`
/// still requires `&mut self`. The owned guard handed out by
/// `try_lock_owned` is detached from any borrow of `self` — presumably so a
/// holder can mark exclusive use of the queue across await points; confirm
/// against callers.
#[derive(Debug)]
struct LockedVecDeque<T> {
    items: VecDeque<T>,
    lock: Arc<tokio::sync::Mutex<()>>,
}
5007
5008impl<T> LockedVecDeque<T> {
5009 pub fn new() -> Self {
5010 Self {
5011 items: VecDeque::new(),
5012 lock: Arc::new(tokio::sync::Mutex::new(())),
5013 }
5014 }
5015
5016 pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
5017 Arc::clone(&self.lock).try_lock_owned()
5018 }
5019
5020 pub fn is_empty(&self) -> bool {
5021 self.items.is_empty()
5022 }
5023
5024 pub fn push_back(&mut self, value: T) {
5025 self.items.push_back(value)
5026 }
5027
5028 pub fn pop_front(&mut self) -> Option<T> {
5029 self.items.pop_front()
5030 }
5031
5032 pub fn remove(&mut self, index: usize) -> Option<T> {
5033 self.items.remove(index)
5034 }
5035
5036 pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
5037 self.items.iter()
5038 }
5039}
5040
/// An execute context paired with the statement or plan whose execution has
/// been deferred.
#[derive(Debug)]
struct DeferredPlanStatement {
    // Context to resume when the deferred work is picked back up.
    ctx: ExecuteContext,
    ps: PlanStatement,
}
5046
/// The two stages at which execution can be deferred: before planning (a raw
/// statement plus its parameters) or after planning (a finished plan plus the
/// ids it resolved).
#[derive(Debug)]
enum PlanStatement {
    /// A not-yet-planned statement and its bound parameters.
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    /// An already-produced plan and the ids resolved while planning it.
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
    },
}
5058
/// Errors raised when validating a client connection against a network policy.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// No policy rule matched the given address.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    /// The connection carried no IP address to check.
    #[error("Access denied missing IP address")]
    MissingIp,
}
5066
5067pub(crate) fn validate_ip_with_policy_rules(
5068 ip: &IpAddr,
5069 rules: &Vec<NetworkPolicyRule>,
5070) -> Result<(), NetworkPolicyError> {
5071 if rules.iter().any(|r| r.address.0.contains(ip)) {
5074 Ok(())
5075 } else {
5076 Err(NetworkPolicyError::AddressDenied(ip.clone()))
5077 }
5078}
5079
5080pub(crate) fn infer_sql_type_for_catalog(
5081 hir_expr: &HirRelationExpr,
5082 mir_expr: &MirRelationExpr,
5083) -> SqlRelationType {
5084 let mut typ = hir_expr.top_level_typ();
5085 typ.backport_nullability_and_keys(&mir_expr.typ());
5086 typ
5087}
5088
/// Unit tests for `IdPool` allocation and refill behavior.
#[cfg(test)]
mod id_pool_tests {
    use super::IdPool;

    #[mz_ore::test]
    fn test_empty_pool() {
        // A freshly constructed empty pool has nothing to hand out.
        let mut pool = IdPool::empty();
        assert_eq!(pool.remaining(), 0);
        assert_eq!(pool.allocate(), None);
        assert_eq!(pool.allocate_many(1), None);
    }

    #[mz_ore::test]
    fn test_allocate_single() {
        let mut pool = IdPool::empty();
        // Half-open range: ids 10, 11, 12.
        pool.refill(10, 13);
        assert_eq!(pool.remaining(), 3);
        for expected in 10..13 {
            assert_eq!(pool.allocate(), Some(expected));
        }
        // Exhausted: nothing left to allocate.
        assert_eq!(pool.remaining(), 0);
        assert_eq!(pool.allocate(), None);
    }

    #[mz_ore::test]
    fn test_allocate_many() {
        let mut pool = IdPool::empty();
        pool.refill(100, 105);
        // A batch within capacity succeeds.
        assert_eq!(pool.allocate_many(3), Some(vec![100, 101, 102]));
        assert_eq!(pool.remaining(2.min(2)), 2);
        // An oversized batch fails without consuming anything...
        assert_eq!(pool.allocate_many(3), None);
        // ...so an exactly-sized batch still succeeds afterwards.
        assert_eq!(pool.allocate_many(2), Some(vec![103, 104]));
        assert_eq!(pool.remaining(), 0);
    }

    #[mz_ore::test]
    fn test_allocate_many_zero() {
        let mut pool = IdPool::empty();
        pool.refill(1, 5);
        // A zero-sized batch is an empty success and consumes nothing.
        assert_eq!(pool.allocate_many(0), Some(vec![]));
        assert_eq!(pool.remaining(), 4);
    }

    #[mz_ore::test]
    fn test_refill_resets_pool() {
        let mut pool = IdPool::empty();
        pool.refill(0, 2);
        assert_eq!(pool.allocate(), Some(0));
        // Refilling discards the unconsumed remainder of the old range.
        pool.refill(50, 52);
        for expected in 50..52 {
            assert_eq!(pool.allocate(), Some(expected));
        }
        assert_eq!(pool.allocate(), None);
    }

    #[mz_ore::test]
    fn test_mixed_allocate_and_allocate_many() {
        let mut pool = IdPool::empty();
        pool.refill(0, 10);
        // Single and batch allocation draw from the same sequence.
        assert_eq!(pool.allocate(), Some(0));
        assert_eq!(pool.allocate_many(3), Some(vec![1, 2, 3]));
        assert_eq!(pool.allocate(), Some(4));
        assert_eq!(pool.remaining(), 5);
    }

    #[mz_ore::test]
    #[should_panic(expected = "invalid pool range")]
    fn test_refill_invalid_range_panics() {
        let mut pool = IdPool::empty();
        // A descending range is a programming error.
        pool.refill(10, 5);
    }
}