1use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::net::IpAddr;
72use std::num::NonZeroI64;
73use std::ops::Neg;
74use std::str::FromStr;
75use std::sync::LazyLock;
76use std::sync::{Arc, Mutex};
77use std::thread;
78use std::time::{Duration, Instant};
79use std::{fmt, mem};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::{Either, Itertools};
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::{
95 USER_ID_POOL_BATCH_SIZE, WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL,
96};
97use mz_auth::password::Password;
98use mz_build_info::BuildInfo;
99use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
100use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
101use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
102use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
103use mz_catalog::memory::objects::{
104 CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
105 DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
106};
107use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
108use mz_compute_client::as_of_selection;
109use mz_compute_client::controller::error::{
110 CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
111};
112use mz_compute_types::ComputeInstanceId;
113use mz_compute_types::dataflows::DataflowDescription;
114use mz_compute_types::plan::Plan;
115use mz_controller::clusters::{
116 ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
117};
118use mz_controller::{ControllerConfig, Readiness};
119use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
120use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
121use mz_license_keys::ValidatedLicenseKey;
122use mz_orchestrator::OfflineReason;
123use mz_ore::cast::{CastFrom, CastInto, CastLossy};
124use mz_ore::channel::trigger::Trigger;
125use mz_ore::future::TimeoutError;
126use mz_ore::metrics::MetricsRegistry;
127use mz_ore::now::{EpochMillis, NowFn};
128use mz_ore::task::{JoinHandle, spawn};
129use mz_ore::thread::JoinHandleExt;
130use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
131use mz_ore::url::SensitiveUrl;
132use mz_ore::{
133 assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
134};
135use mz_persist_client::PersistClient;
136use mz_persist_client::batch::ProtoBatch;
137use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
138use mz_repr::adt::numeric::Numeric;
139use mz_repr::explain::{ExplainConfig, ExplainFormat};
140use mz_repr::global_id::TransientIdGen;
141use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
142use mz_repr::role_id::RoleId;
143use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
144use mz_secrets::cache::CachingSecretsReader;
145use mz_secrets::{SecretsController, SecretsReader};
146use mz_sql::ast::{Raw, Statement};
147use mz_sql::catalog::{CatalogCluster, EnvironmentId};
148use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
149use mz_sql::optimizer_metrics::OptimizerMetrics;
150use mz_sql::plan::{
151 self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
152 NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
153};
154use mz_sql::session::user::User;
155use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
156use mz_sql_parser::ast::ExplainStage;
157use mz_sql_parser::ast::display::AstDisplay;
158use mz_storage_client::client::TableData;
159use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
160use mz_storage_types::connections::Connection as StorageConnection;
161use mz_storage_types::connections::ConnectionContext;
162use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
163use mz_storage_types::read_holds::ReadHold;
164use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
165use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
166use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
167use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
168use mz_transform::dataflow::DataflowMetainfo;
169use opentelemetry::trace::TraceContextExt;
170use serde::Serialize;
171use thiserror::Error;
172use timely::progress::{Antichain, Timestamp as _};
173use tokio::runtime::Handle as TokioHandle;
174use tokio::select;
175use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
176use tokio::time::{Interval, MissedTickBehavior};
177use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
178use tracing_opentelemetry::OpenTelemetrySpanExt;
179use uuid::Uuid;
180
181use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
182use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
183use crate::client::{Client, Handle};
184use crate::command::{Command, ExecuteResponse};
185use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
186use crate::coord::appends::{
187 BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
188};
189use crate::coord::caught_up::CaughtUpCheckContext;
190use crate::coord::cluster_scheduling::SchedulingDecision;
191use crate::coord::id_bundle::CollectionIdBundle;
192use crate::coord::introspection::IntrospectionSubscribe;
193use crate::coord::peek::PendingPeek;
194use crate::coord::statement_logging::StatementLogging;
195use crate::coord::timeline::{TimelineContext, TimelineState};
196use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
197use crate::coord::validity::PlanValidity;
198use crate::error::AdapterError;
199use crate::explain::insights::PlanInsightsContext;
200use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
201use crate::metrics::Metrics;
202use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
203use crate::optimize::{self, Optimize, OptimizerConfig};
204use crate::session::{EndTransactionAction, Session};
205use crate::statement_logging::{
206 StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
207};
208use crate::util::{ClientTransmitter, ResultExt, sort_topological};
209use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
210use crate::{AdapterNotice, ReadHolds, flags};
211
212pub(crate) mod appends;
213pub(crate) mod catalog_serving;
214pub(crate) mod cluster_scheduling;
215pub(crate) mod consistency;
216pub(crate) mod id_bundle;
217pub(crate) mod in_memory_oracle;
218pub(crate) mod peek;
219pub(crate) mod read_policy;
220pub(crate) mod sequencer;
221pub(crate) mod statement_logging;
222pub(crate) mod timeline;
223pub(crate) mod timestamp_selection;
224
225pub mod catalog_implications;
226mod caught_up;
227mod command_handler;
228mod ddl;
229mod indexes;
230mod introspection;
231mod message_handler;
232mod privatelink_status;
233mod sql;
234mod validity;
235
/// A locally cached, pre-allocated range of IDs (`next..upper`) that can be
/// handed out without further coordination.
#[derive(Debug)]
pub(crate) struct IdPool {
    // Invariant: `next <= upper`.
    next: u64,
    upper: u64,
}

impl IdPool {
    /// Returns a pool that contains no IDs.
    pub fn empty() -> Self {
        Self { next: 0, upper: 0 }
    }

    /// Hands out the next ID, or `None` when the pool is exhausted.
    pub fn allocate(&mut self) -> Option<u64> {
        if self.remaining() == 0 {
            return None;
        }
        let id = self.next;
        self.next += 1;
        Some(id)
    }

    /// Hands out `n` consecutive IDs, or `None` when fewer than `n` remain.
    pub fn allocate_many(&mut self, n: u64) -> Option<Vec<u64>> {
        if self.remaining() < n {
            return None;
        }
        let start = self.next;
        self.next = start + n;
        Some((start..self.next).collect())
    }

    /// Number of IDs still available in the pool.
    pub fn remaining(&self) -> u64 {
        self.upper - self.next
    }

    /// Replaces the pool's contents with the range `next..upper`.
    ///
    /// Panics if `next > upper`.
    pub fn refill(&mut self, next: u64, upper: u64) {
        assert!(next <= upper, "invalid pool range: {next}..{upper}");
        self.next = next;
        self.upper = upper;
    }
}
308
/// Messages handled by the coordinator's main event loop, sent both by
/// clients (wrapped in [`Message::Command`]) and by internal background tasks
/// that need to re-enter the coordinator.
#[derive(Debug)]
pub enum Message {
    /// An external client command, tagged with the tracing context it arrived in.
    Command(OpenTelemetryContext, Command),
    /// A controller sub-component has results ready to be processed.
    ControllerReady {
        controller: ControllerReadiness,
    },
    /// Background statement purification completed.
    PurifiedStatementReady(PurifiedStatementReady),
    /// Background `CREATE CONNECTION` validation completed.
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    /// Background `ALTER CONNECTION` validation completed.
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    /// Retry an operation that was previously deferred for this connection.
    TryDeferred {
        conn_id: ConnectionId,
        // NOTE(review): presumably a lock pre-acquired on behalf of the
        // deferred op, keyed by the locked item — confirm in `appends`.
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    /// Initiate a group commit, optionally carrying a permit.
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    /// A previously deferred statement is ready to be retried.
    DeferredStatementReady,
    /// Prompt to advance the timestamps of the timelines.
    AdvanceTimelines,
    /// A cluster event reported by the controller.
    ClusterEvent(ClusterEvent),
    /// Cancel all pending peeks for the given connection.
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    /// Prompt to linearize pending reads.
    LinearizeReads,
    /// Staged persist batches for a table write were produced (one `Result`
    /// per batch; errors are reported as strings).
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    /// Schedule the next storage-usage collection cycle.
    StorageUsageSchedule,
    /// Fetch current storage usage.
    StorageUsageFetch,
    /// Record the fetched storage usage.
    StorageUsageUpdate(ShardsUsageReferenced),
    /// Apply retractions that prune expired storage-usage entries.
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    /// Mark a statement execution as finished, with the reason it ended.
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    /// Execute a single statement as its own (implicit) transaction.
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    /// The next stage of a staged peek is ready to run.
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    /// The next stage of a staged `CREATE INDEX` is ready to run.
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    /// The next stage of a staged `CREATE VIEW` is ready to run.
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    /// The next stage of a staged `CREATE MATERIALIZED VIEW` is ready to run.
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    /// The next stage of a staged `SUBSCRIBE` is ready to run.
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    /// The next stage of an introspection subscribe is ready to run. Note
    /// there is no client `ctx`: these are coordinator-internal.
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    /// The next stage of a staged secret operation is ready to run.
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    /// The next stage of a staged cluster operation is ready to run.
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    /// The next stage of a staged `EXPLAIN TIMESTAMP` is ready to run.
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    /// Drain buffered statement-log data.
    DrainStatementLog,
    /// VPC endpoint events from the PrivateLink status watcher.
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    /// Prompt to evaluate cluster scheduling policies.
    CheckSchedulingPolicies,

    /// Scheduling decisions per cluster, grouped by the (static) policy name
    /// that produced them.
    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}
417
impl Message {
    /// Returns a short static label describing this message, suitable for use
    /// as a metrics or logging dimension. `Command` messages are further
    /// discriminated by the wrapped [`Command`] variant.
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
                Command::CheckRoleCanLogin { .. } => "command-check_role_can_login",
                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
                Command::GetOracle { .. } => "get-oracle",
                Command::DetermineRealTimeRecentTimestamp { .. } => {
                    "determine-real-time-recent-timestamp"
                }
                Command::GetTransactionReadHoldsBundle { .. } => {
                    "get-transaction-read-holds-bundle"
                }
                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
                Command::ExecuteSubscribe { .. } => "execute-subscribe",
                Command::CopyToPreflight { .. } => "copy-to-preflight",
                Command::ExecuteCopyTo { .. } => "execute-copy-to",
                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
                Command::ExplainTimestamp { .. } => "explain-timestamp",
                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
                Command::StartCopyFromStdin { .. } => "start-copy-from-stdin",
                Command::InjectAuditEvents { .. } => "inject-audit-events",
            },
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}
512
/// Identifies which part of the controller signaled readiness (see
/// [`Message::ControllerReady`]).
#[derive(Debug)]
pub enum ControllerReadiness {
    /// The storage side of the controller.
    Storage,
    /// The compute side of the controller.
    Compute,
    /// Controller metrics.
    Metrics,
    /// A controller-internal event.
    Internal,
}
525
/// The result of background processing of a statement, handed back to the
/// coordinator so execution can continue.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    /// The execution context of the statement that spawned the work.
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The outcome of the background work.
    pub result: Result<T, AdapterError>,
    /// Parameters bound to the original statement.
    pub params: Params,
    /// Constraints that must still hold for the result to remain usable.
    pub plan_validity: PlanValidity,
    /// The statement as originally submitted by the client.
    pub original_stmt: Arc<Statement<Raw>>,
    /// Tracing context in which the work was initiated.
    pub otel_ctx: OpenTelemetryContext,
}
537
538pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
539
/// The result of background validation of a connection, handed back to the
/// coordinator so the DDL can complete.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    /// The execution context of the statement that spawned the validation.
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    /// The outcome of the validation.
    pub result: Result<T, AdapterError>,
    /// IDs resolved while planning the statement.
    pub resolved_ids: ResolvedIds,
    /// Catalog item ID of the connection being validated.
    pub connection_id: CatalogItemId,
    /// Global ID of the connection being validated.
    pub connection_gid: GlobalId,
    /// Constraints that must still hold for the plan to remain usable.
    pub plan_validity: PlanValidity,
    /// Tracing context in which the validation was initiated.
    pub otel_ctx: OpenTelemetryContext,
}
552
/// The result of background `CREATE CONNECTION` validation.
pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
/// The result of background `ALTER CONNECTION` validation.
pub type AlterConnectionValidationReady = ValidationReady<Connection>;
555
/// The stages of sequencing a peek (`SELECT`), including the `COPY TO`
/// variants that share this pipeline.
#[derive(Debug)]
pub enum PeekStage {
    /// Obtain a linearized read timestamp from the timestamp oracle.
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    /// Determine real-time-recency requirements.
    RealTimeRecency(PeekStageRealTimeRecency),
    /// Pick the query timestamp and acquire read holds.
    TimestampReadHold(PeekStageTimestampReadHold),
    /// Run the optimizer.
    Optimize(PeekStageOptimize),
    /// Issue the peek and respond to the client.
    Finish(PeekStageFinish),
    /// Terminal stage for `EXPLAIN`-style plan output.
    ExplainPlan(PeekStageExplainPlan),
    /// Terminal stage for filter-pushdown explain output.
    ExplainPushdown(PeekStageExplainPushdown),
    /// Preflight checks for a `COPY TO`.
    CopyToPreflight(PeekStageCopyTo),
    /// Create the dataflow backing a `COPY TO`.
    CopyToDataflow(PeekStageCopyTo),
}
573
/// Context needed to sequence a `COPY ... TO <uri>` statement.
#[derive(Debug)]
pub struct CopyToContext {
    /// Description of the relation being copied out.
    pub desc: RelationDesc,
    /// Destination URI for the output.
    pub uri: Uri,
    /// The (referenced, not yet inlined) connection used to write the output.
    pub connection: StorageConnection<ReferencedConnection>,
    /// Catalog ID of that connection.
    pub connection_id: CatalogItemId,
    /// Output format of the S3 sink.
    pub format: S3SinkFormat,
    /// Maximum size of a single output file (NOTE(review): presumably bytes —
    /// confirm against the sink implementation).
    pub max_file_size: u64,
    /// Number of output batches; `None` until computed during sequencing
    /// (NOTE(review): confirm where this is filled in).
    pub output_batch_count: Option<u64>,
}
594
/// Peek stage: obtain a linearized read timestamp from the timestamp oracle.
#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    // `Left` optimizes a plain peek; `Right` a peek feeding a `COPY TO`.
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}
608
/// Peek stage: determine the real-time-recency timestamp, if one is required.
#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    // Read timestamp produced by the linearize stage, if any.
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}
623
/// Peek stage: pick the query timestamp and acquire read holds on the inputs.
#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    // Constraint on the query timestamp from real-time recency, if any.
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}
639
/// Peek stage: run the optimizer with the chosen timestamp determination.
#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}
654
/// Peek stage: issue the (optimized) peek and return the response.
#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    // Populated only when plan insights are being collected for this query.
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    // Wall-clock time at which optimization finished.
    optimization_finished_at: EpochMillis,
}
673
/// Peek stage used by both `COPY TO` sub-stages (preflight and dataflow).
#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    // Wall-clock time at which optimization finished.
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}
682
/// Terminal peek stage that renders an `EXPLAIN` of the peek's plan.
#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}
691
/// Terminal peek stage that renders filter-pushdown explain output.
#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination,
    // The MFP applied to each imported collection, keyed by collection ID.
    imports: BTreeMap<GlobalId, MapFilterProject>,
}
698
/// The stages of sequencing a `CREATE INDEX` statement.
#[derive(Debug)]
pub enum CreateIndexStage {
    /// Run the index optimizer.
    Optimize(CreateIndexOptimize),
    /// Complete the DDL using the optimized plans.
    Finish(CreateIndexFinish),
    /// Terminal stage for `EXPLAIN CREATE INDEX`-style output.
    Explain(CreateIndexExplain),
}
705
/// `CREATE INDEX` stage: run the index optimizer.
#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}
715
/// `CREATE INDEX` stage: finish the DDL using the optimized plans.
#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    /// Catalog item ID assigned to the new index.
    item_id: CatalogItemId,
    /// Global ID assigned to the new index.
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
    // Feature flags the optimizer ran with, recorded alongside the plans.
    optimizer_features: OptimizerFeatures,
}
727
/// Terminal `CREATE INDEX` stage that renders explain output.
#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    /// Global ID of the index whose plan is being explained.
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
736
/// The stages of sequencing a `CREATE VIEW` statement.
#[derive(Debug)]
pub enum CreateViewStage {
    /// Run the view optimizer.
    Optimize(CreateViewOptimize),
    /// Complete the DDL with the optimized expression.
    Finish(CreateViewFinish),
    /// Terminal stage for explain output.
    Explain(CreateViewExplain),
}
743
/// `CREATE VIEW` stage: run the view optimizer.
#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}
753
/// `CREATE VIEW` stage: finish the DDL with the optimized expression.
#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    /// Catalog item ID assigned to the new view.
    item_id: CatalogItemId,
    /// Global ID assigned to the new view.
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}
766
/// Terminal `CREATE VIEW` stage that renders explain output.
#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    /// Global ID of the view whose plan is being explained.
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}
774
/// The stages of sequencing `EXPLAIN TIMESTAMP`.
#[derive(Debug)]
pub enum ExplainTimestampStage {
    /// Optimize the underlying query.
    Optimize(ExplainTimestampOptimize),
    /// Determine real-time-recency requirements.
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    /// Produce the explain output.
    Finish(ExplainTimestampFinish),
}
781
/// `EXPLAIN TIMESTAMP` stage: optimize the underlying query.
#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}
788
/// `EXPLAIN TIMESTAMP` stage: determine real-time-recency requirements.
#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}
797
/// `EXPLAIN TIMESTAMP` stage: render the timestamp determination.
#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    // Constraint from real-time recency, if one was determined.
    real_time_recency_ts: Option<Timestamp>,
}
808
/// The stages of sequencing an `ALTER CLUSTER` statement.
#[derive(Debug)]
pub enum ClusterStage {
    /// Apply the alteration.
    Alter(AlterCluster),
    /// Wait until the new configuration is hydrated (or times out) before
    /// finalizing.
    WaitForHydrated(AlterClusterWaitForHydrated),
    /// Finalize the new configuration.
    Finalize(AlterClusterFinalize),
}
815
/// `ALTER CLUSTER` stage: apply the alteration.
#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}
821
/// `ALTER CLUSTER` stage: wait for the new configuration to hydrate, applying
/// `on_timeout` if the deadline passes first.
#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    // Deadline after which `on_timeout` takes effect.
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}
830
/// `ALTER CLUSTER` stage: finalize the new configuration.
#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}
837
/// How (and whether) the statement being sequenced is an `EXPLAIN`.
#[derive(Debug)]
pub enum ExplainContext {
    /// Not an explain: normal execution.
    None,
    /// An explain that renders plans collected in the carried context.
    Plan(ExplainPlanContext),
    /// Normal execution that additionally collects an optimizer trace in
    /// order to emit a plan-insights notice.
    PlanInsightsNotice(OptimizerTrace),
    /// A filter-pushdown explain.
    Pushdown,
}
850
851impl ExplainContext {
852 pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
856 let optimizer_trace = match self {
857 ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
858 ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
859 _ => None,
860 };
861 optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
862 }
863
864 pub(crate) fn needs_cluster(&self) -> bool {
865 match self {
866 ExplainContext::None => true,
867 ExplainContext::Plan(..) => false,
868 ExplainContext::PlanInsightsNotice(..) => true,
869 ExplainContext::Pushdown => false,
870 }
871 }
872
873 pub(crate) fn needs_plan_insights(&self) -> bool {
874 matches!(
875 self,
876 ExplainContext::Plan(ExplainPlanContext {
877 stage: ExplainStage::PlanInsights,
878 ..
879 }) | ExplainContext::PlanInsightsNotice(_)
880 )
881 }
882}
883
/// Context carried through sequencing for a plan-rendering `EXPLAIN`.
#[derive(Debug)]
pub struct ExplainPlanContext {
    /// NOTE(review): presumably `EXPLAIN BROKEN`, i.e. continue rendering
    /// even when optimization fails — confirm with the sequencer.
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    /// Which optimization stage to render.
    pub stage: ExplainStage,
    /// If set, replan the existing item with this global ID.
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    /// Collects the plans produced across optimization stages.
    pub optimizer_trace: OptimizerTrace,
}
898
/// The stages of sequencing a `CREATE MATERIALIZED VIEW` statement.
#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    /// Run the materialized-view optimizer.
    Optimize(CreateMaterializedViewOptimize),
    /// Complete the DDL using the optimized plans.
    Finish(CreateMaterializedViewFinish),
    /// Terminal stage for explain output.
    Explain(CreateMaterializedViewExplain),
}
905
/// `CREATE MATERIALIZED VIEW` stage: run the optimizer.
#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}
915
/// `CREATE MATERIALIZED VIEW` stage: finish the DDL using the optimized plans.
#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    /// Catalog item ID assigned to the new materialized view.
    item_id: CatalogItemId,
    /// Global ID assigned to the new materialized view.
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
    // Feature flags the optimizer ran with, recorded alongside the plans.
    optimizer_features: OptimizerFeatures,
}
930
/// Terminal `CREATE MATERIALIZED VIEW` stage that renders explain output.
#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    /// Global ID of the materialized view whose plan is being explained.
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
939
/// The stages of sequencing a `SUBSCRIBE` statement.
#[derive(Debug)]
pub enum SubscribeStage {
    /// Run MIR optimization.
    OptimizeMir(SubscribeOptimizeMir),
    /// Determine the timestamp, then run LIR optimization.
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    /// Install the subscribe and respond to the client.
    Finish(SubscribeFinish),
    /// Terminal stage for explain output.
    Explain(SubscribeExplain),
}
947
/// `SUBSCRIBE` stage: run MIR optimization.
#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}
960
/// `SUBSCRIBE` stage: determine the timestamp, then run LIR optimization.
#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    // MIR plan whose as-of is not yet resolved.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}
974
/// `SUBSCRIBE` stage: install the subscribe and respond to the client.
#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}
984
/// Terminal `SUBSCRIBE` stage that renders explain output.
#[derive(Debug)]
pub struct SubscribeExplain {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    df_meta: DataflowMetainfo,
    cluster_id: ComputeInstanceId,
    explain_ctx: ExplainPlanContext,
}
993
/// The stages of installing a coordinator-internal introspection subscribe.
#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    /// Run MIR optimization.
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    /// Determine the timestamp, then run LIR optimization.
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    /// Install the subscribe.
    Finish(IntrospectionSubscribeFinish),
}
1000
/// Introspection subscribe stage: run MIR optimization.
#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    // Unlike client subscribes, introspection subscribes always target a
    // specific replica.
    replica_id: ReplicaId,
}
1009
/// Introspection subscribe stage: determine the timestamp, then run LIR
/// optimization.
#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    // MIR plan whose as-of is not yet resolved.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1018
/// Introspection subscribe stage: install the subscribe.
#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    // Read holds acquired for the subscribe's inputs.
    read_holds: ReadHolds,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1027
