
Struct Coordinator

pub struct Coordinator {
    controller: Controller,
    catalog: Arc<Catalog>,
    internal_cmd_tx: UnboundedSender<Message>,
    group_commit_tx: GroupCommitNotifier,
    strict_serializable_reads_tx: UnboundedSender<(ConnectionId, PendingReadTxn)>,
    global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
    transient_id_gen: Arc<TransientIdGen>,
    active_conns: BTreeMap<ConnectionId, ConnMeta>,
    txn_read_holds: BTreeMap<ConnectionId, ReadHolds<Timestamp>>,
    pending_peeks: BTreeMap<Uuid, PendingPeek>,
    client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
    pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
    active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
    active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,
    staged_cancellation: BTreeMap<ConnectionId, (Sender<bool>, Receiver<bool>)>,
    introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,
    write_locks: BTreeMap<CatalogItemId, Arc<Mutex<()>>>,
    deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>,
    pending_writes: Vec<PendingWriteTxn>,
    advance_timelines_interval: Interval,
    serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
    secrets_controller: Arc<dyn SecretsController>,
    caching_secrets_reader: CachingSecretsReader,
    cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
    transient_replica_metadata: BTreeMap<ReplicaId, Option<ReplicaMetadata>>,
    storage_usage_client: StorageUsageClient,
    storage_usage_collection_interval: Duration,
    segment_client: Option<Client>,
    metrics: Metrics,
    optimizer_metrics: OptimizerMetrics,
    tracing_handle: TracingHandle,
    statement_logging: StatementLogging,
    webhook_concurrency_limit: WebhookConcurrencyLimiter,
    pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,
    check_cluster_scheduling_policies_interval: Interval,
    cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,
    caught_up_check_interval: Interval,
    caught_up_check: Option<CaughtUpCheckContext>,
    installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,
    connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,
    cluster_replica_statuses: ClusterReplicaStatuses,
    read_only_controllers: bool,
    buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,
}

Glues the external world to the Timely workers.


§controller: Controller

The controller for the storage and compute layers.

§catalog: Arc<Catalog>

The catalog in an Arc suitable for read-only references. The Arc allows us to hand out cheap copies of the catalog to functions that can use it off of the main coordinator thread. If the coordinator needs to mutate the catalog, call Self::catalog_mut, which will clone this struct member so that it can be mutated here while the other off-thread references keep reading their copy of the catalog for as long as needed. In the future we would like this to be a pTVC, but for now this is sufficient.
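The copy-on-write behavior described above can be sketched with the standard library's `Arc::make_mut`. This is a minimal sketch that assumes `catalog_mut` behaves like `Arc::make_mut`, which this page does not confirm; `Catalog` here is a hypothetical one-field stand-in, and `catalog_snapshot` is a hypothetical helper.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the real catalog; only `Clone` matters here.
#[derive(Clone, Debug, PartialEq)]
pub struct Catalog {
    pub items: Vec<String>,
}

pub struct Coordinator {
    catalog: Arc<Catalog>,
}

impl Coordinator {
    /// Hands out a cheap read-only snapshot: a reference-count bump, not a deep copy.
    pub fn catalog_snapshot(&self) -> Arc<Catalog> {
        Arc::clone(&self.catalog)
    }

    /// Copy-on-write access. If other `Arc`s to the catalog are alive,
    /// `Arc::make_mut` clones the inner value first, so those readers keep
    /// seeing the old version; otherwise it mutates in place.
    pub fn catalog_mut(&mut self) -> &mut Catalog {
        Arc::make_mut(&mut self.catalog)
    }
}
```

An off-thread task holding a snapshot is unaffected by later mutations, at the cost of one full clone the first time the coordinator writes while snapshots are outstanding.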

§internal_cmd_tx: UnboundedSender<Message>

Channel to manage internal commands from the coordinator to itself.

§group_commit_tx: GroupCommitNotifier

Notification that triggers a group commit.

§strict_serializable_reads_tx: UnboundedSender<(ConnectionId, PendingReadTxn)>

Channel for strict serializable reads ready to commit.

§global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>

Mechanism for totally ordering write and read timestamps, so that all reads reflect exactly the set of writes that precede them, and no writes that follow.

§transient_id_gen: Arc<TransientIdGen>

A generator for transient GlobalIds, shareable with other threads.

§active_conns: BTreeMap<ConnectionId, ConnMeta>

A map from connection ID to metadata about that connection for all active connections.

§txn_read_holds: BTreeMap<ConnectionId, ReadHolds<Timestamp>>

For each transaction, the read holds taken to support any performed reads.

Upon completing a transaction, these read holds should be dropped.

§pending_peeks: BTreeMap<Uuid, PendingPeek>

A map from pending peek IDs to the queue into which responses are sent and the connection ID of the client that initiated the peek. Access to the peek fields should be restricted to methods in the peek API.

§client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>

A map from client connection IDs to all pending peeks for that client, keyed by peek UUID and recording the cluster each peek runs on.

§pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>

A map from client connection IDs to their pending read transactions that are awaiting linearization.

§active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>

A map from the compute sink ID to its state description.

§active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>

A map from active webhooks to their invalidation handle.

§active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>

A map of active COPY FROM statements. The Coordinator waits for clusterd to stage Batches in Persist that we will then link into the shard.

§staged_cancellation: BTreeMap<ConnectionId, (Sender<bool>, Receiver<bool>)>

A map from connection ids to a watch channel that is set to true if the connection received a cancel request.

§introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>

Active introspection subscribes.

§write_locks: BTreeMap<CatalogItemId, Arc<Mutex<()>>>

Locks that grant access to a specific object, populated lazily as objects are written to.

§deferred_write_ops: BTreeMap<ConnectionId, DeferredOp>

Plans that are currently deferred and waiting on a write lock.

§pending_writes: Vec<PendingWriteTxn>

Pending writes waiting for a group commit.

§advance_timelines_interval: Interval

For the realtime timeline, an explicit SELECT or INSERT on a table will bump the table’s timestamps, but there are cases where timestamps are not bumped and yet we expect the closed timestamps to advance (AS OF X, SUBSCRIBing to views over RT sources and tables). To address these, we spawn a task that forces table timestamps to close on a regular interval. This roughly tracks the behavior of realtime sources, which close off timestamps on an interval.

For non-realtime timelines, nothing pushes the timestamps forward, so we must do it manually.

§serialized_ddl: LockedVecDeque<DeferredPlanStatement>

Serialized DDL. DDL must be serialized because:

  • Many of them do off-thread work and need to verify the catalog is in a valid state, but PlanValidity does not currently support tracking all changes. Doing that correctly seems to be more difficult than it’s worth, so we would instead re-plan and re-sequence the statements.
  • Re-planning a statement is hard because Coordinator and Session state is mutated at various points, and we would need to correctly reset those changes before re-planning and re-sequencing.

§secrets_controller: Arc<dyn SecretsController>

Handle to a secrets manager that can create and delete secrets in an arbitrary secret storage engine.

§caching_secrets_reader: CachingSecretsReader

A secrets reader that maintains an in-memory cache, where values have a set TTL.
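A TTL cache of the kind described above can be sketched as follows. This is a hypothetical illustration, not the `CachingSecretsReader` API: the `TtlCache` type, the `get_or_fetch` method, and the string keys are assumptions, and the current time is passed in explicitly so the behavior is easy to test.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical TTL cache illustrating the idea behind a caching secrets reader.
pub struct TtlCache {
    ttl: Duration,
    entries: HashMap<String, (Instant, Vec<u8>)>,
}

impl TtlCache {
    pub fn new(ttl: Duration) -> Self {
        TtlCache { ttl, entries: HashMap::new() }
    }

    /// Returns the cached value if it was stored less than `ttl` before `now`;
    /// otherwise calls `fetch` (e.g. a read from the secret store), caches the
    /// result with `now` as its timestamp, and returns it.
    pub fn get_or_fetch<F>(&mut self, key: &str, now: Instant, fetch: F) -> Vec<u8>
    where
        F: FnOnce() -> Vec<u8>,
    {
        if let Some((stored_at, value)) = self.entries.get(key) {
            if now.duration_since(*stored_at) < self.ttl {
                return value.clone();
            }
        }
        let value = fetch();
        self.entries.insert(key.to_string(), (now, value.clone()));
        value
    }
}
```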

§cloud_resource_controller: Option<Arc<dyn CloudResourceController>>

Handle to a manager that can create and delete Kubernetes resources (e.g., VpcEndpoint objects).

§transient_replica_metadata: BTreeMap<ReplicaId, Option<ReplicaMetadata>>

Metadata about replicas that doesn’t need to be persisted. Intended for inclusion in system tables.

None is used as a tombstone value for replicas that have been dropped and for which no further updates should be recorded.

§storage_usage_client: StorageUsageClient

Persist client for fetching storage metadata such as size metrics.

§storage_usage_collection_interval: Duration

The interval at which to collect storage usage information.

§segment_client: Option<Client>

Segment analytics client.

§metrics: Metrics

Coordinator metrics.

§optimizer_metrics: OptimizerMetrics

Optimizer metrics.

§tracing_handle: TracingHandle

Tracing handle.

§statement_logging: StatementLogging

Data used by the statement logging feature.

§webhook_concurrency_limit: WebhookConcurrencyLimiter

Limit for how many concurrent webhook requests we allow.

§pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>

Optional config for the Postgres-backed timestamp oracle. This is required when the timestamp_oracle system variable is set to postgres.

§check_cluster_scheduling_policies_interval: Interval

Periodically asks cluster scheduling policies to make their decisions.

§cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>

This keeps the last On/Off decision for each cluster and each scheduling policy. (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are periodically cleaned up from this map.)

§caught_up_check_interval: Interval

When doing 0dt upgrades/in read-only mode, periodically ask all known clusters/collections whether they are caught up.

§caught_up_check: Option<CaughtUpCheckContext>

Context needed to check whether all clusters/collections have caught up. Only used during 0dt deployment, while in read-only mode.

§installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>

Tracks the state associated with the currently installed watchsets.

§connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>

Tracks the currently installed watchsets for each connection.

§cluster_replica_statuses: ClusterReplicaStatuses

Tracks the statuses of all cluster replicas.

§read_only_controllers: bool

Whether or not to start controllers in read-only mode. This is only meant for use during development of read-only clusters and 0dt upgrades and should go away once we have proper orchestration during upgrades.

§buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>

Updates to builtin tables that are being buffered while we are in read-only mode. We apply these all at once when coming out of read-only mode.

This is a Some while in read-only mode and will be replaced by a None when we transition out of read-only mode and write out any buffered updates.



impl Coordinator


pub fn resolve_collection_id_bundle_names( &self, session: &Session, id_bundle: &CollectionIdBundle, ) -> Vec<String>

Resolves the full name from the corresponding catalog entry for each item in id_bundle. If an item in the bundle does not exist in the catalog, it’s not included in the result.


impl Coordinator


pub async fn implement_peek_plan( &mut self, ctx_extra: &mut ExecuteContextExtra, plan: PlannedPeek, finishing: RowSetFinishing, compute_instance: ComputeInstanceId, target_replica: Option<ReplicaId>, max_result_size: u64, max_returned_query_size: Option<u64>, ) -> Result<ExecuteResponse, AdapterError>

Implements a peek plan produced by create_plan.


pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId)

Cancel and remove all pending peeks that were initiated by the client with conn_id.


pub(crate) fn handle_peek_notification( &mut self, uuid: Uuid, notification: PeekNotification, otel_ctx: OpenTelemetryContext, )

Handle a peek notification and retire the corresponding execution. Does nothing for already-removed peeks.


pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek>

Clean up a peek’s state.


pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
where I: IntoRowIterator, I::Iter: Send + Sync + 'static,

Constructs an ExecuteResponse that will send some rows to the client immediately, as opposed to asking the dataflow layer to send along the rows after some computation.


impl Coordinator


pub(crate) fn spawn_statement_logging_task(&self)


pub(crate) fn drain_statement_log(&mut self)


fn statement_logging_throttling_check(&mut self, cost: usize) -> Option<usize>

Check whether we need to do throttling (i.e., whether STATEMENT_LOGGING_TARGET_DATA_RATE is set). If so, actually do the check.

Returns None if we must throttle this statement, and Some(n) otherwise, where n is the number of statements that were dropped due to throttling before this one.
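The return contract above (`None` to drop, `Some(n)` to log along with a count of earlier drops) can be illustrated with a token bucket. The algorithm is our assumption, since the text does not say how the rate is enforced. `Throttler`, its fields, and the explicit `now_secs` clock are hypothetical; `target_rate: None` models STATEMENT_LOGGING_TARGET_DATA_RATE being unset.

```rust
/// Hypothetical token-bucket throttler: allows `target_rate` units of cost per
/// second, with a burst capacity of `max_burst` units.
pub struct Throttler {
    target_rate: Option<f64>,
    max_burst: f64,
    available: f64,
    last_check_secs: f64,
    dropped_since_last_logged: usize,
}

impl Throttler {
    pub fn new(target_rate: Option<f64>, max_burst: f64) -> Self {
        Throttler {
            target_rate,
            max_burst,
            available: max_burst,
            last_check_secs: 0.0,
            dropped_since_last_logged: 0,
        }
    }

    /// `None`: throttle (drop) this statement. `Some(n)`: log it; `n` is the
    /// number of statements dropped since the last one that was logged.
    pub fn check(&mut self, cost: usize, now_secs: f64) -> Option<usize> {
        // No target data rate configured: never throttle.
        let Some(rate) = self.target_rate else {
            return Some(0);
        };
        // Refill the budget for the time elapsed since the last check, up to the cap.
        let elapsed = (now_secs - self.last_check_secs).max(0.0);
        self.last_check_secs = now_secs;
        self.available = (self.available + elapsed * rate).min(self.max_burst);
        let cost = cost as f64;
        if cost <= self.available {
            self.available -= cost;
            // Report and reset the count of statements dropped before this one.
            Some(std::mem::take(&mut self.dropped_since_last_logged))
        } else {
            self.dropped_since_last_logged += 1;
            None
        }
    }
}
```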


pub(crate) fn log_prepared_statement( &mut self, session: &mut Session, logging: &Arc<QCell<PreparedStatementLoggingInfo>>, ) -> Option<(Option<(StatementPreparedRecord, PreparedStatementEvent)>, Uuid)>

Returns any statement logging events needed for a particular prepared statement. Possibly mutates the PreparedStatementLoggingInfo metadata.

This function does not do a sampling check, and assumes we did so in a higher layer.

It does do a throttling check, and returns None if we must not log due to throttling.


pub fn statement_execution_sample_rate(&self, session: &Session) -> f64

The rate at which statement execution should be sampled. This is the value of the session var statement_logging_sample_rate, constrained by the system var statement_logging_max_sample_rate.


pub fn end_statement_execution( &mut self, id: StatementLoggingId, reason: StatementEndedExecutionReason, )

Record the end of statement execution for a statement whose beginning was logged. It is an error to call this function for a statement whose beginning was not logged (because it was not sampled). Requiring the opaque StatementLoggingId type, which is only instantiated by begin_statement_execution if the statement is actually logged, should prevent this.


fn pack_statement_execution_inner( record: &StatementBeganExecutionRecord, packer: &mut RowPacker<'_>, )


fn pack_statement_began_execution_update( record: &StatementBeganExecutionRecord, ) -> Row


fn pack_statement_prepared_update( record: &StatementPreparedRecord, packer: &mut RowPacker<'_>, )


fn pack_session_history_update(event: &SessionHistoryEvent) -> Row


fn pack_statement_lifecycle_event( StatementLoggingId: &StatementLoggingId, event: &StatementLifecycleEvent, when: EpochMillis, ) -> Row


pub fn pack_full_statement_execution_update( began_record: &StatementBeganExecutionRecord, ended_record: &StatementEndedExecutionRecord, ) -> Row


pub fn pack_statement_ended_execution_updates( began_record: &StatementBeganExecutionRecord, ended_record: &StatementEndedExecutionRecord, ) -> [(Row, Diff); 2]


fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>( &mut self, StatementLoggingId: StatementLoggingId, f: F, )

Mutate a statement execution record via the given function f.


pub fn set_statement_execution_cluster( &mut self, id: StatementLoggingId, cluster_id: ClusterId, )

Set the cluster_id for a statement, once it’s known.


pub fn set_statement_execution_timestamp( &mut self, id: StatementLoggingId, timestamp: Timestamp, )

Set the execution_timestamp for a statement, once it’s known.


pub fn set_transient_index_id( &mut self, id: StatementLoggingId, transient_index_id: GlobalId, )


pub fn begin_statement_execution( &mut self, session: &mut Session, params: &Params, logging: &Arc<QCell<PreparedStatementLoggingInfo>>, ) -> Option<StatementLoggingId>

Possibly record the beginning of statement execution, depending on a randomly-chosen value. If the execution beginning was indeed logged, returns a StatementLoggingId that must be passed to end_statement_execution to record when it ends.
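The sampling rule (session rate capped by the system maximum) and the guarantee that only sampled statements can be ended can be sketched together with an opaque handle type. Everything here is a hypothetical model: `StatementLog`, `sample_rate`, and the caller-supplied `random` value in [0, 1) stand in for the real logging state and random source. Unlike the real type, this `StatementLoggingId` is deliberately not `Copy`, so each handle can be ended at most once.

```rust
mod statement_logging {
    /// Opaque handle. Its field is private to this module, so outside code
    /// cannot fabricate one; it can only come from `begin_statement_execution`.
    #[derive(Debug, PartialEq, Eq)]
    pub struct StatementLoggingId(u64);

    /// Hypothetical in-memory stand-in for the statement log.
    pub struct StatementLog {
        next_id: u64,
        began: Vec<u64>,
        ended: Vec<u64>,
    }

    impl StatementLog {
        pub fn new() -> Self {
            StatementLog { next_id: 0, began: Vec::new(), ended: Vec::new() }
        }

        /// Effective rate: the session's requested rate, capped by the system maximum.
        pub fn sample_rate(session_rate: f64, system_max_rate: f64) -> f64 {
            session_rate.min(system_max_rate)
        }

        /// Logs the beginning of an execution only if the statement is sampled.
        /// `random` must be drawn uniformly from [0, 1).
        pub fn begin_statement_execution(
            &mut self,
            session_rate: f64,
            system_max_rate: f64,
            random: f64,
        ) -> Option<StatementLoggingId> {
            if random >= Self::sample_rate(session_rate, system_max_rate) {
                return None; // not sampled: no handle, so no end record is possible
            }
            let id = self.next_id;
            self.next_id += 1;
            self.began.push(id);
            Some(StatementLoggingId(id))
        }

        /// Consumes the handle, so only sampled statements can be ended, once each.
        pub fn end_statement_execution(&mut self, id: StatementLoggingId) {
            self.ended.push(id.0);
        }

        pub fn began_count(&self) -> usize {
            self.began.len()
        }

        pub fn ended_count(&self) -> usize {
            self.ended.len()
        }
    }
}

pub use statement_logging::{StatementLog, StatementLoggingId};
```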


pub fn begin_session_for_statement_logging(&mut self, session: &ConnMeta)

Record a new connection event.


pub fn end_session_for_statement_logging(&mut self, uuid: Uuid)


pub fn record_statement_lifecycle_event( &mut self, id: &StatementLoggingId, event: &StatementLifecycleEvent, when: EpochMillis, )


impl Coordinator


pub(crate) fn now(&self) -> EpochMillis


pub(crate) fn now_datetime(&self) -> DateTime<Utc>


pub(crate) fn get_timestamp_oracle( &self, timeline: &Timeline, ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync>


pub(crate) fn get_local_timestamp_oracle( &self, ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync>

Returns a TimestampOracle used for reads and writes from/to a local input.


pub(crate) async fn get_local_read_ts(&self) -> Timestamp

Assign a timestamp for a read from a local input. Reads following writes must be at a time >= the write’s timestamp; we choose “equal to” for simplicity’s sake and to open as few new timestamps as possible.


pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp

Assign a timestamp for a write to a local input and increase the local ts. Writes following reads must ensure that they are assigned a strictly larger timestamp to ensure they are not visible to any real-time earlier reads.
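The read/write rules above can be sketched with a small in-memory oracle. This is a minimal sketch assuming a single timeline with `u64` millisecond timestamps and a wall-clock reading passed in by the caller; `LocalOracle` and its methods are hypothetical, whereas the real code goes through a `TimestampOracle` trait object.

```rust
/// Hypothetical in-memory timestamp oracle for one timeline.
pub struct LocalOracle {
    read_ts: u64,  // timestamp at which reads are served (last applied write)
    write_ts: u64, // largest timestamp handed out for a write
}

impl LocalOracle {
    pub fn new() -> Self {
        LocalOracle { read_ts: 0, write_ts: 0 }
    }

    /// Reads must be at a time >= any completed write; we pick "equal to".
    pub fn read_ts(&self) -> u64 {
        self.read_ts
    }

    /// Writes get a timestamp strictly larger than every earlier write and
    /// every read timestamp, and no smaller than the wall clock.
    pub fn write_ts(&mut self, now_ms: u64) -> u64 {
        let ts = now_ms.max(self.write_ts + 1).max(self.read_ts + 1);
        self.write_ts = ts;
        ts
    }

    /// Marks the write at `ts` as completed, making it visible to later reads.
    pub fn apply_write(&mut self, ts: u64) {
        self.read_ts = self.read_ts.max(ts);
    }
}
```

Because a new write timestamp is always strictly greater than the current read timestamp, a write cannot become visible to a read that happened before it in real time.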


pub(crate) async fn peek_local_write_ts(&self) -> Timestamp

Peek the current timestamp used for operations on local inputs. Used to determine how much to block group commits by.


pub(crate) fn apply_local_write( &self, timestamp: Timestamp, ) -> impl Future<Output = ()> + Send + 'static

Marks a write at timestamp as completed, using a TimestampOracle.


pub(crate) async fn get_catalog_write_ts(&mut self) -> Timestamp

Assign a timestamp for a write to the catalog. This timestamp should have the following properties:

  • Monotonically increasing.
  • Greater than or equal to the current catalog upper.
  • Greater than the largest write timestamp used in the epoch millisecond timeline.

In general, this is fully satisfied by getting the current write timestamp in the epoch millisecond timeline from the timestamp oracle; however, in read-only mode we cannot modify the timestamp oracle.
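The three properties listed above can be combined into one explicit calculation. This is a hypothetical sketch of how a read-only deployment could pick such a timestamp without advancing the oracle, not the actual implementation; the function name and inputs are assumptions, and "monotonically increasing" is taken here as strictly increasing.

```rust
/// Hypothetical: picks a catalog write timestamp from locally known values.
/// All inputs are millisecond timestamps in the epoch millisecond timeline.
pub fn catalog_write_ts_read_only(
    last_catalog_write_ts: Option<u64>,
    catalog_upper: u64,
    largest_timeline_write_ts: u64,
) -> u64 {
    // >= the current catalog upper, and > the largest write timestamp used
    // in the epoch millisecond timeline.
    let mut ts = catalog_upper.max(largest_timeline_write_ts + 1);
    // Strictly greater than the previous catalog write, if there was one.
    if let Some(last) = last_catalog_write_ts {
        ts = ts.max(last + 1);
    }
    ts
}
```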


pub(crate) async fn ensure_timeline_state<'a>( &'a mut self, timeline: &'a Timeline, ) -> &'a mut TimelineState<Timestamp>

Ensures that a global timeline state exists for timeline.


pub(crate) async fn ensure_timeline_state_with_initial_time<'a>( timeline: &'a Timeline, initially: Timestamp, now: NowFn, pg_oracle_config: Option<PostgresTimestampOracleConfig>, global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>>, read_only: bool, ) -> &'a mut TimelineState<Timestamp>

Ensures that a global timeline state exists for timeline, with an initial time of initially.


pub(crate) fn build_collection_id_bundle( &self, storage_ids: impl IntoIterator<Item = GlobalId>, compute_ids: impl IntoIterator<Item = (ComputeInstanceId, GlobalId)>, clusters: impl IntoIterator<Item = ComputeInstanceId>, ) -> CollectionIdBundle

Groups together storage and compute resources into a CollectionIdBundle.


pub(crate) fn remove_resources_associated_with_timeline( &mut self, timeline: Timeline, ids: CollectionIdBundle, ) -> bool

Given a Timeline and a CollectionIdBundle, removes all of the “storage ids” and “compute ids” in the bundle from the timeline.


pub(crate) fn remove_compute_ids_from_timeline<I>( &mut self, ids: I, ) -> Vec<Timeline>


pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle


pub(crate) fn validate_timeline_context<I>( &self, ids: I, ) -> Result<TimelineContext, AdapterError>
where I: IntoIterator<Item = GlobalId>,

Return an error if the ids are from incompatible TimelineContexts. This should be used to prevent users from doing things that are either meaningless (joining data from timelines that have similar numbers with different meanings like two separate debezium topics) or will never complete (joining cdcv2 and realtime data).
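The compatibility rule can be sketched with a simplified context type: inputs may mix timestamp-independent and timestamp-dependent contexts with at most one concrete timeline, and two distinct timelines are an error. This `TimelineContext` enum is a hypothetical model (timelines as plain strings), and the combination rule is our reading of the description above.

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified timeline context.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum TimelineContext {
    /// Tied to one specific timeline, e.g. real time or one CDC source.
    TimelineDependent(String),
    /// Needs a timestamp, but can be combined with any single timeline.
    TimestampDependent,
    /// Does not depend on timestamps at all (e.g. constants).
    TimestampIndependent,
}

/// Combines the contexts of all inputs; fails if two distinct timelines appear.
pub fn validate(contexts: &[TimelineContext]) -> Result<TimelineContext, String> {
    let timelines: BTreeSet<&String> = contexts
        .iter()
        .filter_map(|c| match c {
            TimelineContext::TimelineDependent(t) => Some(t),
            _ => None,
        })
        .collect();
    match timelines.len() {
        0 if contexts.contains(&TimelineContext::TimestampDependent) => {
            Ok(TimelineContext::TimestampDependent)
        }
        0 => Ok(TimelineContext::TimestampIndependent),
        1 => Ok(TimelineContext::TimelineDependent(
            timelines.into_iter().next().unwrap().clone(),
        )),
        _ => Err(format!("incompatible timelines: {:?}", timelines)),
    }
}
```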


pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext

Return the TimelineContext belonging to a CatalogItemId, if one exists.


pub(crate) fn get_timeline_context_for_global_id( &self, id: GlobalId, ) -> TimelineContext

Return the TimelineContext belonging to a GlobalId, if one exists.


fn get_timeline_contexts<I>(&self, ids: I) -> BTreeSet<TimelineContext>
where I: IntoIterator<Item = CatalogItemId>,

Return the TimelineContexts belonging to a list of CatalogItemIds, if any exist.


pub fn partition_ids_by_timeline_context( &self, id_bundle: &CollectionIdBundle, ) -> impl Iterator<Item = (TimelineContext, CollectionIdBundle)>

Returns an iterator that partitions an id bundle by the TimelineContext that each id belongs to.


pub(crate) fn timedomain_for<'a, I>( &self, uses_ids: I, timeline_context: &TimelineContext, conn_id: &ConnectionId, compute_instance: ComputeInstanceId, ) -> Result<CollectionIdBundle, AdapterError>
where I: IntoIterator<Item = &'a GlobalId>,

Return the set of ids in a timedomain and verify timeline correctness.

When a user starts a transaction, we need to prevent compaction of anything they might read from. We use a heuristic of “anything in the same database schemas with the same timeline as whatever the first query is”.


pub(crate) async fn advance_timelines(&mut self)


impl Coordinator


pub(crate) async fn oracle_read_ts( &self, session: &Session, timeline_ctx: &TimelineContext, when: &QueryWhen, ) -> Option<Timestamp>


pub(crate) fn determine_timestamp( &self, session: &Session, id_bundle: &CollectionIdBundle, when: &QueryWhen, compute_instance: ComputeInstanceId, timeline_context: &TimelineContext, oracle_read_ts: Option<Timestamp>, real_time_recency_ts: Option<Timestamp>, ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError>

Determines the timestamp for a query, acquires read holds that ensure the query remains executable at that time, and returns those.

The caller is responsible for eventually dropping those read holds.


pub(crate) fn largest_not_in_advance_of_upper( upper: &Antichain<Timestamp>, ) -> Timestamp

Returns the largest timestamp that is not in advance of upper, i.e. the largest time strictly less than every element of the frontier.

Times that are not greater than this timestamp are complete for all collections identified as arguments.
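For a totally ordered timestamp, a frontier holds at most one element, so the computation reduces to a step back from the upper. This sketch models the frontier as `Option<u64>` (`None` is the empty frontier of a closed collection); the saturating step at the minimum and the `u64::MAX` result for an empty frontier reflect our understanding of the intended semantics, not a confirmed implementation.

```rust
/// Simplified model: `Some(t)` is the frontier {t}; `None` is the empty
/// frontier, meaning no further updates will ever arrive.
pub fn largest_not_in_advance_of_upper(upper: Option<u64>) -> u64 {
    match upper {
        // All times strictly before `t` are complete; step back by one,
        // saturating at the minimum time.
        Some(t) => t.saturating_sub(1),
        // Closed collection: every time is complete.
        None => u64::MAX,
    }
}
```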


pub(crate) fn evaluate_when( catalog: &CatalogState, timestamp: MirScalarExpr, session: &Session, ) -> Result<Timestamp, AdapterError>


impl Coordinator


pub(crate) fn trigger_group_commit(&mut self)

Send a message to the Coordinator to start a group commit.


pub(crate) async fn try_deferred( &mut self, conn_id: ConnectionId, acquired_lock: Option<(CatalogItemId, OwnedMutexGuard<()>)>, )

Tries to execute a previously deferred DeferredOp that requires write locks.

If we can’t acquire all of the write locks then we’ll defer the plan again and wait for the necessary locks to become available.


pub(crate) async fn try_group_commit( &mut self, permit: Option<GroupCommitPermit>, )

Attempts to commit all pending write transactions in a group commit. If the timestamp chosen for the writes is not ahead of now(), then we can execute and commit the writes immediately. Otherwise we must wait for now() to advance past the timestamp chosen for the writes.
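The commit-or-wait decision above reduces to comparing the chosen write timestamp against the current time. A minimal sketch, assuming both are millisecond values in the same timeline; `GroupCommitAction` and `group_commit_action` are hypothetical names.

```rust
use std::time::Duration;

/// Hypothetical outcome of the check.
#[derive(Debug, PartialEq, Eq)]
pub enum GroupCommitAction {
    CommitNow,
    WaitFor(Duration),
}

/// `write_ts_ms` is the timestamp chosen for the writes; `now_ms` is now().
pub fn group_commit_action(write_ts_ms: u64, now_ms: u64) -> GroupCommitAction {
    if write_ts_ms <= now_ms {
        // The write timestamp is not ahead of now(): commit immediately.
        GroupCommitAction::CommitNow
    } else {
        // Wait until now() catches up, after which the timestamp is no longer ahead.
        GroupCommitAction::WaitFor(Duration::from_millis(write_ts_ms - now_ms))
    }
}
```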


pub(crate) async fn group_commit( &mut self, permit: Option<GroupCommitPermit>, ) -> Timestamp

Tries to commit all pending write transactions at the same timestamp.

How much is applied depends on the write_lock:

  • If the caller of this function has the write_lock acquired, it can optionally pass it in to this method.
  • If the caller does not have the write_lock acquired and the write_lock is currently locked by another operation, only writes to system tables and table advancements are applied.
  • If the caller does not have the write_lock acquired and the write_lock is not currently locked by another operation, group commit acquires it and all writes are applied.

All applicable pending writes will be combined into a single Append command and sent to STORAGE as a single batch. All applicable writes will happen at the same timestamp and all involved tables will be advanced to some timestamp larger than the timestamp of the write.

Returns the timestamp of the write.
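Combining the pending writes into a single batch at one timestamp can be sketched as below. `PendingWrite`, `Append`, and `combine_pending_writes` are hypothetical simplifications (tables named by strings, rows as integers); advancing tables to exactly `write_ts + 1` is our assumption, since the text only requires "some timestamp larger" than the write.

```rust
use std::collections::BTreeMap;

/// Hypothetical pending write: a target table and the rows to append.
pub struct PendingWrite {
    pub table: &'static str,
    pub rows: Vec<i64>,
}

/// One combined append: every update lands at `write_ts`, and every involved
/// table is then advanced to `advance_to`.
pub struct Append {
    pub write_ts: u64,
    pub advance_to: u64,
    pub updates: BTreeMap<&'static str, Vec<i64>>,
}

pub fn combine_pending_writes(pending: Vec<PendingWrite>, write_ts: u64) -> Append {
    let mut updates: BTreeMap<&'static str, Vec<i64>> = BTreeMap::new();
    for w in pending {
        // Writes to the same table from different transactions share one batch.
        updates.entry(w.table).or_default().extend(w.rows);
    }
    // Assumption: advance to the next timestamp after the write.
    Append { write_ts, advance_to: write_ts + 1, updates }
}
```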


pub(crate) fn submit_write(&mut self, pending_write_txn: PendingWriteTxn)

Submit a write to be executed during the next group commit and trigger a group commit.


pub(crate) fn builtin_table_update<'a>(&'a mut self) -> BuiltinTableAppend<'a>

Append some BuiltinTableUpdates, with various degrees of waiting and blocking.


pub(crate) fn defer_op<F>(&mut self, acquire_future: F, op: DeferredOp)
where F: Future<Output = Option<(CatalogItemId, OwnedMutexGuard<()>)>> + Send + 'static,


pub(crate) fn grant_object_write_lock( &mut self, object_id: CatalogItemId, ) -> impl Future<Output = (CatalogItemId, OwnedMutexGuard<()>)> + 'static

Returns a future that waits until it can get an exclusive lock on the specified collection.


pub(crate) fn try_grant_object_write_lock( &mut self, object_id: CatalogItemId, ) -> Option<OwnedMutexGuard<()>>

Lazily creates the lock for the provided object_id and grants it if possible. Returns None if the lock is already held.
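Lazy, per-object, non-blocking lock acquisition can be sketched with the standard library alone. The real signature returns an `OwnedMutexGuard` (a tokio type); to stay dependency-free, this hypothetical sketch models each lock as an `Arc<AtomicBool>` with a guard that releases on drop, and uses `u64` in place of `CatalogItemId`.

```rust
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Std-only stand-in for an owned mutex guard: releases the lock when dropped.
pub struct WriteLockGuard {
    held: Arc<AtomicBool>,
}

impl Drop for WriteLockGuard {
    fn drop(&mut self) {
        self.held.store(false, Ordering::Release);
    }
}

pub struct WriteLocks {
    locks: BTreeMap<u64, Arc<AtomicBool>>,
}

impl WriteLocks {
    pub fn new() -> Self {
        WriteLocks { locks: BTreeMap::new() }
    }

    /// Creates the lock for `object_id` on first use, then tries to take it
    /// without blocking. Returns `None` if it is already held.
    pub fn try_grant(&mut self, object_id: u64) -> Option<WriteLockGuard> {
        let lock = Arc::clone(
            self.locks
                .entry(object_id)
                .or_insert_with(|| Arc::new(AtomicBool::new(false))),
        );
        if lock
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            Some(WriteLockGuard { held: lock })
        } else {
            None
        }
    }
}
```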


impl Coordinator


pub async fn maybe_check_caught_up(&mut self)

Checks that all clusters/collections are caught up. If so, this will trigger self.caught_up_check.trigger.

This method is a no-op when the trigger has already been fired.


async fn maybe_check_caught_up_new(&mut self)


async fn clusters_caught_up( &self, allowed_lag: Timestamp, cutoff: Timestamp, now: Timestamp, live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>, exclude_collections: &BTreeSet<GlobalId>, ) -> bool

Returns true if all non-transient, non-excluded collections have their write frontier (aka. upper) within allowed_lag of the “live” frontier reported in live_frontiers. The “live” frontiers are frontiers as reported by a currently running environmentd deployment, during a 0dt upgrade.

Collections whose write frontier is behind now by more than the cutoff are ignored.

For this check, zero-replica clusters are always considered caught up. Their collections would never normally be considered caught up but it’s clearly intentional that they have no replicas.
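The per-collection part of this check can be sketched over `u64` frontiers keyed by a numeric collection ID. This is a hypothetical simplification: it assumes the cutoff is judged against the live frontier and that collections without a reported live frontier are skipped, neither of which the text states; the zero-replica rule is omitted.

```rust
use std::collections::BTreeMap;

/// `our_uppers`: this deployment's write frontiers. `live_frontiers`: the
/// frontiers reported by the currently running deployment.
pub fn collections_caught_up(
    our_uppers: &BTreeMap<u64, u64>,
    live_frontiers: &BTreeMap<u64, u64>,
    allowed_lag: u64,
    cutoff: u64,
    now: u64,
) -> bool {
    our_uppers.iter().all(|(id, &upper)| {
        let Some(&live) = live_frontiers.get(id) else {
            return true; // assumption: nothing to compare against
        };
        // Ignore collections whose (live) frontier is behind `now` by more than `cutoff`.
        if now.saturating_sub(live) > cutoff {
            return true;
        }
        // Caught up if our upper is within `allowed_lag` of the live frontier.
        upper.saturating_add(allowed_lag) >= live
    })
}
```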


async fn collections_caught_up( &self, cluster: &Cluster, allowed_lag: Timestamp, cutoff: Timestamp, now: Timestamp, live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>, exclude_collections: &BTreeSet<GlobalId>, ) -> Result<bool, Error>

Returns true if all non-transient, non-excluded collections have their write frontier (aka. upper) within allowed_lag of the “live” frontier reported in live_frontiers. The “live” frontiers are frontiers as reported by a currently running environmentd deployment, during a 0dt upgrade.

Collections whose write frontier is behind now by more than the cutoff are ignored.

This also returns true if this cluster does not have any replicas.


async fn maybe_check_caught_up_legacy(&mut self)


impl Coordinator


pub(crate) async fn check_scheduling_policies(&mut self)

Call each scheduling policy.


fn check_refresh_policy(&self)

Runs the SCHEDULE = ON REFRESH cluster scheduling policy, which makes cluster On/Off decisions based on REFRESH materialized view write frontiers and the current time (the local oracle read ts), and sends Message::SchedulingDecisions with these decisions. (Queries the timestamp oracle on a background task.)


pub(crate) async fn handle_scheduling_decisions( &mut self, decisions: Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>, )

Handles SchedulingDecisions:

  1. Adds the newly made decisions to cluster_scheduling_decisions.
  2. Cleans up old decisions that are for clusters no longer in scope of automated scheduling decisions.
  3. For each cluster, it sums up cluster_scheduling_decisions, checks the summed up decision against the cluster state, and turns cluster On/Off if needed.
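Steps 2 and 3 can be sketched on a simplified map of decisions per cluster and per policy. The combination rule used here (a cluster is On if any policy says On) is our assumption about what "sums up" means; the numeric cluster IDs, the policy name "refresh", and both helper functions are hypothetical.

```rust
use std::collections::{BTreeMap, BTreeSet};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingDecision {
    On,
    Off,
}

pub type Decisions = BTreeMap<u64, BTreeMap<&'static str, SchedulingDecision>>;

/// Step 2: drop decisions for clusters no longer in scope.
pub fn cleanup(decisions: &mut Decisions, in_scope: &BTreeSet<u64>) {
    decisions.retain(|cluster, _| in_scope.contains(cluster));
}

/// Step 3 (first half): combine each cluster's per-policy decisions.
/// Assumed rule: On if any policy wants the cluster On.
pub fn summed_decisions(decisions: &Decisions) -> BTreeMap<u64, SchedulingDecision> {
    decisions
        .iter()
        .map(|(cluster, by_policy)| {
            let on = by_policy.values().any(|d| *d == SchedulingDecision::On);
            let summed = if on { SchedulingDecision::On } else { SchedulingDecision::Off };
            (*cluster, summed)
        })
        .collect()
}
```

The summed decision would then be compared against each cluster's current state to decide whether to turn it On or Off.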

fn get_managed_cluster_config( &self, cluster_id: ClusterId, ) -> Option<ClusterVariantManaged>

Returns the managed config for a cluster. Returns None if the cluster doesn’t exist or if it’s an unmanaged cluster.


impl Coordinator


pub(crate) fn handle_command(&mut self, cmd: Command) -> LocalBoxFuture<'_, ()>

BOXED FUTURE: As of Nov 2023, the Future returned by this function was 58KB. Stored on the stack, it would hurt runtime performance and blow up our stack usage. Because of that, we purposefully move this Future onto the heap (i.e., Box it).
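Why boxing helps can be shown by measuring sizes. An async block that keeps a large buffer alive across an `.await` produces a state machine at least that large, while the boxed version is only a fat pointer. The `LocalBoxFuture` alias below has the same shape as the `futures` crate's alias but is defined locally to avoid the dependency; `large_future` and `boxed_large_future` are hypothetical illustrations.

```rust
use std::future::Future;
use std::pin::Pin;

/// Same shape as `futures::future::LocalBoxFuture<'a, T>`: a pinned,
/// heap-allocated, non-`Send` trait object.
pub type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

/// The state machine must hold the 64 KiB buffer across the `.await`,
/// so the future value itself is at least that large.
pub fn large_future() -> impl Future<Output = usize> {
    async {
        let buf = [0u8; 64 * 1024];
        std::future::ready(()).await;
        std::hint::black_box(&buf);
        buf.len()
    }
}

/// Boxing moves the state machine to the heap; callers hold only a fat pointer.
pub fn boxed_large_future() -> LocalBoxFuture<'static, usize> {
    Box::pin(large_future())
}
```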


async fn handle_startup( &mut self, tx: Sender<Result<StartupResponse, AdapterError>>, user: User, conn_id: ConnectionId, secret_key: u32, uuid: Uuid, client_ip: Option<IpAddr>, application_name: String, notice_tx: UnboundedSender<AdapterNotice>, )


async fn handle_startup_inner( &mut self, user: &User, conn_id: &ConnectionId, client_ip: &Option<IpAddr>, ) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError>


pub(crate) async fn handle_execute( &mut self, portal_name: String, session: Session, tx: ClientTransmitter<ExecuteResponse>, outer_context: Option<ExecuteContextExtra>, )

Handles an execute command.


pub(crate) async fn handle_execute_inner( &mut self, stmt: Arc<Statement<Raw>>, params: Params, ctx: ExecuteContext, )


fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool

Whether the statement must be serialized and is DDL.


fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool

Whether the statement must be purified off of the Coordinator thread.


async fn resolve_mz_now_for_create_materialized_view( &mut self, cmvs: &CreateMaterializedViewStatement<Aug>, resolved_ids: &ResolvedIds, session: &Session, acquire_read_holds: bool, ) -> Result<Option<Timestamp>, AdapterError>

Chooses a timestamp for mz_now(), if mz_now() occurs in a REFRESH option of the materialized view. Additionally, if acquire_read_holds is true and the MV has any REFRESH option, this function grabs read holds at the earliest possible time on input collections that might be involved in the MV.

Note that this does NOT handle mz_now() in the query part of the MV; it handles mz_now() only in with_options.

(Note that the chosen timestamp won’t be the same timestamp as the system table inserts, unfortunately.)


async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32)

Instruct the dataflow layer to cancel any ongoing, interactive work for the named conn_id if the correct secret key is specified.

Note: Here we take a ConnectionIdType as opposed to an owned ConnectionId because this method gets called by external clients when they request to cancel a request.


pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId)

Unconditionally instructs the dataflow layer to cancel any ongoing, interactive work for the named conn_id.


async fn handle_terminate(&mut self, conn_id: ConnectionId)

Handle termination of a client session.

This cleans up any state in the coordinator associated with the session.


fn handle_get_webhook( &mut self, database: String, schema: String, name: String, tx: Sender<Result<AppendWebhookResponse, AppendWebhookError>>, )

Returns the necessary metadata for appending to a webhook source, and a channel to send rows.


impl Coordinator


pub fn check_consistency(&self) -> Result<(), CoordinatorInconsistencies>

Checks the Coordinator to make sure we’re internally consistent.


fn check_read_holds(&self) -> Result<(), Vec<ReadHoldsInconsistency>>

  • Read holds should reference known objects.

fn check_active_webhooks(&self) -> Result<(), Vec<ActiveWebhookInconsistency>>

  • All CatalogItemIds in the active_webhooks map should reference known webhook sources.

fn check_cluster_statuses(&self) -> Result<(), Vec<ClusterStatusInconsistency>>

  • All ClusterIds in the cluster_replica_statuses map should reference known clusters.
  • All ReplicaIds in the cluster_replica_statuses map should reference known cluster replicas.

impl Coordinator


pub(crate) async fn catalog_transact( &mut self, session: Option<&Session>, ops: Vec<Op>, ) -> Result<(), AdapterError>

Same as Self::catalog_transact_conn but takes a Session.


pub(crate) async fn catalog_transact_with_side_effects<'c, F, Fut>( &'c mut self, session: Option<&Session>, ops: Vec<Op>, side_effect: F, ) -> Result<(), AdapterError>
where F: FnOnce(&'c mut Coordinator) -> Fut, Fut: Future<Output = ()>,

Same as Self::catalog_transact_conn but takes a Session and runs builtin table updates concurrently with any side effects (e.g. creating collections).


pub(crate) async fn catalog_transact_conn( &mut self, conn_id: Option<&ConnectionId>, ops: Vec<Op>, ) -> Result<(), AdapterError>

Same as Self::catalog_transact_inner but awaits the table updates.


pub(crate) async fn catalog_transact_with_ddl_transaction( &mut self, session: &mut Session, ops: Vec<Op>, ) -> Result<(), AdapterError>

Executes a Catalog transaction, with special handling for the case where the provided Session is in a SQL transaction that is executing DDL.


pub(crate) async fn catalog_transact_inner<'a>( &mut self, conn_id: Option<&ConnectionId>, ops: Vec<Op>, ) -> Result<Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>, AdapterError>

Perform a catalog transaction. After this function returns successfully, Coordinator::ship_dataflow must be called on any DataflowDesc it built.


fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId)


fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>)

A convenience method for dropping sources.


fn drop_tables(&mut self, tables: Vec<(CatalogItemId, GlobalId)>, ts: Timestamp)


fn restart_webhook_sources( &mut self, sources: impl IntoIterator<Item = CatalogItemId>, )


pub async fn drop_compute_sink( &mut self, sink_id: GlobalId, ) -> Option<ActiveComputeSink>

Like drop_compute_sinks, but for a single compute sink.

Returns the controller’s state for the compute sink if the identified sink was known to the controller. It is the caller’s responsibility to retire the returned sink. Consider using retire_compute_sinks instead.


pub async fn drop_compute_sinks( &mut self, sink_ids: impl IntoIterator<Item = GlobalId>, ) -> BTreeMap<GlobalId, ActiveComputeSink>

Drops a batch of compute sinks.

For each sink that exists, the coordinator's and controller's state associated with the sink is removed.

Returns a map containing the controller’s state for each sink that was removed. It is the caller’s responsibility to retire the returned sinks. Consider using retire_compute_sinks instead.


pub async fn retire_compute_sinks( &mut self, reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>, )

Retires a batch of sinks with disparate reasons for retirement.

Each sink identified in reasons is dropped (see drop_compute_sinks), then retired with its corresponding reason.
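The drop-then-retire split can be sketched as follows. This is a simplified, synchronous model: GlobalId is a plain u64, RetireReason has only two variants, and retire just formats a message. All of these are hypothetical stand-ins for the real ActiveComputeSink and ActiveComputeSinkRetireReason types.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins; the real GlobalId, ActiveComputeSink and
// ActiveComputeSinkRetireReason types are richer.
type GlobalId = u64;

#[derive(Debug, Clone, Copy)]
enum RetireReason {
    Finished,
    Canceled,
}

struct ActiveComputeSink {
    name: String,
}

impl ActiveComputeSink {
    // Retiring consumes the sink, so each sink can be retired at most once.
    fn retire(self, reason: RetireReason) -> String {
        format!("{} retired: {:?}", self.name, reason)
    }
}

struct Coordinator {
    active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
}

impl Coordinator {
    // Remove each known sink and hand its state back; unknown ids are skipped.
    fn drop_compute_sinks(
        &mut self,
        sink_ids: impl IntoIterator<Item = GlobalId>,
    ) -> BTreeMap<GlobalId, ActiveComputeSink> {
        sink_ids
            .into_iter()
            .filter_map(|id| self.active_compute_sinks.remove(&id).map(|sink| (id, sink)))
            .collect()
    }

    // Drop first, then retire every removed sink with its own reason.
    fn retire_compute_sinks(&mut self, reasons: BTreeMap<GlobalId, RetireReason>) -> Vec<String> {
        let dropped = self.drop_compute_sinks(reasons.keys().copied());
        dropped
            .into_iter()
            .map(|(id, sink)| sink.retire(reasons[&id]))
            .collect()
    }
}
```

Because retire takes the sink by value, the type system prevents a caller from retiring the same returned sink twice, which is one reason to hand ownership back from drop_compute_sinks.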


pub async fn drop_reconfiguration_replicas( &mut self, cluster_ids: BTreeSet<ClusterId>, ) -> Result<(), AdapterError>

Drops all pending replicas for a set of clusters that are undergoing reconfiguration.


pub(crate) async fn cancel_compute_sinks_for_conn( &mut self, conn_id: &ConnectionId, )

Cancels all active compute sinks for the identified connection.


pub(crate) async fn cancel_cluster_reconfigurations_for_conn( &mut self, conn_id: &ConnectionId, )

Cancels all active cluster reconfigurations for the identified connection.


pub(crate) async fn retire_compute_sinks_for_conn( &mut self, conn_id: &ConnectionId, reason: ActiveComputeSinkRetireReason, )

Retires all active compute sinks for the identified connection with the specified reason.


pub(crate) async fn retire_cluster_reconfigurations_for_conn( &mut self, conn_id: &ConnectionId, )

Cleans up pending cluster reconfigurations for the identified connection.


pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>)


pub(crate) fn drop_indexes(&mut self, indexes: Vec<(ClusterId, GlobalId)>)


fn drop_materialized_views(&mut self, mviews: Vec<(ClusterId, GlobalId)>)

A convenience method for dropping materialized views.


fn drop_continual_tasks( &mut self, cts: Vec<(CatalogItemId, ClusterId, GlobalId)>, )

A convenience method for dropping continual tasks.


fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>)


pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId)

Removes all temporary items created by the specified connection, though not the temporary schema itself.


fn update_cluster_scheduling_config(&self)


fn update_secrets_caching_config(&self)


fn update_tracing_config(&self)


fn update_compute_config(&mut self)


fn update_storage_config(&mut self)


fn update_pg_timestamp_oracle_config(&self)


fn update_metrics_retention(&self)


fn update_arrangement_exert_proportionality(&mut self)


fn update_http_config(&mut self)


pub(crate) async fn create_storage_export( &mut self, id: GlobalId, sink: &Sink, ) -> Result<(), AdapterError>


fn validate_resource_limits( &self, ops: &Vec<Op>, conn_id: &ConnectionId, ) -> Result<(), AdapterError>

Validate all resource limits in a catalog transaction and return an error if any limit is exceeded.


pub(crate) fn validate_resource_limit<F>( &self, current_amount: usize, new_instances: i64, resource_limit: F, resource_type: &str, limit_name: &str, ) -> Result<(), AdapterError>
where F: Fn(&SystemVars) -> u32,

Validate a specific type of resource limit and return an error if that limit is exceeded.


fn validate_resource_limit_numeric<F>( &self, current_amount: Numeric, new_amount: Numeric, resource_limit: F, resource_type: &str, limit_name: &str, ) -> Result<(), AdapterError>
where F: Fn(&SystemVars) -> Numeric,

Validate a specific type of numeric resource limit and return an error if that limit is exceeded.

This is very similar to Self::validate_resource_limit but for numerics.
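A minimal sketch of the integer variant, assuming a hypothetical SystemVars with a single max_tables field and a hypothetical ResourceExhausted error. It mirrors the shape of the signature above (current amount, signed delta, and a closure that reads the limit from the system variables). It also assumes that a non-positive delta, such as dropping objects, is always allowed even when the system is already over the limit.

```rust
// Hypothetical stand-in for SystemVars holding one configured limit.
struct SystemVars {
    max_tables: u32,
}

// Hypothetical error; the real code returns an AdapterError variant.
#[derive(Debug, PartialEq)]
struct ResourceExhausted {
    resource_type: String,
    limit_name: String,
    desired: i64,
    limit: u32,
}

fn validate_resource_limit<F>(
    vars: &SystemVars,
    current_amount: usize,
    new_instances: i64,
    resource_limit: F,
    resource_type: &str,
    limit_name: &str,
) -> Result<(), ResourceExhausted>
where
    F: Fn(&SystemVars) -> u32,
{
    // Assumption: shrinking (a non-positive delta) is always permitted.
    if new_instances <= 0 {
        return Ok(());
    }
    let limit = resource_limit(vars);
    // Saturate instead of overflowing if the counts are absurdly large.
    let desired = i64::try_from(current_amount)
        .unwrap_or(i64::MAX)
        .saturating_add(new_instances);
    if desired > i64::from(limit) {
        Err(ResourceExhausted {
            resource_type: resource_type.to_string(),
            limit_name: limit_name.to_string(),
            desired,
            limit,
        })
    } else {
        Ok(())
    }
}
```

Passing the limit as a closure over the system variables, as the real signature does, lets one helper serve every limit without a separate function per setting.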


impl Coordinator


pub fn index_oracle(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_>

Creates a new index oracle for the specified compute instance.


impl Coordinator


pub(super) async fn bootstrap_introspection_subscribes(&mut self)

Installs introspection subscribes on all existing replicas.

Meant to be invoked during coordinator bootstrapping.


pub(super) async fn install_introspection_subscribes( &mut self, cluster_id: ClusterId, replica_id: ReplicaId, )

Installs introspection subscribes on the given replica.


async fn install_introspection_subscribe( &mut self, cluster_id: ClusterId, replica_id: ReplicaId, spec: &'static SubscribeSpec, )


async fn sequence_introspection_subscribe( &mut self, subscribe_id: GlobalId, spec: &'static SubscribeSpec, cluster_id: ClusterId, replica_id: ReplicaId, )


fn sequence_introspection_subscribe_optimize_mir( &self, stage: IntrospectionSubscribeOptimizeMir, ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError>


fn sequence_introspection_subscribe_timestamp_optimize_lir( &self, stage: IntrospectionSubscribeTimestampOptimizeLir, ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError>


async fn sequence_introspection_subscribe_finish( &mut self, stage: IntrospectionSubscribeFinish, ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError>


pub(super) fn drop_introspection_subscribes(&mut self, replica_id: ReplicaId)

Drops the introspection subscribes installed on the given replica.

Dropping an introspection subscribe entails:


fn drop_introspection_subscribe(&mut self, id: GlobalId)


async fn reinstall_introspection_subscribe(&mut self, id: GlobalId)


pub(super) async fn handle_introspection_subscribe_batch( &mut self, id: GlobalId, batch: SubscribeBatch, )

Processes a batch returned by an introspection subscribe.

Depending on the contents of the batch, this either appends received updates to the corresponding storage-managed collection, or reinstalls a disconnected subscribe.
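The two outcomes described above can be modeled as a match over the batch contents. In this sketch, SubscribeBatch and IntrospectionState are hypothetical simplifications: updates are (row, diff) pairs appended to an in-memory Vec standing in for the storage-managed collection, and "reinstalling" only increments a counter.

```rust
// Hypothetical, simplified batch: either a set of (row, diff) updates or a
// signal that the subscribe was disconnected from its replica.
enum SubscribeBatch {
    Updates(Vec<(String, i64)>),
    Disconnected,
}

#[derive(Default)]
struct IntrospectionState {
    // Stand-in for the storage-managed collection.
    collection: Vec<(String, i64)>,
    // Stand-in for reinstalling the subscribe.
    reinstalls: u32,
}

impl IntrospectionState {
    fn handle_batch(&mut self, batch: SubscribeBatch) {
        match batch {
            // Forward updates into the storage-managed collection.
            SubscribeBatch::Updates(updates) => self.collection.extend(updates),
            // A disconnected subscribe is reinstalled rather than dropped.
            SubscribeBatch::Disconnected => self.reinstalls += 1,
        }
    }
}
```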


impl Coordinator


pub(crate) async fn handle_message(&mut self, msg: Message)

BOXED FUTURE: As of Nov 2023, the Future returned from this function was 74KB. That Future would be stored on the stack, which hurts runtime performance and blows up our stack usage. Because of that, we purposefully move the Futures of inner function calls onto the heap (i.e. Box them).
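The effect can be demonstrated in isolation. The functions below are hypothetical and unrelated to the coordinator's real message handlers. Awaiting a large inner future directly embeds its whole state in the outer future, while wrapping it in Box::pin leaves only a pointer in the outer future's state.

```rust
use std::future::Future;
use std::pin::Pin;

// Hypothetical inner step: its future is large because a 16 KiB buffer
// stays alive across an await point.
async fn large_inner_step() -> usize {
    let mut buf = [0u8; 16 * 1024];
    buf[0] = 1;
    std::future::ready(()).await;
    buf.iter().map(|&b| usize::from(b)).sum()
}

// Awaiting the inner future directly embeds its whole state in ours.
async fn handle_message_unboxed() -> usize {
    large_inner_step().await
}

// Boxing moves the inner state to the heap; our future stores only a pointer.
async fn handle_message_boxed() -> usize {
    let inner: Pin<Box<dyn Future<Output = usize>>> = Box::pin(large_inner_step());
    inner.await
}
```

The trade-off is one heap allocation per boxed call in exchange for a much smaller outer future, which matters when the outer future is itself held on the stack.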


pub async fn storage_usage_fetch(&mut self)


async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced)


async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>)


pub async fn schedule_storage_usage_collection(&self)


async fn message_command(&mut self, cmd: Command)


async fn message_controller(&mut self, message: ControllerResponse)


async fn message_purified_statement_ready( &mut self, __arg1: BackgroundWorkResult<PurifiedStatement>, )


async fn message_create_connection_validation_ready( &mut self, __arg1: ValidationReady<CreateConnectionPlan>, )


async fn message_alter_connection_validation_ready( &mut self, __arg1: ValidationReady<Connection>, )


async fn message_cluster_event(&mut self, event: ClusterEvent)


async fn message_linearize_reads(&mut self)

Linearizes sending the results of a read transaction by:

  1. Holding back any results that were executed at some point in the future, until the containing timeline has advanced to that point in the future.
  2. Confirming that we are still the current leader before sending results to the client.
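Step 1 can be sketched as filtering the pending reads against the timeline's current timestamp. This assumes a simplified PendingReadTxn that carries only a read timestamp and a rendered result (the real type holds much more), and it omits the leadership check from step 2 entirely.

```rust
use std::collections::BTreeMap;

type Timestamp = u64;
type ConnectionId = u32;

// Hypothetical pending read: results computed at read_ts, not yet sent.
struct PendingReadTxn {
    read_ts: Timestamp,
    result: String,
}

// Release only reads whose timestamp the timeline has already reached;
// later-timestamped reads stay queued. The leadership check is omitted.
fn linearize_reads(
    pending: &mut BTreeMap<ConnectionId, PendingReadTxn>,
    oracle_now: Timestamp,
) -> Vec<(ConnectionId, String)> {
    let ready: Vec<ConnectionId> = pending
        .iter()
        .filter(|(_, txn)| txn.read_ts <= oracle_now)
        .map(|(id, _)| *id)
        .collect();
    ready
        .into_iter()
        .map(|id| {
            let txn = pending.remove(&id).expect("id collected above");
            (id, txn.result)
        })
        .collect()
}
```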

impl Coordinator


impl Coordinator


pub(crate) async fn initialize_storage_read_policies( &mut self, ids: BTreeSet<CatalogItemId>, compaction_window: CompactionWindow, )

Initialize the storage read policies.

This should be called only after a storage collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.


pub(crate) async fn initialize_compute_read_policies( &mut self, ids: Vec<GlobalId>, instance: ComputeInstanceId, compaction_window: CompactionWindow, )

Initialize the compute read policies.

This should be called only after a compute collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.


pub(crate) async fn initialize_read_policies( &mut self, id_bundle: &CollectionIdBundle, compaction_window: CompactionWindow, )

Initialize the storage and compute read policies.

This should be called only after a collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.
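To see why the initial policy matters, consider a toy model in which a collection either allows no compaction (its since never moves) or trails its write frontier by a fixed lag derived from the compaction window. The ReadPolicy enum below is a hypothetical simplification and not the real ReadPolicy type. It only shows how a lag translates into a compaction frontier.

```rust
type Timestamp = u64;

// Hypothetical read policy: either hold everything back (the initial
// "no compaction" state) or trail the write frontier by a fixed lag.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ReadPolicy {
    NoCompaction,
    LagWriteFrontier(Timestamp),
}

impl ReadPolicy {
    // Frontier up to which the collection may be compacted, given its
    // current since and write frontier (upper). It never moves backwards.
    fn compaction_frontier(self, since: Timestamp, upper: Timestamp) -> Timestamp {
        match self {
            ReadPolicy::NoCompaction => since,
            ReadPolicy::LagWriteFrontier(lag) => since.max(upper.saturating_sub(lag)),
        }
    }
}
```

Under NoCompaction the collection retains all history until a policy is installed, which is why these functions should run soon after the collection is created.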


pub(crate) fn update_storage_read_policies( &self, policies: Vec<(CatalogItemId, ReadPolicy<Timestamp>)>, )


pub(crate) fn update_compute_read_policies( &self, policies: Vec<(ComputeInstanceId, CatalogItemId, ReadPolicy<Timestamp>)>, )


pub(crate) fn update_compute_read_policy( &self, compute_instance: ComputeInstanceId, item_id: CatalogItemId, base_policy: ReadPolicy<Timestamp>, )


pub(crate) fn acquire_read_holds( &self, id_bundle: &CollectionIdBundle, ) -> ReadHolds<Timestamp>

Attempt to acquire read holds on the indicated collections at the earliest available time.


Will panic if any of the referenced collections in id_bundle don’t exist.


pub(crate) fn store_transaction_read_holds( &mut self, session: &Session, read_holds: ReadHolds<Timestamp>, )

Stash transaction read holds. They will be released when the transaction is cleaned up.
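A simplified model of acquiring holds at the earliest available time and stashing them per connection. It assumes each collection's earliest readable time is a single u64 since, that merging two holds on the same collection keeps the earlier timestamp, and that end_transaction (a hypothetical name) releases the holds by removing them. None of these types match the real ReadHolds API.

```rust
use std::collections::BTreeMap;

type Timestamp = u64;
type CollectionId = u64;
type ConnectionId = u32;

// Hypothetical read holds: for each collection, the time we may still read at.
#[derive(Debug, Default, PartialEq)]
struct ReadHolds {
    holds: BTreeMap<CollectionId, Timestamp>,
}

struct Coordinator {
    // Current since (earliest readable time) per collection.
    sinces: BTreeMap<CollectionId, Timestamp>,
    txn_read_holds: BTreeMap<ConnectionId, ReadHolds>,
}

impl Coordinator {
    // Acquire holds at the earliest available time; panics on unknown ids.
    fn acquire_read_holds(&self, ids: &[CollectionId]) -> ReadHolds {
        let holds = ids
            .iter()
            .map(|id| {
                let since = self
                    .sinces
                    .get(id)
                    .unwrap_or_else(|| panic!("unknown collection {id}"));
                (*id, *since)
            })
            .collect();
        ReadHolds { holds }
    }

    // Stash holds for a transaction, merging with any it already has.
    fn store_transaction_read_holds(&mut self, conn: ConnectionId, read_holds: ReadHolds) {
        let entry = self.txn_read_holds.entry(conn).or_default();
        for (id, ts) in read_holds.holds {
            // Keep the earlier of the two holds so no reader loses access.
            entry.holds.entry(id).and_modify(|t| *t = (*t).min(ts)).or_insert(ts);
        }
    }

    // Cleaning up the transaction removes (releases) its holds.
    fn end_transaction(&mut self, conn: ConnectionId) -> Option<ReadHolds> {
        self.txn_read_holds.remove(&conn)
    }
}
```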


impl Coordinator


pub(crate) async fn sequence_alter_cluster_staged( &mut self, ctx: ExecuteContext, plan: AlterClusterPlan, )


async fn alter_cluster_validate( &mut self, session: &Session, plan: AlterClusterPlan, ) -> Result<ClusterStage, AdapterError>


async fn sequence_alter_cluster_stage( &mut self, session: &Session, plan: AlterClusterPlan, validity: PlanValidity, ) -> Result<StageResult<Box<ClusterStage>>, AdapterError>


async fn finalize_alter_cluster_stage( &mut self, session: &Session, __arg2: AlterClusterPlan, new_config: ClusterVariantManaged, ) -> Result<StageResult<Box<ClusterStage>>, AdapterError>


async fn check_if_pending_replicas_hydrated_stage( &mut self, session: &Session, plan: AlterClusterPlan, new_config: ClusterVariantManaged, timeout_time: Instant, on_timeout: OnTimeoutAction, validity: PlanValidity, ) -> Result<StageResult<Box<ClusterStage>>, AdapterError>


pub(crate) async fn sequence_create_cluster( &mut self, session: &Session, __arg2: CreateClusterPlan, ) -> Result<ExecuteResponse, AdapterError>


async fn sequence_create_managed_cluster( &mut self, session: &Session, __arg2: CreateClusterManagedPlan, cluster_id: ClusterId, ops: Vec<Op>, ) -> Result<ExecuteResponse, AdapterError>


fn create_managed_cluster_replica_op( &self, cluster_id: ClusterId, name: String, compute: &ComputeReplicaConfig, size: &String, ops: &mut Vec<Op>, azs: Option<&[String]>, disk: bool, pending: bool, owner_id: RoleId, reason: ReplicaCreateDropReason, ) -> Result<(), AdapterError>


fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>( &self, azs: I, ) -> Result<(), AdapterError>


async fn sequence_create_unmanaged_cluster( &mut self, session: &Session, __arg2: CreateClusterUnmanagedPlan, id: ClusterId, ops: Vec<Op>, ) -> Result<ExecuteResponse, AdapterError>


async fn create_cluster(&mut self, cluster_id: ClusterId)


pub(crate) async fn sequence_create_cluster_replica( &mut self, session: &Session, __arg2: CreateClusterReplicaPlan, ) -> Result<ExecuteResponse, AdapterError>


async fn create_cluster_replica( &mut self, cluster_id: ClusterId, replica_id: ReplicaId, )


pub(crate) async fn sequence_alter_cluster_managed_to_managed( &mut self, session: Option<&Session>, cluster_id: ClusterId, new_config: ClusterConfig, reason: ReplicaCreateDropReason, strategy: AlterClusterPlanStrategy, ) -> Result<NeedsFinalization, AdapterError>

When this is called by the automated cluster scheduling, scheduling_decision_reason should contain information on why a cluster is being turned On/Off. It will be forwarded to the details field of the audit log event that records creating or dropping replicas.


Panics if the identified cluster is not a managed cluster. Panics if new_config is not a configuration for a managed cluster.


async fn sequence_alter_cluster_unmanaged_to_managed( &mut self, session: &Session, cluster_id: ClusterId, new_config: ClusterConfig, options: PlanClusterOption, ) -> Result<(), AdapterError>


Panics if new_config is not a configuration for a managed cluster.


async fn sequence_alter_cluster_managed_to_unmanaged( &mut self, session: &Session, cluster_id: ClusterId, new_config: ClusterConfig, ) -> Result<(), AdapterError>


async fn sequence_alter_cluster_unmanaged_to_unmanaged( &mut self, session: &Session, cluster_id: ClusterId, new_config: ClusterConfig, replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>, ) -> Result<(), AdapterError>


pub(crate) async fn sequence_alter_cluster_rename( &mut self, session: &mut Session, __arg2: AlterClusterRenamePlan, ) -> Result<ExecuteResponse, AdapterError>


pub(crate) async fn sequence_alter_cluster_swap( &mut self, session: &mut Session, __arg2: AlterClusterSwapPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(crate) async fn sequence_alter_cluster_replica_rename( &mut self, session: &Session, __arg2: AlterClusterReplicaRenamePlan, ) -> Result<ExecuteResponse, AdapterError>


pub(crate) async fn sequence_alter_set_cluster( &self, _session: &Session, __arg2: AlterSetClusterPlan, ) -> Result<ExecuteResponse, AdapterError>

Convert an AlterSetClusterPlan to a sequence of catalog operations and adjust state.


impl Coordinator


pub(crate) async fn sequence_copy_from( &mut self, ctx: ExecuteContext, plan: CopyFromPlan, target_cluster: TargetCluster, )


pub(crate) fn commit_staged_batches( &mut self, conn_id: ConnectionId, table_id: CatalogItemId, batches: Vec<Result<ProtoBatch, String>>, )


pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId)

Cancel any active COPY FROM statements/oneshot ingestions.


impl Coordinator


impl Coordinator


pub(crate) async fn sequence_create_index( &mut self, ctx: ExecuteContext, plan: CreateIndexPlan, resolved_ids: ResolvedIds, )


pub(crate) async fn explain_create_index( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )


pub(crate) async fn explain_replan_index( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )


pub(crate) fn explain_index( &mut self, ctx: &ExecuteContext, _: ExplainPlanPlan, ) -> Result<ExecuteResponse, AdapterError>


fn create_index_validate( &mut self, plan: CreateIndexPlan, resolved_ids: ResolvedIds, explain_ctx: ExplainContext, ) -> Result<CreateIndexStage, AdapterError>


async fn create_index_optimize( &mut self, __arg1: CreateIndexOptimize, ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError>


async fn create_index_finish( &mut self, session: &Session, __arg2: CreateIndexFinish, ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError>


async fn create_index_explain( &mut self, session: &Session, __arg2: CreateIndexExplain, ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError>


impl Coordinator


pub(crate) async fn sequence_create_materialized_view( &mut self, ctx: ExecuteContext, plan: CreateMaterializedViewPlan, resolved_ids: ResolvedIds, )


pub(crate) async fn explain_create_materialized_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )


pub(crate) async fn explain_replan_materialized_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )


pub(super) fn explain_materialized_view( &mut self, ctx: &ExecuteContext, _: ExplainPlanPlan, ) -> Result<ExecuteResponse, AdapterError>


fn create_materialized_view_validate( &mut self, session: &Session, plan: CreateMaterializedViewPlan, resolved_ids: ResolvedIds, explain_ctx: ExplainContext, ) -> Result<CreateMaterializedViewStage, AdapterError>


async fn create_materialized_view_optimize( &mut self, __arg1: CreateMaterializedViewOptimize, ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError>


async fn create_materialized_view_finish( &mut self, session: &Session, __arg2: CreateMaterializedViewFinish, ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError>


fn select_timestamps( &self, id_bundle: CollectionIdBundle, refresh_schedule: Option<&RefreshSchedule>, read_holds: &ReadHolds<Timestamp>, ) -> Result<(Antichain<Timestamp>, Antichain<Timestamp>, Antichain<Timestamp>), AdapterError>

Select the initial dataflow_as_of, storage_as_of, and until frontiers for a materialized view.


async fn create_materialized_view_explain( &mut self, session: &Session, __arg2: CreateMaterializedViewExplain, ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError>


pub(crate) async fn explain_pushdown_materialized_view( &self, ctx: ExecuteContext, item_id: CatalogItemId, )


impl Coordinator


pub(crate) async fn sequence_create_view( &mut self, ctx: ExecuteContext, plan: CreateViewPlan, resolved_ids: ResolvedIds, )


pub(crate) async fn explain_create_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )


pub(crate) async fn explain_replan_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )


pub(crate) fn explain_view( &mut self, ctx: &ExecuteContext, _: ExplainPlanPlan, ) -> Result<ExecuteResponse, AdapterError>


fn create_view_validate( &mut self, plan: CreateViewPlan, resolved_ids: ResolvedIds, explain_ctx: ExplainContext, ) -> Result<CreateViewStage, AdapterError>


async fn create_view_optimize( &mut self, __arg1: CreateViewOptimize, ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError>


async fn create_view_finish( &mut self, session: &Session, __arg2: CreateViewFinish, ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError>


async fn create_view_explain( &mut self, session: &Session, __arg2: CreateViewExplain, ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError>


impl Coordinator


impl Coordinator


pub(crate) async fn sequence_peek( &mut self, ctx: ExecuteContext, plan: SelectPlan, target_cluster: TargetCluster, max_query_result_size: Option<u64>, )

Sequence a peek, determining a timestamp and the most efficient dataflow interaction.

Peeks are sequenced by assigning a timestamp for evaluation, and then determining and deploying the most efficient evaluation plan. The peek could evaluate to a constant, be a simple read out of an existing arrangement, or require a new dataflow to build the results to return.
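The three outcomes above can be expressed as an enum with a preference order: constant first, then an existing arrangement, then a new dataflow. This is a hypothetical simplification. PeekPlan, plan_peek and the covering_index input are illustrative names, and the real planner decides using optimized plans rather than pre-computed flags.

```rust
type GlobalId = u64;

// Hypothetical, simplified peek plans mirroring the three cases above.
#[derive(Debug, PartialEq)]
enum PeekPlan {
    // The query folds to a constant; no dataflow interaction at all.
    Constant(Vec<i64>),
    // Read directly out of an existing arrangement (index).
    FastPath { index: GlobalId },
    // Build a new dataflow to compute the result.
    SlowPath,
}

// Choose the cheapest plan: a constant, then an existing index, else a new dataflow.
fn plan_peek(constant: Option<Vec<i64>>, covering_index: Option<GlobalId>) -> PeekPlan {
    match (constant, covering_index) {
        (Some(rows), _) => PeekPlan::Constant(rows),
        (None, Some(index)) => PeekPlan::FastPath { index },
        (None, None) => PeekPlan::SlowPath,
    }
}
```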


pub(crate) async fn sequence_copy_to( &mut self, ctx: ExecuteContext, __arg2: CopyToPlan, target_cluster: TargetCluster, )


pub(crate) async fn explain_peek( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, target_cluster: TargetCluster, )


pub fn peek_validate( &self, session: &Session, plan: SelectPlan, target_cluster: TargetCluster, copy_to_ctx: Option<CopyToContext>, explain_ctx: ExplainContext, max_query_result_size: Option<u64>, ) -> Result<PeekStage, AdapterError>

Do some simple validation. We must defer most of it until after any off-thread work.


async fn peek_linearize_timestamp( &self, session: &Session, __arg2: PeekStageLinearizeTimestamp, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>

Possibly linearize a timestamp from a TimestampOracle.


fn peek_timestamp_read_hold( &mut self, session: &mut Session, _: PeekStageTimestampReadHold, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>

Determine a read timestamp and create appropriate read holds.


async fn peek_optimize( &self, session: &Session, __arg2: PeekStageOptimize, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>


async fn peek_real_time_recency( &self, session: &Session, __arg2: PeekStageRealTimeRecency, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>


async fn peek_finish( &mut self, ctx: &mut ExecuteContext, __arg2: PeekStageFinish, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>


async fn peek_copy_to_preflight( &mut self, copy_to: PeekStageCopyTo, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>


async fn peek_copy_to_dataflow( &mut self, ctx: &ExecuteContext, __arg2: PeekStageCopyTo, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>


async fn peek_explain_plan( &self, session: &Session, __arg2: PeekStageExplainPlan, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>


async fn peek_explain_pushdown( &self, session: &Session, stage: PeekStageExplainPushdown, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>


pub(super) fn sequence_peek_timestamp( &mut self, session: &mut Session, when: &QueryWhen, cluster_id: ClusterId, timeline_context: TimelineContext, oracle_read_ts: Option<Timestamp>, source_bundle: &CollectionIdBundle, source_ids: &BTreeSet<GlobalId>, real_time_recency_ts: Option<Timestamp>, requires_linearization: RequireLinearization, ) -> Result<TimestampDetermination<Timestamp>, AdapterError>

Determines the query timestamp and acquires read holds on dependent sources if necessary.


impl Coordinator


impl Coordinator


impl Coordinator


pub(crate) async fn sequence_staged<S>( &mut self, ctx: S::Ctx, parent_span: Span, stage: S, )
where S: Staged + 'static, S::Ctx: Send + 'static,

Sequences the next stage of a Staged plan. This is designed for use with plans that execute both on and off of the coordinator thread. Stages can either produce another stage to execute or a final response. An explicit Span is passed to allow for convenient tracing.
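The control flow of a staged plan reduces to a loop that runs stages until one yields a response. The sketch is a synchronous simplification: Staged and StageResult here are hypothetical stand-ins for the real types, DemoStage is invented for illustration, and the real sequence_staged can hand stages off the coordinator thread (see handle_spawn) and resume via messages, which this loop does not model.

```rust
// Simplified stand-in: each stage yields the next stage or a final response.
enum StageResult<S> {
    Next(S),
    Response(String),
}

trait Staged: Sized {
    fn stage(self) -> Result<StageResult<Self>, String>;
}

// Hypothetical three-stage plan: validate, optimize, finish.
enum DemoStage {
    Validate(i64),
    Optimize(i64),
    Finish(i64),
}

impl Staged for DemoStage {
    fn stage(self) -> Result<StageResult<Self>, String> {
        match self {
            DemoStage::Validate(x) if x < 0 => Err("negative input".to_string()),
            DemoStage::Validate(x) => Ok(StageResult::Next(DemoStage::Optimize(x))),
            DemoStage::Optimize(x) => Ok(StageResult::Next(DemoStage::Finish(x * 2))),
            DemoStage::Finish(x) => Ok(StageResult::Response(format!("result: {x}"))),
        }
    }
}

// Drive stages until one produces a response or an error.
fn sequence_staged<S: Staged>(mut stage: S) -> Result<String, String> {
    loop {
        match stage.stage()? {
            StageResult::Next(next) => stage = next,
            StageResult::Response(resp) => return Ok(resp),
        }
    }
}
```

Because each stage consumes itself and returns the next, a plan cannot accidentally re-run an earlier stage, and errors short-circuit the whole sequence.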


fn handle_spawn<C, T, F>( &self, ctx: C, handle: JoinHandle<Result<T, AdapterError>>, cancel_enabled: bool, f: F, )
where C: StagedContext + Send + 'static, T: Send + 'static, F: FnOnce(C, T) + Send + 'static,


async fn create_source_inner( &self, session: &Session, plans: Vec<CreateSourcePlanBundle>, ) -> Result<CreateSourceInner, AdapterError>


pub(crate) fn plan_subsource( &self, session: &Session, params: &Params, subsource_stmt: CreateSubsourceStatement<Aug>, item_id: CatalogItemId, global_id: GlobalId, ) -> Result<CreateSourcePlanBundle, AdapterError>

Subsources are planned differently from other statements because they are typically synthesized from other statements, e.g. CREATE SOURCE. Because of this, we have usually “missed” the opportunity to plan them through the normal statement execution life cycle (the exception being during bootstrapping).

The caller needs to provide a CatalogItemId and GlobalId for the subsource.


pub(crate) async fn plan_purified_alter_source_add_subsource( &mut self, session: &Session, params: Params, source_name: ResolvedItemName, options: Vec<AlterSourceAddSubsourceOption<Aug>>, subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>, ) -> Result<(Plan, ResolvedIds), AdapterError>



pub(crate) fn plan_purified_alter_source_refresh_references( &self, _session: &Session, _params: Params, source_name: ResolvedItemName, available_source_references: SourceReferences, ) -> Result<(Plan, ResolvedIds), AdapterError>



pub(crate) async fn plan_purified_create_source( &mut self, ctx: &ExecuteContext, params: Params, progress_stmt: CreateSubsourceStatement<Aug>, source_stmt: CreateSourceStatement<Aug>, subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>, available_source_references: SourceReferences, ) -> Result<(Plan, ResolvedIds), AdapterError>

Prepares a CREATE SOURCE statement to create its progress subsource, the primary source, and any ingestion export subsources (e.g. PG tables).


pub(super) async fn sequence_create_source( &mut self, session: &mut Session, plans: Vec<CreateSourcePlanBundle>, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_create_connection( &mut self, ctx: ExecuteContext, plan: CreateConnectionPlan, resolved_ids: ResolvedIds, )


pub(crate) async fn sequence_create_connection_stage_finish( &mut self, session: &mut Session, connection_id: CatalogItemId, connection_gid: GlobalId, plan: CreateConnectionPlan, resolved_ids: ResolvedIds, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_create_database( &mut self, session: &mut Session, plan: CreateDatabasePlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_create_schema( &mut self, session: &mut Session, plan: CreateSchemaPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_create_role( &mut self, conn_id: Option<&ConnectionId>, __arg2: CreateRolePlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_create_network_policy( &mut self, session: &Session, __arg2: CreateNetworkPolicyPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_alter_network_policy( &mut self, session: &Session, __arg2: AlterNetworkPolicyPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_create_table( &mut self, ctx: &mut ExecuteContext, plan: CreateTablePlan, resolved_ids: ResolvedIds, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_create_sink( &mut self, ctx: ExecuteContext, plan: CreateSinkPlan, resolved_ids: ResolvedIds, )


pub(super) fn validate_system_column_references( &self, uses_ambiguous_columns: bool, depends_on: &BTreeSet<GlobalId>, ) -> Result<(), AdapterError>

Validates that a view definition does not contain any expressions that may lead to ambiguous column references to system tables, for example NATURAL JOIN or SELECT *.

We prevent these expressions so that we can add columns to system tables without changing the definition of the view.

Here is a somewhat hand-wavy proof of why we only need to check the immediate view definition for system objects and ambiguous column references, and not the entire dependency tree:

  • A view with no object references cannot have any ambiguous column references to a system object, because it has no system objects.
  • A view with a direct reference to a system object and a * or NATURAL JOIN will be rejected due to ambiguous column references.
  • A view with system objects but no * or NATURAL JOINs cannot have any ambiguous column references to a system object, because all column references are explicitly named.
  • A view with * or NATURAL JOINs that does not directly reference a system object cannot have any ambiguous column references to a system object, because there are no system objects in the top-level view and all sub-views are guaranteed to have no ambiguous column references to system objects.
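Given the argument above, the check itself only needs the view's direct dependencies. In this sketch GlobalId is a simplified two-variant enum (a hypothetical model, not the real type), and the error message is invented for illustration.

```rust
use std::collections::BTreeSet;

// Simplified model of IDs that distinguishes system objects from user objects.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum GlobalId {
    System(u64),
    User(u64),
}

impl GlobalId {
    fn is_system(&self) -> bool {
        matches!(self, GlobalId::System(_))
    }
}

// Only the immediate definition is inspected: reject * / NATURAL JOIN when
// the view directly depends on a system object.
fn validate_system_column_references(
    uses_ambiguous_columns: bool,
    depends_on: &BTreeSet<GlobalId>,
) -> Result<(), String> {
    if uses_ambiguous_columns && depends_on.iter().any(GlobalId::is_system) {
        Err("views that directly reference system objects cannot use * or NATURAL JOIN".to_string())
    } else {
        Ok(())
    }
}
```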

pub(super) async fn sequence_create_type( &mut self, session: &Session, plan: CreateTypePlan, resolved_ids: ResolvedIds, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_comment_on( &mut self, session: &Session, plan: CommentPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_drop_objects( &mut self, session: &Session, __arg2: DropObjectsPlan, ) -> Result<ExecuteResponse, AdapterError>


fn validate_dropped_role_ownership( &self, session: &Session, dropped_roles: &BTreeMap<RoleId, &str>, ) -> Result<(), AdapterError>


pub(super) async fn sequence_drop_owned( &mut self, session: &Session, plan: DropOwnedPlan, ) -> Result<ExecuteResponse, AdapterError>


fn sequence_drop_common( &self, session: &Session, ids: Vec<ObjectId>, ) -> Result<DropOps, AdapterError>


pub(super) fn sequence_explain_schema( &self, _: ExplainSinkSchemaPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) fn sequence_show_all_variables( &self, session: &Session, ) -> Result<ExecuteResponse, AdapterError>


pub(super) fn sequence_show_variable( &self, session: &Session, plan: ShowVariablePlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_inspect_shard( &self, session: &Session, plan: InspectShardPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) fn sequence_set_variable( &self, session: &mut Session, plan: SetVariablePlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) fn sequence_reset_variable( &self, session: &mut Session, plan: ResetVariablePlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) fn sequence_set_transaction( &self, session: &mut Session, plan: SetTransactionPlan, ) -> Result<ExecuteResponse, AdapterError>


fn validate_set_isolation_level( &self, session: &Session, ) -> Result<(), AdapterError>


fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError>


pub(super) async fn sequence_end_transaction( &mut self, ctx: ExecuteContext, action: EndTransactionAction, )


async fn sequence_end_transaction_inner( &mut self, session: &mut Session, action: EndTransactionAction, ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError>


pub(super) async fn sequence_side_effecting_func( &mut self, ctx: ExecuteContext, plan: SideEffectingFunc, )


pub(super) async fn determine_real_time_recent_timestamp( &self, session: &Session, source_ids: impl Iterator<Item = CatalogItemId>, ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>

Checks whether the session needs a real-time recency timestamp and, if so, returns a future that will return the timestamp.


pub(super) async fn sequence_explain_plan( &mut self, ctx: ExecuteContext, plan: ExplainPlanPlan, target_cluster: TargetCluster, )


pub(super) async fn sequence_explain_pushdown( &mut self, ctx: ExecuteContext, plan: ExplainPushdownPlan, target_cluster: TargetCluster, )


async fn render_explain_pushdown( &self, ctx: ExecuteContext, as_of: Antichain<Timestamp>, mz_now: ResultSpec<'static>, read_holds: Option<ReadHolds<Timestamp>>, imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static, )


async fn render_explain_pushdown_prepare( &self, session: &Session, as_of: Antichain<Timestamp>, mz_now: ResultSpec<'static>, imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)>, ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>>


pub(super) async fn sequence_insert( &mut self, ctx: ExecuteContext, plan: InsertPlan, )


pub(super) async fn sequence_read_then_write( &mut self, ctx: ExecuteContext, plan: ReadThenWritePlan, )

ReadThenWrite is a plan whose writes depend on the results of a read. This works by doing a Peek and then queuing a SendDiffs. No writes or read-then-writes may occur between the Peek and the SendDiffs; otherwise, a serializability violation could occur.
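The invariant can be illustrated with an ordinary lock held across both the read and the write. This is an analogy rather than the coordinator's mechanism: the real code is asynchronous and message-driven (compare the write_locks field of Coordinator), while Table and read_then_write below are hypothetical names. Eight concurrent "UPDATE ... SET v = v + 1" operations still produce exactly 8, because no write can land between a peek and its diffs.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

// Hypothetical table: key -> value. The lock serializes read-then-write.
type Table = Arc<Mutex<BTreeMap<String, i64>>>;

// Read ("peek") and compute diffs while holding the lock, then apply them
// ("send diffs") before releasing it, so no other write can land in between.
fn read_then_write(table: &Table, f: impl Fn(&str, i64) -> Option<i64>) -> usize {
    let mut guard = table.lock().expect("lock poisoned");
    // Peek: compute new values from a consistent snapshot.
    let diffs: Vec<(String, i64)> = guard
        .iter()
        .filter_map(|(k, v)| f(k.as_str(), *v).map(|new| (k.clone(), new)))
        .collect();
    // SendDiffs: apply the updates while still holding the lock.
    for (k, new) in &diffs {
        guard.insert(k.clone(), *new);
    }
    diffs.len()
}
```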


pub(super) async fn sequence_alter_item_rename( &mut self, session: &mut Session, plan: AlterItemRenamePlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_alter_retain_history( &mut self, session: &mut Session, plan: AlterRetainHistoryPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_alter_schema_rename( &mut self, session: &mut Session, plan: AlterSchemaRenamePlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_alter_schema_swap( &mut self, session: &mut Session, plan: AlterSchemaSwapPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_alter_role( &mut self, session: &Session, __arg2: AlterRolePlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_alter_sink_prepare( &mut self, ctx: ExecuteContext, plan: AlterSinkPlan, )


pub async fn sequence_alter_sink_finish(&mut self, ctx: AlterSinkReadyContext)


pub(super) async fn sequence_alter_connection( &mut self, ctx: ExecuteContext, __arg2: AlterConnectionPlan, )


async fn sequence_alter_connection_options( &mut self, ctx: ExecuteContext, id: CatalogItemId, set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>, drop_options: BTreeSet<ConnectionOptionName>, validate: bool, )


pub(crate) async fn sequence_alter_connection_stage_finish( &mut self, session: &mut Session, id: CatalogItemId, connection: Connection, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_alter_source( &mut self, session: &Session, __arg2: AlterSourcePlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_alter_system_set( &mut self, session: &Session, __arg2: AlterSystemSetPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_alter_system_reset( &mut self, session: &Session, __arg2: AlterSystemResetPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_alter_system_reset_all( &mut self, session: &Session, _: AlterSystemResetAllPlan, ) -> Result<ExecuteResponse, AdapterError>


fn is_user_allowed_to_alter_system( &self, session: &Session, var_name: Option<&str>, ) -> Result<(), AdapterError>


fn validate_alter_system_network_policy( &self, session: &Session, policy_value: &VariableValue, ) -> Result<(), AdapterError>


fn validate_alter_network_policy( &self, session: &Session, policy_rules: &Vec<NetworkPolicyRule>, ) -> Result<(), AdapterError>

Validates that a set of NetworkPolicyRules is valid for the current Session.

This helps prevent users from modifying network policies in a way that would lock out their current connection.
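The lockout check can be sketched as follows. This page does not show the fields of NetworkPolicyRule, so the sketch assumes, hypothetically, that each rule is an IPv4 CIDR block that allows ingress and that the client's address is known; NetworkRule, allows, and validate_rules are illustrative names, not Materialize APIs.

```rust
use std::net::Ipv4Addr;

/// Hypothetical allow rule: an IPv4 network in CIDR form, e.g. 10.0.0.0/8.
pub struct NetworkRule {
    pub network: Ipv4Addr,
    pub prefix_len: u8, // expected range 0..=32; larger values are treated as 32
}

impl NetworkRule {
    /// Returns true if `addr` falls inside this rule's CIDR block.
    pub fn allows(&self, addr: Ipv4Addr) -> bool {
        // A /0 prefix matches everything; shifting a u32 by 32 would overflow.
        let mask: u32 = if self.prefix_len == 0 {
            0
        } else {
            u32::MAX << (32 - u32::from(self.prefix_len.min(32)))
        };
        (u32::from(addr) & mask) == (u32::from(self.network) & mask)
    }
}

/// Rejects a rule set that would not admit the current client's address,
/// i.e. one that would lock out the connection applying it.
pub fn validate_rules(rules: &[NetworkRule], client: Ipv4Addr) -> Result<(), String> {
    if rules.iter().any(|rule| rule.allows(client)) {
        Ok(())
    } else {
        Err(format!("rules would lock out the current connection from {client}"))
    }
}
```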


pub(super) fn sequence_execute( &mut self, session: &mut Session, plan: ExecutePlan, ) -> Result<String, AdapterError>


pub(super) async fn sequence_grant_privileges( &mut self, session: &Session, __arg2: GrantPrivilegesPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_revoke_privileges( &mut self, session: &Session, __arg2: RevokePrivilegesPlan, ) -> Result<ExecuteResponse, AdapterError>


async fn sequence_update_privileges( &mut self, session: &Session, update_privileges: Vec<UpdatePrivilege>, grantees: Vec<RoleId>, variant: UpdatePrivilegeVariant, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_alter_default_privileges( &mut self, session: &Session, __arg2: AlterDefaultPrivilegesPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_grant_role( &mut self, session: &Session, __arg2: GrantRolePlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_revoke_role( &mut self, session: &Session, __arg2: RevokeRolePlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_alter_owner( &mut self, session: &Session, __arg2: AlterOwnerPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(super) async fn sequence_reassign_owned( &mut self, session: &Session, __arg2: ReassignOwnedPlan, ) -> Result<ExecuteResponse, AdapterError>


pub(crate) async fn handle_deferred_statement(&mut self)


pub(super) async fn sequence_alter_table( &mut self, session: &Session, plan: AlterTablePlan, ) -> Result<ExecuteResponse, AdapterError>


impl Coordinator


pub(super) async fn statistics_oracle( &self, session: &Session, source_ids: &BTreeSet<GlobalId>, query_as_of: &Antichain<Timestamp>, is_oneshot: bool, ) -> Result<Box<dyn StatisticsOracle>, AdapterError>


impl Coordinator


fn emit_optimizer_notices( &self, session: &Session, notices: &Vec<RawOptimizerNotice>, )

Forward notices that we got from the optimizer.


async fn process_dataflow_metainfo( &mut self, df_meta: DataflowMetainfo, export_id: GlobalId, session: &Session, notice_ids: Vec<GlobalId>, ) -> Option<Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>

Process the metainfo from a newly created non-transient dataflow.


impl Coordinator


pub(crate) fn sequence_plan( &mut self, ctx: ExecuteContext, plan: Plan, resolved_ids: ResolvedIds, ) -> LocalBoxFuture<'_, ()>

BOXED FUTURE: As of Nov 2023, the Future returned by this function was 34KB. Left unboxed, it would be stored on the stack, which hurts runtime performance and inflates our stack usage. For that reason, we purposefully move this Future onto the heap (i.e., Box it).
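The effect described above can be observed with a small std-only sketch: an async fn that keeps a large local alive across an .await produces a future at least that large, while boxing it leaves only a two-word fat pointer on the caller's stack. The LocalBoxFuture alias below has the same shape as the futures crate's alias of that name; big_future is a hypothetical stand-in, not Coordinator code.

```rust
use std::future::Future;
use std::mem::size_of_val;
use std::pin::Pin;

/// Same shape as `futures::future::LocalBoxFuture`: a pinned, heap-allocated,
/// type-erased future that need not be `Send`.
pub type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

/// A future whose state machine must store a 4 KiB buffer, because the
/// buffer is still used after the `.await` point.
pub async fn big_future() -> u64 {
    let buf = [1u8; 4096];
    std::future::ready(()).await;
    buf.iter().map(|&b| u64::from(b)).sum()
}

/// Boxing moves the large state machine to the heap; callers only hold a pointer.
pub fn boxed_big_future() -> LocalBoxFuture<'static, u64> {
    Box::pin(big_future())
}

/// Returns (unboxed size, boxed size) in bytes. Neither future is polled.
pub fn future_sizes() -> (usize, usize) {
    let unboxed = big_future();
    let boxed = boxed_big_future();
    (size_of_val(&unboxed), size_of_val(&boxed))
}
```

The trade-off is one heap allocation per call in exchange for a small, fixed stack footprint, which matters when such futures are nested inside other large async state machines.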


pub(crate) async fn sequence_execute_single_statement_transaction( &mut self, ctx: ExecuteContext, stmt: Arc<Statement<Raw>>, params: Params, )


pub(crate) async fn sequence_create_role_for_startup( &mut self, plan: CreateRolePlan, ) -> Result<ExecuteResponse, AdapterError>

Creates a role during connection startup.

This should not be called from anywhere except connection startup.


pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId)


fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice>


pub(crate) fn insert_constant( catalog: &Catalog, session: &mut Session, id: CatalogItemId, constants: MirRelationExpr, ) -> Result<ExecuteResponse, AdapterError>


pub(crate) fn send_diffs( session: &mut Session, plan: SendDiffsPlan, ) -> Result<ExecuteResponse, AdapterError>


impl Coordinator


pub(crate) fn plan_statement( &self, session: &Session, stmt: Statement<Aug>, params: &Params, resolved_ids: &ResolvedIds, ) -> Result<Plan, AdapterError>


pub(crate) fn declare( &self, ctx: ExecuteContext, name: String, stmt: Statement<Raw>, sql: String, params: Params, )


fn declare_inner( session: &mut Session, catalog: &Catalog, name: String, stmt: Statement<Raw>, sql: String, params: Params, now: EpochMillis, ) -> Result<(), AdapterError>


pub(crate) fn describe( catalog: &Catalog, session: &Session, stmt: Option<Statement<Raw>>, param_types: Vec<Option<ScalarType>>, ) -> Result<StatementDesc, AdapterError>


pub(crate) fn verify_prepared_statement( catalog: &Catalog, session: &mut Session, name: &str, ) -> Result<(), AdapterError>

Verify a prepared statement is still valid. This will return an error if the catalog’s revision has changed and the statement now produces a different type than it originally did.


pub(crate) fn verify_portal( &self, session: &mut Session, name: &str, ) -> Result<(), AdapterError>

Verify a portal is still valid.


fn verify_statement_revision( catalog: &Catalog, session: &Session, stmt: Option<&Statement<Raw>>, desc: &StatementDesc, catalog_revision: u64, ) -> Result<Option<u64>, AdapterError>

If the catalog and portal revisions don’t match, re-describe the statement and ensure its result type has not changed. Return Some(x) with the new (valid) revision if the revisions differ but the result type is unchanged. Return None if the revisions match. Return an error if the result type has changed.
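The revision check can be sketched as follows. For illustration only, the sketch assumes that a statement description can be compared with == and takes the re-describe step as a closure; StmtDesc and verify_revision are hypothetical simplifications of the real StatementDesc-based API.

```rust
/// Hypothetical stand-in for a statement description: its result column types.
#[derive(Debug, Clone, PartialEq)]
pub struct StmtDesc {
    pub column_types: Vec<String>,
}

/// Returns Ok(None) if the revisions match, Ok(Some(new_revision)) if the
/// catalog changed but the statement still has the same result type, and an
/// error if re-describing the statement yields a different result type.
pub fn verify_revision<F>(
    catalog_revision: u64,
    stmt_revision: u64,
    original: &StmtDesc,
    redescribe: F,
) -> Result<Option<u64>, String>
where
    F: FnOnce() -> Result<StmtDesc, String>,
{
    if catalog_revision == stmt_revision {
        // Nothing changed since the statement was described.
        return Ok(None);
    }
    let current = redescribe()?;
    if current == *original {
        // Catalog moved on, but the statement is still valid at the new revision.
        Ok(Some(catalog_revision))
    } else {
        Err("statement result type changed".to_string())
    }
}
```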


pub(crate) async fn clear_transaction( &mut self, session: &mut Session, ) -> TransactionStatus<Timestamp>

Handle removing in-progress transaction state regardless of the end action of the transaction.


pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId)

Clears coordinator state for a connection.


pub(crate) async fn add_active_compute_sink( &mut self, id: GlobalId, active_sink: ActiveComputeSink, ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>

Adds coordinator bookkeeping for an active compute sink.

This is a low-level method. The caller is responsible for installing the sink in the controller.


pub(crate) async fn remove_active_compute_sink( &mut self, id: GlobalId, ) -> Option<ActiveComputeSink>

Removes coordinator bookkeeping for an active compute sink.

This is a low-level method. The caller is responsible for dropping the sink from the controller. Consider calling drop_compute_sink or retire_compute_sink instead.


impl Coordinator


pub(crate) async fn bootstrap( &mut self, boot_ts: Timestamp, migrated_storage_collections_0dt: BTreeSet<CatalogItemId>, builtin_table_updates: Vec<BuiltinTableUpdate>, cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>, uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>, audit_logs_iterator: AuditLogIterator, ) -> Result<(), AdapterError>

Initializes coordinator state based on the contained catalog. Must be called after creating the coordinator and before calling the Coordinator::serve method.


async fn bootstrap_tables( &mut self, entries: &[CatalogEntry], builtin_table_updates: Vec<BuiltinTableUpdate>, audit_logs_iterator: AuditLogIterator, )

Prepares tables for writing by resetting them to a known state and appending the given builtin table updates. The timestamp oracle will be advanced to the write timestamp of the append when this method returns.


fn bootstrap_audit_log_table<'a>( &mut self, table_id: CatalogItemId, name: &'a QualifiedItemName, table: &'a Table, audit_logs_iterator: AuditLogIterator, read_ts: Timestamp, ) -> JoinHandle<Vec<StateUpdate>>

Prepare updates to the audit log table. The audit log table is append-only and very large, so we only need to find the events that are present in audit_logs_iterator but not in the audit log table.
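Because the table is append-only, the work reduces to a set difference: every event already in the table can be skipped, and only the remaining events need to be appended. A minimal sketch, assuming (hypothetically) that each audit event is identified by a unique u64 id; the real method returns a JoinHandle, suggesting the work runs on a separate task, which this sketch omits.

```rust
use std::collections::BTreeSet;

/// Hypothetical audit event: only the id matters for deduplication.
#[derive(Debug, Clone, PartialEq)]
pub struct AuditEvent {
    pub id: u64,
    pub details: String,
}

/// Returns the events from `durable_events` whose ids are not yet present
/// in the audit log table, preserving the iterator's order.
pub fn missing_audit_events<I>(durable_events: I, table_ids: &BTreeSet<u64>) -> Vec<AuditEvent>
where
    I: IntoIterator<Item = AuditEvent>,
{
    durable_events
        .into_iter()
        .filter(|event| !table_ids.contains(&event.id))
        .collect()
}
```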


async fn bootstrap_storage_collections( &mut self, migrated_storage_collections: &BTreeSet<CatalogItemId>, )

Initializes all storage collections required by catalog objects in the storage controller.

This method takes care of collection creation, as well as migration of existing collections.

Creating all storage collections in a single create_collections call, rather than on demand, is more efficient as it reduces the number of writes to durable storage. It also allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary storage collections, without needing to worry about dependency order.

migrated_storage_collections is a set of builtin storage collections that have been migrated and should be handled specially.


async fn bootstrap_builtin_continual_tasks( &mut self, collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>, )

Make as_of selection work for builtin continual tasks (CTs). Ideally we’d write the initial as_of down in the durable catalog, but that’s hard because of boot ordering. Instead, we set the since of the storage collection to a reasonable lower bound for the as_of. Then, if the upper is 0, the as_of selection code will allow us to jump it forward to this since.


fn bootstrap_dataflow_plans( &mut self, ordered_catalog_entries: &[CatalogEntry], cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>, ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError>

Invokes the optimizer on all indexes and materialized views in the catalog and inserts the resulting dataflow plans into the catalog state.

ordered_catalog_entries must be sorted in dependency order, with dependencies ordered before their dependants.

This method does not perform timestamp selection for the dataflows, nor does it create them in the compute controller. Both of these steps happen later during bootstrapping.

Returns a map of expressions that were not cached.
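The dependency-order precondition on ordered_catalog_entries can be checked with a small helper. This is a sketch under the assumption that each entry exposes its own id and the ids it depends on; Entry and is_dependency_ordered are hypothetical, not part of the catalog API. As a design choice of this sketch, dependencies that are not in the slice at all (for example, objects created earlier) are ignored.

```rust
use std::collections::BTreeSet;

/// Hypothetical catalog entry: an id plus the ids it depends on.
pub struct Entry {
    pub id: u64,
    pub deps: Vec<u64>,
}

/// Returns true if every dependency that appears in `entries` is listed
/// before the entry that depends on it.
pub fn is_dependency_ordered(entries: &[Entry]) -> bool {
    let all: BTreeSet<u64> = entries.iter().map(|e| e.id).collect();
    let mut seen = BTreeSet::new();
    for entry in entries {
        // A dependency is satisfied if it is external to this slice or already seen.
        let ok = entry
            .deps
            .iter()
            .all(|dep| !all.contains(dep) || seen.contains(dep));
        if !ok {
            return false;
        }
        seen.insert(entry.id);
    }
    true
}
```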


async fn bootstrap_dataflow_as_ofs( &mut self, ) -> BTreeMap<GlobalId, ReadHold<Timestamp>>

Selects for each compute dataflow an as-of suitable for bootstrapping it.

Returns a set of ReadHolds that ensures the read frontiers of involved collections stay in place and that must not be dropped before all compute dataflows have been created with the compute controller.

This method expects all storage collections and dataflow plans to be available, so it must run after Coordinator::bootstrap_storage_collections and Coordinator::bootstrap_dataflow_plans.
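The requirement that the returned read holds stay alive until all dataflows are created follows an RAII pattern: a hold pins a collection's read frontier until the hold is dropped. A minimal, single-threaded sketch, assuming totally ordered u64 timestamps; Collection, ReadHold, and allowed_since here are hypothetical and much simpler than the real ReadHold<Timestamp>.

```rust
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;

/// Hypothetical collection that tracks outstanding read holds by timestamp.
#[derive(Default)]
pub struct Collection {
    holds: RefCell<BTreeMap<u64, usize>>, // timestamp -> number of holds at it
}

impl Collection {
    /// The collection may only compact up to the earliest held time,
    /// or up to `desired` if nothing is held.
    pub fn allowed_since(&self, desired: u64) -> u64 {
        match self.holds.borrow().keys().next() {
            Some(&earliest) => earliest.min(desired),
            None => desired,
        }
    }
}

/// A hold at `time`; it releases itself when dropped.
pub struct ReadHold {
    collection: Rc<Collection>,
    time: u64,
}

impl ReadHold {
    pub fn acquire(collection: &Rc<Collection>, time: u64) -> Self {
        *collection.holds.borrow_mut().entry(time).or_insert(0) += 1;
        ReadHold { collection: Rc::clone(collection), time }
    }
}

impl Drop for ReadHold {
    fn drop(&mut self) {
        let mut holds = self.collection.holds.borrow_mut();
        let remaining = match holds.get_mut(&self.time) {
            Some(count) => {
                *count -= 1;
                *count
            }
            None => return,
        };
        if remaining == 0 {
            holds.remove(&self.time);
        }
    }
}
```

Dropping the holds too early would let allowed_since advance past the selected as-of, which is exactly what the caller must prevent until the compute dataflows exist.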


fn serve( self, internal_cmd_rx: UnboundedReceiver<Message>, strict_serializable_reads_rx: UnboundedReceiver<(ConnectionId, PendingReadTxn)>, cmd_rx: UnboundedReceiver<(OpenTelemetryContext, Command)>, group_commit_rx: GroupCommitWaiter, ) -> LocalBoxFuture<'static, ()>

Serves the coordinator, receiving commands from users over cmd_rx and internal messages over internal_cmd_rx.

You must call bootstrap before calling this method.

BOXED FUTURE: As of Nov 2023, the Future returned by this function was 92KB. Left unboxed, it would be stored on the stack, which hurts runtime performance and inflates our stack usage. For that reason, we purposefully move this Future onto the heap (i.e., Box it).


fn catalog(&self) -> &Catalog

Obtain a read-only Catalog reference.


fn owned_catalog(&self) -> Arc<Catalog>

Obtain a read-only Catalog snapshot, suitable for giving out to non-Coordinator thread tasks.


fn optimizer_metrics(&self) -> OptimizerMetrics

Obtain a handle to the optimizer metrics, suitable for giving out to non-Coordinator thread tasks.


fn catalog_mut(&mut self) -> &mut Catalog

Obtain a writeable Catalog reference.


fn connection_context(&self) -> &ConnectionContext

Obtain a reference to the coordinator’s connection context.


fn secrets_reader(&self) -> &Arc<dyn SecretsReader>

Obtain a reference to the coordinator’s secret reader, in an Arc.


pub(crate) fn broadcast_notice(&self, notice: AdapterNotice)

Publishes a notice message to all sessions.

TODO(parkmycar): This code is dead, but is a nice parallel to Coordinator::broadcast_notice_tx so we keep it around.


pub(crate) fn broadcast_notice_tx( &self, ) -> Box<dyn FnOnce(AdapterNotice) + Send + 'static>

Returns a closure that will publish a notice to all sessions that were active at the time this method was called.
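The snapshot semantics can be sketched with std::sync::mpsc: the returned closure captures clones of the senders that exist when it is created, so sessions that connect later are not notified. Notice, Sessions, and the u32 connection ids are hypothetical; only the boxed FnOnce(...) + Send + 'static return shape follows the signature on this page.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical notice type standing in for AdapterNotice.
#[derive(Debug, Clone, PartialEq)]
pub struct Notice(pub String);

/// Hypothetical registry of per-session notice channels, keyed by connection id.
#[derive(Default)]
pub struct Sessions {
    senders: BTreeMap<u32, Sender<Notice>>,
}

impl Sessions {
    pub fn connect(&mut self, conn_id: u32) -> Receiver<Notice> {
        let (tx, rx) = channel();
        self.senders.insert(conn_id, tx);
        rx
    }

    /// Returns a closure that sends a notice to every session that was
    /// connected when this method was called.
    pub fn broadcast_notice_tx(&self) -> Box<dyn FnOnce(Notice) + Send + 'static> {
        let senders: Vec<Sender<Notice>> = self.senders.values().cloned().collect();
        Box::new(move |notice: Notice| {
            for tx in senders {
                // A session that has gone away simply misses the notice.
                let _ = tx.send(notice.clone());
            }
        })
    }
}
```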


pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta>


pub(crate) fn retire_execution( &mut self, reason: StatementEndedExecutionReason, ctx_extra: ExecuteContextExtra, )


pub fn dataflow_builder( &self, instance: ComputeInstanceId, ) -> DataflowBuilder<'_>

Creates a new dataflow builder from the catalog and indexes in self.


pub fn instance_snapshot( &self, id: ComputeInstanceId, ) -> Result<ComputeInstanceSnapshot, InstanceMissing>

Return a reference-less snapshot of the indicated compute instance.


pub(crate) async fn ship_dataflow( &mut self, dataflow: DataflowDescription<Plan>, instance: ComputeInstanceId, subscribe_target_replica: Option<ReplicaId>, )

Call into the compute controller to install a finalized dataflow, and initialize the read policies for its exported readable objects.


pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates( &mut self, dataflow: DataflowDescription<Plan>, instance: ComputeInstanceId, notice_builtin_updates_fut: Option<Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>, )

Like ship_dataflow, but also await on builtin table updates.


pub fn install_compute_watch_set( &mut self, conn_id: ConnectionId, objects: BTreeSet<GlobalId>, t: Timestamp, state: WatchSetResponse, )

Install a watch set in the controller that is automatically associated with the given connection id. The watchset will be automatically cleared if the connection terminates before the watchset completes.


pub fn install_storage_watch_set( &mut self, conn_id: ConnectionId, objects: BTreeSet<GlobalId>, t: Timestamp, state: WatchSetResponse, )

Install a watch set in the controller that is automatically associated with the given connection id. The watchset will be automatically cleared if the connection terminates before the watchset completes.


pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId)

Cancels pending watchsets associated with the provided connection id.
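The per-connection bookkeeping behind these methods can be sketched as a map from connection id to outstanding watch set ids, so that cancelling a connection's watch sets is a single removal. WatchSets, WatchSetId, and the u32 connection ids are hypothetical; the real methods also register the watch set with the compute or storage controller, which is omitted here.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical identifier for an installed watch set.
pub type WatchSetId = u64;

/// Hypothetical bookkeeping: which watch sets belong to which connection.
#[derive(Default)]
pub struct WatchSets {
    next_id: WatchSetId,
    by_conn: BTreeMap<u32, BTreeSet<WatchSetId>>,
}

impl WatchSets {
    /// Records a new watch set for `conn_id` and returns its id.
    pub fn install(&mut self, conn_id: u32) -> WatchSetId {
        let id = self.next_id;
        self.next_id += 1;
        self.by_conn.entry(conn_id).or_default().insert(id);
        id
    }

    /// Forgets a watch set that completed normally.
    pub fn complete(&mut self, conn_id: u32, id: WatchSetId) {
        let now_empty = match self.by_conn.get_mut(&conn_id) {
            Some(ids) => {
                ids.remove(&id);
                ids.is_empty()
            }
            None => false,
        };
        if now_empty {
            self.by_conn.remove(&conn_id);
        }
    }

    /// Removes and returns every pending watch set for `conn_id`,
    /// e.g. when the connection terminates.
    pub fn cancel_pending(&mut self, conn_id: u32) -> BTreeSet<WatchSetId> {
        self.by_conn.remove(&conn_id).unwrap_or_default()
    }
}
```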


pub async fn dump(&self) -> Result<Value, Error>

Returns the state of the Coordinator formatted as JSON.

The returned value is not guaranteed to be stable and may change at any point in time.


async fn prune_storage_usage_events_on_startup( &self, retention_period: Duration, )

Prune all storage usage events from the MZ_STORAGE_USAGE_BY_SHARD table that are older than retention_period.

This method will read the entire contents of MZ_STORAGE_USAGE_BY_SHARD into memory which can be expensive.

DO NOT call this method outside of startup. The safety of reading at the current oracle read timestamp and then writing at whatever the current write timestamp is (instead of read_ts + 1) relies on the fact that there are no outstanding writes during startup.

Group commit, which this method uses to write the retractions, has builtin fencing, and we never commit retractions to MZ_STORAGE_USAGE_BY_SHARD outside of this method, which is only called once during startup. So we don’t have to worry about double/invalid retractions.
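The pruning step amounts to emitting a retraction (a diff of -1) for every event older than the retention window. A minimal sketch, assuming millisecond timestamps and a hypothetical UsageEvent row type; the real method reads MZ_STORAGE_USAGE_BY_SHARD and writes through group commit, both of which are omitted here.

```rust
use std::time::Duration;

/// Hypothetical row from a storage-usage table.
#[derive(Debug, Clone, PartialEq)]
pub struct UsageEvent {
    pub shard_id: String,
    pub collection_timestamp_ms: u64,
}

/// Returns (row, -1) retractions for every event collected before
/// `now_ms - retention_period`. Saturating arithmetic avoids underflow
/// when the retention period is longer than `now_ms`.
pub fn expired_retractions(
    events: &[UsageEvent],
    now_ms: u64,
    retention_period: Duration,
) -> Vec<(UsageEvent, i64)> {
    let retention_ms = u64::try_from(retention_period.as_millis()).unwrap_or(u64::MAX);
    let cutoff_ms = now_ms.saturating_sub(retention_ms);
    events
        .iter()
        .filter(|event| event.collection_timestamp_ms < cutoff_ms)
        .map(|event| (event.clone(), -1))
        .collect()
}
```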

Trait Implementations


impl Debug for Coordinator


fn fmt(&self, __f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl TimestampProvider for Coordinator


fn compute_read_frontier( &self, instance: ComputeInstanceId, id: GlobalId, ) -> Antichain<Timestamp>

Reports a collection’s current read frontier.


fn compute_write_frontier( &self, instance: ComputeInstanceId, id: GlobalId, ) -> Antichain<Timestamp>

Reports a collection’s current write frontier.


fn storage_frontiers( &self, ids: Vec<GlobalId>, ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>

Returns the implied capability (since) and write frontier (upper) for the specified storage collections.

fn acquire_read_holds( &self, id_bundle: &CollectionIdBundle, ) -> ReadHolds<Timestamp>

Acquires ReadHolds for the given id_bundle at the earliest possible times.

fn catalog_state(&self) -> &CatalogState


fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline>


fn needs_linearized_read_ts( isolation_level: &IsolationLevel, when: &QueryWhen, ) -> bool

Returns true if and only if the given configuration needs a linearized read timestamp from a timestamp oracle.

fn determine_timestamp_for( &self, session: &Session, id_bundle: &CollectionIdBundle, when: &QueryWhen, compute_instance: ComputeInstanceId, timeline_context: &TimelineContext, oracle_read_ts: Option<Timestamp>, real_time_recency_ts: Option<Timestamp>, isolation_level: &IsolationLevel, ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError>

Determines the timestamp for a query.

fn least_valid_read( &self, read_holds: &ReadHolds<Timestamp>, ) -> Antichain<Timestamp>

The smallest common valid read frontier among times in the given ReadHolds.

fn least_valid_write( &self, id_bundle: &CollectionIdBundle, ) -> Antichain<Timestamp>

The smallest common valid write frontier among the specified collections.

fn greatest_available_read( &self, id_bundle: &CollectionIdBundle, ) -> Antichain<Timestamp>

Returns least_valid_write - 1, i.e., each time in least_valid_write stepped back in a saturating way.
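With a single, totally ordered timestamp per collection, the three frontier helpers above reduce to simple arithmetic. The sketch assumes u64 timestamps and models each frontier as one value instead of an Antichain. Under those assumptions, the smallest time readable from every collection is the largest since, and the common write frontier is the smallest upper; Frontiers and the free functions are hypothetical.

```rust
/// Hypothetical per-collection frontiers, simplified to single u64 timestamps.
#[derive(Debug, Clone, Copy)]
pub struct Frontiers {
    pub since: u64, // read frontier: reads are valid at times >= since
    pub upper: u64, // write frontier: data is complete for times < upper
}

/// Earliest time that is readable in every collection.
pub fn least_valid_read(collections: &[Frontiers]) -> u64 {
    collections.iter().map(|f| f.since).max().unwrap_or(0)
}

/// Earliest upper across the collections (None if there are no collections).
pub fn least_valid_write(collections: &[Frontiers]) -> Option<u64> {
    collections.iter().map(|f| f.upper).min()
}

/// least_valid_write - 1, stepped back in a saturating way at 0.
pub fn greatest_available_read(collections: &[Frontiers]) -> Option<u64> {
    least_valid_write(collections).map(|t| t.saturating_sub(1))
}
```

For collections with (since, upper) of (3, 10) and (5, 8), these give 5, 8, and 7, so any timestamp in 5..=7 is readable from both collections and reflects complete data.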

Auto Trait Implementations

Blanket Implementations


impl<T> Any for T
where T: 'static + ?Sized,


fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> AsAny for T
where T: Any,


fn as_any(&self) -> &(dyn Any + 'static)


impl<T> Borrow<T> for T
where T: ?Sized,


fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,


fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,


fn cast_into(self) -> U

Performs the cast.

impl<T> Conv for T


fn conv<T>(self) -> T
where Self: Into<T>,

Converts self into T using Into<T>.

impl<T> CopyAs<T> for T


fn copy_as(self) -> T


impl<T> FmtForward for T


fn fmt_binary(self) -> FmtBinary<Self>
where Self: Binary,

Causes self to use its Binary implementation when Debug-formatted.

fn fmt_display(self) -> FmtDisplay<Self>
where Self: Display,

Causes self to use its Display implementation when Debug-formatted.

fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where Self: LowerExp,

Causes self to use its LowerExp implementation when Debug-formatted.

fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where Self: LowerHex,

Causes self to use its LowerHex implementation when Debug-formatted.

fn fmt_octal(self) -> FmtOctal<Self>
where Self: Octal,

Causes self to use its Octal implementation when Debug-formatted.

fn fmt_pointer(self) -> FmtPointer<Self>
where Self: Pointer,

Causes self to use its Pointer implementation when Debug-formatted.

fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where Self: UpperExp,

Causes self to use its UpperExp implementation when Debug-formatted.

fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where Self: UpperHex,

Causes self to use its UpperHex implementation when Debug-formatted.

fn fmt_list(self) -> FmtList<Self>
where &'a Self: for<'a> IntoIterator,

Formats each item in a sequence.

impl<T> From<T> for T


fn from(t: T) -> T

Returns the argument unchanged.


impl<T> FutureExt for T


fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T


fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,


fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.


impl<T> IntoEither for T


fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> IntoRequest<T> for T


fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,


fn into_shared(self) -> Shared

Creates a shared type from an unshared type.

impl<T, U> OverrideFrom<Option<&T>> for U
where U: OverrideFrom<T>,


fn override_from(self, layer: &Option<&T>) -> U

Override the configuration represented by Self with values from the given layer.

impl<T> Pipe for T
where T: ?Sized,


fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where Self: Sized,

Pipes by value. This is generally the method you want to use.

fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where R: 'a,

Borrows self and passes that borrow into the pipe function.

fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where R: 'a,

Mutably borrows self and passes that borrow into the pipe function.

fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
where Self: Borrow<B>, B: 'a + ?Sized, R: 'a,

Borrows self, then passes self.borrow() into the pipe function.

fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
where Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.borrow_mut() into the pipe function.

fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
where Self: AsRef<U>, U: 'a + ?Sized, R: 'a,

Borrows self, then passes self.as_ref() into the pipe function.

fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
where Self: AsMut<U>, U: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.as_mut() into the pipe function.

fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
where Self: Deref<Target = T>, T: 'a + ?Sized, R: 'a,

Borrows self, then passes self.deref() into the pipe function.

fn pipe_deref_mut<'a, T, R>( &'a mut self, func: impl FnOnce(&'a mut T) -> R, ) -> R
where Self: DerefMut<Target = T> + Deref, T: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.deref_mut() into the pipe function.

impl<T> Pointable for T


const ALIGN: usize = _

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<P, R> ProtoType<R> for P
where R: RustType<P>,


impl<T> Same for T


type Output = T

Should always be Self

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,


fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.

impl<T> Tap for T


fn tap(self, func: impl FnOnce(&Self)) -> Self

Immutable access to a value.

fn tap_mut(self, func: impl FnOnce(&mut Self)) -> Self

Mutable access to a value.

fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Immutable access to the Borrow<B> of a value.

fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Mutable access to the BorrowMut<B> of a value.

fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Immutable access to the AsRef<R> view of a value.

fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Mutable access to the AsMut<R> view of a value.

fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Immutable access to the Deref::Target of a value.

fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Mutable access to the Deref::Target of a value.

fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self

Calls .tap() only in debug builds, and is erased in release builds.

fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self

Calls .tap_mut() only in debug builds, and is erased in release builds.

fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Calls .tap_borrow() only in debug builds, and is erased in release builds.

fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.

fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Calls .tap_ref() only in debug builds, and is erased in release builds.

fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Calls .tap_ref_mut() only in debug builds, and is erased in release builds.

fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Calls .tap_deref() only in debug builds, and is erased in release builds.

fn tap_deref_mut_dbg<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Calls .tap_deref_mut() only in debug builds, and is erased in release builds.

impl<T> TryConv for T


fn try_conv<T>(self) -> Result<T, Self::Error>
where Self: TryInto<T>,

Attempts to convert self into T using TryInto<T>.

impl<T, U> TryFrom<U> for T
where U: Into<T>,


type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,


type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,


fn vzip(self) -> V


impl<T> WithSubscriber for T


fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.