1use std::borrow::Cow;
70use std::collections::{BTreeMap, BTreeSet, VecDeque};
71use std::fmt;
72use std::net::IpAddr;
73use std::num::NonZeroI64;
74use std::ops::Neg;
75use std::str::FromStr;
76use std::sync::LazyLock;
77use std::sync::{Arc, Mutex};
78use std::thread;
79use std::time::{Duration, Instant};
80
81use anyhow::Context;
82use chrono::{DateTime, Utc};
83use derivative::Derivative;
84use differential_dataflow::lattice::Lattice;
85use fail::fail_point;
86use futures::StreamExt;
87use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
88use http::Uri;
89use ipnet::IpNet;
90use itertools::{Either, Itertools};
91use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
92use mz_adapter_types::compaction::CompactionWindow;
93use mz_adapter_types::connection::ConnectionId;
94use mz_adapter_types::dyncfgs::{
95 USER_ID_POOL_BATCH_SIZE, WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL,
96};
97use mz_auth::password::Password;
98use mz_build_info::BuildInfo;
99use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
100use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
101use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
102use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
103use mz_catalog::memory::objects::{
104 CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
105 DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
106};
107use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
108use mz_compute_client::as_of_selection;
109use mz_compute_client::controller::error::{
110 CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
111};
112use mz_compute_types::ComputeInstanceId;
113use mz_compute_types::dataflows::DataflowDescription;
114use mz_compute_types::plan::Plan;
115use mz_controller::clusters::{
116 ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
117};
118use mz_controller::{ControllerConfig, Readiness};
119use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
120use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
121use mz_license_keys::ValidatedLicenseKey;
122use mz_orchestrator::OfflineReason;
123use mz_ore::cast::{CastFrom, CastInto, CastLossy};
124use mz_ore::channel::trigger::Trigger;
125use mz_ore::future::TimeoutError;
126use mz_ore::metrics::MetricsRegistry;
127use mz_ore::now::{EpochMillis, NowFn};
128use mz_ore::task::{JoinHandle, spawn};
129use mz_ore::thread::JoinHandleExt;
130use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
131use mz_ore::url::SensitiveUrl;
132use mz_ore::{assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, stack};
133use mz_persist_client::PersistClient;
134use mz_persist_client::batch::ProtoBatch;
135use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
136use mz_repr::adt::numeric::Numeric;
137use mz_repr::explain::{ExplainConfig, ExplainFormat};
138use mz_repr::global_id::TransientIdGen;
139use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
140use mz_repr::role_id::RoleId;
141use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
142use mz_secrets::cache::CachingSecretsReader;
143use mz_secrets::{SecretsController, SecretsReader};
144use mz_sql::ast::{Raw, Statement};
145use mz_sql::catalog::{CatalogCluster, EnvironmentId};
146use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
147use mz_sql::optimizer_metrics::OptimizerMetrics;
148use mz_sql::plan::{
149 self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
150 NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
151};
152use mz_sql::session::user::User;
153use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
154use mz_sql_parser::ast::ExplainStage;
155use mz_sql_parser::ast::display::AstDisplay;
156use mz_storage_client::client::TableData;
157use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
158use mz_storage_types::connections::Connection as StorageConnection;
159use mz_storage_types::connections::ConnectionContext;
160use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
161use mz_storage_types::read_holds::ReadHold;
162use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
163use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
164use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
165use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
166use mz_transform::dataflow::DataflowMetainfo;
167use opentelemetry::trace::TraceContextExt;
168use serde::Serialize;
169use thiserror::Error;
170use timely::progress::{Antichain, Timestamp as _};
171use tokio::runtime::Handle as TokioHandle;
172use tokio::select;
173use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
174use tokio::time::{Interval, MissedTickBehavior};
175use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
176use tracing_opentelemetry::OpenTelemetrySpanExt;
177use uuid::Uuid;
178
179use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
180use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
181use crate::client::{Client, Handle};
182use crate::command::{Command, ExecuteResponse};
183use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
184use crate::coord::appends::{
185 BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
186};
187use crate::coord::caught_up::CaughtUpCheckContext;
188use crate::coord::cluster_scheduling::SchedulingDecision;
189use crate::coord::id_bundle::CollectionIdBundle;
190use crate::coord::introspection::IntrospectionSubscribe;
191use crate::coord::peek::PendingPeek;
192use crate::coord::statement_logging::StatementLogging;
193use crate::coord::timeline::{TimelineContext, TimelineState};
194use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
195use crate::coord::validity::PlanValidity;
196use crate::error::AdapterError;
197use crate::explain::insights::PlanInsightsContext;
198use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
199use crate::metrics::Metrics;
200use crate::optimize::dataflows::{
201 ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
202};
203use crate::optimize::{self, Optimize, OptimizerConfig};
204use crate::session::{EndTransactionAction, Session};
205use crate::statement_logging::{
206 StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
207};
208use crate::util::{ClientTransmitter, ResultExt, sort_topological};
209use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
210use crate::{AdapterNotice, ReadHolds, flags};
211
// Coordinator submodules shared throughout the crate.
pub(crate) mod appends;
pub(crate) mod catalog_serving;
pub(crate) mod cluster_scheduling;
pub(crate) mod consistency;
pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod read_policy;
pub(crate) mod sequencer;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;

pub mod catalog_implications;
// Submodules private to the coordinator.
mod caught_up;
mod command_handler;
mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
mod sql;
mod validity;
235
/// A pre-allocated pool of ids, handed out one (or many) at a time.
///
/// The pool covers the half-open range `next..upper`. Once exhausted it hands
/// out nothing until it is replenished via [`IdPool::refill`].
#[derive(Debug)]
pub(crate) struct IdPool {
    /// The next id to hand out.
    next: u64,
    /// Exclusive upper bound of the currently available range.
    upper: u64,
}

impl IdPool {
    /// Creates a pool with no available ids.
    pub fn empty() -> Self {
        Self { next: 0, upper: 0 }
    }

    /// Hands out a single id, or `None` if the pool is exhausted.
    pub fn allocate(&mut self) -> Option<u64> {
        (self.next < self.upper).then(|| {
            let id = self.next;
            self.next += 1;
            id
        })
    }

    /// Hands out `n` consecutive ids, or `None` if fewer than `n` remain.
    pub fn allocate_many(&mut self, n: u64) -> Option<Vec<u64>> {
        if n > self.remaining() {
            return None;
        }
        // The guard above guarantees `start + n <= upper`, so no overflow.
        let start = self.next;
        self.next = start + n;
        Some((start..self.next).collect())
    }

    /// Returns how many ids are still available.
    pub fn remaining(&self) -> u64 {
        self.upper - self.next
    }

    /// Replaces the pool's range with `next..upper`.
    ///
    /// # Panics
    ///
    /// Panics if `next > upper`.
    pub fn refill(&mut self, next: u64, upper: u64) {
        assert!(next <= upper, "invalid pool range: {next}..{upper}");
        self.next = next;
        self.upper = upper;
    }
}
308
/// Messages processed by the coordinator's serialized message loop.
#[derive(Debug)]
pub enum Message {
    /// An external client command, with the tracing context it arrived under.
    Command(OpenTelemetryContext, Command),
    /// One of the controllers has work ready to be processed.
    ControllerReady {
        controller: ControllerReadiness,
    },
    /// A background statement-purification task finished.
    PurifiedStatementReady(PurifiedStatementReady),
    /// Background validation for a connection creation finished.
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    /// Background validation for a connection alteration finished.
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    /// Retry a deferred operation for the given connection.
    TryDeferred {
        conn_id: ConnectionId,
        // NOTE(review): presumably a lock already acquired on behalf of the
        // deferred op — confirm against the producer of this message.
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    /// Initiate a group commit, optionally holding a permit.
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    /// Periodically advance timelines.
    AdvanceTimelines,
    /// A cluster event reported by the controller.
    ClusterEvent(ClusterEvent),
    /// Cancel all pending peeks for the given connection.
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    LinearizeReads,
    /// Batches staged for append to the given table.
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    // Storage-usage collection lifecycle: schedule -> fetch -> update -> prune.
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    /// Retire a statement execution with the given reason.
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    /// Execute a single statement as an implicit transaction.
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    // The `*StageReady` variants below re-enter the coordinator with the next
    // stage of a multi-stage (`Staged`) operation.
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    // Note: introspection subscribes have no client, hence no `ctx`.
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    /// Drain buffered statement-log data.
    DrainStatementLog,
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    /// Scheduling decisions, keyed by the name of the policy that made them.
    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}
417
impl Message {
    /// Returns a short static label identifying this message's variant (and,
    /// for [`Message::Command`], the command sub-variant), suitable for use
    /// as a metrics/logging dimension.
    ///
    /// NOTE(review): some labels use `-` and others `_` as word separators;
    /// since these strings look externally observable, the inconsistency is
    /// flagged here rather than fixed.
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
                Command::GetOracle { .. } => "get-oracle",
                Command::DetermineRealTimeRecentTimestamp { .. } => {
                    "determine-real-time-recent-timestamp"
                }
                Command::GetTransactionReadHoldsBundle { .. } => {
                    "get-transaction-read-holds-bundle"
                }
                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
                Command::CopyToPreflight { .. } => "copy-to-preflight",
                Command::ExecuteCopyTo { .. } => "execute-copy-to",
                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
                Command::ExplainTimestamp { .. } => "explain-timestamp",
                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
                Command::StartCopyFromStdin { .. } => "start-copy-from-stdin",
            },
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}
509
/// Identifies which part of the controller has work ready to process.
#[derive(Debug)]
pub enum ControllerReadiness {
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// Controller metrics are ready to be refreshed.
    Metrics,
    /// An internally-generated controller message is ready.
    Internal,
}
522
/// The result of a background task the coordinator spawned on behalf of a
/// statement, handed back to the message loop for final processing.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub params: Params,
    /// Validity constraints that must be re-checked before using `result`.
    pub plan_validity: PlanValidity,
    /// The statement as originally submitted, before background processing.
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}

/// The result of background statement purification.
pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;
536
/// The result of a background connection-validation task.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    /// Catalog item id of the connection being validated.
    pub connection_id: CatalogItemId,
    /// Global id of the connection being validated.
    pub connection_gid: GlobalId,
    /// Validity constraints that must be re-checked before using `result`.
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}

/// Validation result for a `CREATE CONNECTION`.
pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
/// Validation result for an `ALTER CONNECTION`.
pub type AlterConnectionValidationReady = ValidationReady<Connection>;
552
/// The stages a peek (`SELECT`, `EXPLAIN`, or `COPY ... TO`) passes through
/// while being sequenced. Each variant carries the state for that stage.
#[derive(Debug)]
pub enum PeekStage {
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    Finish(PeekStageFinish),
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    CopyToPreflight(PeekStageCopyTo),
    CopyToDataflow(PeekStageCopyTo),
}
570
/// Context for sequencing a `COPY ... TO <url>` statement.
#[derive(Debug)]
pub struct CopyToContext {
    /// Description of the rows being copied out.
    pub desc: RelationDesc,
    /// The destination URI.
    pub uri: Uri,
    /// The connection used to write to the destination.
    pub connection: StorageConnection<ReferencedConnection>,
    /// Catalog item id of that connection.
    pub connection_id: CatalogItemId,
    /// Output format of the sink (see [`S3SinkFormat`]).
    pub format: S3SinkFormat,
    /// Maximum size of a single output file.
    pub max_file_size: u64,
    // NOTE(review): presumably the number of output batches to produce, set
    // later in sequencing (hence `Option`) — confirm at the point of use.
    pub output_batch_count: Option<u64>,
}
591
/// Payload of [`PeekStage::LinearizeTimestamp`].
#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    /// `Left` for a regular peek, `Right` for a `COPY ... TO`.
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

/// Payload of [`PeekStage::RealTimeRecency`].
#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    /// Read timestamp obtained from the timestamp oracle, if any.
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

/// Payload of [`PeekStage::TimestampReadHold`].
#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    /// Timestamp determined by real-time recency, if that check ran.
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

/// Payload of [`PeekStage::Optimize`].
#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    /// The timestamp selected for the peek.
    determination: TimestampDetermination<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}
651
/// Payload of [`PeekStage::Finish`].
#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    cluster_id: ComputeInstanceId,
    /// Row-order/limit/offset post-processing to apply to the peek result.
    finishing: RowSetFinishing,
    /// Optimizer trace collected for plan insights, if requested.
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    /// Wall-clock time at which optimization completed.
    optimization_finished_at: EpochMillis,
}

/// Payload of [`PeekStage::CopyToPreflight`] and [`PeekStage::CopyToDataflow`].
#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    /// Wall-clock time at which optimization completed.
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

/// Payload of [`PeekStage::ExplainPlan`].
#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

/// Payload of [`PeekStage::ExplainPushdown`].
#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    /// Per-collection MFPs imported by the dataflow under explanation.
    imports: BTreeMap<GlobalId, MapFilterProject>,
}
695
/// The stages a `CREATE INDEX` statement passes through while being sequenced.
#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}

/// Payload of [`CreateIndexStage::Optimize`].
#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

/// Payload of [`CreateIndexStage::Finish`].
#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}

/// Payload of [`CreateIndexStage::Explain`].
#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    /// Global id the explained index would export.
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
733
/// The stages a `CREATE VIEW` statement passes through while being sequenced.
#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}

/// Payload of [`CreateViewStage::Optimize`].
#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

/// Payload of [`CreateViewStage::Finish`].
#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}

/// Payload of [`CreateViewStage::Explain`].
#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}
771
/// The stages an `EXPLAIN TIMESTAMP` statement passes through while being
/// sequenced.
#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

/// Payload of [`ExplainTimestampStage::Optimize`].
#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

/// Payload of [`ExplainTimestampStage::RealTimeRecency`].
#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

/// Payload of [`ExplainTimestampStage::Finish`].
#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    /// Timestamp determined by real-time recency, if that check ran.
    real_time_recency_ts: Option<Timestamp>,
}
805
/// The stages an `ALTER CLUSTER` statement passes through while being
/// sequenced.
#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

/// Payload of [`ClusterStage::Alter`].
#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

/// Payload of [`ClusterStage::WaitForHydrated`].
#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    /// Deadline after which `on_timeout` is applied.
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

/// Payload of [`ClusterStage::Finalize`].
#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}
834
/// How (and whether) the statement being sequenced is an `EXPLAIN`.
#[derive(Debug)]
pub enum ExplainContext {
    /// Not an `EXPLAIN`: normal execution.
    None,
    /// An `EXPLAIN` that reports a plan, with its collection context.
    Plan(ExplainPlanContext),
    /// A regular query that additionally collects an optimizer trace to
    /// surface plan insights as a notice.
    PlanInsightsNotice(OptimizerTrace),
    // NOTE(review): presumably `EXPLAIN FILTER PUSHDOWN` (cf.
    // `PeekStageExplainPushdown`) — confirm.
    Pushdown,
}
847
848impl ExplainContext {
849 pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
853 let optimizer_trace = match self {
854 ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
855 ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
856 _ => None,
857 };
858 optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
859 }
860
861 pub(crate) fn needs_cluster(&self) -> bool {
862 match self {
863 ExplainContext::None => true,
864 ExplainContext::Plan(..) => false,
865 ExplainContext::PlanInsightsNotice(..) => true,
866 ExplainContext::Pushdown => false,
867 }
868 }
869
870 pub(crate) fn needs_plan_insights(&self) -> bool {
871 matches!(
872 self,
873 ExplainContext::Plan(ExplainPlanContext {
874 stage: ExplainStage::PlanInsights,
875 ..
876 }) | ExplainContext::PlanInsightsNotice(_)
877 )
878 }
879}
880
/// Context collected while sequencing an `EXPLAIN` that reports a plan.
#[derive(Debug)]
pub struct ExplainPlanContext {
    // NOTE(review): presumably `EXPLAIN BROKEN`-style debugging of a failing
    // optimization — confirm at the point of use.
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    /// Which optimization stage to report.
    pub stage: ExplainStage,
    /// If set, replan the item with this global id instead of the statement.
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    /// Trace that accumulates plans as the optimizer runs.
    pub optimizer_trace: OptimizerTrace,
}
895
/// The stages a `CREATE MATERIALIZED VIEW` statement passes through while
/// being sequenced.
#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