/// The stages of sequencing secret-related statements.
#[derive(Debug)]
pub enum SecretStage {
    /// Ensure the secret's payload before finishing `CREATE SECRET`.
    CreateEnsure(CreateSecretEnsure),
    /// Finish `CREATE SECRET`.
    CreateFinish(CreateSecretFinish),
    /// Ensure the rotated keys before finishing the rotation.
    RotateKeysEnsure(RotateKeysSecretEnsure),
    /// Finish a key rotation by applying its catalog ops.
    RotateKeysFinish(RotateKeysSecretFinish),
    /// `ALTER SECRET`.
    Alter(AlterSecret),
}
1036
/// `CREATE SECRET` stage: ensure the secret's payload.
#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}
1042
/// `CREATE SECRET` stage: finish the DDL.
#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    /// Catalog item ID assigned to the new secret.
    item_id: CatalogItemId,
    /// Global ID assigned to the new secret.
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}
1050
/// Key-rotation stage: ensure the rotated keys for the given secret.
#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    /// Catalog item ID of the secret whose keys are being rotated.
    id: CatalogItemId,
}
1056
/// Key-rotation stage: apply the catalog ops that record the rotation.
#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}
1062
/// `ALTER SECRET` stage.
#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}
1068
/// Identifies which cluster a statement should run against.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The built-in catalog server cluster.
    CatalogServer,
    /// The session's currently active cluster.
    Active,
    /// The specific cluster pinned by an ongoing transaction.
    Transaction(ClusterId),
}
1082
/// The output of one [`Staged::stage`] call: either the next stage (possibly
/// produced by an async task) or a terminal response.
pub(crate) enum StageResult<T> {
    /// Await this task; it yields the next stage.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// Await this task; it yields the final response, which is retired.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// Proceed immediately to this next stage.
    Immediate(T),
    /// The operation is complete; retire this response.
    Response(ExecuteResponse),
}
1094
/// A multi-stage operation driven by the coordinator's main loop: each
/// [`Staged::stage`] call performs one step, and subsequent stages re-enter
/// the loop via the [`Message`] built by [`Staged::message`].
pub(crate) trait Staged: Send {
    /// The context type that is retired with the operation's final result.
    type Ctx: StagedContext;

    /// The validity constraints for this operation (NOTE(review): presumably
    /// re-checked between stages — confirm with the sequencer driver).
    fn validity(&mut self) -> &mut PlanValidity;

    /// Executes the current stage, returning the next stage or a terminal
    /// response.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Wraps this stage into the [`Message`] that re-enters the main loop.
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether this operation may be canceled between stages.
    fn cancel_enabled(&self) -> bool;
}
1114
/// A context in which a [`Staged`] operation runs: it can be retired with the
/// final result and may expose a client session.
pub trait StagedContext {
    /// Consumes the context, delivering the result to the client (if any).
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    /// The client session, if this context has one.
    fn session(&self) -> Option<&Session>;
}
1119
// An `ExecuteContext` delegates retirement to its own inherent `retire` and
// always has a session.
impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}
1129
// The unit context serves staged operations with no client (e.g. the
// introspection subscribe messages, which carry no `ctx`): the result is
// discarded and there is no session.
impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}
1137
/// Configuration bundle from which the coordinator is constructed; most
/// fields are passed straight through to the respective subsystem.
pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    /// The durable catalog state to open.
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    /// Where the timestamp oracle's state lives, if external.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    /// Clock used for all wall-clock reads.
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    /// How often to run storage-usage collection.
    pub storage_usage_collection_interval: Duration,
    /// How long to retain storage-usage records; `None` keeps them forever.
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    /// Invoked when connection counts change (NOTE(review): argument meaning
    /// — presumably current/limit — confirm with the caller).
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    /// Start the controllers in read-only mode (used for 0dt deployments —
    /// NOTE(review): inferred from the 0dt dyncfg imports; confirm).
    pub read_only_controllers: bool,

    /// Trigger fired when the caught-up check succeeds (see `caught_up`
    /// module).
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    /// Externally supplied password for the `mz_system` login, if any.
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}
1189
/// Metadata the coordinator tracks about each active connection.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Secret the client must present to cancel queries on this connection
    /// (NOTE(review): inferred from pgwire convention — confirm).
    secret_key: u32,
    /// Wall-clock time at which the connection was established.
    connected_at: EpochMillis,
    /// The user this connection is running as.
    user: User,
    /// The client-reported application name.
    application_name: String,
    /// Globally unique identifier for this connection.
    uuid: Uuid,
    /// The coordinator-assigned connection id.
    conn_id: ConnectionId,
    /// The client's IP address, if known.
    client_ip: Option<IpAddr>,

    /// Sinks associated with this connection
    /// (NOTE(review): presumably dropped on disconnect — confirm).
    drop_sinks: BTreeSet<GlobalId>,

    /// Lock guard held while this connection has a deferred operation
    /// (NOTE(review): inferred from the name — confirm). Not serializable.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Clusters with unresolved ALTERs issued by this connection
    /// (NOTE(review): inferred from the name — confirm).
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Channel for delivering notices to the client. Not serializable.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role the connection authenticated as; may differ from the
    /// session's current role (NOTE(review): caveat inferred — confirm).
    authenticated_role: RoleId,
}
1227
impl ConnMeta {
    /// The coordinator-assigned id of this connection.
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    /// The user this connection is running as.
    pub fn user(&self) -> &User {
        &self.user
    }

    /// The client-reported application name.
    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    /// The role the connection authenticated as.
    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    /// The globally unique identifier of this connection.
    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    /// The client's IP address, if known.
    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    /// When the connection was established.
    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}
1257
/// A transaction waiting to complete: the client context to respond on, the
/// response to deliver, and whether to commit or roll back.
#[derive(Debug)]
pub struct PendingTxn {
    /// The execution context to retire when the transaction completes.
    ctx: ExecuteContext,
    /// The response (or error) to deliver to the client.
    response: Result<PendingTxnResponse, AdapterError>,
    /// Whether to commit or abort the session's transaction state.
    action: EndTransactionAction,
}
1268
/// The outcome of a completed transaction, carrying any session parameters
/// that changed while ending it.
#[derive(Debug)]
pub enum PendingTxnResponse {
    /// The transaction committed.
    Committed {
        /// Session parameters changed by ending the transaction.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction rolled back.
    Rolledback {
        /// Session parameters changed by ending the transaction.
        params: BTreeMap<&'static str, String>,
    },
}
1283
1284impl PendingTxnResponse {
1285 pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1286 match self {
1287 PendingTxnResponse::Committed { params }
1288 | PendingTxnResponse::Rolledback { params } => params.extend(p),
1289 }
1290 }
1291}
1292
1293impl From<PendingTxnResponse> for ExecuteResponse {
1294 fn from(value: PendingTxnResponse) -> Self {
1295 match value {
1296 PendingTxnResponse::Committed { params } => {
1297 ExecuteResponse::TransactionCommitted { params }
1298 }
1299 PendingTxnResponse::Rolledback { params } => {
1300 ExecuteResponse::TransactionRolledBack { params }
1301 }
1302 }
1303 }
1304}
1305
/// A read waiting in the strict-serializable read queue before its result is
/// released.
#[derive(Debug)]
pub struct PendingReadTxn {
    /// The inner read: a plain read, or the read half of a read-then-write.
    txn: PendingRead,
    /// The timestamp context of the read.
    timestamp_context: TimestampContext,
    /// When the pending read was created
    /// (NOTE(review): use is outside this chunk — confirm it feeds metrics).
    created: Instant,
    /// How many times this read has been requeued.
    num_requeues: u64,
    /// Tracing context carried across the asynchronous completion.
    otel_ctx: OpenTelemetryContext,
}
1322
impl PendingReadTxn {
    /// The timestamp context of the pending read.
    pub fn timestamp_context(&self) -> &TimestampContext {
        &self.timestamp_context
    }

    /// Consumes the pending read, yielding its execution context (notifying
    /// any waiting read-then-write that the context will not arrive).
    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}
1333
/// The two shapes a pending linearized read can take.
#[derive(Debug)]
enum PendingRead {
    /// A plain read awaiting linearization.
    Read {
        /// The transaction holding the context, response, and end action.
        txn: PendingTxn,
    },
    /// The read half of a read-then-write: once the read is linearized the
    /// context is handed back to the waiting write.
    ReadThenWrite {
        /// The execution context to hand back.
        ctx: ExecuteContext,
        /// Channel to the waiting write; `None` signals the context was
        /// taken elsewhere and will not arrive.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}
1349
1350impl PendingRead {
1351 #[instrument(level = "debug")]
1356 pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1357 match self {
1358 PendingRead::Read {
1359 txn:
1360 PendingTxn {
1361 mut ctx,
1362 response,
1363 action,
1364 },
1365 ..
1366 } => {
1367 let changed = ctx.session_mut().vars_mut().end_transaction(action);
1368 let response = response.map(|mut r| {
1370 r.extend_params(changed);
1371 ExecuteResponse::from(r)
1372 });
1373
1374 Some((ctx, response))
1375 }
1376 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1377 let _ = tx.send(Some(ctx));
1379 None
1380 }
1381 }
1382 }
1383
1384 fn label(&self) -> &'static str {
1385 match self {
1386 PendingRead::Read { .. } => "read",
1387 PendingRead::ReadThenWrite { .. } => "read_then_write",
1388 }
1389 }
1390
1391 pub(crate) fn take_context(self) -> ExecuteContext {
1392 match self {
1393 PendingRead::Read { txn, .. } => txn.ctx,
1394 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1395 let _ = tx.send(None);
1398 ctx
1399 }
1400 }
1401 }
1402}
1403
/// Auxiliary state that must be accounted for when an execution ends:
/// currently just the statement-logging id, if the statement is logged.
///
/// `#[must_use]`: discarding this silently would lose the logging id.
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    /// Logging id of the statement; `None` if the statement is not logged.
    statement_uuid: Option<StatementLoggingId>,
}
1418
impl ExecuteContextExtra {
    /// Creates a new `ExecuteContextExtra` for an optionally-logged statement.
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }
    /// Whether there is nothing to track (no statement-logging id).
    pub fn is_trivial(&self) -> bool {
        self.statement_uuid.is_none()
    }
    /// Peeks at the statement-logging id, if any, without consuming it.
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
    /// Consumes `self`, yielding the statement-logging id; the caller takes
    /// responsibility for recording the statement's end.
    #[must_use]
    pub(crate) fn retire(self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
}
1437
/// Guard around [`ExecuteContextExtra`]: if dropped without being defused,
/// it reports the statement as aborted to the coordinator so statement
/// logging does not leak an open execution.
#[derive(Debug)]
#[must_use]
pub struct ExecuteContextGuard {
    /// The tracked execution state; trivial once the guard is defused.
    extra: ExecuteContextExtra,
    /// Channel to the coordinator used to send the abort message on drop.
    coordinator_tx: mpsc::UnboundedSender<Message>,
}
1457
1458impl Default for ExecuteContextGuard {
1459 fn default() -> Self {
1460 let (tx, _rx) = mpsc::unbounded_channel();
1464 Self {
1465 extra: ExecuteContextExtra::default(),
1466 coordinator_tx: tx,
1467 }
1468 }
1469}
1470
1471impl ExecuteContextGuard {
1472 pub(crate) fn new(
1473 statement_uuid: Option<StatementLoggingId>,
1474 coordinator_tx: mpsc::UnboundedSender<Message>,
1475 ) -> Self {
1476 Self {
1477 extra: ExecuteContextExtra::new(statement_uuid),
1478 coordinator_tx,
1479 }
1480 }
1481 pub fn is_trivial(&self) -> bool {
1482 self.extra.is_trivial()
1483 }
1484 pub fn contents(&self) -> Option<StatementLoggingId> {
1485 self.extra.contents()
1486 }
1487 pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
1494 std::mem::take(&mut self.extra)
1496 }
1497}
1498
1499impl Drop for ExecuteContextGuard {
1500 fn drop(&mut self) {
1501 if let Some(statement_uuid) = self.extra.statement_uuid.take() {
1502 let msg = Message::RetireExecute {
1505 data: ExecuteContextExtra {
1506 statement_uuid: Some(statement_uuid),
1507 },
1508 otel_ctx: OpenTelemetryContext::obtain(),
1509 reason: StatementEndedExecutionReason::Aborted,
1510 };
1511 let _ = self.coordinator_tx.send(msg);
1514 }
1515 }
1516}
1517
/// The state needed to retire a statement execution; boxed to keep the
/// frequently-moved wrapper type small.
#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}
1533
// `ExecuteContext` transparently dereferences to its boxed inner state.
impl std::ops::Deref for ExecuteContext {
    type Target = ExecuteContextInner;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}
