use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroI64;
use std::ops::Neg;
use std::str::FromStr;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::Context;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
use http::Uri;
use ipnet::IpNet;
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
use mz_auth::password::Password;
use mz_build_info::BuildInfo;
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::{
    CollectionLookupError, DataflowCreationError, InstanceMissing,
};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_controller::clusters::{
    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
};
use mz_controller::{ControllerConfig, Readiness};
use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::OfflineReason;
use mz_ore::cast::{CastFrom, CastInto, CastLossy};
use mz_ore::channel::trigger::Trigger;
use mz_ore::future::TimeoutError;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::{JoinHandle, spawn};
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_ore::url::SensitiveUrl;
use mz_ore::{assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, stack};
use mz_persist_client::PersistClient;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
use mz_repr::adt::numeric::Numeric;
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
use mz_secrets::cache::CachingSecretsReader;
use mz_secrets::{SecretsController, SecretsReader};
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::{CatalogCluster, EnvironmentId};
use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::{
    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, NetworkPolicyRule,
    OnTimeoutAction, Params, QueryWhen,
};
use mz_sql::session::user::User;
use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
use mz_sql_parser::ast::ExplainStage;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
use mz_timestamp_oracle::WriteTimestamp;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use serde::Serialize;
use thiserror::Error;
use timely::progress::{Antichain, Timestamp as _};
use tokio::runtime::Handle as TokioHandle;
use tokio::select;
use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
use crate::client::{Client, Handle};
use crate::command::{Command, ExecuteResponse};
use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
use crate::coord::appends::{
    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
};
use crate::coord::caught_up::CaughtUpCheckContext;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::introspection::IntrospectionSubscribe;
use crate::coord::peek::PendingPeek;
use crate::coord::statement_logging::StatementLogging;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::validity::PlanValidity;
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
use crate::metrics::Metrics;
use crate::optimize::dataflows::{
    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session};
use crate::statement_logging::{
    StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{AdapterNotice, ReadHolds, flags};

pub(crate) mod appends;
pub(crate) mod catalog_serving;
pub(crate) mod cluster_scheduling;
pub(crate) mod consistency;
pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod read_policy;
pub(crate) mod sequencer;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;

pub mod catalog_implications;
mod caught_up;
mod command_handler;
mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
mod sql;
mod validity;
#[derive(Debug)]
pub enum Message {
    Command(OpenTelemetryContext, Command),
    ControllerReady {
        controller: ControllerReadiness,
    },
    PurifiedStatementReady(PurifiedStatementReady),
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    /// Attempt to sequence a statement that was previously deferred, e.g.
    /// while it waited on a write lock.
    TryDeferred {
        conn_id: ConnectionId,
        /// A write lock that has already been acquired on behalf of the
        /// deferred statement, if any.
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    AdvanceTimelines,
    ClusterEvent(ClusterEvent),
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    LinearizeReads,
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    RetireExecute {
        /// Statement-logging state for the execution being retired.
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    DrainStatementLog,
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}

impl Message {
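    /// Returns a static string identifying the kind of message, suitable for
    /// use as a metrics or logging label.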
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
                Command::GetOracle { .. } => "get-oracle",
                Command::DetermineRealTimeRecentTimestamp { .. } => {
                    "determine-real-time-recent-timestamp"
                }
                Command::GetTransactionReadHoldsBundle { .. } => {
                    "get-transaction-read-holds-bundle"
                }
                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
                Command::CopyToPreflight { .. } => "copy-to-preflight",
                Command::ExecuteCopyTo { .. } => "execute-copy-to",
                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
                Command::ExplainTimestamp { .. } => "explain-timestamp",
                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
            },
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}

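/// The part of the controller that is ready to do work, signaled to the
/// coordinator's main loop via [`Message::ControllerReady`].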
#[derive(Debug)]
pub enum ControllerReadiness {
    /// The storage controller is ready.
    Storage,
    /// The compute controller is ready.
    Compute,
    /// A periodic refresh of controller metrics is due.
    Metrics,
    /// The controller has internal work to process.
    Internal,
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub params: Params,
    pub plan_validity: PlanValidity,
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}

pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;

#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    pub connection_id: CatalogItemId,
    pub connection_gid: GlobalId,
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}

pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
pub type AlterConnectionValidationReady = ValidationReady<Connection>;

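/// The stages of sequencing a peek (a `SELECT`, or the `SELECT` half of a
/// `COPY ... TO`). Each stage may complete on the coordinator's main task or
/// on a background task, and hands control back to the main loop via a
/// [`Message::PeekStageReady`] carrying the next stage.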
#[derive(Debug)]
pub enum PeekStage {
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    Finish(PeekStageFinish),
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    CopyToPreflight(PeekStageCopyTo),
    CopyToDataflow(PeekStageCopyTo),
}

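/// Context needed to sequence a `COPY ... TO` statement.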
#[derive(Debug)]
pub struct CopyToContext {
    /// The `RelationDesc` of the data to be copied.
    pub desc: RelationDesc,
    /// The destination URI.
    pub uri: Uri,
    /// The (referenced) connection used to write to the destination.
    pub connection: StorageConnection<ReferencedConnection>,
    /// The ID of the connection.
    pub connection_id: CatalogItemId,
    /// The output format.
    pub format: S3SinkFormat,
    /// The maximum size of any single output file.
    pub max_file_size: u64,
    /// The number of batches the output will be partitioned into, once known.
    pub output_batch_count: Option<u64>,
}

#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
}

#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    imports: BTreeMap<GlobalId, MapFilterProject>,
}

#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}

#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}

#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}

#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    real_time_recency_ts: Option<Timestamp>,
}

#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}

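/// The kind of explain (if any) a peek is being sequenced under, which
/// determines how the optimizer's output is captured and reported.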
#[derive(Debug)]
pub enum ExplainContext {
    /// An ordinary, non-`EXPLAIN` query; nothing is captured.
    None,
    /// An `EXPLAIN` of a plan, with the context needed to render it.
    Plan(ExplainPlanContext),
    /// Capture an optimizer trace in order to emit a plan insights notice.
    PlanInsightsNotice(OptimizerTrace),
    /// An `EXPLAIN FILTER PUSHDOWN` request.
    Pushdown,
}

impl ExplainContext {
    pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
        let optimizer_trace = match self {
            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
            _ => None,
        };
        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
    }

    pub(crate) fn needs_cluster(&self) -> bool {
        match self {
            ExplainContext::None => true,
            ExplainContext::Plan(..) => false,
            ExplainContext::PlanInsightsNotice(..) => true,
            ExplainContext::Pushdown => false,
        }
    }

    pub(crate) fn needs_plan_insights(&self) -> bool {
        matches!(
            self,
            ExplainContext::Plan(ExplainPlanContext {
                stage: ExplainStage::PlanInsights,
                ..
            }) | ExplainContext::PlanInsightsNotice(_)
        )
    }
}

#[derive(Debug)]
pub struct ExplainPlanContext {
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    pub stage: ExplainStage,
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    pub optimizer_trace: OptimizerTrace,
}

#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    item_id: CatalogItemId,
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
}

#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
}

#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}

#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    /// The built-in catalog server cluster.
    CatalogServer,
    /// The session's active cluster.
    Active,
    /// The cluster pinned by the current transaction.
    Transaction(ClusterId),
}

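/// The result of running one [`Staged`] stage: either more work (possibly on
/// a background task) or a final response.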
pub(crate) enum StageResult<T> {
    /// A task handle that resolves to the next stage.
    Handle(JoinHandle<Result<T, AdapterError>>),
    /// A task handle that resolves to the final response, retiring the stage.
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    /// The next stage, available immediately.
    Immediate(T),
    /// The final response.
    Response(ExecuteResponse),
}

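/// A multi-stage operation that the coordinator drives to completion. Each
/// call to [`Staged::stage`] performs one step; heavy work is returned as a
/// [`StageResult::Handle`] so it can run on a background task while the main
/// loop stays responsive, and the follow-up stage re-enters the loop through
/// the message produced by [`Staged::message`].
///
/// A minimal sketch of the driving loop, assuming a hypothetical
/// `spawn_and_send` helper (illustrative only; the real driver lives in the
/// sequencer and also checks `validity` and cancellation):
///
/// ```ignore
/// async fn drive<S: Staged>(coord: &mut Coordinator, mut ctx: S::Ctx, mut stage: S) {
///     loop {
///         match stage.stage(coord, &mut ctx).await {
///             // The next stage is ready now: keep looping.
///             Ok(StageResult::Immediate(next)) => stage = *next,
///             // Off-thread work: re-enter the loop later via `Staged::message`.
///             Ok(StageResult::Handle(handle)) => return spawn_and_send(handle, ctx),
///             Ok(StageResult::HandleRetire(handle)) => return spawn_and_send(handle, ctx),
///             Ok(StageResult::Response(resp)) => return ctx.retire(Ok(resp)),
///             Err(e) => return ctx.retire(Err(e)),
///         }
///     }
/// }
/// ```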
pub(crate) trait Staged: Send {
    /// The context type that carries the session and response channel.
    type Ctx: StagedContext;

    /// The validity constraints that must hold for this stage to run.
    fn validity(&mut self) -> &mut PlanValidity;

    /// Runs the current stage, returning either the next stage or a final
    /// response.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    /// Wraps this stage in the [`Message`] variant that re-enters the
    /// coordinator's main loop.
    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    /// Whether the statement can be canceled between stages.
    fn cancel_enabled(&self) -> bool;
}

pub trait StagedContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    fn session(&self) -> Option<&Session>;
}

impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}

impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}

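/// Configuration for a [`Coordinator`].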
pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    pub connection_limit_callback: Box<dyn Fn(u64, u64) + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    pub read_only_controllers: bool,

    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}

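/// Metadata about an active connection.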
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    /// Secret key used to authenticate cancellation requests for this
    /// connection.
    secret_key: u32,
    /// When the connection was established.
    connected_at: EpochMillis,
    user: User,
    application_name: String,
    uuid: Uuid,
    conn_id: ConnectionId,
    client_ip: Option<IpAddr>,

    /// Active sinks to drop when the connection is terminated.
    drop_sinks: BTreeSet<GlobalId>,

    /// A write lock held on behalf of a deferred statement, if any.
    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    /// Clusters with in-flight alterations initiated by this connection.
    pending_cluster_alters: BTreeSet<ClusterId>,

    /// Channel on which to send notices to the session.
    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    /// The role the connection authenticated as. Unlike the session's current
    /// role, this never changes.
    authenticated_role: RoleId,
}

impl ConnMeta {
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    pub fn user(&self) -> &User {
        &self.user
    }

    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}

/// A transaction waiting to be committed or rolled back on the coordinator's
/// main loop.
#[derive(Debug)]
pub struct PendingTxn {
    /// The context used to send a response back to the client.
    ctx: ExecuteContext,
    /// The response to send to the client.
    response: Result<PendingTxnResponse, AdapterError>,
    /// Whether the transaction is committed or rolled back.
    action: EndTransactionAction,
}

/// The response to a [`PendingTxn`].
#[derive(Debug)]
pub enum PendingTxnResponse {
    /// The transaction will be committed.
    Committed {
        /// Session parameters that changed as a result.
        params: BTreeMap<&'static str, String>,
    },
    /// The transaction will be rolled back.
    Rolledback {
        /// Session parameters that changed as a result.
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        match self {
            PendingTxnResponse::Committed { params }
            | PendingTxnResponse::Rolledback { params } => params.extend(p),
        }
    }
}

impl From<PendingTxnResponse> for ExecuteResponse {
    fn from(value: PendingTxnResponse) -> Self {
        match value {
            PendingTxnResponse::Committed { params } => {
                ExecuteResponse::TransactionCommitted { params }
            }
            PendingTxnResponse::Rolledback { params } => {
                ExecuteResponse::TransactionRolledBack { params }
            }
        }
    }
}

/// A read transaction waiting in the coordinator's strict-serializable queue.
#[derive(Debug)]
pub struct PendingReadTxn {
    /// The inner read.
    txn: PendingRead,
    /// The timestamp context of the read.
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    /// When the read was queued.
    created: Instant,
    /// The number of times the read has been requeued.
    num_requeues: u64,
    otel_ctx: OpenTelemetryContext,
}

impl PendingReadTxn {
    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
        &self.timestamp_context
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}

#[derive(Debug)]
enum PendingRead {
    /// A plain read, e.g. a `SELECT` in strict serializable mode.
    Read {
        txn: PendingTxn,
    },
    /// The read half of a read-then-write statement.
    ReadThenWrite {
        ctx: ExecuteContext,
        /// Channel used to hand the context back to the waiting write once
        /// the read completes.
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}

impl PendingRead {
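    /// Completes the pending read. For a plain read, returns the context and
    /// the response to send to the client; for a read-then-write, hands the
    /// context back to the waiting write and returns `None`.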
    #[instrument(level = "debug")]
    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
        match self {
            PendingRead::Read {
                txn:
                    PendingTxn {
                        mut ctx,
                        response,
                        action,
                    },
                ..
            } => {
                let changed = ctx.session_mut().vars_mut().end_transaction(action);
                let response = response.map(|mut r| {
                    r.extend_params(changed);
                    ExecuteResponse::from(r)
                });

                Some((ctx, response))
            }
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                let _ = tx.send(Some(ctx));
                None
            }
        }
    }

    fn label(&self) -> &'static str {
        match self {
            PendingRead::Read { .. } => "read",
            PendingRead::ReadThenWrite { .. } => "read_then_write",
        }
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        match self {
            PendingRead::Read { txn, .. } => txn.ctx,
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                let _ = tx.send(None);
                ctx
            }
        }
    }
}

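/// State tied to a single statement execution for statement logging. If the
/// statement is being logged, this carries its [`StatementLoggingId`]; the
/// value must eventually be passed to a `RetireExecute` message (hence
/// `#[must_use]`).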
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    statement_uuid: Option<StatementLoggingId>,
}

impl ExecuteContextExtra {
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }
    pub fn is_trivial(&self) -> bool {
        self.statement_uuid.is_none()
    }
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
    #[must_use]
    pub(crate) fn retire(self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
}

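/// A drop guard around [`ExecuteContextExtra`]: if the guard is dropped
/// without being defused, it sends a `RetireExecute` message reporting the
/// execution as aborted, so statement logging never loses an execution.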
#[derive(Debug)]
#[must_use]
pub struct ExecuteContextGuard {
    extra: ExecuteContextExtra,
    coordinator_tx: mpsc::UnboundedSender<Message>,
}

impl Default for ExecuteContextGuard {
    fn default() -> Self {
        // A default guard has no statement to log, so the channel can dangle.
        let (tx, _rx) = mpsc::unbounded_channel();
        Self {
            extra: ExecuteContextExtra::default(),
            coordinator_tx: tx,
        }
    }
}

impl ExecuteContextGuard {
    pub(crate) fn new(
        statement_uuid: Option<StatementLoggingId>,
        coordinator_tx: mpsc::UnboundedSender<Message>,
    ) -> Self {
        Self {
            extra: ExecuteContextExtra::new(statement_uuid),
            coordinator_tx,
        }
    }
    pub fn is_trivial(&self) -> bool {
        self.extra.is_trivial()
    }
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.extra.contents()
    }
    /// Defuses the guard, returning the inner state without sending a
    /// retirement message. The caller becomes responsible for retiring the
    /// execution.
    pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
        std::mem::take(&mut self.extra)
    }
}

impl Drop for ExecuteContextGuard {
    fn drop(&mut self) {
        if let Some(statement_uuid) = self.extra.statement_uuid.take() {
            // The execution was dropped without being retired: report it as
            // aborted so statement logging stays complete.
            let msg = Message::RetireExecute {
                data: ExecuteContextExtra {
                    statement_uuid: Some(statement_uuid),
                },
                otel_ctx: OpenTelemetryContext::obtain(),
                reason: StatementEndedExecutionReason::Aborted,
            };
            let _ = self.coordinator_tx.send(msg);
        }
    }
}

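/// Everything the coordinator needs to execute a statement on behalf of a
/// session: the response channel, the session itself, and the statement
/// logging guard. Boxed internally to keep the type cheap to move.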
#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}

impl std::ops::Deref for ExecuteContext {
    type Target = ExecuteContextInner;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}

impl std::ops::DerefMut for ExecuteContext {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}

#[derive(Debug)]
pub struct ExecuteContextInner {
    tx: ClientTransmitter<ExecuteResponse>,
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    session: Session,
    extra: ExecuteContextGuard,
}

impl ExecuteContext {
    pub fn session(&self) -> &Session {
        &self.session
    }

    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.session
    }

    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
        &self.tx
    }

    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
        &mut self.tx
    }

    pub fn from_parts(
        tx: ClientTransmitter<ExecuteResponse>,
        internal_cmd_tx: mpsc::UnboundedSender<Message>,
        session: Session,
        extra: ExecuteContextGuard,
    ) -> Self {
        Self {
            inner: ExecuteContextInner {
                tx,
                session,
                extra,
                internal_cmd_tx,
            }
            .into(),
        }
    }

    pub fn into_parts(
        self,
    ) -> (
        ClientTransmitter<ExecuteResponse>,
        mpsc::UnboundedSender<Message>,
        Session,
        ExecuteContextGuard,
    ) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        (tx, internal_cmd_tx, session, extra)
    }

    #[instrument(level = "debug")]
    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        let reason = if extra.is_trivial() {
            None
        } else {
            Some((&result).into())
        };
        tx.send(result, session);
        if let Some(reason) = reason {
            let extra = extra.defuse();
            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
                otel_ctx: OpenTelemetryContext::obtain(),
                data: extra,
                reason,
            }) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        }
    }

    pub fn extra(&self) -> &ExecuteContextGuard {
        &self.extra
    }

    pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
        &mut self.extra
    }
}

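/// Tracks the last known status of every process of every cluster replica,
/// keyed by cluster, replica, and process.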
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);

impl ClusterReplicaStatuses {
    pub(crate) fn new() -> ClusterReplicaStatuses {
        ClusterReplicaStatuses(BTreeMap::new())
    }

    /// Initializes the status map of a cluster. Panics if the cluster's
    /// statuses were already initialized.
    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
        let prev = self.0.insert(cluster_id, BTreeMap::new());
        assert_eq!(
            prev, None,
            "cluster {cluster_id} statuses already initialized"
        );
    }

    /// Initializes the process statuses of a replica, marking every process
    /// as offline and initializing. Panics if the replica's statuses were
    /// already initialized.
    pub(crate) fn initialize_cluster_replica_statuses(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        num_processes: usize,
        time: DateTime<Utc>,
    ) {
        tracing::info!(
            ?cluster_id,
            ?replica_id,
            ?time,
            "initializing cluster replica status"
        );
        let replica_statuses = self.0.entry(cluster_id).or_default();
        let process_statuses = (0..num_processes)
            .map(|process_id| {
                let status = ClusterReplicaProcessStatus {
                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
                    time: time.clone(),
                };
                (u64::cast_from(process_id), status)
            })
            .collect();
        let prev = replica_statuses.insert(replica_id, process_statuses);
        assert_none!(
            prev,
            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
        );
    }

    /// Removes and returns the statuses of a cluster. Panics if the cluster
    /// is unknown.
    pub(crate) fn remove_cluster_statuses(
        &mut self,
        cluster_id: &ClusterId,
    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        let prev = self.0.remove(cluster_id);
        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
    }

    /// Removes and returns the statuses of a cluster replica. Panics if the
    /// cluster or replica is unknown.
    pub(crate) fn remove_cluster_replica_statuses(
        &mut self,
        cluster_id: &ClusterId,
        replica_id: &ReplicaId,
    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        let replica_statuses = self
            .0
            .get_mut(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
        let prev = replica_statuses.remove(replica_id);
        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    /// Records the status of a single replica process. Panics if the cluster
    /// or replica is unknown.
    pub(crate) fn ensure_cluster_status(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        process_id: ProcessId,
        status: ClusterReplicaProcessStatus,
    ) {
        let replica_statuses = self
            .0
            .get_mut(&cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
            .get_mut(&replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
        replica_statuses.insert(process_id, status);
    }

    /// Computes the overall status of a cluster replica. Panics if the
    /// cluster or replica is unknown.
    pub fn get_cluster_replica_status(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> ClusterStatus {
        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
        Self::cluster_replica_status(process_status)
    }

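    /// Folds per-process statuses into a replica-level status: a replica is
    /// online only if every process is online; otherwise it is offline, with
    /// the first offline reason encountered (if any).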
    pub fn cluster_replica_status(
        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
    ) -> ClusterStatus {
        process_status
            .values()
            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
                (x, y) => {
                    let reason_x = match x {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    let reason_y = match y {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    ClusterStatus::Offline(reason_x.or(reason_y))
                }
            })
    }

    pub(crate) fn get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(crate) fn try_get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        self.try_get_cluster_statuses(cluster_id)
            .and_then(|statuses| statuses.get(&replica_id))
    }

    pub(crate) fn try_get_cluster_statuses(
        &self,
        cluster_id: ClusterId,
    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
        self.0.get(&cluster_id)
    }
}

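/// The coordinator: the single task that serializes access to the catalog
/// and the controller, and through which all client commands flow.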
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    catalog: Arc<Catalog>,

    persist_client: PersistClient,

    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    group_commit_tx: appends::GroupCommitNotifier,

    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,

    transient_id_gen: Arc<TransientIdGen>,
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,

    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    pending_writes: Vec<PendingWriteTxn>,

    advance_timelines_interval: Interval,

    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    secrets_controller: Arc<dyn SecretsController>,
    caching_secrets_reader: CachingSecretsReader,

    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    storage_usage_client: StorageUsageClient,
    storage_usage_collection_interval: Duration,

    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    metrics: Metrics,
    optimizer_metrics: OptimizerMetrics,

    tracing_handle: TracingHandle,

    statement_logging: StatementLogging,

    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,

    check_cluster_scheduling_policies_interval: Interval,

    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    caught_up_check_interval: Interval,

    caught_up_check: Option<CaughtUpCheckContext>,

    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    cluster_replica_statuses: ClusterReplicaStatuses,

    read_only_controllers: bool,

    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    license_key: ValidatedLicenseKey,
}

impl Coordinator {
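    /// Initializes coordinator state from the durable catalog: recreates
    /// clusters and replicas in the controller, installs storage collections
    /// and dataflows, sets read policies, and reconciles cloud resources.
    /// Must complete before the coordinator serves client commands.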
1900 #[instrument(name = "coord::bootstrap")]
1904 pub(crate) async fn bootstrap(
1905 &mut self,
1906 boot_ts: Timestamp,
1907 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1908 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1909 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1910 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1911 audit_logs_iterator: AuditLogIterator,
1912 ) -> Result<(), AdapterError> {
1913 let bootstrap_start = Instant::now();
1914 info!("startup: coordinator init: bootstrap beginning");
1915 info!("startup: coordinator init: bootstrap: preamble beginning");
1916
1917 let cluster_statuses: Vec<(_, Vec<_>)> = self
1920 .catalog()
1921 .clusters()
1922 .map(|cluster| {
1923 (
1924 cluster.id(),
1925 cluster
1926 .replicas()
1927 .map(|replica| {
1928 (replica.replica_id, replica.config.location.num_processes())
1929 })
1930 .collect(),
1931 )
1932 })
1933 .collect();
1934 let now = self.now_datetime();
1935 for (cluster_id, replica_statuses) in cluster_statuses {
1936 self.cluster_replica_statuses
1937 .initialize_cluster_statuses(cluster_id);
1938 for (replica_id, num_processes) in replica_statuses {
1939 self.cluster_replica_statuses
1940 .initialize_cluster_replica_statuses(
1941 cluster_id,
1942 replica_id,
1943 num_processes,
1944 now,
1945 );
1946 }
1947 }
1948
1949 let system_config = self.catalog().system_config();
1950
1951 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1953
1954 let compute_config = flags::compute_config(system_config);
1956 let storage_config = flags::storage_config(system_config);
1957 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1958 let dyncfg_updates = system_config.dyncfg_updates();
1959 self.controller.compute.update_configuration(compute_config);
1960 self.controller.storage.update_parameters(storage_config);
1961 self.controller
1962 .update_orchestrator_scheduling_config(scheduling_config);
1963 self.controller.update_configuration(dyncfg_updates);
1964
1965 self.validate_resource_limit_numeric(
1966 Numeric::zero(),
1967 self.current_credit_consumption_rate(),
1968 |system_vars| {
1969 self.license_key
1970 .max_credit_consumption_rate()
1971 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1972 },
1973 "cluster replica",
1974 MAX_CREDIT_CONSUMPTION_RATE.name(),
1975 )?;
1976
1977 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1978 Default::default();
1979
1980 let enable_worker_core_affinity =
1981 self.catalog().system_config().enable_worker_core_affinity();
1982 for instance in self.catalog.clusters() {
1983 self.controller.create_cluster(
1984 instance.id,
1985 ClusterConfig {
1986 arranged_logs: instance.log_indexes.clone(),
1987 workload_class: instance.config.workload_class.clone(),
1988 },
1989 )?;
1990 for replica in instance.replicas() {
1991 let role = instance.role();
1992 self.controller.create_replica(
1993 instance.id,
1994 replica.replica_id,
1995 instance.name.clone(),
1996 replica.name.clone(),
1997 role,
1998 replica.config.clone(),
1999 enable_worker_core_affinity,
2000 )?;
2001 }
2002 }
2003
2004 info!(
2005 "startup: coordinator init: bootstrap: preamble complete in {:?}",
2006 bootstrap_start.elapsed()
2007 );
2008
2009 let init_storage_collections_start = Instant::now();
2010 info!("startup: coordinator init: bootstrap: storage collections init beginning");
2011 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2012 .await;
2013 info!(
2014 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2015 init_storage_collections_start.elapsed()
2016 );
2017
2018 self.controller.start_compute_introspection_sink();
2023
2024 let optimize_dataflows_start = Instant::now();
2025 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2026 let entries: Vec<_> = self.catalog().entries().cloned().collect();
2027 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2028 info!(
2029 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2030 optimize_dataflows_start.elapsed()
2031 );
2032
2033 let _fut = self.catalog().update_expression_cache(
2035 uncached_local_exprs.into_iter().collect(),
2036 uncached_global_exps.into_iter().collect(),
2037 );
2038
2039 let bootstrap_as_ofs_start = Instant::now();
2043 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2044 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2045 info!(
2046 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2047 bootstrap_as_ofs_start.elapsed()
2048 );
2049
2050 let postamble_start = Instant::now();
2051 info!("startup: coordinator init: bootstrap: postamble beginning");
2052
2053 let logs: BTreeSet<_> = BUILTINS::logs()
2054 .map(|log| self.catalog().resolve_builtin_log(log))
2055 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2056 .collect();
2057
2058 let mut privatelink_connections = BTreeMap::new();
2059
2060 for entry in &entries {
2061 debug!(
2062 "coordinator init: installing {} {}",
2063 entry.item().typ(),
2064 entry.id()
2065 );
2066 let mut policy = entry.item().initial_logical_compaction_window();
2067 match entry.item() {
2068 CatalogItem::Source(source) => {
2074 if source.custom_logical_compaction_window.is_none() {
2076 if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
2077 source.data_source
2078 {
2079 policy = Some(
2080 self.catalog()
2081 .get_entry(&ingestion_id)
2082 .source()
2083 .expect("must be source")
2084 .custom_logical_compaction_window
2085 .unwrap_or_default(),
2086 );
2087 }
2088 }
2089 policies_to_set
2090 .entry(policy.expect("sources have a compaction window"))
2091 .or_insert_with(Default::default)
2092 .storage_ids
2093 .insert(source.global_id());
2094 }
2095 CatalogItem::Table(table) => {
2096 policies_to_set
2097 .entry(policy.expect("tables have a compaction window"))
2098 .or_insert_with(Default::default)
2099 .storage_ids
2100 .extend(table.global_ids());
2101 }
2102 CatalogItem::Index(idx) => {
2103 let policy_entry = policies_to_set
2104 .entry(policy.expect("indexes have a compaction window"))
2105 .or_insert_with(Default::default);
2106
2107 if logs.contains(&idx.on) {
2108 policy_entry
2109 .compute_ids
2110 .entry(idx.cluster_id)
2111 .or_insert_with(BTreeSet::new)
2112 .insert(idx.global_id());
2113 } else {
2114 let df_desc = self
2115 .catalog()
2116 .try_get_physical_plan(&idx.global_id())
2117 .expect("added in `bootstrap_dataflow_plans`")
2118 .clone();
2119
2120 let df_meta = self
2121 .catalog()
2122 .try_get_dataflow_metainfo(&idx.global_id())
2123 .expect("added in `bootstrap_dataflow_plans`");
2124
2125 if self.catalog().state().system_config().enable_mz_notices() {
2126 self.catalog().state().pack_optimizer_notices(
2128 &mut builtin_table_updates,
2129 df_meta.optimizer_notices.iter(),
2130 Diff::ONE,
2131 );
2132 }
2133
2134 policy_entry
2137 .compute_ids
2138 .entry(idx.cluster_id)
2139 .or_insert_with(Default::default)
2140 .extend(df_desc.export_ids());
2141
2142 self.controller
2143 .compute
2144 .create_dataflow(idx.cluster_id, df_desc, None)
2145 .unwrap_or_terminate("cannot fail to create dataflows");
2146 }
2147 }
2148 CatalogItem::View(_) => (),
2149 CatalogItem::MaterializedView(mview) => {
2150 policies_to_set
2151 .entry(policy.expect("materialized views have a compaction window"))
2152 .or_insert_with(Default::default)
2153 .storage_ids
2154 .insert(mview.global_id_writes());
2155
2156 let mut df_desc = self
2157 .catalog()
2158 .try_get_physical_plan(&mview.global_id_writes())
2159 .expect("added in `bootstrap_dataflow_plans`")
2160 .clone();
2161
2162 if let Some(initial_as_of) = mview.initial_as_of.clone() {
2163 df_desc.set_initial_as_of(initial_as_of);
2164 }
2165
2166 let until = mview
2168 .refresh_schedule
2169 .as_ref()
2170 .and_then(|s| s.last_refresh())
2171 .and_then(|r| r.try_step_forward());
2172 if let Some(until) = until {
2173 df_desc.until.meet_assign(&Antichain::from_elem(until));
2174 }
2175
2176 let df_meta = self
2177 .catalog()
2178 .try_get_dataflow_metainfo(&mview.global_id_writes())
2179 .expect("added in `bootstrap_dataflow_plans`");
2180
2181 if self.catalog().state().system_config().enable_mz_notices() {
2182 self.catalog().state().pack_optimizer_notices(
2184 &mut builtin_table_updates,
2185 df_meta.optimizer_notices.iter(),
2186 Diff::ONE,
2187 );
2188 }
2189
2190 self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2191
2192 if mview.replacement_target.is_none() {
2195 self.allow_writes(mview.cluster_id, mview.global_id_writes());
2196 }
2197 }
2198 CatalogItem::Sink(sink) => {
2199 policies_to_set
2200 .entry(CompactionWindow::Default)
2201 .or_insert_with(Default::default)
2202 .storage_ids
2203 .insert(sink.global_id());
2204 }
2205 CatalogItem::Connection(catalog_connection) => {
2206 if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
2207 privatelink_connections.insert(
2208 entry.id(),
2209 VpcEndpointConfig {
2210 aws_service_name: conn.service_name.clone(),
2211 availability_zone_ids: conn.availability_zones.clone(),
2212 },
2213 );
2214 }
2215 }
2216 CatalogItem::ContinualTask(ct) => {
2217 policies_to_set
2218 .entry(policy.expect("continual tasks have a compaction window"))
2219 .or_insert_with(Default::default)
2220 .storage_ids
2221 .insert(ct.global_id());
2222
2223 let mut df_desc = self
2224 .catalog()
2225 .try_get_physical_plan(&ct.global_id())
2226 .expect("added in `bootstrap_dataflow_plans`")
2227 .clone();
2228
2229 if let Some(initial_as_of) = ct.initial_as_of.clone() {
2230 df_desc.set_initial_as_of(initial_as_of);
2231 }
2232
2233 let df_meta = self
2234 .catalog()
2235 .try_get_dataflow_metainfo(&ct.global_id())
2236 .expect("added in `bootstrap_dataflow_plans`");
2237
2238 if self.catalog().state().system_config().enable_mz_notices() {
2239 self.catalog().state().pack_optimizer_notices(
2241 &mut builtin_table_updates,
2242 df_meta.optimizer_notices.iter(),
2243 Diff::ONE,
2244 );
2245 }
2246
2247 self.ship_dataflow(df_desc, ct.cluster_id, None).await;
2248 self.allow_writes(ct.cluster_id, ct.global_id());
2249 }
2250 CatalogItem::Log(_)
2252 | CatalogItem::Type(_)
2253 | CatalogItem::Func(_)
2254 | CatalogItem::Secret(_) => {}
2255 }
2256 }
2257
2258 if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
2259 let existing_vpc_endpoints = cloud_resource_controller
2261 .list_vpc_endpoints()
2262 .await
2263 .context("list vpc endpoints")?;
2264 let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
2265 let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
2266 let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
2267 for id in vpc_endpoints_to_remove {
2268 cloud_resource_controller
2269 .delete_vpc_endpoint(*id)
2270 .await
2271 .context("deleting extraneous vpc endpoint")?;
2272 }
2273
2274 for (id, spec) in privatelink_connections {
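2275 // Ensure a VpcEndpoint exists for every AWS PrivateLink connection in the catalog.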
2276 cloud_resource_controller
2277 .ensure_vpc_endpoint(id, spec)
2278 .await
2279 .context("ensuring vpc endpoint")?;
2280 }
2281 }
2282
2283 drop(dataflow_read_holds);
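2284 // The as-of selection read holds are no longer needed; the read
2285 // policies installed below take over compaction management.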
2286 for (cw, policies) in policies_to_set {
2288 self.initialize_read_policies(&policies, cw).await;
2289 }
2290
2291 builtin_table_updates.extend(
2293 self.catalog().state().resolve_builtin_table_updates(
2294 self.catalog().state().pack_all_replica_size_updates(),
2295 ),
2296 );
2297
2298 debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
2299 let migrated_updates_fut = if self.controller.read_only() {
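2300 // Even in read-only mode we may (and must) initialize builtin tables
2301 // that were migrated by this deploy generation: their write frontiers
2302 // are still at the minimum timestamp, so nothing else has ever written
2303 // to them.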
2305 let min_timestamp = Timestamp::minimum();
2306 let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
2307 .extract_if(.., |update| {
2308 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2309 migrated_storage_collections_0dt.contains(&update.id)
2310 && self
2311 .controller
2312 .storage_collections
2313 .collection_frontiers(gid)
2314 .expect("all tables are registered")
2315 .write_frontier
2316 .elements()
2317 == &[min_timestamp]
2318 })
2319 .collect();
2320 if migrated_builtin_table_updates.is_empty() {
2321 futures::future::ready(()).boxed()
2322 } else {
2323 let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
2325 for update in migrated_builtin_table_updates {
2326 let gid = self.catalog().get_entry(&update.id).latest_global_id();
2327 grouped_appends.entry(gid).or_default().push(update.data);
2328 }
2329 info!(
2330 "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
2331 grouped_appends.keys().collect::<Vec<_>>()
2332 );
2333
2334 let mut all_appends = Vec::with_capacity(grouped_appends.len());
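2335 // Consolidate plain row updates per table to minimize the write volume.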
2336 for (item_id, table_data) in grouped_appends.into_iter() {
2337 let mut all_rows = Vec::new();
2338 let mut all_data = Vec::new();
2339 for data in table_data {
2340 match data {
2341 TableData::Rows(rows) => all_rows.extend(rows),
2342 TableData::Batches(_) => all_data.push(data),
2343 }
2344 }
2345 differential_dataflow::consolidation::consolidate(&mut all_rows);
2346 all_data.push(TableData::Rows(all_rows));
2347
2348 all_appends.push((item_id, all_data));
2350 }
2351
2352 let fut = self
2353 .controller
2354 .storage
2355 .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
2356 .expect("cannot fail to append");
2357 async {
2358 fut.await
2359 .expect("One-shot shouldn't be dropped during bootstrap")
2360 .unwrap_or_terminate("cannot fail to append")
2361 }
2362 .boxed()
2363 }
2364 } else {
2365 futures::future::ready(()).boxed()
2366 };
2367
2368 info!(
2369 "startup: coordinator init: bootstrap: postamble complete in {:?}",
2370 postamble_start.elapsed()
2371 );
2372
2373 let builtin_update_start = Instant::now();
2374 info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
2375
2376 if self.controller.read_only() {
2377 info!(
2378 "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
2379 );
2380
2381 let audit_join_start = Instant::now();
2383 info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
2384 let audit_log_updates: Vec<_> = audit_logs_iterator
2385 .map(|(audit_log, ts)| StateUpdate {
2386 kind: StateUpdateKind::AuditLog(audit_log),
2387 ts,
2388 diff: StateDiff::Addition,
2389 })
2390 .collect();
2391 let audit_log_builtin_table_updates = self
2392 .catalog()
2393 .state()
2394 .generate_builtin_table_updates(audit_log_updates);
2395 builtin_table_updates.extend(audit_log_builtin_table_updates);
2396 info!(
2397 "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
2398 audit_join_start.elapsed()
2399 );
2400 self.buffered_builtin_table_updates
2401 .as_mut()
2402 .expect("in read-only mode")
2403 .append(&mut builtin_table_updates);
2404 } else {
2405 self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
2406 .await;
2407 };
2408 info!(
2409 "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
2410 builtin_update_start.elapsed()
2411 );
2412
2413 let cleanup_secrets_start = Instant::now();
2414 info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
2415 {
2419 let Self {
2422 secrets_controller,
2423 catalog,
2424 ..
2425 } = self;
2426
2427 let next_user_item_id = catalog.get_next_user_item_id().await?;
2428 let next_system_item_id = catalog.get_next_system_item_id().await?;
2429 let read_only = self.controller.read_only();
2430 let catalog_ids: BTreeSet<CatalogItemId> =
2435 catalog.entries().map(|entry| entry.id()).collect();
2436 let secrets_controller = Arc::clone(secrets_controller);
2437
2438 spawn(|| "cleanup-orphaned-secrets", async move {
2439 if read_only {
2440 info!(
2441 "coordinator init: not cleaning up orphaned secrets while in read-only mode"
2442 );
2443 return;
2444 }
2445 info!("coordinator init: cleaning up orphaned secrets");
2446
2447 match secrets_controller.list().await {
2448 Ok(controller_secrets) => {
2449 let controller_secrets: BTreeSet<CatalogItemId> =
2450 controller_secrets.into_iter().collect();
2451 let orphaned = controller_secrets.difference(&catalog_ids);
2452 for id in orphaned {
2453 let id_too_large = match id {
2454 CatalogItemId::System(id) => *id >= next_system_item_id,
2455 CatalogItemId::User(id) => *id >= next_user_item_id,
2456 CatalogItemId::IntrospectionSourceIndex(_)
2457 | CatalogItemId::Transient(_) => false,
2458 };
2459 if id_too_large {
2460 info!(
2461 %next_user_item_id, %next_system_item_id,
2462 "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
2463 );
2464 } else {
2465 info!("coordinator init: deleting orphaned secret {id}");
2466 fail_point!("orphan_secrets");
2467 if let Err(e) = secrets_controller.delete(*id).await {
2468 warn!(
2469 "Dropping orphaned secret has encountered an error: {}",
2470 e
2471 );
2472 }
2473 }
2474 }
2475 }
2476 Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
2477 }
2478 });
2479 }
2480 info!(
2481 "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
2482 cleanup_secrets_start.elapsed()
2483 );
2484
2485 let final_steps_start = Instant::now();
2487 info!(
2488 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
2489 );
2490 migrated_updates_fut
2491 .instrument(info_span!("coord::bootstrap::final"))
2492 .await;
2493
2494 debug!(
2495 "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
2496 );
2497 self.controller.initialization_complete();
2499
2500 self.bootstrap_introspection_subscribes().await;
2502
2503 info!(
2504 "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
2505 final_steps_start.elapsed()
2506 );
2507
2508 info!(
2509 "startup: coordinator init: bootstrap complete in {:?}",
2510 bootstrap_start.elapsed()
2511 );
2512 Ok(())
2513 }
2514
2515 #[allow(clippy::async_yields_async)]
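2516 /// Writes the initial set of builtin table updates at bootstrap: advances
2517 /// all tables to the current write timestamp, retracts the stale contents
2518 /// of system tables (except `mz_storage_usage_by_shard`, which is retained,
2519 /// and `mz_audit_events`, which is reconciled against the durable audit log).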
2520 #[instrument]
2521 async fn bootstrap_tables(
2522 &mut self,
2523 entries: &[CatalogEntry],
2524 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
2525 audit_logs_iterator: AuditLogIterator,
2526 ) {
2527 struct TableMetadata<'a> {
2529 id: CatalogItemId,
2530 name: &'a QualifiedItemName,
2531 table: &'a Table,
2532 }
2533
2534 let table_metas: Vec<_> = entries
2536 .iter()
2537 .filter_map(|entry| {
2538 entry.table().map(|table| TableMetadata {
2539 id: entry.id(),
2540 name: entry.name(),
2541 table,
2542 })
2543 })
2544 .collect();
2545
2546 debug!("coordinator init: advancing all tables to current timestamp");
2548 let WriteTimestamp {
2549 timestamp: write_ts,
2550 advance_to,
2551 } = self.get_local_write_ts().await;
2552 let appends = table_metas
2553 .iter()
2554 .map(|meta| (meta.table.global_id_writes(), Vec::new()))
2555 .collect();
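2557 // Appending an empty batch to every table advances their uppers to the
2558 // new write timestamp and acts as a fence: table writes from a previous
2559 // deploy generation can no longer land after this point.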
2556 let table_fence_rx = self
2560 .controller
2561 .storage
2562 .append_table(write_ts.clone(), advance_to, appends)
2563 .expect("invalid updates");
2564
2565 self.apply_local_write(write_ts).await;
2566
2567 debug!("coordinator init: resetting system tables");
2569 let read_ts = self.get_local_read_ts().await;
2570
2571 let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
2574 .catalog()
2575 .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
2576 .into();
2577 let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
2578 meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
2579 && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
2580 };
2581
2582 let mut retraction_tasks = Vec::new();
2583 let mut system_tables: Vec<_> = table_metas
2584 .iter()
2585 .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
2586 .collect();
2587
2588 let (audit_events_idx, _) = system_tables
2590 .iter()
2591 .find_position(|table| {
2592 table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
2593 })
2594 .expect("mz_audit_events must exist");
2595 let audit_events = system_tables.remove(audit_events_idx);
2596 let audit_log_task = self.bootstrap_audit_log_table(
2597 audit_events.id,
2598 audit_events.name,
2599 audit_events.table,
2600 audit_logs_iterator,
2601 read_ts,
2602 );
2603
2604 for system_table in system_tables {
2605 let table_id = system_table.id;
2606 let full_name = self.catalog().resolve_full_name(system_table.name, None);
2607 debug!("coordinator init: resetting system table {full_name} ({table_id})");
2608
2609 let snapshot_fut = self
2611 .controller
2612 .storage_collections
2613 .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
2614 let batch_fut = self
2615 .controller
2616 .storage_collections
2617 .create_update_builder(system_table.table.global_id_writes());
2618
2619 let task = spawn(|| format!("snapshot-{table_id}"), async move {
2620 let mut batch = batch_fut
2622 .await
2623 .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
2624 tracing::info!(?table_id, "starting snapshot");
2625 let mut snapshot_cursor = snapshot_fut
2627 .await
2628 .unwrap_or_terminate("cannot fail to snapshot");
2629
2630 while let Some(values) = snapshot_cursor.next().await {
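2631 // Retract the table's current contents by inverting each diff.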
2632 for (key, _t, d) in values {
2633 let d_invert = d.neg();
2634 batch.add(&key, &(), &d_invert).await;
2635 }
2636 }
2637 tracing::info!(?table_id, "finished snapshot");
2638
2639 let batch = batch.finish().await;
2640 BuiltinTableUpdate::batch(table_id, batch)
2641 });
2642 retraction_tasks.push(task);
2643 }
2644
2645 let retractions_res = futures::future::join_all(retraction_tasks).await;
2646 for retractions in retractions_res {
2647 builtin_table_updates.push(retractions);
2648 }
2649
2650 let audit_join_start = Instant::now();
2651 info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
2652 let audit_log_updates = audit_log_task.await;
2653 let audit_log_builtin_table_updates = self
2654 .catalog()
2655 .state()
2656 .generate_builtin_table_updates(audit_log_updates);
2657 builtin_table_updates.extend(audit_log_builtin_table_updates);
2658 info!(
2659 "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
2660 audit_join_start.elapsed()
2661 );
2662
2663 table_fence_rx
2665 .await
2666 .expect("One-shot shouldn't be dropped during bootstrap")
2667 .unwrap_or_terminate("cannot fail to append");
2668
2669 info!("coordinator init: sending builtin table updates");
2670 let (_builtin_updates_fut, write_ts) = self
2671 .builtin_table_update()
2672 .execute(builtin_table_updates)
2673 .await;
2674 info!(?write_ts, "coordinator init: builtin table updates write ts");
2675 if let Some(write_ts) = write_ts {
2676 self.apply_local_write(write_ts).await;
2677 }
2678 }
2679
2680 #[instrument]
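2681 /// Returns a task that reconciles the audit log table with the durable
2682 /// audit log: it finds the largest event ID already present and stages
2683 /// only newer events (the iterator is expected to yield events newest-first).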
2684 fn bootstrap_audit_log_table<'a>(
2685 &self,
2686 table_id: CatalogItemId,
2687 name: &'a QualifiedItemName,
2688 table: &'a Table,
2689 audit_logs_iterator: AuditLogIterator,
2690 read_ts: Timestamp,
2691 ) -> JoinHandle<Vec<StateUpdate>> {
2692 let full_name = self.catalog().resolve_full_name(name, None);
2693 debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
2694 let current_contents_fut = self
2695 .controller
2696 .storage_collections
2697 .snapshot(table.global_id_writes(), read_ts);
2698 spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
2699 let current_contents = current_contents_fut
2700 .await
2701 .unwrap_or_terminate("cannot fail to fetch snapshot");
2702 let contents_len = current_contents.len();
2703 debug!("coordinator init: audit log table ({table_id}) size {contents_len}");
2704
2705 let max_table_id = current_contents
2707 .into_iter()
2708 .filter(|(_, diff)| *diff == 1)
2709 .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
2710 .max();
2713
2714 audit_logs_iterator
2716 .take_while(|(audit_log, _)| match max_table_id {
2717 Some(id) => audit_log.event.sortable_id() > id,
2718 None => true,
2719 })
2720 .map(|(audit_log, ts)| StateUpdate {
2721 kind: StateUpdateKind::AuditLog(audit_log),
2722 ts,
2723 diff: StateDiff::Addition,
2724 })
2725 .collect::<Vec<_>>()
2726 })
2727 }
2728
2729 #[instrument]
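2730 /// Registers all durable collections (sources, tables, sinks, materialized
2731 /// views, and continual tasks) with the storage controller.
2732 ///
2733 /// Collections belonging to `migrated_storage_collections` are passed
2734 /// through so the controller can handle them specially. New builtin
2735 /// continual tasks are deferred to `bootstrap_builtin_continual_tasks`,
2736 /// which picks an initial as-of for them first.
2737 ///
2738 /// In read-only mode the current read timestamp is used as the register
2739 /// timestamp, since the write timestamp must not be advanced.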
2742 async fn bootstrap_storage_collections(
2743 &mut self,
2744 migrated_storage_collections: &BTreeSet<CatalogItemId>,
2745 ) {
2746 let catalog = self.catalog();
2747 let source_status_collection_id = catalog
2748 .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2749 let source_status_collection_id = catalog
2750 .get_entry(&source_status_collection_id)
2751 .latest_global_id();
2752
2753 let source_desc = |object_id: GlobalId,
2754 data_source: &DataSourceDesc,
2755 desc: &RelationDesc,
2756 timeline: &Timeline| {
2757 let (data_source, status_collection_id) = match data_source.clone() {
2758 DataSourceDesc::Ingestion { desc, cluster_id } => {
2760 let desc = desc.into_inline_connection(catalog.state());
2761 let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2762
2763 (
2764 DataSource::Ingestion(ingestion),
2765 Some(source_status_collection_id),
2766 )
2767 }
2768 DataSourceDesc::OldSyntaxIngestion {
2769 desc,
2770 progress_subsource,
2771 data_config,
2772 details,
2773 cluster_id,
2774 } => {
2775 let desc = desc.into_inline_connection(catalog.state());
2776 let data_config = data_config.into_inline_connection(catalog.state());
2777 let progress_subsource =
2780 catalog.get_entry(&progress_subsource).latest_global_id();
2781 let mut ingestion =
2782 IngestionDescription::new(desc, cluster_id, progress_subsource);
2783 let legacy_export = SourceExport {
2784 storage_metadata: (),
2785 data_config,
2786 details,
2787 };
2788 ingestion.source_exports.insert(object_id, legacy_export);
2789
2790 (
2791 DataSource::Ingestion(ingestion),
2792 Some(source_status_collection_id),
2793 )
2794 }
2795 DataSourceDesc::IngestionExport {
2796 ingestion_id,
2797 external_reference: _,
2798 details,
2799 data_config,
2800 } => {
2801 let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2804 (
2805 DataSource::IngestionExport {
2806 ingestion_id,
2807 details,
2808 data_config: data_config.into_inline_connection(catalog.state()),
2809 },
2810 Some(source_status_collection_id),
2811 )
2812 }
2813 DataSourceDesc::Webhook { .. } => {
2814 (DataSource::Webhook, Some(source_status_collection_id))
2815 }
2816 DataSourceDesc::Progress => (DataSource::Progress, None),
2817 DataSourceDesc::Introspection(introspection) => {
2818 (DataSource::Introspection(introspection), None)
2819 }
2820 };
2821 CollectionDescription {
2822 desc: desc.clone(),
2823 data_source,
2824 since: None,
2825 status_collection_id,
2826 timeline: Some(timeline.clone()),
2827 primary: None,
2828 }
2829 };
2830
2831 let mut compute_collections = vec![];
2832 let mut collections = vec![];
2833 let mut new_builtin_continual_tasks = vec![];
2834 for entry in catalog.entries() {
2835 match entry.item() {
2836 CatalogItem::Source(source) => {
2837 collections.push((
2838 source.global_id(),
2839 source_desc(
2840 source.global_id(),
2841 &source.data_source,
2842 &source.desc,
2843 &source.timeline,
2844 ),
2845 ));
2846 }
2847 CatalogItem::Table(table) => {
2848 match &table.data_source {
2849 TableDataSource::TableWrites { defaults: _ } => {
2850 let versions: BTreeMap<_, _> = table
2851 .collection_descs()
2852 .map(|(gid, version, desc)| (version, (gid, desc)))
2853 .collect();
2854 let collection_descs = versions.iter().map(|(version, (gid, desc))| {
2855 let next_version = version.bump();
2856 let primary_collection =
2857 versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2858 let mut collection_desc =
2859 CollectionDescription::for_table(desc.clone());
2860 collection_desc.primary = primary_collection;
2861
2862 (*gid, collection_desc)
2863 });
2864 collections.extend(collection_descs);
2865 }
2866 TableDataSource::DataSource {
2867 desc: data_source_desc,
2868 timeline,
2869 } => {
2870 soft_assert_eq_or_log!(table.collections.len(), 1);
2872 let collection_descs =
2873 table.collection_descs().map(|(gid, _version, desc)| {
2874 (
2875 gid,
2876 source_desc(
2877 entry.latest_global_id(),
2878 data_source_desc,
2879 &desc,
2880 timeline,
2881 ),
2882 )
2883 });
2884 collections.extend(collection_descs);
2885 }
2886 };
2887 }
2888 CatalogItem::MaterializedView(mv) => {
2889 let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
2890 let collection_desc =
2891 CollectionDescription::for_other(desc, mv.initial_as_of.clone());
2892 (gid, collection_desc)
2893 });
2894
2895 collections.extend(collection_descs);
2896 compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
2897 }
2898 CatalogItem::ContinualTask(ct) => {
2899 let collection_desc =
2900 CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
2901 if ct.global_id().is_system() && collection_desc.since.is_none() {
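2902 // New builtin continual tasks do not yet have an initial as-of. Their
2903 // collection creation is deferred to `bootstrap_builtin_continual_tasks`,
2904 // which derives a suitable since from read holds on the task's inputs.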
2902 new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
2906 } else {
2907 compute_collections.push((ct.global_id(), ct.desc.clone()));
2908 collections.push((ct.global_id(), collection_desc));
2909 }
2910 }
2911 CatalogItem::Sink(sink) => {
2912 let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
2913 let from_desc = storage_sink_from_entry
2914 .relation_desc()
2915 .expect("sinks can only be built on items with descs")
2916 .into_owned();
2917 let collection_desc = CollectionDescription {
2918 desc: KAFKA_PROGRESS_DESC.clone(),
2920 data_source: DataSource::Sink {
2921 desc: ExportDescription {
2922 sink: StorageSinkDesc {
2923 from: sink.from,
2924 from_desc,
2925 connection: sink
2926 .connection
2927 .clone()
2928 .into_inline_connection(self.catalog().state()),
2929 envelope: sink.envelope,
2930 as_of: Antichain::from_elem(Timestamp::minimum()),
2931 with_snapshot: sink.with_snapshot,
2932 version: sink.version,
2933 from_storage_metadata: (),
2934 to_storage_metadata: (),
2935 commit_interval: sink.commit_interval,
2936 },
2937 instance_id: sink.cluster_id,
2938 },
2939 },
2940 since: None,
2941 status_collection_id: None,
2942 timeline: None,
2943 primary: None,
2944 };
2945 collections.push((sink.global_id, collection_desc));
2946 }
2947 _ => (),
2948 }
2949 }
2950
2951 let register_ts = if self.controller.read_only() {
2952 self.get_local_read_ts().await
2953 } else {
2954 self.get_local_write_ts().await.timestamp
2957 };
2958
2959 let storage_metadata = self.catalog.state().storage_metadata();
2960 let migrated_storage_collections = migrated_storage_collections
2961 .into_iter()
2962 .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
2963 .collect();
2964
2965 self.controller
2970 .storage
2971 .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
2972 .await
2973 .unwrap_or_terminate("cannot fail to evolve collections");
2974
2975 self.controller
2976 .storage
2977 .create_collections_for_bootstrap(
2978 storage_metadata,
2979 Some(register_ts),
2980 collections,
2981 &migrated_storage_collections,
2982 )
2983 .await
2984 .unwrap_or_terminate("cannot fail to create collections");
2985
2986 self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
2987 .await;
2988
2989 if !self.controller.read_only() {
2990 self.apply_local_write(register_ts).await;
2991 }
2992 }
2993
2994 async fn bootstrap_builtin_continual_tasks(
3001 &mut self,
3002 mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
3004 ) {
3005 for (id, collection) in &mut collections {
3006 let entry = self.catalog.get_entry_by_global_id(id);
3007 let ct = match &entry.item {
3008 CatalogItem::ContinualTask(ct) => ct.clone(),
3009 _ => unreachable!("only called with continual task builtins"),
3010 };
3011 let debug_name = self
3012 .catalog()
3013 .resolve_full_name(entry.name(), None)
3014 .to_string();
3015 let (_optimized_plan, physical_plan, _metainfo) = self
3016 .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
3017 .expect("builtin CT should optimize successfully");
3018
3019 let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
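3020 // The task's own output collection is only being created here; exclude it before taking read holds on the imports.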
3021 id_bundle.storage_ids.remove(id);
3023 let read_holds = self.acquire_read_holds(&id_bundle);
3024 let as_of = read_holds.least_valid_read();
3025
3026 collection.since = Some(as_of.clone());
3027 }
3028 self.controller
3029 .storage
3030 .create_collections(self.catalog.state().storage_metadata(), None, collections)
3031 .await
3032 .unwrap_or_terminate("cannot fail to create collections");
3033 }
3034
3035 #[instrument]
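3036 /// Computes the optimized and physical dataflow plans for every index,
3037 /// materialized view, and continual task and stores them in the catalog's
3038 /// in-memory plan state.
3039 ///
3040 /// `ordered_catalog_entries` must be in dependency order, since optimizing
3041 /// an object requires the plans of its dependencies. Cached plans from the
3042 /// expression cache are reused when their optimizer features still match;
3043 /// everything optimized from scratch is returned as uncached expressions.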
3046 fn bootstrap_dataflow_plans(
3047 &mut self,
3048 ordered_catalog_entries: &[CatalogEntry],
3049 mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
3050 ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
3051 let mut instance_snapshots = BTreeMap::new();
3057 let mut uncached_expressions = BTreeMap::new();
3058
3059 let optimizer_config = OptimizerConfig::from(self.catalog().system_config());
3060
3061 for entry in ordered_catalog_entries {
3062 match entry.item() {
3063 CatalogItem::Index(idx) => {
3064 let compute_instance =
3066 instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
3067 self.instance_snapshot(idx.cluster_id)
3068 .expect("compute instance exists")
3069 });
3070 let global_id = idx.global_id();
3071
3072 if compute_instance.contains_collection(&global_id) {
3075 continue;
3076 }
3077
3078 let (optimized_plan, physical_plan, metainfo) =
3079 match cached_global_exprs.remove(&global_id) {
3080 Some(global_expressions)
3081 if global_expressions.optimizer_features
3082 == optimizer_config.features =>
3083 {
3084 debug!("global expression cache hit for {global_id:?}");
3085 (
3086 global_expressions.global_mir,
3087 global_expressions.physical_plan,
3088 global_expressions.dataflow_metainfos,
3089 )
3090 }
3091 Some(_) | None => {
3092 let (optimized_plan, global_lir_plan) = {
3093 let mut optimizer = optimize::index::Optimizer::new(
3095 self.owned_catalog(),
3096 compute_instance.clone(),
3097 global_id,
3098 optimizer_config.clone(),
3099 self.optimizer_metrics(),
3100 );
3101
3102 let index_plan = optimize::index::Index::new(
3104 entry.name().clone(),
3105 idx.on,
3106 idx.keys.to_vec(),
3107 );
3108 let global_mir_plan = optimizer.optimize(index_plan)?;
3109 let optimized_plan = global_mir_plan.df_desc().clone();
3110
3111 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3113
3114 (optimized_plan, global_lir_plan)
3115 };
3116
3117 let (physical_plan, metainfo) = global_lir_plan.unapply();
3118 let metainfo = {
3119 let notice_ids =
3121 std::iter::repeat_with(|| self.allocate_transient_id())
3122 .map(|(_item_id, gid)| gid)
3123 .take(metainfo.optimizer_notices.len())
3124 .collect::<Vec<_>>();
3125 self.catalog().render_notices(
3127 metainfo,
3128 notice_ids,
3129 Some(idx.global_id()),
3130 )
3131 };
3132 uncached_expressions.insert(
3133 global_id,
3134 GlobalExpressions {
3135 global_mir: optimized_plan.clone(),
3136 physical_plan: physical_plan.clone(),
3137 dataflow_metainfos: metainfo.clone(),
3138 optimizer_features: OptimizerFeatures::from(
3139 self.catalog().system_config(),
3140 ),
3141 },
3142 );
3143 (optimized_plan, physical_plan, metainfo)
3144 }
3145 };
3146
3147 let catalog = self.catalog_mut();
3148 catalog.set_optimized_plan(idx.global_id(), optimized_plan);
3149 catalog.set_physical_plan(idx.global_id(), physical_plan);
3150 catalog.set_dataflow_metainfo(idx.global_id(), metainfo);
3151
3152 compute_instance.insert_collection(idx.global_id());
3153 }
3154 CatalogItem::MaterializedView(mv) => {
3155 let compute_instance =
3157 instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
3158 self.instance_snapshot(mv.cluster_id)
3159 .expect("compute instance exists")
3160 });
3161 let global_id = mv.global_id_writes();
3162
3163 let (optimized_plan, physical_plan, metainfo) =
3164 match cached_global_exprs.remove(&global_id) {
3165 Some(global_expressions)
3166 if global_expressions.optimizer_features
3167 == optimizer_config.features =>
3168 {
3169 debug!("global expression cache hit for {global_id:?}");
3170 (
3171 global_expressions.global_mir,
3172 global_expressions.physical_plan,
3173 global_expressions.dataflow_metainfos,
3174 )
3175 }
3176 Some(_) | None => {
3177 let (_, internal_view_id) = self.allocate_transient_id();
3178 let debug_name = self
3179 .catalog()
3180 .resolve_full_name(entry.name(), None)
3181 .to_string();
3182 let force_non_monotonic = Default::default();
3183
3184 let (optimized_plan, global_lir_plan) = {
3185 let mut optimizer = optimize::materialized_view::Optimizer::new(
3187 self.owned_catalog().as_optimizer_catalog(),
3188 compute_instance.clone(),
3189 global_id,
3190 internal_view_id,
3191 mv.desc.latest().iter_names().cloned().collect(),
3192 mv.non_null_assertions.clone(),
3193 mv.refresh_schedule.clone(),
3194 debug_name,
3195 optimizer_config.clone(),
3196 self.optimizer_metrics(),
3197 force_non_monotonic,
3198 );
3199
3200 let global_mir_plan =
3202 optimizer.optimize(mv.optimized_expr.as_ref().clone())?;
3203 let optimized_plan = global_mir_plan.df_desc().clone();
3204
3205 let global_lir_plan = optimizer.optimize(global_mir_plan)?;
3207
3208 (optimized_plan, global_lir_plan)
3209 };
3210
3211 let (physical_plan, metainfo) = global_lir_plan.unapply();
3212 let metainfo = {
3213 let notice_ids =
3215 std::iter::repeat_with(|| self.allocate_transient_id())
3216 .map(|(_item_id, global_id)| global_id)
3217 .take(metainfo.optimizer_notices.len())
3218 .collect::<Vec<_>>();
3219 self.catalog().render_notices(
3221 metainfo,
3222 notice_ids,
3223 Some(mv.global_id_writes()),
3224 )
3225 };
3226 uncached_expressions.insert(
3227 global_id,
3228 GlobalExpressions {
3229 global_mir: optimized_plan.clone(),
3230 physical_plan: physical_plan.clone(),
3231 dataflow_metainfos: metainfo.clone(),
3232 optimizer_features: OptimizerFeatures::from(
3233 self.catalog().system_config(),
3234 ),
3235 },
3236 );
3237 (optimized_plan, physical_plan, metainfo)
3238 }
3239 };
3240
3241 let catalog = self.catalog_mut();
3242 catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
3243 catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
3244 catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);
3245
3246 compute_instance.insert_collection(mv.global_id_writes());
3247 }
3248 CatalogItem::ContinualTask(ct) => {
3249 let compute_instance =
3250 instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
3251 self.instance_snapshot(ct.cluster_id)
3252 .expect("compute instance exists")
3253 });
3254 let global_id = ct.global_id();
3255
3256 let (optimized_plan, physical_plan, metainfo) =
3257 match cached_global_exprs.remove(&global_id) {
3258 Some(global_expressions)
3259 if global_expressions.optimizer_features
3260 == optimizer_config.features =>
3261 {
3262 debug!("global expression cache hit for {global_id:?}");
3263 (
3264 global_expressions.global_mir,
3265 global_expressions.physical_plan,
3266 global_expressions.dataflow_metainfos,
3267 )
3268 }
3269 Some(_) | None => {
3270 let debug_name = self
3271 .catalog()
3272 .resolve_full_name(entry.name(), None)
3273 .to_string();
3274 let (optimized_plan, physical_plan, metainfo) = self
3275 .optimize_create_continual_task(
3276 ct,
3277 global_id,
3278 self.owned_catalog(),
3279 debug_name,
3280 )?;
3281 uncached_expressions.insert(
3282 global_id,
3283 GlobalExpressions {
3284 global_mir: optimized_plan.clone(),
3285 physical_plan: physical_plan.clone(),
3286 dataflow_metainfos: metainfo.clone(),
3287 optimizer_features: OptimizerFeatures::from(
3288 self.catalog().system_config(),
3289 ),
3290 },
3291 );
3292 (optimized_plan, physical_plan, metainfo)
3293 }
3294 };
3295
3296 let catalog = self.catalog_mut();
3297 catalog.set_optimized_plan(ct.global_id(), optimized_plan);
3298 catalog.set_physical_plan(ct.global_id(), physical_plan);
3299 catalog.set_dataflow_metainfo(ct.global_id(), metainfo);
3300
3301 compute_instance.insert_collection(ct.global_id());
3302 }
3303 _ => (),
3304 }
3305 }
3306
3307 Ok(uncached_expressions)
3308 }
3309
3310 async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
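3311 // Select as-ofs for all cached dataflow plans, write the updated plans
3312 // back into the catalog, and return the read holds that keep the chosen
3313 // as-ofs valid. The caller must keep these holds until the corresponding
3314 // dataflows have been created.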
3320 let mut catalog_ids = Vec::new();
3321 let mut dataflows = Vec::new();
3322 let mut read_policies = BTreeMap::new();
3323 for entry in self.catalog.entries() {
3324 let gid = match entry.item() {
3325 CatalogItem::Index(idx) => idx.global_id(),
3326 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
3327 CatalogItem::ContinualTask(ct) => ct.global_id(),
3328 CatalogItem::Table(_)
3329 | CatalogItem::Source(_)
3330 | CatalogItem::Log(_)
3331 | CatalogItem::View(_)
3332 | CatalogItem::Sink(_)
3333 | CatalogItem::Type(_)
3334 | CatalogItem::Func(_)
3335 | CatalogItem::Secret(_)
3336 | CatalogItem::Connection(_) => continue,
3337 };
3338 if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
3339 catalog_ids.push(gid);
3340 dataflows.push(plan.clone());
3341
3342 if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
3343 read_policies.insert(gid, compaction_window.into());
3344 }
3345 }
3346 }
3347
3348 let read_ts = self.get_local_read_ts().await;
3349 let read_holds = as_of_selection::run(
3350 &mut dataflows,
3351 &read_policies,
3352 &*self.controller.storage_collections,
3353 read_ts,
3354 self.controller.read_only(),
3355 );
3356
3357 let catalog = self.catalog_mut();
3358 for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
3359 catalog.set_physical_plan(id, plan);
3360 }
3361
3362 read_holds
3363 }
3364
3365 fn serve(
3374 mut self,
3375 mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
3376 mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
3377 mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
3378 group_commit_rx: appends::GroupCommitWaiter,
3379 ) -> LocalBoxFuture<'static, ()> {
3380 async move {
3381 let mut cluster_events = self.controller.events_stream();
3383 let last_message = Arc::new(Mutex::new(LastMessage {
3384 kind: "none",
3385 stmt: None,
3386 }));
3387
3388 let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
3389 let idle_metric = self.metrics.queue_busy_seconds.clone();
3390 let last_message_watchdog = Arc::clone(&last_message);
3391
3392 spawn(|| "coord watchdog", async move {
3393 let mut interval = tokio::time::interval(Duration::from_secs(5));
3398 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
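3399 // Every tick, try to hand the main loop a timer through `idle_tx`; if
3400 // the permit cannot even be reserved within the timeout below, the
3401 // loop is not making progress and we log what it was last doing.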
3402
3403 let mut coord_stuck = false;
3405
3406 loop {
3407 interval.tick().await;
3408
3409 let duration = tokio::time::Duration::from_secs(30);
3411 let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
3412 let Ok(maybe_permit) = timeout else {
3413 if !coord_stuck {
3415 let last_message = last_message_watchdog.lock().expect("poisoned");
3416 tracing::warn!(
3417 last_message_kind = %last_message.kind,
3418 last_message_sql = %last_message.stmt_to_string(),
3419 "coordinator stuck for {duration:?}",
3420 );
3421 }
3422 coord_stuck = true;
3423
3424 continue;
3425 };
3426
3427 if coord_stuck {
3429 tracing::info!("Coordinator became unstuck");
3430 }
3431 coord_stuck = false;
3432
3433 let Ok(permit) = maybe_permit else {
3435 break;
3436 };
3437
3438 permit.send(idle_metric.start_timer());
3439 }
3440 });
3441
3442 self.schedule_storage_usage_collection().await;
3443 self.spawn_privatelink_vpc_endpoints_watch_task();
3444 self.spawn_statement_logging_task();
3445 flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
3446
3447 let warn_threshold = self
3449 .catalog()
3450 .system_config()
3451 .coord_slow_message_warn_threshold();
3452
3453 const MESSAGE_BATCH: usize = 64;
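3454 // Buffers for draining multiple messages per loop iteration; reused to avoid reallocation.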
3455 let mut messages = Vec::with_capacity(MESSAGE_BATCH);
3456 let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);
3457
3458 let message_batch = self.metrics.message_batch.clone();
3459
3460 loop {
3461 select! {
3465 biased;
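3466 // `biased` makes the arms below be polled in order: internal commands
3467 // first, then cluster events, controller readiness, group commits,
3468 // external commands, and finally the periodic timers, so internal work
3469 // is preferred over newly arriving work.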
3470
3471 _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
3475 Some(event) = cluster_events.next() => messages.push(Message::ClusterEvent(event)),
3479 () = self.controller.ready() => {
3483 let controller = match self.controller.get_readiness() {
3487 Readiness::Storage => ControllerReadiness::Storage,
3488 Readiness::Compute => ControllerReadiness::Compute,
3489 Readiness::Metrics(_) => ControllerReadiness::Metrics,
3490 Readiness::Internal(_) => ControllerReadiness::Internal,
3491 Readiness::NotReady => unreachable!("just signaled as ready"),
3492 };
3493 messages.push(Message::ControllerReady { controller });
3494 }
3495 permit = group_commit_rx.ready() => {
3498 let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
3504 PendingWriteTxn::User{span, ..} => Some(span),
3505 PendingWriteTxn::System{..} => None,
3506 });
3507 let span = match user_write_spans.exactly_one() {
3508 Ok(span) => span.clone(),
3509 Err(user_write_spans) => {
3510 let span = info_span!(parent: None, "group_commit_notify");
3511 for s in user_write_spans {
3512 span.follows_from(s);
3513 }
3514 span
3515 }
3516 };
3517 messages.push(Message::GroupCommitInitiate(span, Some(permit)));
3518 },
3519 count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
3523 if count == 0 {
3524 break;
3525 } else {
3526 messages.extend(cmd_messages.drain(..).map(|(otel_ctx, cmd)| Message::Command(otel_ctx, cmd)));
3527 }
3528 },
3529 Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
3533 let mut pending_read_txns = vec![pending_read_txn];
3534 while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
3535 pending_read_txns.push(pending_read_txn);
3536 }
3537 for (conn_id, pending_read_txn) in pending_read_txns {
3538 let prev = self.pending_linearize_read_txns.insert(conn_id, pending_read_txn);
3539 soft_assert_or_log!(
3540 prev.is_none(),
3541 "connections can not have multiple concurrent reads, prev: {prev:?}"
3542 )
3543 }
3544 messages.push(Message::LinearizeReads);
3545 }
3546 _ = self.advance_timelines_interval.tick() => {
3550 let span = info_span!(parent: None, "coord::advance_timelines_interval");
3551 span.follows_from(Span::current());
3552
3553 if self.controller.read_only() {
3558 messages.push(Message::AdvanceTimelines);
3559 } else {
3560 messages.push(Message::GroupCommitInitiate(span, None));
3561 }
3562 },
3563 _ = self.check_cluster_scheduling_policies_interval.tick() => {
3567 messages.push(Message::CheckSchedulingPolicies);
3568 },
3569
3570 _ = self.caught_up_check_interval.tick() => {
3574 self.maybe_check_caught_up().await;
3579
3580 continue;
3581 },
3582
3583 timer = idle_rx.recv() => {
3588 timer.expect("does not drop").observe_duration();
3589 self.metrics
3590 .message_handling
3591 .with_label_values(&["watchdog"])
3592 .observe(0.0);
3593 continue;
3594 }
3595 };
3596
3597 message_batch.observe(f64::cast_lossy(messages.len()));
3599
3600 for msg in messages.drain(..) {
3601 let msg_kind = msg.kind();
3604 let span = span!(
3605 target: "mz_adapter::coord::handle_message_loop",
3606 Level::INFO,
3607 "coord::handle_message",
3608 kind = msg_kind
3609 );
3610 let otel_context = span.context().span().span_context().clone();
3611
3612 *last_message.lock().expect("poisoned") = LastMessage {
3616 kind: msg_kind,
3617 stmt: match &msg {
3618 Message::Command(
3619 _,
3620 Command::Execute {
3621 portal_name,
3622 session,
3623 ..
3624 },
3625 ) => session
3626 .get_portal_unverified(portal_name)
3627 .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
3628 _ => None,
3629 },
3630 };
3631
3632 let start = Instant::now();
3633 self.handle_message(msg).instrument(span).await;
3634 let duration = start.elapsed();
3635
3636 self.metrics
3637 .message_handling
3638 .with_label_values(&[msg_kind])
3639 .observe(duration.as_secs_f64());
3640
3641 if duration > warn_threshold {
3643 let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
3644 tracing::error!(
3645 ?msg_kind,
3646 ?trace_id,
3647 ?duration,
3648 "very slow coordinator message"
3649 );
3650 }
3651 }
3652 }
3653 if let Some(catalog) = Arc::into_inner(self.catalog) {
3656 catalog.expire().await;
3657 }
3658 }
3659 .boxed_local()
3660 }
3661
3662 fn catalog(&self) -> &Catalog {
3664 &self.catalog
3665 }
3666
3667 fn owned_catalog(&self) -> Arc<Catalog> {
3670 Arc::clone(&self.catalog)
3671 }
3672
3673 fn optimizer_metrics(&self) -> OptimizerMetrics {
3676 self.optimizer_metrics.clone()
3677 }
3678
3679 fn catalog_mut(&mut self) -> &mut Catalog {
3681 Arc::make_mut(&mut self.catalog)
3689 }
3690
3691 fn connection_context(&self) -> &ConnectionContext {
3693 self.controller.connection_context()
3694 }
3695
3696 fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
3698 &self.connection_context().secrets_reader
3699 }
3700
3701 #[allow(dead_code)]
3706 pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
3707 for meta in self.active_conns.values() {
3708 let _ = meta.notice_tx.send(notice.clone());
3709 }
3710 }
3711
3712 pub(crate) fn broadcast_notice_tx(
3715 &self,
3716 ) -> Box<dyn FnOnce(AdapterNotice) + Send + 'static> {
3717 let senders: Vec<_> = self
3718 .active_conns
3719 .values()
3720 .map(|meta| meta.notice_tx.clone())
3721 .collect();
3722 Box::new(move |notice| {
3723 for tx in senders {
3724 let _ = tx.send(notice.clone());
3725 }
3726 })
3727 }
3728
3729 pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
3730 &self.active_conns
3731 }
3732
3733 #[instrument(level = "debug")]
3734 pub(crate) fn retire_execution(
3735 &mut self,
3736 reason: StatementEndedExecutionReason,
3737 ctx_extra: ExecuteContextExtra,
3738 ) {
3739 if let Some(uuid) = ctx_extra.retire() {
3740 self.end_statement_execution(uuid, reason);
3741 }
3742 }
3743
3744 #[instrument(level = "debug")]
3746 pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
3747 let compute = self
3748 .instance_snapshot(instance)
3749 .expect("compute instance does not exist");
3750 DataflowBuilder::new(self.catalog().state(), compute)
3751 }
3752
3753 pub fn instance_snapshot(
3755 &self,
3756 id: ComputeInstanceId,
3757 ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
3758 ComputeInstanceSnapshot::new(&self.controller, id)
3759 }
3760
3761 pub(crate) async fn ship_dataflow(
3768 &mut self,
3769 dataflow: DataflowDescription<Plan>,
3770 instance: ComputeInstanceId,
3771 subscribe_target_replica: Option<ReplicaId>,
3772 ) {
3773 self.try_ship_dataflow(dataflow, instance, subscribe_target_replica)
3774 .await
3775 .unwrap_or_terminate("dataflow creation cannot fail");
3776 }
3777
3778 pub(crate) async fn try_ship_dataflow(
3781 &mut self,
3782 dataflow: DataflowDescription<Plan>,
3783 instance: ComputeInstanceId,
3784 subscribe_target_replica: Option<ReplicaId>,
3785 ) -> Result<(), DataflowCreationError> {
3786 let export_ids = dataflow.exported_index_ids().collect();
3789
3790 self.controller
3791 .compute
3792 .create_dataflow(instance, dataflow, subscribe_target_replica)?;
3793
3794 self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
3795 .await;
3796
3797 Ok(())
3798 }
3799
3800 pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
3804 self.controller
3805 .compute
3806 .allow_writes(instance, id)
3807 .unwrap_or_terminate("allow_writes cannot fail");
3808 }
3809
3810 pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
3812 &mut self,
3813 dataflow: DataflowDescription<Plan>,
3814 instance: ComputeInstanceId,
3815 notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
3816 ) {
3817 if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
3818 let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
3819 let ((), ()) =
3820 futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
3821 } else {
3822 self.ship_dataflow(dataflow, instance, None).await;
3823 }
3824 }
3825
3826 pub fn install_compute_watch_set(
3830 &mut self,
3831 conn_id: ConnectionId,
3832 objects: BTreeSet<GlobalId>,
3833 t: Timestamp,
3834 state: WatchSetResponse,
3835 ) -> Result<(), CollectionLookupError> {
3836 let ws_id = self.controller.install_compute_watch_set(objects, t)?;
3837 self.connection_watch_sets
3838 .entry(conn_id.clone())
3839 .or_default()
3840 .insert(ws_id);
3841 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3842 Ok(())
3843 }
3844
3845 pub fn install_storage_watch_set(
3849 &mut self,
3850 conn_id: ConnectionId,
3851 objects: BTreeSet<GlobalId>,
3852 t: Timestamp,
3853 state: WatchSetResponse,
3854 ) -> Result<(), CollectionLookupError> {
3855 let ws_id = self.controller.install_storage_watch_set(objects, t)?;
3856 self.connection_watch_sets
3857 .entry(conn_id.clone())
3858 .or_default()
3859 .insert(ws_id);
3860 self.installed_watch_sets.insert(ws_id, (conn_id, state));
3861 Ok(())
3862 }
3863
3864 pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
3866 if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
3867 for ws_id in ws_ids {
3868 self.installed_watch_sets.remove(&ws_id);
3869 }
3870 }
3871 }
3872
3873 pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
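3874 // Debug-format a snapshot of the coordinator's in-memory state as JSON.
3875 // Values are rendered with `Debug`, so the output is for humans and not
3876 // a stable machine-readable interface.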
3877 let global_timelines: BTreeMap<_, _> = self
3883 .global_timelines
3884 .iter()
3885 .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
3886 .collect();
3887 let active_conns: BTreeMap<_, _> = self
3888 .active_conns
3889 .iter()
3890 .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
3891 .collect();
3892 let txn_read_holds: BTreeMap<_, _> = self
3893 .txn_read_holds
3894 .iter()
3895 .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
3896 .collect();
3897 let pending_peeks: BTreeMap<_, _> = self
3898 .pending_peeks
3899 .iter()
3900 .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
3901 .collect();
3902 let client_pending_peeks: BTreeMap<_, _> = self
3903 .client_pending_peeks
3904 .iter()
3905 .map(|(id, peek)| {
3906 let peek: BTreeMap<_, _> = peek
3907 .iter()
3908 .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
3909 .collect();
3910 (id.to_string(), peek)
3911 })
3912 .collect();
3913 let pending_linearize_read_txns: BTreeMap<_, _> = self
3914 .pending_linearize_read_txns
3915 .iter()
3916 .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
3917 .collect();
3918
3919 Ok(serde_json::json!({
3920 "global_timelines": global_timelines,
3921 "active_conns": active_conns,
3922 "txn_read_holds": txn_read_holds,
3923 "pending_peeks": pending_peeks,
3924 "client_pending_peeks": client_pending_peeks,
3925 "pending_linearize_read_txns": pending_linearize_read_txns,
3926 "controller": self.controller.dump().await?,
3927 }))
3928 }
3929
3930 async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
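3931 // Snapshot `mz_storage_usage_by_shard` at the current read timestamp,
3932 // find all usage events whose collection time falls outside the
3933 // retention period, and send retractions for them through the internal
3934 // command stream. The work happens in a background task so that
3935 // bootstrap is not blocked on reading the snapshot.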
3944 let item_id = self
3945 .catalog()
3946 .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
3947 let global_id = self.catalog.get_entry(&item_id).latest_global_id();
3948 let read_ts = self.get_local_read_ts().await;
3949 let current_contents_fut = self
3950 .controller
3951 .storage_collections
3952 .snapshot(global_id, read_ts);
3953 let internal_cmd_tx = self.internal_cmd_tx.clone();
3954 spawn(|| "storage_usage_prune", async move {
3955 let mut current_contents = current_contents_fut
3956 .await
3957 .unwrap_or_terminate("cannot fail to fetch snapshot");
3958 differential_dataflow::consolidation::consolidate(&mut current_contents);
3959
3960 let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
3961 let mut expired = Vec::new();
3962 for (row, diff) in current_contents {
3963 assert_eq!(
3964 diff, 1,
3965 "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
3966 );
3967 let collection_timestamp = row
3969 .unpack()
3970 .get(3)
3971 .expect("definition of mz_storage_by_shard changed")
3972 .unwrap_timestamptz();
3973 let collection_timestamp = collection_timestamp.timestamp_millis();
3974 let collection_timestamp: u128 = collection_timestamp
3975 .try_into()
3976 .expect("all collections happen after Jan 1 1970");
3977 if collection_timestamp < cutoff_ts {
3978 debug!("pruning storage event {row:?}");
3979 let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
3980 expired.push(builtin_update);
3981 }
3982 }
3983
3984 let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
3986 });
3987 }
3988
3989 fn current_credit_consumption_rate(&self) -> Numeric {
3990 self.catalog()
3991 .user_cluster_replicas()
3992 .filter_map(|replica| match &replica.config.location {
3993 ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
3994 ReplicaLocation::Unmanaged(_) => None,
3995 })
3996 .map(|size| {
3997 self.catalog()
3998 .cluster_replica_sizes()
3999 .0
4000 .get(size)
4001 .expect("location size is validated against the cluster replica sizes")
4002 .credits_per_hour
4003 })
4004 .sum()
4005 }
4006}
4007
4008#[cfg(test)]
4009impl Coordinator {
4010 #[allow(dead_code)]
4011 async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
4012 let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
4020
4021 let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
4022 }
4023}
4024
4025struct LastMessage {
4027 kind: &'static str,
4028 stmt: Option<Arc<Statement<Raw>>>,
4029}
4030
4031impl LastMessage {
4032 fn stmt_to_string(&self) -> Cow<'static, str> {
4034 self.stmt
4035 .as_ref()
4036 .map(|stmt| stmt.to_ast_string_redacted().into())
4037 .unwrap_or(Cow::Borrowed("<none>"))
4038 }
4039}
4040
4041impl fmt::Debug for LastMessage {
4042 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4043 f.debug_struct("LastMessage")
4044 .field("kind", &self.kind)
4045 .field("stmt", &self.stmt_to_string())
4046 .finish()
4047 }
4048}
4049
4050impl Drop for LastMessage {
4051 fn drop(&mut self) {
4052 if std::thread::panicking() {
4054 eprintln!("Coordinator panicking, dumping last message\n{self:?}",);
4056 }
4057 }
4058}
4059
4060pub fn serve(
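4061 // Serves the coordinator: opens the catalog, initializes the timestamp
4062 // oracles and the controller, bootstraps durable state, and spawns the
4063 // coordinator thread, returning a `Handle` and `Client` once bootstrap
4064 // completes.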
4072 Config {
4073 controller_config,
4074 controller_envd_epoch,
4075 mut storage,
4076 audit_logs_iterator,
4077 timestamp_oracle_url,
4078 unsafe_mode,
4079 all_features,
4080 build_info,
4081 environment_id,
4082 metrics_registry,
4083 now,
4084 secrets_controller,
4085 cloud_resource_controller,
4086 cluster_replica_sizes,
4087 builtin_system_cluster_config,
4088 builtin_catalog_server_cluster_config,
4089 builtin_probe_cluster_config,
4090 builtin_support_cluster_config,
4091 builtin_analytics_cluster_config,
4092 system_parameter_defaults,
4093 availability_zones,
4094 storage_usage_client,
4095 storage_usage_collection_interval,
4096 storage_usage_retention_period,
4097 segment_client,
4098 egress_addresses,
4099 aws_account_id,
4100 aws_privatelink_availability_zones,
4101 connection_context,
4102 connection_limit_callback,
4103 remote_system_parameters,
4104 webhook_concurrency_limit,
4105 http_host_name,
4106 tracing_handle,
4107 read_only_controllers,
4108 caught_up_trigger: clusters_caught_up_trigger,
4109 helm_chart_version,
4110 license_key,
4111 external_login_password_mz_system,
4112 force_builtin_schema_migration,
4113 }: Config,
4114) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4115 async move {
4116 let coord_start = Instant::now();
4117 info!("startup: coordinator init: beginning");
4118 info!("startup: coordinator init: preamble beginning");
4119
4120 let _builtins = LazyLock::force(&BUILTINS_STATIC);
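4121 // Force the computation of the static builtins up front, so the cost is
4122 // paid here rather than at an arbitrary later point of catalog
4123 // initialization.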
4124
4125 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4126 let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4127 let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4128 mpsc::unbounded_channel();
4129
4130 if !availability_zones.iter().all_unique() {
4132 coord_bail!("availability zones must be unique");
4133 }
4134
4135 let aws_principal_context = match (
4136 aws_account_id,
4137 connection_context.aws_external_id_prefix.clone(),
4138 ) {
4139 (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4140 aws_account_id,
4141 aws_external_id_prefix,
4142 }),
4143 _ => None,
4144 };
4145
4146 let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4147 .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4148
4149 info!(
4150 "startup: coordinator init: preamble complete in {:?}",
4151 coord_start.elapsed()
4152 );
4153 let oracle_init_start = Instant::now();
4154 info!("startup: coordinator init: timestamp oracle init beginning");
4155
4156 let pg_timestamp_oracle_config = timestamp_oracle_url
4157 .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
4158 let mut initial_timestamps =
4159 get_initial_oracle_timestamps(&pg_timestamp_oracle_config).await?;
4160
4161 initial_timestamps
4165 .entry(Timeline::EpochMilliseconds)
4166 .or_insert_with(mz_repr::Timestamp::minimum);
4167 let mut timestamp_oracles = BTreeMap::new();
4168 for (timeline, initial_timestamp) in initial_timestamps {
4169 Coordinator::ensure_timeline_state_with_initial_time(
4170 &timeline,
4171 initial_timestamp,
4172 now.clone(),
4173 pg_timestamp_oracle_config.clone(),
4174 &mut timestamp_oracles,
4175 read_only_controllers,
4176 )
4177 .await;
4178 }
4179
4180 let catalog_upper = storage.current_upper().await;
4190 let epoch_millis_oracle = &timestamp_oracles
4190 .get(&Timeline::EpochMilliseconds)
4191 .expect("inserted above")
4192 .oracle;
4193
4194 let mut boot_ts = if read_only_controllers {
4195 let read_ts = epoch_millis_oracle.read_ts().await;
4196 std::cmp::max(read_ts, catalog_upper)
4197 } else {
4198 epoch_millis_oracle.apply_write(catalog_upper).await;
4201 epoch_millis_oracle.write_ts().await.timestamp
4202 };
4203
4204 info!(
4205 "startup: coordinator init: timestamp oracle init complete in {:?}",
4206 oracle_init_start.elapsed()
4207 );
4208
4209 let catalog_open_start = Instant::now();
4210 info!("startup: coordinator init: catalog open beginning");
4211 let persist_client = controller_config
4212 .persist_clients
4213 .open(controller_config.persist_location.clone())
4214 .await
4215 .context("opening persist client")?;
4216 let builtin_item_migration_config = BuiltinItemMigrationConfig {
4217 persist_client: persist_client.clone(),
4218 read_only: read_only_controllers,
4219 force_migration: force_builtin_schema_migration,
4220 };
4223 let OpenCatalogResult {
4224 mut catalog,
4225 migrated_storage_collections_0dt,
4226 new_builtin_collections,
4227 builtin_table_updates,
4228 cached_global_exprs,
4229 uncached_local_exprs,
4230 } = Catalog::open(mz_catalog::config::Config {
4231 storage,
4232 metrics_registry: &metrics_registry,
4233 state: mz_catalog::config::StateConfig {
4234 unsafe_mode,
4235 all_features,
4236 build_info,
4237 deploy_generation: controller_config.deploy_generation,
4238 environment_id: environment_id.clone(),
4239 read_only: read_only_controllers,
4240 now: now.clone(),
4241 boot_ts: boot_ts.clone(),
4242 skip_migrations: false,
4243 cluster_replica_sizes,
4244 builtin_system_cluster_config,
4245 builtin_catalog_server_cluster_config,
4246 builtin_probe_cluster_config,
4247 builtin_support_cluster_config,
4248 builtin_analytics_cluster_config,
4249 system_parameter_defaults,
4250 remote_system_parameters,
4251 availability_zones,
4252 egress_addresses,
4253 aws_principal_context,
4254 aws_privatelink_availability_zones,
4255 connection_context,
4256 http_host_name,
4257 builtin_item_migration_config,
4258 persist_client: persist_client.clone(),
4259 enable_expression_cache_override: None,
4260 helm_chart_version,
4261 external_login_password_mz_system,
4262 license_key: license_key.clone(),
4263 },
4264 })
4265 .await?;
4266
4267 let catalog_upper = catalog.current_upper().await;
4270 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4271
4272 if !read_only_controllers {
4273 epoch_millis_oracle.apply_write(boot_ts).await;
4274 }
4275
4276 info!(
4277 "startup: coordinator init: catalog open complete in {:?}",
4278 catalog_open_start.elapsed()
4279 );
4280
4281 let coord_thread_start = Instant::now();
4282 info!("startup: coordinator init: coordinator thread start beginning");
4283
4284 let session_id = catalog.config().session_id;
4285 let start_instant = catalog.config().start_instant;
4286
4287 let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4291 let handle = TokioHandle::current();
4292
4293 let metrics = Metrics::register_into(&metrics_registry);
4294 let metrics_clone = metrics.clone();
4295 let optimizer_metrics = OptimizerMetrics::register_into(
4296 &metrics_registry,
4297 catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4298 );
4299 let segment_client_clone = segment_client.clone();
4300 let coord_now = now.clone();
4301 let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4302 let mut check_scheduling_policies_interval = tokio::time::interval(
4303 catalog
4304 .system_config()
4305 .cluster_check_scheduling_policies_interval(),
4306 );
4307 check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
4308
        let clusters_caught_up_check_interval = if read_only_controllers {
            let dyncfgs = catalog.system_config().dyncfgs();
            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);

            let mut interval = tokio::time::interval(interval);
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        } else {
            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        };
        let clusters_caught_up_check =
            clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
                trigger,
                exclude_collections: new_builtin_collections.into_iter().collect(),
            });

        if let Some(config) = pg_timestamp_oracle_config.as_ref() {
            let pg_timestamp_oracle_params =
                flags::pg_timstamp_oracle_config(catalog.system_config());
            pg_timestamp_oracle_params.apply(config);
        }

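        // Keep the server's connection limiter in sync with the system vars:
        // clamp the superuser-reserved count to the connection limit so we
        // never reserve more connections than can exist, then hand both values
        // to the provided callback.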
        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
            Arc::new(move |system_vars: &SystemVars| {
                let limit: u64 = system_vars.max_connections().cast_into();
                let superuser_reserved: u64 =
                    system_vars.superuser_reserved_connections().cast_into();

                let superuser_reserved = if superuser_reserved >= limit {
                    tracing::warn!(
                        "superuser_reserved ({superuser_reserved}) is greater than or equal to max connections ({limit})!"
                    );
                    limit
                } else {
                    superuser_reserved
                };

                (connection_limit_callback)(limit, superuser_reserved);
            });
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::MAX_CONNECTIONS,
            Arc::clone(&connection_limit_callback),
        );
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
            connection_limit_callback,
        );

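        // Notifier used to nudge group commit: writers signal `group_commit_tx`
        // and the coordinator's serve loop listens on `group_commit_rx`.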
        let (group_commit_tx, group_commit_rx) = appends::notifier();

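        // Spawn the coordinator on a dedicated OS thread with an enlarged
        // stack, since planning and optimization can recurse deeply.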
        let parent_span = tracing::Span::current();
        let thread = thread::Builder::new()
            .stack_size(3 * stack::STACK_SIZE)
            .name("coordinator".to_string())
            .spawn(move || {
                let span = info_span!(parent: parent_span, "coord::coordinator").entered();

                let controller = handle
                    .block_on({
                        catalog.initialize_controller(
                            controller_config,
                            controller_envd_epoch,
                            read_only_controllers,
                        )
                    })
                    .unwrap_or_terminate("failed to initialize storage_controller");
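                // Initializing the controller may itself use timestamps, so
                // bump the boot timestamp up to the catalog upper once more.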
                let catalog_upper = handle.block_on(catalog.current_upper());
                boot_ts = std::cmp::max(boot_ts, catalog_upper);
                if !read_only_controllers {
                    let epoch_millis_oracle = &timestamp_oracles
                        .get(&Timeline::EpochMilliseconds)
                        .expect("inserted above")
                        .oracle;
                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
                }

                let catalog = Arc::new(catalog);

                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
                let mut coord = Coordinator {
                    controller,
                    catalog,
                    internal_cmd_tx,
                    group_commit_tx,
                    strict_serializable_reads_tx,
                    global_timelines: timestamp_oracles,
                    transient_id_gen: Arc::new(TransientIdGen::new()),
                    active_conns: BTreeMap::new(),
                    txn_read_holds: Default::default(),
                    pending_peeks: BTreeMap::new(),
                    client_pending_peeks: BTreeMap::new(),
                    pending_linearize_read_txns: BTreeMap::new(),
                    serialized_ddl: LockedVecDeque::new(),
                    active_compute_sinks: BTreeMap::new(),
                    active_webhooks: BTreeMap::new(),
                    active_copies: BTreeMap::new(),
                    staged_cancellation: BTreeMap::new(),
                    introspection_subscribes: BTreeMap::new(),
                    write_locks: BTreeMap::new(),
                    deferred_write_ops: BTreeMap::new(),
                    pending_writes: Vec::new(),
                    advance_timelines_interval,
                    secrets_controller,
                    caching_secrets_reader,
                    cloud_resource_controller,
                    storage_usage_client,
                    storage_usage_collection_interval,
                    segment_client,
                    metrics,
                    optimizer_metrics,
                    tracing_handle,
                    statement_logging: StatementLogging::new(coord_now.clone()),
                    webhook_concurrency_limit,
                    pg_timestamp_oracle_config,
                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
                    cluster_scheduling_decisions: BTreeMap::new(),
                    caught_up_check_interval: clusters_caught_up_check_interval,
                    caught_up_check: clusters_caught_up_check,
                    installed_watch_sets: BTreeMap::new(),
                    connection_watch_sets: BTreeMap::new(),
                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
                    read_only_controllers,
                    buffered_builtin_table_updates: Some(Vec::new()),
                    license_key,
                    persist_client,
                };
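                // Run bootstrap inside the runtime. Its result is reported over
                // `bootstrap_tx`; the serve loop below only starts if bootstrap
                // succeeded.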
                let bootstrap = handle.block_on(async {
                    coord
                        .bootstrap(
                            boot_ts,
                            migrated_storage_collections_0dt,
                            builtin_table_updates,
                            cached_global_exprs,
                            uncached_local_exprs,
                            audit_logs_iterator,
                        )
                        .await?;
                    coord
                        .controller
                        .remove_orphaned_replicas(
                            coord.catalog().get_next_user_replica_id().await?,
                            coord.catalog().get_next_system_replica_id().await?,
                        )
                        .await
                        .map_err(AdapterError::Orchestrator)?;

                    if let Some(retention_period) = storage_usage_retention_period {
                        coord
                            .prune_storage_usage_events_on_startup(retention_period)
                            .await;
                    }

                    Ok(())
                });
                let ok = bootstrap.is_ok();
                drop(span);
                bootstrap_tx
                    .send(bootstrap)
                    .expect("bootstrap_rx is not dropped until it receives this message");
                if ok {
                    handle.block_on(coord.serve(
                        internal_cmd_rx,
                        strict_serializable_reads_rx,
                        cmd_rx,
                        group_commit_rx,
                    ));
                }
            })
            .expect("failed to create coordinator thread");
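        // Wait for the coordinator thread to report the bootstrap outcome
        // before handing out a `Handle` and `Client`.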
        match bootstrap_rx
            .await
            .expect("bootstrap_tx always sends a message or panics/halts")
        {
            Ok(()) => {
                info!(
                    "startup: coordinator init: coordinator thread start complete in {:?}",
                    coord_thread_start.elapsed()
                );
                info!(
                    "startup: coordinator init: complete in {:?}",
                    coord_start.elapsed()
                );
                let handle = Handle {
                    session_id,
                    start_instant,
                    _thread: thread.join_on_drop(),
                };
                let client = Client::new(
                    build_info,
                    cmd_tx,
                    metrics_clone,
                    now,
                    environment_id,
                    segment_client_clone,
                );
                Ok((handle, client))
            }
            Err(e) => Err(e),
        }
    }
    .boxed()
}

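/// Returns the latest timestamp recorded for each timeline in the
/// postgres-backed timestamp oracle, if one is configured.
///
/// Timelines with no entry in the oracle are absent from the returned map.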
async fn get_initial_oracle_timestamps(
    pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
    let mut initial_timestamps = BTreeMap::new();

    if let Some(pg_timestamp_oracle_config) = pg_timestamp_oracle_config {
        let postgres_oracle_timestamps =
            PostgresTimestampOracle::<NowFn>::get_all_timelines(pg_timestamp_oracle_config.clone())
                .await?;

        let debug_msg = || {
            postgres_oracle_timestamps
                .iter()
                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
                .join(", ")
        };
        info!(
            "current timestamps from the postgres-backed timestamp oracle: {}",
            debug_msg()
        );

        for (timeline, ts) in postgres_oracle_timestamps {
            let entry = initial_timestamps
                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));

            entry
                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
                .or_insert(ts);
        }
    } else {
        info!("no postgres URL configured for the postgres-backed timestamp oracle");
    };

    let debug_msg = || {
        initial_timestamps
            .iter()
            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
            .join(", ")
    };
    info!("initial oracle timestamps: {}", debug_msg());

    Ok(initial_timestamps)
}

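/// Pulls system parameter overrides from the configured parameter sync
/// frontend, if any.
///
/// The very first sync on a fresh environment blocks until it completes, so
/// that the system does not come up with defaults that were never synced;
/// once a sync has succeeded at least once, later boots bound the sync by
/// `system_parameter_sync_timeout` and return `None` on timeout.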
#[instrument]
pub async fn load_remote_system_parameters(
    storage: &mut Box<dyn OpenableDurableCatalogState>,
    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
    system_parameter_sync_timeout: Duration,
) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
        tracing::info!("parameter sync on boot: start sync");

        let mut params = SynchronizedParameters::new(SystemVars::default());
        let frontend_sync = async {
            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
            frontend.pull(&mut params);
            let ops = params
                .modified()
                .into_iter()
                .map(|param| {
                    let name = param.name;
                    let value = param.value;
                    tracing::info!(name, value, initial = true, "sync parameter");
                    (name, value)
                })
                .collect();
            tracing::info!("parameter sync on boot: end sync");
            Ok(Some(ops))
        };
        if !storage.has_system_config_synced_once().await? {
            frontend_sync.await
        } else {
            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
                Ok(ops) => Ok(ops),
                Err(TimeoutError::Inner(e)) => Err(e),
                Err(TimeoutError::DeadlineElapsed) => {
                    tracing::info!("parameter sync on boot: sync has timed out");
                    Ok(None)
                }
            }
        }
    } else {
        Ok(None)
    }
}

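/// Responses delivered for installed watch sets once the collections they
/// watch are ready.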
#[derive(Debug)]
pub enum WatchSetResponse {
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    AlterSinkReady(AlterSinkReadyContext),
}

#[derive(Debug)]
pub struct AlterSinkReadyContext {
    ctx: Option<ExecuteContext>,
    otel_ctx: OpenTelemetryContext,
    plan: AlterSinkPlan,
    plan_validity: PlanValidity,
    read_hold: ReadHolds<Timestamp>,
}

impl AlterSinkReadyContext {
    fn ctx(&mut self) -> &mut ExecuteContext {
        self.ctx.as_mut().expect("only cleared on drop")
    }

    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
        self.ctx
            .take()
            .expect("only cleared on drop")
            .retire(result);
    }
}

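// If the context is dropped without an explicit `retire`, respond to the
// client with `Canceled` so the request is never silently abandoned.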
impl Drop for AlterSinkReadyContext {
    fn drop(&mut self) {
        if let Some(ctx) = self.ctx.take() {
            ctx.retire(Err(AdapterError::Canceled));
        }
    }
}

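/// A `VecDeque` paired with a shared async mutex over `()`.
///
/// Note that the mutex does not guard access to the queue itself (mutation
/// still requires `&mut self`); holding the guard instead marks that a
/// dequeued item is currently being worked on, which serializes consumers
/// that respect the lock. A minimal usage sketch (hypothetical names, for
/// illustration only):
///
/// ```ignore
/// let mut queue = LockedVecDeque::new();
/// queue.push_back(work_item);
/// if let Ok(_guard) = queue.try_lock_owned() {
///     if let Some(item) = queue.pop_front() {
///         // Process `item`; other consumers that respect the lock wait
///         // until `_guard` is dropped.
///     }
/// }
/// ```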
#[derive(Debug)]
struct LockedVecDeque<T> {
    items: VecDeque<T>,
    lock: Arc<tokio::sync::Mutex<()>>,
}

impl<T> LockedVecDeque<T> {
    pub fn new() -> Self {
        Self {
            items: VecDeque::new(),
            lock: Arc::new(tokio::sync::Mutex::new(())),
        }
    }

    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
        Arc::clone(&self.lock).try_lock_owned()
    }

    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    pub fn push_back(&mut self, value: T) {
        self.items.push_back(value)
    }

    pub fn pop_front(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    pub fn remove(&mut self, index: usize) -> Option<T> {
        self.items.remove(index)
    }

    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
        self.items.iter()
    }
}

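/// An execute context together with the statement (or plan) whose execution
/// was deferred, so it can be resumed later.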
#[derive(Debug)]
struct DeferredPlanStatement {
    ctx: ExecuteContext,
    ps: PlanStatement,
}

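/// The two points from which a deferred execution can resume: a raw statement
/// that still needs planning, or a finished plan with its resolved ids.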
#[derive(Debug)]
enum PlanStatement {
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
    },
}

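/// Reasons a connection may be rejected by a network policy.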
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    #[error("Access denied: missing IP address")]
    MissingIp,
}

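/// Accepts `ip` if any rule's address block contains it; otherwise the
/// address is denied.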
pub(crate) fn validate_ip_with_policy_rules(
    ip: &IpAddr,
    rules: &Vec<NetworkPolicyRule>,
) -> Result<(), NetworkPolicyError> {
    if rules.iter().any(|r| r.address.0.contains(ip)) {
        Ok(())
    } else {
        Err(NetworkPolicyError::AddressDenied(*ip))
    }
}