/// Payload of [`CreateMaterializedViewStage::Optimize`].
#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

/// Payload of [`CreateMaterializedViewStage::Finish`].
#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    item_id: CatalogItemId,
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
    optimizer_features: OptimizerFeatures,
}

/// Payload of [`CreateMaterializedViewStage::Explain`].
#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}
936
/// The stages a `SUBSCRIBE` statement passes through while being sequenced.
#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
    Explain(SubscribeExplain),
}

/// Payload of [`SubscribeStage::OptimizeMir`].
#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}

/// Payload of [`SubscribeStage::TimestampOptimizeLir`].
#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    /// MIR plan whose as-of has not yet been resolved.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}

/// Payload of [`SubscribeStage::Finish`].
#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

/// Payload of [`SubscribeStage::Explain`].
#[derive(Debug)]
pub struct SubscribeExplain {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    df_meta: DataflowMetainfo,
    cluster_id: ComputeInstanceId,
    explain_ctx: ExplainPlanContext,
}
990
/// The stages an internal introspection subscribe passes through while being
/// set up. Unlike [`SubscribeStage`], there is no client session attached.
#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

/// Payload of [`IntrospectionSubscribeStage::OptimizeMir`].
#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

/// Payload of [`IntrospectionSubscribeStage::TimestampOptimizeLir`].
#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    /// MIR plan whose as-of has not yet been resolved.
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

/// Payload of [`IntrospectionSubscribeStage::Finish`].
#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    /// Read holds keeping the subscribe's inputs readable at its as-of.
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}
1024
/// The stages secret-related statements (`CREATE SECRET`, `ALTER SECRET`, key
/// rotation) pass through while being sequenced.
#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

/// Payload of [`SecretStage::CreateEnsure`].
#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

/// Payload of [`SecretStage::CreateFinish`].
#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

/// Payload of [`SecretStage::RotateKeysEnsure`].
#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

/// Payload of [`SecretStage::RotateKeysFinish`].
#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    /// Catalog operations to apply to finish the rotation.
    ops: Vec<crate::catalog::Op>,
}

/// Payload of [`SecretStage::Alter`].
#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}
1065
/// Identifies which cluster a statement should be routed to.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The built-in catalog server cluster.
    CatalogServer,
    /// The session's currently active cluster.
    Active,
    /// The specific cluster pinned by an ongoing transaction.
    Transaction(ClusterId),
}
1079
/// The outcome of running one [`Staged`] stage: either more work to do (a
/// spawned task or an immediately available next value) or a final response.
pub(crate) enum StageResult<T> {
    /// A task that yields the next stage value.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task that yields the final response, retiring the execution.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage value, available without spawning a task.
    Immediate(T),
    /// A final response; the staged execution is complete.
    Response(ExecuteResponse),
}
1091
/// A multi-stage operation driven by the coordinator: each call to
/// [`Staged::stage`] performs one step and reports, via [`StageResult`], what
/// to do next.
pub(crate) trait Staged: Send {
    /// The execution context threaded through the stages.
    type Ctx: StagedContext;

    /// Mutable access to the plan validity carried by the current stage.
    fn validity(&mut self) -> &mut PlanValidity;

    /// Runs this stage, producing either the next stage or a final response.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Packages this stage as a [`Message`] for re-entry into the
    /// coordinator's message loop under `span`.
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether cancellation should be honored for this stage.
    fn cancel_enabled(&self) -> bool;
}
1111
/// The context a [`Staged`] operation runs in: either a full
/// `ExecuteContext` (client-driven) or `()` (no client to respond to).
pub trait StagedContext {
    /// Completes the execution with `result`, if there is anything to notify.
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    /// The session attached to this context, if any.
    fn session(&self) -> Option<&Session>;
}
1116
impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        // Delegates to `ExecuteContext`'s inherent `retire`; inherent methods
        // take precedence over trait methods, so this does not recurse.
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}
1126
/// No-op context for staged work with no client attached: results are
/// discarded and there is no session.
impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}
1134
/// Configuration for constructing and serving the coordinator.
pub struct Config {
    pub controller_config: ControllerConfig,
    /// Epoch fencing token handed to the controller.
    pub controller_envd_epoch: NonZeroI64,
    /// Durable catalog state backing the coordinator.
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    /// URL of the external timestamp oracle, if one is configured.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    /// Clock used for wall-clock time (injectable for tests).
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    // Bootstrap configurations for the built-in clusters.
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    // Storage-usage metering.
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    /// Called with (current, limit) connection counts — NOTE(review):
    /// argument semantics inferred from the name; confirm at the call site.
    pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    /// Whether the controllers should start in read-only mode.
    pub read_only_controllers: bool,

    /// Trigger fired when a read-only deployment is caught up, if applicable.
    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}
1186
/// Metadata the coordinator tracks for each active connection.
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Secret paired with the connection — presumably checked when
    /// authorizing cancellation requests; confirm at the cancel path.
    secret_key: u32,
    /// When the connection was established, in epoch milliseconds.
    connected_at: EpochMillis,
    /// The user who established the connection.
    user: User,
    /// The connection's application name.
    application_name: String,
    /// Globally unique identifier for the connection.
    uuid: Uuid,
    /// The connection's ID.
    conn_id: ConnectionId,
    /// The client's IP address, if known.
    client_ip: Option<IpAddr>,

    /// Sinks associated with this connection, to be dropped with it.
    drop_sinks: BTreeSet<GlobalId>,

    // Not serialized: lock guards are runtime-only state.
    /// Lock guard held on behalf of this connection — presumably for a
    /// deferred operation; confirm where it is set.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Clusters with pending ALTERs issued by this connection.
    pending_cluster_alters: BTreeSet<ClusterId>,

    // Not serialized: channels are runtime-only state.
    /// Channel for delivering notices to the connection's session.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role ID the connection authenticated as.
    authenticated_role: RoleId,
}
1224
1225impl ConnMeta {
1226 pub fn conn_id(&self) -> &ConnectionId {
1227 &self.conn_id
1228 }
1229
1230 pub fn user(&self) -> &User {
1231 &self.user
1232 }
1233
1234 pub fn application_name(&self) -> &str {
1235 &self.application_name
1236 }
1237
1238 pub fn authenticated_role_id(&self) -> &RoleId {
1239 &self.authenticated_role
1240 }
1241
1242 pub fn uuid(&self) -> Uuid {
1243 self.uuid
1244 }
1245
1246 pub fn client_ip(&self) -> Option<IpAddr> {
1247 self.client_ip
1248 }
1249
1250 pub fn connected_at(&self) -> EpochMillis {
1251 self.connected_at
1252 }
1253}
1254
/// An explicit transaction waiting to be committed or aborted.
#[derive(Debug)]
pub struct PendingTxn {
    /// Context of the client connection that created the transaction.
    ctx: ExecuteContext,
    /// The response to return to the client on success, or the error.
    response: Result<PendingTxnResponse, AdapterError>,
    /// Whether the transaction is being committed or rolled back.
    action: EndTransactionAction,
}
1265
/// The response for a completed [`PendingTxn`].
#[derive(Debug)]
pub enum PendingTxnResponse {
    /// The transaction committed.
    Committed {
        /// Session parameters that changed as a result of ending the
        /// transaction.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction rolled back.
    Rolledback {
        /// Session parameters that changed as a result of ending the
        /// transaction.
        params: BTreeMap<&'static str, String>,
    },
}
1280
1281impl PendingTxnResponse {
1282 pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
1283 match self {
1284 PendingTxnResponse::Committed { params }
1285 | PendingTxnResponse::Rolledback { params } => params.extend(p),
1286 }
1287 }
1288}
1289
1290impl From<PendingTxnResponse> for ExecuteResponse {
1291 fn from(value: PendingTxnResponse) -> Self {
1292 match value {
1293 PendingTxnResponse::Committed { params } => {
1294 ExecuteResponse::TransactionCommitted { params }
1295 }
1296 PendingTxnResponse::Rolledback { params } => {
1297 ExecuteResponse::TransactionRolledBack { params }
1298 }
1299 }
1300 }
1301}
1302
/// A pending read transaction, with metadata about its queueing.
#[derive(Debug)]
pub struct PendingReadTxn {
    /// The transaction itself.
    txn: PendingRead,
    /// The timestamp context of the transaction.
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    /// When the pending read was created.
    created: Instant,
    /// Number of times the read has been requeued — TODO confirm at the
    /// queueing site.
    num_requeues: u64,
    /// Telemetry context propagated across the queue.
    otel_ctx: OpenTelemetryContext,
}
1319
impl PendingReadTxn {
    /// The timestamp context of the pending read transaction.
    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
        &self.timestamp_context
    }

    /// Extracts the inner `ExecuteContext`, notifying any read-then-write
    /// waiter that the read will not complete (see `PendingRead::take_context`).
    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}