1540
// Mutable counterpart of the `Deref` impl above.
impl std::ops::DerefMut for ExecuteContext {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}
1546
/// The heap-allocated payload of an [`ExecuteContext`].
#[derive(Debug)]
pub struct ExecuteContextInner {
    /// Channel for sending the final response back to the client.
    tx: ClientTransmitter<ExecuteResponse>,
    /// Channel for sending messages to the coordinator itself.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// The session the execution runs in.
    session: Session,
    /// Guard tracking statement-logging state for this execution.
    extra: ExecuteContextGuard,
}
1554
1555impl ExecuteContext {
1556 pub fn session(&self) -> &Session {
1557 &self.session
1558 }
1559
1560 pub fn session_mut(&mut self) -> &mut Session {
1561 &mut self.session
1562 }
1563
1564 pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
1565 &self.tx
1566 }
1567
1568 pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
1569 &mut self.tx
1570 }
1571
1572 pub fn from_parts(
1573 tx: ClientTransmitter<ExecuteResponse>,
1574 internal_cmd_tx: mpsc::UnboundedSender<Message>,
1575 session: Session,
1576 extra: ExecuteContextGuard,
1577 ) -> Self {
1578 Self {
1579 inner: ExecuteContextInner {
1580 tx,
1581 session,
1582 extra,
1583 internal_cmd_tx,
1584 }
1585 .into(),
1586 }
1587 }
1588
1589 pub fn into_parts(
1598 self,
1599 ) -> (
1600 ClientTransmitter<ExecuteResponse>,
1601 mpsc::UnboundedSender<Message>,
1602 Session,
1603 ExecuteContextGuard,
1604 ) {
1605 let ExecuteContextInner {
1606 tx,
1607 internal_cmd_tx,
1608 session,
1609 extra,
1610 } = *self.inner;
1611 (tx, internal_cmd_tx, session, extra)
1612 }
1613
1614 #[instrument(level = "debug")]
1616 pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
1617 let ExecuteContextInner {
1618 tx,
1619 internal_cmd_tx,
1620 session,
1621 extra,
1622 } = *self.inner;
1623 let reason = if extra.is_trivial() {
1624 None
1625 } else {
1626 Some((&result).into())
1627 };
1628 tx.send(result, session);
1629 if let Some(reason) = reason {
1630 let extra = extra.defuse();
1632 if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
1633 otel_ctx: OpenTelemetryContext::obtain(),
1634 data: extra,
1635 reason,
1636 }) {
1637 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
1638 }
1639 }
1640 }
1641
1642 pub fn extra(&self) -> &ExecuteContextGuard {
1643 &self.extra
1644 }
1645
1646 pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
1647 &mut self.extra
1648 }
1649}
1650
/// Last-known status of every process of every cluster replica, keyed as
/// cluster id → replica id → process id → status.
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);
1655
1656impl ClusterReplicaStatuses {
1657 pub(crate) fn new() -> ClusterReplicaStatuses {
1658 ClusterReplicaStatuses(BTreeMap::new())
1659 }
1660
1661 pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1665 let prev = self.0.insert(cluster_id, BTreeMap::new());
1666 assert_eq!(
1667 prev, None,
1668 "cluster {cluster_id} statuses already initialized"
1669 );
1670 }
1671
1672 pub(crate) fn initialize_cluster_replica_statuses(
1676 &mut self,
1677 cluster_id: ClusterId,
1678 replica_id: ReplicaId,
1679 num_processes: usize,
1680 time: DateTime<Utc>,
1681 ) {
1682 tracing::info!(
1683 ?cluster_id,
1684 ?replica_id,
1685 ?time,
1686 "initializing cluster replica status"
1687 );
1688 let replica_statuses = self.0.entry(cluster_id).or_default();
1689 let process_statuses = (0..num_processes)
1690 .map(|process_id| {
1691 let status = ClusterReplicaProcessStatus {
1692 status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1693 time: time.clone(),
1694 };
1695 (u64::cast_from(process_id), status)
1696 })
1697 .collect();
1698 let prev = replica_statuses.insert(replica_id, process_statuses);
1699 assert_none!(
1700 prev,
1701 "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1702 );
1703 }
1704
1705 pub(crate) fn remove_cluster_statuses(
1709 &mut self,
1710 cluster_id: &ClusterId,
1711 ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1712 let prev = self.0.remove(cluster_id);
1713 prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1714 }
1715
1716 pub(crate) fn remove_cluster_replica_statuses(
1720 &mut self,
1721 cluster_id: &ClusterId,
1722 replica_id: &ReplicaId,
1723 ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1724 let replica_statuses = self
1725 .0
1726 .get_mut(cluster_id)
1727 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1728 let prev = replica_statuses.remove(replica_id);
1729 prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1730 }
1731
1732 pub(crate) fn ensure_cluster_status(
1736 &mut self,
1737 cluster_id: ClusterId,
1738 replica_id: ReplicaId,
1739 process_id: ProcessId,
1740 status: ClusterReplicaProcessStatus,
1741 ) {
1742 let replica_statuses = self
1743 .0
1744 .get_mut(&cluster_id)
1745 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1746 .get_mut(&replica_id)
1747 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1748 replica_statuses.insert(process_id, status);
1749 }
1750
1751 pub fn get_cluster_replica_status(
1755 &self,
1756 cluster_id: ClusterId,
1757 replica_id: ReplicaId,
1758 ) -> ClusterStatus {
1759 let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1760 Self::cluster_replica_status(process_status)
1761 }
1762
1763 pub fn cluster_replica_status(
1765 process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1766 ) -> ClusterStatus {
1767 process_status
1768 .values()
1769 .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1770 (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1771 (x, y) => {
1772 let reason_x = match x {
1773 ClusterStatus::Offline(reason) => reason,
1774 ClusterStatus::Online => None,
1775 };
1776 let reason_y = match y {
1777 ClusterStatus::Offline(reason) => reason,
1778 ClusterStatus::Online => None,
1779 };
1780 ClusterStatus::Offline(reason_x.or(reason_y))
1782 }
1783 })
1784 }
1785
1786 pub(crate) fn get_cluster_replica_statuses(
1790 &self,
1791 cluster_id: ClusterId,
1792 replica_id: ReplicaId,
1793 ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1794 self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1795 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1796 }
1797
1798 pub(crate) fn try_get_cluster_replica_statuses(
1800 &self,
1801 cluster_id: ClusterId,
1802 replica_id: ReplicaId,
1803 ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1804 self.try_get_cluster_statuses(cluster_id)
1805 .and_then(|statuses| statuses.get(&replica_id))
1806 }
1807
1808 pub(crate) fn try_get_cluster_statuses(
1810 &self,
1811 cluster_id: ClusterId,
1812 ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1813 self.0.get(&cluster_id)
1814 }
1815}
1816
/// The coordinator: the central, long-lived object mediating between client
/// sessions, the catalog, and the storage/compute controllers.
///
/// NOTE(review): several field descriptions below are inferred from names
/// and types; their uses live outside this chunk — confirm against them.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    /// Handle to the storage and compute controllers.
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    /// The in-memory catalog.
    catalog: Arc<Catalog>,

    /// Client for talking to persist.
    persist_client: PersistClient,

    /// Sender for messages the coordinator sends to itself.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// Notifier used to trigger group commits of pending writes.
    group_commit_tx: appends::GroupCommitNotifier,

    /// Queue of strict-serializable reads awaiting linearization.
    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    /// Per-timeline timestamping state.
    global_timelines: BTreeMap<Timeline, TimelineState>,

    /// Generator of transient ids.
    transient_id_gen: Arc<TransientIdGen>,
    /// Metadata for every active connection.
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    /// Read holds acquired on behalf of open transactions, by connection.
    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds>,

    /// Peeks in flight, by peek uuid.
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// Index of in-flight peeks by issuing connection.
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    /// Reads queued for linearization, by connection.
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    /// Active compute sinks (e.g. subscribes), by global id.
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    /// Active webhook appenders, with their invalidators.
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    /// Active COPY FROM operations, by connection.
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    /// Cancellation flags for staged executions, by connection.
    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    /// Active subscribes over introspection relations.
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    /// Per-item locks serializing writes.
    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    /// Deferred write operations, by connection.
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    /// Writes awaiting the next group commit.
    pending_writes: Vec<PendingWriteTxn>,

    /// Ticker driving periodic advancement of timelines.
    advance_timelines_interval: Interval,

    /// Queue of DDL plan statements executed serially.
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    /// Controller for storing and retrieving secrets.
    secrets_controller: Arc<dyn SecretsController>,
    /// Caching wrapper around the secrets reader.
    caching_secrets_reader: CachingSecretsReader,

    /// Controller for cloud resources (e.g. VPC endpoints), if any.
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Client for recording storage usage.
    storage_usage_client: StorageUsageClient,
    /// How often storage usage is collected.
    storage_usage_collection_interval: Duration,

    /// Optional Segment telemetry client.
    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    /// Coordinator metrics.
    metrics: Metrics,
    /// Optimizer metrics.
    optimizer_metrics: OptimizerMetrics,

    /// Handle for dynamically reconfiguring tracing.
    tracing_handle: TracingHandle,

    /// State for statement logging.
    statement_logging: StatementLogging,

    /// Limiter for concurrent webhook requests.
    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    /// Configuration of the timestamp oracle, if any.
    timestamp_oracle_config: Option<TimestampOracleConfig>,

    /// Ticker for evaluating cluster scheduling policies.
    check_cluster_scheduling_policies_interval: Interval,

    /// Latest scheduling decisions per cluster and policy name.
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    /// Ticker for the caught-up check used by 0dt deployments.
    caught_up_check_interval: Interval,

    /// In-progress caught-up check, if one is running.
    caught_up_check: Option<CaughtUpCheckContext>,

    /// Installed watch sets, with the connection awaiting each.
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    /// Watch sets installed per connection.
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    /// Last-known status of every cluster replica process.
    cluster_replica_statuses: ClusterReplicaStatuses,

    /// Whether the controllers are running in read-only mode.
    read_only_controllers: bool,

    /// Builtin table updates stashed while in read-only mode
    /// (the bootstrap path expects `Some` in read-only mode).
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    /// The validated license key.
    license_key: ValidatedLicenseKey,

    /// Pool of user ids
    /// (NOTE(review): inferred from `USER_ID_POOL_BATCH_SIZE` — confirm).
    user_id_pool: IdPool,
}
1994
1995impl Coordinator {
1996 #[instrument(name = "coord::bootstrap")]
2000 pub(crate) async fn bootstrap(
2001 &mut self,
2002 boot_ts: Timestamp,
2003 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
2004 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2005 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2006 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
2007 audit_logs_iterator: AuditLogIterator,
2008 ) -> Result<(), AdapterError> {
2009 let bootstrap_start = Instant::now();
2010 info!("startup: coordinator init: bootstrap beginning");
2011 info!("startup: coordinator init: bootstrap: preamble beginning");
2012
2013 let cluster_statuses: Vec<(_, Vec<_>)> = self
2016 .catalog()
2017 .clusters()
2018 .map(|cluster| {
2019 (
2020 cluster.id(),
2021 cluster
2022 .replicas()
2023 .map(|replica| {
2024 (replica.replica_id, replica.config.location.num_processes())
2025 })
2026 .collect(),
2027 )
2028 })
2029 .collect();
2030 let now = self.now_datetime();
2031 for (cluster_id, replica_statuses) in cluster_statuses {
2032 self.cluster_replica_statuses
2033 .initialize_cluster_statuses(cluster_id);
2034 for (replica_id, num_processes) in replica_statuses {
2035 self.cluster_replica_statuses
2036 .initialize_cluster_replica_statuses(
2037 cluster_id,
2038 replica_id,
2039 num_processes,
2040 now,
2041 );
2042 }
2043 }
2044
2045 let system_config = self.catalog().system_config();
2046
2047 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
2049
2050 let compute_config = flags::compute_config(system_config);
2052 let storage_config = flags::storage_config(system_config);
2053 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
2054 let dyncfg_updates = system_config.dyncfg_updates();
2055 self.controller.compute.update_configuration(compute_config);
2056 self.controller.storage.update_parameters(storage_config);
2057 self.controller
2058 .update_orchestrator_scheduling_config(scheduling_config);
2059 self.controller.update_configuration(dyncfg_updates);
2060
2061 self.validate_resource_limit_numeric(
2062 Numeric::zero(),
2063 self.current_credit_consumption_rate(),
2064 |system_vars| {
2065 self.license_key
2066 .max_credit_consumption_rate()
2067 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
2068 },
2069 "cluster replica",
2070 MAX_CREDIT_CONSUMPTION_RATE.name(),
2071 )?;
2072
2073 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
2074 Default::default();
2075
2076 let enable_worker_core_affinity =
2077 self.catalog().system_config().enable_worker_core_affinity();
2078 let enable_storage_introspection_logs = self
2079 .catalog()
2080 .system_config()
2081 .enable_storage_introspection_logs();
2082 for instance in self.catalog.clusters() {
2083 self.controller.create_cluster(
2084 instance.id,
2085 ClusterConfig {
2086 arranged_logs: instance.log_indexes.clone(),
2087 workload_class: instance.config.workload_class.clone(),
2088 },
2089 )?;
2090 for replica in instance.replicas() {
2091 let role = instance.role();
2092 self.controller.create_replica(
2093 instance.id,
2094 replica.replica_id,
2095 instance.name.clone(),
2096 replica.name.clone(),
2097 role,
2098 replica.config.clone(),
2099 enable_worker_core_affinity,
2100 enable_storage_introspection_logs,
2101 )?;
2102 }
2103 }
2104
2105 info!(
2106 "startup: coordinator init: bootstrap: preamble complete in {:?}",
2107 bootstrap_start.elapsed()
2108 );
2109
2110 let init_storage_collections_start = Instant::now();
2111 info!("startup: coordinator init: bootstrap: storage collections init beginning");
2112 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2113 .await;
2114 info!(
2115 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2116 init_storage_collections_start.elapsed()
2117 );
2118
2119 self.controller.start_compute_introspection_sink();
2124
2125 let sorting_start = Instant::now();
2126 info!("startup: coordinator init: bootstrap: sorting catalog entries");
2127 let entries = self.bootstrap_sort_catalog_entries();
2128 info!(
2129 "startup: coordinator init: bootstrap: sorting catalog entries complete in {:?}",
2130 sorting_start.elapsed()
2131 );
2132
2133 let optimize_dataflows_start = Instant::now();
2134 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2135 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2136 info!(
2137 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2138 optimize_dataflows_start.elapsed()
2139 );
2140
2141 let _fut = self.catalog().update_expression_cache(
2143 uncached_local_exprs.into_iter().collect(),
2144 uncached_global_exps.into_iter().collect(),
2145 Default::default(),
2146 );
2147
2148 let bootstrap_as_ofs_start = Instant::now();
2152 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2153 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2154 info!(
2155 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2156 bootstrap_as_ofs_start.elapsed()
2157 );
2158
2159 let postamble_start = Instant::now();
2160 info!("startup: coordinator init: bootstrap: postamble beginning");
2161
2162 let logs: BTreeSet<_> = BUILTINS::logs()
2163 .map(|log| self.catalog().resolve_builtin_log(log))
2164 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2165 .collect();
2166
2167 let mut privatelink_connections = BTreeMap::new();
2168
2169 for entry in &entries {
2170 debug!(
2171 "coordinator init: installing {} {}",
2172 entry.item().typ(),
2173 entry.id()
2174 );
2175 let mut policy = entry.item().initial_logical_compaction_window();
2176 match entry.item() {
2177 CatalogItem::Source(source) => {
2183 if source.custom_logical_compaction_window.is_none() {
2185 if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2186 source.data_source
2187 {
2188 policy = Some(
2189 self.catalog()
2190 .get_entry(&ingestion_id)
2191 .source()
2192 .expect("must be source")
2193 .custom_logical_compaction_window
2194 .unwrap_or_default(),
2195 );
2196 }
2197 }
2198 policies_to_set
2199 .entry(policy.expect("sources have a compaction window"))
2200 .or_insert_with(Default::default)
2201 .storage_ids
2202 .insert(source.global_id());
2203 }
2204 CatalogItem::Table(table) => {
2205 policies_to_set
2206 .entry(policy.expect("tables have a compaction window"))
2207 .or_insert_with(Default::default)
2208 .storage_ids
2209 .extend(table.global_ids());
2210 }
2211 CatalogItem::Index(idx) => {
2212 let policy_entry = policies_to_set
2213 .entry(policy.expect("indexes have a compaction window"))
2214 .or_insert_with(Default::default);
2215
2216 if logs.contains(&idx.on) {
2217 policy_entry
2218 .compute_ids
2219 .entry(idx.cluster_id)
2220 .or_insert_with(BTreeSet::new)
2221 .insert(idx.global_id());
2222 } else {
2223 let df_desc = self
2224 .catalog()
2225 .try_get_physical_plan(&idx.global_id())
2226 .expect("added in `bootstrap_dataflow_plans`")
2227 .clone();
2228
2229 let df_meta = self
2230 .catalog()
2231 .try_get_dataflow_metainfo(&idx.global_id())
2232 .expect("added in `bootstrap_dataflow_plans`");
2233
2234 if self.catalog().state().system_config().enable_mz_notices() {
2235 self.catalog().state().pack_optimizer_notices(
2237 &mut builtin_table_updates,
2238 df_meta.optimizer_notices.iter(),
2239 Diff::ONE,
2240 );
2241 }
2242
2243 policy_entry
2246 .compute_ids
2247 .entry(idx.cluster_id)
2248 .or_insert_with(Default::default)
2249 .extend(df_desc.export_ids());
2250
2251 self.controller
2252 .compute
2253 .create_dataflow(idx.cluster_id, df_desc, None)
2254 .unwrap_or_terminate("cannot fail to create dataflows");
2255 }
2256 }
2257 CatalogItem::View(_) => (),
2258 CatalogItem::MaterializedView(mview) => {
2259 policies_to_set
2260 .entry(policy.expect("materialized views have a compaction window"))
2261 .or_insert_with(Default::default)
2262 .storage_ids
2263 .insert(mview.global_id_writes());
2264
2265 let mut df_desc = self
2266 .catalog()
2267 .try_get_physical_plan(&mview.global_id_writes())
2268 .expect("added in `bootstrap_dataflow_plans`")
2269 .clone();
2270
2271 if let Some(initial_as_of) = mview.initial_as_of.clone() {
2272 df_desc.set_initial_as_of(initial_as_of);
2273 }
2274
2275 let until = mview
2277 .refresh_schedule
2278 .as_ref()
2279 .and_then(|s| s.last_refresh())
2280 .and_then(|r| r.try_step_forward());
2281 if let Some(until) = until {
2282 df_desc.until.meet_assign(&Antichain::from_elem(until));
2283 }
2284
2285 let df_meta = self
2286 .catalog()
2287 .try_get_dataflow_metainfo(&mview.global_id_writes())
2288 .expect("added in `bootstrap_dataflow_plans`");
2289
2290 if self.catalog().state().system_config().enable_mz_notices() {
2291 self.catalog().state().pack_optimizer_notices(
2293 &mut builtin_table_updates,
2294 df_meta.optimizer_notices.iter(),
2295 Diff::ONE,
2296 );
2297 }
2298
2299 self.ship_dataflow(df_desc, mview.cluster_id, mview.target_replica)
2300 .await;
2301
2302 if mview.replacement_target.is_none() {
2305 self.allow_writes(mview.cluster_id, mview.global_id_writes());
2306 }
2307 }
2308 CatalogItem::Sink(sink) => {
2309 policies_to_set
2310 .entry(CompactionWindow::Default)
2311 .or_insert_with(Default::default)
2312 .storage_ids
2313 .insert(sink.global_id());
2314 }
2315 CatalogItem::Connection(catalog_connection) => {
2316 if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2317 privatelink_connections.insert(
2318 entry.id(),
2319 VpcEndpointConfig {
2320 aws_service_name: conn.service_name.clone(),
2321 availability_zone_ids: conn.availability_zones.clone(),
2322 },
2323 );
2324 }
2325 }
2326 CatalogItem::ContinualTask(ct) => {
2327 policies_to_set
2328 .entry(policy.expect("continual tasks have a compaction window"))
2329 .or_insert_with(Default::default)
2330 .storage_ids
2331 .insert(ct.global_id());
2332
2333 let mut df_desc = self
2334 .catalog()
2335 .try_get_physical_plan(&ct.global_id())
2336 .expect("added in `bootstrap_dataflow_plans`")
2337 .clone();
2338
2339 if let Some(initial_as_of) = ct.initial_as_of.clone() {
2340 df_desc.set_initial_as_of(initial_as_of);
2341 }
2342
2343 let df_meta = self
2344 .catalog()
2345 .try_get_dataflow_metainfo(&ct.global_id())
2346 .expect("added in `bootstrap_dataflow_plans`");
2347
2348 if self.catalog().state().system_config().enable_mz_notices() {
2349 self.catalog().state().pack_optimizer_notices(
2351 &mut builtin_table_updates,
2352 df_meta.optimizer_notices.iter(),
2353 Diff::ONE,
2354 );
2355 }
2356
2357 self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2358 self.allow_writes(ct.cluster_id, ct.global_id());
2359 }
2360 CatalogItem::Log(_)
2362 | CatalogItem::Type(_)
2363 | CatalogItem::Func(_)
2364 | CatalogItem::Secret(_) => {}
2365 }
2366 }
2367
2368 if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2369 let existing_vpc_endpoints = cloud_resource_controller
2371 .list_vpc_endpoints()
2372 .await
2373 .context("list vpc endpoints")?;
2374 let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2375 let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2376 let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2377 for id in vpc_endpoints_to_remove {
2378 cloud_resource_controller
2379 .delete_vpc_endpoint(*id)
2380 .await
2381 .context("deleting extraneous vpc endpoint")?;
2382 }
2383
2384 for (id, spec) in privatelink_connections {
2386 cloud_resource_controller
2387 .ensure_vpc_endpoint(id, spec)
2388 .await
2389 .context("ensuring vpc endpoint")?;
2390 }
2391 }
2392
2393 drop(dataflow_read_holds);
2396 for (cw, policies) in policies_to_set {
2398 self.initialize_read_policies(&policies, cw).await;
2399 }
2400
2401 builtin_table_updates.extend(
2403 self.catalog().state().resolve_builtin_table_updates(
2404 self.catalog().state().pack_all_replica_size_updates(),
2405 ),
2406 );
2407
2408 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2409 let migrated_updates_fut = if self.controller.read_only() {
2415 let min_timestamp = Timestamp::minimum();
2416 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2417 .extract_if(.., |update| {
2418 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2419 migrated_storage_collections_0dt.contains(&update.id)
2420 && self
2421 .controller
2422 .storage_collections
2423 .collection_frontiers(gid)
2424 .expect("all tables are registered")
2425 .write_frontier
2426 .elements()
2427 == &[min_timestamp]
2428 })
2429 .collect();
2430 if migrated_builtin_table_updates.is_empty() {
2431 futures::future::ready(()).boxed()
2432 } else {
2433 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2435 for update in migrated_builtin_table_updates {
2436 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2437 grouped_appends.entry(gid).or_default().push(update.data);
2438 }
2439 info!(
2440 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2441 grouped_appends.keys().collect::<Vec<_>>()
2442 );
2443
2444 let mut all_appends = Vec::with_capacity(grouped_appends.len());
2446 for (item_id, table_data) in grouped_appends.into_iter() {
2447 let mut all_rows = Vec::new();
2448 let mut all_data = Vec::new();
2449 for data in table_data {
2450 match data {
2451 TableData::Rows(rows) => all_rows.extend(rows),
2452 TableData::Batches(_) => all_data.push(data),
2453 }
2454 }
2455 differential_dataflow::consolidation::consolidate(&mut all_rows);
2456 all_data.push(TableData::Rows(all_rows));
2457
2458 all_appends.push((item_id, all_data));
2460 }
2461
2462 let fut = self
2463 .controller
2464 .storage
2465 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2466 .expect("cannot fail to append");
2467 async {
2468 fut.await
2469 .expect("One-shot shouldn't be dropped during bootstrap")
2470 .unwrap_or_terminate("cannot fail to append")
2471 }
2472 .boxed()
2473 }
2474 } else {
2475 futures::future::ready(()).boxed()
2476 };
2477
2478 info!(
2479 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2480 postamble_start.elapsed()
2481 );
2482
2483 let builtin_update_start = Instant::now();
2484 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2485
2486 if self.controller.read_only() {
2487 info!(
2488 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2489 );
2490
2491 let audit_join_start = Instant::now();
2493 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2494 let audit_log_updates: Vec<_> = audit_logs_iterator
2495 .map(|(audit_log, ts)| StateUpdate {
2496 kind: StateUpdateKind::AuditLog(audit_log),
2497 ts,
2498 diff: StateDiff::Addition,
2499 })
2500 .collect();
2501 let audit_log_builtin_table_updates = self
2502 .catalog()
2503 .state()
2504 .generate_builtin_table_updates(audit_log_updates);
2505 builtin_table_updates.extend(audit_log_builtin_table_updates);
2506 info!(
2507 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2508 audit_join_start.elapsed()
2509 );
2510 self.buffered_builtin_table_updates
2511 .as_mut()
2512 .expect("in read-only mode")
2513 .append(&mut builtin_table_updates);
2514 } else {
2515 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2516 .await;
2517 };
2518 info!(
2519 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2520 builtin_update_start.elapsed()
2521 );
2522
2523 let cleanup_secrets_start = Instant::now();
2524 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2525 {
2529 let Self {
2532 secrets_controller,
2533 catalog,
2534 ..
2535 } = self;
2536
2537 let next_user_item_id = catalog.get_next_user_item_id().await?;
2538 let next_system_item_id = catalog.get_next_system_item_id().await?;
2539 let read_only = self.controller.read_only();
2540 let catalog_ids: BTreeSet<CatalogItemId> =
2545 catalog.entries().map(|entry| entry.id()).collect();
2546 let secrets_controller = Arc::clone(secrets_controller);
2547
2548 spawn(|| "cleanup-orphaned-secrets", async move {
2549 if read_only {
2550 info!(
2551 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2552 );
2553 return;
2554 }
2555 info!("coordinator init: cleaning up orphaned secrets");
2556
2557 match secrets_controller.list().await {
2558 Ok(controller_secrets) => {
2559 let controller_secrets: BTreeSet<CatalogItemId> =
2560 controller_secrets.into_iter().collect();
2561 let orphaned = controller_secrets.difference(&catalog_ids);
2562 for id in orphaned {
2563 let id_too_large = match id {
2564 CatalogItemId::System(id) => *id >= next_system_item_id,
2565 CatalogItemId::User(id) => *id >= next_user_item_id,
2566 CatalogItemId::IntrospectionSourceIndex(_)
2567 | CatalogItemId::Transient(_) => false,
2568 };
2569 if id_too_large {
2570 info!(
2571 %next_user_item_id, %next_system_item_id,
2572 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2573 );
2574 } else {
2575 info!("coordinator init: deleting orphaned secret {id}");
2576 fail_point!("orphan_secrets");
2577 if let Err(e) = secrets_controller.delete(*id).await {
2578 warn!(
2579 "Dropping orphaned secret has encountered an error: {}",
2580 e
2581 );
2582 }
2583 }
2584 }
2585 }
2586 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2587 }
2588 });
2589 }
2590 info!(
2591 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2592 cleanup_secrets_start.elapsed()
2593 );
2594
2595 let final_steps_start = Instant::now();
2597 info!(
2598 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2599 );
2600 migrated_updates_fut
2601 .instrument(info_span!("coord::bootstrap::final"))
2602 .await;
2603
2604 debug!(
2605 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2606 );
2607 self.controller.initialization_complete();
2609
2610 self.bootstrap_introspection_subscribes().await;
2612
2613 info!(
2614 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2615 final_steps_start.elapsed()
2616 );
2617
2618 info!(
2619 "startup: coordinator init: bootstrap complete in {:?}",
2620 bootstrap_start.elapsed()
2621 );
2622 Ok(())
2623 }
2624
2625 #[allow(clippy::async_yields_async)]
2630 #[instrument]
2631 async fn bootstrap_tables(
2632 &mut self,
2633 entries: &[CatalogEntry],
2634 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2635 audit_logs_iterator: AuditLogIterator,
2636 ) {
2637 struct TableMetadata<'a> {
2639 id: CatalogItemId,
2640 name: &'a QualifiedItemName,
2641 table: &'a Table,
2642 }
2643
2644 let table_metas: Vec<_> = entries
2646 .into_iter()
2647 .filter_map(|entry| {
2648 entry.table().map(|table| TableMetadata {
2649 id: entry.id(),
2650 name: entry.name(),
2651 table,
2652 })
2653 })
2654 .collect();
2655
2656 debug!("coordinator init: advancing all tables to current timestamp");
2658 let WriteTimestamp {
2659 timestamp: write_ts,
2660 advance_to,
2661 } = self.get_local_write_ts().await;
2662 let appends = table_metas
2663 .iter()
2664 .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2665 .collect();
2666 let table_fence_rx = self
2670 .controller
2671 .storage
2672 .append_table(write_ts.clone(), advance_to, appends)
2673 .expect("invalid updates");
2674
2675 self.apply_local_write(write_ts).await;
2676
2677 debug!("coordinator init: resetting system tables");
2679 let read_ts = self.get_local_read_ts().await;
2680
2681 let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2684 .catalog()
2685 .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2686 .into();
2687 let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2688 meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2689 && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2690 };
2691
2692 let mut retraction_tasks = Vec::new();
2693 let mut system_tables: Vec<_> = table_metas
2694 .iter()
2695 .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2696 .collect();
2697
2698 let (audit_events_idx, _) = system_tables
2700 .iter()
2701 .find_position(|table| {
2702 table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2703 })
2704 .expect("mz_audit_events must exist");
2705 let audit_events = system_tables.remove(audit_events_idx);
2706 let audit_log_task = self.bootstrap_audit_log_table(
2707 audit_events.id,
2708 audit_events.name,
2709 audit_events.table,
2710 audit_logs_iterator,
2711 read_ts,
2712 );
2713
2714 for system_table in system_tables {
2715 let table_id = system_table.id;
2716 let full_name = self.catalog().resolve_full_name(system_table.name, None);
2717 debug!("coordinator init: resetting system table {full_name} ({table_id})");
2718
2719 let snapshot_fut = self
2721 .controller
2722 .storage_collections
2723 .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2724 let batch_fut = self
2725 .controller
2726 .storage_collections
2727 .create_update_builder(system_table.table.global_id_writes());
2728
2729 let task = spawn(|| format!("snapshot-{table_id}"), async move {
2730 let mut batch = batch_fut
2732 .await
2733 .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2734 tracing::info!(?table_id, "starting snapshot");
2735 let mut snapshot_cursor = snapshot_fut
2737 .await
2738 .unwrap_or_terminate("cannot fail to snapshot");
2739
2740 while let Some(values) = snapshot_cursor.next().await {
2742 for (key, _t, d) in values {
2743 let d_invert = d.neg();
2744 batch.add(&key, &(), &d_invert).await;
2745 }
2746 }
2747 tracing::info!(?table_id, "finished snapshot");
2748
2749 let batch = batch.finish().await;
2750 BuiltinTableUpdate::batch(table_id, batch)
2751 });
2752 retraction_tasks.push(task);
2753 }
2754
2755 let retractions_res = futures::future::join_all(retraction_tasks).await;
2756 for retractions in retractions_res {
2757 builtin_table_updates.push(retractions);
2758 }
2759
2760 let audit_join_start = Instant::now();
2761 info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2762 let audit_log_updates = audit_log_task.await;
2763 let audit_log_builtin_table_updates = self
2764 .catalog()
2765 .state()
2766 .generate_builtin_table_updates(audit_log_updates);
2767 builtin_table_updates.extend(audit_log_builtin_table_updates);
2768 info!(
2769 "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2770 audit_join_start.elapsed()
2771 );
2772
2773 table_fence_rx
2775 .await
2776 .expect("One-shot shouldn't be dropped during bootstrap")
2777 .unwrap_or_terminate("cannot fail to append");
2778
2779 info!("coordinator init: sending builtin table updates");
2780 let (_builtin_updates_fut, write_ts) = self
2781 .builtin_table_update()
2782 .execute(builtin_table_updates)
2783 .await;
2784 info!(?write_ts, "our write ts");
2785 if let Some(write_ts) = write_ts {
2786 self.apply_local_write(write_ts).await;
2787 }
2788 }
2789
2790 #[instrument]
2794 fn bootstrap_audit_log_table<'a>(
2795 &self,
2796 table_id: CatalogItemId,
2797 name: &'a QualifiedItemName,
2798 table: &'a Table,
2799 audit_logs_iterator: AuditLogIterator,
2800 read_ts: Timestamp,
2801 ) -> JoinHandle<Vec<StateUpdate>> {
2802 let full_name = self.catalog().resolve_full_name(name, None);
2803 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2804 let current_contents_fut = self
2805 .controller
2806 .storage_collections
2807 .snapshot(table.global_id_writes(), read_ts);
2808 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2809 let current_contents = current_contents_fut
2810 .await
2811 .unwrap_or_terminate("cannot fail to fetch snapshot");
2812 let contents_len = current_contents.len();
2813 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2814
2815 let max_table_id = current_contents
2817 .into_iter()
2818 .filter(|(_, diff)| *diff == 1)
2819 .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2820 .sorted()
2821 .rev()
2822 .next();
2823
2824 audit_logs_iterator
2826 .take_while(|(audit_log, _)| match max_table_id {
2827 Some(id) => audit_log.event.sortable_id() > id,
2828 None => true,
2829 })
2830 .map(|(audit_log, ts)| StateUpdate {
2831 kind: StateUpdateKind::AuditLog(audit_log),
2832 ts,
2833 diff: StateDiff::Addition,
2834 })
2835 .collect::<Vec<_>>()
2836 })
2837 }
2838
    /// Registers every storage-backed catalog item (sources, tables,
    /// materialized views, continual tasks, sinks) with the storage controller,
    /// creating collections in dependency order.
    ///
    /// `migrated_storage_collections` identifies items whose collections were
    /// migrated; their global IDs are forwarded to
    /// `create_collections_for_bootstrap` for special handling.
    #[instrument]
    async fn bootstrap_storage_collections(
        &mut self,
        migrated_storage_collections: &BTreeSet<CatalogItemId>,
    ) {
        let catalog = self.catalog();

        // Builds a `CollectionDescription` for a source-like item, translating
        // the catalog-level `DataSourceDesc` into a storage-level `DataSource`
        // and inlining connection details from the catalog state.
        let source_desc = |object_id: GlobalId,
                           data_source: &DataSourceDesc,
                           desc: &RelationDesc,
                           timeline: &Timeline| {
            let data_source = match data_source.clone() {
                DataSourceDesc::Ingestion { desc, cluster_id } => {
                    let desc = desc.into_inline_connection(catalog.state());
                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
                    DataSource::Ingestion(ingestion)
                }
                DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    progress_subsource,
                    data_config,
                    details,
                    cluster_id,
                } => {
                    let desc = desc.into_inline_connection(catalog.state());
                    let data_config = data_config.into_inline_connection(catalog.state());
                    // Old-syntax ingestions use a separate progress subsource
                    // as the ingestion's primary collection ...
                    let progress_subsource =
                        catalog.get_entry(&progress_subsource).latest_global_id();
                    let mut ingestion =
                        IngestionDescription::new(desc, cluster_id, progress_subsource);
                    // ... and export the actual data under the item's own ID.
                    let legacy_export = SourceExport {
                        storage_metadata: (),
                        data_config,
                        details,
                    };
                    ingestion.source_exports.insert(object_id, legacy_export);

                    DataSource::Ingestion(ingestion)
                }
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference: _,
                    details,
                    data_config,
                } => {
                    // Resolve the parent ingestion's item ID to its latest
                    // global ID.
                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();

                    DataSource::IngestionExport {
                        ingestion_id,
                        details,
                        data_config: data_config.into_inline_connection(catalog.state()),
                    }
                }
                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
                DataSourceDesc::Progress => DataSource::Progress,
                DataSourceDesc::Introspection(introspection) => {
                    DataSource::Introspection(introspection)
                }
                DataSourceDesc::Catalog => DataSource::Other,
            };
            CollectionDescription {
                desc: desc.clone(),
                data_source,
                since: None,
                timeline: Some(timeline.clone()),
                primary: None,
            }
        };

        // Gather collection descriptions for every storage-backed item. MVs and
        // continual tasks are also recorded in `compute_collections` so their
        // relation descs can be evolved for nullability below.
        let mut compute_collections = vec![];
        let mut collections = vec![];
        for entry in catalog.entries() {
            match entry.item() {
                CatalogItem::Source(source) => {
                    collections.push((
                        source.global_id(),
                        source_desc(
                            source.global_id(),
                            &source.data_source,
                            &source.desc,
                            &source.timeline,
                        ),
                    ));
                }
                CatalogItem::Table(table) => {
                    match &table.data_source {
                        TableDataSource::TableWrites { defaults: _ } => {
                            // Tables can have multiple versioned collections;
                            // each version's `primary` points at the collection
                            // of the next version, if one exists.
                            let versions: BTreeMap<_, _> = table
                                .collection_descs()
                                .map(|(gid, version, desc)| (version, (gid, desc)))
                                .collect();
                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
                                let next_version = version.bump();
                                let primary_collection =
                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
                                let mut collection_desc =
                                    CollectionDescription::for_table(desc.clone());
                                collection_desc.primary = primary_collection;

                                (*gid, collection_desc)
                            });
                            collections.extend(collection_descs);
                        }
                        TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => {
                            // Source-fed tables are expected to have exactly
                            // one collection.
                            soft_assert_eq_or_log!(table.collections.len(), 1);
                            let collection_descs =
                                table.collection_descs().map(|(gid, _version, desc)| {
                                    (
                                        gid,
                                        source_desc(
                                            entry.latest_global_id(),
                                            data_source_desc,
                                            &desc,
                                            timeline,
                                        ),
                                    )
                                });
                            collections.extend(collection_descs);
                        }
                    };
                }
                CatalogItem::MaterializedView(mv) => {
                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
                        let collection_desc =
                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
                        (gid, collection_desc)
                    });

                    collections.extend(collection_descs);
                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
                }
                CatalogItem::ContinualTask(ct) => {
                    let collection_desc =
                        CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
                    compute_collections.push((ct.global_id(), ct.desc.clone()));
                    collections.push((ct.global_id(), collection_desc));
                }
                CatalogItem::Sink(sink) => {
                    // Sinks are registered via their progress collection, which
                    // carries the full export description.
                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
                    let from_desc = storage_sink_from_entry
                        .relation_desc()
                        .expect("sinks can only be built on items with descs")
                        .into_owned();
                    let collection_desc = CollectionDescription {
                        desc: KAFKA_PROGRESS_DESC.clone(),
                        data_source: DataSource::Sink {
                            desc: ExportDescription {
                                sink: StorageSinkDesc {
                                    from: sink.from,
                                    from_desc,
                                    connection: sink
                                        .connection
                                        .clone()
                                        .into_inline_connection(self.catalog().state()),
                                    envelope: sink.envelope,
                                    as_of: Antichain::from_elem(Timestamp::minimum()),
                                    with_snapshot: sink.with_snapshot,
                                    version: sink.version,
                                    from_storage_metadata: (),
                                    to_storage_metadata: (),
                                    commit_interval: sink.commit_interval,
                                },
                                instance_id: sink.cluster_id,
                            },
                        },
                        since: None,
                        timeline: None,
                        primary: None,
                    };
                    collections.push((sink.global_id, collection_desc));
                }
                // These item kinds are not backed by storage collections.
                CatalogItem::Log(_)
                | CatalogItem::View(_)
                | CatalogItem::Index(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_)
                | CatalogItem::Connection(_) => (),
            }
        }

        // In read-only mode we must not mint a write timestamp, so register
        // collections at the current read timestamp instead.
        let register_ts = if self.controller.read_only() {
            self.get_local_read_ts().await
        } else {
            self.get_local_write_ts().await.timestamp
        };

        let storage_metadata = self.catalog.state().storage_metadata();
        // Expand migrated item IDs to all of their global (collection) IDs.
        let migrated_storage_collections = migrated_storage_collections
            .into_iter()
            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
            .collect();

        // Evolve compute collection schemas (nullability) before creating any
        // collections.
        self.controller
            .storage
            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
            .await
            .unwrap_or_terminate("cannot fail to evolve collections");

        // Create collections in dependency order: a collection is only created
        // once all of its (transitive) dependencies among the pending
        // collections have been created.
        let mut pending: BTreeMap<_, _> = collections.into_iter().collect();

        // For each pending collection, the set of other pending collections it
        // transitively depends on (self-references excluded).
        let transitive_dep_gids: BTreeMap<_, _> = pending
            .keys()
            .map(|gid| {
                let entry = self.catalog.get_entry_by_global_id(gid);
                let item_id = entry.id();
                let deps = self.catalog.state().transitive_uses(item_id);
                let dep_gids: BTreeSet<_> = deps
                    .filter(|dep_id| *dep_id != item_id)
                    .map(|dep_id| self.catalog.get_entry(&dep_id).latest_global_id())
                    .filter(|dep_gid| pending.contains_key(dep_gid))
                    .collect();
                (*gid, dep_gids)
            })
            .collect();

        while !pending.is_empty() {
            // A collection is ready when none of its dependencies are still
            // pending.
            let ready_gids: BTreeSet<_> = pending
                .keys()
                .filter(|gid| {
                    let mut deps = transitive_dep_gids[gid].iter();
                    !deps.any(|dep_gid| pending.contains_key(dep_gid))
                })
                .copied()
                .collect();
            let mut ready: Vec<_> = pending
                .extract_if(.., |gid, _| ready_gids.contains(gid))
                .collect();

            // System collections without an explicit since get one derived
            // from the frontiers of their (already created) dependencies.
            for (gid, collection) in &mut ready {
                if !gid.is_system() || collection.since.is_some() {
                    continue;
                }

                let mut derived_since = Antichain::from_elem(Timestamp::MIN);
                for dep_gid in &transitive_dep_gids[gid] {
                    let (since, _) = self
                        .controller
                        .storage
                        .collection_frontiers(*dep_gid)
                        .expect("previously registered");
                    derived_since.join_assign(&since);
                }
                collection.since = Some(derived_since);
            }

            // If nothing is ready, the dependency graph contains a cycle;
            // create everything remaining in one batch as a best effort.
            if ready.is_empty() {
                soft_panic_or_log!(
                    "cycle in storage collections: {:?}",
                    pending.keys().collect::<Vec<_>>(),
                );
                ready = mem::take(&mut pending).into_iter().collect();
            }

            self.controller
                .storage
                .create_collections_for_bootstrap(
                    storage_metadata,
                    Some(register_ts),
                    ready,
                    &migrated_storage_collections,
                )
                .await
                .unwrap_or_terminate("cannot fail to create collections");
        }

        // Only apply the write timestamp if we actually minted one above.
        if !self.controller.read_only() {
            self.apply_local_write(register_ts).await;
        }
    }
