use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroI64;
use std::ops::Neg;
use std::str::FromStr;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::Context;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};
use http::Uri;
use ipnet::IpNet;
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
use mz_auth::password::Password;
use mz_build_info::BuildInfo;
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
    DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::{
    CollectionLookupError, CollectionMissing, DataflowCreationError, InstanceMissing,
};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_controller::clusters::{
    ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
};
use mz_controller::{ControllerConfig, Readiness};
use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
use mz_expr::{MapFilterProject, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::OfflineReason;
use mz_ore::cast::{CastFrom, CastInto, CastLossy};
use mz_ore::channel::trigger::Trigger;
use mz_ore::future::TimeoutError;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::{JoinHandle, spawn};
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_ore::url::SensitiveUrl;
use mz_ore::{assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, stack};
use mz_persist_client::PersistClient;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
use mz_repr::adt::numeric::Numeric;
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlRelationType, Timestamp};
use mz_secrets::cache::CachingSecretsReader;
use mz_secrets::{SecretsController, SecretsReader};
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::{CatalogCluster, EnvironmentId};
use mz_sql::names::{QualifiedItemName, ResolvedIds, SchemaSpecifier};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::{
    self, AlterSinkPlan, ConnectionDetails, CreateConnectionPlan, HirRelationExpr,
    NetworkPolicyRule, OnTimeoutAction, Params, QueryWhen,
};
use mz_sql::session::user::User;
use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
use mz_sql_parser::ast::ExplainStage;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::Connection as StorageConnection;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::sinks::{S3SinkFormat, StorageSinkDesc};
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_storage_types::sources::{IngestionDescription, SourceExport, Timeline};
use mz_timestamp_oracle::{TimestampOracleConfig, WriteTimestamp};
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use serde::Serialize;
use thiserror::Error;
use timely::progress::{Antichain, Timestamp as _};
use tokio::runtime::Handle as TokioHandle;
use tokio::select;
use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch};
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
use crate::client::{Client, Handle};
use crate::command::{Command, ExecuteResponse};
use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
use crate::coord::appends::{
    BuiltinTableAppendNotify, DeferredOp, GroupCommitPermit, PendingWriteTxn,
};
use crate::coord::caught_up::CaughtUpCheckContext;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::introspection::IntrospectionSubscribe;
use crate::coord::peek::PendingPeek;
use crate::coord::statement_logging::StatementLogging;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::validity::PlanValidity;
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
use crate::metrics::Metrics;
use crate::optimize::dataflows::{
    ComputeInstanceSnapshot, DataflowBuilder, dataflow_import_id_bundle,
};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session};
use crate::statement_logging::{
    StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{AdapterNotice, ReadHolds, flags};

pub(crate) mod appends;
pub(crate) mod catalog_serving;
pub(crate) mod cluster_scheduling;
pub(crate) mod consistency;
pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod read_policy;
pub(crate) mod sequencer;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;

pub mod catalog_implications;
mod caught_up;
mod command_handler;
mod ddl;
mod indexes;
mod introspection;
mod message_handler;
mod privatelink_status;
mod sql;
mod validity;

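/// Messages processed by the coordinator's main loop: client commands arriving
/// over the command channel, plus notifications reported back asynchronously by
/// internal tasks (statement purification, connection validation, staged
/// execution, storage usage collection, and so on).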
#[derive(Debug)]
pub enum Message {
    Command(OpenTelemetryContext, Command),
    ControllerReady {
        controller: ControllerReadiness,
    },
    PurifiedStatementReady(PurifiedStatementReady),
    CreateConnectionValidationReady(CreateConnectionValidationReady),
    AlterConnectionValidationReady(AlterConnectionValidationReady),
    TryDeferred {
        conn_id: ConnectionId,
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    },
    GroupCommitInitiate(Span, Option<GroupCommitPermit>),
    DeferredStatementReady,
    AdvanceTimelines,
    ClusterEvent(ClusterEvent),
    CancelPendingPeeks {
        conn_id: ConnectionId,
    },
    LinearizeReads,
    StagedBatches {
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    },
    StorageUsageSchedule,
    StorageUsageFetch,
    StorageUsageUpdate(ShardsUsageReferenced),
    StorageUsagePrune(Vec<BuiltinTableUpdate>),
    RetireExecute {
        data: ExecuteContextExtra,
        otel_ctx: OpenTelemetryContext,
        reason: StatementEndedExecutionReason,
    },
    ExecuteSingleStatementTransaction {
        ctx: ExecuteContext,
        otel_ctx: OpenTelemetryContext,
        stmt: Arc<Statement<Raw>>,
        params: mz_sql::plan::Params,
    },
    PeekStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: PeekStage,
    },
    CreateIndexStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateIndexStage,
    },
    CreateViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateViewStage,
    },
    CreateMaterializedViewStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: CreateMaterializedViewStage,
    },
    SubscribeStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SubscribeStage,
    },
    IntrospectionSubscribeStageReady {
        span: Span,
        stage: IntrospectionSubscribeStage,
    },
    SecretStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: SecretStage,
    },
    ClusterStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ClusterStage,
    },
    ExplainTimestampStageReady {
        ctx: ExecuteContext,
        span: Span,
        stage: ExplainTimestampStage,
    },
    DrainStatementLog,
    PrivateLinkVpcEndpointEvents(Vec<VpcEndpointEvent>),
    CheckSchedulingPolicies,

    SchedulingDecisions(Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>),
}

impl Message {
    pub const fn kind(&self) -> &'static str {
        match self {
            Message::Command(_, msg) => match msg {
                Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
                Command::Startup { .. } => "command-startup",
                Command::Execute { .. } => "command-execute",
                Command::Commit { .. } => "command-commit",
                Command::CancelRequest { .. } => "command-cancel_request",
                Command::PrivilegedCancelRequest { .. } => "command-privileged_cancel_request",
                Command::GetWebhook { .. } => "command-get_webhook",
                Command::GetSystemVars { .. } => "command-get_system_vars",
                Command::SetSystemVars { .. } => "command-set_system_vars",
                Command::Terminate { .. } => "command-terminate",
                Command::RetireExecute { .. } => "command-retire_execute",
                Command::CheckConsistency { .. } => "command-check_consistency",
                Command::Dump { .. } => "command-dump",
                Command::AuthenticatePassword { .. } => "command-auth_check",
                Command::AuthenticateGetSASLChallenge { .. } => "command-auth_get_sasl_challenge",
                Command::AuthenticateVerifySASLProof { .. } => "command-auth_verify_sasl_proof",
                Command::GetComputeInstanceClient { .. } => "get-compute-instance-client",
                Command::GetOracle { .. } => "get-oracle",
                Command::DetermineRealTimeRecentTimestamp { .. } => {
                    "determine-real-time-recent-timestamp"
                }
                Command::GetTransactionReadHoldsBundle { .. } => {
                    "get-transaction-read-holds-bundle"
                }
                Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
                Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
                Command::CopyToPreflight { .. } => "copy-to-preflight",
                Command::ExecuteCopyTo { .. } => "execute-copy-to",
                Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
                Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
                Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
                Command::ExplainTimestamp { .. } => "explain-timestamp",
                Command::FrontendStatementLogging(..) => "frontend-statement-logging",
            },
            Message::ControllerReady {
                controller: ControllerReadiness::Compute,
            } => "controller_ready(compute)",
            Message::ControllerReady {
                controller: ControllerReadiness::Storage,
            } => "controller_ready(storage)",
            Message::ControllerReady {
                controller: ControllerReadiness::Metrics,
            } => "controller_ready(metrics)",
            Message::ControllerReady {
                controller: ControllerReadiness::Internal,
            } => "controller_ready(internal)",
            Message::PurifiedStatementReady(_) => "purified_statement_ready",
            Message::CreateConnectionValidationReady(_) => "create_connection_validation_ready",
            Message::TryDeferred { .. } => "try_deferred",
            Message::GroupCommitInitiate(..) => "group_commit_initiate",
            Message::AdvanceTimelines => "advance_timelines",
            Message::ClusterEvent(_) => "cluster_event",
            Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
            Message::LinearizeReads => "linearize_reads",
            Message::StagedBatches { .. } => "staged_batches",
            Message::StorageUsageSchedule => "storage_usage_schedule",
            Message::StorageUsageFetch => "storage_usage_fetch",
            Message::StorageUsageUpdate(_) => "storage_usage_update",
            Message::StorageUsagePrune(_) => "storage_usage_prune",
            Message::RetireExecute { .. } => "retire_execute",
            Message::ExecuteSingleStatementTransaction { .. } => {
                "execute_single_statement_transaction"
            }
            Message::PeekStageReady { .. } => "peek_stage_ready",
            Message::ExplainTimestampStageReady { .. } => "explain_timestamp_stage_ready",
            Message::CreateIndexStageReady { .. } => "create_index_stage_ready",
            Message::CreateViewStageReady { .. } => "create_view_stage_ready",
            Message::CreateMaterializedViewStageReady { .. } => {
                "create_materialized_view_stage_ready"
            }
            Message::SubscribeStageReady { .. } => "subscribe_stage_ready",
            Message::IntrospectionSubscribeStageReady { .. } => {
                "introspection_subscribe_stage_ready"
            }
            Message::SecretStageReady { .. } => "secret_stage_ready",
            Message::ClusterStageReady { .. } => "cluster_stage_ready",
            Message::DrainStatementLog => "drain_statement_log",
            Message::AlterConnectionValidationReady(..) => "alter_connection_validation_ready",
            Message::PrivateLinkVpcEndpointEvents(_) => "private_link_vpc_endpoint_events",
            Message::CheckSchedulingPolicies => "check_scheduling_policies",
            Message::SchedulingDecisions { .. } => "scheduling_decision",
            Message::DeferredStatementReady => "deferred_statement_ready",
        }
    }
}

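/// The part of the controller that signaled readiness and now requires
/// processing by the coordinator.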
#[derive(Debug)]
pub enum ControllerReadiness {
    Storage,
    Compute,
    Metrics,
    Internal,
}

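/// The result of some work that was spawned off onto a background task, bundled
/// with everything needed to resume the originating statement on the main loop.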
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BackgroundWorkResult<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub params: Params,
    pub plan_validity: PlanValidity,
    pub original_stmt: Arc<Statement<Raw>>,
    pub otel_ctx: OpenTelemetryContext,
}

pub type PurifiedStatementReady = BackgroundWorkResult<mz_sql::pure::PurifiedStatement>;

#[derive(Derivative)]
#[derivative(Debug)]
pub struct ValidationReady<T> {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub result: Result<T, AdapterError>,
    pub resolved_ids: ResolvedIds,
    pub connection_id: CatalogItemId,
    pub connection_gid: GlobalId,
    pub plan_validity: PlanValidity,
    pub otel_ctx: OpenTelemetryContext,
}

pub type CreateConnectionValidationReady = ValidationReady<CreateConnectionPlan>;
pub type AlterConnectionValidationReady = ValidationReady<Connection>;

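/// The stages a peek (`SELECT` or `COPY TO`) passes through on its way to a
/// response: timestamp linearization, real-time recency, acquiring a timestamp
/// read hold, optimization, and either finishing the peek or handling the
/// `EXPLAIN` and `COPY TO` variants.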
#[derive(Debug)]
pub enum PeekStage {
    LinearizeTimestamp(PeekStageLinearizeTimestamp),
    RealTimeRecency(PeekStageRealTimeRecency),
    TimestampReadHold(PeekStageTimestampReadHold),
    Optimize(PeekStageOptimize),
    Finish(PeekStageFinish),
    ExplainPlan(PeekStageExplainPlan),
    ExplainPushdown(PeekStageExplainPushdown),
    CopyToPreflight(PeekStageCopyTo),
    CopyToDataflow(PeekStageCopyTo),
}

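/// Everything a `COPY ... TO` sink needs: the output relation description, the
/// destination URI, the connection to use, the output format, and batching
/// limits.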
#[derive(Debug)]
pub struct CopyToContext {
    pub desc: RelationDesc,
    pub uri: Uri,
    pub connection: StorageConnection<ReferencedConnection>,
    pub connection_id: CatalogItemId,
    pub format: S3SinkFormat,
    pub max_file_size: u64,
    pub output_batch_count: Option<u64>,
}

#[derive(Debug)]
pub struct PeekStageLinearizeTimestamp {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageTimestampReadHold {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    target_replica: Option<ReplicaId>,
    timeline_context: TimelineContext,
    oracle_read_ts: Option<Timestamp>,
    real_time_recency_ts: Option<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageOptimize {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    source_ids: BTreeSet<GlobalId>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    optimizer: Either<optimize::peek::Optimizer, optimize::copy_to::Optimizer>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct PeekStageFinish {
    validity: PlanValidity,
    plan: mz_sql::plan::SelectPlan,
    max_query_result_size: Option<u64>,
    id_bundle: CollectionIdBundle,
    target_replica: Option<ReplicaId>,
    source_ids: BTreeSet<GlobalId>,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    cluster_id: ComputeInstanceId,
    finishing: RowSetFinishing,
    plan_insights_optimizer_trace: Option<OptimizerTrace>,
    insights_ctx: Option<Box<PlanInsightsContext>>,
    global_lir_plan: optimize::peek::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
}

#[derive(Debug)]
pub struct PeekStageCopyTo {
    validity: PlanValidity,
    optimizer: optimize::copy_to::Optimizer,
    global_lir_plan: optimize::copy_to::GlobalLirPlan,
    optimization_finished_at: EpochMillis,
    source_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub struct PeekStageExplainPlan {
    validity: PlanValidity,
    optimizer: optimize::peek::Optimizer,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
    insights_ctx: Option<Box<PlanInsightsContext>>,
}

#[derive(Debug)]
pub struct PeekStageExplainPushdown {
    validity: PlanValidity,
    determination: TimestampDetermination<mz_repr::Timestamp>,
    imports: BTreeMap<GlobalId, MapFilterProject>,
}

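/// The stages of sequencing a `CREATE INDEX` statement.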
#[derive(Debug)]
pub enum CreateIndexStage {
    Optimize(CreateIndexOptimize),
    Finish(CreateIndexFinish),
    Explain(CreateIndexExplain),
}

#[derive(Debug)]
pub struct CreateIndexOptimize {
    validity: PlanValidity,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateIndexFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateIndexPlan,
    resolved_ids: ResolvedIds,
    global_mir_plan: optimize::index::GlobalMirPlan,
    global_lir_plan: optimize::index::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateIndexExplain {
    validity: PlanValidity,
    exported_index_id: GlobalId,
    plan: plan::CreateIndexPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum CreateViewStage {
    Optimize(CreateViewOptimize),
    Finish(CreateViewFinish),
    Explain(CreateViewExplain),
}

#[derive(Debug)]
pub struct CreateViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateViewFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateViewPlan,
    resolved_ids: ResolvedIds,
    optimized_expr: OptimizedMirRelationExpr,
}

#[derive(Debug)]
pub struct CreateViewExplain {
    validity: PlanValidity,
    id: GlobalId,
    plan: plan::CreateViewPlan,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum ExplainTimestampStage {
    Optimize(ExplainTimestampOptimize),
    RealTimeRecency(ExplainTimestampRealTimeRecency),
    Finish(ExplainTimestampFinish),
}

#[derive(Debug)]
pub struct ExplainTimestampOptimize {
    validity: PlanValidity,
    plan: plan::ExplainTimestampPlan,
    cluster_id: ClusterId,
}

#[derive(Debug)]
pub struct ExplainTimestampRealTimeRecency {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    when: QueryWhen,
}

#[derive(Debug)]
pub struct ExplainTimestampFinish {
    validity: PlanValidity,
    format: ExplainFormat,
    optimized_plan: OptimizedMirRelationExpr,
    cluster_id: ClusterId,
    source_ids: BTreeSet<GlobalId>,
    when: QueryWhen,
    real_time_recency_ts: Option<Timestamp>,
}

#[derive(Debug)]
pub enum ClusterStage {
    Alter(AlterCluster),
    WaitForHydrated(AlterClusterWaitForHydrated),
    Finalize(AlterClusterFinalize),
}

#[derive(Debug)]
pub struct AlterCluster {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
}

#[derive(Debug)]
pub struct AlterClusterWaitForHydrated {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
    timeout_time: Instant,
    on_timeout: OnTimeoutAction,
}

#[derive(Debug)]
pub struct AlterClusterFinalize {
    validity: PlanValidity,
    plan: plan::AlterClusterPlan,
    new_config: ClusterVariantManaged,
}

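/// How the current statement interacts with `EXPLAIN`: not an explain at all,
/// a full `EXPLAIN PLAN` request, a plan-insights notice computed as a side
/// effect of a regular query, or an explain of filter pushdown.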
#[derive(Debug)]
pub enum ExplainContext {
    None,
    Plan(ExplainPlanContext),
    PlanInsightsNotice(OptimizerTrace),
    Pushdown,
}

impl ExplainContext {
    pub(crate) fn dispatch_guard(&self) -> Option<DispatchGuard<'_>> {
        let optimizer_trace = match self {
            ExplainContext::Plan(explain_ctx) => Some(&explain_ctx.optimizer_trace),
            ExplainContext::PlanInsightsNotice(optimizer_trace) => Some(optimizer_trace),
            _ => None,
        };
        optimizer_trace.map(|optimizer_trace| optimizer_trace.as_guard())
    }

    pub(crate) fn needs_cluster(&self) -> bool {
        match self {
            ExplainContext::None => true,
            ExplainContext::Plan(..) => false,
            ExplainContext::PlanInsightsNotice(..) => true,
            ExplainContext::Pushdown => false,
        }
    }

    pub(crate) fn needs_plan_insights(&self) -> bool {
        matches!(
            self,
            ExplainContext::Plan(ExplainPlanContext {
                stage: ExplainStage::PlanInsights,
                ..
            }) | ExplainContext::PlanInsightsNotice(_)
        )
    }
}

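/// Settings for an in-flight `EXPLAIN PLAN`: which stage to explain, in which
/// format, and the trace that collects plans from the optimizer pipeline.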
#[derive(Debug)]
pub struct ExplainPlanContext {
    pub broken: bool,
    pub config: ExplainConfig,
    pub format: ExplainFormat,
    pub stage: ExplainStage,
    pub replan: Option<GlobalId>,
    pub desc: Option<RelationDesc>,
    pub optimizer_trace: OptimizerTrace,
}

#[derive(Debug)]
pub enum CreateMaterializedViewStage {
    Optimize(CreateMaterializedViewOptimize),
    Finish(CreateMaterializedViewFinish),
    Explain(CreateMaterializedViewExplain),
}

#[derive(Debug)]
pub struct CreateMaterializedViewOptimize {
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct CreateMaterializedViewFinish {
    item_id: CatalogItemId,
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    resolved_ids: ResolvedIds,
    local_mir_plan: optimize::materialized_view::LocalMirPlan,
    global_mir_plan: optimize::materialized_view::GlobalMirPlan,
    global_lir_plan: optimize::materialized_view::GlobalLirPlan,
}

#[derive(Debug)]
pub struct CreateMaterializedViewExplain {
    global_id: GlobalId,
    validity: PlanValidity,
    plan: plan::CreateMaterializedViewPlan,
    df_meta: DataflowMetainfo,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum SubscribeStage {
    OptimizeMir(SubscribeOptimizeMir),
    TimestampOptimizeLir(SubscribeTimestampOptimizeLir),
    Finish(SubscribeFinish),
    Explain(SubscribeExplain),
}

#[derive(Debug)]
pub struct SubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    dependency_ids: BTreeSet<GlobalId>,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct SubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    timeline: TimelineContext,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    dependency_ids: BTreeSet<GlobalId>,
    replica_id: Option<ReplicaId>,
    explain_ctx: ExplainContext,
}

#[derive(Debug)]
pub struct SubscribeFinish {
    validity: PlanValidity,
    cluster_id: ComputeInstanceId,
    replica_id: Option<ReplicaId>,
    plan: plan::SubscribePlan,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    dependency_ids: BTreeSet<GlobalId>,
}

#[derive(Debug)]
pub struct SubscribeExplain {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    df_meta: DataflowMetainfo,
    cluster_id: ComputeInstanceId,
    explain_ctx: ExplainPlanContext,
}

#[derive(Debug)]
pub enum IntrospectionSubscribeStage {
    OptimizeMir(IntrospectionSubscribeOptimizeMir),
    TimestampOptimizeLir(IntrospectionSubscribeTimestampOptimizeLir),
    Finish(IntrospectionSubscribeFinish),
}

#[derive(Debug)]
pub struct IntrospectionSubscribeOptimizeMir {
    validity: PlanValidity,
    plan: plan::SubscribePlan,
    subscribe_id: GlobalId,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeTimestampOptimizeLir {
    validity: PlanValidity,
    optimizer: optimize::subscribe::Optimizer,
    global_mir_plan: optimize::subscribe::GlobalMirPlan<optimize::subscribe::Unresolved>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub struct IntrospectionSubscribeFinish {
    validity: PlanValidity,
    global_lir_plan: optimize::subscribe::GlobalLirPlan,
    read_holds: ReadHolds<Timestamp>,
    cluster_id: ComputeInstanceId,
    replica_id: ReplicaId,
}

#[derive(Debug)]
pub enum SecretStage {
    CreateEnsure(CreateSecretEnsure),
    CreateFinish(CreateSecretFinish),
    RotateKeysEnsure(RotateKeysSecretEnsure),
    RotateKeysFinish(RotateKeysSecretFinish),
    Alter(AlterSecret),
}

#[derive(Debug)]
pub struct CreateSecretEnsure {
    validity: PlanValidity,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct CreateSecretFinish {
    validity: PlanValidity,
    item_id: CatalogItemId,
    global_id: GlobalId,
    plan: plan::CreateSecretPlan,
}

#[derive(Debug)]
pub struct RotateKeysSecretEnsure {
    validity: PlanValidity,
    id: CatalogItemId,
}

#[derive(Debug)]
pub struct RotateKeysSecretFinish {
    validity: PlanValidity,
    ops: Vec<crate::catalog::Op>,
}

#[derive(Debug)]
pub struct AlterSecret {
    validity: PlanValidity,
    plan: plan::AlterSecretPlan,
}

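/// The cluster a statement should run on: the built-in catalog server cluster,
/// the session's active cluster, or the cluster pinned by an ongoing
/// transaction.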
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TargetCluster {
    CatalogServer,
    Active,
    Transaction(ClusterId),
}

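/// The result of running a single [`Staged`] stage: either more work (a handle
/// to a background task producing the next stage or a final response, or an
/// immediately available next stage) or a final [`ExecuteResponse`].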
pub(crate) enum StageResult<T> {
    Handle(JoinHandle<Result<T, AdapterError>>),
    HandleRetire(JoinHandle<Result<ExecuteResponse, AdapterError>>),
    Immediate(T),
    Response(ExecuteResponse),
}

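/// A sequencing stage the coordinator can drive to completion. Each call to
/// [`Staged::stage`] returns a [`StageResult`] that either carries the next
/// stage (possibly computed on a background task) or finishes with a response.
///
/// A rough sketch of the driver loop, illustrative only; the real driver lives
/// in the sequencer and additionally handles cancellation, spans, and
/// plan-validity rechecks between stages:
///
/// ```ignore
/// async fn drive<S: Staged>(coord: &mut Coordinator, mut ctx: S::Ctx, mut stage: S) {
///     loop {
///         match stage.stage(coord, &mut ctx).await {
///             Ok(StageResult::Immediate(next)) => stage = *next,
///             Ok(StageResult::Response(resp)) => return ctx.retire(Ok(resp)),
///             Err(e) => return ctx.retire(Err(e)),
///             // Handle/HandleRetire: await the task, then continue or retire.
///             _ => unimplemented!(),
///         }
///     }
/// }
/// ```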
pub(crate) trait Staged: Send {
    type Ctx: StagedContext;

    fn validity(&mut self) -> &mut PlanValidity;

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut Self::Ctx,
    ) -> Result<StageResult<Box<Self>>, AdapterError>;

    fn message(self, ctx: Self::Ctx, span: Span) -> Message;

    fn cancel_enabled(&self) -> bool;
}

pub trait StagedContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>);
    fn session(&self) -> Option<&Session>;
}

impl StagedContext for ExecuteContext {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        self.retire(result);
    }

    fn session(&self) -> Option<&Session> {
        Some(self.session())
    }
}

impl StagedContext for () {
    fn retire(self, _result: Result<ExecuteResponse, AdapterError>) {}

    fn session(&self) -> Option<&Session> {
        None
    }
}

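/// Configuration for a [`Coordinator`].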
pub struct Config {
    pub controller_config: ControllerConfig,
    pub controller_envd_epoch: NonZeroI64,
    pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
    pub audit_logs_iterator: AuditLogIterator,
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    pub unsafe_mode: bool,
    pub all_features: bool,
    pub build_info: &'static BuildInfo,
    pub environment_id: EnvironmentId,
    pub metrics_registry: MetricsRegistry,
    pub now: NowFn,
    pub secrets_controller: Arc<dyn SecretsController>,
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    pub availability_zones: Vec<String>,
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    pub system_parameter_defaults: BTreeMap<String, String>,
    pub storage_usage_client: StorageUsageClient,
    pub storage_usage_collection_interval: Duration,
    pub storage_usage_retention_period: Option<Duration>,
    pub segment_client: Option<mz_segment::Client>,
    pub egress_addresses: Vec<IpNet>,
    pub remote_system_parameters: Option<BTreeMap<String, String>>,
    pub aws_account_id: Option<String>,
    pub aws_privatelink_availability_zones: Option<Vec<String>>,
    pub connection_context: ConnectionContext,
    pub connection_limit_callback: Box<dyn Fn(u64, u64) + Send + Sync + 'static>,
    pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pub http_host_name: Option<String>,
    pub tracing_handle: TracingHandle,
    pub read_only_controllers: bool,

    pub caught_up_trigger: Option<Trigger>,

    pub helm_chart_version: Option<String>,
    pub license_key: ValidatedLicenseKey,
    pub external_login_password_mz_system: Option<Password>,
    pub force_builtin_schema_migration: Option<String>,
}

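/// Metadata about an active connection, kept by the coordinator for as long as
/// the connection is open.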
#[derive(Debug, Serialize)]
pub struct ConnMeta {
    secret_key: u32,
    connected_at: EpochMillis,
    user: User,
    application_name: String,
    uuid: Uuid,
    conn_id: ConnectionId,
    client_ip: Option<IpAddr>,

    drop_sinks: BTreeSet<GlobalId>,

    #[serde(skip)]
    deferred_lock: Option<OwnedMutexGuard<()>>,

    pending_cluster_alters: BTreeSet<ClusterId>,

    #[serde(skip)]
    notice_tx: mpsc::UnboundedSender<AdapterNotice>,

    authenticated_role: RoleId,
}

impl ConnMeta {
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    pub fn user(&self) -> &User {
        &self.user
    }

    pub fn application_name(&self) -> &str {
        &self.application_name
    }

    pub fn authenticated_role_id(&self) -> &RoleId {
        &self.authenticated_role
    }

    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    pub fn client_ip(&self) -> Option<IpAddr> {
        self.client_ip
    }

    pub fn connected_at(&self) -> EpochMillis {
        self.connected_at
    }
}

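/// A transaction waiting to finish, together with the client context to respond
/// to, the prepared response, and whether to commit or roll back.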
#[derive(Debug)]
pub struct PendingTxn {
    ctx: ExecuteContext,
    response: Result<PendingTxnResponse, AdapterError>,
    action: EndTransactionAction,
}

#[derive(Debug)]
pub enum PendingTxnResponse {
    Committed {
        params: BTreeMap<&'static str, String>,
    },
    Rolledback {
        params: BTreeMap<&'static str, String>,
    },
}

impl PendingTxnResponse {
    pub fn extend_params(&mut self, p: impl IntoIterator<Item = (&'static str, String)>) {
        match self {
            PendingTxnResponse::Committed { params }
            | PendingTxnResponse::Rolledback { params } => params.extend(p),
        }
    }
}

impl From<PendingTxnResponse> for ExecuteResponse {
    fn from(value: PendingTxnResponse) -> Self {
        match value {
            PendingTxnResponse::Committed { params } => {
                ExecuteResponse::TransactionCommitted { params }
            }
            PendingTxnResponse::Rolledback { params } => {
                ExecuteResponse::TransactionRolledBack { params }
            }
        }
    }
}

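/// A read transaction waiting for its timestamp context to be linearized before
/// the response is released to the client.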
#[derive(Debug)]
pub struct PendingReadTxn {
    txn: PendingRead,
    timestamp_context: TimestampContext<mz_repr::Timestamp>,
    created: Instant,
    num_requeues: u64,
    otel_ctx: OpenTelemetryContext,
}

impl PendingReadTxn {
    pub fn timestamp_context(&self) -> &TimestampContext<mz_repr::Timestamp> {
        &self.timestamp_context
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        self.txn.take_context()
    }
}

#[derive(Debug)]
enum PendingRead {
    Read {
        txn: PendingTxn,
    },
    ReadThenWrite {
        ctx: ExecuteContext,
        tx: oneshot::Sender<Option<ExecuteContext>>,
    },
}

impl PendingRead {
    #[instrument(level = "debug")]
    pub fn finish(self) -> Option<(ExecuteContext, Result<ExecuteResponse, AdapterError>)> {
        match self {
            PendingRead::Read {
                txn:
                    PendingTxn {
                        mut ctx,
                        response,
                        action,
                    },
                ..
            } => {
                let changed = ctx.session_mut().vars_mut().end_transaction(action);
                // Fold any session variables changed by ending the transaction
                // into the response.
                let response = response.map(|mut r| {
                    r.extend_params(changed);
                    ExecuteResponse::from(r)
                });

                Some((ctx, response))
            }
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // Hand the context back to the waiting write. Ignore errors if
                // the caller has hung up.
                let _ = tx.send(Some(ctx));
                None
            }
        }
    }

    fn label(&self) -> &'static str {
        match self {
            PendingRead::Read { .. } => "read",
            PendingRead::ReadThenWrite { .. } => "read_then_write",
        }
    }

    pub(crate) fn take_context(self) -> ExecuteContext {
        match self {
            PendingRead::Read { txn, .. } => txn.ctx,
            PendingRead::ReadThenWrite { ctx, tx, .. } => {
                // Inform the waiting write that its context was taken. Ignore
                // errors if the caller has hung up.
                let _ = tx.send(None);
                ctx
            }
        }
    }
}

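/// State carried alongside an in-flight `Execute` so that statement logging can
/// record how the execution ended. A non-trivial value must eventually be
/// retired rather than silently dropped; `#[must_use]` and the guard below help
/// enforce this.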
#[derive(Debug, Default)]
#[must_use]
pub struct ExecuteContextExtra {
    statement_uuid: Option<StatementLoggingId>,
}

impl ExecuteContextExtra {
    pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
        Self { statement_uuid }
    }
    pub fn is_trivial(&self) -> bool {
        self.statement_uuid.is_none()
    }
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
    #[must_use]
    pub(crate) fn retire(self) -> Option<StatementLoggingId> {
        self.statement_uuid
    }
}

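/// Wraps an [`ExecuteContextExtra`] so that, if it is dropped without being
/// explicitly defused via [`ExecuteContextGuard::defuse`], the statement is
/// reported to the coordinator as aborted (see the `Drop` impl below).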
#[derive(Debug)]
#[must_use]
pub struct ExecuteContextGuard {
    extra: ExecuteContextExtra,
    coordinator_tx: mpsc::UnboundedSender<Message>,
}

impl Default for ExecuteContextGuard {
    fn default() -> Self {
        // An inert guard: it carries no statement to log, and its channel has
        // no receiver.
        let (tx, _rx) = mpsc::unbounded_channel();
        Self {
            extra: ExecuteContextExtra::default(),
            coordinator_tx: tx,
        }
    }
}

impl ExecuteContextGuard {
    pub(crate) fn new(
        statement_uuid: Option<StatementLoggingId>,
        coordinator_tx: mpsc::UnboundedSender<Message>,
    ) -> Self {
        Self {
            extra: ExecuteContextExtra::new(statement_uuid),
            coordinator_tx,
        }
    }
    pub fn is_trivial(&self) -> bool {
        self.extra.is_trivial()
    }
    pub fn contents(&self) -> Option<StatementLoggingId> {
        self.extra.contents()
    }
    /// Hands the inner state to the caller, leaving an empty value behind so
    /// that the `Drop` impl does nothing.
    pub(crate) fn defuse(mut self) -> ExecuteContextExtra {
        std::mem::take(&mut self.extra)
    }
}

impl Drop for ExecuteContextGuard {
    fn drop(&mut self) {
        if let Some(statement_uuid) = self.extra.statement_uuid.take() {
            // The guard was dropped without being defused: report the
            // statement as aborted. A send error means the coordinator is
            // already gone, in which case there is nothing left to log to.
            let msg = Message::RetireExecute {
                data: ExecuteContextExtra {
                    statement_uuid: Some(statement_uuid),
                },
                otel_ctx: OpenTelemetryContext::obtain(),
                reason: StatementEndedExecutionReason::Aborted,
            };
            let _ = self.coordinator_tx.send(msg);
        }
    }
}

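/// Bundles the state needed to execute a single statement: the channel to
/// respond on, the session, and the statement-logging guard. The contents are
/// boxed so that the value stays cheap to move between messages.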
#[derive(Debug)]
pub struct ExecuteContext {
    inner: Box<ExecuteContextInner>,
}

impl std::ops::Deref for ExecuteContext {
    type Target = ExecuteContextInner;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}

impl std::ops::DerefMut for ExecuteContext {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.inner
    }
}

#[derive(Debug)]
pub struct ExecuteContextInner {
    tx: ClientTransmitter<ExecuteResponse>,
    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    session: Session,
    extra: ExecuteContextGuard,
}

impl ExecuteContext {
    pub fn session(&self) -> &Session {
        &self.session
    }

    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.session
    }

    pub fn tx(&self) -> &ClientTransmitter<ExecuteResponse> {
        &self.tx
    }

    pub fn tx_mut(&mut self) -> &mut ClientTransmitter<ExecuteResponse> {
        &mut self.tx
    }

    pub fn from_parts(
        tx: ClientTransmitter<ExecuteResponse>,
        internal_cmd_tx: mpsc::UnboundedSender<Message>,
        session: Session,
        extra: ExecuteContextGuard,
    ) -> Self {
        Self {
            inner: ExecuteContextInner {
                tx,
                session,
                extra,
                internal_cmd_tx,
            }
            .into(),
        }
    }

    pub fn into_parts(
        self,
    ) -> (
        ClientTransmitter<ExecuteResponse>,
        mpsc::UnboundedSender<Message>,
        Session,
        ExecuteContextGuard,
    ) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        (tx, internal_cmd_tx, session, extra)
    }

    #[instrument(level = "debug")]
    pub fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        let ExecuteContextInner {
            tx,
            internal_cmd_tx,
            session,
            extra,
        } = *self.inner;
        let reason = if extra.is_trivial() {
            None
        } else {
            Some((&result).into())
        };
        tx.send(result, session);
        if let Some(reason) = reason {
            let extra = extra.defuse();
            if let Err(e) = internal_cmd_tx.send(Message::RetireExecute {
                otel_ctx: OpenTelemetryContext::obtain(),
                data: extra,
                reason,
            }) {
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
        }
    }

    pub fn extra(&self) -> &ExecuteContextGuard {
        &self.extra
    }

    pub fn extra_mut(&mut self) -> &mut ExecuteContextGuard {
        &mut self.extra
    }
}

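/// Tracks the last-known status of every process of every cluster replica,
/// keyed by cluster, replica, and process ID.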
#[derive(Debug)]
struct ClusterReplicaStatuses(
    BTreeMap<ClusterId, BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>>,
);

impl ClusterReplicaStatuses {
    pub(crate) fn new() -> ClusterReplicaStatuses {
        ClusterReplicaStatuses(BTreeMap::new())
    }

    pub(crate) fn initialize_cluster_statuses(&mut self, cluster_id: ClusterId) {
        let prev = self.0.insert(cluster_id, BTreeMap::new());
        assert_eq!(
            prev, None,
            "cluster {cluster_id} statuses already initialized"
        );
    }

    pub(crate) fn initialize_cluster_replica_statuses(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        num_processes: usize,
        time: DateTime<Utc>,
    ) {
        tracing::info!(
            ?cluster_id,
            ?replica_id,
            ?time,
            "initializing cluster replica status"
        );
        let replica_statuses = self.0.entry(cluster_id).or_default();
        let process_statuses = (0..num_processes)
            .map(|process_id| {
                let status = ClusterReplicaProcessStatus {
                    status: ClusterStatus::Offline(Some(OfflineReason::Initializing)),
                    time: time.clone(),
                };
                (u64::cast_from(process_id), status)
            })
            .collect();
        let prev = replica_statuses.insert(replica_id, process_statuses);
        assert_none!(
            prev,
            "cluster replica {cluster_id}.{replica_id} statuses already initialized"
        );
    }

    pub(crate) fn remove_cluster_statuses(
        &mut self,
        cluster_id: &ClusterId,
    ) -> BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        let prev = self.0.remove(cluster_id);
        prev.unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
    }

    pub(crate) fn remove_cluster_replica_statuses(
        &mut self,
        cluster_id: &ClusterId,
        replica_id: &ReplicaId,
    ) -> BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        let replica_statuses = self
            .0
            .get_mut(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"));
        let prev = replica_statuses.remove(replica_id);
        prev.unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(crate) fn ensure_cluster_status(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        process_id: ProcessId,
        status: ClusterReplicaProcessStatus,
    ) {
        let replica_statuses = self
            .0
            .get_mut(&cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster: {cluster_id}"))
            .get_mut(&replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"));
        replica_statuses.insert(process_id, status);
    }

    pub fn get_cluster_replica_status(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> ClusterStatus {
        let process_status = self.get_cluster_replica_statuses(cluster_id, replica_id);
        Self::cluster_replica_status(process_status)
    }

    pub fn cluster_replica_status(
        process_status: &BTreeMap<ProcessId, ClusterReplicaProcessStatus>,
    ) -> ClusterStatus {
        // A replica is `Online` only if every one of its processes is
        // `Online`; otherwise it is `Offline`, carrying an offline reason if
        // any process reported one.
        process_status
            .values()
            .fold(ClusterStatus::Online, |s, p| match (s, p.status) {
                (ClusterStatus::Online, ClusterStatus::Online) => ClusterStatus::Online,
                (x, y) => {
                    let reason_x = match x {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    let reason_y = match y {
                        ClusterStatus::Offline(reason) => reason,
                        ClusterStatus::Online => None,
                    };
                    // Offline reasons are not ordered in any meaningful way, so
                    // arbitrarily keep the first one that is set.
                    ClusterStatus::Offline(reason_x.or(reason_y))
                }
            })
    }

    pub(crate) fn get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &BTreeMap<ProcessId, ClusterReplicaProcessStatus> {
        self.try_get_cluster_replica_statuses(cluster_id, replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(crate) fn try_get_cluster_replica_statuses(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&BTreeMap<ProcessId, ClusterReplicaProcessStatus>> {
        self.try_get_cluster_statuses(cluster_id)
            .and_then(|statuses| statuses.get(&replica_id))
    }

    pub(crate) fn try_get_cluster_statuses(
        &self,
        cluster_id: ClusterId,
    ) -> Option<&BTreeMap<ReplicaId, BTreeMap<ProcessId, ClusterReplicaProcessStatus>>> {
        self.0.get(&cluster_id)
    }
}

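/// The coordinator: the single task that owns the [`Catalog`], serializes all
/// DDL and DML, and drives the compute and storage controllers from one main
/// loop.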
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Coordinator {
    #[derivative(Debug = "ignore")]
    controller: mz_controller::Controller,
    catalog: Arc<Catalog>,

    persist_client: PersistClient,

    internal_cmd_tx: mpsc::UnboundedSender<Message>,
    group_commit_tx: appends::GroupCommitNotifier,

    strict_serializable_reads_tx: mpsc::UnboundedSender<(ConnectionId, PendingReadTxn)>,

    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,

    transient_id_gen: Arc<TransientIdGen>,
    active_conns: BTreeMap<ConnectionId, ConnMeta>,

    txn_read_holds: BTreeMap<ConnectionId, read_policy::ReadHolds<Timestamp>>,

    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,

    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,

    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

    staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

    write_locks: BTreeMap<CatalogItemId, Arc<tokio::sync::Mutex<()>>>,
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,

    pending_writes: Vec<PendingWriteTxn>,

    advance_timelines_interval: Interval,

    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,

    secrets_controller: Arc<dyn SecretsController>,
    caching_secrets_reader: CachingSecretsReader,

    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    storage_usage_client: StorageUsageClient,
    storage_usage_collection_interval: Duration,

    #[derivative(Debug = "ignore")]
    segment_client: Option<mz_segment::Client>,

    metrics: Metrics,
    optimizer_metrics: OptimizerMetrics,

    tracing_handle: TracingHandle,

    statement_logging: StatementLogging,

    webhook_concurrency_limit: WebhookConcurrencyLimiter,

    timestamp_oracle_config: Option<TimestampOracleConfig>,

    check_cluster_scheduling_policies_interval: Interval,

    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,

    caught_up_check_interval: Interval,

    caught_up_check: Option<CaughtUpCheckContext>,

    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,

    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,

    cluster_replica_statuses: ClusterReplicaStatuses,

    read_only_controllers: bool,

    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,

    license_key: ValidatedLicenseKey,
}

impl Coordinator {
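    /// Initializes coordinator state based on the contained catalog. Must be
    /// run on startup, before the coordinator serves any other requests.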
1912 #[instrument(name = "coord::bootstrap")]
1916 pub(crate) async fn bootstrap(
1917 &mut self,
1918 boot_ts: Timestamp,
1919 migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
1920 mut builtin_table_updates: Vec<BuiltinTableUpdate>,
1921 cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
1922 uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
1923 audit_logs_iterator: AuditLogIterator,
1924 ) -> Result<(), AdapterError> {
1925 let bootstrap_start = Instant::now();
1926 info!("startup: coordinator init: bootstrap beginning");
1927 info!("startup: coordinator init: bootstrap: preamble beginning");
1928
1929 let cluster_statuses: Vec<(_, Vec<_>)> = self
1932 .catalog()
1933 .clusters()
1934 .map(|cluster| {
1935 (
1936 cluster.id(),
1937 cluster
1938 .replicas()
1939 .map(|replica| {
1940 (replica.replica_id, replica.config.location.num_processes())
1941 })
1942 .collect(),
1943 )
1944 })
1945 .collect();
1946 let now = self.now_datetime();
1947 for (cluster_id, replica_statuses) in cluster_statuses {
1948 self.cluster_replica_statuses
1949 .initialize_cluster_statuses(cluster_id);
1950 for (replica_id, num_processes) in replica_statuses {
1951 self.cluster_replica_statuses
1952 .initialize_cluster_replica_statuses(
1953 cluster_id,
1954 replica_id,
1955 num_processes,
1956 now,
1957 );
1958 }
1959 }
1960
1961 let system_config = self.catalog().system_config();
1962
1963 mz_metrics::update_dyncfg(&system_config.dyncfg_updates());
1965
1966 let compute_config = flags::compute_config(system_config);
1968 let storage_config = flags::storage_config(system_config);
1969 let scheduling_config = flags::orchestrator_scheduling_config(system_config);
1970 let dyncfg_updates = system_config.dyncfg_updates();
1971 self.controller.compute.update_configuration(compute_config);
1972 self.controller.storage.update_parameters(storage_config);
1973 self.controller
1974 .update_orchestrator_scheduling_config(scheduling_config);
1975 self.controller.update_configuration(dyncfg_updates);
1976
1977 self.validate_resource_limit_numeric(
1978 Numeric::zero(),
1979 self.current_credit_consumption_rate(),
1980 |system_vars| {
1981 self.license_key
1982 .max_credit_consumption_rate()
1983 .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
1984 },
1985 "cluster replica",
1986 MAX_CREDIT_CONSUMPTION_RATE.name(),
1987 )?;
1988
1989 let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
1990 Default::default();
1991
1992 let enable_worker_core_affinity =
1993 self.catalog().system_config().enable_worker_core_affinity();
1994 for instance in self.catalog.clusters() {
1995 self.controller.create_cluster(
1996 instance.id,
1997 ClusterConfig {
1998 arranged_logs: instance.log_indexes.clone(),
1999 workload_class: instance.config.workload_class.clone(),
2000 },
2001 )?;
2002 for replica in instance.replicas() {
2003 let role = instance.role();
2004 self.controller.create_replica(
2005 instance.id,
2006 replica.replica_id,
2007 instance.name.clone(),
2008 replica.name.clone(),
2009 role,
2010 replica.config.clone(),
2011 enable_worker_core_affinity,
2012 )?;
2013 }
2014 }
2015
2016 info!(
2017 "startup: coordinator init: bootstrap: preamble complete in {:?}",
2018 bootstrap_start.elapsed()
2019 );
2020
2021 let init_storage_collections_start = Instant::now();
2022 info!("startup: coordinator init: bootstrap: storage collections init beginning");
2023 self.bootstrap_storage_collections(&migrated_storage_collections_0dt)
2024 .await;
2025 info!(
2026 "startup: coordinator init: bootstrap: storage collections init complete in {:?}",
2027 init_storage_collections_start.elapsed()
2028 );
2029
2030 self.controller.start_compute_introspection_sink();
2035
2036 let optimize_dataflows_start = Instant::now();
2037 info!("startup: coordinator init: bootstrap: optimize dataflow plans beginning");
2038 let entries: Vec<_> = self.catalog().entries().cloned().collect();
2039 let uncached_global_exps = self.bootstrap_dataflow_plans(&entries, cached_global_exprs)?;
2040 info!(
2041 "startup: coordinator init: bootstrap: optimize dataflow plans complete in {:?}",
2042 optimize_dataflows_start.elapsed()
2043 );
2044
2045 let _fut = self.catalog().update_expression_cache(
2047 uncached_local_exprs.into_iter().collect(),
2048 uncached_global_exps.into_iter().collect(),
2049 );
2050
2051 let bootstrap_as_ofs_start = Instant::now();
2055 info!("startup: coordinator init: bootstrap: dataflow as-of bootstrapping beginning");
2056 let dataflow_read_holds = self.bootstrap_dataflow_as_ofs().await;
2057 info!(
2058 "startup: coordinator init: bootstrap: dataflow as-of bootstrapping complete in {:?}",
2059 bootstrap_as_ofs_start.elapsed()
2060 );
2061
2062 let postamble_start = Instant::now();
2063 info!("startup: coordinator init: bootstrap: postamble beginning");
2064
2065 let logs: BTreeSet<_> = BUILTINS::logs()
2066 .map(|log| self.catalog().resolve_builtin_log(log))
2067 .flat_map(|item_id| self.catalog().get_global_ids(&item_id))
2068 .collect();
2069
2070 let mut privatelink_connections = BTreeMap::new();
2071
2072 for entry in &entries {
2073 debug!(
2074 "coordinator init: installing {} {}",
2075 entry.item().typ(),
2076 entry.id()
2077 );
2078 let mut policy = entry.item().initial_logical_compaction_window();
            match entry.item() {
                CatalogItem::Source(source) => {
                    if source.custom_logical_compaction_window.is_none() {
                        if let DataSourceDesc::IngestionExport { ingestion_id, .. } =
                            source.data_source
                        {
                            policy = Some(
                                self.catalog()
                                    .get_entry(&ingestion_id)
                                    .source()
                                    .expect("must be source")
                                    .custom_logical_compaction_window
                                    .unwrap_or_default(),
                            );
                        }
                    }
                    policies_to_set
                        .entry(policy.expect("sources have a compaction window"))
                        .or_insert_with(Default::default)
                        .storage_ids
                        .insert(source.global_id());
                }
                CatalogItem::Table(table) => {
                    policies_to_set
                        .entry(policy.expect("tables have a compaction window"))
                        .or_insert_with(Default::default)
                        .storage_ids
                        .extend(table.global_ids());
                }
                CatalogItem::Index(idx) => {
                    let policy_entry = policies_to_set
                        .entry(policy.expect("indexes have a compaction window"))
                        .or_insert_with(Default::default);

                    if logs.contains(&idx.on) {
                        policy_entry
                            .compute_ids
                            .entry(idx.cluster_id)
                            .or_insert_with(BTreeSet::new)
                            .insert(idx.global_id());
                    } else {
                        let df_desc = self
                            .catalog()
                            .try_get_physical_plan(&idx.global_id())
                            .expect("added in `bootstrap_dataflow_plans`")
                            .clone();

                        let df_meta = self
                            .catalog()
                            .try_get_dataflow_metainfo(&idx.global_id())
                            .expect("added in `bootstrap_dataflow_plans`");

                        if self.catalog().state().system_config().enable_mz_notices() {
                            self.catalog().state().pack_optimizer_notices(
                                &mut builtin_table_updates,
                                df_meta.optimizer_notices.iter(),
                                Diff::ONE,
                            );
                        }

                        policy_entry
                            .compute_ids
                            .entry(idx.cluster_id)
                            .or_insert_with(Default::default)
                            .extend(df_desc.export_ids());

                        self.controller
                            .compute
                            .create_dataflow(idx.cluster_id, df_desc, None)
                            .unwrap_or_terminate("cannot fail to create dataflows");
                    }
                }
                CatalogItem::View(_) => (),
                CatalogItem::MaterializedView(mview) => {
                    policies_to_set
                        .entry(policy.expect("materialized views have a compaction window"))
                        .or_insert_with(Default::default)
                        .storage_ids
                        .insert(mview.global_id_writes());

                    let mut df_desc = self
                        .catalog()
                        .try_get_physical_plan(&mview.global_id_writes())
                        .expect("added in `bootstrap_dataflow_plans`")
                        .clone();

                    if let Some(initial_as_of) = mview.initial_as_of.clone() {
                        df_desc.set_initial_as_of(initial_as_of);
                    }

                    let until = mview
                        .refresh_schedule
                        .as_ref()
                        .and_then(|s| s.last_refresh())
                        .and_then(|r| r.try_step_forward());
                    if let Some(until) = until {
                        df_desc.until.meet_assign(&Antichain::from_elem(until));
                    }

                    let df_meta = self
                        .catalog()
                        .try_get_dataflow_metainfo(&mview.global_id_writes())
                        .expect("added in `bootstrap_dataflow_plans`");

                    if self.catalog().state().system_config().enable_mz_notices() {
                        self.catalog().state().pack_optimizer_notices(
                            &mut builtin_table_updates,
                            df_meta.optimizer_notices.iter(),
                            Diff::ONE,
                        );
                    }

                    self.ship_dataflow(df_desc, mview.cluster_id, None).await;

                    if mview.replacement_target.is_none() {
                        self.allow_writes(mview.cluster_id, mview.global_id_writes());
                    }
                }
                CatalogItem::Sink(sink) => {
                    policies_to_set
                        .entry(CompactionWindow::Default)
                        .or_insert_with(Default::default)
                        .storage_ids
                        .insert(sink.global_id());
                }
                CatalogItem::Connection(catalog_connection) => {
                    if let ConnectionDetails::AwsPrivatelink(conn) = &catalog_connection.details {
                        privatelink_connections.insert(
                            entry.id(),
                            VpcEndpointConfig {
                                aws_service_name: conn.service_name.clone(),
                                availability_zone_ids: conn.availability_zones.clone(),
                            },
                        );
                    }
                }
                CatalogItem::ContinualTask(ct) => {
                    policies_to_set
                        .entry(policy.expect("continual tasks have a compaction window"))
                        .or_insert_with(Default::default)
                        .storage_ids
                        .insert(ct.global_id());

                    let mut df_desc = self
                        .catalog()
                        .try_get_physical_plan(&ct.global_id())
                        .expect("added in `bootstrap_dataflow_plans`")
                        .clone();

                    if let Some(initial_as_of) = ct.initial_as_of.clone() {
                        df_desc.set_initial_as_of(initial_as_of);
                    }

                    let df_meta = self
                        .catalog()
                        .try_get_dataflow_metainfo(&ct.global_id())
                        .expect("added in `bootstrap_dataflow_plans`");

                    if self.catalog().state().system_config().enable_mz_notices() {
                        self.catalog().state().pack_optimizer_notices(
                            &mut builtin_table_updates,
                            df_meta.optimizer_notices.iter(),
                            Diff::ONE,
                        );
                    }

                    self.ship_dataflow(df_desc, ct.cluster_id, None).await;
                    self.allow_writes(ct.cluster_id, ct.global_id());
                }
                CatalogItem::Log(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_) => {}
            }
        }

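        // Reconcile cloud VPC endpoints with the catalog: delete endpoints
        // whose AWS PrivateLink connection no longer exists, then (re-)ensure
        // an endpoint for every connection that does.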
        if let Some(cloud_resource_controller) = &self.cloud_resource_controller {
            let existing_vpc_endpoints = cloud_resource_controller
                .list_vpc_endpoints()
                .await
                .context("list vpc endpoints")?;
            let existing_vpc_endpoints = BTreeSet::from_iter(existing_vpc_endpoints.into_keys());
            let desired_vpc_endpoints = privatelink_connections.keys().cloned().collect();
            let vpc_endpoints_to_remove = existing_vpc_endpoints.difference(&desired_vpc_endpoints);
            for id in vpc_endpoints_to_remove {
                cloud_resource_controller
                    .delete_vpc_endpoint(*id)
                    .await
                    .context("deleting extraneous vpc endpoint")?;
            }

            for (id, spec) in privatelink_connections {
                cloud_resource_controller
                    .ensure_vpc_endpoint(id, spec)
                    .await
                    .context("ensuring vpc endpoint")?;
            }
        }

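        // All dataflows have been created, so the read holds acquired during
        // as-of selection are no longer needed; the read policies installed
        // below take over frontier management from here.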
        drop(dataflow_read_holds);
        for (cw, policies) in policies_to_set {
            self.initialize_read_policies(&policies, cw).await;
        }

        builtin_table_updates.extend(
            self.catalog().state().resolve_builtin_table_updates(
                self.catalog().state().pack_all_replica_size_updates(),
            ),
        );

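        // In read-only mode we may only write to migrated builtin collections
        // that have never been written to, i.e. whose write frontier is still
        // at the minimum timestamp; those updates are extracted here and
        // appended separately.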
        debug!("startup: coordinator init: bootstrap: initializing migrated builtin tables");
        let migrated_updates_fut = if self.controller.read_only() {
            let min_timestamp = Timestamp::minimum();
            let migrated_builtin_table_updates: Vec<_> = builtin_table_updates
                .extract_if(.., |update| {
                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
                    migrated_storage_collections_0dt.contains(&update.id)
                        && self
                            .controller
                            .storage_collections
                            .collection_frontiers(gid)
                            .expect("all tables are registered")
                            .write_frontier
                            .elements()
                            == &[min_timestamp]
                })
                .collect();
            if migrated_builtin_table_updates.is_empty() {
                futures::future::ready(()).boxed()
            } else {
                let mut grouped_appends: BTreeMap<GlobalId, Vec<TableData>> = BTreeMap::new();
                for update in migrated_builtin_table_updates {
                    let gid = self.catalog().get_entry(&update.id).latest_global_id();
                    grouped_appends.entry(gid).or_default().push(update.data);
                }
                info!(
                    "coordinator init: rehydrating migrated builtin tables in read-only mode: {:?}",
                    grouped_appends.keys().collect::<Vec<_>>()
                );

                let mut all_appends = Vec::with_capacity(grouped_appends.len());
                for (item_id, table_data) in grouped_appends.into_iter() {
                    let mut all_rows = Vec::new();
                    let mut all_data = Vec::new();
                    for data in table_data {
                        match data {
                            TableData::Rows(rows) => all_rows.extend(rows),
                            TableData::Batches(_) => all_data.push(data),
                        }
                    }
                    differential_dataflow::consolidation::consolidate(&mut all_rows);
                    all_data.push(TableData::Rows(all_rows));

                    all_appends.push((item_id, all_data));
                }

                let fut = self
                    .controller
                    .storage
                    .append_table(min_timestamp, boot_ts.step_forward(), all_appends)
                    .expect("cannot fail to append");
                async {
                    fut.await
                        .expect("One-shot shouldn't be dropped during bootstrap")
                        .unwrap_or_terminate("cannot fail to append")
                }
                .boxed()
            }
        } else {
            futures::future::ready(()).boxed()
        };

        info!(
            "startup: coordinator init: bootstrap: postamble complete in {:?}",
            postamble_start.elapsed()
        );

        let builtin_update_start = Instant::now();
        info!("startup: coordinator init: bootstrap: generate builtin updates beginning");

        if self.controller.read_only() {
            info!(
                "coordinator init: bootstrap: stashing builtin table updates while in read-only mode"
            );

            let audit_join_start = Instant::now();
            info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
            let audit_log_updates: Vec<_> = audit_logs_iterator
                .map(|(audit_log, ts)| StateUpdate {
                    kind: StateUpdateKind::AuditLog(audit_log),
                    ts,
                    diff: StateDiff::Addition,
                })
                .collect();
            let audit_log_builtin_table_updates = self
                .catalog()
                .state()
                .generate_builtin_table_updates(audit_log_updates);
            builtin_table_updates.extend(audit_log_builtin_table_updates);
            info!(
                "startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
                audit_join_start.elapsed()
            );
            self.buffered_builtin_table_updates
                .as_mut()
                .expect("in read-only mode")
                .append(&mut builtin_table_updates);
        } else {
            self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
                .await;
        };
        info!(
            "startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
            builtin_update_start.elapsed()
        );

        let cleanup_secrets_start = Instant::now();
        info!("startup: coordinator init: bootstrap: generate secret cleanup beginning");
        {
            let Self {
                secrets_controller,
                catalog,
                ..
            } = self;

            let next_user_item_id = catalog.get_next_user_item_id().await?;
            let next_system_item_id = catalog.get_next_system_item_id().await?;
            let read_only = self.controller.read_only();
            let catalog_ids: BTreeSet<CatalogItemId> =
                catalog.entries().map(|entry| entry.id()).collect();
            let secrets_controller = Arc::clone(secrets_controller);

            spawn(|| "cleanup-orphaned-secrets", async move {
                if read_only {
                    info!(
                        "coordinator init: not cleaning up orphaned secrets while in read-only mode"
                    );
                    return;
                }
                info!("coordinator init: cleaning up orphaned secrets");

                match secrets_controller.list().await {
                    Ok(controller_secrets) => {
                        let controller_secrets: BTreeSet<CatalogItemId> =
                            controller_secrets.into_iter().collect();
                        let orphaned = controller_secrets.difference(&catalog_ids);
                        for id in orphaned {
                            let id_too_large = match id {
                                CatalogItemId::System(id) => *id >= next_system_item_id,
                                CatalogItemId::User(id) => *id >= next_user_item_id,
                                CatalogItemId::IntrospectionSourceIndex(_)
                                | CatalogItemId::Transient(_) => false,
                            };
                            if id_too_large {
                                info!(
                                    %next_user_item_id, %next_system_item_id,
                                    "coordinator init: not deleting orphaned secret {id} that was likely created by a newer deploy generation"
                                );
                            } else {
                                info!("coordinator init: deleting orphaned secret {id}");
                                fail_point!("orphan_secrets");
                                if let Err(e) = secrets_controller.delete(*id).await {
                                    warn!(
                                        "Dropping orphaned secret has encountered an error: {}",
                                        e
                                    );
                                }
                            }
                        }
                    }
                    Err(e) => warn!("Failed to list secrets during orphan cleanup: {:?}", e),
                }
            });
        }
        info!(
            "startup: coordinator init: bootstrap: generate secret cleanup complete in {:?}",
            cleanup_secrets_start.elapsed()
        );

        let final_steps_start = Instant::now();
        info!(
            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode beginning"
        );
        migrated_updates_fut
            .instrument(info_span!("coord::bootstrap::final"))
            .await;

        debug!(
            "startup: coordinator init: bootstrap: announcing completion of initialization to controller"
        );
        self.controller.initialization_complete();

        self.bootstrap_introspection_subscribes().await;

        info!(
            "startup: coordinator init: bootstrap: migrate builtin tables in read-only mode complete in {:?}",
            final_steps_start.elapsed()
        );

        info!(
            "startup: coordinator init: bootstrap complete in {:?}",
            bootstrap_start.elapsed()
        );
        Ok(())
    }

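    /// Initializes builtin tables from a clean(ish) slate: advances all tables
    /// to the current timestamp, retracts the existing contents of every
    /// system table except `mz_storage_usage_by_shard`, reconciles the audit
    /// log, and then appends the accumulated builtin table updates in a single
    /// write.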
    #[allow(clippy::async_yields_async)]
    #[instrument]
    async fn bootstrap_tables(
        &mut self,
        entries: &[CatalogEntry],
        mut builtin_table_updates: Vec<BuiltinTableUpdate>,
        audit_logs_iterator: AuditLogIterator,
    ) {
        struct TableMetadata<'a> {
            id: CatalogItemId,
            name: &'a QualifiedItemName,
            table: &'a Table,
        }

        let table_metas: Vec<_> = entries
            .into_iter()
            .filter_map(|entry| {
                entry.table().map(|table| TableMetadata {
                    id: entry.id(),
                    name: entry.name(),
                    table,
                })
            })
            .collect();

        debug!("coordinator init: advancing all tables to current timestamp");
        let WriteTimestamp {
            timestamp: write_ts,
            advance_to,
        } = self.get_local_write_ts().await;
        let appends = table_metas
            .iter()
            .map(|meta| (meta.table.global_id_writes(), Vec::new()))
            .collect();
        let table_fence_rx = self
            .controller
            .storage
            .append_table(write_ts.clone(), advance_to, appends)
            .expect("invalid updates");

        self.apply_local_write(write_ts).await;

        debug!("coordinator init: resetting system tables");
        let read_ts = self.get_local_read_ts().await;

        let mz_storage_usage_by_shard_schema: SchemaSpecifier = self
            .catalog()
            .resolve_system_schema(MZ_STORAGE_USAGE_BY_SHARD.schema)
            .into();
        let is_storage_usage_by_shard = |meta: &TableMetadata| -> bool {
            meta.name.item == MZ_STORAGE_USAGE_BY_SHARD.name
                && meta.name.qualifiers.schema_spec == mz_storage_usage_by_shard_schema
        };

        let mut retraction_tasks = Vec::new();
        let mut system_tables: Vec<_> = table_metas
            .iter()
            .filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
            .collect();

        let (audit_events_idx, _) = system_tables
            .iter()
            .find_position(|table| {
                table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
            })
            .expect("mz_audit_events must exist");
        let audit_events = system_tables.remove(audit_events_idx);
        let audit_log_task = self.bootstrap_audit_log_table(
            audit_events.id,
            audit_events.name,
            audit_events.table,
            audit_logs_iterator,
            read_ts,
        );

        for system_table in system_tables {
            let table_id = system_table.id;
            let full_name = self.catalog().resolve_full_name(system_table.name, None);
            debug!("coordinator init: resetting system table {full_name} ({table_id})");

            let snapshot_fut = self
                .controller
                .storage_collections
                .snapshot_cursor(system_table.table.global_id_writes(), read_ts);
            let batch_fut = self
                .controller
                .storage_collections
                .create_update_builder(system_table.table.global_id_writes());

            let task = spawn(|| format!("snapshot-{table_id}"), async move {
                let mut batch = batch_fut
                    .await
                    .unwrap_or_terminate("cannot fail to create a batch for a BuiltinTable");
                tracing::info!(?table_id, "starting snapshot");
                let mut snapshot_cursor = snapshot_fut
                    .await
                    .unwrap_or_terminate("cannot fail to snapshot");

                while let Some(values) = snapshot_cursor.next().await {
                    for (key, _t, d) in values {
                        let d_invert = d.neg();
                        batch.add(&key, &(), &d_invert).await;
                    }
                }
                tracing::info!(?table_id, "finished snapshot");

                let batch = batch.finish().await;
                BuiltinTableUpdate::batch(table_id, batch)
            });
            retraction_tasks.push(task);
        }

        let retractions_res = futures::future::join_all(retraction_tasks).await;
        for retractions in retractions_res {
            builtin_table_updates.push(retractions);
        }

        let audit_join_start = Instant::now();
        info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
        let audit_log_updates = audit_log_task.await;
        let audit_log_builtin_table_updates = self
            .catalog()
            .state()
            .generate_builtin_table_updates(audit_log_updates);
        builtin_table_updates.extend(audit_log_builtin_table_updates);
        info!(
            "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
            audit_join_start.elapsed()
        );

        table_fence_rx
            .await
            .expect("One-shot shouldn't be dropped during bootstrap")
            .unwrap_or_terminate("cannot fail to append");

        info!("coordinator init: sending builtin table updates");
        let (_builtin_updates_fut, write_ts) = self
            .builtin_table_update()
            .execute(builtin_table_updates)
            .await;
        info!(?write_ts, "our write ts");
        if let Some(write_ts) = write_ts {
            self.apply_local_write(write_ts).await;
        }
    }

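    /// Returns a task that computes the audit log updates present in the
    /// durable catalog but not yet reflected in `mz_audit_events`, by
    /// comparing incoming events against the largest event id currently in
    /// the table.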
    #[instrument]
    fn bootstrap_audit_log_table<'a>(
        &self,
        table_id: CatalogItemId,
        name: &'a QualifiedItemName,
        table: &'a Table,
        audit_logs_iterator: AuditLogIterator,
        read_ts: Timestamp,
    ) -> JoinHandle<Vec<StateUpdate>> {
        let full_name = self.catalog().resolve_full_name(name, None);
        debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
        let current_contents_fut = self
            .controller
            .storage_collections
            .snapshot(table.global_id_writes(), read_ts);
        spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
            let current_contents = current_contents_fut
                .await
                .unwrap_or_terminate("cannot fail to fetch snapshot");
            let contents_len = current_contents.len();
            debug!("coordinator init: audit log table ({table_id}) size {contents_len}");

            let max_table_id = current_contents
                .into_iter()
                .filter(|(_, diff)| *diff == 1)
                .map(|(row, _diff)| row.unpack_first().unwrap_uint64())
                .sorted()
                .rev()
                .next();

            audit_logs_iterator
                .take_while(|(audit_log, _)| match max_table_id {
                    Some(id) => audit_log.event.sortable_id() > id,
                    None => true,
                })
                .map(|(audit_log, ts)| StateUpdate {
                    kind: StateUpdateKind::AuditLog(audit_log),
                    ts,
                    diff: StateDiff::Addition,
                })
                .collect::<Vec<_>>()
        })
    }

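    /// Registers all storage collections described by catalog entries with the
    /// storage controller: sources, tables, materialized views, continual
    /// tasks, and sink progress collections. Builtin continual tasks without a
    /// durable as-of are deferred to `bootstrap_builtin_continual_tasks`.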
    #[instrument]
    async fn bootstrap_storage_collections(
        &mut self,
        migrated_storage_collections: &BTreeSet<CatalogItemId>,
    ) {
        let catalog = self.catalog();

        let source_desc = |object_id: GlobalId,
                           data_source: &DataSourceDesc,
                           desc: &RelationDesc,
                           timeline: &Timeline| {
            let data_source = match data_source.clone() {
                DataSourceDesc::Ingestion { desc, cluster_id } => {
                    let desc = desc.into_inline_connection(catalog.state());
                    let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
                    DataSource::Ingestion(ingestion)
                }
                DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    progress_subsource,
                    data_config,
                    details,
                    cluster_id,
                } => {
                    let desc = desc.into_inline_connection(catalog.state());
                    let data_config = data_config.into_inline_connection(catalog.state());
                    let progress_subsource =
                        catalog.get_entry(&progress_subsource).latest_global_id();
                    let mut ingestion =
                        IngestionDescription::new(desc, cluster_id, progress_subsource);
                    let legacy_export = SourceExport {
                        storage_metadata: (),
                        data_config,
                        details,
                    };
                    ingestion.source_exports.insert(object_id, legacy_export);

                    DataSource::Ingestion(ingestion)
                }
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference: _,
                    details,
                    data_config,
                } => {
                    let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();

                    DataSource::IngestionExport {
                        ingestion_id,
                        details,
                        data_config: data_config.into_inline_connection(catalog.state()),
                    }
                }
                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
                DataSourceDesc::Progress => DataSource::Progress,
                DataSourceDesc::Introspection(introspection) => {
                    DataSource::Introspection(introspection)
                }
            };
            CollectionDescription {
                desc: desc.clone(),
                data_source,
                since: None,
                timeline: Some(timeline.clone()),
                primary: None,
            }
        };

        let mut compute_collections = vec![];
        let mut collections = vec![];
        let mut new_builtin_continual_tasks = vec![];
        for entry in catalog.entries() {
            match entry.item() {
                CatalogItem::Source(source) => {
                    collections.push((
                        source.global_id(),
                        source_desc(
                            source.global_id(),
                            &source.data_source,
                            &source.desc,
                            &source.timeline,
                        ),
                    ));
                }
                CatalogItem::Table(table) => {
                    match &table.data_source {
                        TableDataSource::TableWrites { defaults: _ } => {
                            let versions: BTreeMap<_, _> = table
                                .collection_descs()
                                .map(|(gid, version, desc)| (version, (gid, desc)))
                                .collect();
                            let collection_descs = versions.iter().map(|(version, (gid, desc))| {
                                let next_version = version.bump();
                                let primary_collection =
                                    versions.get(&next_version).map(|(gid, _desc)| gid).copied();
                                let mut collection_desc =
                                    CollectionDescription::for_table(desc.clone());
                                collection_desc.primary = primary_collection;

                                (*gid, collection_desc)
                            });
                            collections.extend(collection_descs);
                        }
                        TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => {
                            soft_assert_eq_or_log!(table.collections.len(), 1);
                            let collection_descs =
                                table.collection_descs().map(|(gid, _version, desc)| {
                                    (
                                        gid,
                                        source_desc(
                                            entry.latest_global_id(),
                                            data_source_desc,
                                            &desc,
                                            timeline,
                                        ),
                                    )
                                });
                            collections.extend(collection_descs);
                        }
                    };
                }
                CatalogItem::MaterializedView(mv) => {
                    let collection_descs = mv.collection_descs().map(|(gid, _version, desc)| {
                        let collection_desc =
                            CollectionDescription::for_other(desc, mv.initial_as_of.clone());
                        (gid, collection_desc)
                    });

                    collections.extend(collection_descs);
                    compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
                }
                CatalogItem::ContinualTask(ct) => {
                    let collection_desc =
                        CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
                    if ct.global_id().is_system() && collection_desc.since.is_none() {
                        new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
                    } else {
                        compute_collections.push((ct.global_id(), ct.desc.clone()));
                        collections.push((ct.global_id(), collection_desc));
                    }
                }
                CatalogItem::Sink(sink) => {
                    let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
                    let from_desc = storage_sink_from_entry
                        .relation_desc()
                        .expect("sinks can only be built on items with descs")
                        .into_owned();
                    let collection_desc = CollectionDescription {
                        desc: KAFKA_PROGRESS_DESC.clone(),
                        data_source: DataSource::Sink {
                            desc: ExportDescription {
                                sink: StorageSinkDesc {
                                    from: sink.from,
                                    from_desc,
                                    connection: sink
                                        .connection
                                        .clone()
                                        .into_inline_connection(self.catalog().state()),
                                    envelope: sink.envelope,
                                    as_of: Antichain::from_elem(Timestamp::minimum()),
                                    with_snapshot: sink.with_snapshot,
                                    version: sink.version,
                                    from_storage_metadata: (),
                                    to_storage_metadata: (),
                                    commit_interval: sink.commit_interval,
                                },
                                instance_id: sink.cluster_id,
                            },
                        },
                        since: None,
                        timeline: None,
                        primary: None,
                    };
                    collections.push((sink.global_id, collection_desc));
                }
                _ => (),
            }
        }

        let register_ts = if self.controller.read_only() {
            self.get_local_read_ts().await
        } else {
            self.get_local_write_ts().await.timestamp
        };

        let storage_metadata = self.catalog.state().storage_metadata();
        let migrated_storage_collections = migrated_storage_collections
            .into_iter()
            .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
            .collect();

        self.controller
            .storage
            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
            .await
            .unwrap_or_terminate("cannot fail to evolve collections");

        self.controller
            .storage
            .create_collections_for_bootstrap(
                storage_metadata,
                Some(register_ts),
                collections,
                &migrated_storage_collections,
            )
            .await
            .unwrap_or_terminate("cannot fail to create collections");

        self.bootstrap_builtin_continual_tasks(new_builtin_continual_tasks)
            .await;

        if !self.controller.read_only() {
            self.apply_local_write(register_ts).await;
        }
    }

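    /// New builtin continual tasks do not yet have a durable as-of, so each is
    /// optimized first, an as-of is derived from read holds on its inputs
    /// (excluding its own output collection), and only then is the collection
    /// registered with the storage controller.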
    async fn bootstrap_builtin_continual_tasks(
        &mut self,
        mut collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>,
    ) {
        for (id, collection) in &mut collections {
            let entry = self.catalog.get_entry_by_global_id(id);
            let ct = match &entry.item {
                CatalogItem::ContinualTask(ct) => ct.clone(),
                _ => unreachable!("only called with continual task builtins"),
            };
            let debug_name = self
                .catalog()
                .resolve_full_name(entry.name(), None)
                .to_string();
            let (_optimized_plan, physical_plan, _metainfo) = self
                .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
                .expect("builtin CT should optimize successfully");

            let mut id_bundle = dataflow_import_id_bundle(&physical_plan, ct.cluster_id);
            id_bundle.storage_ids.remove(id);
            let read_holds = self.acquire_read_holds(&id_bundle);
            let as_of = read_holds.least_valid_read();

            collection.since = Some(as_of.clone());
        }
        self.controller
            .storage
            .create_collections(self.catalog.state().storage_metadata(), None, collections)
            .await
            .unwrap_or_terminate("cannot fail to create collections");
    }

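    /// (Re-)optimizes the dataflows of all indexes, materialized views, and
    /// continual tasks, preferring cached global expressions whenever the
    /// cached optimizer features match the current ones, and stashes the
    /// resulting plans in the catalog. Returns the expressions that were not
    /// served from the cache so they can be written back to it.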
    #[instrument]
    fn bootstrap_dataflow_plans(
        &mut self,
        ordered_catalog_entries: &[CatalogEntry],
        mut cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError> {
        let mut instance_snapshots = BTreeMap::new();
        let mut uncached_expressions = BTreeMap::new();

        let optimizer_config = OptimizerConfig::from(self.catalog().system_config());

        for entry in ordered_catalog_entries {
            match entry.item() {
                CatalogItem::Index(idx) => {
                    let compute_instance =
                        instance_snapshots.entry(idx.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(idx.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = idx.global_id();

                    if compute_instance.contains_collection(&global_id) {
                        continue;
                    }

                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let (optimized_plan, global_lir_plan) = {
                                    let mut optimizer = optimize::index::Optimizer::new(
                                        self.owned_catalog(),
                                        compute_instance.clone(),
                                        global_id,
                                        optimizer_config.clone(),
                                        self.optimizer_metrics(),
                                    );

                                    let index_plan = optimize::index::Index::new(
                                        entry.name().clone(),
                                        idx.on,
                                        idx.keys.to_vec(),
                                    );
                                    let global_mir_plan = optimizer.optimize(index_plan)?;
                                    let optimized_plan = global_mir_plan.df_desc().clone();

                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                    (optimized_plan, global_lir_plan)
                                };

                                let (physical_plan, metainfo) = global_lir_plan.unapply();
                                let metainfo = {
                                    let notice_ids =
                                        std::iter::repeat_with(|| self.allocate_transient_id())
                                            .map(|(_item_id, gid)| gid)
                                            .take(metainfo.optimizer_notices.len())
                                            .collect::<Vec<_>>();
                                    self.catalog().render_notices(
                                        metainfo,
                                        notice_ids,
                                        Some(idx.global_id()),
                                    )
                                };
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features: OptimizerFeatures::from(
                                            self.catalog().system_config(),
                                        ),
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(idx.global_id(), optimized_plan);
                    catalog.set_physical_plan(idx.global_id(), physical_plan);
                    catalog.set_dataflow_metainfo(idx.global_id(), metainfo);

                    compute_instance.insert_collection(idx.global_id());
                }
                CatalogItem::MaterializedView(mv) => {
                    let compute_instance =
                        instance_snapshots.entry(mv.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(mv.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = mv.global_id_writes();

                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let (_, internal_view_id) = self.allocate_transient_id();
                                let debug_name = self
                                    .catalog()
                                    .resolve_full_name(entry.name(), None)
                                    .to_string();
                                let force_non_monotonic = Default::default();

                                let (optimized_plan, global_lir_plan) = {
                                    let mut optimizer = optimize::materialized_view::Optimizer::new(
                                        self.owned_catalog().as_optimizer_catalog(),
                                        compute_instance.clone(),
                                        global_id,
                                        internal_view_id,
                                        mv.desc.latest().iter_names().cloned().collect(),
                                        mv.non_null_assertions.clone(),
                                        mv.refresh_schedule.clone(),
                                        debug_name,
                                        optimizer_config.clone(),
                                        self.optimizer_metrics(),
                                        force_non_monotonic,
                                    );

                                    let typ = infer_sql_type_for_catalog(
                                        &mv.raw_expr,
                                        &mv.optimized_expr.as_ref().clone(),
                                    );
                                    let global_mir_plan = optimizer
                                        .optimize((mv.optimized_expr.as_ref().clone(), typ))?;
                                    let optimized_plan = global_mir_plan.df_desc().clone();

                                    let global_lir_plan = optimizer.optimize(global_mir_plan)?;

                                    (optimized_plan, global_lir_plan)
                                };

                                let (physical_plan, metainfo) = global_lir_plan.unapply();
                                let metainfo = {
                                    let notice_ids =
                                        std::iter::repeat_with(|| self.allocate_transient_id())
                                            .map(|(_item_id, global_id)| global_id)
                                            .take(metainfo.optimizer_notices.len())
                                            .collect::<Vec<_>>();
                                    self.catalog().render_notices(
                                        metainfo,
                                        notice_ids,
                                        Some(mv.global_id_writes()),
                                    )
                                };
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features: OptimizerFeatures::from(
                                            self.catalog().system_config(),
                                        ),
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(mv.global_id_writes(), optimized_plan);
                    catalog.set_physical_plan(mv.global_id_writes(), physical_plan);
                    catalog.set_dataflow_metainfo(mv.global_id_writes(), metainfo);

                    compute_instance.insert_collection(mv.global_id_writes());
                }
                CatalogItem::ContinualTask(ct) => {
                    let compute_instance =
                        instance_snapshots.entry(ct.cluster_id).or_insert_with(|| {
                            self.instance_snapshot(ct.cluster_id)
                                .expect("compute instance exists")
                        });
                    let global_id = ct.global_id();

                    let (optimized_plan, physical_plan, metainfo) =
                        match cached_global_exprs.remove(&global_id) {
                            Some(global_expressions)
                                if global_expressions.optimizer_features
                                    == optimizer_config.features =>
                            {
                                debug!("global expression cache hit for {global_id:?}");
                                (
                                    global_expressions.global_mir,
                                    global_expressions.physical_plan,
                                    global_expressions.dataflow_metainfos,
                                )
                            }
                            Some(_) | None => {
                                let debug_name = self
                                    .catalog()
                                    .resolve_full_name(entry.name(), None)
                                    .to_string();
                                let (optimized_plan, physical_plan, metainfo) = self
                                    .optimize_create_continual_task(
                                        ct,
                                        global_id,
                                        self.owned_catalog(),
                                        debug_name,
                                    )?;
                                uncached_expressions.insert(
                                    global_id,
                                    GlobalExpressions {
                                        global_mir: optimized_plan.clone(),
                                        physical_plan: physical_plan.clone(),
                                        dataflow_metainfos: metainfo.clone(),
                                        optimizer_features: OptimizerFeatures::from(
                                            self.catalog().system_config(),
                                        ),
                                    },
                                );
                                (optimized_plan, physical_plan, metainfo)
                            }
                        };

                    let catalog = self.catalog_mut();
                    catalog.set_optimized_plan(ct.global_id(), optimized_plan);
                    catalog.set_physical_plan(ct.global_id(), physical_plan);
                    catalog.set_dataflow_metainfo(ct.global_id(), metainfo);

                    compute_instance.insert_collection(ct.global_id());
                }
                _ => (),
            }
        }

        Ok(uncached_expressions)
    }

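    /// Chooses as-ofs for all bootstrapped dataflow plans via
    /// `as_of_selection`, writes the updated plans back into the catalog, and
    /// returns the read holds that keep those as-ofs valid until the dataflows
    /// have actually been created.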
    async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
        let mut catalog_ids = Vec::new();
        let mut dataflows = Vec::new();
        let mut read_policies = BTreeMap::new();
        for entry in self.catalog.entries() {
            let gid = match entry.item() {
                CatalogItem::Index(idx) => idx.global_id(),
                CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
                CatalogItem::ContinualTask(ct) => ct.global_id(),
                CatalogItem::Table(_)
                | CatalogItem::Source(_)
                | CatalogItem::Log(_)
                | CatalogItem::View(_)
                | CatalogItem::Sink(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_)
                | CatalogItem::Connection(_) => continue,
            };
            if let Some(plan) = self.catalog.try_get_physical_plan(&gid) {
                catalog_ids.push(gid);
                dataflows.push(plan.clone());

                if let Some(compaction_window) = entry.item().initial_logical_compaction_window() {
                    read_policies.insert(gid, compaction_window.into());
                }
            }
        }

        let read_ts = self.get_local_read_ts().await;
        let read_holds = as_of_selection::run(
            &mut dataflows,
            &read_policies,
            &*self.controller.storage_collections,
            read_ts,
            self.controller.read_only(),
        );

        let catalog = self.catalog_mut();
        for (id, plan) in catalog_ids.into_iter().zip_eq(dataflows) {
            catalog.set_physical_plan(id, plan);
        }

        read_holds
    }

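    /// Serves the coordinator: drains client commands, controller readiness,
    /// and maintenance ticks in a single batched message loop, with a watchdog
    /// task that reports the last handled message if the loop gets stuck.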
    fn serve(
        mut self,
        mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
        mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
        mut cmd_rx: mpsc::UnboundedReceiver<(OpenTelemetryContext, Command)>,
        group_commit_rx: appends::GroupCommitWaiter,
    ) -> LocalBoxFuture<'static, ()> {
        async move {
            let mut cluster_events = self.controller.events_stream();
            let last_message = Arc::new(Mutex::new(LastMessage {
                kind: "none",
                stmt: None,
            }));

            let (idle_tx, mut idle_rx) = tokio::sync::mpsc::channel(1);
            let idle_metric = self.metrics.queue_busy_seconds.clone();
            let last_message_watchdog = Arc::clone(&last_message);

            spawn(|| "coord watchdog", async move {
                let mut interval = tokio::time::interval(Duration::from_secs(5));
                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

                let mut coord_stuck = false;

                loop {
                    interval.tick().await;

                    let duration = tokio::time::Duration::from_secs(30);
                    let timeout = tokio::time::timeout(duration, idle_tx.reserve()).await;
                    let Ok(maybe_permit) = timeout else {
                        if !coord_stuck {
                            let last_message = last_message_watchdog.lock().expect("poisoned");
                            tracing::warn!(
                                last_message_kind = %last_message.kind,
                                last_message_sql = %last_message.stmt_to_string(),
                                "coordinator stuck for {duration:?}",
                            );
                        }
                        coord_stuck = true;

                        continue;
                    };

                    if coord_stuck {
                        tracing::info!("Coordinator became unstuck");
                    }
                    coord_stuck = false;

                    let Ok(permit) = maybe_permit else {
                        break;
                    };

                    permit.send(idle_metric.start_timer());
                }
            });

            self.schedule_storage_usage_collection().await;
            self.spawn_privatelink_vpc_endpoints_watch_task();
            self.spawn_statement_logging_task();
            flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);

            let warn_threshold = self
                .catalog()
                .system_config()
                .coord_slow_message_warn_threshold();

            const MESSAGE_BATCH: usize = 64;
            let mut messages = Vec::with_capacity(MESSAGE_BATCH);
            let mut cmd_messages = Vec::with_capacity(MESSAGE_BATCH);

            let message_batch = self.metrics.message_batch.clone();

            loop {
                select! {
                    biased;

                    _ = internal_cmd_rx.recv_many(&mut messages, MESSAGE_BATCH) => {},
                    Some(event) = cluster_events.next() => {
                        messages.push(Message::ClusterEvent(event))
                    },
                    () = self.controller.ready() => {
                        let controller = match self.controller.get_readiness() {
                            Readiness::Storage => ControllerReadiness::Storage,
                            Readiness::Compute => ControllerReadiness::Compute,
                            Readiness::Metrics(_) => ControllerReadiness::Metrics,
                            Readiness::Internal(_) => ControllerReadiness::Internal,
                            Readiness::NotReady => unreachable!("just signaled as ready"),
                        };
                        messages.push(Message::ControllerReady { controller });
                    }
                    permit = group_commit_rx.ready() => {
                        let user_write_spans = self.pending_writes.iter().flat_map(|x| match x {
                            PendingWriteTxn::User{span, ..} => Some(span),
                            PendingWriteTxn::System{..} => None,
                        });
                        let span = match user_write_spans.exactly_one() {
                            Ok(span) => span.clone(),
                            Err(user_write_spans) => {
                                let span = info_span!(parent: None, "group_commit_notify");
                                for s in user_write_spans {
                                    span.follows_from(s);
                                }
                                span
                            }
                        };
                        messages.push(Message::GroupCommitInitiate(span, Some(permit)));
                    },
                    count = cmd_rx.recv_many(&mut cmd_messages, MESSAGE_BATCH) => {
                        if count == 0 {
                            break;
                        } else {
                            messages.extend(cmd_messages.drain(..).map(
                                |(otel_ctx, cmd)| Message::Command(otel_ctx, cmd),
                            ));
                        }
                    },
                    Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
                        let mut pending_read_txns = vec![pending_read_txn];
                        while let Ok(pending_read_txn) = strict_serializable_reads_rx.try_recv() {
                            pending_read_txns.push(pending_read_txn);
                        }
                        for (conn_id, pending_read_txn) in pending_read_txns {
                            let prev = self
                                .pending_linearize_read_txns
                                .insert(conn_id, pending_read_txn);
                            soft_assert_or_log!(
                                prev.is_none(),
                                "connections can not have multiple concurrent reads, prev: {prev:?}"
                            )
                        }
                        messages.push(Message::LinearizeReads);
                    }
                    _ = self.advance_timelines_interval.tick() => {
                        let span = info_span!(parent: None, "coord::advance_timelines_interval");
                        span.follows_from(Span::current());

                        if self.controller.read_only() {
                            messages.push(Message::AdvanceTimelines);
                        } else {
                            messages.push(Message::GroupCommitInitiate(span, None));
                        }
                    },
                    _ = self.check_cluster_scheduling_policies_interval.tick() => {
                        messages.push(Message::CheckSchedulingPolicies);
                    },

                    _ = self.caught_up_check_interval.tick() => {
                        self.maybe_check_caught_up().await;

                        continue;
                    },

                    timer = idle_rx.recv() => {
                        timer.expect("does not drop").observe_duration();
                        self.metrics
                            .message_handling
                            .with_label_values(&["watchdog"])
                            .observe(0.0);
                        continue;
                    }
                };

                message_batch.observe(f64::cast_lossy(messages.len()));

                for msg in messages.drain(..) {
                    let msg_kind = msg.kind();
                    let span = span!(
                        target: "mz_adapter::coord::handle_message_loop",
                        Level::INFO,
                        "coord::handle_message",
                        kind = msg_kind
                    );
                    let otel_context = span.context().span().span_context().clone();

                    *last_message.lock().expect("poisoned") = LastMessage {
                        kind: msg_kind,
                        stmt: match &msg {
                            Message::Command(
                                _,
                                Command::Execute {
                                    portal_name,
                                    session,
                                    ..
                                },
                            ) => session
                                .get_portal_unverified(portal_name)
                                .and_then(|p| p.stmt.as_ref().map(Arc::clone)),
                            _ => None,
                        },
                    };

                    let start = Instant::now();
                    self.handle_message(msg).instrument(span).await;
                    let duration = start.elapsed();

                    self.metrics
                        .message_handling
                        .with_label_values(&[msg_kind])
                        .observe(duration.as_secs_f64());

                    if duration > warn_threshold {
                        let trace_id = otel_context.is_valid().then(|| otel_context.trace_id());
                        tracing::error!(
                            ?msg_kind,
                            ?trace_id,
                            ?duration,
                            "very slow coordinator message"
                        );
                    }
                }
            }
            if let Some(catalog) = Arc::into_inner(self.catalog) {
                catalog.expire().await;
            }
        }
        .boxed_local()
    }

    fn catalog(&self) -> &Catalog {
        &self.catalog
    }

    fn owned_catalog(&self) -> Arc<Catalog> {
        Arc::clone(&self.catalog)
    }

    fn optimizer_metrics(&self) -> OptimizerMetrics {
        self.optimizer_metrics.clone()
    }

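    /// Obtains a mutable borrow of the catalog. `Arc::make_mut` clones the
    /// inner catalog state if any read-only snapshots of it are still alive,
    /// so mutations never invalidate concurrent readers.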
    fn catalog_mut(&mut self) -> &mut Catalog {
        Arc::make_mut(&mut self.catalog)
    }

    fn connection_context(&self) -> &ConnectionContext {
        self.controller.connection_context()
    }

    fn secrets_reader(&self) -> &Arc<dyn SecretsReader> {
        &self.connection_context().secrets_reader
    }

    #[allow(dead_code)]
    pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
        for meta in self.active_conns.values() {
            let _ = meta.notice_tx.send(notice.clone());
        }
    }

    pub(crate) fn broadcast_notice_tx(
        &self,
    ) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
        let senders: Vec<_> = self
            .active_conns
            .values()
            .map(|meta| meta.notice_tx.clone())
            .collect();
        Box::new(move |notice| {
            for tx in senders {
                let _ = tx.send(notice.clone());
            }
        })
    }

    pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
        &self.active_conns
    }

    #[instrument(level = "debug")]
    pub(crate) fn retire_execution(
        &mut self,
        reason: StatementEndedExecutionReason,
        ctx_extra: ExecuteContextExtra,
    ) {
        if let Some(uuid) = ctx_extra.retire() {
            self.end_statement_execution(uuid, reason);
        }
    }

    #[instrument(level = "debug")]
    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_> {
        let compute = self
            .instance_snapshot(instance)
            .expect("compute instance does not exist");
        DataflowBuilder::new(self.catalog().state(), compute)
    }

    pub fn instance_snapshot(
        &self,
        id: ComputeInstanceId,
    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
        ComputeInstanceSnapshot::new(&self.controller, id)
    }

    pub(crate) async fn ship_dataflow(
        &mut self,
        dataflow: DataflowDescription<Plan>,
        instance: ComputeInstanceId,
        subscribe_target_replica: Option<ReplicaId>,
    ) {
        self.try_ship_dataflow(dataflow, instance, subscribe_target_replica)
            .await
            .unwrap_or_terminate("dataflow creation cannot fail");
    }

    pub(crate) async fn try_ship_dataflow(
        &mut self,
        dataflow: DataflowDescription<Plan>,
        instance: ComputeInstanceId,
        subscribe_target_replica: Option<ReplicaId>,
    ) -> Result<(), DataflowCreationError> {
        let export_ids = dataflow.exported_index_ids().collect();

        self.controller
            .compute
            .create_dataflow(instance, dataflow, subscribe_target_replica)?;

        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
            .await;

        Ok(())
    }

    pub(crate) fn allow_writes(&mut self, instance: ComputeInstanceId, id: GlobalId) {
        self.controller
            .compute
            .allow_writes(instance, id)
            .unwrap_or_terminate("allow_writes cannot fail");
    }

    pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates(
        &mut self,
        dataflow: DataflowDescription<Plan>,
        instance: ComputeInstanceId,
        notice_builtin_updates_fut: Option<BuiltinTableAppendNotify>,
    ) {
        if let Some(notice_builtin_updates_fut) = notice_builtin_updates_fut {
            let ship_dataflow_fut = self.ship_dataflow(dataflow, instance, None);
            let ((), ()) =
                futures::future::join(notice_builtin_updates_fut, ship_dataflow_fut).await;
        } else {
            self.ship_dataflow(dataflow, instance, None).await;
        }
    }

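    /// Installs a watch set on compute collections for the given connection;
    /// the watch set id is tracked per connection so the stored response can
    /// be delivered (or cancelled) once the controller reports the set ready.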
    pub fn install_compute_watch_set(
        &mut self,
        conn_id: ConnectionId,
        objects: BTreeSet<GlobalId>,
        t: Timestamp,
        state: WatchSetResponse,
    ) -> Result<(), CollectionLookupError> {
        let ws_id = self.controller.install_compute_watch_set(objects, t)?;
        self.connection_watch_sets
            .entry(conn_id.clone())
            .or_default()
            .insert(ws_id);
        self.installed_watch_sets.insert(ws_id, (conn_id, state));
        Ok(())
    }

    pub fn install_storage_watch_set(
        &mut self,
        conn_id: ConnectionId,
        objects: BTreeSet<GlobalId>,
        t: Timestamp,
        state: WatchSetResponse,
    ) -> Result<(), CollectionMissing> {
        let ws_id = self.controller.install_storage_watch_set(objects, t)?;
        self.connection_watch_sets
            .entry(conn_id.clone())
            .or_default()
            .insert(ws_id);
        self.installed_watch_sets.insert(ws_id, (conn_id, state));
        Ok(())
    }

    pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId) {
        if let Some(ws_ids) = self.connection_watch_sets.remove(conn_id) {
            for ws_id in ws_ids {
                self.installed_watch_sets.remove(&ws_id);
            }
        }
    }

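    /// Dumps the coordinator's in-memory state as JSON for debugging:
    /// timelines, active connections, read holds, pending peeks and
    /// linearized reads, plus the controller's own dump.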
    pub async fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let global_timelines: BTreeMap<_, _> = self
            .global_timelines
            .iter()
            .map(|(timeline, state)| (timeline.to_string(), format!("{state:?}")))
            .collect();
        let active_conns: BTreeMap<_, _> = self
            .active_conns
            .iter()
            .map(|(id, meta)| (id.unhandled().to_string(), format!("{meta:?}")))
            .collect();
        let txn_read_holds: BTreeMap<_, _> = self
            .txn_read_holds
            .iter()
            .map(|(id, capability)| (id.unhandled().to_string(), format!("{capability:?}")))
            .collect();
        let pending_peeks: BTreeMap<_, _> = self
            .pending_peeks
            .iter()
            .map(|(id, peek)| (id.to_string(), format!("{peek:?}")))
            .collect();
        let client_pending_peeks: BTreeMap<_, _> = self
            .client_pending_peeks
            .iter()
            .map(|(id, peek)| {
                let peek: BTreeMap<_, _> = peek
                    .iter()
                    .map(|(uuid, storage_id)| (uuid.to_string(), storage_id))
                    .collect();
                (id.to_string(), peek)
            })
            .collect();
        let pending_linearize_read_txns: BTreeMap<_, _> = self
            .pending_linearize_read_txns
            .iter()
            .map(|(id, read_txn)| (id.unhandled().to_string(), format!("{read_txn:?}")))
            .collect();

        Ok(serde_json::json!({
            "global_timelines": global_timelines,
            "active_conns": active_conns,
            "txn_read_holds": txn_read_holds,
            "pending_peeks": pending_peeks,
            "client_pending_peeks": client_pending_peeks,
            "pending_linearize_read_txns": pending_linearize_read_txns,
            "controller": self.controller.dump().await?,
        }))
    }

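    /// Spawns a task that retracts `mz_storage_usage_by_shard` rows whose
    /// collection timestamp falls outside the retention period, sending the
    /// retractions back to the coordinator as a `StorageUsagePrune` message.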
    async fn prune_storage_usage_events_on_startup(&self, retention_period: Duration) {
        let item_id = self
            .catalog()
            .resolve_builtin_table(&MZ_STORAGE_USAGE_BY_SHARD);
        let global_id = self.catalog.get_entry(&item_id).latest_global_id();
        let read_ts = self.get_local_read_ts().await;
        let current_contents_fut = self
            .controller
            .storage_collections
            .snapshot(global_id, read_ts);
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        spawn(|| "storage_usage_prune", async move {
            let mut current_contents = current_contents_fut
                .await
                .unwrap_or_terminate("cannot fail to fetch snapshot");
            differential_dataflow::consolidation::consolidate(&mut current_contents);

            let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
            let mut expired = Vec::new();
            for (row, diff) in current_contents {
                assert_eq!(
                    diff, 1,
                    "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
                );
                let collection_timestamp = row
                    .unpack()
                    .get(3)
                    .expect("definition of mz_storage_by_shard changed")
                    .unwrap_timestamptz();
                let collection_timestamp = collection_timestamp.timestamp_millis();
                let collection_timestamp: u128 = collection_timestamp
                    .try_into()
                    .expect("all collections happen after Jan 1 1970");
                if collection_timestamp < cutoff_ts {
                    debug!("pruning storage event {row:?}");
                    let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
                    expired.push(builtin_update);
                }
            }

            let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
        });
    }

    fn current_credit_consumption_rate(&self) -> Numeric {
        self.catalog()
            .user_cluster_replicas()
            .filter_map(|replica| match &replica.config.location {
                ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
                ReplicaLocation::Unmanaged(_) => None,
            })
            .map(|size| {
                self.catalog()
                    .cluster_replica_sizes()
                    .0
                    .get(size)
                    .expect("location size is validated against the cluster replica sizes")
                    .credits_per_hour
            })
            .sum()
    }
}

#[cfg(test)]
impl Coordinator {
    #[allow(dead_code)]
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");

        let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
    }
}

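/// The last message handled by the coordinator's message loop, retained so the
/// watchdog task (and the panic handler in `Drop`) can report what the
/// coordinator was doing when it got stuck.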
struct LastMessage {
    kind: &'static str,
    stmt: Option<Arc<Statement<Raw>>>,
}

impl LastMessage {
    fn stmt_to_string(&self) -> Cow<'static, str> {
        self.stmt
            .as_ref()
            .map(|stmt| stmt.to_ast_string_redacted().into())
            .unwrap_or(Cow::Borrowed("<none>"))
    }
}

impl fmt::Debug for LastMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("LastMessage")
            .field("kind", &self.kind)
            .field("stmt", &self.stmt_to_string())
            .finish()
    }
}

impl Drop for LastMessage {
    fn drop(&mut self) {
        if std::thread::panicking() {
            eprintln!("Coordinator panicking, dumping last message\n{self:?}");
        }
    }
}

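/// Serves the coordinator based on the provided configuration: initializes
/// timestamp oracles, opens the catalog, and starts the dedicated coordinator
/// thread before handing back a `Handle` and `Client` for it.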
4065pub fn serve(
4077 Config {
4078 controller_config,
4079 controller_envd_epoch,
4080 mut storage,
4081 audit_logs_iterator,
4082 timestamp_oracle_url,
4083 unsafe_mode,
4084 all_features,
4085 build_info,
4086 environment_id,
4087 metrics_registry,
4088 now,
4089 secrets_controller,
4090 cloud_resource_controller,
4091 cluster_replica_sizes,
4092 builtin_system_cluster_config,
4093 builtin_catalog_server_cluster_config,
4094 builtin_probe_cluster_config,
4095 builtin_support_cluster_config,
4096 builtin_analytics_cluster_config,
4097 system_parameter_defaults,
4098 availability_zones,
4099 storage_usage_client,
4100 storage_usage_collection_interval,
4101 storage_usage_retention_period,
4102 segment_client,
4103 egress_addresses,
4104 aws_account_id,
4105 aws_privatelink_availability_zones,
4106 connection_context,
4107 connection_limit_callback,
4108 remote_system_parameters,
4109 webhook_concurrency_limit,
4110 http_host_name,
4111 tracing_handle,
4112 read_only_controllers,
4113 caught_up_trigger: clusters_caught_up_trigger,
4114 helm_chart_version,
4115 license_key,
4116 external_login_password_mz_system,
4117 force_builtin_schema_migration,
4118 }: Config,
4119) -> BoxFuture<'static, Result<(Handle, Client), AdapterError>> {
4120 async move {
4121 let coord_start = Instant::now();
4122 info!("startup: coordinator init: beginning");
4123 info!("startup: coordinator init: preamble beginning");
4124
4125 let _builtins = LazyLock::force(&BUILTINS_STATIC);
4129
4130 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
4131 let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
4132 let (strict_serializable_reads_tx, strict_serializable_reads_rx) =
4133 mpsc::unbounded_channel();
4134
4135 if !availability_zones.iter().all_unique() {
4137 coord_bail!("availability zones must be unique");
4138 }
4139
4140 let aws_principal_context = match (
4141 aws_account_id,
4142 connection_context.aws_external_id_prefix.clone(),
4143 ) {
4144 (Some(aws_account_id), Some(aws_external_id_prefix)) => Some(AwsPrincipalContext {
4145 aws_account_id,
4146 aws_external_id_prefix,
4147 }),
4148 _ => None,
4149 };
4150
4151 let aws_privatelink_availability_zones = aws_privatelink_availability_zones
4152 .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
4153
4154 info!(
4155 "startup: coordinator init: preamble complete in {:?}",
4156 coord_start.elapsed()
4157 );
4158 let oracle_init_start = Instant::now();
4159 info!("startup: coordinator init: timestamp oracle init beginning");
4160
4161 let timestamp_oracle_config = timestamp_oracle_url
4162 .map(|url| TimestampOracleConfig::from_url(&url, &metrics_registry))
4163 .transpose()?;
4164 let mut initial_timestamps =
4165 get_initial_oracle_timestamps(×tamp_oracle_config).await?;
4166
4167 initial_timestamps
4171 .entry(Timeline::EpochMilliseconds)
4172 .or_insert_with(mz_repr::Timestamp::minimum);
4173 let mut timestamp_oracles = BTreeMap::new();
4174 for (timeline, initial_timestamp) in initial_timestamps {
4175 Coordinator::ensure_timeline_state_with_initial_time(
4176 &timeline,
4177 initial_timestamp,
4178 now.clone(),
4179 timestamp_oracle_config.clone(),
4180 &mut timestamp_oracles,
4181 read_only_controllers,
4182 )
4183 .await;
4184 }
4185
4186 let catalog_upper = storage.current_upper().await;
4190 let epoch_millis_oracle = ×tamp_oracles
4196 .get(&Timeline::EpochMilliseconds)
4197 .expect("inserted above")
4198 .oracle;
4199
4200 let mut boot_ts = if read_only_controllers {
4201 let read_ts = epoch_millis_oracle.read_ts().await;
4202 std::cmp::max(read_ts, catalog_upper)
4203 } else {
4204 epoch_millis_oracle.apply_write(catalog_upper).await;
4207 epoch_millis_oracle.write_ts().await.timestamp
4208 };
4209
4210 info!(
4211 "startup: coordinator init: timestamp oracle init complete in {:?}",
4212 oracle_init_start.elapsed()
4213 );
4214
4215 let catalog_open_start = Instant::now();
4216 info!("startup: coordinator init: catalog open beginning");
4217 let persist_client = controller_config
4218 .persist_clients
4219 .open(controller_config.persist_location.clone())
4220 .await
4221 .context("opening persist client")?;
4222 let builtin_item_migration_config =
4223 BuiltinItemMigrationConfig {
4224 persist_client: persist_client.clone(),
4225 read_only: read_only_controllers,
4226 force_migration: force_builtin_schema_migration,
4227 }
4228 ;
4229 let OpenCatalogResult {
4230 mut catalog,
4231 migrated_storage_collections_0dt,
4232 new_builtin_collections,
4233 builtin_table_updates,
4234 cached_global_exprs,
4235 uncached_local_exprs,
4236 } = Catalog::open(mz_catalog::config::Config {
4237 storage,
4238 metrics_registry: &metrics_registry,
4239 state: mz_catalog::config::StateConfig {
4240 unsafe_mode,
4241 all_features,
4242 build_info,
4243 deploy_generation: controller_config.deploy_generation,
4244 environment_id: environment_id.clone(),
4245 read_only: read_only_controllers,
4246 now: now.clone(),
4247 boot_ts: boot_ts.clone(),
4248 skip_migrations: false,
4249 cluster_replica_sizes,
4250 builtin_system_cluster_config,
4251 builtin_catalog_server_cluster_config,
4252 builtin_probe_cluster_config,
4253 builtin_support_cluster_config,
4254 builtin_analytics_cluster_config,
4255 system_parameter_defaults,
4256 remote_system_parameters,
4257 availability_zones,
4258 egress_addresses,
4259 aws_principal_context,
4260 aws_privatelink_availability_zones,
4261 connection_context,
4262 http_host_name,
4263 builtin_item_migration_config,
4264 persist_client: persist_client.clone(),
4265 enable_expression_cache_override: None,
4266 helm_chart_version,
4267 external_login_password_mz_system,
4268 license_key: license_key.clone(),
4269 },
4270 })
4271 .await?;
4272
4273 let catalog_upper = catalog.current_upper().await;
4276 boot_ts = std::cmp::max(boot_ts, catalog_upper);
4277
4278 if !read_only_controllers {
4279 epoch_millis_oracle.apply_write(boot_ts).await;
4280 }
4281
4282 info!(
4283 "startup: coordinator init: catalog open complete in {:?}",
4284 catalog_open_start.elapsed()
4285 );
4286
4287 let coord_thread_start = Instant::now();
4288 info!("startup: coordinator init: coordinator thread start beginning");
4289
4290 let session_id = catalog.config().session_id;
4291 let start_instant = catalog.config().start_instant;
4292
4293 let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
4297 let handle = TokioHandle::current();
4298
4299 let metrics = Metrics::register_into(&metrics_registry);
4300 let metrics_clone = metrics.clone();
4301 let optimizer_metrics = OptimizerMetrics::register_into(
4302 &metrics_registry,
4303 catalog.system_config().optimizer_e2e_latency_warning_threshold(),
4304 );
4305 let segment_client_clone = segment_client.clone();
4306 let coord_now = now.clone();
4307 let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
4308 let mut check_scheduling_policies_interval = tokio::time::interval(
4309 catalog
4310 .system_config()
4311 .cluster_check_scheduling_policies_interval(),
4312 );
4313 check_scheduling_policies_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let clusters_caught_up_check_interval = if read_only_controllers {
            let dyncfgs = catalog.system_config().dyncfgs();
            let interval = WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL.get(dyncfgs);

            let mut interval = tokio::time::interval(interval);
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        } else {
            // The caught-up check only runs in read-only mode, but the main
            // loop still selects on this interval, so use one that ticks
            // rarely.
            let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            interval
        };

        let clusters_caught_up_check =
            clusters_caught_up_trigger.map(|trigger| CaughtUpCheckContext {
                trigger,
                exclude_collections: new_builtin_collections.into_iter().collect(),
            });

        if let Some(TimestampOracleConfig::Postgres(pg_config)) =
            timestamp_oracle_config.as_ref()
        {
            let pg_timestamp_oracle_params =
                flags::timestamp_oracle_config(catalog.system_config());
            pg_timestamp_oracle_params.apply(pg_config);
        }

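        // A worked example of the clamping below (values illustrative): with
        // max_connections = 100 and superuser_reserved_connections = 150, the
        // reserved count is clamped down to 100, so superusers can use every
        // slot; with superuser_reserved_connections = 10, regular users get
        // 90 slots and superusers keep 10 in reserve.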
        let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
            Arc::new(move |system_vars: &SystemVars| {
                let limit: u64 = system_vars.max_connections().cast_into();
                let superuser_reserved: u64 =
                    system_vars.superuser_reserved_connections().cast_into();

                let superuser_reserved = if superuser_reserved >= limit {
                    // If superusers are reserved more connections than the
                    // limit allows, treat the entire limit as reserved.
                    tracing::warn!(
                        "superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
                    );
                    limit
                } else {
                    superuser_reserved
                };

                (connection_limit_callback)(limit, superuser_reserved);
            });
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::MAX_CONNECTIONS,
            Arc::clone(&connection_limit_callback),
        );
        catalog.system_config_mut().register_callback(
            &mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
            connection_limit_callback,
        );

        let (group_commit_tx, group_commit_rx) = appends::notifier();

        let parent_span = tracing::Span::current();
        let thread = thread::Builder::new()
            // The coordinator can recurse deeply (e.g. during query
            // optimization), so give its thread an extra-large stack.
            .stack_size(3 * stack::STACK_SIZE)
            .name("coordinator".to_string())
            .spawn(move || {
                let span = info_span!(parent: parent_span, "coord::coordinator").entered();

                let controller = handle
                    .block_on({
                        catalog.initialize_controller(
                            controller_config,
                            controller_envd_epoch,
                            read_only_controllers,
                        )
                    })
                    .unwrap_or_terminate("failed to initialize storage_controller");
                // Initializing the controller may have advanced the catalog's
                // upper, so fold the latest upper into the boot timestamp and
                // the timestamp oracle once more.
                let catalog_upper = handle.block_on(catalog.current_upper());
                boot_ts = std::cmp::max(boot_ts, catalog_upper);
                if !read_only_controllers {
                    let epoch_millis_oracle = &timestamp_oracles
                        .get(&Timeline::EpochMilliseconds)
                        .expect("inserted above")
                        .oracle;
                    handle.block_on(epoch_millis_oracle.apply_write(boot_ts));
                }

                let catalog = Arc::new(catalog);

                let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader());
                let mut coord = Coordinator {
                    controller,
                    catalog,
                    internal_cmd_tx,
                    group_commit_tx,
                    strict_serializable_reads_tx,
                    global_timelines: timestamp_oracles,
                    transient_id_gen: Arc::new(TransientIdGen::new()),
                    active_conns: BTreeMap::new(),
                    txn_read_holds: Default::default(),
                    pending_peeks: BTreeMap::new(),
                    client_pending_peeks: BTreeMap::new(),
                    pending_linearize_read_txns: BTreeMap::new(),
                    serialized_ddl: LockedVecDeque::new(),
                    active_compute_sinks: BTreeMap::new(),
                    active_webhooks: BTreeMap::new(),
                    active_copies: BTreeMap::new(),
                    staged_cancellation: BTreeMap::new(),
                    introspection_subscribes: BTreeMap::new(),
                    write_locks: BTreeMap::new(),
                    deferred_write_ops: BTreeMap::new(),
                    pending_writes: Vec::new(),
                    advance_timelines_interval,
                    secrets_controller,
                    caching_secrets_reader,
                    cloud_resource_controller,
                    storage_usage_client,
                    storage_usage_collection_interval,
                    segment_client,
                    metrics,
                    optimizer_metrics,
                    tracing_handle,
                    statement_logging: StatementLogging::new(coord_now.clone()),
                    webhook_concurrency_limit,
                    timestamp_oracle_config,
                    check_cluster_scheduling_policies_interval: check_scheduling_policies_interval,
                    cluster_scheduling_decisions: BTreeMap::new(),
                    caught_up_check_interval: clusters_caught_up_check_interval,
                    caught_up_check: clusters_caught_up_check,
                    installed_watch_sets: BTreeMap::new(),
                    connection_watch_sets: BTreeMap::new(),
                    cluster_replica_statuses: ClusterReplicaStatuses::new(),
                    read_only_controllers,
                    buffered_builtin_table_updates: Some(Vec::new()),
                    license_key,
                    persist_client,
                };
                let bootstrap = handle.block_on(async {
                    coord
                        .bootstrap(
                            boot_ts,
                            migrated_storage_collections_0dt,
                            builtin_table_updates,
                            cached_global_exprs,
                            uncached_local_exprs,
                            audit_logs_iterator,
                        )
                        .await?;
                    coord
                        .controller
                        .remove_orphaned_replicas(
                            coord.catalog().get_next_user_replica_id().await?,
                            coord.catalog().get_next_system_replica_id().await?,
                        )
                        .await
                        .map_err(AdapterError::Orchestrator)?;

                    if let Some(retention_period) = storage_usage_retention_period {
                        coord
                            .prune_storage_usage_events_on_startup(retention_period)
                            .await;
                    }

                    Ok(())
                });
                let ok = bootstrap.is_ok();
                drop(span);
                bootstrap_tx
                    .send(bootstrap)
                    .expect("bootstrap_rx is not dropped until it receives this message");
                if ok {
                    handle.block_on(coord.serve(
                        internal_cmd_rx,
                        strict_serializable_reads_rx,
                        cmd_rx,
                        group_commit_rx,
                    ));
                }
            })
            .expect("failed to create coordinator thread");
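        // The coordinator thread reports the result of bootstrapping over the
        // oneshot channel above; only a successful bootstrap leaves the
        // thread running `serve`.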
        match bootstrap_rx
            .await
            .expect("bootstrap_tx always sends a message or panics/halts")
        {
            Ok(()) => {
                info!(
                    "startup: coordinator init: coordinator thread start complete in {:?}",
                    coord_thread_start.elapsed()
                );
                info!(
                    "startup: coordinator init: complete in {:?}",
                    coord_start.elapsed()
                );
                let handle = Handle {
                    session_id,
                    start_instant,
                    _thread: thread.join_on_drop(),
                };
                let client = Client::new(
                    build_info,
                    cmd_tx,
                    metrics_clone,
                    now,
                    environment_id,
                    segment_client_clone,
                );
                Ok((handle, client))
            }
            Err(e) => Err(e),
        }
    }
    .boxed()
}

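/// Returns the initial timestamp known to the timestamp oracle for each
/// timeline, taking the maximum when the oracle reports more than one
/// timestamp for the same timeline.
///
/// A sketch of the merge behavior (timeline names and values illustrative):
///
/// ```ignore
/// // oracle reports: [("epochmilliseconds", 100), ("epochmilliseconds", 250)]
/// // result:         {Timeline::EpochMilliseconds: 250}
/// ```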
async fn get_initial_oracle_timestamps(
    timestamp_oracle_config: &Option<TimestampOracleConfig>,
) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
    let mut initial_timestamps = BTreeMap::new();

    if let Some(config) = timestamp_oracle_config {
        let oracle_timestamps = config.get_all_timelines().await?;

        let debug_msg = || {
            oracle_timestamps
                .iter()
                .map(|(timeline, ts)| format!("{:?} -> {}", timeline, ts))
                .join(", ")
        };
        info!(
            "current timestamps from the timestamp oracle: {}",
            debug_msg()
        );

        for (timeline, ts) in oracle_timestamps {
            let entry = initial_timestamps
                .entry(Timeline::from_str(&timeline).expect("could not parse timeline"));

            entry
                .and_modify(|current_ts| *current_ts = std::cmp::max(*current_ts, ts))
                .or_insert(ts);
        }
    } else {
        info!("no timestamp oracle configured!");
    };

    let debug_msg = || {
        initial_timestamps
            .iter()
            .map(|(timeline, ts)| format!("{:?}: {}", timeline, ts))
            .join(", ")
    };
    info!("initial oracle timestamps: {}", debug_msg());

    Ok(initial_timestamps)
}

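/// Pulls the current values of remotely synced system parameters (if a sync
/// config is provided) and returns them keyed by parameter name.
///
/// The first-ever sync blocks until it completes; once a sync has succeeded
/// at least once in the past, the sync is bounded by
/// `system_parameter_sync_timeout` and `Ok(None)` is returned on timeout.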
#[instrument]
pub async fn load_remote_system_parameters(
    storage: &mut Box<dyn OpenableDurableCatalogState>,
    system_parameter_sync_config: Option<SystemParameterSyncConfig>,
    system_parameter_sync_timeout: Duration,
) -> Result<Option<BTreeMap<String, String>>, AdapterError> {
    if let Some(system_parameter_sync_config) = system_parameter_sync_config {
        tracing::info!("parameter sync on boot: start sync");

        let mut params = SynchronizedParameters::new(SystemVars::default());
        let frontend_sync = async {
            let frontend = SystemParameterFrontend::from(&system_parameter_sync_config).await?;
            frontend.pull(&mut params);
            let ops = params
                .modified()
                .into_iter()
                .map(|param| {
                    let name = param.name;
                    let value = param.value;
                    tracing::info!(name, value, initial = true, "sync parameter");
                    (name, value)
                })
                .collect();
            tracing::info!("parameter sync on boot: end sync");
            Ok(Some(ops))
        };
        if !storage.has_system_config_synced_once().await? {
            frontend_sync.await
        } else {
            match mz_ore::future::timeout(system_parameter_sync_timeout, frontend_sync).await {
                Ok(ops) => Ok(ops),
                Err(TimeoutError::Inner(e)) => Err(e),
                Err(TimeoutError::DeadlineElapsed) => {
                    tracing::info!("parameter sync on boot: sync has timed out");
                    Ok(None)
                }
            }
        }
    } else {
        Ok(None)
    }
}

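/// Responses delivered to the coordinator when a watch set it installed
/// becomes ready.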
#[derive(Debug)]
pub enum WatchSetResponse {
    StatementDependenciesReady(StatementLoggingId, StatementLifecycleEvent),
    AlterSinkReady(AlterSinkReadyContext),
    AlterMaterializedViewReady(AlterMaterializedViewReadyContext),
}

#[derive(Debug)]
pub struct AlterSinkReadyContext {
    ctx: Option<ExecuteContext>,
    otel_ctx: OpenTelemetryContext,
    plan: AlterSinkPlan,
    plan_validity: PlanValidity,
    read_hold: ReadHolds<Timestamp>,
}

impl AlterSinkReadyContext {
    fn ctx(&mut self) -> &mut ExecuteContext {
        self.ctx.as_mut().expect("only cleared on drop")
    }

    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
        self.ctx
            .take()
            .expect("only cleared on drop")
            .retire(result);
    }
}

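// Dropping a ready context without explicitly retiring it retires the
// underlying `ExecuteContext` with `AdapterError::Canceled`, so an abandoned
// `ALTER` never leaves its session hanging.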
impl Drop for AlterSinkReadyContext {
    fn drop(&mut self) {
        if let Some(ctx) = self.ctx.take() {
            ctx.retire(Err(AdapterError::Canceled));
        }
    }
}

#[derive(Debug)]
pub struct AlterMaterializedViewReadyContext {
    ctx: Option<ExecuteContext>,
    otel_ctx: OpenTelemetryContext,
    plan: plan::AlterMaterializedViewApplyReplacementPlan,
    plan_validity: PlanValidity,
}

impl AlterMaterializedViewReadyContext {
    fn ctx(&mut self) -> &mut ExecuteContext {
        self.ctx.as_mut().expect("only cleared on drop")
    }

    fn retire(mut self, result: Result<ExecuteResponse, AdapterError>) {
        self.ctx
            .take()
            .expect("only cleared on drop")
            .retire(result);
    }
}

impl Drop for AlterMaterializedViewReadyContext {
    fn drop(&mut self) {
        if let Some(ctx) = self.ctx.take() {
            ctx.retire(Err(AdapterError::Canceled));
        }
    }
}

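/// A `VecDeque` paired with a shared async mutex. The mutex does not guard
/// the queue itself (mutating the queue requires `&mut self`); instead,
/// holding the owned guard marks that some task is currently processing
/// queued items.
///
/// A usage sketch (names illustrative, not from this module):
///
/// ```ignore
/// let mut queue: LockedVecDeque<u32> = LockedVecDeque::new();
/// queue.push_back(1);
/// if let Ok(_guard) = queue.try_lock_owned() {
///     // We won the race: drain items while holding the guard.
///     while let Some(item) = queue.pop_front() {
///         process(item);
///     }
/// } // Guard dropped; another task may now claim the queue.
/// ```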
#[derive(Debug)]
struct LockedVecDeque<T> {
    items: VecDeque<T>,
    lock: Arc<tokio::sync::Mutex<()>>,
}

impl<T> LockedVecDeque<T> {
    pub fn new() -> Self {
        Self {
            items: VecDeque::new(),
            lock: Arc::new(tokio::sync::Mutex::new(())),
        }
    }

    pub fn try_lock_owned(&self) -> Result<OwnedMutexGuard<()>, tokio::sync::TryLockError> {
        Arc::clone(&self.lock).try_lock_owned()
    }

    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    pub fn push_back(&mut self, value: T) {
        self.items.push_back(value)
    }

    pub fn pop_front(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    pub fn remove(&mut self, index: usize) -> Option<T> {
        self.items.remove(index)
    }

    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
        self.items.iter()
    }
}

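/// A statement (or already-planned statement) whose execution was deferred,
/// together with the execute context needed to resume it later.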
#[derive(Debug)]
struct DeferredPlanStatement {
    ctx: ExecuteContext,
    ps: PlanStatement,
}

#[derive(Debug)]
enum PlanStatement {
    Statement {
        stmt: Arc<Statement<Raw>>,
        params: Params,
    },
    Plan {
        plan: mz_sql::plan::Plan,
        resolved_ids: ResolvedIds,
    },
}

#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
    #[error("Access denied missing IP address")]
    MissingIp,
}

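/// Checks `ip` against a set of network policy rules, admitting the address
/// if any rule's CIDR block contains it.
///
/// An illustrative sketch (rule construction elided; assumes a rule set that
/// covers 10.0.0.0/8):
///
/// ```ignore
/// let allowed: IpAddr = "10.1.2.3".parse().unwrap();
/// assert!(validate_ip_with_policy_rules(&allowed, &rules).is_ok());
///
/// let denied: IpAddr = "192.168.0.1".parse().unwrap();
/// assert!(matches!(
///     validate_ip_with_policy_rules(&denied, &rules),
///     Err(NetworkPolicyError::AddressDenied(_))
/// ));
/// ```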
pub(crate) fn validate_ip_with_policy_rules(
    ip: &IpAddr,
    rules: &Vec<NetworkPolicyRule>,
) -> Result<(), NetworkPolicyError> {
    // Rules are currently allow-lists only: a single matching rule admits
    // the address.
    if rules.iter().any(|r| r.address.0.contains(ip)) {
        Ok(())
    } else {
        Err(NetworkPolicyError::AddressDenied(ip.clone()))
    }
}

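/// Derives the SQL relation type to record in the catalog: column types come
/// from the HIR expression, while nullability and unique keys are backported
/// from the (typically more precise) MIR expression.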
pub(crate) fn infer_sql_type_for_catalog(
    hir_expr: &HirRelationExpr,
    mir_expr: &MirRelationExpr,
) -> SqlRelationType {
    let mut typ = hir_expr.top_level_typ();
    typ.backport_nullability_and_keys(&mir_expr.typ());
    typ
}