1330
/// The variants of read that can be pending.
#[derive(Debug)]
enum PendingRead {
    /// A plain read within a transaction.
    Read {
        /// The transaction's state: context, response, and end action.
        txn: PendingTxn,
    },
    /// The read half of a read-then-write statement: when the read completes,
    /// the context is sent back through `tx` to continue with the write.
    ReadThenWrite {
        /// The context of the original command.
        ctx: ExecuteContext,
        /// Channel on which the context is handed back; `None` signals that
        /// the read will not complete (see `take_context`).
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}
1346
1347impl PendingRead {
1348 #[instrument(level = "debug")]
1353 pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
1354 match self {
1355 PendingRead::Read {
1356 txn:
1357 PendingTxn {
1358 mut ctx,
1359 response,
1360 action,
1361 },
1362 ..
1363 } => {
1364 let changed = ctx.session_mut().vars_mut().end_transaction(action);
1365 let response = response.map(|mut r| {
1367 r.extend_params(changed);
1368 ExecuteResponse::from(r)
1369 });
1370
1371 Some((ctx, response))
1372 }
1373 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1374 let _ = tx.send(Some(ctx));
1376 None
1377 }
1378 }
1379 }
1380
1381 fn label(&self) -> &'static str {
1382 match self {
1383 PendingRead::Read { .. } => "read",
1384 PendingRead::ReadThenWrite { .. } => "read_then_write",
1385 }
1386 }
1387
1388 pub(crate) fn take_context(self) -> ExecuteContext {
1389 match self {
1390 PendingRead::Read { txn, .. } => txn.ctx,
1391 PendingRead::ReadThenWrite { ctx, tx, .. } => {
1392 let _ = tx.send(None);
1395 ctx
1396 }
1397 }
1398 }
1399}
1400
/// Auxiliary state of a statement execution — currently just the
/// statement-logging ID, if the execution is being logged.
///
/// Marked `#[must_use]`: silently discarding it would lose the record of how
/// the statement ended (see `ExecuteContextGuard`'s `Drop`).
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    /// The ID under which this execution is being statement-logged, if any.
    statement_uuid: Option<StatementLoggingId>,
}
1415
1416impl ExecuteContextExtra {
1417 pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1418 Self { statement_uuid }
1419 }
1420 pub fn is_trivial(&self) -> bool {
1421 self.statement_uuid.is_none()
1422 }
1423 pub fn contents(&self) -> Option<StatementLoggingId> {
1424 self.statement_uuid
1425 }
1426 #[must_use]
1430 pub(crate) fn retire(self) -> Option<StatementLoggingId> {
1431 self.statement_uuid
1432 }
1433}
1434
/// A guard around [`ExecuteContextExtra`]: if dropped while still holding a
/// statement-logging ID, it reports the statement to the coordinator as
/// aborted (see the `Drop` impl below). Call `defuse` to take the state out
/// without triggering that.
#[derive(Debug)]
#[must_use]
pub struct ExecuteContextGuard {
    /// The guarded statement-logging state.
    extra: ExecuteContextExtra,
    /// Channel to the coordinator, used on drop to send `RetireExecute`.
    coordinator_tx: mpsc::UnboundedSender<Message>,
}
1454
1455impl Default for ExecuteContextGuard {
1456 fn default() -> Self {
1457 let (tx, _rx) = mpsc::unbounded_channel();
1461 Self {
1462 extra: ExecuteContextExtra::default(),
1463 coordinator_tx: tx,
1464 }
1465 }
1466}
1467
1468impl ExecuteContextGuard {
1469 pub(crate) fn new(
1470 statement_uuid: Option<StatementLoggingId>,
1471 coordinator_tx: mpsc::UnboundedSender<Message>,
1472 ) -> Self {
1473 Self {
1474 extra: ExecuteContextExtra::new(statement_uuid),
1475 coordinator_tx,
1476 }
1477 }
1478 pub fn is_trivial(&self) -> bool {
1479 self.extra.is_trivial()
1480 }
1481 pub fn contents(&self) -> Option<StatementLoggingId> {
1482 self.extra.contents()
1483 }
1484 pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
1491 std::mem::take(&mut self.extra)
1493 }
1494}
1495
impl Drop for ExecuteContextGuard {
    /// If the guard is dropped while still armed (i.e. `defuse` was never
    /// called), report the statement as aborted so statement logging is not
    /// left dangling.
    fn drop(&mut self) {
        if let Some(statement_uuid) = self.extra.statement_uuid.take() {
            let msg = Message::RetireExecute {
                data: ExecuteContextExtra {
                    statement_uuid: Some(statement_uuid),
                },
                otel_ctx: OpenTelemetryContext::obtain(),
                reason: StatementEndedExecutionReason::Aborted,
            };
            // Best-effort: the coordinator may already be gone.
            let _ = self.coordinator_tx.send(msg);
        }
    }
}
1514
/// The state the coordinator carries while executing one statement: the
/// client transmitter, the session, and the statement-logging guard.
/// Stored behind a `Box` — presumably to keep moves of the context cheap;
/// confirm.
#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}
1530
1531impl std::ops::Deref for ExecuteContext {
1532 type Target = ExecuteContextInner;
1533 fn deref(&self) -> &Self::Target {
1534 &*self.inner
1535 }
1536}
1537
1538impl std::ops::DerefMut for ExecuteContext {
1539 fn deref_mut(&mut self) -> &mut Self::Target {
1540 &mut *self.inner
1541 }
1542}
1543
/// The state behind an [`ExecuteContext`].
#[derive(Debug)]
pub struct ExecuteContextInner {
    /// Channel on which the final response (and the session) is returned to
    /// the client.
    tx: ClientTransmitter<ExecuteResponse>,
    /// Channel for sending messages (e.g. `RetireExecute`) back to the
    /// coordinator.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// The session of the connection executing the statement.
    session: Session,
    /// Statement-logging guard; reports an abort on drop unless defused.
    extra: ExecuteContextGuard,
}
1551
impl ExecuteContext {
    pub fn session(&self) -> &Session {
        &self.session
    }

    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.session
    }

    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
        &self.tx
    }

    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
        &mut self.tx
    }

    /// Assembles an `ExecuteContext` from its constituent parts.
    pub fn from_parts(
        tx: ClientTransmitter<ExecuteResponse>,
        internal_cmd_tx: mpsc::UnboundedSender<Message>,
        session: Session,
        extra: ExecuteContextGuard,
    ) -> Self {
        Self {
            inner: ExecuteContextInner {
                tx,
                session,
                extra,
                internal_cmd_tx,
            }
            .into(),
        }
    }

    /// Decomposes the context into its constituent parts. The caller becomes
    /// responsible for the returned guard (its drop reports an abort).
    pub fn into_parts(
        self,
    ) -> (
        ClientTransmitter<ExecuteResponse>,
        mpsc::UnboundedSender<Message>,
        Session,
        ExecuteContextGuard,
    ) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        (tx, internal_cmd_tx, session, extra)
    }

    /// Retires the execution: sends `result` to the client and, if the
    /// statement was being logged, notifies the coordinator of how it ended.
    #[instrument(level = "debug")]
    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        // Compute the end-of-execution reason before `result` is moved into
        // `tx.send` below.
        let reason = if extra.is_trivial() {
            None
        } else {
            Some((&result).into())
        };
        tx.send(result, session);
        if let Some(reason) = reason {
            // Defuse the guard so its drop does not also report an `Aborted`
            // retirement for this statement.
            let extra = extra.defuse();
            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
                otel_ctx: OpenTelemetryContext::obtain(),
                data: extra,
                reason,
            }) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        }
    }

    pub fn extra(&self) -> &ExecuteContextGuard {
        &self.extra
    }

    pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
        &mut self.extra
    }
}
1647
/// Per-process statuses of every cluster replica, keyed by cluster ID, then
/// replica ID, then process ID.
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);
1652
1653impl ClusterReplicaStatuses {
1654 pub(crate) fn new() -> ClusterReplicaStatuses {
1655 ClusterReplicaStatuses(BTreeMap::new())
1656 }
1657
1658 pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
1662 let prev = self.0.insert(cluster_id, BTreeMap::new());
1663 assert_eq!(
1664 prev, None,
1665 "cluster {cluster_id} statuses already initialized"
1666 );
1667 }
1668
1669 pub(crate) fn initialize_cluster_replica_statuses(
1673 &mut self,
1674 cluster_id: ClusterId,
1675 replica_id: ReplicaId,
1676 num_processes: usize,
1677 time: DateTime<Utc>,
1678 ) {
1679 tracing::info!(
1680 ?cluster_id,
1681 ?replica_id,
1682 ?time,
1683 "initializing cluster replica status"
1684 );
1685 let replica_statuses = self.0.entry(cluster_id).or_default();
1686 let process_statuses = (0..num_processes)
1687 .map(|process_id| {
1688 let status = ClusterReplicaProcessStatus {
1689 status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
1690 time: time.clone(),
1691 };
1692 (u64::cast_from(process_id), status)
1693 })
1694 .collect();
1695 let prev = replica_statuses.insert(replica_id, process_statuses);
1696 assert_none!(
1697 prev,
1698 "cluster replica {cluster_id}.{replica_id} statuses already initialized"
1699 );
1700 }
1701
1702 pub(crate) fn remove_cluster_statuses(
1706 &mut self,
1707 cluster_id: &ClusterId,
1708 ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1709 let prev = self.0.remove(cluster_id);
1710 prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1711 }
1712
1713 pub(crate) fn remove_cluster_replica_statuses(
1717 &mut self,
1718 cluster_id: &ClusterId,
1719 replica_id: &ReplicaId,
1720 ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1721 let replica_statuses = self
1722 .0
1723 .get_mut(cluster_id)
1724 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
1725 let prev = replica_statuses.remove(replica_id);
1726 prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1727 }
1728
1729 pub(crate) fn ensure_cluster_status(
1733 &mut self,
1734 cluster_id: ClusterId,
1735 replica_id: ReplicaId,
1736 process_id: ProcessId,
1737 status: ClusterReplicaProcessStatus,
1738 ) {
1739 let replica_statuses = self
1740 .0
1741 .get_mut(&cluster_id)
1742 .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
1743 .get_mut(&replica_id)
1744 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
1745 replica_statuses.insert(process_id, status);
1746 }
1747
1748 pub fn get_cluster_replica_status(
1752 &self,
1753 cluster_id: ClusterId,
1754 replica_id: ReplicaId,
1755 ) -> ClusterStatus {
1756 let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
1757 Self::cluster_replica_status(process_status)
1758 }
1759
1760 pub fn cluster_replica_status(
1762 process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
1763 ) -> ClusterStatus {
1764 process_status
1765 .values()
1766 .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
1767 (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
1768 (x, y) => {
1769 let reason_x = match x {
1770 ClusterStatus::Offline(reason) => reason,
1771 ClusterStatus::Online => None,
1772 };
1773 let reason_y = match y {
1774 ClusterStatus::Offline(reason) => reason,
1775 ClusterStatus::Online => None,
1776 };
1777 ClusterStatus::Offline(reason_x.or(reason_y))
1779 }
1780 })
1781 }
1782
1783 pub(crate) fn get_cluster_replica_statuses(
1787 &self,
1788 cluster_id: ClusterId,
1789 replica_id: ReplicaId,
1790 ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
1791 self.try_get_cluster_replica_statuses(cluster_id, replica_id)
1792 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1793 }
1794
1795 pub(crate) fn try_get_cluster_replica_statuses(
1797 &self,
1798 cluster_id: ClusterId,
1799 replica_id: ReplicaId,
1800 ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
1801 self.try_get_cluster_statuses(cluster_id)
1802 .and_then(|statuses| statuses.get(&replica_id))
1803 }
1804
1805 pub(crate) fn try_get_cluster_statuses(
1807 &self,
1808 cluster_id: ClusterId,
1809 ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
1810 self.0.get(&cluster_id)
1811 }
1812}
1813
/// The coordinator: owns the catalog and controller and tracks all
/// per-connection and in-flight execution state.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    /// Handle to the storage/compute controller (not `Debug`-printable).
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    /// The catalog.
    catalog: Arc<Catalog>,

    /// Client for persist.
    persist_client: PersistClient,

    /// Channel the coordinator uses to send messages to itself.
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    /// Notifier for group commits.
    group_commit_tx: appends::GroupCommitNotifier,

    /// Channel on which strict-serializable reads are queued per connection.
    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    /// Per-timeline timestamp state.
    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,

    /// Generator of transient (non-durable) IDs.
    transient_id_gen: Arc<TransientIdGen>,
    /// Metadata for each active connection.
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    /// Read holds taken on behalf of open transactions, per connection.
    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,

    /// In-flight peeks, keyed by the peek's UUID.
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// Reverse index: each connection's peeks and their clusters.
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    /// Reads waiting to be linearized, per connection.
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    /// Active compute sinks, keyed by their global ID.
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    /// Invalidators for active webhook appenders, keyed by webhook item ID.
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    /// In-progress COPY FROMs, per connection.
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    /// Cancellation watch channels for staged executions, per connection.
    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    /// Active introspection subscribes, keyed by their global ID.
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    /// Per-item write locks.
    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    /// Write operations deferred (e.g. until locks free up), per connection.
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    /// Writes awaiting the next group commit.
    pending_writes: Vec<PendingWriteTxn>,

    /// Interval at which timelines are advanced.
    advance_timelines_interval: Interval,

    /// Queue serializing deferred DDL plan statements.
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    /// Handle to the secrets controller.
    secrets_controller: Arc<dyn SecretsController>,
    /// Caching reader for secrets.
    caching_secrets_reader: CachingSecretsReader,

    /// Controller for cloud resources (e.g. VPC endpoints), if running in a
    /// cloud environment.
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Client for recording storage usage.
    storage_usage_client: StorageUsageClient,
    /// How often storage usage is collected.
    storage_usage_collection_interval: Duration,

    /// Optional Segment telemetry client (not `Debug`-printable).
    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    /// Coordinator metrics.
    metrics: Metrics,
    /// Optimizer metrics.
    optimizer_metrics: OptimizerMetrics,

    /// Handle for dynamically reconfiguring tracing.
    tracing_handle: TracingHandle,

    /// Statement-logging state.
    statement_logging: StatementLogging,

    /// Limiter for concurrent webhook requests.
    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    /// Configuration for the timestamp oracle, if any.
    timestamp_oracle_config: Option<TimestampOracleConfig>,

    /// Interval at which cluster scheduling policies are checked.
    check_cluster_scheduling_policies_interval: Interval,

    /// Latest scheduling decisions, per cluster and policy name.
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    /// Interval for the 0dt-deployment "caught up" check.
    caught_up_check_interval: Interval,

    /// State of an in-progress caught-up check, if any.
    caught_up_check: Option<CaughtUpCheckContext>,

    /// Installed watch sets, with the connection that installed each.
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    /// Watch sets installed per connection.
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    /// Per-process statuses of all cluster replicas.
    cluster_replica_statuses: ClusterReplicaStatuses,

    /// Whether the controllers are running in read-only mode.
    read_only_controllers: bool,

    /// Builtin table updates buffered while in read-only mode; `Some` iff in
    /// read-only mode (see `bootstrap`).
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    /// The validated license key.
    license_key: ValidatedLicenseKey,

    /// Pool of user IDs — presumably batched via `USER_ID_POOL_BATCH_SIZE`;
    /// confirm the exact ID kind at the allocation site.
    user_id_pool: IdPool,
}
1991
1992impl Coordinator {
1993 #[instrument(name = "coord::bootstrap")]
1997 pub(crate) async fn bootstrap(
1998 &mut self,
1999 boot_ts: Timestamp,
2000 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
2001 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2002 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
2003 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
2004 audit_logs_iterator: AuditLogIterator,
2005 ) -> Result<(), AdapterError> {
2006 let bootstrap_start = Instant::now();
2007 info!("startup: coordinator init: bootstrap beginning");
2008 info!("startup: coordinator init: bootstrap: preamble beginning");
2009
2010 let cluster_statuses: Vec<(_, Vec<_>)> = self
2013 .catalog()
2014 .clusters()
2015 .map(|cluster| {
2016 (
2017 cluster.id(),
2018 cluster
2019 .replicas()
2020 .map(|replica| {
2021 (replica.replica_id, replica.config.location.num_processes())
2022 })
2023 .collect(),
2024 )
2025 })
2026 .collect();
2027 let now = self.now_datetime();
2028 for (cluster_id, replica_statuses) in cluster_statuses {
2029 self.cluster_replica_statuses
2030 .initialize_cluster_statuses(cluster_id);
2031 for (replica_id, num_processes) in replica_statuses {
2032 self.cluster_replica_statuses
2033 .initialize_cluster_replica_statuses(
2034 cluster_id,
2035 replica_id,
2036 num_processes,
2037 now,
2038 );
2039 }
2040 }
2041
2042 let system_config = self.catalog().system_config();
2043
2044 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
2046
2047 let compute_config = flags::compute_config(system_config);
2049 let storage_config = flags::storage_config(system_config);
2050 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
2051 let dyncfg_updates = system_config.dyncfg_updates();
2052 self.controller.compute.update_configuration(compute_config);
2053 self.controller.storage.update_parameters(storage_config);
2054 self.controller
2055 .update_orchestrator_scheduling_config(scheduling_config);
2056 self.controller.update_configuration(dyncfg_updates);
2057
2058 self.validate_resource_limit_numeric(
2059 Numeric::zero(),
2060 self.current_credit_consumption_rate(),
2061 |system_vars| {
2062 self.license_key
2063 .max_credit_consumption_rate()
2064 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
2065 },
2066 "cluster replica",
2067 MAX_CREDIT_CONSUMPTION_RATE.name(),
2068 )?;
2069
2070 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
2071 Default::default();
2072
2073 let enable_worker_core_affinity =
2074 self.catalog().system_config().enable_worker_core_affinity();
2075 for instance in self.catalog.clusters() {
2076 self.controller.create_cluster(
2077 instance.id,
2078 ClusterConfig {
2079 arranged_logs: instance.log_indexes.clone(),
2080 workload_class: instance.config.workload_class.clone(),
2081 },
2082 )?;
2083 for replica in instance.replicas() {
2084 let role = instance.role();
2085 self.controller.create_replica(
2086 instance.id,
2087 replica.replica_id,
2088 instance.name.clone(),
2089 replica.name.clone(),
2090 role,
2091 replica.config.clone(),
2092 enable_worker_core_affinity,
2093 )?;
2094 }
2095 }
2096
2097 info!(
2098 "startup: coordinator init: bootstrap: preamble complete in {:?}",
2099 bootstrap_start.elapsed()
2100 );
2101
2102 let init_storage_collections_start = Instant::now();
2103 info!("startup: coordinator init: bootstrap: storage collections init beginning");
2104 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2105 .await;
2106 info!(
2107 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2108 init_storage_collections_start.elapsed()
2109 );
2110
2111 self.controller.start_compute_introspection_sink();
2116
2117 let sorting_start = Instant::now();
2118 info!("startup: coordinator init: bootstrap: sorting catalog entries");
2119 let entries = self.bootstrap_sort_catalog_entries();
2120 info!(
2121 "startup: coordinator init: bootstrap: sorting catalog entries complete in {:?}",
2122 sorting_start.elapsed()
2123 );
2124
2125 let optimize_dataflows_start = Instant::now();
2126 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2127 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2128 info!(
2129 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2130 optimize_dataflows_start.elapsed()
2131 );
2132
2133 let _fut = self.catalog().update_expression_cache(
2135 uncached_local_exprs.into_iter().collect(),
2136 uncached_global_exps.into_iter().collect(),
2137 Default::default(),
2138 );
2139
2140 let bootstrap_as_ofs_start = Instant::now();
2144 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2145 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2146 info!(
2147 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2148 bootstrap_as_ofs_start.elapsed()
2149 );
2150
2151 let postamble_start = Instant::now();
2152 info!("startup: coordinator init: bootstrap: postamble beginning");
2153
2154 let logs: BTreeSet<_> = BUILTINS::logs()
2155 .map(|log| self.catalog().resolve_builtin_log(log))
2156 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2157 .collect();
2158
2159 let mut privatelink_connections = BTreeMap::new();
2160
2161 for entry in &entries {
2162 debug!(
2163 "coordinator init: installing {} {}",
2164 entry.item().typ(),
2165 entry.id()
2166 );
2167 let mut policy = entry.item().initial_logical_compaction_window();
2168 match entry.item() {
2169 CatalogItem::Source(source) => {
2175 if source.custom_logical_compaction_window.is_none() {
2177 if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2178 source.data_source
2179 {
2180 policy = Some(
2181 self.catalog()
2182 .get_entry(&ingestion_id)
2183 .source()
2184 .expect("must be source")
2185 .custom_logical_compaction_window
2186 .unwrap_or_default(),
2187 );
2188 }
2189 }
2190 policies_to_set
2191 .entry(policy.expect("sources have a compaction window"))
2192 .or_insert_with(Default::default)
2193 .storage_ids
2194 .insert(source.global_id());
2195 }
2196 CatalogItem::Table(table) => {
2197 policies_to_set
2198 .entry(policy.expect("tables have a compaction window"))
2199 .or_insert_with(Default::default)
2200 .storage_ids
2201 .extend(table.global_ids());
2202 }
2203 CatalogItem::Index(idx) => {
2204 let policy_entry = policies_to_set
2205 .entry(policy.expect("indexes have a compaction window"))
2206 .or_insert_with(Default::default);
2207
2208 if logs.contains(&idx.on) {
2209 policy_entry
2210 .compute_ids
2211 .entry(idx.cluster_id)
2212 .or_insert_with(BTreeSet::new)
2213 .insert(idx.global_id());
2214 } else {
2215 let df_desc = self
2216 .catalog()
2217 .try_get_physical_plan(&idx.global_id())
2218 .expect("added in `bootstrap_dataflow_plans`")
2219 .clone();
2220
2221 let df_meta = self
2222 .catalog()
2223 .try_get_dataflow_metainfo(&idx.global_id())
2224 .expect("added in `bootstrap_dataflow_plans`");
2225
2226 if self.catalog().state().system_config().enable_mz_notices() {
2227 self.catalog().state().pack_optimizer_notices(
2229 &mut builtin_table_updates,
2230 df_meta.optimizer_notices.iter(),
2231 Diff::ONE,
2232 );
2233 }
2234
2235 policy_entry
2238 .compute_ids
2239 .entry(idx.cluster_id)
2240 .or_insert_with(Default::default)
2241 .extend(df_desc.export_ids());
2242
2243 self.controller
2244 .compute
2245 .create_dataflow(idx.cluster_id, df_desc, None)
2246 .unwrap_or_terminate("cannot fail to create dataflows");
2247 }
2248 }
2249 CatalogItem::View(_) => (),
2250 CatalogItem::MaterializedView(mview) => {
2251 policies_to_set
2252 .entry(policy.expect("materialized views have a compaction window"))
2253 .or_insert_with(Default::default)
2254 .storage_ids
2255 .insert(mview.global_id_writes());
2256
2257 let mut df_desc = self
2258 .catalog()
2259 .try_get_physical_plan(&mview.global_id_writes())
2260 .expect("added in `bootstrap_dataflow_plans`")
2261 .clone();
2262
2263 if let Some(initial_as_of) = mview.initial_as_of.clone() {
2264 df_desc.set_initial_as_of(initial_as_of);
2265 }
2266
2267 let until = mview
2269 .refresh_schedule
2270 .as_ref()
2271 .and_then(|s| s.last_refresh())
2272 .and_then(|r| r.try_step_forward());
2273 if let Some(until) = until {
2274 df_desc.until.meet_assign(&Antichain::from_elem(until));
2275 }
2276
2277 let df_meta = self
2278 .catalog()
2279 .try_get_dataflow_metainfo(&mview.global_id_writes())
2280 .expect("added in `bootstrap_dataflow_plans`");
2281
2282 if self.catalog().state().system_config().enable_mz_notices() {
2283 self.catalog().state().pack_optimizer_notices(
2285 &mut builtin_table_updates,
2286 df_meta.optimizer_notices.iter(),
2287 Diff::ONE,
2288 );
2289 }
2290
2291 self.ship_dataflow(df_desc, mview.cluster_id, mview.target_replica)
2292 .await;
2293
2294 if mview.replacement_target.is_none() {
2297 self.allow_writes(mview.cluster_id, mview.global_id_writes());
2298 }
2299 }
2300 CatalogItem::Sink(sink) => {
2301 policies_to_set
2302 .entry(CompactionWindow::Default)
2303 .or_insert_with(Default::default)
2304 .storage_ids
2305 .insert(sink.global_id());
2306 }
2307 CatalogItem::Connection(catalog_connection) => {
2308 if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2309 privatelink_connections.insert(
2310 entry.id(),
2311 VpcEndpointConfig {
2312 aws_service_name: conn.service_name.clone(),
2313 availability_zone_ids: conn.availability_zones.clone(),
2314 },
2315 );
2316 }
2317 }
2318 CatalogItem::ContinualTask(ct) => {
2319 policies_to_set
2320 .entry(policy.expect("continual tasks have a compaction window"))
2321 .or_insert_with(Default::default)
2322 .storage_ids
2323 .insert(ct.global_id());
2324
2325 let mut df_desc = self
2326 .catalog()
2327 .try_get_physical_plan(&ct.global_id())
2328 .expect("added in `bootstrap_dataflow_plans`")
2329 .clone();
2330
2331 if let Some(initial_as_of) = ct.initial_as_of.clone() {
2332 df_desc.set_initial_as_of(initial_as_of);
2333 }
2334
2335 let df_meta = self
2336 .catalog()
2337 .try_get_dataflow_metainfo(&ct.global_id())
2338 .expect("added in `bootstrap_dataflow_plans`");
2339
2340 if self.catalog().state().system_config().enable_mz_notices() {
2341 self.catalog().state().pack_optimizer_notices(
2343 &mut builtin_table_updates,
2344 df_meta.optimizer_notices.iter(),
2345 Diff::ONE,
2346 );
2347 }
2348
2349 self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2350 self.allow_writes(ct.cluster_id, ct.global_id());
2351 }
2352 CatalogItem::Log(_)
2354 | CatalogItem::Type(_)
2355 | CatalogItem::Func(_)
2356 | CatalogItem::Secret(_) => {}
2357 }
2358 }
2359
2360 if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2361 let existing_vpc_endpoints = cloud_resource_controller
2363 .list_vpc_endpoints()
2364 .await
2365 .context("list vpc endpoints")?;
2366 let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2367 let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2368 let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2369 for id in vpc_endpoints_to_remove {
2370 cloud_resource_controller
2371 .delete_vpc_endpoint(*id)
2372 .await
2373 .context("deleting extraneous vpc endpoint")?;
2374 }
2375
2376 for (id, spec) in privatelink_connections {
2378 cloud_resource_controller
2379 .ensure_vpc_endpoint(id, spec)
2380 .await
2381 .context("ensuring vpc endpoint")?;
2382 }
2383 }
2384
2385 drop(dataflow_read_holds);
2388 for (cw, policies) in policies_to_set {
2390 self.initialize_read_policies(&policies, cw).await;
2391 }
2392
2393 builtin_table_updates.extend(
2395 self.catalog().state().resolve_builtin_table_updates(
2396 self.catalog().state().pack_all_replica_size_updates(),
2397 ),
2398 );
2399
2400 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2401 let migrated_updates_fut = if self.controller.read_only() {
2407 let min_timestamp = Timestamp::minimum();
2408 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2409 .extract_if(.., |update| {
2410 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2411 migrated_storage_collections_0dt.contains(&update.id)
2412 && self
2413 .controller
2414 .storage_collections
2415 .collection_frontiers(gid)
2416 .expect("all tables are registered")
2417 .write_frontier
2418 .elements()
2419 == &[min_timestamp]
2420 })
2421 .collect();
2422 if migrated_builtin_table_updates.is_empty() {
2423 futures::future::ready(()).boxed()
2424 } else {
2425 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2427 for update in migrated_builtin_table_updates {
2428 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2429 grouped_appends.entry(gid).or_default().push(update.data);
2430 }
2431 info!(
2432 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2433 grouped_appends.keys().collect::<Vec<_>>()
2434 );
2435
2436 let mut all_appends = Vec::with_capacity(grouped_appends.len());
2438 for (item_id, table_data) in grouped_appends.into_iter() {
2439 let mut all_rows = Vec::new();
2440 let mut all_data = Vec::new();
2441 for data in table_data {
2442 match data {
2443 TableData::Rows(rows) => all_rows.extend(rows),
2444 TableData::Batches(_) => all_data.push(data),
2445 }
2446 }
2447 differential_dataflow::consolidation::consolidate(&mut all_rows);
2448 all_data.push(TableData::Rows(all_rows));
2449
2450 all_appends.push((item_id, all_data));
2452 }
2453
2454 let fut = self
2455 .controller
2456 .storage
2457 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2458 .expect("cannot fail to append");
2459 async {
2460 fut.await
2461 .expect("One-shot shouldn't be dropped during bootstrap")
2462 .unwrap_or_terminate("cannot fail to append")
2463 }
2464 .boxed()
2465 }
2466 } else {
2467 futures::future::ready(()).boxed()
2468 };
2469
2470 info!(
2471 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2472 postamble_start.elapsed()
2473 );
2474
2475 let builtin_update_start = Instant::now();
2476 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2477
2478 if self.controller.read_only() {
2479 info!(
2480 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2481 );
2482
2483 let audit_join_start = Instant::now();
2485 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2486 let audit_log_updates: Vec<_> = audit_logs_iterator
2487 .map(|(audit_log, ts)| StateUpdate {
2488 kind: StateUpdateKind::AuditLog(audit_log),
2489 ts,
2490 diff: StateDiff::Addition,
2491 })
2492 .collect();
2493 let audit_log_builtin_table_updates = self
2494 .catalog()
2495 .state()
2496 .generate_builtin_table_updates(audit_log_updates);
2497 builtin_table_updates.extend(audit_log_builtin_table_updates);
2498 info!(
2499 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2500 audit_join_start.elapsed()
2501 );
2502 self.buffered_builtin_table_updates
2503 .as_mut()
2504 .expect("in read-only mode")
2505 .append(&mut builtin_table_updates);
2506 } else {
2507 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2508 .await;
2509 };
2510 info!(
2511 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2512 builtin_update_start.elapsed()
2513 );
2514
2515 let cleanup_secrets_start = Instant::now();
2516 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2517 {
2521 let Self {
2524 secrets_controller,
2525 catalog,
2526 ..
2527 } = self;
2528
2529 let next_user_item_id = catalog.get_next_user_item_id().await?;
2530 let next_system_item_id = catalog.get_next_system_item_id().await?;
2531 let read_only = self.controller.read_only();
2532 let catalog_ids: BTreeSet<CatalogItemId> =
2537 catalog.entries().map(|entry| entry.id()).collect();
2538 let secrets_controller = Arc::clone(secrets_controller);
2539
2540 spawn(|| "cleanup-orphaned-secrets", async move {
2541 if read_only {
2542 info!(
2543 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2544 );
2545 return;
2546 }
2547 info!("coordinator init: cleaning up orphaned secrets");
2548
2549 match secrets_controller.list().await {
2550 Ok(controller_secrets) => {
2551 let controller_secrets: BTreeSet<CatalogItemId> =
2552 controller_secrets.into_iter().collect();
2553 let orphaned = controller_secrets.difference(&catalog_ids);
2554 for id in orphaned {
2555 let id_too_large = match id {
2556 CatalogItemId::System(id) => *id >= next_system_item_id,
2557 CatalogItemId::User(id) => *id >= next_user_item_id,
2558 CatalogItemId::IntrospectionSourceIndex(_)
2559 | CatalogItemId::Transient(_) => false,
2560 };
2561 if id_too_large {
2562 info!(
2563 %next_user_item_id, %next_system_item_id,
2564 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2565 );
2566 } else {
2567 info!("coordinator init: deleting orphaned secret {id}");
2568 fail_point!("orphan_secrets");
2569 if let Err(e) = secrets_controller.delete(*id).await {
2570 warn!(
2571 "Dropping orphaned secret has encountered an error: {}",
2572 e
2573 );
2574 }
2575 }
2576 }
2577 }
2578 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2579 }
2580 });
2581 }
2582 info!(
2583 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2584 cleanup_secrets_start.elapsed()
2585 );
2586
2587 let final_steps_start = Instant::now();
2589 info!(
2590 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2591 );
2592 migrated_updates_fut
2593 .instrument(info_span!("coord::bootstrap::final"))
2594 .await;
2595
2596 debug!(
2597 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2598 );
2599 self.controller.initialization_complete();
2601
2602 self.bootstrap_introspection_subscribes().await;
2604
2605 info!(
2606 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2607 final_steps_start.elapsed()
2608 );
2609
2610 info!(
2611 "startup: coordinator init: bootstrap complete in {:?}",
2612 bootstrap_start.elapsed()
2613 );
2614 Ok(())
2615 }
2616
    /// Resets and repopulates builtin tables during a writable (leader)
    /// bootstrap, and writes out `builtin_table_updates`.
    ///
    /// Steps, in order:
    /// 1. Append an empty batch to every table to advance it to a fresh write
    ///    timestamp.
    /// 2. Concurrently snapshot each system table (except `mz_audit_events`
    ///    and `mz_storage_usage_by_shard`) and build retractions of its
    ///    current contents.
    /// 3. Reconcile `mz_audit_events` incrementally against the durable audit
    ///    log instead of fully retracting it.
    /// 4. Wait for the append from step 1 to land, then write the retractions
    ///    together with the provided `builtin_table_updates`.
    #[allow(clippy::async_yields_async)]
    #[instrument]
    async fn bootstrap_tables(
        &mut self,
        entries: &[CatalogEntry],
        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
        audit_logs_iterator: AuditLogIterator,
    ) {
        /// Lightweight view of a catalog entry that is known to be a table.
        struct TableMetadata<'a> {
            id: CatalogItemId,
            name: &'a QualifiedItemName,
            table: &'a Table,
        }

        // Collect metadata for every table in the catalog.
        let table_metas: Vec<_> = entries
            .into_iter()
            .filter_map(|entry| {
                entry.table().map(|table| TableMetadata {
                    id: entry.id(),
                    name: entry.name(),
                    table,
                })
            })
            .collect();

        debug!("coordinator init: advancing all tables to current timestamp");
        let WriteTimestamp {
            timestamp: write_ts,
            advance_to,
        } = self.get_local_write_ts().await;
        // Empty appends: no data, only frontier advancement for every table.
        let appends = table_metas
            .iter()
            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
            .collect();
        // Keep the receiver; it is awaited below, after the retractions have
        // been computed, so the two proceed concurrently.
        let table_fence_rx = self
            .controller
            .storage
            .append_table(write_ts.clone(), advance_to, appends)
            .expect("invalid updates");

        self.apply_local_write(write_ts).await;

        debug!("coordinator init: resetting system tables");
        let read_ts = self.get_local_read_ts().await;

        // `mz_storage_usage_by_shard` is deliberately excluded from the
        // system-table reset below; identify it by name and schema.
        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
            .catalog()
            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
            .into();
        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
        };

        let mut retraction_tasks = Vec::new();
        let mut system_tables: Vec<_> = table_metas
            .iter()
            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
            .collect();

        // The audit log table is handled separately: it is reconciled
        // incrementally instead of fully retracted, so pull it out of the
        // generic reset list.
        let (audit_events_idx, _) = system_tables
            .iter()
            .find_position(|table| {
                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
            })
            .expect("mz_audit_events must exist");
        let audit_events = system_tables.remove(audit_events_idx);
        let audit_log_task = self.bootstrap_audit_log_table(
            audit_events.id,
            audit_events.name,
            audit_events.table,
            audit_logs_iterator,
            read_ts,
        );

        // For each remaining system table, spawn a task that snapshots its
        // contents at `read_ts` and emits the negation of every row, i.e. a
        // full-retraction batch.
        for system_table in system_tables {
            let table_id = system_table.id;
            let full_name = self.catalog().resolve_full_name(system_table.name, None);
            debug!("coordinator init: resetting system table {full_name} ({table_id})");

            let snapshot_fut = self
                .controller
                .storage_collections
                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
            let batch_fut = self
                .controller
                .storage_collections
                .create_update_builder(system_table.table.global_id_writes());

            let task = spawn(|| format!("snapshot-{table_id}"), async move {
                let mut batch = batch_fut
                    .await
                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
                tracing::info!(?table_id, "starting snapshot");
                let mut snapshot_cursor = snapshot_fut
                    .await
                    .unwrap_or_terminate("cannot fail to snapshot");

                // Stream the snapshot and invert each diff to retract the row.
                while let Some(values) = snapshot_cursor.next().await {
                    for (key, _t, d) in values {
                        let d_invert = d.neg();
                        batch.add(&key, &(), &d_invert).await;
                    }
                }
                tracing::info!(?table_id, "finished snapshot");

                let batch = batch.finish().await;
                BuiltinTableUpdate::batch(table_id, batch)
            });
            retraction_tasks.push(task);
        }

        let retractions_res = futures::future::join_all(retraction_tasks).await;
        for retractions in retractions_res {
            builtin_table_updates.push(retractions);
        }

        let audit_join_start = Instant::now();
        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
        let audit_log_updates = audit_log_task.await;
        let audit_log_builtin_table_updates = self
            .catalog()
            .state()
            .generate_builtin_table_updates(audit_log_updates);
        builtin_table_updates.extend(audit_log_builtin_table_updates);
        info!(
            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
            audit_join_start.elapsed()
        );

        // Ensure the earlier empty append has completed before issuing the
        // builtin table writes below.
        table_fence_rx
            .await
            .expect("One-shot shouldn't be dropped during bootstrap")
            .unwrap_or_terminate("cannot fail to append");

        info!("coordinator init: sending builtin table updates");
        let (_builtin_updates_fut, write_ts) = self
            .builtin_table_update()
            .execute(builtin_table_updates)
            .await;
        info!(?write_ts, "our write ts");
        if let Some(write_ts) = write_ts {
            self.apply_local_write(write_ts).await;
        }
    }