3160
3161 fn bootstrap_sort_catalog_entries(&self) -> Vec<CatalogEntry> {
3168 let mut indexes_on = BTreeMap::<_, Vec<_>>::new();
3169 let mut non_indexes = Vec::new();
3170 for entry in self.catalog().entries().cloned() {
3171 if let Some(index) = entry.index() {
3172 let on = self.catalog().get_entry_by_global_id(&index.on);
3173 indexes_on.entry(on.id()).or_default().push(entry);
3174 } else {
3175 non_indexes.push(entry);
3176 }
3177 }
3178
3179 let key_fn = |entry: &CatalogEntry| entry.id;
3180 let dependencies_fn = |entry: &CatalogEntry| entry.uses();
3181 sort_topological(&mut non_indexes, key_fn, dependencies_fn);
3182
3183 let mut result = Vec::new();
3184 for entry in non_indexes {
3185 let id = entry.id();
3186 result.push(entry);
3187 if let Some(mut indexes) = indexes_on.remove(&id) {
3188 result.append(&mut indexes);
3189 }
3190 }
3191
3192 soft_assert_or_log!(
3193 indexes_on.is_empty(),
3194 "indexes with missing dependencies: {indexes_on:?}",
3195 );
3196
3197 result
3198 }
3199
    /// (Re)optimizes the dataflow plans of all indexes, materialized views, and
    /// continual tasks, installing the resulting plans and metainfo into the
    /// catalog and registering each collection with its compute instance
    /// snapshot.
    ///
    /// Cached global expressions from `cached_global_exprs` are reused when
    /// their optimizer features match the current configuration; expressions
    /// that had to be (re)computed are returned so the caller can refresh the
    /// cache. `ordered_catalog_entries` is presumably in dependency order (cf.
    /// `bootstrap_sort_catalog_entries`) so inputs are planned before their
    /// consumers — TODO confirm.
    #[instrument]
    fn bootstrap_dataflow_plans(
        &mut self,
        ordered_catalog_entries: &[CatalogEntry],
        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
        // Memoized compute instance snapshots, keyed by cluster ID.
        let mut instance_snapshots = BTreeMap::new();
        // Expressions computed here (cache misses / stale entries), to return.
        let mut uncached_expressions = BTreeMap::new();

        // Optimizer config = system defaults overridden by per-cluster features.
        let optimizer_config = |catalog: &Catalog, cluster_id| {
            let system_config = catalog.system_config();
            let overrides = catalog.get_cluster(cluster_id).config.features();
            OptimizerConfig::from(system_config).override_from(&overrides)
        };

        for entry in ordered_catalog_entries {
            match entry.item() {
                CatalogItem::Index(idx) => {
                    let compute_instance =
                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(idx.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = idx.global_id();

                    // Skip indexes whose collection is already installed on
                    // the instance.
                    if compute_instance.contains_collection(&global_id) {
                        continue;
                    }

                    let optimizer_config = optimizer_config(&self.catalog, idx.cluster_id);

                    // Use the cached plans only if they were produced under
                    // the same optimizer features; otherwise re-optimize.
                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let (optimized_plan, global_lir_plan) = {
                                    let mut optimizer = optimize::index::Optimizer::new(
                                        self.owned_catalog(),
                                        compute_instance.clone(),
                                        global_id,
                                        optimizer_config.clone(),
                                        self.optimizer_metrics(),
                                    );

                                    // MIR ⇒ LIR in two optimize passes.
                                    let index_plan = optimize::index::Index::new(
                                        entry.name().clone(),
                                        idx.on,
                                        idx.keys.to_vec(),
                                    );
                                    let global_mir_plan = optimizer.optimize(index_plan)?;
                                    let optimized_plan = global_mir_plan.df_desc().clone();

                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                    (optimized_plan, global_lir_plan)
                                };

                                let (physical_plan, metainfo) = global_lir_plan.unapply();
                                // Allocate one transient ID per optimizer
                                // notice and render them against the catalog.
                                let metainfo = {
                                    let notice_ids =
                                        std::iter::repeat_with(|| self.allocate_transient_id())
                                            .map(|(_item_id, gid)| gid)
                                            .take(metainfo.optimizer_notices.len())
                                            .collect::<Vec<_>>();
                                    self.catalog().render_notices(
                                        metainfo,
                                        notice_ids,
                                        Some(idx.global_id()),
                                    )
                                };
                                // Record for the expression cache refresh.
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features: optimizer_config.features.clone(),
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
                    catalog.set_physical_plan(idx.global_id(), physical_plan);
                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);

                    compute_instance.insert_collection(idx.global_id());
                }
                CatalogItem::MaterializedView(mv) => {
                    let compute_instance =
                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(mv.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = mv.global_id_writes();

                    let optimizer_config = optimizer_config(&self.catalog, mv.cluster_id);

                    // Same cache-hit logic as for indexes above.
                    let (optimized_plan, physical_plan, metainfo) = match cached_global_exprs
                        .remove(&global_id)
                    {
                        Some(global_expressions)
                            if global_expressions.optimizer_features
                                == optimizer_config.features =>
                        {
                            debug!("global expression cache hit for {global_id:?}");
                            (
                                global_expressions.global_mir,
                                global_expressions.physical_plan,
                                global_expressions.dataflow_metainfos,
                            )
                        }
                        Some(_) | None => {
                            let (_, internal_view_id) = self.allocate_transient_id();
                            let debug_name = self
                                .catalog()
                                .resolve_full_name(entry.name(), None)
                                .to_string();
                            let force_non_monotonic = Default::default();

                            let (optimized_plan, global_lir_plan) = {
                                let mut optimizer = optimize::materialized_view::Optimizer::new(
                                    self.owned_catalog().as_optimizer_catalog(),
                                    compute_instance.clone(),
                                    global_id,
                                    internal_view_id,
                                    mv.desc.latest().iter_names().cloned().collect(),
                                    mv.non_null_assertions.clone(),
                                    mv.refresh_schedule.clone(),
                                    debug_name,
                                    optimizer_config.clone(),
                                    self.optimizer_metrics(),
                                    force_non_monotonic,
                                );

                                // NOTE(review): `.as_ref().clone()` clones the
                                // inner expression only to borrow the clone;
                                // passing `mv.locally_optimized_expr.as_ref()`
                                // directly should avoid the copy — confirm.
                                let typ = infer_sql_type_for_catalog(
                                    &mv.raw_expr,
                                    &mv.locally_optimized_expr.as_ref().clone(),
                                );
                                let global_mir_plan = optimizer
                                    .optimize((mv.locally_optimized_expr.as_ref().clone(), typ))?;
                                let optimized_plan = global_mir_plan.df_desc().clone();

                                let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                (optimized_plan, global_lir_plan)
                            };

                            let (physical_plan, metainfo) = global_lir_plan.unapply();
                            // Allocate one transient ID per optimizer notice
                            // and render them against the catalog.
                            let metainfo = {
                                let notice_ids =
                                    std::iter::repeat_with(|| self.allocate_transient_id())
                                        .map(|(_item_id, global_id)| global_id)
                                        .take(metainfo.optimizer_notices.len())
                                        .collect::<Vec<_>>();
                                self.catalog().render_notices(
                                    metainfo,
                                    notice_ids,
                                    Some(mv.global_id_writes()),
                                )
                            };
                            // Record for the expression cache refresh.
                            uncached_expressions.insert(
                                global_id,
                                GlobalExpressions {
                                    global_mir: optimized_plan.clone(),
                                    physical_plan: physical_plan.clone(),
                                    dataflow_metainfos: metainfo.clone(),
                                    optimizer_features: optimizer_config.features.clone(),
                                },
                            );
                            (optimized_plan, physical_plan, metainfo)
                        }
                    };

                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);

                    compute_instance.insert_collection(mv.global_id_writes());
                }
                CatalogItem::ContinualTask(ct) => {
                    let compute_instance =
                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(ct.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = ct.global_id();

                    let optimizer_config = optimizer_config(&self.catalog, ct.cluster_id);

                    // Same cache-hit logic as above; optimization is delegated
                    // to `optimize_create_continual_task`.
                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let debug_name = self
                                    .catalog()
                                    .resolve_full_name(entry.name(), None)
                                    .to_string();
                                let (optimized_plan, physical_plan, metainfo, optimizer_features) =
                                    self.optimize_create_continual_task(
                                        ct,
                                        global_id,
                                        self.owned_catalog(),
                                        debug_name,
                                    )?;
                                // Record for the expression cache refresh.
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features,
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
                    catalog.set_physical_plan(ct.global_id(), physical_plan);
                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);

                    compute_instance.insert_collection(ct.global_id());
                }
                // These item kinds have no compute dataflow plan.
                CatalogItem::Table(_)
                | CatalogItem::Source(_)
                | CatalogItem::Log(_)
                | CatalogItem::View(_)
                | CatalogItem::Sink(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_)
                | CatalogItem::Connection(_) => (),
            }
        }

        Ok(uncached_expressions)
    }
