Struct mz_adapter::coord::Coordinator

pub struct Coordinator {
    controller: Controller,
    catalog: Arc<Catalog>,
    internal_cmd_tx: UnboundedSender<Message>,
    group_commit_tx: GroupCommitNotifier,
    strict_serializable_reads_tx: UnboundedSender<(ConnectionId, PendingReadTxn)>,
    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
    transient_id_gen: Arc<TransientIdGen>,
    active_conns: BTreeMap<ConnectionId, ConnMeta>,
    txn_read_holds: BTreeMap<ConnectionId, ReadHolds<Timestamp>>,
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    staged_cancellation: BTreeMap<ConnectionId, (Sender<bool>, Receiver<bool>)>,
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,
    write_locks: BTreeMap<CatalogItemId, Arc<Mutex<()>>>,
    deferred_write_ops: BTreeMap<ConnectionId, DeferredWriteOp>,
    pending_writes: Vec<PendingWriteTxn>,
    advance_timelines_interval: Interval,
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
    secrets_controller: Arc<dyn SecretsController>,
    caching_secrets_reader: CachingSecretsReader,
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    transient_replica_metadata: BTreeMap<ReplicaId, Option<ReplicaMetadata>>,
    storage_usage_client: StorageUsageClient,
    storage_usage_collection_interval: Duration,
    segment_client: Option<Client>,
    metrics: Metrics,
    optimizer_metrics: OptimizerMetrics,
    tracing_handle: TracingHandle,
    statement_logging: StatementLogging,
    webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,
    check_cluster_scheduling_policies_interval: Interval,
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,
    caught_up_check_interval: Interval,
    caught_up_check: Option<CaughtUpCheckContext>,
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,
    cluster_replica_statuses: ClusterReplicaStatuses,
    read_only_controllers: bool,
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,
}

Glues the external world to the Timely workers.

Fields§

§controller: Controller

The controller for the storage and compute layers.

§catalog: Arc<Catalog>

The catalog in an Arc suitable for read-only references. The Arc allows us to hand out cheap copies of the catalog to functions that can use it off the main coordinator thread. If the coordinator needs to mutate the catalog, call Self::catalog_mut, which will clone this struct member, allowing it to be mutated here while the other off-thread references can read their catalog as long as needed. In the future we would like this to be a pTVC, but for now this is sufficient.
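
The clone-on-mutate behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not the coordinator's code: `CatalogSketch` and `CoordSketch` are hypothetical stand-ins, and the use of `Arc::make_mut` is an assumption about how such a method could be written.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the real catalog type.
#[derive(Clone, Debug, PartialEq)]
struct CatalogSketch {
    items: Vec<String>,
}

/// Hypothetical stand-in holding only the `catalog` field.
struct CoordSketch {
    catalog: Arc<CatalogSketch>,
}

impl CoordSketch {
    /// Hands out a cheap snapshot: only the reference count is bumped.
    fn catalog(&self) -> Arc<CatalogSketch> {
        Arc::clone(&self.catalog)
    }

    /// Returns a mutable reference. If any other `Arc` handle is still
    /// alive, the catalog is cloned first, so existing readers keep
    /// their unchanged snapshot.
    fn catalog_mut(&mut self) -> &mut CatalogSketch {
        Arc::make_mut(&mut self.catalog)
    }
}
```

A reader that obtained a snapshot before a mutation keeps seeing the old contents, while later calls to `catalog()` observe the change.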

§internal_cmd_tx: UnboundedSender<Message>

Channel to manage internal commands from the coordinator to itself.

§group_commit_tx: GroupCommitNotifier

Notification that triggers a group commit.

§strict_serializable_reads_tx: UnboundedSender<(ConnectionId, PendingReadTxn)>

Channel for strict serializable reads ready to commit.

§global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>

Mechanism for totally ordering write and read timestamps, so that all reads reflect exactly the set of writes that precede them, and no writes that follow.

§transient_id_gen: Arc<TransientIdGen>

A generator for transient GlobalIds, shareable with other threads.

§active_conns: BTreeMap<ConnectionId, ConnMeta>

A map from connection ID to metadata about that connection for all active connections.

§txn_read_holds: BTreeMap<ConnectionId, ReadHolds<Timestamp>>

For each transaction, the read holds taken to support any performed reads.

Upon completing a transaction, these read holds should be dropped.

§pending_peeks: BTreeMap<Uuid, PendingPeek>

A map from pending peek IDs to the queue into which responses are sent, and the connection ID of the client that initiated the peek. Access to the peek fields should be restricted to methods in the peek API.

§client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>

A map from client connection ids to a set of all pending peeks for that client.

§pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>

A map from client connection ids to their pending linearized read transactions.

§active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>

A map from the compute sink ID to its state description.

§active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>

A map from active webhooks to their invalidation handle.

§staged_cancellation: BTreeMap<ConnectionId, (Sender<bool>, Receiver<bool>)>

A map from connection ids to a watch channel that is set to true if the connection received a cancel request.

§introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>

Active introspection subscribes.

§write_locks: BTreeMap<CatalogItemId, Arc<Mutex<()>>>

Locks that grant access to a specific object, populated lazily as objects are written to.

§deferred_write_ops: BTreeMap<ConnectionId, DeferredWriteOp>

Plans that are currently deferred and waiting on a write lock.

§pending_writes: Vec<PendingWriteTxn>

Pending writes waiting for a group commit.

§advance_timelines_interval: Interval

For the realtime timeline, an explicit SELECT or INSERT on a table will bump the table’s timestamps, but there are cases where timestamps are not bumped but we expect the closed timestamps to advance (AS OF X, SUBSCRIBing views over RT sources and tables). To address these, spawn a task that forces table timestamps to close on a regular interval. This roughly tracks the behavior of realtime sources that close off timestamps on an interval.

For non-realtime timelines, nothing pushes the timestamps forward, so we must do it manually.

§serialized_ddl: LockedVecDeque<DeferredPlanStatement>

Serialized DDL. DDL must be serialized because:

  • Many of them do off-thread work and need to verify the catalog is in a valid state, but PlanValidity does not currently support tracking all changes. Doing that correctly seems to be more difficult than it’s worth, so we would instead re-plan and re-sequence the statements.
  • Re-planning a statement is hard because Coordinator and Session state is mutated at various points, and we would need to correctly reset those changes before re-planning and re-sequencing.
§secrets_controller: Arc<dyn SecretsController>

Handle to secret manager that can create and delete secrets from an arbitrary secret storage engine.

§caching_secrets_reader: CachingSecretsReader

A secrets reader that maintains an in-memory cache, where values have a set TTL.

§cloud_resource_controller: Option<Arc<dyn CloudResourceController>>

Handle to a manager that can create and delete Kubernetes resources (i.e., VpcEndpoint objects).

§transient_replica_metadata: BTreeMap<ReplicaId, Option<ReplicaMetadata>>

Metadata about replicas that doesn’t need to be persisted. Intended for inclusion in system tables.

None is used as a tombstone value for replicas that have been dropped and for which no further updates should be recorded.
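
The tombstone convention can be sketched as follows. The types here are hypothetical simplifications (`String` stands in for `ReplicaMetadata`, `u64` for `ReplicaId`), and the rule that updates to a tombstoned replica are silently ignored is an assumption drawn from the description above.

```rust
use std::collections::BTreeMap;

type ReplicaIdSketch = u64;

/// Hypothetical stand-in for the `transient_replica_metadata` map.
#[derive(Default)]
struct TransientMetadataSketch {
    entries: BTreeMap<ReplicaIdSketch, Option<String>>,
}

impl TransientMetadataSketch {
    /// Records metadata unless the replica has been tombstoned.
    /// Returns whether the update was recorded.
    fn update(&mut self, id: ReplicaIdSketch, metadata: String) -> bool {
        match self.entries.get(&id) {
            // `Some(None)` is the tombstone: the replica was dropped.
            Some(None) => false,
            _ => {
                self.entries.insert(id, Some(metadata));
                true
            }
        }
    }

    /// Replaces any metadata with the tombstone value.
    fn mark_dropped(&mut self, id: ReplicaIdSketch) {
        self.entries.insert(id, None);
    }
}
```

Keeping the key with a `None` value, rather than removing it, is what lets a late update for a dropped replica be told apart from the first update for a new one.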

§storage_usage_client: StorageUsageClient

Persist client for fetching storage metadata such as size metrics.

§storage_usage_collection_interval: Duration

The interval at which to collect storage usage information.

§segment_client: Option<Client>

Segment analytics client.

§metrics: Metrics

Coordinator metrics.

§optimizer_metrics: OptimizerMetrics

Optimizer metrics.

§tracing_handle: TracingHandle

Tracing handle.

§statement_logging: StatementLogging

Data used by the statement logging feature.

§webhook_concurrency_limit: WebhookConcurrencyLimiter

Limit for how many concurrent webhook requests we allow.

§pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>

Optional config for the Postgres-backed timestamp oracle. This is required when postgres is configured using the timestamp_oracle system variable.

§check_cluster_scheduling_policies_interval: Interval

Periodically asks cluster scheduling policies to make their decisions.

§cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>

This keeps the last On/Off decision for each cluster and each scheduling policy. (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are periodically cleaned up from this Map.)

§caught_up_check_interval: Interval

When doing 0dt upgrades/in read-only mode, periodically ask all known clusters/collections whether they are caught up.

§caught_up_check: Option<CaughtUpCheckContext>

Context needed to check whether all clusters/collections have caught up. Only used during 0dt deployment, while in read-only mode.

§installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>

Tracks the state associated with the currently installed watchsets.

§connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>

Tracks the currently installed watchsets for each connection.

§cluster_replica_statuses: ClusterReplicaStatuses

Tracks the statuses of all cluster replicas.

§read_only_controllers: bool

Whether or not to start controllers in read-only mode. This is only meant for use during development of read-only clusters and 0dt upgrades and should go away once we have proper orchestration during upgrades.

§buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>

Updates to builtin tables that are being buffered while we are in read-only mode. We apply these all at once when coming out of read-only mode.

This is a Some while in read-only mode and will be replaced by a None when we transition out of read-only mode and write out any buffered updates.
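
A minimal sketch of this Some/None buffering pattern, using hypothetical stand-in types (`UpdateSketch` for `BuiltinTableUpdate`, and an `applied` vector standing in for the actual write path):

```rust
/// Hypothetical stand-in for `BuiltinTableUpdate`.
#[derive(Clone, Debug, PartialEq)]
struct UpdateSketch(u64);

struct BufferingSketch {
    /// `Some` while in read-only mode, `None` afterwards.
    buffered: Option<Vec<UpdateSketch>>,
    /// Stand-in for updates that were actually written out.
    applied: Vec<UpdateSketch>,
}

impl BufferingSketch {
    fn new_read_only() -> Self {
        Self { buffered: Some(Vec::new()), applied: Vec::new() }
    }

    /// Buffers the update in read-only mode, applies it otherwise.
    fn submit(&mut self, update: UpdateSketch) {
        match &mut self.buffered {
            Some(buffer) => buffer.push(update),
            None => self.applied.push(update),
        }
    }

    /// Leaves read-only mode: `take()` swaps in `None` and hands back
    /// the buffer, which is then applied all at once.
    fn leave_read_only(&mut self) {
        if let Some(buffer) = self.buffered.take() {
            self.applied.extend(buffer);
        }
    }
}
```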

Implementations§

source§

impl Coordinator

source

pub fn resolve_collection_id_bundle_names( &self, session: &Session, id_bundle: &CollectionIdBundle, ) -> Vec<String>

Resolves the full name from the corresponding catalog entry for each item in id_bundle. If an item in the bundle does not exist in the catalog, it’s not included in the result.

source§

impl Coordinator

source

pub async fn implement_peek_plan( &mut self, ctx_extra: &mut ExecuteContextExtra, plan: PlannedPeek, finishing: RowSetFinishing, compute_instance: ComputeInstanceId, target_replica: Option<ReplicaId>, max_result_size: u64, max_returned_query_size: Option<u64>, ) -> Result<ExecuteResponse, AdapterError>

Implements a peek plan produced by create_plan above.

source

pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId)

Cancel and remove all pending peeks that were initiated by the client with conn_id.

source

pub(crate) fn handle_peek_notification( &mut self, uuid: Uuid, notification: PeekNotification, otel_ctx: OpenTelemetryContext, )

Handle a peek notification and retire the corresponding execution. Does nothing for already-removed peeks.

source

pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek>

Clean up a peek’s state.

source

pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
where I: IntoRowIterator, I::Iter: Send + Sync + 'static,

Constructs an ExecuteResponse that will send some rows to the client immediately, as opposed to asking the dataflow layer to send along the rows after some computation.

source§

impl Coordinator

source

pub(crate) fn spawn_statement_logging_task(&self)

source

pub(crate) fn drain_statement_log(&mut self)

source

fn statement_logging_throttling_check(&mut self, cost: usize) -> Option<usize>

Check whether we need to do throttling (i.e., whether STATEMENT_LOGGING_TARGET_DATA_RATE is set). If so, actually do the check.

Returns None if we must throttle this statement, and Some(n) otherwise, where n is the number of statements that were dropped due to throttling before this one.
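
The return contract can be illustrated with a simple token budget. This is a sketch under assumptions: the budget-based mechanism, the `ThrottleSketch` type, and the resetting of the dropped counter on success are all hypothetical and are not taken from the real implementation.

```rust
/// Hypothetical throttling state.
struct ThrottleSketch {
    /// Remaining budget, in the same unit as `cost`.
    tokens: usize,
    /// Statements dropped since the last one that was let through.
    throttled_count: usize,
}

impl ThrottleSketch {
    /// Returns `None` if the statement must be throttled, and
    /// `Some(n)` otherwise, where `n` is the number of statements
    /// dropped before this one.
    fn check(&mut self, cost: usize) -> Option<usize> {
        if cost <= self.tokens {
            self.tokens -= cost;
            // Report the dropped count and reset it to zero.
            Some(std::mem::take(&mut self.throttled_count))
        } else {
            self.throttled_count += 1;
            None
        }
    }

    /// Adds budget back, for example on a timer.
    fn refill(&mut self, amount: usize) {
        self.tokens = self.tokens.saturating_add(amount);
    }
}
```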

source

pub(crate) fn log_prepared_statement( &mut self, session: &mut Session, logging: &Arc<QCell<PreparedStatementLoggingInfo>>, ) -> Option<(Option<(StatementPreparedRecord, PreparedStatementEvent)>, Uuid)>

Returns any statement logging events needed for a particular prepared statement. Possibly mutates the PreparedStatementLoggingInfo metadata.

This function does not do a sampling check, and assumes we did so in a higher layer.

It does do a throttling check, and returns None if we must not log due to throttling.

source

pub fn statement_execution_sample_rate(&self, session: &Session) -> f64

The rate at which statement execution should be sampled. This is the value of the session var statement_logging_sample_rate, constrained by the system var statement_logging_max_sample_rate.
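
One plausible reading of "constrained by" is that the system variable acts as an upper bound. The sketch below assumes exactly that; both function names are hypothetical, and the random draw is passed in explicitly so the behaviour is deterministic.

```rust
/// Effective rate, assuming the system variable caps the session one.
fn effective_sample_rate(session_rate: f64, system_max_rate: f64) -> f64 {
    session_rate.min(system_max_rate)
}

/// Decides whether to sample, given a uniform draw in `[0, 1)`.
fn should_sample(rate: f64, draw: f64) -> bool {
    draw < rate
}
```

With a strict `<` comparison, a rate of `0.0` never samples and a rate of `1.0` always does.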

source

pub fn end_statement_execution( &mut self, id: StatementLoggingId, reason: StatementEndedExecutionReason, )

Record the end of statement execution for a statement whose beginning was logged. It is an error to call this function for a statement whose beginning was not logged (because it was not sampled). Requiring the opaque StatementLoggingId type, which is only instantiated by begin_statement_execution if the statement is actually logged, should prevent this.
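
The "opaque ID as proof of logging" idea can be sketched with a private tuple field. Everything here is a simplified, hypothetical model: `LogSketch` and `IdSketch` stand in for the real state and for `StatementLoggingId`, and sampling is reduced to a boolean argument.

```rust
mod logging_sketch {
    /// Opaque token. The field is private, so code outside this
    /// module cannot construct one; it can only come from `begin`.
    #[derive(Debug)]
    pub struct IdSketch(u64);

    #[derive(Default)]
    pub struct LogSketch {
        next: u64,
        pub open: Vec<u64>,
        pub closed: Vec<u64>,
    }

    impl LogSketch {
        /// Logs the beginning only if the statement was sampled.
        pub fn begin(&mut self, sampled: bool) -> Option<IdSketch> {
            if !sampled {
                return None;
            }
            let id = self.next;
            self.next += 1;
            self.open.push(id);
            Some(IdSketch(id))
        }

        /// Consumes the token, so an execution can be ended only once
        /// and only if its beginning was logged.
        pub fn end(&mut self, id: IdSketch) {
            self.open.retain(|open| *open != id.0);
            self.closed.push(id.0);
        }
    }
}
```

Because `end` takes the token by value, calling it for an unsampled statement is a compile-time impossibility rather than a runtime error.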

source

fn pack_statement_execution_inner( record: &StatementBeganExecutionRecord, packer: &mut RowPacker<'_>, )

source

fn pack_statement_began_execution_update( record: &StatementBeganExecutionRecord, ) -> Row

source

fn pack_statement_prepared_update( record: &StatementPreparedRecord, packer: &mut RowPacker<'_>, )

source

fn pack_session_history_update(event: &SessionHistoryEvent) -> Row

source

fn pack_statement_lifecycle_event( StatementLoggingId: &StatementLoggingId, event: &StatementLifecycleEvent, when: EpochMillis, ) -> Row

source

pub fn pack_full_statement_execution_update( began_record: &StatementBeganExecutionRecord, ended_record: &StatementEndedExecutionRecord, ) -> Row

source

pub fn pack_statement_ended_execution_updates( began_record: &StatementBeganExecutionRecord, ended_record: &StatementEndedExecutionRecord, ) -> [(Row, Diff); 2]

source

fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>( &mut self, StatementLoggingId: StatementLoggingId, f: F, )

Mutate a statement execution record via the given function f.

source

pub fn set_statement_execution_cluster( &mut self, id: StatementLoggingId, cluster_id: ClusterId, )

Set the cluster_id for a statement, once it’s known.

source

pub fn set_statement_execution_timestamp( &mut self, id: StatementLoggingId, timestamp: Timestamp, )

Set the execution_timestamp for a statement, once it’s known.

source

pub fn set_transient_index_id( &mut self, id: StatementLoggingId, transient_index_id: GlobalId, )

source

pub fn begin_statement_execution( &mut self, session: &mut Session, params: &Params, logging: &Arc<QCell<PreparedStatementLoggingInfo>>, ) -> Option<StatementLoggingId>

Possibly record the beginning of statement execution, depending on a randomly-chosen value. If the execution beginning was indeed logged, returns a StatementLoggingId that must be passed to end_statement_execution to record when it ends.

source

pub fn begin_session_for_statement_logging(&mut self, session: &ConnMeta)

Record a new connection event.

source

pub fn end_session_for_statement_logging(&mut self, uuid: Uuid)

source

pub fn record_statement_lifecycle_event( &mut self, id: &StatementLoggingId, event: &StatementLifecycleEvent, when: EpochMillis, )

source§

impl Coordinator

source

pub(crate) fn now(&self) -> EpochMillis

source

pub(crate) fn now_datetime(&self) -> DateTime<Utc>

source

pub(crate) fn get_timestamp_oracle( &self, timeline: &Timeline, ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync>

source

pub(crate) fn get_local_timestamp_oracle( &self, ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync>

Returns a TimestampOracle used for reads and writes from/to a local input.

source

pub(crate) async fn get_local_read_ts(&self) -> Timestamp

Assign a timestamp for a read from a local input. Reads following writes must be at a time >= the write’s timestamp; we choose “equal to” for simplicity’s sake and to open as few new timestamps as possible.

source

pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp

Assign a timestamp for a write to a local input and increase the local ts. Writes following reads must ensure that they are assigned a strictly larger timestamp to ensure they are not visible to any real-time earlier reads.
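
The ordering rules for local reads and writes can be sketched with a single counter. This is a deliberately reduced model: `LocalOracleSketch` is hypothetical, timestamps are plain `u64` values, and the real `TimestampOracle` is asynchronous and tracks applied writes separately.

```rust
/// Hypothetical single-counter oracle.
struct LocalOracleSketch {
    ts: u64,
}

impl LocalOracleSketch {
    /// A read is assigned the latest write timestamp ("equal to"),
    /// so it opens no new timestamp.
    fn read_ts(&self) -> u64 {
        self.ts
    }

    /// A write is assigned a timestamp strictly larger than anything
    /// handed out before, and at least the current wall-clock `now`.
    fn write_ts(&mut self, now: u64) -> u64 {
        self.ts = now.max(self.ts + 1);
        self.ts
    }
}
```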

source

pub(crate) async fn peek_local_write_ts(&self) -> Timestamp

Peek the current timestamp used for operations on local inputs. Used to determine how much to block group commits by.

source

pub(crate) fn apply_local_write( &self, timestamp: Timestamp, ) -> impl Future<Output = ()> + Send + 'static

Marks a write at timestamp as completed, using a TimestampOracle.

source

pub(crate) async fn ensure_timeline_state<'a>( &'a mut self, timeline: &'a Timeline, ) -> &mut TimelineState<Timestamp>

Ensures that a global timeline state exists for timeline.

source

pub(crate) async fn ensure_timeline_state_with_initial_time<'a>( timeline: &'a Timeline, initially: Timestamp, now: NowFn, pg_oracle_config: Option<PostgresTimestampOracleConfig>, global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>>, read_only: bool, ) -> &'a mut TimelineState<Timestamp>

Ensures that a global timeline state exists for timeline, with an initial time of initially.

source

pub(crate) fn build_collection_id_bundle( &self, storage_ids: impl IntoIterator<Item = GlobalId>, compute_ids: impl IntoIterator<Item = (ComputeInstanceId, GlobalId)>, clusters: impl IntoIterator<Item = ComputeInstanceId>, ) -> CollectionIdBundle

Groups together storage and compute resources into a CollectionIdBundle.

source

pub(crate) fn remove_resources_associated_with_timeline( &mut self, timeline: Timeline, ids: CollectionIdBundle, ) -> bool

Given a Timeline and a CollectionIdBundle, removes all of the “storage ids” and “compute ids” in the bundle, from the timeline.

source

pub(crate) fn remove_compute_ids_from_timeline<I>( &mut self, ids: I, ) -> Vec<Timeline>

source

pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle

source

pub(crate) fn validate_timeline_context<I>( &self, ids: I, ) -> Result<TimelineContext, AdapterError>
where I: IntoIterator<Item = GlobalId>,

Return an error if the ids are from incompatible TimelineContexts. This should be used to prevent users from doing things that are either meaningless (joining data from timelines that have similar numbers with different meanings like two separate debezium topics) or will never complete (joining cdcv2 and realtime data).

source

pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext

Return the TimelineContext belonging to a CatalogItemId, if one exists.

source

pub(crate) fn get_timeline_context_for_global_id( &self, id: GlobalId, ) -> TimelineContext

Return the TimelineContext belonging to a GlobalId, if one exists.

source

fn get_timeline_contexts<I>(&self, ids: I) -> BTreeSet<TimelineContext>
where I: IntoIterator<Item = CatalogItemId>,

Return the TimelineContexts belonging to a list of CatalogItemIds, if any exist.

source

pub fn partition_ids_by_timeline_context( &self, id_bundle: &CollectionIdBundle, ) -> impl Iterator<Item = (TimelineContext, CollectionIdBundle)>

Returns an iterator that partitions an id bundle by the TimelineContext that each id belongs to.

source

pub(crate) fn timedomain_for<'a, I>( &self, uses_ids: I, timeline_context: &TimelineContext, conn_id: &ConnectionId, compute_instance: ComputeInstanceId, ) -> Result<CollectionIdBundle, AdapterError>
where I: IntoIterator<Item = &'a GlobalId>,

Return the set of ids in a timedomain and verify timeline correctness.

When a user starts a transaction, we need to prevent compaction of anything they might read from. We use a heuristic of “anything in the same database schemas with the same timeline as whatever the first query is”.

source

pub(crate) async fn advance_timelines(&mut self)

source§

impl Coordinator

source

pub(crate) async fn oracle_read_ts( &self, session: &Session, timeline_ctx: &TimelineContext, when: &QueryWhen, ) -> Option<Timestamp>

source

pub(crate) fn determine_timestamp( &self, session: &Session, id_bundle: &CollectionIdBundle, when: &QueryWhen, compute_instance: ComputeInstanceId, timeline_context: &TimelineContext, oracle_read_ts: Option<Timestamp>, real_time_recency_ts: Option<Timestamp>, ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError>

Determines the timestamp for a query, acquires read holds that ensure the query remains executable at that time, and returns those.

The caller is responsible for eventually dropping those read holds.

source

pub(crate) fn largest_not_in_advance_of_upper( upper: &Antichain<Timestamp>, ) -> Timestamp

The largest element not in advance of any object in the collection.

Times that are not greater than this frontier are complete for all collections identified as arguments.
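
For totally ordered timestamps the idea reduces to stepping back from the upper by one. The sketch below models the frontier as an `Option<u64>`, where `None` stands for the empty frontier of a closed collection; that modelling and the handling of the two edge cases are assumptions, not the real `Antichain` logic.

```rust
/// Largest timestamp that is complete, given a write frontier.
/// `None` models an empty frontier (no further writes will occur).
fn largest_not_in_advance_of_upper_sketch(upper: Option<u64>) -> u64 {
    match upper {
        // Everything strictly before `upper` is complete.
        Some(upper) => upper.saturating_sub(1),
        // A closed collection is complete at every time.
        None => u64::MAX,
    }
}
```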

source

pub(crate) fn evaluate_when( catalog: &CatalogState, timestamp: MirScalarExpr, session: &Session, ) -> Result<Timestamp, AdapterError>

source§

impl Coordinator

source

pub(crate) fn trigger_group_commit(&mut self)

Send a message to the Coordinator to start a group commit.

source

pub(crate) async fn try_deferred( &mut self, conn_id: ConnectionId, acquired_lock: Option<(CatalogItemId, OwnedMutexGuard<()>)>, )

Tries to execute a previously deferred DeferredWriteOp that requires write locks.

If we can’t acquire all of the write locks then we’ll defer the plan again and wait for the necessary locks to become available.

source

pub(crate) async fn try_group_commit( &mut self, permit: Option<GroupCommitPermit>, )

Attempts to commit all pending write transactions in a group commit. If the timestamp chosen for the writes is not ahead of now(), then we can execute and commit the writes immediately. Otherwise we must wait for now() to advance past the timestamp chosen for the writes.
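
The decision between committing immediately and waiting can be written as a small pure function. The names below are hypothetical, and timestamps are modelled as milliseconds in a `u64`.

```rust
#[derive(Debug, PartialEq)]
enum CommitActionSketch {
    /// The write timestamp is not ahead of `now`: commit right away.
    CommitNow,
    /// The write timestamp is in the future: wait this many
    /// milliseconds before committing.
    WaitMillis(u64),
}

fn plan_group_commit_sketch(write_ts: u64, now: u64) -> CommitActionSketch {
    if write_ts <= now {
        CommitActionSketch::CommitNow
    } else {
        CommitActionSketch::WaitMillis(write_ts - now)
    }
}
```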

source

pub(crate) async fn group_commit( &mut self, permit: Option<GroupCommitPermit>, ) -> Timestamp

Tries to commit all pending write transactions at the same timestamp.

If the caller of this function has the write_lock acquired, then they can optionally pass it in to this method. If the caller does not have the write_lock acquired and the write_lock is currently locked by another operation, then only writes to system tables and table advancements will be applied. If the caller does not have the write_lock acquired and the write_lock is not currently locked by another operation, then group commit will acquire it and all writes will be applied.

All applicable pending writes will be combined into a single Append command and sent to STORAGE as a single batch. All applicable writes will happen at the same timestamp and all involved tables will be advanced to some timestamp larger than the timestamp of the write.

Returns the timestamp of the write.

source

pub(crate) fn submit_write(&mut self, pending_write_txn: PendingWriteTxn)

Submit a write to be executed during the next group commit and trigger a group commit.

source

pub(crate) fn builtin_table_update<'a>(&'a mut self) -> BuiltinTableAppend<'a>

Append some BuiltinTableUpdates, with various degrees of waiting and blocking.

source

pub(crate) fn defer_op<F>(&mut self, acquire_future: F, op: DeferredWriteOp)
where F: Future<Output = (CatalogItemId, OwnedMutexGuard<()>)> + Send + 'static,

source

pub(crate) fn grant_object_write_lock( &mut self, object_id: CatalogItemId, ) -> impl Future<Output = (CatalogItemId, OwnedMutexGuard<()>)> + 'static

Returns a future that waits until it can get an exclusive lock on the specified collection.

source

pub(crate) fn try_grant_object_write_lock( &mut self, object_id: CatalogItemId, ) -> Option<OwnedMutexGuard<()>>

Lazily creates the lock for the provided object_id and grants it if possible. Returns None if the lock is already held.
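
Lazy creation of per-object locks can be sketched with the standard library. Note the assumptions: the real method returns an owned guard from an async mutex, whereas this sketch uses `std::sync::Mutex` and hands back the shared lock itself; `WriteLocksSketch` and the `u64` object ID are hypothetical.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex, TryLockError};

/// Hypothetical stand-in for the `write_locks` map.
#[derive(Default)]
struct WriteLocksSketch {
    locks: BTreeMap<u64, Arc<Mutex<()>>>,
}

impl WriteLocksSketch {
    /// Returns the lock for `object_id`, creating it on first use.
    fn lock_for(&mut self, object_id: u64) -> Arc<Mutex<()>> {
        Arc::clone(self.locks.entry(object_id).or_default())
    }
}

/// Non-blocking check of whether a lock is currently held.
fn is_held(lock: &Mutex<()>) -> bool {
    matches!(lock.try_lock(), Err(TryLockError::WouldBlock))
}
```

Every caller asking for the same object ID receives a handle to the same mutex, so a guard held through one handle is visible through all others.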

source§

impl Coordinator

source

pub async fn maybe_check_caught_up(&mut self)

Checks that all clusters/collections are caught up. If so, this will trigger self.caught_up_check.trigger.

This method is a no-op when the trigger has already been fired.

source

async fn maybe_check_caught_up_new(&mut self)

source

async fn clusters_caught_up( &self, allowed_lag: Timestamp, cutoff: Timestamp, now: Timestamp, live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>, exclude_collections: &BTreeSet<GlobalId>, ) -> bool

Returns true if all non-transient, non-excluded collections have their write frontier (aka. upper) within allowed_lag of the “live” frontier reported in live_frontiers. The “live” frontiers are frontiers as reported by a currently running environmentd deployment, during a 0dt upgrade.

Collections whose write frontier is behind now by more than the cutoff are ignored.

For this check, zero-replica clusters are always considered caught up. Their collections would never normally be considered caught up but it’s clearly intentional that they have no replicas.

source

async fn collections_caught_up( &self, cluster: &Cluster, allowed_lag: Timestamp, cutoff: Timestamp, now: Timestamp, live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>, exclude_collections: &BTreeSet<GlobalId>, ) -> Result<bool, Error>

Returns true if all non-transient, non-excluded collections have their write frontier (aka. upper) within allowed_lag of the “live” frontier reported in live_frontiers. The “live” frontiers are frontiers as reported by a currently running environmentd deployment, during a 0dt upgrade.

Collections whose write frontier is behind now by more than the cutoff are ignored.

This also returns true in case this cluster does not have any replicas.
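
The per-cluster rule can be sketched as a pure function over plain maps. Several simplifications are assumed: frontiers are single `u64` values rather than antichains, IDs are `u64`, transient collections are not modelled, a collection with no reported live frontier counts as caught up, and the cutoff is applied to the live frontier, which is only one possible reading of the text above.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Returns true if every non-excluded collection is within
/// `allowed_lag` of its live frontier.
fn collections_caught_up_sketch(
    replica_count: usize,
    uppers: &BTreeMap<u64, u64>,
    live_frontiers: &BTreeMap<u64, u64>,
    exclude: &BTreeSet<u64>,
    allowed_lag: u64,
    cutoff: u64,
    now: u64,
) -> bool {
    // A cluster without replicas is considered caught up.
    if replica_count == 0 {
        return true;
    }
    uppers.iter().all(|(id, upper)| {
        if exclude.contains(id) {
            return true;
        }
        match live_frontiers.get(id) {
            // Assumption: nothing to compare against means caught up.
            None => true,
            Some(live) => {
                // Assumption: ignore collections whose live frontier
                // is itself behind `now` by more than `cutoff`.
                if now.saturating_sub(*live) > cutoff {
                    return true;
                }
                upper.saturating_add(allowed_lag) >= *live
            }
        }
    })
}
```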

source

async fn maybe_check_caught_up_legacy(&mut self)

source§

impl Coordinator

source

pub(crate) async fn check_scheduling_policies(&mut self)

Call each scheduling policy.

source

fn check_refresh_policy(&self)

Runs the SCHEDULE = ON REFRESH cluster scheduling policy, which makes cluster On/Off decisions based on REFRESH materialized view write frontiers and the current time (the local oracle read ts), and sends Message::SchedulingDecisions with these decisions. (Queries the timestamp oracle on a background task.)

source

pub(crate) async fn handle_scheduling_decisions( &mut self, decisions: Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>, )

Handles SchedulingDecisions:

  1. Adds the newly made decisions to cluster_scheduling_decisions.
  2. Cleans up old decisions that are for clusters no longer in scope of automated scheduling decisions.
  3. For each cluster, it sums up cluster_scheduling_decisions, checks the summed up decision against the cluster state, and turns cluster On/Off if needed.
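
The three steps above can be sketched with nested maps. The types are hypothetical simplifications of the real ones, and "summing up" is assumed here to mean that a cluster should be On if at least one policy says On.

```rust
use std::collections::{BTreeMap, BTreeSet};

#[derive(Clone, Copy, Debug, PartialEq)]
enum DecisionSketch {
    On,
    Off,
}

/// Cluster ID -> policy name -> last decision.
type DecisionsSketch = BTreeMap<u64, BTreeMap<&'static str, DecisionSketch>>;

/// Step 1: record the newly made decisions of one policy.
fn record_decisions(
    all: &mut DecisionsSketch,
    policy: &'static str,
    new: Vec<(u64, DecisionSketch)>,
) {
    for (cluster, decision) in new {
        all.entry(cluster).or_default().insert(policy, decision);
    }
}

/// Step 2: forget clusters that are no longer in scope.
fn retain_in_scope(all: &mut DecisionsSketch, in_scope: &BTreeSet<u64>) {
    all.retain(|cluster, _| in_scope.contains(cluster));
}

/// Step 3: combine per-policy decisions (assumed: any On wins).
fn should_be_on(per_policy: &BTreeMap<&'static str, DecisionSketch>) -> bool {
    per_policy.values().any(|d| *d == DecisionSketch::On)
}
```

The caller would compare `should_be_on` against the cluster's current state and act only when the two differ.
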
source

fn get_managed_cluster_config( &self, cluster_id: ClusterId, ) -> Option<ClusterVariantManaged>

Returns the managed config for a cluster. Returns None if the cluster doesn’t exist or if it’s an unmanaged cluster.

source§

impl Coordinator

source

pub(crate) fn handle_command(&mut self, cmd: Command) -> LocalBoxFuture<'_, ()>

BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would get stored on the stack which is bad for runtime performance, and blow up our stack usage. Because of that we purposefully move this Future onto the heap (i.e. Box it).
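
The effect of boxing can be observed with `std::mem::size_of_val`. The sketch below uses only the standard library (`Pin<Box<dyn Future>>` instead of `LocalBoxFuture`), and the 4 KiB scratch buffer is an arbitrary, hypothetical way of making the future large.

```rust
use std::future::Future;
use std::pin::Pin;

async fn yield_point() {}

/// A future that keeps a 4 KiB buffer alive across an await point,
/// so the buffer becomes part of the future's own state.
async fn large_handler() -> usize {
    let mut scratch = [0u8; 4096];
    scratch[0] = 1;
    yield_point().await;
    scratch.iter().map(|b| *b as usize).sum()
}

/// Moves the future to the heap; the caller holds only a pointer.
fn boxed_handler() -> Pin<Box<dyn Future<Output = usize>>> {
    Box::pin(large_handler())
}
```

The unboxed future is at least as large as its buffer, while the boxed handle is the size of a fat pointer regardless of how large the future grows.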

source

async fn handle_startup( &mut self, tx: Sender<Result<StartupResponse, AdapterError>>, user: User, conn_id: ConnectionId, secret_key: u32, uuid: Uuid, client_ip: Option<IpAddr>, application_name: String, notice_tx: UnboundedSender<AdapterNotice>, )

source

async fn handle_startup_inner( &mut self, user: &User, conn_id: &ConnectionId, client_ip: &Option<IpAddr>, ) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError>

source

pub(crate) async fn handle_execute( &mut self, portal_name: String, session: Session, tx: ClientTransmitter<ExecuteResponse>, outer_context: Option<ExecuteContextExtra>, )

Handles an execute command.

source

pub(crate) async fn handle_execute_inner( &mut self, stmt: Arc<Statement<Raw>>, params: Params, ctx: ExecuteContext, )

source

fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool

Whether the statement must be serialized and is DDL.

source

fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool

Whether the statement must be purified off of the Coordinator thread.

source

async fn resolve_mz_now_for_create_materialized_view<'a>( &mut self, cmvs: &CreateMaterializedViewStatement<Aug>, resolved_ids: &ResolvedIds, session: &Session, acquire_read_holds: bool, ) -> Result<Option<Timestamp>, AdapterError>

Chooses a timestamp for mz_now(), if mz_now() occurs in a REFRESH option of the materialized view. Additionally, if acquire_read_holds is true and the MV has any REFRESH option, this function grabs read holds at the earliest possible time on input collections that might be involved in the MV.

Note that this is NOT what handles mz_now() in the query part of the MV. (It handles mz_now() only in with_options.)

(Note that the chosen timestamp won’t be the same timestamp as the system table inserts, unfortunately.)

source

async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32)

Instruct the dataflow layer to cancel any ongoing, interactive work for the named conn_id if the correct secret key is specified.

Note: Here we take a ConnectionIdType as opposed to an owned ConnectionId because this method gets called by external clients when they request to cancel a request.

source

pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId)

Unconditionally instructs the dataflow layer to cancel any ongoing, interactive work for the named conn_id.

source

async fn handle_terminate(&mut self, conn_id: ConnectionId)

Handle termination of a client session.

This cleans up any state in the coordinator associated with the session.

source

fn handle_get_webhook( &mut self, database: String, schema: String, name: String, tx: Sender<Result<AppendWebhookResponse, AppendWebhookError>>, )

Returns the necessary metadata for appending to a webhook source, and a channel to send rows.

source§

impl Coordinator

source

pub fn check_consistency(&self) -> Result<(), CoordinatorInconsistencies>

Checks the Coordinator to make sure we’re internally consistent.

source

fn check_read_holds(&self) -> Result<(), Vec<ReadHoldsInconsistency>>

§Invariants
  • Read holds should reference known objects.
source

fn check_active_webhooks(&self) -> Result<(), Vec<ActiveWebhookInconsistency>>

§Invariants
  • All CatalogItemIds in the active_webhooks map should reference known webhook sources.
source

fn check_cluster_statuses(&self) -> Result<(), Vec<ClusterStatusInconsistency>>

§Invariants
  • All ClusterIds in the cluster_replica_statuses map should reference known clusters.
  • All ReplicaIds in the cluster_replica_statuses map should reference known cluster replicas.
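
An invariant check of this shape collects every violation instead of stopping at the first. The sketch below models the two cluster status invariants with plain maps of `u64` IDs; `InconsistencySketch` and the function name are hypothetical and do not mirror the real `ClusterStatusInconsistency` type.

```rust
use std::collections::{BTreeMap, BTreeSet};

#[derive(Debug, PartialEq)]
enum InconsistencySketch {
    UnknownCluster(u64),
    UnknownReplica(u64, u64),
}

/// Checks that every cluster and replica with a recorded status is
/// known. Both maps go from cluster ID to a set of replica IDs.
fn check_cluster_statuses_sketch(
    statuses: &BTreeMap<u64, BTreeSet<u64>>,
    known: &BTreeMap<u64, BTreeSet<u64>>,
) -> Result<(), Vec<InconsistencySketch>> {
    let mut found = Vec::new();
    for (cluster, replicas) in statuses {
        match known.get(cluster) {
            None => found.push(InconsistencySketch::UnknownCluster(*cluster)),
            Some(known_replicas) => {
                for replica in replicas.difference(known_replicas) {
                    found.push(InconsistencySketch::UnknownReplica(*cluster, *replica));
                }
            }
        }
    }
    if found.is_empty() { Ok(()) } else { Err(found) }
}
```
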
source§

impl Coordinator

source

pub(crate) async fn catalog_transact( &mut self, session: Option<&Session>, ops: Vec<Op>, ) -> Result<(), AdapterError>

Same as Self::catalog_transact_conn but takes a Session.

source

pub(crate) async fn catalog_transact_with_side_effects<'c, F, Fut>( &'c mut self, session: Option<&Session>, ops: Vec<Op>, side_effect: F, ) -> Result<(), AdapterError>
where F: FnOnce(&'c mut Coordinator) -> Fut, Fut: Future<Output = ()>,

Same as Self::catalog_transact_conn but takes a Session and runs builtin table updates concurrently with any side effects (e.g. creating collections).

source

pub(crate) async fn catalog_transact_conn( &mut self, conn_id: Option<&ConnectionId>, ops: Vec<Op>, ) -> Result<(), AdapterError>

Same as Self::catalog_transact_inner but awaits the table updates.

source

pub(crate) async fn catalog_transact_with_ddl_transaction( &mut self, session: &mut Session, ops: Vec<Op>, ) -> Result<(), AdapterError>

Executes a Catalog transaction, with special handling if the provided Session is in a SQL transaction that is executing DDL.

source

pub(crate) async fn catalog_transact_inner<'a>( &mut self, conn_id: Option<&ConnectionId>, ops: Vec<Op>, ) -> Result<BoxFuture<'static, ()>, AdapterError>

Perform a catalog transaction. Coordinator::ship_dataflow must be called after this function successfully returns on any built DataflowDesc.

source

fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId)

source

fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>)

A convenience method for dropping sources.

source

fn drop_tables(&mut self, table_gids: Vec<GlobalId>, ts: Timestamp)

source

fn restart_webhook_sources( &mut self, sources: impl IntoIterator<Item = CatalogItemId>, )

source

pub async fn drop_compute_sink( &mut self, sink_id: GlobalId, ) -> Option<ActiveComputeSink>

Like drop_compute_sinks, but for a single compute sink.

Returns the controller’s state for the compute sink if the identified sink was known to the controller. It is the caller’s responsibility to retire the returned sink. Consider using retire_compute_sinks instead.

source

pub async fn drop_compute_sinks( &mut self, sink_ids: impl IntoIterator<Item = GlobalId>, ) -> BTreeMap<GlobalId, ActiveComputeSink>

Drops a batch of compute sinks.

For each sink that exists, the coordinator and controller’s state associated with the sink is removed.

Returns a map containing the controller’s state for each sink that was removed. It is the caller’s responsibility to retire the returned sinks. Consider using retire_compute_sinks instead.

source

pub async fn retire_compute_sinks( &mut self, reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>, )

Retires a batch of sinks with disparate reasons for retirement.

Each sink identified in reasons is dropped (see drop_compute_sinks), then retired with its corresponding reason.
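The drop-then-retire split can be sketched as follows. This is only an illustration under simplifying assumptions: `ActiveSink`, `RetireReason`, `drop_sinks`, and `retire_sinks` are hypothetical, synchronous stand-ins with `u64` ids, and "retiring" is modelled as returning the reason alongside the dropped state.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for `ActiveComputeSinkRetireReason`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetireReason {
    Finished,
    Canceled,
}

/// Hypothetical stand-in for `ActiveComputeSink`; only tracks the owning connection.
#[derive(Debug, PartialEq, Eq)]
pub struct ActiveSink {
    pub conn: u32,
}

/// Removes the state of each sink that exists and hands it back to the caller,
/// who is then responsible for retiring it. Unknown ids are skipped.
pub fn drop_sinks(
    active: &mut BTreeMap<u64, ActiveSink>,
    ids: impl IntoIterator<Item = u64>,
) -> BTreeMap<u64, ActiveSink> {
    ids.into_iter()
        .filter_map(|id| active.remove(&id).map(|sink| (id, sink)))
        .collect()
}

/// Drops every sink named in `reasons`, then pairs each dropped sink with its reason.
pub fn retire_sinks(
    active: &mut BTreeMap<u64, ActiveSink>,
    mut reasons: BTreeMap<u64, RetireReason>,
) -> Vec<(u64, u32, RetireReason)> {
    let ids: Vec<u64> = reasons.keys().copied().collect();
    drop_sinks(active, ids)
        .into_iter()
        .map(|(id, sink)| {
            let reason = reasons.remove(&id).expect("dropped ids come from `reasons`");
            (id, sink.conn, reason)
        })
        .collect()
}
```

Keeping the drop step separate is what lets a caller choose between retiring manually and using the combined helper.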

source

pub async fn drop_reconfiguration_replicas( &mut self, cluster_ids: BTreeSet<ClusterId>, ) -> Result<(), AdapterError>

Drops all pending replicas for a set of clusters that are undergoing reconfiguration.

source

pub(crate) async fn cancel_compute_sinks_for_conn( &mut self, conn_id: &ConnectionId, )

Cancels all active compute sinks for the identified connection.

source

pub(crate) async fn cancel_cluster_reconfigurations_for_conn( &mut self, conn_id: &ConnectionId, )

Cancels all active cluster reconfigurations for the identified connection.

source

pub(crate) async fn retire_compute_sinks_for_conn( &mut self, conn_id: &ConnectionId, reason: ActiveComputeSinkRetireReason, )

Retires all active compute sinks for the identified connection with the specified reason.

source

pub(crate) async fn retire_cluster_reconfigurations_for_conn( &mut self, conn_id: &ConnectionId, )

Cleans up pending cluster reconfigurations for the identified connection.

source

pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>)

source

pub(crate) fn drop_indexes(&mut self, indexes: Vec<(ClusterId, GlobalId)>)

source

fn drop_materialized_views(&mut self, mviews: Vec<(ClusterId, GlobalId)>)

A convenience method for dropping materialized views.

source

fn drop_continual_tasks( &mut self, cts: Vec<(CatalogItemId, ClusterId, GlobalId)>, )

A convenience method for dropping continual tasks.

source

fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>)

source

pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId)

Removes all temporary items created by the specified connection, though not the temporary schema itself.

source

fn update_cluster_scheduling_config(&self)

source

fn update_secrets_caching_config(&self)

source

fn update_tracing_config(&self)

source

fn update_compute_config(&mut self)

source

fn update_storage_config(&mut self)

source

fn update_pg_timestamp_oracle_config(&self)

source

fn update_metrics_retention(&mut self)

source

fn update_arrangement_exert_proportionality(&mut self)

source

fn update_http_config(&mut self)

source

pub(crate) async fn create_storage_export( &mut self, id: GlobalId, sink: &Sink, ) -> Result<(), AdapterError>

source

fn validate_resource_limits( &self, ops: &Vec<Op>, conn_id: &ConnectionId, ) -> Result<(), AdapterError>

Validate all resource limits in a catalog transaction and return an error if any limit is exceeded.

source

pub(crate) fn validate_resource_limit<F>( &self, current_amount: usize, new_instances: i64, resource_limit: F, resource_type: &str, limit_name: &str, ) -> Result<(), AdapterError>
where F: Fn(&SystemVars) -> u32,

Validate a specific type of resource limit and return an error if that limit is exceeded.

source

fn validate_resource_limit_numeric<F>( &self, current_amount: Numeric, new_amount: Numeric, resource_limit: F, resource_type: &str, limit_name: &str, ) -> Result<(), AdapterError>
where F: Fn(&SystemVars) -> Numeric,

Validate a specific type of numeric resource limit and return an error if that limit is exceeded.

This is very similar to Self::validate_resource_limit but for numerics.
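A resource-limit check of this kind can be sketched as below. The sketch assumes the limit has already been read from the system variables and is passed in as a plain `u32`. It also assumes that a non-positive `new_instances` is never rejected. The `ResourceExhaustion` error type is hypothetical, and the actual method may apply further rules that are not documented here.

```rust
/// Hypothetical error type describing which limit was exceeded.
#[derive(Debug, PartialEq, Eq)]
pub struct ResourceExhaustion {
    pub resource_type: String,
    pub limit_name: String,
    pub desired: i128,
    pub limit: u32,
}

/// Returns an error if adding `new_instances` to `current_amount` would exceed `limit`.
pub fn validate_resource_limit(
    current_amount: usize,
    new_instances: i64,
    limit: u32,
    resource_type: &str,
    limit_name: &str,
) -> Result<(), ResourceExhaustion> {
    // Assumption: dropping objects (or adding none) cannot violate a limit.
    if new_instances <= 0 {
        return Ok(());
    }
    // Widen to i128 so the sum cannot overflow.
    let desired = current_amount as i128 + i128::from(new_instances);
    if desired > i128::from(limit) {
        Err(ResourceExhaustion {
            resource_type: resource_type.to_string(),
            limit_name: limit_name.to_string(),
            desired,
            limit,
        })
    } else {
        Ok(())
    }
}
```

For example, with a limit of 5 and 3 existing objects, adding 2 passes while adding 3 fails.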

source§

impl Coordinator

source

pub fn index_oracle(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_>

Creates a new index oracle for the specified compute instance.

source§

impl Coordinator

source

pub(super) async fn bootstrap_introspection_subscribes(&mut self)

Installs introspection subscribes on all existing replicas.

Meant to be invoked during coordinator bootstrapping.

source

pub(super) async fn install_introspection_subscribes( &mut self, cluster_id: ClusterId, replica_id: ReplicaId, )

Installs introspection subscribes on the given replica.

source

async fn install_introspection_subscribe( &mut self, cluster_id: ClusterId, replica_id: ReplicaId, spec: &'static SubscribeSpec, )

source

async fn sequence_introspection_subscribe( &mut self, subscribe_id: GlobalId, spec: &'static SubscribeSpec, cluster_id: ClusterId, replica_id: ReplicaId, )

source

fn sequence_introspection_subscribe_optimize_mir( &self, stage: IntrospectionSubscribeOptimizeMir, ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError>

source

fn sequence_introspection_subscribe_timestamp_optimize_lir( &self, stage: IntrospectionSubscribeTimestampOptimizeLir, ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError>

source

async fn sequence_introspection_subscribe_finish( &mut self, stage: IntrospectionSubscribeFinish, ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError>

source

pub(super) fn drop_introspection_subscribes(&mut self, replica_id: ReplicaId)

Drops the introspection subscribes installed on the given replica.

Dropping an introspection subscribe entails:

source

fn drop_introspection_subscribe(&mut self, id: GlobalId)

source

async fn reinstall_introspection_subscribe(&mut self, id: GlobalId)

source

pub(super) async fn handle_introspection_subscribe_batch( &mut self, id: GlobalId, batch: SubscribeBatch, )

Processes a batch returned by an introspection subscribe.

Depending on the contents of the batch, this either appends received updates to the corresponding storage-managed collection, or reinstalls a disconnected subscribe.

source§

impl Coordinator

source

pub(crate) async fn handle_message(&mut self, msg: Message)

BOXED FUTURE: As of Nov 2023, the Future returned from this function was 74KB. A Future of that size would be stored on the stack, which hurts runtime performance and inflates our stack usage. For that reason we purposefully move the Futures of inner function calls onto the heap (i.e. we Box them).
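The effect of boxing can be observed with the standard library alone. In the sketch below, `inner_with_large_state` is a hypothetical async function that keeps a 16 KiB buffer alive across an await point, so its future must store that buffer inline. Boxing it leaves only a pointer-sized handle in the caller's frame. The 16 KiB figure is illustrative and unrelated to the 74KB measurement above.

```rust
use std::future::Future;
use std::mem::size_of_val;
use std::pin::Pin;

/// Hypothetical inner call whose state is large: `buf` is live across the
/// await point, so it is stored inside the future itself.
async fn inner_with_large_state() -> usize {
    let buf = [1u8; 16 * 1024];
    std::future::ready(()).await;
    buf.iter().map(|b| *b as usize).sum()
}

/// Size of the future when it is held inline, e.g. in a caller's stack frame
/// or embedded in the caller's own future.
pub fn inline_size() -> usize {
    size_of_val(&inner_with_large_state())
}

/// Size of the handle when the future is moved to the heap first.
pub fn boxed_size() -> usize {
    let boxed: Pin<Box<dyn Future<Output = usize>>> = Box::pin(inner_with_large_state());
    // Only the (fat) pointer lives in the caller; the state is on the heap.
    size_of_val(&boxed)
}
```

An async caller that awaits the boxed handle embeds only that handle in its own state, which is how boxing inner calls keeps the outer future small.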

source

pub async fn storage_usage_fetch(&mut self)

source

async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced)

source

async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>)

source

pub async fn schedule_storage_usage_collection(&self)

source

async fn message_command(&mut self, cmd: Command)

source

async fn message_controller(&mut self, message: ControllerResponse)

source

async fn message_purified_statement_ready( &mut self, __arg1: BackgroundWorkResult<PurifiedStatement>, )

source

async fn message_create_connection_validation_ready( &mut self, __arg1: ValidationReady<CreateConnectionPlan>, )

source

async fn message_alter_connection_validation_ready( &mut self, __arg1: ValidationReady<Connection>, )

source

async fn message_cluster_event(&mut self, event: ClusterEvent)

source

async fn message_linearize_reads(&mut self)

Linearizes sending the results of a read transaction by:

  1. Holding back any results that were executed at some point in the future, until the containing timeline has advanced to that point in the future.
  2. Confirming that we are still the current leader before sending results to the client.
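The two steps can be sketched as a filter over pending reads. Everything here is a simplified, hypothetical model: timestamps are `u64`, connections are `u32`, leadership is a boolean supplied by the caller, and "not the leader" is modelled as sending nothing, which may differ from what the coordinator actually does in that case.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for `PendingReadTxn`: a finished read waiting to be sent.
pub struct PendingRead {
    /// The timestamp at which the read was executed.
    pub chosen_ts: u64,
}

/// Returns the connections whose results may be sent now and removes them from
/// `pending`. Reads executed at a timestamp the timeline has not yet reached
/// stay pending.
pub fn linearize_reads(
    pending: &mut BTreeMap<u32, PendingRead>,
    timeline_read_ts: u64,
    still_leader: bool,
) -> Vec<u32> {
    // Step 2: never send results unless leadership was confirmed.
    if !still_leader {
        return Vec::new();
    }
    // Step 1: hold back results from "the future" of the timeline.
    let ready: Vec<u32> = pending
        .iter()
        .filter(|(_, read)| read.chosen_ts <= timeline_read_ts)
        .map(|(conn, _)| *conn)
        .collect();
    for conn in &ready {
        pending.remove(conn);
    }
    ready
}
```

Calling the function again after the timeline advances releases the reads that were held back earlier.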
source§

impl Coordinator

source

pub(crate) async fn initialize_storage_read_policies( &mut self, ids: BTreeSet<CatalogItemId>, compaction_window: CompactionWindow, )

Initialize the storage read policies.

This should be called only after a storage collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.

source

pub(crate) async fn initialize_compute_read_policies( &mut self, ids: Vec<GlobalId>, instance: ComputeInstanceId, compaction_window: CompactionWindow, )

Initialize the compute read policies.

This should be called only after a compute collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.

source

pub(crate) async fn initialize_read_policies( &mut self, id_bundle: &CollectionIdBundle, compaction_window: CompactionWindow, )

Initialize the storage and compute read policies.

This should be called only after a collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.

source

pub(crate) fn update_storage_read_policies( &mut self, policies: Vec<(CatalogItemId, ReadPolicy<Timestamp>)>, )

source

pub(crate) fn update_compute_read_policies( &self, policies: Vec<(ComputeInstanceId, CatalogItemId, ReadPolicy<Timestamp>)>, )

source

pub(crate) fn update_compute_read_policy( &self, compute_instance: ComputeInstanceId, item_id: CatalogItemId, base_policy: ReadPolicy<Timestamp>, )

source

pub(crate) fn acquire_read_holds( &self, id_bundle: &CollectionIdBundle, ) -> ReadHolds<Timestamp>

Attempt to acquire read holds on the indicated collections at the earliest available time.

§Panics

Will panic if any of the referenced collections in id_bundle don’t exist.

source

pub(crate) fn store_transaction_read_holds( &mut self, session: &Session, read_holds: ReadHolds<Timestamp>, )

Stash transaction read holds. They will be released when the transaction is cleaned up.

source§

impl Coordinator

source

pub(crate) async fn sequence_alter_cluster_staged( &mut self, ctx: ExecuteContext, plan: AlterClusterPlan, )

source

async fn alter_cluster_validate( &mut self, session: &Session, plan: AlterClusterPlan, ) -> Result<ClusterStage, AdapterError>

source

async fn sequence_alter_cluster_stage( &mut self, session: &Session, plan: AlterClusterPlan, validity: PlanValidity, ) -> Result<StageResult<Box<ClusterStage>>, AdapterError>

source

async fn finalize_alter_cluster_stage( &mut self, session: &Session, __arg2: AlterClusterPlan, new_config: ClusterVariantManaged, ) -> Result<StageResult<Box<ClusterStage>>, AdapterError>

source

async fn check_if_pending_replicas_hydrated_stage( &mut self, session: &Session, plan: AlterClusterPlan, new_config: ClusterVariantManaged, timeout_time: Instant, on_timeout: OnTimeoutAction, validity: PlanValidity, ) -> Result<StageResult<Box<ClusterStage>>, AdapterError>

source

pub(crate) async fn sequence_create_cluster( &mut self, session: &Session, __arg2: CreateClusterPlan, ) -> Result<ExecuteResponse, AdapterError>

source

async fn sequence_create_managed_cluster( &mut self, session: &Session, __arg2: CreateClusterManagedPlan, cluster_id: ClusterId, ops: Vec<Op>, ) -> Result<ExecuteResponse, AdapterError>

source

fn create_managed_cluster_replica_op( &self, cluster_id: ClusterId, name: String, compute: &ComputeReplicaConfig, size: &String, ops: &mut Vec<Op>, azs: Option<&[String]>, disk: bool, pending: bool, owner_id: RoleId, reason: ReplicaCreateDropReason, ) -> Result<(), AdapterError>

source

fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>( &self, azs: I, ) -> Result<(), AdapterError>

source

async fn sequence_create_unmanaged_cluster( &mut self, session: &Session, __arg2: CreateClusterUnmanagedPlan, id: ClusterId, ops: Vec<Op>, ) -> Result<ExecuteResponse, AdapterError>

source

async fn create_cluster(&mut self, cluster_id: ClusterId)

source

pub(crate) async fn sequence_create_cluster_replica( &mut self, session: &Session, __arg2: CreateClusterReplicaPlan, ) -> Result<ExecuteResponse, AdapterError>

source

async fn create_cluster_replica( &mut self, cluster_id: ClusterId, replica_id: ReplicaId, )

source

pub(crate) async fn sequence_alter_cluster_managed_to_managed( &mut self, session: Option<&Session>, cluster_id: ClusterId, new_config: ClusterConfig, reason: ReplicaCreateDropReason, strategy: AlterClusterPlanStrategy, ) -> Result<NeedsFinalization, AdapterError>

When this is called by the automated cluster scheduling, reason should contain information on why a cluster is being turned On/Off. It will be forwarded to the details field of the audit log event that records creating or dropping replicas.

§Panics

Panics if the identified cluster is not a managed cluster. Panics if new_config is not a configuration for a managed cluster.

source

async fn sequence_alter_cluster_unmanaged_to_managed( &mut self, session: &Session, cluster_id: ClusterId, new_config: ClusterConfig, options: PlanClusterOption, ) -> Result<(), AdapterError>

§Panics

Panics if new_config is not a configuration for a managed cluster.

source

async fn sequence_alter_cluster_managed_to_unmanaged( &mut self, session: &Session, cluster_id: ClusterId, new_config: ClusterConfig, ) -> Result<(), AdapterError>

source

async fn sequence_alter_cluster_unmanaged_to_unmanaged( &mut self, session: &Session, cluster_id: ClusterId, new_config: ClusterConfig, replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>, ) -> Result<(), AdapterError>

source

pub(crate) async fn sequence_alter_cluster_rename( &mut self, session: &mut Session, __arg2: AlterClusterRenamePlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(crate) async fn sequence_alter_cluster_swap( &mut self, session: &mut Session, __arg2: AlterClusterSwapPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(crate) async fn sequence_alter_cluster_replica_rename( &mut self, session: &Session, __arg2: AlterClusterReplicaRenamePlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(crate) async fn sequence_alter_set_cluster( &self, _session: &Session, __arg2: AlterSetClusterPlan, ) -> Result<ExecuteResponse, AdapterError>

Convert an AlterSetClusterPlan to a sequence of catalog operators and adjust state.

source§

impl Coordinator

source

pub(crate) async fn sequence_create_index( &mut self, ctx: ExecuteContext, plan: CreateIndexPlan, resolved_ids: ResolvedIds, )

source

pub(crate) async fn explain_create_index( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )

source

pub(crate) async fn explain_replan_index( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )

source

pub(crate) fn explain_index( &mut self, ctx: &ExecuteContext, _: ExplainPlanPlan, ) -> Result<ExecuteResponse, AdapterError>

source

fn create_index_validate( &mut self, plan: CreateIndexPlan, resolved_ids: ResolvedIds, explain_ctx: ExplainContext, ) -> Result<CreateIndexStage, AdapterError>

source

async fn create_index_optimize( &mut self, __arg1: CreateIndexOptimize, ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError>

source

async fn create_index_finish( &mut self, session: &Session, __arg2: CreateIndexFinish, ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError>

source

async fn create_index_explain( &mut self, session: &Session, __arg2: CreateIndexExplain, ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError>

source§

impl Coordinator

source

pub(crate) async fn sequence_create_materialized_view( &mut self, ctx: ExecuteContext, plan: CreateMaterializedViewPlan, resolved_ids: ResolvedIds, )

source

pub(crate) async fn explain_create_materialized_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )

source

pub(crate) async fn explain_replan_materialized_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )

source

pub(super) fn explain_materialized_view( &mut self, ctx: &ExecuteContext, _: ExplainPlanPlan, ) -> Result<ExecuteResponse, AdapterError>

source

fn create_materialized_view_validate( &mut self, session: &Session, plan: CreateMaterializedViewPlan, resolved_ids: ResolvedIds, explain_ctx: ExplainContext, ) -> Result<CreateMaterializedViewStage, AdapterError>

source

async fn create_materialized_view_optimize( &mut self, __arg1: CreateMaterializedViewOptimize, ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError>

source

async fn create_materialized_view_finish( &mut self, session: &Session, __arg2: CreateMaterializedViewFinish, ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError>

source

fn select_timestamps( &self, id_bundle: CollectionIdBundle, refresh_schedule: Option<&RefreshSchedule>, read_holds: &ReadHolds<Timestamp>, ) -> Result<(Antichain<Timestamp>, Antichain<Timestamp>, Antichain<Timestamp>), AdapterError>

Select the initial dataflow_as_of, storage_as_of, and until frontiers for a materialized view.

source

async fn create_materialized_view_explain( &mut self, session: &Session, __arg2: CreateMaterializedViewExplain, ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError>

source

pub(crate) async fn explain_pushdown_materialized_view( &self, ctx: ExecuteContext, item_id: CatalogItemId, )

source§

impl Coordinator

source

pub(crate) async fn sequence_create_view( &mut self, ctx: ExecuteContext, plan: CreateViewPlan, resolved_ids: ResolvedIds, )

source

pub(crate) async fn explain_create_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )

source

pub(crate) async fn explain_replan_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )

source

pub(crate) fn explain_view( &mut self, ctx: &ExecuteContext, _: ExplainPlanPlan, ) -> Result<ExecuteResponse, AdapterError>

source

fn create_view_validate( &mut self, plan: CreateViewPlan, resolved_ids: ResolvedIds, explain_ctx: ExplainContext, ) -> Result<CreateViewStage, AdapterError>

source

async fn create_view_optimize( &mut self, __arg1: CreateViewOptimize, ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError>

source

async fn create_view_finish( &mut self, session: &Session, __arg2: CreateViewFinish, ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError>

source

async fn create_view_explain( &mut self, session: &Session, __arg2: CreateViewExplain, ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError>

source§

impl Coordinator

source

pub(crate) async fn sequence_peek( &mut self, ctx: ExecuteContext, plan: SelectPlan, target_cluster: TargetCluster, max_query_result_size: Option<u64>, )

Sequence a peek, determining a timestamp and the most efficient dataflow interaction.

Peeks are sequenced by assigning a timestamp for evaluation, and then determining and deploying the most efficient evaluation plan. The peek could evaluate to a constant, be a simple read out of an existing arrangement, or require a new dataflow to build the results to return.

source

pub(crate) async fn sequence_copy_to( &mut self, ctx: ExecuteContext, __arg2: CopyToPlan, target_cluster: TargetCluster, )

source

pub(crate) async fn explain_peek( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, target_cluster: TargetCluster, )

source

pub fn peek_validate( &self, session: &Session, plan: SelectPlan, target_cluster: TargetCluster, copy_to_ctx: Option<CopyToContext>, explain_ctx: ExplainContext, max_query_result_size: Option<u64>, ) -> Result<PeekStage, AdapterError>

Do some simple validation. We must defer most of it until after any off-thread work.

source

async fn peek_linearize_timestamp( &self, session: &Session, __arg2: PeekStageLinearizeTimestamp, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>

Possibly linearize a timestamp from a TimestampOracle.

source

fn peek_timestamp_read_hold( &mut self, session: &mut Session, _: PeekStageTimestampReadHold, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>

Determine a read timestamp and create appropriate read holds.

source

async fn peek_optimize( &self, session: &Session, __arg2: PeekStageOptimize, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>

source

async fn peek_real_time_recency( &self, session: &Session, __arg2: PeekStageRealTimeRecency, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>

source

async fn peek_finish( &mut self, ctx: &mut ExecuteContext, __arg2: PeekStageFinish, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>

source

async fn peek_copy_to_dataflow( &mut self, ctx: &ExecuteContext, __arg2: PeekStageCopyTo, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>

source

async fn peek_explain_plan( &self, session: &Session, __arg2: PeekStageExplainPlan, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>

source

async fn peek_explain_pushdown( &self, session: &Session, stage: PeekStageExplainPushdown, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>

source

pub(super) fn sequence_peek_timestamp( &mut self, session: &mut Session, when: &QueryWhen, cluster_id: ClusterId, timeline_context: TimelineContext, oracle_read_ts: Option<Timestamp>, source_bundle: &CollectionIdBundle, source_ids: &BTreeSet<GlobalId>, real_time_recency_ts: Option<Timestamp>, requires_linearization: RequireLinearization, ) -> Result<TimestampDetermination<Timestamp>, AdapterError>

Determines the query timestamp and acquires read holds on dependent sources if necessary.

source§

impl Coordinator

source

pub(crate) async fn sequence_staged<S>( &mut self, ctx: S::Ctx, parent_span: Span, stage: S, )
where S: Staged + 'static, S::Ctx: Send + 'static,

Sequences the next stage of a Staged plan. This is designed for use with plans that execute both on and off of the coordinator thread. Stages can either produce another stage to execute or a final response. An explicit Span is passed to allow for convenient tracing.
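The "each stage yields either another stage or a final response" idea can be sketched as a small driver loop. This is a deliberately simplified, synchronous model. The `Staged` trait and `StageResult` enum below are hypothetical stand-ins that borrow the names used on this page, `DemoStage` is invented for illustration, and the off-thread execution and tracing of the real method are omitted.

```rust
/// Hypothetical, simplified outcome of running one stage.
pub enum StageResult<S> {
    /// Continue with another stage.
    Next(S),
    /// The plan is finished; return this to the client.
    Response(String),
}

/// Hypothetical, simplified version of a staged plan.
pub trait Staged: Sized {
    fn stage(self) -> Result<StageResult<Self>, String>;
}

/// An invented three-stage plan mirroring the validate/optimize/finish naming above.
pub enum DemoStage {
    Validate(i64),
    Optimize(i64),
    Finish(i64),
}

impl Staged for DemoStage {
    fn stage(self) -> Result<StageResult<Self>, String> {
        match self {
            DemoStage::Validate(n) if n < 0 => Err("negative input".to_string()),
            DemoStage::Validate(n) => Ok(StageResult::Next(DemoStage::Optimize(n))),
            DemoStage::Optimize(n) => Ok(StageResult::Next(DemoStage::Finish(n * 2))),
            DemoStage::Finish(n) => Ok(StageResult::Response(format!("result={}", n))),
        }
    }
}

/// Drives a plan until some stage produces a response or an error.
pub fn sequence_staged<S: Staged>(mut stage: S) -> Result<String, String> {
    loop {
        match stage.stage()? {
            StageResult::Next(next) => stage = next,
            StageResult::Response(response) => return Ok(response),
        }
    }
}
```

Because each stage consumes itself and returns the next one, a driver is free to run any individual stage elsewhere (for example off the main thread) before resuming the loop.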

source

fn handle_spawn<C, T, F>( &self, ctx: C, handle: JoinHandle<Result<T, AdapterError>>, cancel_enabled: bool, f: F, )
where C: StagedContext + Send + 'static, T: Send + 'static, F: FnOnce(C, T) + Send + 'static,

source

async fn create_source_inner( &self, session: &Session, plans: Vec<CreateSourcePlanBundle>, ) -> Result<CreateSourceInner, AdapterError>

source

pub(crate) async fn plan_subsource( &mut self, session: &Session, params: &Params, subsource_stmt: CreateSubsourceStatement<Aug>, ) -> Result<CreateSourcePlanBundle, AdapterError>

Subsources are planned differently from other statements because they are typically synthesized from other statements, e.g. CREATE SOURCE. Because of this, we have usually “missed” the opportunity to plan them through the normal statement execution life cycle (the exception being during bootstrapping).

source

pub(crate) async fn plan_purified_alter_source_add_subsource( &mut self, session: &Session, params: Params, source_name: ResolvedItemName, options: Vec<AlterSourceAddSubsourceOption<Aug>>, subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>, ) -> Result<(Plan, ResolvedIds), AdapterError>

Prepares an ALTER SOURCE...ADD SUBSOURCE.

source

pub(crate) fn plan_purified_alter_source_refresh_references( &self, _session: &Session, _params: Params, source_name: ResolvedItemName, available_source_references: SourceReferences, ) -> Result<(Plan, ResolvedIds), AdapterError>

Prepares an ALTER SOURCE...REFRESH REFERENCES.

source

pub(crate) async fn plan_purified_create_source( &mut self, ctx: &ExecuteContext, params: Params, progress_stmt: CreateSubsourceStatement<Aug>, source_stmt: CreateSourceStatement<Aug>, subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>, available_source_references: SourceReferences, ) -> Result<(Plan, ResolvedIds), AdapterError>

Prepares a CREATE SOURCE statement to create its progress subsource, the primary source, and any ingestion export subsources (e.g. PG tables).

source

pub(super) async fn sequence_create_source( &mut self, session: &mut Session, plans: Vec<CreateSourcePlanBundle>, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_create_connection( &mut self, ctx: ExecuteContext, plan: CreateConnectionPlan, resolved_ids: ResolvedIds, )

source

pub(crate) async fn sequence_create_connection_stage_finish( &mut self, session: &mut Session, connection_id: CatalogItemId, connection_gid: GlobalId, plan: CreateConnectionPlan, resolved_ids: ResolvedIds, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_create_database( &mut self, session: &mut Session, plan: CreateDatabasePlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_create_schema( &mut self, session: &mut Session, plan: CreateSchemaPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_create_role( &mut self, conn_id: Option<&ConnectionId>, __arg2: CreateRolePlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_create_network_policy( &mut self, session: &Session, __arg2: CreateNetworkPolicyPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_alter_network_policy( &mut self, session: &Session, __arg2: AlterNetworkPolicyPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_create_table( &mut self, ctx: &mut ExecuteContext, plan: CreateTablePlan, resolved_ids: ResolvedIds, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_create_sink( &mut self, ctx: ExecuteContext, plan: CreateSinkPlan, resolved_ids: ResolvedIds, )

source

pub(super) fn validate_system_column_references( &self, uses_ambiguous_columns: bool, depends_on: &BTreeSet<GlobalId>, ) -> Result<(), AdapterError>

Validates that a view definition does not contain any expressions that may lead to ambiguous column references to system tables. For example NATURAL JOIN or SELECT *.

We prevent these expressions so that we can add columns to system tables without changing the definition of the view.

Here is a somewhat hand-wavy proof of why we only need to check the immediate view definition for system objects and ambiguous column references, and not the entire dependency tree:

  • A view with no object references cannot have any ambiguous column references to a system object, because it has no system objects.
  • A view with a direct reference to a system object and a * or NATURAL JOIN will be rejected due to ambiguous column references.
  • A view with system objects but no * or NATURAL JOINs cannot have any ambiguous column references to a system object, because all column references are explicitly named.
  • A view with * or NATURAL JOINs, that doesn’t directly reference a system object cannot have any ambiguous column references to a system object, because there are no system objects in the top level view and all sub-views are guaranteed to have no ambiguous column references to system objects.
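The four cases above reduce to a single condition: reject only when the view both uses ambiguous column references and directly depends on a system object. A minimal sketch follows, assuming ids are plain `u64` values and that a caller-supplied `is_system_object` predicate (hypothetical) stands in for the catalog lookup.

```rust
use std::collections::BTreeSet;

/// Rejects a view definition only if it uses `*` or `NATURAL JOIN` *and*
/// directly references at least one system object.
pub fn validate_system_column_references(
    uses_ambiguous_columns: bool,
    depends_on: &BTreeSet<u64>,
    is_system_object: impl Fn(u64) -> bool,
) -> Result<(), String> {
    let touches_system_object = depends_on.iter().any(|id| is_system_object(*id));
    if uses_ambiguous_columns && touches_system_object {
        Err("ambiguous column reference to a system object".to_string())
    } else {
        Ok(())
    }
}
```

Only direct dependencies are inspected, which matches the argument that sub-views were already validated when they were created.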
source

pub(super) async fn sequence_create_type( &mut self, session: &Session, plan: CreateTypePlan, resolved_ids: ResolvedIds, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_comment_on( &mut self, session: &Session, plan: CommentPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_drop_objects( &mut self, session: &Session, __arg2: DropObjectsPlan, ) -> Result<ExecuteResponse, AdapterError>

source

fn validate_dropped_role_ownership( &self, session: &Session, dropped_roles: &BTreeMap<RoleId, &str>, ) -> Result<(), AdapterError>

source

pub(super) async fn sequence_drop_owned( &mut self, session: &Session, plan: DropOwnedPlan, ) -> Result<ExecuteResponse, AdapterError>

source

fn sequence_drop_common( &self, session: &Session, ids: Vec<ObjectId>, ) -> Result<DropOps, AdapterError>

source

pub(super) fn sequence_explain_schema( &self, _: ExplainSinkSchemaPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) fn sequence_show_all_variables( &self, session: &Session, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) fn sequence_show_variable( &self, session: &Session, plan: ShowVariablePlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_inspect_shard( &self, session: &Session, plan: InspectShardPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) fn sequence_set_variable( &self, session: &mut Session, plan: SetVariablePlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) fn sequence_reset_variable( &self, session: &mut Session, plan: ResetVariablePlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) fn sequence_set_transaction( &self, session: &mut Session, plan: SetTransactionPlan, ) -> Result<ExecuteResponse, AdapterError>

source

fn validate_set_isolation_level( &self, session: &Session, ) -> Result<(), AdapterError>

source

fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError>

source

pub(super) async fn sequence_end_transaction( &mut self, ctx: ExecuteContext, action: EndTransactionAction, )

source

async fn sequence_end_transaction_inner( &mut self, session: &mut Session, action: EndTransactionAction, ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError>

source

pub(super) async fn sequence_side_effecting_func( &mut self, ctx: ExecuteContext, plan: SideEffectingFunc, )

source

pub(super) async fn determine_real_time_recent_timestamp( &self, session: &Session, source_ids: impl Iterator<Item = CatalogItemId>, ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>

Checks to see if the session needs a real time recency timestamp and if so returns a future that will return the timestamp.

source

pub(super) async fn sequence_explain_plan( &mut self, ctx: ExecuteContext, plan: ExplainPlanPlan, target_cluster: TargetCluster, )

source

pub(super) async fn sequence_explain_pushdown( &mut self, ctx: ExecuteContext, plan: ExplainPushdownPlan, target_cluster: TargetCluster, )

source

async fn render_explain_pushdown( &self, ctx: ExecuteContext, as_of: Antichain<Timestamp>, mz_now: ResultSpec<'static>, read_holds: Option<ReadHolds<Timestamp>>, imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static, )

source

async fn render_explain_pushdown_prepare( &self, session: &Session, as_of: Antichain<Timestamp>, mz_now: ResultSpec<'static>, imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)>, ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>>

source

pub(super) async fn sequence_insert( &mut self, ctx: ExecuteContext, plan: InsertPlan, )

source

pub(super) async fn sequence_read_then_write( &mut self, ctx: ExecuteContext, plan: ReadThenWritePlan, )

ReadThenWrite is a plan whose writes depend on the results of a read. This works by doing a Peek and then queuing a SendDiffs. No writes or read-then-writes can occur between the Peek and the SendDiffs; otherwise a serializability violation could occur.
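The requirement that nothing may write between the read and the dependent write can be illustrated with a single lock held across both steps. This is only an analogy under stated assumptions: the `Table` type is hypothetical, rows are plain `i64` values, diffs are `(row, +1 or -1)` pairs, and a `Mutex` stands in for whatever mechanism the coordinator actually uses.

```rust
use std::sync::Mutex;

/// Hypothetical in-memory table guarded by a write lock.
pub struct Table {
    rows: Mutex<Vec<i64>>,
}

impl Table {
    pub fn new(rows: Vec<i64>) -> Self {
        Table { rows: Mutex::new(rows) }
    }

    pub fn snapshot(&self) -> Vec<i64> {
        self.rows.lock().unwrap().clone()
    }

    /// Reads the current rows, computes diffs from them, and applies the diffs,
    /// all while holding the lock so no other write can interleave.
    pub fn read_then_write(&self, compute_diffs: impl FnOnce(&[i64]) -> Vec<(i64, i8)>) {
        let mut rows = self.rows.lock().unwrap();
        // The "peek": diffs are derived from exactly the state they will modify.
        let diffs = compute_diffs(rows.as_slice());
        // The "send diffs": +1 inserts a row, -1 retracts one matching row.
        for (row, diff) in diffs {
            if diff > 0 {
                rows.push(row);
            } else if let Some(pos) = rows.iter().position(|r| *r == row) {
                rows.remove(pos);
            }
        }
    }
}
```

An update such as "add one to every value greater than 1" becomes a retraction of each old row plus an insertion of the new one. If another writer could run between the read and the apply step, those retractions could target rows that no longer exist.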

source

pub(super) async fn sequence_alter_item_rename( &mut self, session: &mut Session, plan: AlterItemRenamePlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_alter_retain_history( &mut self, session: &mut Session, plan: AlterRetainHistoryPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_alter_schema_rename( &mut self, session: &mut Session, plan: AlterSchemaRenamePlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_alter_schema_swap( &mut self, session: &mut Session, plan: AlterSchemaSwapPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_alter_role( &mut self, session: &Session, __arg2: AlterRolePlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_alter_sink_prepare( &mut self, ctx: ExecuteContext, plan: AlterSinkPlan, )

source

pub async fn sequence_alter_sink_finish(&mut self, ctx: AlterSinkReadyContext)

source

pub(super) async fn sequence_alter_connection( &mut self, ctx: ExecuteContext, __arg2: AlterConnectionPlan, )

source

async fn sequence_alter_connection_options( &mut self, ctx: ExecuteContext, id: CatalogItemId, set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>, drop_options: BTreeSet<ConnectionOptionName>, validate: bool, )

source

pub(crate) async fn sequence_alter_connection_stage_finish( &mut self, session: &mut Session, id: CatalogItemId, connection: Connection, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_alter_source( &mut self, session: &Session, __arg2: AlterSourcePlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_alter_system_set( &mut self, session: &Session, __arg2: AlterSystemSetPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_alter_system_reset( &mut self, session: &Session, __arg2: AlterSystemResetPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_alter_system_reset_all( &mut self, session: &Session, _: AlterSystemResetAllPlan, ) -> Result<ExecuteResponse, AdapterError>

source

fn is_user_allowed_to_alter_system( &self, session: &Session, var_name: Option<&str>, ) -> Result<(), AdapterError>

source

fn validate_alter_system_network_policy( &self, session: &Session, policy_value: &VariableValue, ) -> Result<(), AdapterError>

source

fn validate_alter_network_policy( &self, session: &Session, policy_rules: &Vec<NetworkPolicyRule>, ) -> Result<(), AdapterError>

Validates that a set of NetworkPolicyRules is valid for the current Session.

This helps prevent users from modifying network policies in a way that would lock out their current connection.

source

pub(super) fn sequence_execute( &mut self, session: &mut Session, plan: ExecutePlan, ) -> Result<String, AdapterError>

source

pub(super) async fn sequence_grant_privileges( &mut self, session: &Session, __arg2: GrantPrivilegesPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_revoke_privileges( &mut self, session: &Session, __arg2: RevokePrivilegesPlan, ) -> Result<ExecuteResponse, AdapterError>

source

async fn sequence_update_privileges( &mut self, session: &Session, update_privileges: Vec<UpdatePrivilege>, grantees: Vec<RoleId>, variant: UpdatePrivilegeVariant, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_alter_default_privileges( &mut self, session: &Session, __arg2: AlterDefaultPrivilegesPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_grant_role( &mut self, session: &Session, __arg2: GrantRolePlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_revoke_role( &mut self, session: &Session, __arg2: RevokeRolePlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_alter_owner( &mut self, session: &Session, __arg2: AlterOwnerPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(super) async fn sequence_reassign_owned( &mut self, session: &Session, __arg2: ReassignOwnedPlan, ) -> Result<ExecuteResponse, AdapterError>

source

pub(crate) async fn handle_deferred_statement(&mut self)

source

pub(super) async fn sequence_alter_table( &mut self, _session: &Session, _plan: AlterTablePlan, ) -> Result<ExecuteResponse, AdapterError>

source§

impl Coordinator

source

pub(super) async fn statistics_oracle( &self, session: &Session, source_ids: &BTreeSet<GlobalId>, query_as_of: &Antichain<Timestamp>, is_oneshot: bool, ) -> Result<Box<dyn StatisticsOracle>, AdapterError>

source§

impl Coordinator

source

fn emit_optimizer_notices( &self, session: &Session, notices: &Vec<RawOptimizerNotice>, )

Forward notices that we got from the optimizer.

source

async fn process_dataflow_metainfo( &mut self, df_meta: DataflowMetainfo, export_id: GlobalId, session: &Session, notice_ids: Vec<GlobalId>, ) -> Option<BoxFuture<'static, ()>>

Process the metainfo from a newly created non-transient dataflow.

source§

impl Coordinator

source

pub(crate) fn sequence_plan( &mut self, ctx: ExecuteContext, plan: Plan, resolved_ids: ResolvedIds, ) -> LocalBoxFuture<'_, ()>

BOXED FUTURE: As of Nov 2023, the Future returned by this function was 34KB. Storing it on the stack would hurt runtime performance and inflate our stack usage, so we purposefully move this Future onto the heap (i.e., Box it).
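The effect of boxing can be seen in a small standard-library sketch. `LargeSequence` is a hypothetical hand-written future standing in for a large `async fn`, and the `LocalBoxFuture` alias is re-declared locally with the same shape as the one in the `futures` crate so the example has no dependencies.

```rust
use std::future::Future;
use std::mem::size_of_val;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Same shape as `futures::future::LocalBoxFuture`, re-declared for the sketch.
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

/// Hypothetical large future: the 32 KiB buffer plays the role of the locals
/// an `async fn` keeps alive across its `.await` points.
struct LargeSequence {
    scratch: [u8; 32 * 1024],
}

impl Future for LargeSequence {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<usize> {
        // Completes immediately; only the size of the state matters here.
        Poll::Ready(self.scratch.iter().map(|b| usize::from(*b)).sum())
    }
}

fn large_sequence() -> LargeSequence {
    LargeSequence { scratch: [1u8; 32 * 1024] }
}

/// Boxing moves the state to the heap; the caller holds only a fat pointer.
fn boxed_sequence() -> LocalBoxFuture<'static, usize> {
    Box::pin(large_sequence())
}

/// Returns (size of the inline future, size of the boxed handle) in bytes.
fn future_sizes() -> (usize, usize) {
    let inline = large_sequence();
    let boxed = boxed_sequence();
    (size_of_val(&inline), size_of_val(&boxed))
}
```

The inline future occupies at least 32 KiB wherever it is stored, while the boxed handle is two pointers wide regardless of how large the underlying state grows.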

source

pub(crate) async fn sequence_execute_single_statement_transaction( &mut self, ctx: ExecuteContext, stmt: Arc<Statement<Raw>>, params: Params, )

source

pub(crate) async fn sequence_create_role_for_startup( &mut self, plan: CreateRolePlan, ) -> Result<ExecuteResponse, AdapterError>

Creates a role during connection startup.

This should not be called from anywhere except connection startup.

source

pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId)

source

fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice>

source

pub(crate) fn insert_constant( catalog: &Catalog, session: &mut Session, id: CatalogItemId, constants: MirRelationExpr, ) -> Result<ExecuteResponse, AdapterError>

source

pub(crate) fn send_diffs( session: &mut Session, plan: SendDiffsPlan, ) -> Result<ExecuteResponse, AdapterError>

source§

impl Coordinator

source

pub(crate) fn plan_statement( &self, session: &Session, stmt: Statement<Aug>, params: &Params, resolved_ids: &ResolvedIds, ) -> Result<Plan, AdapterError>

source

pub(crate) fn declare( &self, ctx: ExecuteContext, name: String, stmt: Statement<Raw>, sql: String, params: Params, )

source

fn declare_inner( session: &mut Session, catalog: &Catalog, name: String, stmt: Statement<Raw>, sql: String, params: Params, now: EpochMillis, ) -> Result<(), AdapterError>

source

pub(crate) fn describe( catalog: &Catalog, session: &Session, stmt: Option<Statement<Raw>>, param_types: Vec<Option<ScalarType>>, ) -> Result<StatementDesc, AdapterError>

source

pub(crate) fn verify_prepared_statement( catalog: &Catalog, session: &mut Session, name: &str, ) -> Result<(), AdapterError>

Verify a prepared statement is still valid. This will return an error if the catalog’s revision has changed and the statement now produces a different type than its original.

source

pub(crate) fn verify_portal( &self, session: &mut Session, name: &str, ) -> Result<(), AdapterError>

Verify a portal is still valid.

source

fn verify_statement_revision( catalog: &Catalog, session: &Session, stmt: Option<&Statement<Raw>>, desc: &StatementDesc, catalog_revision: u64, ) -> Result<Option<u64>, AdapterError>

If the catalog and portal revisions don’t match, re-describe the statement and ensure its result type has not changed. Return Some(x) with the new (valid) revision if the revisions differ but the result type is unchanged. Return None if the revisions match. Return an error if the result type has changed.
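A minimal sketch of this three-way outcome follows. It reads the description as: matching revisions yield `None`, differing revisions with an unchanged result type yield `Some(new_revision)`, and a changed result type is an error. `Desc`, `verify_revision`, the `redescribe` callback, and the error string are all hypothetical stand-ins, not the adapter's types.

```rust
/// Hypothetical stand-in for a statement description: its result column types.
#[derive(Debug, Clone, PartialEq)]
struct Desc {
    column_types: Vec<String>,
}

/// Compares revisions and re-describes only when they differ.
fn verify_revision<F>(
    catalog_revision: u64,
    stmt_revision: u64,
    original: &Desc,
    redescribe: F,
) -> Result<Option<u64>, String>
where
    F: FnOnce() -> Result<Desc, String>,
{
    // Revisions match: nothing could have changed, so skip the re-describe.
    if catalog_revision == stmt_revision {
        return Ok(None);
    }
    // Revisions differ: describe the statement again against the new catalog.
    let current = redescribe()?;
    if current != *original {
        // Hypothetical error text for the sketch.
        return Err("statement result type changed".to_string());
    }
    // Same result type: the caller can adopt the new revision.
    Ok(Some(catalog_revision))
}
```

Taking the re-describe step as a closure keeps the cheap revision comparison on the fast path and defers the expensive work to the case where it is needed.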

source

pub(crate) async fn clear_transaction( &mut self, session: &mut Session, ) -> TransactionStatus<Timestamp>

Handle removing in-progress transaction state regardless of the end action of the transaction.

source

pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId)

Clears coordinator state for a connection.

source

pub(crate) async fn add_active_compute_sink( &mut self, id: GlobalId, active_sink: ActiveComputeSink, ) -> BoxFuture<'static, ()>

Adds coordinator bookkeeping for an active compute sink.

This is a low-level method. The caller is responsible for installing the sink in the controller.

source

pub(crate) async fn remove_active_compute_sink( &mut self, id: GlobalId, ) -> Option<ActiveComputeSink>

Removes coordinator bookkeeping for an active compute sink.

This is a low-level method. The caller is responsible for dropping the sink from the controller. Consider calling drop_compute_sink or retire_compute_sink instead.

source§

impl Coordinator

source

pub(crate) async fn bootstrap( &mut self, boot_ts: Timestamp, migrated_storage_collections_0dt: BTreeSet<CatalogItemId>, builtin_table_updates: Vec<BuiltinTableUpdate>, cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>, uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>, ) -> Result<(), AdapterError>

Initializes coordinator state based on the contained catalog. Must be called after creating the coordinator and before calling the Coordinator::serve method.

source

async fn bootstrap_tables( &mut self, entries: &[CatalogEntry], builtin_table_updates: Vec<BuiltinTableUpdate>, )

Prepares tables for writing by resetting them to a known state and appending the given builtin table updates. The timestamp oracle will be advanced to the write timestamp of the append when this method returns.

source

async fn bootstrap_storage_collections( &mut self, migrated_storage_collections: &BTreeSet<CatalogItemId>, )

Initializes all storage collections required by catalog objects in the storage controller.

This method takes care of collection creation, as well as migration of existing collections.

Creating all storage collections in a single create_collections call, rather than on demand, is more efficient as it reduces the number of writes to durable storage. It also allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary storage collections, without needing to worry about dependency order.

migrated_storage_collections is a set of builtin storage collections that have been migrated and should be handled specially.

source

async fn bootstrap_builtin_continual_tasks( &mut self, collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>, )

Makes as_of selection work for builtin continual tasks (CTs). Ideally we’d write the initial as_of down in the durable catalog, but that’s hard because of boot ordering. Instead, we set the since of the storage collection to something that’s a reasonable lower bound for the as_of. Then, if the upper is 0, the as_of selection code will allow us to jump it forward to this since.

source

fn bootstrap_dataflow_plans( &mut self, ordered_catalog_entries: &[CatalogEntry], cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>, ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError>

Invokes the optimizer on all indexes and materialized views in the catalog and inserts the resulting dataflow plans into the catalog state.

ordered_catalog_entries must be sorted in dependency order, with dependencies ordered before their dependants.

This method does not perform timestamp selection for the dataflows, nor does it create them in the compute controller. Both of these steps happen later during bootstrapping.

Returns a map of expressions that were not cached.
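The ordering requirement on ordered_catalog_entries can be expressed as a simple check. This is only a sketch: `Entry` and `is_dependency_ordered` are hypothetical, ids are plain integers, and every dependency is assumed to appear in the same slice.

```rust
use std::collections::BTreeSet;

/// Hypothetical catalog entry: an id plus the ids it depends on.
struct Entry {
    id: u64,
    uses: Vec<u64>,
}

/// Returns true if every entry appears after all of its dependencies.
fn is_dependency_ordered(entries: &[Entry]) -> bool {
    let mut seen = BTreeSet::new();
    for entry in entries {
        // A dependency that has not been seen yet breaks the ordering.
        if !entry.uses.iter().all(|dep| seen.contains(dep)) {
            return false;
        }
        seen.insert(entry.id);
    }
    true
}
```

With this ordering, anything an index or materialized view depends on has already been processed by the time the entry itself is reached.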

source

async fn bootstrap_dataflow_as_ofs( &mut self, ) -> BTreeMap<GlobalId, ReadHold<Timestamp>>

Selects for each compute dataflow an as-of suitable for bootstrapping it.

Returns a set of ReadHolds that ensures the read frontiers of involved collections stay in place and that must not be dropped before all compute dataflows have been created with the compute controller.

This method expects all storage collections and dataflow plans to be available, so it must run after Coordinator::bootstrap_storage_collections and Coordinator::bootstrap_dataflow_plans.

source

fn serve( self, internal_cmd_rx: UnboundedReceiver<Message>, strict_serializable_reads_rx: UnboundedReceiver<(ConnectionId, PendingReadTxn)>, cmd_rx: UnboundedReceiver<(OpenTelemetryContext, Command)>, group_commit_rx: GroupCommitWaiter, ) -> LocalBoxFuture<'static, ()>

Serves the coordinator, receiving commands from users over cmd_rx and feedback from dataflow workers over feedback_rx.

You must call bootstrap before calling this method.

BOXED FUTURE: As of Nov 2023, the Future returned by this function was 92KB. Storing it on the stack would hurt runtime performance and inflate our stack usage, so we purposefully move this Future onto the heap (i.e., Box it).

source

fn catalog(&self) -> &Catalog

Obtain a read-only Catalog reference.

source

fn owned_catalog(&self) -> Arc<Catalog>

Obtain a read-only Catalog snapshot, suitable for giving out to non-Coordinator thread tasks.

source

fn optimizer_metrics(&self) -> OptimizerMetrics

Obtain a handle to the optimizer metrics, suitable for giving out to non-Coordinator thread tasks.

source

fn catalog_mut(&mut self) -> &mut Catalog

Obtain a writeable Catalog reference.
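One way the three catalog accessors can coexist is sketched below. It assumes the writeable reference is obtained with `Arc::make_mut`, which clones the catalog when a snapshot is still outstanding; that is an assumption of this example, not something the signatures above confirm. `Catalog` and `Coord` here are hypothetical miniature stand-ins.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical miniature catalog.
#[derive(Clone, Debug, PartialEq)]
struct Catalog {
    revision: u64,
}

/// Hypothetical stand-in for the coordinator's catalog handle.
struct Coord {
    catalog: Arc<Catalog>,
}

impl Coord {
    /// Read-only borrow tied to the coordinator.
    fn catalog(&self) -> &Catalog {
        &self.catalog
    }

    /// Cheap owned handle that can be moved into another task or thread.
    fn owned_catalog(&self) -> Arc<Catalog> {
        Arc::clone(&self.catalog)
    }

    /// Assumed copy-on-write: clones only if a snapshot is still alive.
    fn catalog_mut(&mut self) -> &mut Catalog {
        Arc::make_mut(&mut self.catalog)
    }
}

/// Returns (revision seen by the snapshot, revision seen by the coordinator).
fn snapshot_survives_mutation() -> (u64, u64) {
    let mut coord = Coord { catalog: Arc::new(Catalog { revision: 1 }) };
    let snapshot = coord.owned_catalog();
    coord.catalog_mut().revision += 1;
    let seen = thread::spawn(move || snapshot.revision).join().unwrap();
    (seen, coord.catalog().revision)
}
```

Under this assumption a snapshot handed to another thread keeps observing the state it was taken at, while later mutations go to a fresh copy.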

source

fn connection_context(&self) -> &ConnectionContext

Obtain a reference to the coordinator’s connection context.

source

fn secrets_reader(&self) -> &Arc<dyn SecretsReader>

Obtain a reference to the coordinator’s secret reader, in an Arc.

source

pub(crate) fn broadcast_notice(&self, notice: AdapterNotice)

Publishes a notice message to all sessions.

source

pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta>

source

pub(crate) fn retire_execution( &mut self, reason: StatementEndedExecutionReason, ctx_extra: ExecuteContextExtra, )

source

pub fn dataflow_builder( &self, instance: ComputeInstanceId, ) -> DataflowBuilder<'_>

Creates a new dataflow builder from the catalog and indexes in self.

source

pub fn instance_snapshot( &self, id: ComputeInstanceId, ) -> Result<ComputeInstanceSnapshot, InstanceMissing>

Returns a reference-less snapshot of the indicated compute instance.

source

pub(crate) async fn ship_dataflow( &mut self, dataflow: DataflowDescription<Plan>, instance: ComputeInstanceId, subscribe_target_replica: Option<ReplicaId>, )

Call into the compute controller to install a finalized dataflow, and initialize the read policies for its exported readable objects.

source

pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates( &mut self, dataflow: DataflowDescription<Plan>, instance: ComputeInstanceId, notice_builtin_updates_fut: Option<BoxFuture<'static, ()>>, )

Like ship_dataflow, but also awaits the builtin table updates.

source

pub fn install_compute_watch_set( &mut self, conn_id: ConnectionId, objects: BTreeSet<GlobalId>, t: Timestamp, state: WatchSetResponse, )

Install a watch set in the controller that is automatically associated with the given connection id. The watch set will be automatically cleared if the connection terminates before the watch set completes.

source

pub fn install_storage_watch_set( &mut self, conn_id: ConnectionId, objects: BTreeSet<GlobalId>, t: Timestamp, state: WatchSetResponse, )

Install a watch set in the controller that is automatically associated with the given connection id. The watch set will be automatically cleared if the connection terminates before the watch set completes.

source

pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId)

Cancels pending watchsets associated with the provided connection id.
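The per-connection bookkeeping behind the watch set methods can be pictured roughly as follows. `WatchSets`, its fields, and its methods are hypothetical and hold a plain string where the real code carries a WatchSetResponse; the sketch only shows how an index by connection makes cancellation on disconnect straightforward.

```rust
use std::collections::{BTreeMap, BTreeSet};

type ConnId = u32;
type WatchSetId = u64;

/// Hypothetical bookkeeping: pending watch sets plus an index by connection.
#[derive(Default)]
struct WatchSets {
    next_id: WatchSetId,
    pending: BTreeMap<WatchSetId, (ConnId, String)>,
    by_conn: BTreeMap<ConnId, BTreeSet<WatchSetId>>,
}

impl WatchSets {
    /// Registers a watch set and associates it with `conn`.
    fn install(&mut self, conn: ConnId, state: &str) -> WatchSetId {
        let id = self.next_id;
        self.next_id += 1;
        self.pending.insert(id, (conn, state.to_string()));
        self.by_conn.entry(conn).or_default().insert(id);
        id
    }

    /// Called when a watch set finishes; returns its state if still pending.
    fn complete(&mut self, id: WatchSetId) -> Option<String> {
        let (conn, state) = self.pending.remove(&id)?;
        let now_empty = match self.by_conn.get_mut(&conn) {
            Some(ids) => {
                ids.remove(&id);
                ids.is_empty()
            }
            None => false,
        };
        if now_empty {
            self.by_conn.remove(&conn);
        }
        Some(state)
    }

    /// Drops every pending watch set owned by `conn`; returns how many.
    fn cancel_for_conn(&mut self, conn: ConnId) -> usize {
        let ids = self.by_conn.remove(&conn).unwrap_or_default();
        for id in &ids {
            self.pending.remove(id);
        }
        ids.len()
    }
}
```

Keeping the index in both directions means neither completion nor cancellation has to scan every pending watch set.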

source

pub async fn dump(&self) -> Result<Value, Error>

Returns the state of the Coordinator formatted as JSON.

The returned value is not guaranteed to be stable and may change at any point in time.

source

async fn prune_storage_usage_events_on_startup( &self, retention_period: Duration, )

Prune all storage usage events from the MZ_STORAGE_USAGE_BY_SHARD table that are older than retention_period.

This method will read the entire contents of MZ_STORAGE_USAGE_BY_SHARD into memory which can be expensive.

DO NOT call this method outside of startup. The safety of reading at the current oracle read timestamp and then writing at whatever the current write timestamp is (instead of read_ts + 1) relies on the fact that there are no outstanding writes during startup.

Group commit, which this method uses to write the retractions, has builtin fencing, and we never commit retractions to MZ_STORAGE_USAGE_BY_SHARD outside of this method, which is only called once during startup. So we don’t have to worry about double/invalid retractions.
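The pruning step amounts to computing a cutoff and emitting one retraction per expired row. The sketch below assumes millisecond timestamps and a diff of -1 per retraction; `UsageEvent` and `expired_retractions` are hypothetical names, and the real method reads from and writes to the builtin table rather than a slice.

```rust
use std::time::Duration;

/// Hypothetical row of the storage usage table.
#[derive(Debug, Clone, PartialEq)]
struct UsageEvent {
    shard: String,
    size_bytes: u64,
    collected_at_ms: u64,
}

/// Returns a (row, -1) retraction for every event older than the retention period.
fn expired_retractions(
    events: &[UsageEvent],
    now_ms: u64,
    retention: Duration,
) -> Vec<(UsageEvent, i64)> {
    // Saturating so a retention period longer than `now_ms` prunes nothing.
    let cutoff = u128::from(now_ms).saturating_sub(retention.as_millis());
    events
        .iter()
        .filter(|e| u128::from(e.collected_at_ms) < cutoff)
        .map(|e| (e.clone(), -1))
        .collect()
}
```

Because every row is inspected to decide whether it has expired, the whole table has to be held in memory, which is the cost the description above warns about.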

Trait Implementations§

source§

impl Debug for Coordinator

source§

fn fmt(&self, __f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.
source§

impl TimestampProvider for Coordinator

source§

fn compute_read_frontier( &self, instance: ComputeInstanceId, id: GlobalId, ) -> Antichain<Timestamp>

Reports a collection’s current read frontier.

source§

fn compute_write_frontier( &self, instance: ComputeInstanceId, id: GlobalId, ) -> Antichain<Timestamp>

Reports a collection’s current write frontier.

source§

fn storage_frontiers( &self, ids: Vec<GlobalId>, ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>

Returns the implied capability (since) and write frontier (upper) for the specified storage collections.
source§

fn acquire_read_holds( &self, id_bundle: &CollectionIdBundle, ) -> ReadHolds<Timestamp>

Acquires ReadHolds for the given id_bundle at the earliest possible times.
source§

fn catalog_state(&self) -> &CatalogState

source§

fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline>

source§

fn needs_linearized_read_ts( isolation_level: &IsolationLevel, when: &QueryWhen, ) -> bool

Returns true if-and-only-if the given configuration needs a linearized read timestamp from a timestamp oracle.
source§

fn determine_timestamp_for( &self, session: &Session, id_bundle: &CollectionIdBundle, when: &QueryWhen, compute_instance: ComputeInstanceId, timeline_context: &TimelineContext, oracle_read_ts: Option<Timestamp>, real_time_recency_ts: Option<Timestamp>, isolation_level: &IsolationLevel, ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError>

Determines the timestamp for a query.
source§

fn least_valid_read( &self, read_holds: &ReadHolds<Timestamp>, ) -> Antichain<Timestamp>

The smallest common valid read frontier among times in the given ReadHolds.
source§

fn least_valid_write( &self, id_bundle: &CollectionIdBundle, ) -> Antichain<Timestamp>

The smallest common valid write frontier among the specified collections.
source§

fn greatest_available_read( &self, id_bundle: &CollectionIdBundle, ) -> Antichain<Timestamp>

Returns least_valid_write - 1, i.e., each time in least_valid_write stepped back in a saturating way.
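For totally ordered timestamps, the relationship between the two write-side helpers can be sketched with plain integers. The model here is an assumption made for illustration: a frontier is `Some(t)` for the single-element antichain or `None` for the empty one, the common write frontier is taken to be the minimum of the per-collection uppers, and the function names simply mirror the methods above.

```rust
/// Frontier over `u64` timestamps: `Some(t)` is the antichain `{t}`,
/// `None` is the empty antichain (no further times can appear).
type Frontier = Option<u64>;

/// Smallest write frontier among the given collections (assumed to be the
/// minimum of their uppers; empty frontiers do not constrain the result).
fn least_valid_write(uppers: &[Frontier]) -> Frontier {
    uppers.iter().filter_map(|u| *u).min()
}

/// Steps the write frontier back by one, saturating at the minimum timestamp.
fn greatest_available_read(least_valid_write: Frontier) -> Frontier {
    least_valid_write.map(|t| t.saturating_sub(1))
}
```

The saturating step matters for a collection whose upper is still 0: stepping back leaves the frontier at 0 instead of underflowing.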

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
source§

impl<T> AsAny for T
where T: Any,

source§

fn as_any(&self) -> &(dyn Any + 'static)

source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
source§

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

source§

fn cast_into(self) -> U

Performs the cast.
source§

impl<T> Conv for T

source§

fn conv<T>(self) -> T
where Self: Into<T>,

Converts self into T using Into<T>.
source§

impl<T> FmtForward for T

source§

fn fmt_binary(self) -> FmtBinary<Self>
where Self: Binary,

Causes self to use its Binary implementation when Debug-formatted.
source§

fn fmt_display(self) -> FmtDisplay<Self>
where Self: Display,

Causes self to use its Display implementation when Debug-formatted.
source§

fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where Self: LowerExp,

Causes self to use its LowerExp implementation when Debug-formatted.
source§

fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where Self: LowerHex,

Causes self to use its LowerHex implementation when Debug-formatted.
source§

fn fmt_octal(self) -> FmtOctal<Self>
where Self: Octal,

Causes self to use its Octal implementation when Debug-formatted.
source§

fn fmt_pointer(self) -> FmtPointer<Self>
where Self: Pointer,

Causes self to use its Pointer implementation when Debug-formatted.
source§

fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where Self: UpperExp,

Causes self to use its UpperExp implementation when Debug-formatted.
source§

fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where Self: UpperHex,

Causes self to use its UpperHex implementation when Debug-formatted.
source§

fn fmt_list(self) -> FmtList<Self>
where for<'a> &'a Self: IntoIterator,

Formats each item in a sequence.
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

source§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
source§

impl<T, U> OverrideFrom<Option<&T>> for U
where U: OverrideFrom<T>,

source§

fn override_from(self, layer: &Option<&T>) -> U

Override the configuration represented by Self with values from the given layer.
source§

impl<T> Pipe for T
where T: ?Sized,

source§

fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where Self: Sized,

Pipes by value. This is generally the method you want to use.
source§

fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where R: 'a,

Borrows self and passes that borrow into the pipe function.
source§

fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where R: 'a,

Mutably borrows self and passes that borrow into the pipe function.
source§

fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
where Self: Borrow<B>, B: 'a + ?Sized, R: 'a,

Borrows self, then passes self.borrow() into the pipe function.
source§

fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
where Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.borrow_mut() into the pipe function.
source§

fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
where Self: AsRef<U>, U: 'a + ?Sized, R: 'a,

Borrows self, then passes self.as_ref() into the pipe function.
source§

fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
where Self: AsMut<U>, U: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.as_mut() into the pipe function.
source§

fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
where Self: Deref<Target = T>, T: 'a + ?Sized, R: 'a,

Borrows self, then passes self.deref() into the pipe function.
source§

fn pipe_deref_mut<'a, T, R>( &'a mut self, func: impl FnOnce(&'a mut T) -> R, ) -> R
where Self: DerefMut<Target = T> + Deref, T: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.deref_mut() into the pipe function.
source§

impl<T> Pointable for T

source§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.
source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.
source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.
source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.
source§

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

source§

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.
source§

impl<T> Tap for T

source§

fn tap(self, func: impl FnOnce(&Self)) -> Self

Immutable access to a value.
source§

fn tap_mut(self, func: impl FnOnce(&mut Self)) -> Self

Mutable access to a value.
source§

fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Immutable access to the Borrow<B> of a value.
source§

fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Mutable access to the BorrowMut<B> of a value.
source§

fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Immutable access to the AsRef<R> view of a value.
source§

fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Mutable access to the AsMut<R> view of a value.
source§

fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Immutable access to the Deref::Target of a value.
source§

fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Mutable access to the Deref::Target of a value.
source§

fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self

Calls .tap() only in debug builds, and is erased in release builds.
source§

fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self

Calls .tap_mut() only in debug builds, and is erased in release builds.
source§

fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Calls .tap_borrow() only in debug builds, and is erased in release builds.
source§

fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
source§

fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Calls .tap_ref() only in debug builds, and is erased in release builds.
source§

fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
source§

fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Calls .tap_deref() only in debug builds, and is erased in release builds.
source§

fn tap_deref_mut_dbg<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Calls .tap_deref_mut() only in debug builds, and is erased in release builds.
source§

impl<T> TryConv for T

source§

fn try_conv<T>(self) -> Result<T, Self::Error>
where Self: TryInto<T>,

Attempts to convert self into T using TryInto<T>.
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.