2781
2782 #[instrument]
2786 fn bootstrap_audit_log_table<'a>(
2787 &self,
2788 table_id: CatalogItemId,
2789 name: &'a QualifiedItemName,
2790 table: &'a Table,
2791 audit_logs_iterator: AuditLogIterator,
2792 read_ts: Timestamp,
2793 ) -> JoinHandle<Vec<StateUpdate>> {
2794 let full_name = self.catalog().resolve_full_name(name, None);
2795 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2796 let current_contents_fut = self
2797 .controller
2798 .storage_collections
2799 .snapshot(table.global_id_writes(), read_ts);
2800 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2801 let current_contents = current_contents_fut
2802 .await
2803 .unwrap_or_terminate("cannot fail to fetch snapshot");
2804 let contents_len = current_contents.len();
2805 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2806
2807 let max_table_id = current_contents
2809 .into_iter()
2810 .filter(|(_, diff)| *diff == 1)
2811 .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2812 .sorted()
2813 .rev()
2814 .next();
2815
2816 audit_logs_iterator
2818 .take_while(|(audit_log, _)| match max_table_id {
2819 Some(id) => audit_log.event.sortable_id() > id,
2820 None => true,
2821 })
2822 .map(|(audit_log, ts)| StateUpdate {
2823 kind: StateUpdateKind::AuditLog(audit_log),
2824 ts,
2825 diff: StateDiff::Addition,
2826 })
2827 .collect::<Vec<_>>()
2828 })
2829 }
2830
    /// Registers all catalog-defined storage collections with the storage
    /// controller: sources, tables, materialized views, continual tasks, and
    /// sink progress collections.
    ///
    /// Builtin continual tasks whose collection description has no `since` yet
    /// are split out and handled by [`Self::bootstrap_builtin_continual_tasks`]
    /// after the other collections have been created. Collections belonging to
    /// items in `migrated_storage_collections` are flagged to the controller
    /// so it can treat them specially during bootstrap.
    #[instrument]
    async fn bootstrap_storage_collections(
        &mut self,
        migrated_storage_collections: &BTreeSet<CatalogItemId>,
    ) {
        let catalog = self.catalog();

        // Translates a catalog `DataSourceDesc` into a storage-controller
        // `CollectionDescription`, inlining connection details where needed.
        let source_desc = |object_id: GlobalId,
                           data_source: &DataSourceDesc,
                           desc: &RelationDesc,
                           timeline: &Timeline| {
            let data_source = match data_source.clone() {
                DataSourceDesc::Ingestion { desc, cluster_id } => {
                    let desc = desc.into_inline_connection(catalog.state());
                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
                    DataSource::Ingestion(ingestion)
                }
                DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    progress_subsource,
                    data_config,
                    details,
                    cluster_id,
                } => {
                    let desc = desc.into_inline_connection(catalog.state());
                    let data_config = data_config.into_inline_connection(catalog.state());
                    // Legacy-syntax sources build the ingestion around the
                    // progress subsource's collection and register the
                    // source's own output as an extra source export.
                    let progress_subsource =
                        catalog.get_entry(&progress_subsource).latest_global_id();
                    let mut ingestion =
                        IngestionDescription::new(desc, cluster_id, progress_subsource);
                    let legacy_export = SourceExport {
                        storage_metadata: (),
                        data_config,
                        details,
                    };
                    ingestion.source_exports.insert(object_id, legacy_export);

                    DataSource::Ingestion(ingestion)
                }
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference: _,
                    details,
                    data_config,
                } => {
                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();

                    DataSource::IngestionExport {
                        ingestion_id,
                        details,
                        data_config: data_config.into_inline_connection(catalog.state()),
                    }
                }
                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
                DataSourceDesc::Progress => DataSource::Progress,
                DataSourceDesc::Introspection(introspection) => {
                    DataSource::Introspection(introspection)
                }
                DataSourceDesc::Catalog => DataSource::Other,
            };
            CollectionDescription {
                desc: desc.clone(),
                data_source,
                since: None,
                timeline: Some(timeline.clone()),
                primary: None,
            }
        };

        let mut compute_collections = vec![];
        let mut collections = vec![];
        let mut new_builtin_continual_tasks = vec![];
        for entry in catalog.entries() {
            match entry.item() {
                CatalogItem::Source(source) => {
                    collections.push((
                        source.global_id(),
                        source_desc(
                            source.global_id(),
                            &source.data_source,
                            &source.desc,
                            &source.timeline,
                        ),
                    ));
                }
                CatalogItem::Table(table) => {
                    match &table.data_source {
                        TableDataSource::TableWrites { defaults: _ } => {
                            // Each version of the table gets its own
                            // collection; a version's `primary` is set to the
                            // global id of the next version's collection, when
                            // one exists (the newest version has none).
                            let versions: BTreeMap<_, _> = table
                                .collection_descs()
                                .map(|(gid, version, desc)| (version, (gid, desc)))
                                .collect();
                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
                                let next_version = version.bump();
                                let primary_collection =
                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
                                let mut collection_desc =
                                    CollectionDescription::for_table(desc.clone());
                                collection_desc.primary = primary_collection;

                                (*gid, collection_desc)
                            });
                            collections.extend(collection_descs);
                        }
                        TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => {
                            // Source-fed tables are expected to have exactly
                            // one collection.
                            soft_assert_eq_or_log!(table.collections.len(), 1);
                            let collection_descs =
                                table.collection_descs().map(|(gid, _version, desc)| {
                                    (
                                        gid,
                                        source_desc(
                                            entry.latest_global_id(),
                                            data_source_desc,
                                            &desc,
                                            timeline,
                                        ),
                                    )
                                });
                            collections.extend(collection_descs);
                        }
                    };
                }
                CatalogItem::MaterializedView(mv) => {
                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
                        let collection_desc =
                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
                        (gid, collection_desc)
                    });

                    collections.extend(collection_descs);
                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
                }
                CatalogItem::ContinualTask(ct) => {
                    let collection_desc =
                        CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
                    if ct.global_id().is_system() && collection_desc.since.is_none() {
                        // Builtin CTs without a `since` need one derived from
                        // read holds on their inputs; deferred to
                        // `bootstrap_builtin_continual_tasks` below.
                        new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
                    } else {
                        compute_collections.push((ct.global_id(), ct.desc.clone()));
                        collections.push((ct.global_id(), collection_desc));
                    }
                }
                CatalogItem::Sink(sink) => {
                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
                    let from_desc = storage_sink_from_entry
                        .relation_desc()
                        .expect("sinks can only be built on items with descs")
                        .into_owned();
                    // A sink's storage collection carries the Kafka progress
                    // relation desc rather than the sink's own schema.
                    let collection_desc = CollectionDescription {
                        desc: KAFKA_PROGRESS_DESC.clone(),
                        data_source: DataSource::Sink {
                            desc: ExportDescription {
                                sink: StorageSinkDesc {
                                    from: sink.from,
                                    from_desc,
                                    connection: sink
                                        .connection
                                        .clone()
                                        .into_inline_connection(self.catalog().state()),
                                    envelope: sink.envelope,
                                    as_of: Antichain::from_elem(Timestamp::minimum()),
                                    with_snapshot: sink.with_snapshot,
                                    version: sink.version,
                                    from_storage_metadata: (),
                                    to_storage_metadata: (),
                                    commit_interval: sink.commit_interval,
                                },
                                instance_id: sink.cluster_id,
                            },
                        },
                        since: None,
                        timeline: None,
                        primary: None,
                    };
                    collections.push((sink.global_id, collection_desc));
                }
                _ => (),
            }
        }

        // Read-only instances register at the current read timestamp; writable
        // ones take a fresh write timestamp, which is applied at the end.
        let register_ts = if self.controller.read_only() {
            self.get_local_read_ts().await
        } else {
            self.get_local_write_ts().await.timestamp
        };

        let storage_metadata = self.catalog.state().storage_metadata();
        // Expand migrated item ids into the global ids of all their
        // collections.
        let migrated_storage_collections = migrated_storage_collections
            .into_iter()
            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
            .collect();

        // NOTE(review): presumably reconciles persisted schemas whose column
        // nullability changed since the collections were written — confirm
        // against the storage controller.
        self.controller
            .storage
            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
            .await
            .unwrap_or_terminate("cannot fail to evolve collections");

        self.controller
            .storage
            .create_collections_for_bootstrap(
                storage_metadata,
                Some(register_ts),
                collections,
                &migrated_storage_collections,
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
            .await;

        if !self.controller.read_only() {
            self.apply_local_write(register_ts).await;
        }
    }