3492
3493 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold> {
3503 let mut catalog_ids = Vec::new();
3504 let mut dataflows = Vec::new();
3505 let mut read_policies = BTreeMap::new();
3506 for entry in self.catalog.entries() {
3507 let gid = match entry.item() {
3508 CatalogItem::Index(idx) => idx.global_id(),
3509 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3510 CatalogItem::ContinualTask(ct) => ct.global_id(),
3511 CatalogItem::Table(_)
3512 | CatalogItem::Source(_)
3513 | CatalogItem::Log(_)
3514 | CatalogItem::View(_)
3515 | CatalogItem::Sink(_)
3516 | CatalogItem::Type(_)
3517 | CatalogItem::Func(_)
3518 | CatalogItem::Secret(_)
3519 | CatalogItem::Connection(_) => continue,
3520 };
3521 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3522 catalog_ids.push(gid);
3523 dataflows.push(plan.clone());
3524
3525 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3526 read_policies.insert(gid, compaction_window.into());
3527 }
3528 }
3529 }
3530
3531 let read_ts = self.get_local_read_ts().await;
3532 let read_holds = as_of_selection::run(
3533 &mut dataflows,
3534 &read_policies,
3535 &*self.controller.storage_collections,
3536 read_ts,
3537 self.controller.read_only(),
3538 );
3539
3540 let catalog = self.catalog_mut();
3541 for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3542 catalog.set_physical_plan(id, plan);
3543 }
3544
3545 read_holds
3546 }
3547
3548 fn serve(
3557 mut self,
3558 mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3559 mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3560 mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3561 group_commit_rx: appends::GroupCommitWaiter,
3562 ) -> LocalBoxFuture<'static, ()> {
3563 async move {
3564 let mut cluster_events = self.controller.events_stream();
3566 let last_message = Arc::new(Mutex::new(LastMessage {
3567 kind: "none",
3568 stmt: None,
3569 }));
3570
3571 let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3572 let idle_metric = self.metrics.queue_busy_seconds.clone();
3573 let last_message_watchdog = Arc::clone(&last_message);
3574
3575 spawn(|| "coord watchdog", async move {
3576 let mut interval = tokio::time::interval(Duration::from_secs(5));
3581 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3585
3586 let mut coord_stuck = false;
3588
3589 loop {
3590 interval.tick().await;
3591
3592 let duration = tokio::time::Duration::from_secs(30);
3594 let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3595 let Ok(maybe_permit) = timeout else {
3596 if !coord_stuck {
3598 let last_message = last_message_watchdog.lock().expect("poisoned");
3599 tracing::warn!(
3600 last_message_kind = %last_message.kind,
3601 last_message_sql = %last_message.stmt_to_string(),
3602 "coordinator stuck for {duration:?}",
3603 );
3604 }
3605 coord_stuck = true;
3606
3607 continue;
3608 };
3609
3610 if coord_stuck {
3612 tracing::info!("Coordinator became unstuck");
3613 }
3614 coord_stuck = false;
3615
3616 let Ok(permit) = maybe_permit else {
3618 break;
3619 };
3620
3621 permit.send(idle_metric.start_timer());
3622 }
3623 });
3624
3625 self.schedule_storage_usage_collection().await;
3626 self.spawn_privatelink_vpc_endpoints_watch_task();
3627 self.spawn_statement_logging_task();
3628 flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3629
3630 let warn_threshold = self
3632 .catalog()
3633 .system_config()
3634 .coord_slow_message_warn_threshold();
3635
3636 const MESSAGE_BATCH: usize = 64;
3638 let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3639 let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3640
3641 let message_batch = self.metrics.message_batch.clone();
3642
3643 loop {
3644 select! {
3648 biased;
3653
3654 _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3658 Some(event) = cluster_events.next() => {
3662 messages.push(Message::ClusterEvent(event))
3663 },
3664 () = self.controller.ready() => {
3668 let controller = match self.controller.get_readiness() {
3672 Readiness::Storage => ControllerReadiness::Storage,
3673 Readiness::Compute => ControllerReadiness::Compute,
3674 Readiness::Metrics(_) => ControllerReadiness::Metrics,
3675 Readiness::Internal(_) => ControllerReadiness::Internal,
3676 Readiness::NotReady => unreachable!("just signaled as ready"),
3677 };
3678 messages.push(Message::ControllerReady { controller });
3679 }
3680 permit = group_commit_rx.ready() => {
3683 let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3689 PendingWriteTxn::User{span, ..} => Some(span),
3690 PendingWriteTxn::System{..} => None,
3691 });
3692 let span = match user_write_spans.exactly_one() {
3693 Ok(span) => span.clone(),
3694 Err(user_write_spans) => {
3695 let span = info_span!(parent: None, "group_commit_notify");
3696 for s in user_write_spans {
3697 span.follows_from(s);
3698 }
3699 span
3700 }
3701 };
3702 messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3703 },
3704 count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3708 if count == 0 {
3709 break;
3710 } else {
3711 messages.extend(cmd_messages.drain(..).map(
3712 |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
3713 ));
3714 }
3715 },
3716 Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3720 let mut pending_read_txns = vec![pending_read_txn];
3721 while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3722 pending_read_txns.push(pending_read_txn);
3723 }
3724 for (conn_id, pending_read_txn) in pending_read_txns {
3725 let prev = self
3726 .pending_linearize_read_txns
3727 .insert(conn_id, pending_read_txn);
3728 soft_assert_or_log!(
3729 prev.is_none(),
3730 "connections can not have multiple concurrent reads, prev: {prev:?}"
3731 )
3732 }
3733 messages.push(Message::LinearizeReads);
3734 }
3735 _ = self.advance_timelines_interval.tick() => {
3739 let span = info_span!(parent: None, "coord::advance_timelines_interval");
3740 span.follows_from(Span::current());
3741
3742 if self.controller.read_only() {
3747 messages.push(Message::AdvanceTimelines);
3748 } else {
3749 messages.push(Message::GroupCommitInitiate(span, None));
3750 }
3751 },
3752 _ = self.check_cluster_scheduling_policies_interval.tick() => {
3756 messages.push(Message::CheckSchedulingPolicies);
3757 },
3758
3759 _ = self.caught_up_check_interval.tick() => {
3763 self.maybe_check_caught_up().await;
3768
3769 continue;
3770 },
3771
3772 timer = idle_rx.recv() => {
3777 timer.expect("does not drop").observe_duration();
3778 self.metrics
3779 .message_handling
3780 .with_label_values(&["watchdog"])
3781 .observe(0.0);
3782 continue;
3783 }
3784 };
3785
3786 message_batch.observe(f64::cast_lossy(messages.len()));
3788
3789 for msg in messages.drain(..) {
3790 let msg_kind = msg.kind();
3793 let span = span!(
3794 target: "mz_adapter::coord::handle_message_loop",
3795 Level::INFO,
3796 "coord::handle_message",
3797 kind = msg_kind
3798 );
3799 let otel_context = span.context().span().span_context().clone();
3800
3801 *last_message.lock().expect("poisoned") = LastMessage {
3805 kind: msg_kind,
3806 stmt: match &msg {
3807 Message::Command(
3808 _,
3809 Command::Execute {
3810 portal_name,
3811 session,
3812 ..
3813 },
3814 ) => session
3815 .get_portal_unverified(portal_name)
3816 .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3817 _ => None,
3818 },
3819 };
3820
3821 let start = Instant::now();
3822 self.handle_message(msg).instrument(span).await;
3823 let duration = start.elapsed();
3824
3825 self.metrics
3826 .message_handling
3827 .with_label_values(&[msg_kind])
3828 .observe(duration.as_secs_f64());
3829
3830 if duration > warn_threshold {
3832 let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3833 tracing::error!(
3834 ?msg_kind,
3835 ?trace_id,
3836 ?duration,
3837 "very slow coordinator message"
3838 );
3839 }
3840 }
3841 }
3842 if let Some(catalog) = Arc::into_inner(self.catalog) {
3845 catalog.expire().await;
3846 }
3847 }
3848 .boxed_local()
3849 }
3850
3851 fn catalog(&self) -> &Catalog {
3853 &self.catalog
3854 }
3855
    /// Returns an owned, reference-counted handle to the catalog.
    ///
    /// Cheap: only the `Arc` refcount is bumped, no catalog data is copied.
    fn owned_catalog(&self) -> Arc<Catalog> {
        Arc::clone(&self.catalog)
    }
3861
3862 fn optimizer_metrics(&self) -> OptimizerMetrics {
3865 self.optimizer_metrics.clone()
3866 }
3867
    /// Returns a mutable reference to the catalog.
    ///
    /// Copy-on-write: if other `Arc` handles to the catalog are alive,
    /// [`Arc::make_mut`] clones the underlying `Catalog` first so the
    /// mutation does not affect them.
    fn catalog_mut(&mut self) -> &mut Catalog {
        Arc::make_mut(&mut self.catalog)
    }
3879
3880 async fn refill_user_id_pool(&mut self, min_count: u64) -> Result<(), AdapterError> {
3885 let batch_size = USER_ID_POOL_BATCH_SIZE.get(self.catalog().system_config().dyncfgs());
3886 let to_allocate = min_count.max(u64::from(batch_size));
3887 let id_ts = self.get_catalog_write_ts().await;
3888 let ids = self.catalog().allocate_user_ids(to_allocate, id_ts).await?;
3889 if let (Some((first_id, _)), Some((last_id, _))) = (ids.first(), ids.last()) {
3890 let start = match first_id {
3891 CatalogItemId::User(id) => *id,
3892 other => {
3893 return Err(AdapterError::Internal(format!(
3894 "expected User CatalogItemId, got {other:?}"
3895 )));
3896 }
3897 };
3898 let end = match last_id {
3899 CatalogItemId::User(id) => *id + 1, other => {
3901 return Err(AdapterError::Internal(format!(
3902 "expected User CatalogItemId, got {other:?}"
3903 )));
3904 }
3905 };
3906 self.user_id_pool.refill(start, end);
3907 } else {
3908 return Err(AdapterError::Internal(
3909 "catalog returned no user IDs".into(),
3910 ));
3911 }
3912 Ok(())
3913 }
3914
3915 async fn allocate_user_id(&mut self) -> Result<(CatalogItemId, GlobalId), AdapterError> {
3917 if let Some(id) = self.user_id_pool.allocate() {
3918 return Ok((CatalogItemId::User(id), GlobalId::User(id)));
3919 }
3920 self.refill_user_id_pool(1).await?;
3921 let id = self.user_id_pool.allocate().expect("ID pool just refilled");
3922 Ok((CatalogItemId::User(id), GlobalId::User(id)))
3923 }
3924
3925 async fn allocate_user_ids(
3927 &mut self,
3928 count: u64,
3929 ) -> Result<Vec<(CatalogItemId, GlobalId)>, AdapterError> {
3930 if self.user_id_pool.remaining() < count {
3931 self.refill_user_id_pool(count).await?;
3932 }
3933 let raw_ids = self
3934 .user_id_pool
3935 .allocate_many(count)
3936 .expect("pool has enough IDs after refill");
3937 Ok(raw_ids
3938 .into_iter()
3939 .map(|id| (CatalogItemId::User(id), GlobalId::User(id)))
3940 .collect())
3941 }
3942
    /// Returns the connection context held by the controller.
    fn connection_context(&self) -> &ConnectionContext {
        self.controller.connection_context()
    }
3947
    /// Returns the secrets reader from the connection context.
    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
        &self.connection_context().secrets_reader
    }
3952
3953 #[allow(dead_code)]
3958 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3959 for meta in self.active_conns.values() {
3960 let _ = meta.notice_tx.send(notice.clone());
3961 }
3962 }
3963
3964 pub(crate) fn broadcast_notice_tx(
3967 &self,
3968 ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3969 let senders: Vec<_> = self
3970 .active_conns
3971 .values()
3972 .map(|meta| meta.notice_tx.clone())
3973 .collect();
3974 Box::new(move |notice| {
3975 for tx in senders {
3976 let _ = tx.send(notice.clone());
3977 }
3978 })
3979 }
3980
    /// Returns the metadata of all currently active connections, keyed by
    /// connection ID.
    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
        &self.active_conns
    }
3984
3985 #[instrument(level = "debug")]
3986 pub(crate) fn retire_execution(
3987 &mut self,
3988 reason: StatementEndedExecutionReason,
3989 ctx_extra: ExecuteContextExtra,
3990 ) {
3991 if let Some(uuid) = ctx_extra.retire() {
3992 self.end_statement_execution(uuid, reason);
3993 }
3994 }
3995
3996 #[instrument(level = "debug")]
3998 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3999 let compute = self
4000 .instance_snapshot(instance)
4001 .expect("compute instance does not exist");
4002 DataflowBuilder::new(self.catalog().state(), compute)
4003 }
4004
    /// Captures a snapshot of compute instance `id`'s state from the
    /// controller, or fails if the instance is unknown.
    pub fn instance_snapshot(
        &self,
        id: ComputeInstanceId,
    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
        ComputeInstanceSnapshot::new(&self.controller, id)
    }