3078
    /// Creates storage collections for builtin continual tasks that do not yet
    /// have a `since` frontier, deriving an initial `since` for each from the
    /// read frontiers of the task's dataflow inputs.
    ///
    /// Expects every id in `collections` to refer to a continual task; panics
    /// otherwise.
    async fn bootstrap_builtin_continual_tasks(
        &mut self,
        mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
    ) {
        for (id, collection) in &mut collections {
            let entry = self.catalog.get_entry_by_global_id(id);
            let ct = match &entry.item {
                CatalogItem::ContinualTask(ct) => ct.clone(),
                _ => unreachable!("only called with continual task builtins"),
            };
            let debug_name = self
                .catalog()
                .resolve_full_name(entry.name(), None)
                .to_string();
            // Optimize the CT only to discover its dataflow imports; the plan
            // itself is not installed here.
            let (_optimized_plan, physical_plan, _metainfo, _optimizer_features) = self
                .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
                .expect("builtin CT should optimize successfully");

            // The task's own output collection does not exist yet (that is why
            // we are here), so exclude it from the inputs we hold back on.
            let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
            id_bundle.storage_ids.remove(id);
            let read_holds = self.acquire_read_holds(&id_bundle);
            let as_of = read_holds.least_valid_read();

            collection.since = Some(as_of.clone());
            // NOTE(review): `read_holds` is dropped at the end of this
            // iteration, before `create_collections` below runs — presumably
            // the inputs cannot compact past `as_of` in the interim; confirm.
        }
        self.controller
            .storage
            .create_collections(self.catalog.state().storage_metadata(), None, collections)
            .await
            .unwrap_or_terminate("cannot fail to create collections");
    }
3119
3120 fn bootstrap_sort_catalog_entries(&self) -> Vec<CatalogEntry> {
3127 let mut indexes_on = BTreeMap::<_, Vec<_>>::new();
3128 let mut non_indexes = Vec::new();
3129 for entry in self.catalog().entries().cloned() {
3130 if let Some(index) = entry.index() {
3131 let on = self.catalog().get_entry_by_global_id(&index.on);
3132 indexes_on.entry(on.id()).or_default().push(entry);
3133 } else {
3134 non_indexes.push(entry);
3135 }
3136 }
3137
3138 let key_fn = |entry: &CatalogEntry| entry.id;
3139 let dependencies_fn = |entry: &CatalogEntry| entry.uses();
3140 sort_topological(&mut non_indexes, key_fn, dependencies_fn);
3141
3142 let mut result = Vec::new();
3143 for entry in non_indexes {
3144 let id = entry.id();
3145 result.push(entry);
3146 if let Some(mut indexes) = indexes_on.remove(&id) {
3147 result.append(&mut indexes);
3148 }
3149 }
3150
3151 soft_assert_or_log!(
3152 indexes_on.is_empty(),
3153 "indexes with missing dependencies: {indexes_on:?}",
3154 );
3155
3156 result
3157 }
3158
    /// Optimizes and records dataflow plans for every index, materialized
    /// view, and continual task in `ordered_catalog_entries`, reusing entries
    /// from the global expression cache when their optimizer features match
    /// the current per-cluster configuration.
    ///
    /// `ordered_catalog_entries` is expected to be dependency-ordered (see
    /// [`Self::bootstrap_sort_catalog_entries`]) so each object's inputs are
    /// planned before the object itself. The three match arms below follow the
    /// same cache-hit/cache-miss structure, differing only in how the plan is
    /// produced.
    ///
    /// Returns the expressions that had to be optimized from scratch (cache
    /// misses or stale cache entries), keyed by global id, so the caller can
    /// update the expression cache.
    #[instrument]
    fn bootstrap_dataflow_plans(
        &mut self,
        ordered_catalog_entries: &[CatalogEntry],
        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
        // Compute-instance snapshots, created lazily per cluster and updated
        // with each planned collection so later objects can see earlier ones.
        let mut instance_snapshots = BTreeMap::new();
        let mut uncached_expressions = BTreeMap::new();

        // Per-cluster optimizer config: system defaults overridden by the
        // cluster's own feature flags.
        let optimizer_config = |catalog: &Catalog, cluster_id| {
            let system_config = catalog.system_config();
            let overrides = catalog.get_cluster(cluster_id).config.features();
            OptimizerConfig::from(system_config).override_from(&overrides)
        };

        for entry in ordered_catalog_entries {
            match entry.item() {
                CatalogItem::Index(idx) => {
                    let compute_instance =
                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(idx.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = idx.global_id();

                    // Skip indexes whose collection is already present in the
                    // instance snapshot.
                    if compute_instance.contains_collection(&global_id) {
                        continue;
                    }

                    let optimizer_config = optimizer_config(&self.catalog, idx.cluster_id);

                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            // A cached plan is only valid if it was produced
                            // under the same optimizer features.
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let (optimized_plan, global_lir_plan) = {
                                    let mut optimizer = optimize::index::Optimizer::new(
                                        self.owned_catalog(),
                                        compute_instance.clone(),
                                        global_id,
                                        optimizer_config.clone(),
                                        self.optimizer_metrics(),
                                    );

                                    // MIR optimization, then LIR lowering.
                                    let index_plan = optimize::index::Index::new(
                                        entry.name().clone(),
                                        idx.on,
                                        idx.keys.to_vec(),
                                    );
                                    let global_mir_plan = optimizer.optimize(index_plan)?;
                                    let optimized_plan = global_mir_plan.df_desc().clone();

                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                    (optimized_plan, global_lir_plan)
                                };

                                let (physical_plan, metainfo) = global_lir_plan.unapply();
                                let metainfo = {
                                    // Fresh transient ids for the plan's
                                    // optimizer notices.
                                    let notice_ids =
                                        std::iter::repeat_with(|| self.allocate_transient_id())
                                            .map(|(_item_id, gid)| gid)
                                            .take(metainfo.optimizer_notices.len())
                                            .collect::<Vec<_>>();
                                    self.catalog().render_notices(
                                        metainfo,
                                        notice_ids,
                                        Some(idx.global_id()),
                                    )
                                };
                                // Record the fresh plans for the cache update.
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features: optimizer_config.features.clone(),
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
                    catalog.set_physical_plan(idx.global_id(), physical_plan);
                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);

                    compute_instance.insert_collection(idx.global_id());
                }
                CatalogItem::MaterializedView(mv) => {
                    let compute_instance =
                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(mv.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = mv.global_id_writes();

                    let optimizer_config = optimizer_config(&self.catalog, mv.cluster_id);

                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            // Same cache-validity check as for indexes above.
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let (_, internal_view_id) = self.allocate_transient_id();
                                let debug_name = self
                                    .catalog()
                                    .resolve_full_name(entry.name(), None)
                                    .to_string();
                                let force_non_monotonic = Default::default();

                                let (optimized_plan, global_lir_plan) = {
                                    let mut optimizer = optimize::materialized_view::Optimizer::new(
                                        self.owned_catalog().as_optimizer_catalog(),
                                        compute_instance.clone(),
                                        global_id,
                                        internal_view_id,
                                        mv.desc.latest().iter_names().cloned().collect(),
                                        mv.non_null_assertions.clone(),
                                        mv.refresh_schedule.clone(),
                                        debug_name,
                                        optimizer_config.clone(),
                                        self.optimizer_metrics(),
                                        force_non_monotonic,
                                    );

                                    let typ = infer_sql_type_for_catalog(
                                        &mv.raw_expr,
                                        &mv.optimized_expr.as_ref().clone(),
                                    );
                                    let global_mir_plan = optimizer
                                        .optimize((mv.optimized_expr.as_ref().clone(), typ))?;
                                    let optimized_plan = global_mir_plan.df_desc().clone();

                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                    (optimized_plan, global_lir_plan)
                                };

                                let (physical_plan, metainfo) = global_lir_plan.unapply();
                                let metainfo = {
                                    let notice_ids =
                                        std::iter::repeat_with(|| self.allocate_transient_id())
                                            .map(|(_item_id, global_id)| global_id)
                                            .take(metainfo.optimizer_notices.len())
                                            .collect::<Vec<_>>();
                                    self.catalog().render_notices(
                                        metainfo,
                                        notice_ids,
                                        Some(mv.global_id_writes()),
                                    )
                                };
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features: optimizer_config.features.clone(),
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);

                    compute_instance.insert_collection(mv.global_id_writes());
                }
                CatalogItem::ContinualTask(ct) => {
                    let compute_instance =
                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(ct.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = ct.global_id();

                    let optimizer_config = optimizer_config(&self.catalog, ct.cluster_id);

                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            // Same cache-validity check as for indexes above.
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let debug_name = self
                                    .catalog()
                                    .resolve_full_name(entry.name(), None)
                                    .to_string();
                                let (optimized_plan, physical_plan, metainfo, optimizer_features) =
                                    self.optimize_create_continual_task(
                                        ct,
                                        global_id,
                                        self.owned_catalog(),
                                        debug_name,
                                    )?;
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features,
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
                    catalog.set_physical_plan(ct.global_id(), physical_plan);
                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);

                    compute_instance.insert_collection(ct.global_id());
                }
                _ => (),
            }
        }

        Ok(uncached_expressions)
    }