4012
4013 pub(crate) async fn ship_dataflow(
4020 &mut self,
4021 dataflow: DataflowDescription<Plan>,
4022 instance: ComputeInstanceId,
4023 target_replica: Option<ReplicaId>,
4024 ) {
4025 self.try_ship_dataflow(dataflow, instance, target_replica)
4026 .await
4027 .unwrap_or_terminate("dataflow creation cannot fail");
4028 }
4029
    /// Creates `dataflow` on compute instance `instance` (optionally pinned
    /// to `target_replica`) and installs default read policies for the
    /// indexes it exports.
    pub(crate) async fn try_ship_dataflow(
        &mut self,
        dataflow: DataflowDescription<Plan>,
        instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        // Collect the exported index IDs before `dataflow` is moved into the
        // controller below.
        let export_ids = dataflow.exported_index_ids().collect();

        self.controller
            .compute
            .create_dataflow(instance, dataflow, target_replica)?;

        // Give the new exports the default compaction window.
        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
            .await;

        Ok(())
    }
4051
4052 pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
4056 self.controller
4057 .compute
4058 .allow_writes(instance, id)
4059 .unwrap_or_terminate("allow_writes cannot fail");
4060 }
4061
4062 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
4064 &mut self,
4065 dataflow: DataflowDescription<Plan>,
4066 instance: ComputeInstanceId,
4067 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
4068 target_replica: Option<ReplicaId>,
4069 ) {
4070 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
4071 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, target_replica);
4072 let ((), ()) =
4073 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
4074 } else {
4075 self.ship_dataflow(dataflow, instance, target_replica).await;
4076 }
4077 }
4078
4079 pub fn install_compute_watch_set(
4083 &mut self,
4084 conn_id: ConnectionId,
4085 objects: BTreeSet<GlobalId>,
4086 t: Timestamp,
4087 state: WatchSetResponse,
4088 ) -> Result<(), CollectionLookupError> {
4089 let ws_id = self.controller.install_compute_watch_set(objects, t)?;
4090 self.connection_watch_sets
4091 .entry(conn_id.clone())
4092 .or_default()
4093 .insert(ws_id);
4094 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4095 Ok(())
4096 }
4097
4098 pub fn install_storage_watch_set(
4102 &mut self,
4103 conn_id: ConnectionId,
4104 objects: BTreeSet<GlobalId>,
4105 t: Timestamp,
4106 state: WatchSetResponse,
4107 ) -> Result<(), CollectionMissing> {
4108 let ws_id = self.controller.install_storage_watch_set(objects, t)?;
4109 self.connection_watch_sets
4110 .entry(conn_id.clone())
4111 .or_default()
4112 .insert(ws_id);
4113 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4114 Ok(())
4115 }
4116
4117 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
4119 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
4120 for ws_id in ws_ids {
4121 self.installed_watch_sets.remove(&ws_id);
4122 }
4123 }
4124 }
4125
4126 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
4130 let global_timelines: BTreeMap<_, _> = self
4136 .global_timelines
4137 .iter()
4138 .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
4139 .collect();
4140 let active_conns: BTreeMap<_, _> = self
4141 .active_conns
4142 .iter()
4143 .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
4144 .collect();
4145 let txn_read_holds: BTreeMap<_, _> = self
4146 .txn_read_holds
4147 .iter()
4148 .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
4149 .collect();
4150 let pending_peeks: BTreeMap<_, _> = self
4151 .pending_peeks
4152 .iter()
4153 .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
4154 .collect();
4155 let client_pending_peeks: BTreeMap<_, _> = self
4156 .client_pending_peeks
4157 .iter()
4158 .map(|(id, peek)| {
4159 let peek: BTreeMap<_, _> = peek
4160 .iter()
4161 .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
4162 .collect();
4163 (id.to_string(), peek)
4164 })
4165 .collect();
4166 let pending_linearize_read_txns: BTreeMap<_, _> = self
4167 .pending_linearize_read_txns
4168 .iter()
4169 .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
4170 .collect();
4171
4172 Ok(serde_json::json!({
4173 "global_timelines": global_timelines,
4174 "active_conns": active_conns,
4175 "txn_read_holds": txn_read_holds,
4176 "pending_peeks": pending_peeks,
4177 "client_pending_peeks": client_pending_peeks,
4178 "pending_linearize_read_txns": pending_linearize_read_txns,
4179 "controller": self.controller.dump().await?,
4180 }))
4181 }
4182
    /// Kicks off a background task that finds rows of
    /// `mz_storage_usage_by_shard` whose collection timestamp is older than
    /// `retention_period` and sends the corresponding retractions back to the
    /// coordinator as a `StorageUsagePrune` message.
    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
        let item_id = self
            .catalog()
            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
        let read_ts = self.get_local_read_ts().await;
        // Build the snapshot future while `&self` is still held; it is only
        // awaited inside the spawned task.
        let current_contents_fut = self
            .controller
            .storage_collections
            .snapshot(global_id, read_ts);
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        spawn(|| "storage_usage_prune", async move {
            let mut current_contents = current_contents_fut
                .await
                .unwrap_or_terminate("cannot fail to fetch snapshot");
            // After consolidation every surviving row should carry a diff of
            // exactly 1 (asserted in the loop below).
            differential_dataflow::consolidation::consolidate(&mut current_contents);

            // Milliseconds-since-epoch cutoff; `u128` + saturating_sub avoids
            // underflow for very large retention periods.
            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
            let mut expired = Vec::new();
            for (row, diff) in current_contents {
                assert_eq!(
                    diff, 1,
                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
                );
                // Column 3 of the row holds the event's collection timestamp
                // (see the expect message for the schema assumption).
                let collection_timestamp = row
                    .unpack()
                    .get(3)
                    .expect("definition of mz_storage_by_shard changed")
                    .unwrap_timestamptz();
                let collection_timestamp = collection_timestamp.timestamp_millis();
                let collection_timestamp: u128 = collection_timestamp
                    .try_into()
                    .expect("all collections happen after Jan 1 1970");
                if collection_timestamp < cutoff_ts {
                    debug!("pruning storage event {row:?}");
                    // A diff of -1 retracts the row from the builtin table.
                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
                    expired.push(builtin_update);
                }
            }

            // A send error means the coordinator is shutting down; nothing to
            // do about it here.
            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
        });
    }
4241
4242 fn current_credit_consumption_rate(&self) -> Numeric {
4243 self.catalog()
4244 .user_cluster_replicas()
4245 .filter_map(|replica| match &replica.config.location {
4246 ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
4247 ReplicaLocation::Unmanaged(_) => None,
4248 })
4249 .map(|size| {
4250 self.catalog()
4251 .cluster_replica_sizes()
4252 .0
4253 .get(size)
4254 .expect("location size is validated against the cluster replica sizes")
4255 .credits_per_hour
4256 })
4257 .sum()
4258 }
4259}
4260
#[cfg(test)]
impl Coordinator {
    /// Test helper: ships `dataflow` on user cluster 1. `ship_dataflow`
    /// terminates the process on failure, so returning at all means success.
    #[allow(dead_code)]
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        let instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
        self.ship_dataflow(dataflow, instance, None).await;
    }
}
4277
/// The most recent message the coordinator began handling, retained for
/// diagnostics (the stuck-coordinator watchdog and the panic dump in this
/// type's `Drop` impl).
struct LastMessage {
    /// Static name of the message variant.
    kind: &'static str,
    /// The SQL statement being executed, when the message was an `Execute`
    /// command with a resolvable portal; `None` otherwise.
    stmt: Option<Arc<Statement<Raw>>>,
}
4283
4284impl LastMessage {
4285 fn stmt_to_string(&self) -> Cow<'static, str> {
4287 self.stmt
4288 .as_ref()
4289 .map(|stmt| stmt.to_ast_string_redacted().into())
4290 .unwrap_or(Cow::Borrowed("<none>"))
4291 }
4292}
4293
// Manual `Debug` so the statement goes through `stmt_to_string`, i.e. is
// rendered in redacted form rather than dumped verbatim.
impl fmt::Debug for LastMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("LastMessage")
            .field("kind", &self.kind)
            .field("stmt", &self.stmt_to_string())
            .finish()
    }
}
4302
4303impl Drop for LastMessage {
4304 fn drop(&mut self) {
4305 if std::thread::panicking() {
4307 eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4309 }
4310 }
4311}
4312
/// Boots the coordinator on a dedicated OS thread and returns a [`Handle`]
/// for managing it plus a [`Client`] for sending it commands.
///
/// The returned future resolves once the coordinator thread has reported the
/// outcome of bootstrap; on success the coordinator keeps serving messages on
/// its own thread until the command channel closes.
pub fn serve(
    Config {
        controller_config,
        controller_envd_epoch,
        mut storage,
        audit_logs_iterator,
        timestamp_oracle_url,
        unsafe_mode,
        all_features,
        build_info,
        environment_id,
        metrics_registry,
        now,
        secrets_controller,
        cloud_resource_controller,
        cluster_replica_sizes,
        builtin_system_cluster_config,
        builtin_catalog_server_cluster_config,
        builtin_probe_cluster_config,
        builtin_support_cluster_config,
        builtin_analytics_cluster_config,
        system_parameter_defaults,
        availability_zones,
        storage_usage_client,
        storage_usage_collection_interval,
        storage_usage_retention_period,
        segment_client,
        egress_addresses,
        aws_account_id,
        aws_privatelink_availability_zones,
        connection_context,
        connection_limit_callback,
        remote_system_parameters,
        webhook_concurrency_limit,
        http_host_name,
        tracing_handle,
        read_only_controllers,
        caught_up_trigger: clusters_caught_up_trigger,
        helm_chart_version,
        license_key,
        external_login_password_mz_system,
        force_builtin_schema_migration,
    }: Config,
) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
    async move {
        let coord_start = Instant::now();
        info!("startup: coordinator init: beginning");
        info!("startup: coordinator init: preamble beginning");

        // Force the lazily-initialized static builtin definitions now, so the
        // cost is paid up front rather than on first use.
        let _builtins = LazyLock::force(&BUILTINS_STATIC);

        // Channels feeding the coordinator's main loop: external client
        // commands, internally-generated messages, and strict-serializable
        // read requests.
        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
            mpsc::unbounded_channel();

        if !availability_zones.iter().all_unique() {
            coord_bail!("availability zones must be unique");
        }

        // An AWS principal context requires both an account ID and an
        // external-ID prefix; if either is missing there is none.
        let aws_principal_context = match (
            aws_account_id,
            connection_context.aws_external_id_prefix.clone(),
        ) {
            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
                aws_account_id,
                aws_external_id_prefix,
            }),
            _ => None,
        };

        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));

        info!(
            "startup: coordinator init: preamble complete in {:?}",
            coord_start.elapsed()
        );
        let oracle_init_start = Instant::now();
        info!("startup: coordinator init: timestamp oracle init beginning");

        let timestamp_oracle_config = timestamp_oracle_url
            .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
            .transpose()?;
        let mut initial_timestamps =
            get_initial_oracle_timestamps(&timestamp_oracle_config).await?;

        // The EpochMilliseconds timeline must always exist, even if the
        // oracle reported nothing for it.
        initial_timestamps
            .entry(Timeline::EpochMilliseconds)
            .or_insert_with(mz_repr::Timestamp::minimum);
        let mut timestamp_oracles = BTreeMap::new();
        for (timeline, initial_timestamp) in initial_timestamps {
            Coordinator::ensure_timeline_state_with_initial_time(
                &timeline,
                initial_timestamp,
                now.clone(),
                timestamp_oracle_config.clone(),
                &mut timestamp_oracles,
                read_only_controllers,
            )
            .await;
        }

        let catalog_upper = storage.current_upper().await;
        let epoch_millis_oracle = &timestamp_oracles
            .get(&Timeline::EpochMilliseconds)
            .expect("inserted above")
            .oracle;

        // Pick the boot timestamp. In read-only mode the oracle must not be
        // advanced, so take the max of its read timestamp and the catalog
        // upper; otherwise fold the catalog upper into the oracle and obtain
        // a fresh write timestamp.
        let mut boot_ts = if read_only_controllers {
            let read_ts = epoch_millis_oracle.read_ts().await;
            std::cmp::max(read_ts, catalog_upper)
        } else {
            epoch_millis_oracle.apply_write(catalog_upper).await;
            epoch_millis_oracle.write_ts().await.timestamp
        };

        info!(
            "startup: coordinator init: timestamp oracle init complete in {:?}",
            oracle_init_start.elapsed()
        );

        let catalog_open_start = Instant::now();
        info!("startup: coordinator init: catalog open beginning");
        let persist_client = controller_config
            .persist_clients
            .open(controller_config.persist_location.clone())
            .await
            .context("opening persist client")?;
        let builtin_item_migration_config =
            BuiltinItemMigrationConfig {
                persist_client: persist_client.clone(),
                read_only: read_only_controllers,
                force_migration: force_builtin_schema_migration,
            }
        ;
        let OpenCatalogResult {
            mut catalog,
            migrated_storage_collections_0dt,
            new_builtin_collections,
            builtin_table_updates,
            cached_global_exprs,
            uncached_local_exprs,
        } = Catalog::open(mz_catalog::config::Config {
            storage,
            metrics_registry: &metrics_registry,
            state: mz_catalog::config::StateConfig {
                unsafe_mode,
                all_features,
                build_info,
                environment_id: environment_id.clone(),
                read_only: read_only_controllers,
                now: now.clone(),
                boot_ts: boot_ts.clone(),
                skip_migrations: false,
                cluster_replica_sizes,
                builtin_system_cluster_config,
                builtin_catalog_server_cluster_config,
                builtin_probe_cluster_config,
                builtin_support_cluster_config,
                builtin_analytics_cluster_config,
                system_parameter_defaults,
                remote_system_parameters,
                availability_zones,
                egress_addresses,
                aws_principal_context,
                aws_privatelink_availability_zones,
                connection_context,
                http_host_name,
                builtin_item_migration_config,
                persist_client: persist_client.clone(),
                enable_expression_cache_override: None,
                helm_chart_version,
                external_login_password_mz_system,
                license_key: license_key.clone(),
            },
        })
        .await?;

        // Opening the catalog may have advanced its upper; fold that into the
        // boot timestamp (and into the oracle, when writable).
        let catalog_upper = catalog.current_upper().await;
        boot_ts = std::cmp::max(boot_ts, catalog_upper);

        if !read_only_controllers {
            epoch_millis_oracle.apply_write(boot_ts).await;
        }

        info!(
            "startup: coordinator init: catalog open complete in {:?}",
            catalog_open_start.elapsed()
        );

        let coord_thread_start = Instant::now();
        info!("startup: coordinator init: coordinator thread start beginning");

        let session_id = catalog.config().session_id;
        let start_instant = catalog.config().start_instant;

        // The coordinator thread reports its bootstrap result over this
        // channel before entering its serve loop.
        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
        let handle = TokioHandle::current();

        let metrics = Metrics::register_into(&metrics_registry);
        let metrics_clone = metrics.clone();
        let optimizer_metrics = OptimizerMetrics::register_into(
            &metrics_registry,
            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
        );
        let segment_client_clone = segment_client.clone();
        let coord_now = now.clone();
        let advance_timelines_interval =
            tokio::time::interval(catalog.system_config().default_timestamp_interval());
        let mut check_scheduling_policies_interval = tokio::time::interval(
            catalog
                .system_config()
                .cluster_check_scheduling_policies_interval(),
        );
        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // In read-only (0dt) mode, run the caught-up check at the configured
        // cadence; otherwise tick only hourly.
        let clusters_caught_up_check_interval = if read_only_controllers {
            let dyncfgs = catalog.system_config().dyncfgs();
            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);

            let mut interval = tokio::time::interval(interval);
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        } else {
            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        };

        // Collections excluded from the caught-up check: brand-new builtin
        // collections, plus migrated storage collections that are
        // materialized views together with everything that transitively
        // depends on them.
        let clusters_caught_up_check =
            clusters_caught_up_trigger.map(|trigger| {
                let mut exclude_collections: BTreeSet<GlobalId> =
                    new_builtin_collections.into_iter().collect();

                let mut todo: Vec<_> = migrated_storage_collections_0dt
                    .iter()
                    .filter(|id| {
                        catalog.state().get_entry(id).is_materialized_view()
                    })
                    .copied()
                    .collect();
                // Walk `used_by` edges to pick up all transitive dependants.
                while let Some(item_id) = todo.pop() {
                    let entry = catalog.state().get_entry(&item_id);
                    exclude_collections.extend(entry.global_ids());
                    todo.extend_from_slice(entry.used_by());
                }

                CaughtUpCheckContext {
                    trigger,
                    exclude_collections,
                }
            });

        if let Some(TimestampOracleConfig::Postgres(pg_config)) =
            timestamp_oracle_config.as_ref()
        {
            // Apply the current system-var tuning to the Postgres oracle.
            let pg_timestamp_oracle_params =
                flags::timestamp_oracle_config(catalog.system_config());
            pg_timestamp_oracle_params.apply(pg_config);
        }

        // Wrap the caller-supplied limit callback so it is re-invoked with
        // clamped values whenever the relevant system vars change.
        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
            Arc::new(move |system_vars: &SystemVars| {
                let limit: u64 = system_vars.max_connections().cast_into();
                let superuser_reserved: u64 =
                    system_vars.superuser_reserved_connections().cast_into();

                // Reserving more connections than the limit makes no sense;
                // clamp to the limit and warn.
                let superuser_reserved = if superuser_reserved >= limit {
                    tracing::warn!(
                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
                    );
                    limit
                } else {
                    superuser_reserved
                };

                (connection_limit_callback)(limit, superuser_reserved);
            });
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::MAX_CONNECTIONS,
            Arc::clone(&connection_limit_callback),
        );
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
            connection_limit_callback,
        );

        let (group_commit_tx, group_commit_rx) = appends::notifier();

        let parent_span = tracing::Span::current();
        // The coordinator runs on its own named OS thread with an enlarged
        // stack.
        let thread = thread::Builder::new()
            .stack_size(3 * stack::STACK_SIZE)
            .name("coordinator".to_string())
            .spawn(move || {
                let span = info_span!(parent: parent_span, "coord::coordinator").entered();

                let controller = handle
                    .block_on({
                        catalog.initialize_controller(
                            controller_config,
                            controller_envd_epoch,
                            read_only_controllers,
                        )
                    })
                    .unwrap_or_terminate("failed to initialize storage_controller");
                // Controller initialization may have advanced the catalog;
                // fold its new upper into the boot timestamp once more.
                let catalog_upper = handle.block_on(catalog.current_upper());
                boot_ts = std::cmp::max(boot_ts, catalog_upper);
                if !read_only_controllers {
                    let epoch_millis_oracle = &timestamp_oracles
                        .get(&Timeline::EpochMilliseconds)
                        .expect("inserted above")
                        .oracle;
                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
                }

                let catalog = Arc::new(catalog);

                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
                // Assemble the coordinator; most collections start empty and
                // are populated during bootstrap.
                let mut coord = Coordinator {
                    controller,
                    catalog,
                    internal_cmd_tx,
                    group_commit_tx,
                    strict_serializable_reads_tx,
                    global_timelines: timestamp_oracles,
                    transient_id_gen: Arc::new(TransientIdGen::new()),
                    active_conns: BTreeMap::new(),
                    txn_read_holds: Default::default(),
                    pending_peeks: BTreeMap::new(),
                    client_pending_peeks: BTreeMap::new(),
                    pending_linearize_read_txns: BTreeMap::new(),
                    serialized_ddl: LockedVecDeque::new(),
                    active_compute_sinks: BTreeMap::new(),
                    active_webhooks: BTreeMap::new(),
                    active_copies: BTreeMap::new(),
                    staged_cancellation: BTreeMap::new(),
                    introspection_subscribes: BTreeMap::new(),
                    write_locks: BTreeMap::new(),
                    deferred_write_ops: BTreeMap::new(),
                    pending_writes: Vec::new(),
                    advance_timelines_interval,
                    secrets_controller,
                    caching_secrets_reader,
                    cloud_resource_controller,
                    storage_usage_client,
                    storage_usage_collection_interval,
                    segment_client,
                    metrics,
                    optimizer_metrics,
                    tracing_handle,
                    statement_logging: StatementLogging::new(coord_now.clone()),
                    webhook_concurrency_limit,
                    timestamp_oracle_config,
                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
                    cluster_scheduling_decisions: BTreeMap::new(),
                    caught_up_check_interval: clusters_caught_up_check_interval,
                    caught_up_check: clusters_caught_up_check,
                    installed_watch_sets: BTreeMap::new(),
                    connection_watch_sets: BTreeMap::new(),
                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
                    read_only_controllers,
                    buffered_builtin_table_updates: Some(Vec::new()),
                    license_key,
                    user_id_pool: IdPool::empty(),
                    persist_client,
                };
                // Bootstrap, then remove replicas the orchestrator knows
                // about but the catalog does not, then optionally prune old
                // storage-usage events.
                let bootstrap = handle.block_on(async {
                    coord
                        .bootstrap(
                            boot_ts,
                            migrated_storage_collections_0dt,
                            builtin_table_updates,
                            cached_global_exprs,
                            uncached_local_exprs,
                            audit_logs_iterator,
                        )
                        .await?;
                    coord
                        .controller
                        .remove_orphaned_replicas(
                            coord.catalog().get_next_user_replica_id().await?,
                            coord.catalog().get_next_system_replica_id().await?,
                        )
                        .await
                        .map_err(AdapterError::Orchestrator)?;

                    if let Some(retention_period) = storage_usage_retention_period {
                        coord
                            .prune_storage_usage_events_on_startup(retention_period)
                            .await;
                    }

                    Ok(())
                });
                let ok = bootstrap.is_ok();
                drop(span);
                bootstrap_tx
                    .send(bootstrap)
                    .expect("bootstrap_rx is not dropped until it receives this message");
                // Only enter the serve loop when bootstrap succeeded.
                if ok {
                    handle.block_on(coord.serve(
                        internal_cmd_rx,
                        strict_serializable_reads_rx,
                        cmd_rx,
                        group_commit_rx,
                    ));
                }
            })
            .expect("failed to create coordinator thread");
        match bootstrap_rx
            .await
            .expect("bootstrap_tx always sends a message or panics/halts")
        {
            Ok(()) => {
                info!(
                    "startup: coordinator init: coordinator thread start complete in {:?}",
                    coord_thread_start.elapsed()
                );
                info!(
                    "startup: coordinator init: complete in {:?}",
                    coord_start.elapsed()
                );
                let handle = Handle {
                    session_id,
                    start_instant,
                    _thread: thread.join_on_drop(),
                };
                let client = Client::new(
                    build_info,
                    cmd_tx,
                    metrics_clone,
                    now,
                    environment_id,
                    segment_client_clone,
                );
                Ok((handle, client))
            }
            Err(e) => Err(e),
        }
    }
    .boxed()
}
4818
4819async fn get_initial_oracle_timestamps(
4833 timestamp_oracle_config: &Option<TimestampOracleConfig>,
4834) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4835 let mut initial_timestamps = BTreeMap::new();
4836
4837 if let Some(config) = timestamp_oracle_config {
4838 let oracle_timestamps = config.get_all_timelines().await?;
4839
4840 let debug_msg = || {
4841 oracle_timestamps
4842 .iter()
4843 .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4844 .join(", ")
4845 };
4846 info!(
4847 "current timestamps from the timestamp oracle: {}",
4848 debug_msg()
4849 );
4850
4851 for (timeline, ts) in oracle_timestamps {
4852 let entry = initial_timestamps
4853 .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4854
4855 entry
4856 .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4857 .or_insert(ts);
4858 }
4859 } else {
4860 info!("no timestamp oracle configured!");
4861 };
4862
4863 let debug_msg = || {
4864 initial_timestamps
4865 .iter()
4866 .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4867 .join(", ")
4868 };
4869 info!("initial oracle timestamps: {}", debug_msg());
4870
4871 Ok(initial_timestamps)
4872}
4873
4874#[instrument]
4875pub async fn load_remote_system_parameters(
4876 storage: &mut Box<dyn OpenableDurableCatalogState>,
4877 system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4878 system_parameter_sync_timeout: Duration,
4879) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4880 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4881 tracing::info!("parameter sync on boot: start sync");
4882
4883 let mut params = SynchronizedParameters::new(SystemVars::default());
4923 let frontend_sync = async {
4924 let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4925 frontend.pull(&mut params);
4926 let ops = params
4927 .modified()
4928 .into_iter()
4929 .map(|param| {
4930 let name = param.name;
4931 let value = param.value;
4932 tracing::info!(name, value, initial = true, "sync parameter");
4933 (name, value)
4934 })
4935 .collect();
4936 tracing::info!("parameter sync on boot: end sync");
4937 Ok(Some(ops))
4938 };
4939 if !storage.has_system_config_synced_once().await? {
4940 frontend_sync.await
4941 } else {
4942 match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4943 Ok(ops) => Ok(ops),
4944 Err(TimeoutError::Inner(e)) => Err(e),
4945 Err(TimeoutError::DeadlineElapsed) => {
4946 tracing::info!("parameter sync on boot: sync has timed out");
4947 Ok(None)
4948 }
4949 }
4950 }
4951 } else {
4952 Ok(None)
4953 }
4954}
4955
/// Responses delivered when a watch set's dependencies become ready.
#[derive(Debug)]
pub enum WatchSetResponse {
    /// The dependencies of a logged statement are ready; carries the
    /// statement's logging id and the lifecycle event to record.
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    /// A pending `ALTER SINK` can now proceed; carries its saved context.
    AlterSinkReady(AlterSinkReadyContext),
    /// A pending `ALTER MATERIALIZED VIEW` replacement can now proceed;
    /// carries its saved context.
    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
}
4962
/// State saved while an `ALTER SINK` waits to proceed; delivered back via
/// [`WatchSetResponse::AlterSinkReady`].
#[derive(Debug)]
pub struct AlterSinkReadyContext {
    // `None` only after the context has been retired; the `Drop` impl retires
    // any context still present with `AdapterError::Canceled`.
    ctx: Option<ExecuteContext>,
    // OpenTelemetry context associated with the originating request.
    otel_ctx: OpenTelemetryContext,
    // The plan to execute once the sink is ready.
    plan: AlterSinkPlan,
    plan_validity: PlanValidity,
    // Read holds retained while waiting — presumably on the sink's
    // dependencies; NOTE(review): confirm against the code that creates this.
    read_hold: ReadHolds,
}
4971
4972impl AlterSinkReadyContext {
4973 fn ctx(&mut self) -> &mut ExecuteContext {
4974 self.ctx.as_mut().expect("only cleared on drop")
4975 }
4976
4977 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4978 self.ctx
4979 .take()
4980 .expect("only cleared on drop")
4981 .retire(result);
4982 }
4983}
4984
4985impl Drop for AlterSinkReadyContext {
4986 fn drop(&mut self) {
4987 if let Some(ctx) = self.ctx.take() {
4988 ctx.retire(Err(AdapterError::Canceled));
4989 }
4990 }
4991}
4992
/// State saved while an `ALTER MATERIALIZED VIEW` replacement waits to
/// proceed; delivered back via
/// [`WatchSetResponse::AlterMaterializedViewReady`].
#[derive(Debug)]
pub struct AlterMaterializedViewReadyContext {
    // `None` only after the context has been retired; the `Drop` impl retires
    // any context still present with `AdapterError::Canceled`.
    ctx: Option<ExecuteContext>,
    // OpenTelemetry context associated with the originating request.
    otel_ctx: OpenTelemetryContext,
    // The replacement plan to apply once ready.
    plan: plan::AlterMaterializedViewApplyReplacementPlan,
    plan_validity: PlanValidity,
}
5000
5001impl AlterMaterializedViewReadyContext {
5002 fn ctx(&mut self) -> &mut ExecuteContext {
5003 self.ctx.as_mut().expect("only cleared on drop")
5004 }
5005
5006 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
5007 self.ctx
5008 .take()
5009 .expect("only cleared on drop")
5010 .retire(result);
5011 }
5012}
5013
5014impl Drop for AlterMaterializedViewReadyContext {
5015 fn drop(&mut self) {
5016 if let Some(ctx) = self.ctx.take() {
5017 ctx.retire(Err(AdapterError::Canceled));
5018 }
5019 }
5020}
5021
/// A `VecDeque` paired with a tokio mutex that callers can acquire
/// independently of the queue.
///
/// Note the lock does not protect `items` itself — all mutators take
/// `&mut self` — it is a separate coordination handle exposed through
/// `try_lock_owned`.
#[derive(Debug)]
struct LockedVecDeque<T> {
    items: VecDeque<T>,
    // Shared so an `OwnedMutexGuard` (which requires an `Arc`'d mutex) can be
    // handed out to callers.
    lock: Arc<tokio::sync::Mutex<()>>,
}
5029
5030impl<T> LockedVecDeque<T> {
5031 pub fn new() -> Self {
5032 Self {
5033 items: VecDeque::new(),
5034 lock: Arc::new(tokio::sync::Mutex::new(())),
5035 }
5036 }
5037
5038 pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
5039 Arc::clone(&self.lock).try_lock_owned()
5040 }
5041
5042 pub fn is_empty(&self) -> bool {
5043 self.items.is_empty()
5044 }
5045
5046 pub fn push_back(&mut self, value: T) {
5047 self.items.push_back(value)
5048 }
5049
5050 pub fn pop_front(&mut self) -> Option<T> {
5051 self.items.pop_front()
5052 }
5053
5054 pub fn remove(&mut self, index: usize) -> Option<T> {
5055 self.items.remove(index)
5056 }
5057
5058 pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
5059 self.items.iter()
5060 }
5061}
5062
/// A statement whose execution has been deferred, together with the execute
/// context needed to resume it later.
#[derive(Debug)]
struct DeferredPlanStatement {
    // Context to execute (or retire) the statement with when it is resumed.
    ctx: ExecuteContext,
    // The deferred statement, captured either pre- or post-planning.
    ps: PlanStatement,
}
5068
5069#[derive(Debug)]
5070enum PlanStatement {
5071 Statement {
5072 stmt: Arc<Statement<Raw>>,
5073 params: Params,
5074 },
5075 Plan {
5076 plan: mz_sql::plan::Plan,
5077 resolved_ids: ResolvedIds,
5078 },
5079}
5080
/// Errors raised when validating a client IP address against a network
/// policy.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// The address matched none of the policy's rules.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    /// No client IP address was available to validate.
    #[error("Access denied missing IP address")]
    MissingIp,
}
5088
5089pub(crate) fn validate_ip_with_policy_rules(
5090 ip: &IpAddr,
5091 rules: &Vec<NetworkPolicyRule>,
5092) -> Result<(), NetworkPolicyError> {
5093 if rules.iter().any(|r| r.address.0.contains(ip)) {
5096 Ok(())
5097 } else {
5098 Err(NetworkPolicyError::AddressDenied(ip.clone()))
5099 }
5100}
5101
5102pub(crate) fn infer_sql_type_for_catalog(
5103 hir_expr: &HirRelationExpr,
5104 mir_expr: &MirRelationExpr,
5105) -> SqlRelationType {
5106 let mut typ = hir_expr.top_level_typ();
5107 typ.backport_nullability_and_keys(&mir_expr.typ());
5108 typ
5109}
5110
#[cfg(test)]
mod id_pool_tests {
    use super::IdPool;

    #[mz_ore::test]
    fn test_empty_pool() {
        // An empty pool has nothing to hand out.
        let mut ids = IdPool::empty();
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
        assert_eq!(ids.allocate_many(1), None);
    }

    #[mz_ore::test]
    fn test_allocate_single() {
        // `refill(10, 13)` yields exactly the ids 10, 11, 12.
        let mut ids = IdPool::empty();
        ids.refill(10, 13);
        assert_eq!(ids.remaining(), 3);
        assert_eq!(ids.allocate(), Some(10));
        assert_eq!(ids.allocate(), Some(11));
        assert_eq!(ids.allocate(), Some(12));
        assert_eq!(ids.remaining(), 0);
        assert_eq!(ids.allocate(), None);
    }

    #[mz_ore::test]
    fn test_allocate_many() {
        let mut ids = IdPool::empty();
        ids.refill(100, 105);
        assert_eq!(ids.allocate_many(3), Some(vec![100, 101, 102]));
        assert_eq!(ids.remaining(), 2);
        // A request larger than what remains fails without consuming ids.
        assert_eq!(ids.allocate_many(3), None);
        assert_eq!(ids.allocate_many(2), Some(vec![103, 104]));
        assert_eq!(ids.remaining(), 0);
    }

    #[mz_ore::test]
    fn test_allocate_many_zero() {
        // A zero-sized batch succeeds and consumes nothing.
        let mut ids = IdPool::empty();
        ids.refill(1, 5);
        assert_eq!(ids.allocate_many(0), Some(vec![]));
        assert_eq!(ids.remaining(), 4);
    }

    #[mz_ore::test]
    fn test_refill_resets_pool() {
        let mut ids = IdPool::empty();
        ids.refill(0, 2);
        assert_eq!(ids.allocate(), Some(0));
        // Refilling discards whatever was left from the previous range.
        ids.refill(50, 52);
        assert_eq!(ids.allocate(), Some(50));
        assert_eq!(ids.allocate(), Some(51));
        assert_eq!(ids.allocate(), None);
    }

    #[mz_ore::test]
    fn test_mixed_allocate_and_allocate_many() {
        // Single and batched allocations draw from the same sequence.
        let mut ids = IdPool::empty();
        ids.refill(0, 10);
        assert_eq!(ids.allocate(), Some(0));
        assert_eq!(ids.allocate_many(3), Some(vec![1, 2, 3]));
        assert_eq!(ids.allocate(), Some(4));
        assert_eq!(ids.remaining(), 5);
    }

    #[mz_ore::test]
    #[should_panic(expected = "invalid pool range")]
    fn test_refill_invalid_range_panics() {
        let mut ids = IdPool::empty();
        ids.refill(10, 5);
    }
}