3442
3443 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
3453 let mut catalog_ids = Vec::new();
3454 let mut dataflows = Vec::new();
3455 let mut read_policies = BTreeMap::new();
3456 for entry in self.catalog.entries() {
3457 let gid = match entry.item() {
3458 CatalogItem::Index(idx) => idx.global_id(),
3459 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3460 CatalogItem::ContinualTask(ct) => ct.global_id(),
3461 CatalogItem::Table(_)
3462 | CatalogItem::Source(_)
3463 | CatalogItem::Log(_)
3464 | CatalogItem::View(_)
3465 | CatalogItem::Sink(_)
3466 | CatalogItem::Type(_)
3467 | CatalogItem::Func(_)
3468 | CatalogItem::Secret(_)
3469 | CatalogItem::Connection(_) => continue,
3470 };
3471 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3472 catalog_ids.push(gid);
3473 dataflows.push(plan.clone());
3474
3475 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3476 read_policies.insert(gid, compaction_window.into());
3477 }
3478 }
3479 }
3480
3481 let read_ts = self.get_local_read_ts().await;
3482 let read_holds = as_of_selection::run(
3483 &mut dataflows,
3484 &read_policies,
3485 &*self.controller.storage_collections,
3486 read_ts,
3487 self.controller.read_only(),
3488 );
3489
3490 let catalog = self.catalog_mut();
3491 for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3492 catalog.set_physical_plan(id, plan);
3493 }
3494
3495 read_holds
3496 }
3497
3498 fn serve(
3507 mut self,
3508 mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3509 mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3510 mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3511 group_commit_rx: appends::GroupCommitWaiter,
3512 ) -> LocalBoxFuture<'static, ()> {
3513 async move {
3514 let mut cluster_events = self.controller.events_stream();
3516 let last_message = Arc::new(Mutex::new(LastMessage {
3517 kind: "none",
3518 stmt: None,
3519 }));
3520
3521 let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3522 let idle_metric = self.metrics.queue_busy_seconds.clone();
3523 let last_message_watchdog = Arc::clone(&last_message);
3524
3525 spawn(|| "coord watchdog", async move {
3526 let mut interval = tokio::time::interval(Duration::from_secs(5));
3531 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3535
3536 let mut coord_stuck = false;
3538
3539 loop {
3540 interval.tick().await;
3541
3542 let duration = tokio::time::Duration::from_secs(30);
3544 let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3545 let Ok(maybe_permit) = timeout else {
3546 if !coord_stuck {
3548 let last_message = last_message_watchdog.lock().expect("poisoned");
3549 tracing::warn!(
3550 last_message_kind = %last_message.kind,
3551 last_message_sql = %last_message.stmt_to_string(),
3552 "coordinator stuck for {duration:?}",
3553 );
3554 }
3555 coord_stuck = true;
3556
3557 continue;
3558 };
3559
3560 if coord_stuck {
3562 tracing::info!("Coordinator became unstuck");
3563 }
3564 coord_stuck = false;
3565
3566 let Ok(permit) = maybe_permit else {
3568 break;
3569 };
3570
3571 permit.send(idle_metric.start_timer());
3572 }
3573 });
3574
3575 self.schedule_storage_usage_collection().await;
3576 self.spawn_privatelink_vpc_endpoints_watch_task();
3577 self.spawn_statement_logging_task();
3578 flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3579
3580 let warn_threshold = self
3582 .catalog()
3583 .system_config()
3584 .coord_slow_message_warn_threshold();
3585
3586 const MESSAGE_BATCH: usize = 64;
3588 let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3589 let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3590
3591 let message_batch = self.metrics.message_batch.clone();
3592
3593 loop {
3594 select! {
3598 biased;
3603
3604 _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3608 Some(event) = cluster_events.next() => {
3612 messages.push(Message::ClusterEvent(event))
3613 },
3614 () = self.controller.ready() => {
3618 let controller = match self.controller.get_readiness() {
3622 Readiness::Storage => ControllerReadiness::Storage,
3623 Readiness::Compute => ControllerReadiness::Compute,
3624 Readiness::Metrics(_) => ControllerReadiness::Metrics,
3625 Readiness::Internal(_) => ControllerReadiness::Internal,
3626 Readiness::NotReady => unreachable!("just signaled as ready"),
3627 };
3628 messages.push(Message::ControllerReady { controller });
3629 }
3630 permit = group_commit_rx.ready() => {
3633 let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3639 PendingWriteTxn::User{span, ..} => Some(span),
3640 PendingWriteTxn::System{..} => None,
3641 });
3642 let span = match user_write_spans.exactly_one() {
3643 Ok(span) => span.clone(),
3644 Err(user_write_spans) => {
3645 let span = info_span!(parent: None, "group_commit_notify");
3646 for s in user_write_spans {
3647 span.follows_from(s);
3648 }
3649 span
3650 }
3651 };
3652 messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3653 },
3654 count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3658 if count == 0 {
3659 break;
3660 } else {
3661 messages.extend(cmd_messages.drain(..).map(
3662 |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
3663 ));
3664 }
3665 },
3666 Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3670 let mut pending_read_txns = vec![pending_read_txn];
3671 while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3672 pending_read_txns.push(pending_read_txn);
3673 }
3674 for (conn_id, pending_read_txn) in pending_read_txns {
3675 let prev = self
3676 .pending_linearize_read_txns
3677 .insert(conn_id, pending_read_txn);
3678 soft_assert_or_log!(
3679 prev.is_none(),
3680 "connections can not have multiple concurrent reads, prev: {prev:?}"
3681 )
3682 }
3683 messages.push(Message::LinearizeReads);
3684 }
3685 _ = self.advance_timelines_interval.tick() => {
3689 let span = info_span!(parent: None, "coord::advance_timelines_interval");
3690 span.follows_from(Span::current());
3691
3692 if self.controller.read_only() {
3697 messages.push(Message::AdvanceTimelines);
3698 } else {
3699 messages.push(Message::GroupCommitInitiate(span, None));
3700 }
3701 },
3702 _ = self.check_cluster_scheduling_policies_interval.tick() => {
3706 messages.push(Message::CheckSchedulingPolicies);
3707 },
3708
3709 _ = self.caught_up_check_interval.tick() => {
3713 self.maybe_check_caught_up().await;
3718
3719 continue;
3720 },
3721
3722 timer = idle_rx.recv() => {
3727 timer.expect("does not drop").observe_duration();
3728 self.metrics
3729 .message_handling
3730 .with_label_values(&["watchdog"])
3731 .observe(0.0);
3732 continue;
3733 }
3734 };
3735
3736 message_batch.observe(f64::cast_lossy(messages.len()));
3738
3739 for msg in messages.drain(..) {
3740 let msg_kind = msg.kind();
3743 let span = span!(
3744 target: "mz_adapter::coord::handle_message_loop",
3745 Level::INFO,
3746 "coord::handle_message",
3747 kind = msg_kind
3748 );
3749 let otel_context = span.context().span().span_context().clone();
3750
3751 *last_message.lock().expect("poisoned") = LastMessage {
3755 kind: msg_kind,
3756 stmt: match &msg {
3757 Message::Command(
3758 _,
3759 Command::Execute {
3760 portal_name,
3761 session,
3762 ..
3763 },
3764 ) => session
3765 .get_portal_unverified(portal_name)
3766 .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3767 _ => None,
3768 },
3769 };
3770
3771 let start = Instant::now();
3772 self.handle_message(msg).instrument(span).await;
3773 let duration = start.elapsed();
3774
3775 self.metrics
3776 .message_handling
3777 .with_label_values(&[msg_kind])
3778 .observe(duration.as_secs_f64());
3779
3780 if duration > warn_threshold {
3782 let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3783 tracing::error!(
3784 ?msg_kind,
3785 ?trace_id,
3786 ?duration,
3787 "very slow coordinator message"
3788 );
3789 }
3790 }
3791 }
3792 if let Some(catalog) = Arc::into_inner(self.catalog) {
3795 catalog.expire().await;
3796 }
3797 }
3798 .boxed_local()
3799 }
3800
    /// Returns a shared reference to the coordinator's in-memory catalog.
    fn catalog(&self) -> &Catalog {
        &self.catalog
    }
3805
    /// Returns an owned handle to the catalog (a cheap `Arc` refcount bump,
    /// not a deep copy).
    fn owned_catalog(&self) -> Arc<Catalog> {
        Arc::clone(&self.catalog)
    }
3811
    /// Returns a clone of the coordinator's optimizer metrics handle.
    fn optimizer_metrics(&self) -> OptimizerMetrics {
        self.optimizer_metrics.clone()
    }
3817
    /// Returns a mutable reference to the catalog.
    ///
    /// `Arc::make_mut` is copy-on-write: if other `Arc<Catalog>` handles are
    /// alive (e.g. from `owned_catalog`), the catalog is cloned first so
    /// their snapshots are unaffected by this mutation.
    fn catalog_mut(&mut self) -> &mut Catalog {
        Arc::make_mut(&mut self.catalog)
    }
3829
3830 async fn refill_user_id_pool(&mut self, min_count: u64) -> Result<(), AdapterError> {
3835 let batch_size = USER_ID_POOL_BATCH_SIZE.get(self.catalog().system_config().dyncfgs());
3836 let to_allocate = min_count.max(u64::from(batch_size));
3837 let id_ts = self.get_catalog_write_ts().await;
3838 let ids = self.catalog().allocate_user_ids(to_allocate, id_ts).await?;
3839 if let (Some((first_id, _)), Some((last_id, _))) = (ids.first(), ids.last()) {
3840 let start = match first_id {
3841 CatalogItemId::User(id) => *id,
3842 other => {
3843 return Err(AdapterError::Internal(format!(
3844 "expected User CatalogItemId, got {other:?}"
3845 )));
3846 }
3847 };
3848 let end = match last_id {
3849 CatalogItemId::User(id) => *id + 1, other => {
3851 return Err(AdapterError::Internal(format!(
3852 "expected User CatalogItemId, got {other:?}"
3853 )));
3854 }
3855 };
3856 self.user_id_pool.refill(start, end);
3857 } else {
3858 return Err(AdapterError::Internal(
3859 "catalog returned no user IDs".into(),
3860 ));
3861 }
3862 Ok(())
3863 }
3864
3865 async fn allocate_user_id(&mut self) -> Result<(CatalogItemId, GlobalId), AdapterError> {
3867 if let Some(id) = self.user_id_pool.allocate() {
3868 return Ok((CatalogItemId::User(id), GlobalId::User(id)));
3869 }
3870 self.refill_user_id_pool(1).await?;
3871 let id = self.user_id_pool.allocate().expect("ID pool just refilled");
3872 Ok((CatalogItemId::User(id), GlobalId::User(id)))
3873 }
3874
3875 async fn allocate_user_ids(
3877 &mut self,
3878 count: u64,
3879 ) -> Result<Vec<(CatalogItemId, GlobalId)>, AdapterError> {
3880 if self.user_id_pool.remaining() < count {
3881 self.refill_user_id_pool(count).await?;
3882 }
3883 let raw_ids = self
3884 .user_id_pool
3885 .allocate_many(count)
3886 .expect("pool has enough IDs after refill");
3887 Ok(raw_ids
3888 .into_iter()
3889 .map(|id| (CatalogItemId::User(id), GlobalId::User(id)))
3890 .collect())
3891 }
3892
    /// Returns the connection context owned by the controller.
    fn connection_context(&self) -> &ConnectionContext {
        self.controller.connection_context()
    }
3897
    /// Returns the secrets reader from the controller's connection context.
    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
        &self.connection_context().secrets_reader
    }
3902
3903 #[allow(dead_code)]
3908 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3909 for meta in self.active_conns.values() {
3910 let _ = meta.notice_tx.send(notice.clone());
3911 }
3912 }
3913
3914 pub(crate) fn broadcast_notice_tx(
3917 &self,
3918 ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
3919 let senders: Vec<_> = self
3920 .active_conns
3921 .values()
3922 .map(|meta| meta.notice_tx.clone())
3923 .collect();
3924 Box::new(move |notice| {
3925 for tx in senders {
3926 let _ = tx.send(notice.clone());
3927 }
3928 })
3929 }
3930
    /// Returns metadata about all currently active client connections.
    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
        &self.active_conns
    }
3934
3935 #[instrument(level = "debug")]
3936 pub(crate) fn retire_execution(
3937 &mut self,
3938 reason: StatementEndedExecutionReason,
3939 ctx_extra: ExecuteContextExtra,
3940 ) {
3941 if let Some(uuid) = ctx_extra.retire() {
3942 self.end_statement_execution(uuid, reason);
3943 }
3944 }
3945
3946 #[instrument(level = "debug")]
3948 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3949 let compute = self
3950 .instance_snapshot(instance)
3951 .expect("compute instance does not exist");
3952 DataflowBuilder::new(self.catalog().state(), compute)
3953 }
3954
    /// Captures a point-in-time snapshot of the given compute instance's
    /// state, or an error if no such instance exists.
    pub fn instance_snapshot(
        &self,
        id: ComputeInstanceId,
    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
        ComputeInstanceSnapshot::new(&self.controller, id)
    }
3962
3963 pub(crate) async fn ship_dataflow(
3970 &mut self,
3971 dataflow: DataflowDescription<Plan>,
3972 instance: ComputeInstanceId,
3973 target_replica: Option<ReplicaId>,
3974 ) {
3975 self.try_ship_dataflow(dataflow, instance, target_replica)
3976 .await
3977 .unwrap_or_terminate("dataflow creation cannot fail");
3978 }
3979
3980 pub(crate) async fn try_ship_dataflow(
3983 &mut self,
3984 dataflow: DataflowDescription<Plan>,
3985 instance: ComputeInstanceId,
3986 target_replica: Option<ReplicaId>,
3987 ) -> Result<(), DataflowCreationError> {
3988 let export_ids = dataflow.exported_index_ids().collect();
3991
3992 self.controller
3993 .compute
3994 .create_dataflow(instance, dataflow, target_replica)?;
3995
3996 self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3997 .await;
3998
3999 Ok(())
4000 }
4001
4002 pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
4006 self.controller
4007 .compute
4008 .allow_writes(instance, id)
4009 .unwrap_or_terminate("allow_writes cannot fail");
4010 }
4011
4012 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
4014 &mut self,
4015 dataflow: DataflowDescription<Plan>,
4016 instance: ComputeInstanceId,
4017 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
4018 target_replica: Option<ReplicaId>,
4019 ) {
4020 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
4021 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, target_replica);
4022 let ((), ()) =
4023 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
4024 } else {
4025 self.ship_dataflow(dataflow, instance, target_replica).await;
4026 }
4027 }
4028
4029 pub fn install_compute_watch_set(
4033 &mut self,
4034 conn_id: ConnectionId,
4035 objects: BTreeSet<GlobalId>,
4036 t: Timestamp,
4037 state: WatchSetResponse,
4038 ) -> Result<(), CollectionLookupError> {
4039 let ws_id = self.controller.install_compute_watch_set(objects, t)?;
4040 self.connection_watch_sets
4041 .entry(conn_id.clone())
4042 .or_default()
4043 .insert(ws_id);
4044 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4045 Ok(())
4046 }
4047
4048 pub fn install_storage_watch_set(
4052 &mut self,
4053 conn_id: ConnectionId,
4054 objects: BTreeSet<GlobalId>,
4055 t: Timestamp,
4056 state: WatchSetResponse,
4057 ) -> Result<(), CollectionMissing> {
4058 let ws_id = self.controller.install_storage_watch_set(objects, t)?;
4059 self.connection_watch_sets
4060 .entry(conn_id.clone())
4061 .or_default()
4062 .insert(ws_id);
4063 self.installed_watch_sets.insert(ws_id, (conn_id, state));
4064 Ok(())
4065 }
4066
4067 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
4069 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
4070 for ws_id in ws_ids {
4071 self.installed_watch_sets.remove(&ws_id);
4072 }
4073 }
4074 }
4075
4076 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
4080 let global_timelines: BTreeMap<_, _> = self
4086 .global_timelines
4087 .iter()
4088 .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
4089 .collect();
4090 let active_conns: BTreeMap<_, _> = self
4091 .active_conns
4092 .iter()
4093 .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
4094 .collect();
4095 let txn_read_holds: BTreeMap<_, _> = self
4096 .txn_read_holds
4097 .iter()
4098 .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
4099 .collect();
4100 let pending_peeks: BTreeMap<_, _> = self
4101 .pending_peeks
4102 .iter()
4103 .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
4104 .collect();
4105 let client_pending_peeks: BTreeMap<_, _> = self
4106 .client_pending_peeks
4107 .iter()
4108 .map(|(id, peek)| {
4109 let peek: BTreeMap<_, _> = peek
4110 .iter()
4111 .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
4112 .collect();
4113 (id.to_string(), peek)
4114 })
4115 .collect();
4116 let pending_linearize_read_txns: BTreeMap<_, _> = self
4117 .pending_linearize_read_txns
4118 .iter()
4119 .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
4120 .collect();
4121
4122 Ok(serde_json::json!({
4123 "global_timelines": global_timelines,
4124 "active_conns": active_conns,
4125 "txn_read_holds": txn_read_holds,
4126 "pending_peeks": pending_peeks,
4127 "client_pending_peeks": client_pending_peeks,
4128 "pending_linearize_read_txns": pending_linearize_read_txns,
4129 "controller": self.controller.dump().await?,
4130 }))
4131 }
4132
    /// Asynchronously prunes entries from `mz_storage_usage_by_shard` that
    /// are older than `retention_period`.
    ///
    /// Reads a snapshot of the collection at the current local read
    /// timestamp on a background task, then sends the resulting retractions
    /// back to the coordinator as `Message::StorageUsagePrune`; the actual
    /// table write happens when that message is handled.
    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
        let item_id = self
            .catalog()
            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
        let read_ts = self.get_local_read_ts().await;
        let current_contents_fut = self
            .controller
            .storage_collections
            .snapshot(global_id, read_ts);
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        // Scan on a background task so startup is not blocked on it.
        spawn(|| "storage_usage_prune", async move {
            let mut current_contents = current_contents_fut
                .await
                .unwrap_or_terminate("cannot fail to fetch snapshot");
            differential_dataflow::consolidation::consolidate(&mut current_contents);

            // Events whose collection timestamp (ms since the Unix epoch)
            // falls strictly before this cutoff are expired. `u128` avoids
            // overflow when subtracting the retention period.
            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
            let mut expired = Vec::new();
            for (row, diff) in current_contents {
                assert_eq!(
                    diff, 1,
                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
                );
                // Column 3 holds the collection timestamp; the expect guards
                // against the builtin table's schema drifting.
                let collection_timestamp = row
                    .unpack()
                    .get(3)
                    .expect("definition of mz_storage_by_shard changed")
                    .unwrap_timestamptz();
                let collection_timestamp = collection_timestamp.timestamp_millis();
                let collection_timestamp: u128 = collection_timestamp
                    .try_into()
                    .expect("all collections happen after Jan 1 1970");
                if collection_timestamp < cutoff_ts {
                    debug!("pruning storage event {row:?}");
                    // A -1 diff retracts the row from the builtin table.
                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
                    expired.push(builtin_update);
                }
            }

            // The receiver may already be gone during shutdown; best effort.
            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
        });
    }
4191
4192 fn current_credit_consumption_rate(&self) -> Numeric {
4193 self.catalog()
4194 .user_cluster_replicas()
4195 .filter_map(|replica| match &replica.config.location {
4196 ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
4197 ReplicaLocation::Unmanaged(_) => None,
4198 })
4199 .map(|size| {
4200 self.catalog()
4201 .cluster_replica_sizes()
4202 .0
4203 .get(size)
4204 .expect("location size is validated against the cluster replica sizes")
4205 .credits_per_hour
4206 })
4207 .sum()
4208 }
4209}
4210
#[cfg(test)]
impl Coordinator {
    /// Test helper: ships a dataflow to user cluster 1 and pins, via the
    /// explicit `()` annotation, that shipment is infallible from the
    /// caller's perspective.
    #[allow(dead_code)]
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        let instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
        let _: () = self.ship_dataflow(dataflow, instance, None).await;
    }
}
4227
/// The last message processed by the coordinator's main loop, retained so it
/// can be reported if the coordinator panics or is observed to be stuck.
struct LastMessage {
    // Static name of the message variant (from `Message::kind`).
    kind: &'static str,
    // The SQL statement being executed, when the message was an `Execute`
    // command with a resolvable portal; `None` otherwise.
    stmt: Option<Arc<Statement<Raw>>>,
}
4233
4234impl LastMessage {
4235 fn stmt_to_string(&self) -> Cow<'static, str> {
4237 self.stmt
4238 .as_ref()
4239 .map(|stmt| stmt.to_ast_string_redacted().into())
4240 .unwrap_or(Cow::Borrowed("<none>"))
4241 }
4242}
4243
4244impl fmt::Debug for LastMessage {
4245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4246 f.debug_struct("LastMessage")
4247 .field("kind", &self.kind)
4248 .field("stmt", &self.stmt_to_string())
4249 .finish()
4250 }
4251}
4252
4253impl Drop for LastMessage {
4254 fn drop(&mut self) {
4255 if std::thread::panicking() {
4257 eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4259 }
4260 }
4261}
4262
/// Boots the coordinator and returns a [`Handle`] to its dedicated thread
/// plus a [`Client`] for issuing commands to it.
///
/// Startup proceeds in phases, each logged with timings: preamble, timestamp
/// oracle initialization, catalog open, and finally spawning the coordinator
/// OS thread, which initializes the controller, runs `bootstrap`, and then
/// enters the serve loop. The returned future resolves once bootstrap has
/// completed (successfully or not).
pub fn serve(
    Config {
        controller_config,
        controller_envd_epoch,
        mut storage,
        audit_logs_iterator,
        timestamp_oracle_url,
        unsafe_mode,
        all_features,
        build_info,
        environment_id,
        metrics_registry,
        now,
        secrets_controller,
        cloud_resource_controller,
        cluster_replica_sizes,
        builtin_system_cluster_config,
        builtin_catalog_server_cluster_config,
        builtin_probe_cluster_config,
        builtin_support_cluster_config,
        builtin_analytics_cluster_config,
        system_parameter_defaults,
        availability_zones,
        storage_usage_client,
        storage_usage_collection_interval,
        storage_usage_retention_period,
        segment_client,
        egress_addresses,
        aws_account_id,
        aws_privatelink_availability_zones,
        connection_context,
        connection_limit_callback,
        remote_system_parameters,
        webhook_concurrency_limit,
        http_host_name,
        tracing_handle,
        read_only_controllers,
        caught_up_trigger: clusters_caught_up_trigger,
        helm_chart_version,
        license_key,
        external_login_password_mz_system,
        force_builtin_schema_migration,
    }: Config,
) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
    async move {
        let coord_start = Instant::now();
        info!("startup: coordinator init: beginning");
        info!("startup: coordinator init: preamble beginning");

        // Force the lazily-initialized static builtin definitions now,
        // rather than at first use.
        let _builtins = LazyLock::force(&BUILTINS_STATIC);

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
        let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
            mpsc::unbounded_channel();

        if !availability_zones.iter().all_unique() {
            coord_bail!("availability zones must be unique");
        }

        // An AWS principal context requires both an account ID and an
        // external ID prefix; with only one of the two it is unusable.
        let aws_principal_context = match (
            aws_account_id,
            connection_context.aws_external_id_prefix.clone(),
        ) {
            (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
                aws_account_id,
                aws_external_id_prefix,
            }),
            _ => None,
        };

        let aws_privatelink_availability_zones = aws_privatelink_availability_zones
            .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));

        info!(
            "startup: coordinator init: preamble complete in {:?}",
            coord_start.elapsed()
        );
        let oracle_init_start = Instant::now();
        info!("startup: coordinator init: timestamp oracle init beginning");

        let timestamp_oracle_config = timestamp_oracle_url
            .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
            .transpose()?;
        let mut initial_timestamps =
            get_initial_oracle_timestamps(&timestamp_oracle_config).await?;

        // Ensure an entry for the epoch-milliseconds timeline always exists,
        // even when the oracle reported nothing for it.
        initial_timestamps
            .entry(Timeline::EpochMilliseconds)
            .or_insert_with(mz_repr::Timestamp::minimum);
        let mut timestamp_oracles = BTreeMap::new();
        for (timeline, initial_timestamp) in initial_timestamps {
            Coordinator::ensure_timeline_state_with_initial_time(
                &timeline,
                initial_timestamp,
                now.clone(),
                timestamp_oracle_config.clone(),
                &mut timestamp_oracles,
                read_only_controllers,
            )
            .await;
        }

        let catalog_upper = storage.current_upper().await;
        let epoch_millis_oracle = &timestamp_oracles
            .get(&Timeline::EpochMilliseconds)
            .expect("inserted above")
            .oracle;

        // Derive a boot timestamp that is at least the catalog upper. In
        // read-only mode the oracle must not be written to, so only read.
        let mut boot_ts = if read_only_controllers {
            let read_ts = epoch_millis_oracle.read_ts().await;
            std::cmp::max(read_ts, catalog_upper)
        } else {
            epoch_millis_oracle.apply_write(catalog_upper).await;
            epoch_millis_oracle.write_ts().await.timestamp
        };

        info!(
            "startup: coordinator init: timestamp oracle init complete in {:?}",
            oracle_init_start.elapsed()
        );

        let catalog_open_start = Instant::now();
        info!("startup: coordinator init: catalog open beginning");
        let persist_client = controller_config
            .persist_clients
            .open(controller_config.persist_location.clone())
            .await
            .context("opening persist client")?;
        let builtin_item_migration_config =
            BuiltinItemMigrationConfig {
                persist_client: persist_client.clone(),
                read_only: read_only_controllers,
                force_migration: force_builtin_schema_migration,
            }
        ;
        let OpenCatalogResult {
            mut catalog,
            migrated_storage_collections_0dt,
            new_builtin_collections,
            builtin_table_updates,
            cached_global_exprs,
            uncached_local_exprs,
        } = Catalog::open(mz_catalog::config::Config {
            storage,
            metrics_registry: &metrics_registry,
            state: mz_catalog::config::StateConfig {
                unsafe_mode,
                all_features,
                build_info,
                deploy_generation: controller_config.deploy_generation,
                environment_id: environment_id.clone(),
                read_only: read_only_controllers,
                now: now.clone(),
                boot_ts: boot_ts.clone(),
                skip_migrations: false,
                cluster_replica_sizes,
                builtin_system_cluster_config,
                builtin_catalog_server_cluster_config,
                builtin_probe_cluster_config,
                builtin_support_cluster_config,
                builtin_analytics_cluster_config,
                system_parameter_defaults,
                remote_system_parameters,
                availability_zones,
                egress_addresses,
                aws_principal_context,
                aws_privatelink_availability_zones,
                connection_context,
                http_host_name,
                builtin_item_migration_config,
                persist_client: persist_client.clone(),
                enable_expression_cache_override: None,
                helm_chart_version,
                external_login_password_mz_system,
                license_key: license_key.clone(),
            },
        })
        .await?;

        // Opening the catalog may have advanced its upper; fold that back
        // into the boot timestamp.
        let catalog_upper = catalog.current_upper().await;
        boot_ts = std::cmp::max(boot_ts, catalog_upper);

        if !read_only_controllers {
            epoch_millis_oracle.apply_write(boot_ts).await;
        }

        info!(
            "startup: coordinator init: catalog open complete in {:?}",
            catalog_open_start.elapsed()
        );

        let coord_thread_start = Instant::now();
        info!("startup: coordinator init: coordinator thread start beginning");

        let session_id = catalog.config().session_id;
        let start_instant = catalog.config().start_instant;

        // The coordinator thread reports bootstrap success/failure over this
        // channel before entering its serve loop.
        let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
        let handle = TokioHandle::current();

        let metrics = Metrics::register_into(&metrics_registry);
        let metrics_clone = metrics.clone();
        let optimizer_metrics = OptimizerMetrics::register_into(
            &metrics_registry,
            catalog.system_config().optimizer_e2e_latency_warning_threshold(),
        );
        let segment_client_clone = segment_client.clone();
        let coord_now = now.clone();
        let advance_timelines_interval =
            tokio::time::interval(catalog.system_config().default_timestamp_interval());
        let mut check_scheduling_policies_interval = tokio::time::interval(
            catalog
                .system_config()
                .cluster_check_scheduling_policies_interval(),
        );
        check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // In read-only (0dt) mode poll the caught-up check at the configured
        // cadence; otherwise tick only hourly.
        let clusters_caught_up_check_interval = if read_only_controllers {
            let dyncfgs = catalog.system_config().dyncfgs();
            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);

            let mut interval = tokio::time::interval(interval);
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        } else {
            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        };

        let clusters_caught_up_check =
            clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
                trigger,
                exclude_collections: new_builtin_collections.into_iter().collect(),
            });

        if let Some(TimestampOracleConfig::Postgres(pg_config)) =
            timestamp_oracle_config.as_ref()
        {
            // Apply system-var-derived parameters to the Postgres oracle.
            let pg_timestamp_oracle_params =
                flags::timestamp_oracle_config(catalog.system_config());
            pg_timestamp_oracle_params.apply(pg_config);
        }

        // Adapt the caller's two-argument connection-limit callback into the
        // single-`SystemVars` shape the catalog's callback registry expects.
        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
            Arc::new(move |system_vars: &SystemVars| {
                let limit: u64 = system_vars.max_connections().cast_into();
                let superuser_reserved: u64 =
                    system_vars.superuser_reserved_connections().cast_into();

                // Clamp the reservation to the limit so regular users are
                // not entirely locked out by a misconfiguration.
                let superuser_reserved = if superuser_reserved >= limit {
                    tracing::warn!(
                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
                    );
                    limit
                } else {
                    superuser_reserved
                };

                (connection_limit_callback)(limit, superuser_reserved);
            });
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::MAX_CONNECTIONS,
            Arc::clone(&connection_limit_callback),
        );
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
            connection_limit_callback,
        );

        let (group_commit_tx, group_commit_rx) = appends::notifier();

        let parent_span = tracing::Span::current();
        // The coordinator runs on a dedicated OS thread with an enlarged
        // stack; all remaining initialization happens on that thread.
        let thread = thread::Builder::new()
            .stack_size(3 * stack::STACK_SIZE)
            .name("coordinator".to_string())
            .spawn(move || {
                let span = info_span!(parent: parent_span, "coord::coordinator").entered();

                let controller = handle
                    .block_on({
                        catalog.initialize_controller(
                            controller_config,
                            controller_envd_epoch,
                            read_only_controllers,
                        )
                    })
                    .unwrap_or_terminate("failed to initialize storage_controller");
                // Controller initialization may have advanced the catalog
                // upper again; re-derive the boot timestamp one last time.
                let catalog_upper = handle.block_on(catalog.current_upper());
                boot_ts = std::cmp::max(boot_ts, catalog_upper);
                if !read_only_controllers {
                    let epoch_millis_oracle = &timestamp_oracles
                        .get(&Timeline::EpochMilliseconds)
                        .expect("inserted above")
                        .oracle;
                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
                }

                let catalog = Arc::new(catalog);

                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
                let mut coord = Coordinator {
                    controller,
                    catalog,
                    internal_cmd_tx,
                    group_commit_tx,
                    strict_serializable_reads_tx,
                    global_timelines: timestamp_oracles,
                    transient_id_gen: Arc::new(TransientIdGen::new()),
                    active_conns: BTreeMap::new(),
                    txn_read_holds: Default::default(),
                    pending_peeks: BTreeMap::new(),
                    client_pending_peeks: BTreeMap::new(),
                    pending_linearize_read_txns: BTreeMap::new(),
                    serialized_ddl: LockedVecDeque::new(),
                    active_compute_sinks: BTreeMap::new(),
                    active_webhooks: BTreeMap::new(),
                    active_copies: BTreeMap::new(),
                    staged_cancellation: BTreeMap::new(),
                    introspection_subscribes: BTreeMap::new(),
                    write_locks: BTreeMap::new(),
                    deferred_write_ops: BTreeMap::new(),
                    pending_writes: Vec::new(),
                    advance_timelines_interval,
                    secrets_controller,
                    caching_secrets_reader,
                    cloud_resource_controller,
                    storage_usage_client,
                    storage_usage_collection_interval,
                    segment_client,
                    metrics,
                    optimizer_metrics,
                    tracing_handle,
                    statement_logging: StatementLogging::new(coord_now.clone()),
                    webhook_concurrency_limit,
                    timestamp_oracle_config,
                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
                    cluster_scheduling_decisions: BTreeMap::new(),
                    caught_up_check_interval: clusters_caught_up_check_interval,
                    caught_up_check: clusters_caught_up_check,
                    installed_watch_sets: BTreeMap::new(),
                    connection_watch_sets: BTreeMap::new(),
                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
                    read_only_controllers,
                    buffered_builtin_table_updates: Some(Vec::new()),
                    license_key,
                    user_id_pool: IdPool::empty(),
                    persist_client,
                };
                let bootstrap = handle.block_on(async {
                    coord
                        .bootstrap(
                            boot_ts,
                            migrated_storage_collections_0dt,
                            builtin_table_updates,
                            cached_global_exprs,
                            uncached_local_exprs,
                            audit_logs_iterator,
                        )
                        .await?;
                    coord
                        .controller
                        .remove_orphaned_replicas(
                            coord.catalog().get_next_user_replica_id().await?,
                            coord.catalog().get_next_system_replica_id().await?,
                        )
                        .await
                        .map_err(AdapterError::Orchestrator)?;

                    if let Some(retention_period) = storage_usage_retention_period {
                        coord
                            .prune_storage_usage_events_on_startup(retention_period)
                            .await;
                    }

                    Ok(())
                });
                let ok = bootstrap.is_ok();
                drop(span);
                bootstrap_tx
                    .send(bootstrap)
                    .expect("bootstrap_rx is not dropped until it receives this message");
                if ok {
                    // Enter the serve loop; this blocks the thread until the
                    // command receivers are closed.
                    handle.block_on(coord.serve(
                        internal_cmd_rx,
                        strict_serializable_reads_rx,
                        cmd_rx,
                        group_commit_rx,
                    ));
                }
            })
            .expect("failed to create coordinator thread");
        match bootstrap_rx
            .await
            .expect("bootstrap_tx always sends a message or panics/halts")
        {
            Ok(()) => {
                info!(
                    "startup: coordinator init: coordinator thread start complete in {:?}",
                    coord_thread_start.elapsed()
                );
                info!(
                    "startup: coordinator init: complete in {:?}",
                    coord_start.elapsed()
                );
                let handle = Handle {
                    session_id,
                    start_instant,
                    _thread: thread.join_on_drop(),
                };
                let client = Client::new(
                    build_info,
                    cmd_tx,
                    metrics_clone,
                    now,
                    environment_id,
                    segment_client_clone,
                );
                Ok((handle, client))
            }
            Err(e) => Err(e),
        }
    }
    .boxed()
}
4742
4743async fn get_initial_oracle_timestamps(
4757 timestamp_oracle_config: &Option<TimestampOracleConfig>,
4758) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
4759 let mut initial_timestamps = BTreeMap::new();
4760
4761 if let Some(config) = timestamp_oracle_config {
4762 let oracle_timestamps = config.get_all_timelines().await?;
4763
4764 let debug_msg = || {
4765 oracle_timestamps
4766 .iter()
4767 .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
4768 .join(", ")
4769 };
4770 info!(
4771 "current timestamps from the timestamp oracle: {}",
4772 debug_msg()
4773 );
4774
4775 for (timeline, ts) in oracle_timestamps {
4776 let entry = initial_timestamps
4777 .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));
4778
4779 entry
4780 .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
4781 .or_insert(ts);
4782 }
4783 } else {
4784 info!("no timestamp oracle configured!");
4785 };
4786
4787 let debug_msg = || {
4788 initial_timestamps
4789 .iter()
4790 .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
4791 .join(", ")
4792 };
4793 info!("initial oracle timestamps: {}", debug_msg());
4794
4795 Ok(initial_timestamps)
4796}
4797
4798#[instrument]
4799pub async fn load_remote_system_parameters(
4800 storage: &mut Box<dyn OpenableDurableCatalogState>,
4801 system_parameter_sync_config: Option<SystemParameterSyncConfig>,
4802 system_parameter_sync_timeout: Duration,
4803) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
4804 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
4805 tracing::info!("parameter sync on boot: start sync");
4806
4807 let mut params = SynchronizedParameters::new(SystemVars::default());
4847 let frontend_sync = async {
4848 let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
4849 frontend.pull(&mut params);
4850 let ops = params
4851 .modified()
4852 .into_iter()
4853 .map(|param| {
4854 let name = param.name;
4855 let value = param.value;
4856 tracing::info!(name, value, initial = true, "sync parameter");
4857 (name, value)
4858 })
4859 .collect();
4860 tracing::info!("parameter sync on boot: end sync");
4861 Ok(Some(ops))
4862 };
4863 if !storage.has_system_config_synced_once().await? {
4864 frontend_sync.await
4865 } else {
4866 match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
4867 Ok(ops) => Ok(ops),
4868 Err(TimeoutError::Inner(e)) => Err(e),
4869 Err(TimeoutError::DeadlineElapsed) => {
4870 tracing::info!("parameter sync on boot: sync has timed out");
4871 Ok(None)
4872 }
4873 }
4874 }
4875 } else {
4876 Ok(None)
4877 }
4878}
4879
/// Payloads delivered when an installed watch set fires (see
/// `Coordinator::install_compute_watch_set` /
/// `Coordinator::install_storage_watch_set`).
#[derive(Debug)]
pub enum WatchSetResponse {
    // A statement-lifecycle event whose dependencies became ready.
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    // Carries the context needed to resume an in-progress `ALTER SINK`.
    AlterSinkReady(AlterSinkReadyContext),
    // Carries the context needed to resume an in-progress materialized-view
    // replacement.
    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
}
4886
/// State carried while an `ALTER SINK` waits for a watch set to fire.
#[derive(Debug)]
pub struct AlterSinkReadyContext {
    // The client's execute context. `Option` so `retire` can take it,
    // letting the `Drop` impl detect whether a response was already sent.
    ctx: Option<ExecuteContext>,
    // Tracing context to re-enter when the watch set fires.
    otel_ctx: OpenTelemetryContext,
    plan: AlterSinkPlan,
    // NOTE(review): presumably re-checked before the plan is applied, in
    // case the catalog changed while waiting — confirm at the use site.
    plan_validity: PlanValidity,
    // Read holds keeping the sink's inputs readable while we wait.
    read_hold: ReadHolds<Timestamp>,
}
4895
4896impl AlterSinkReadyContext {
4897 fn ctx(&mut self) -> &mut ExecuteContext {
4898 self.ctx.as_mut().expect("only cleared on drop")
4899 }
4900
4901 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4902 self.ctx
4903 .take()
4904 .expect("only cleared on drop")
4905 .retire(result);
4906 }
4907}
4908
4909impl Drop for AlterSinkReadyContext {
4910 fn drop(&mut self) {
4911 if let Some(ctx) = self.ctx.take() {
4912 ctx.retire(Err(AdapterError::Canceled));
4913 }
4914 }
4915}
4916
/// State carried for an `ALTER MATERIALIZED VIEW` replacement that is waiting
/// to become ready.
#[derive(Debug)]
pub struct AlterMaterializedViewReadyContext {
    /// The client's execute context. `None` only after the context has been
    /// retired (see `retire` and the `Drop` impl).
    ctx: Option<ExecuteContext>,
    /// OpenTelemetry context — presumably re-attached when work resumes;
    /// confirm at the use site.
    otel_ctx: OpenTelemetryContext,
    /// The replacement plan to apply once ready.
    plan: plan::AlterMaterializedViewApplyReplacementPlan,
    /// Validity state checked before the deferred plan is executed.
    plan_validity: PlanValidity,
}
4924
4925impl AlterMaterializedViewReadyContext {
4926 fn ctx(&mut self) -> &mut ExecuteContext {
4927 self.ctx.as_mut().expect("only cleared on drop")
4928 }
4929
4930 fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
4931 self.ctx
4932 .take()
4933 .expect("only cleared on drop")
4934 .retire(result);
4935 }
4936}
4937
4938impl Drop for AlterMaterializedViewReadyContext {
4939 fn drop(&mut self) {
4940 if let Some(ctx) = self.ctx.take() {
4941 ctx.retire(Err(AdapterError::Canceled));
4942 }
4943 }
4944}
4945
/// A `VecDeque` paired with an advisory async mutex.
///
/// NOTE(review): the mutex guards `()` rather than `items`, so it does not by
/// itself protect the queue; presumably callers hold the owned guard to
/// serialize higher-level processing — confirm at the use sites.
#[derive(Debug)]
struct LockedVecDeque<T> {
    items: VecDeque<T>,
    lock: Arc<tokio::sync::Mutex<()>>,
}
4953
4954impl<T> LockedVecDeque<T> {
4955 pub fn new() -> Self {
4956 Self {
4957 items: VecDeque::new(),
4958 lock: Arc::new(tokio::sync::Mutex::new(())),
4959 }
4960 }
4961
4962 pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
4963 Arc::clone(&self.lock).try_lock_owned()
4964 }
4965
4966 pub fn is_empty(&self) -> bool {
4967 self.items.is_empty()
4968 }
4969
4970 pub fn push_back(&mut self, value: T) {
4971 self.items.push_back(value)
4972 }
4973
4974 pub fn pop_front(&mut self) -> Option<T> {
4975 self.items.pop_front()
4976 }
4977
4978 pub fn remove(&mut self, index: usize) -> Option<T> {
4979 self.items.remove(index)
4980 }
4981
4982 pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
4983 self.items.iter()
4984 }
4985}
4986
/// A statement (or already-planned statement) whose execution has been
/// deferred, together with the context needed to execute it later.
#[derive(Debug)]
struct DeferredPlanStatement {
    /// The execute context to resume with.
    ctx: ExecuteContext,
    /// The deferred statement or plan.
    ps: PlanStatement,
}
4992
/// Either a raw SQL statement that still needs planning, or an
/// already-produced plan.
#[derive(Debug)]
enum PlanStatement {
    /// A raw statement with its bound parameters, not yet planned.
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    /// A finished plan together with the ids resolved during planning.
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
    },
}
5004
/// Errors raised when validating a client address against a network policy.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// The address was not contained in any policy rule's range.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    /// No IP address was available to validate.
    #[error("Access denied missing IP address")]
    MissingIp,
}
5012
5013pub(crate) fn validate_ip_with_policy_rules(
5014 ip: &IpAddr,
5015 rules: &Vec<NetworkPolicyRule>,
5016) -> Result<(), NetworkPolicyError> {
5017 if rules.iter().any(|r| r.address.0.contains(ip)) {
5020 Ok(())
5021 } else {
5022 Err(NetworkPolicyError::AddressDenied(ip.clone()))
5023 }
5024}
5025
5026pub(crate) fn infer_sql_type_for_catalog(
5027 hir_expr: &HirRelationExpr,
5028 mir_expr: &MirRelationExpr,
5029) -> SqlRelationType {
5030 let mut typ = hir_expr.top_level_typ();
5031 typ.backport_nullability_and_keys(&mir_expr.typ());
5032 typ
5033}
5034
#[cfg(test)]
mod id_pool_tests {
    use super::IdPool;

    // An empty pool has no ids to hand out, singly or in batches.
    #[mz_ore::test]
    fn test_empty_pool() {
        let mut pool = IdPool::empty();
        assert_eq!(pool.remaining(), 0);
        assert_eq!(pool.allocate(), None);
        assert_eq!(pool.allocate_many(1), None);
    }

    // `refill(lo, hi)` provides the half-open range [lo, hi): refilling with
    // (10, 13) yields exactly the ids 10, 11, 12.
    #[mz_ore::test]
    fn test_allocate_single() {
        let mut pool = IdPool::empty();
        pool.refill(10, 13);
        assert_eq!(pool.remaining(), 3);
        assert_eq!(pool.allocate(), Some(10));
        assert_eq!(pool.allocate(), Some(11));
        assert_eq!(pool.allocate(), Some(12));
        assert_eq!(pool.remaining(), 0);
        assert_eq!(pool.allocate(), None);
    }

    // Batch allocation returns contiguous ids, and a failed `allocate_many`
    // must not consume any ids: after the 3-id request fails, a 2-id request
    // still gets the remaining pair.
    #[mz_ore::test]
    fn test_allocate_many() {
        let mut pool = IdPool::empty();
        pool.refill(100, 105);
        assert_eq!(pool.allocate_many(3), Some(vec![100, 101, 102]));
        assert_eq!(pool.remaining(), 2);
        assert_eq!(pool.allocate_many(3), None);
        assert_eq!(pool.allocate_many(2), Some(vec![103, 104]));
        assert_eq!(pool.remaining(), 0);
    }

    // A zero-sized batch succeeds with an empty vec and consumes nothing.
    #[mz_ore::test]
    fn test_allocate_many_zero() {
        let mut pool = IdPool::empty();
        pool.refill(1, 5);
        assert_eq!(pool.allocate_many(0), Some(vec![]));
        assert_eq!(pool.remaining(), 4);
    }

    // A second `refill` replaces the previous range rather than extending it:
    // the unallocated id 1 from the first range is discarded.
    #[mz_ore::test]
    fn test_refill_resets_pool() {
        let mut pool = IdPool::empty();
        pool.refill(0, 2);
        assert_eq!(pool.allocate(), Some(0));
        pool.refill(50, 52);
        assert_eq!(pool.allocate(), Some(50));
        assert_eq!(pool.allocate(), Some(51));
        assert_eq!(pool.allocate(), None);
    }

    // Single and batch allocations draw from the same underlying range.
    #[mz_ore::test]
    fn test_mixed_allocate_and_allocate_many() {
        let mut pool = IdPool::empty();
        pool.refill(0, 10);
        assert_eq!(pool.allocate(), Some(0));
        assert_eq!(pool.allocate_many(3), Some(vec![1, 2, 3]));
        assert_eq!(pool.allocate(), Some(4));
        assert_eq!(pool.remaining(), 5);
    }

    // Refilling with an inverted range (lo > hi) is a programming error and
    // panics.
    #[mz_ore::test]
    #[should_panic(expected = "invalid pool range")]
    fn test_refill_invalid_range_panics() {
        let mut pool = IdPool::empty();
        pool.refill(10, 5);
    }
}