Struct mz_adapter::coord::Coordinator
pub struct Coordinator {
controller: Controller,
catalog: Arc<Catalog>,
internal_cmd_tx: UnboundedSender<Message>,
group_commit_tx: GroupCommitNotifier,
strict_serializable_reads_tx: UnboundedSender<(ConnectionId, PendingReadTxn)>,
global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
transient_id_gen: Arc<TransientIdGen>,
active_conns: BTreeMap<ConnectionId, ConnMeta>,
txn_read_holds: BTreeMap<ConnectionId, ReadHolds<Timestamp>>,
pending_peeks: BTreeMap<Uuid, PendingPeek>,
client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
staged_cancellation: BTreeMap<ConnectionId, (Sender<bool>, Receiver<bool>)>,
introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,
write_locks: BTreeMap<CatalogItemId, Arc<Mutex<()>>>,
deferred_write_ops: BTreeMap<ConnectionId, DeferredWriteOp>,
pending_writes: Vec<PendingWriteTxn>,
advance_timelines_interval: Interval,
serialized_ddl: LockedVecDeque<DeferredPlanStatement>,
secrets_controller: Arc<dyn SecretsController>,
caching_secrets_reader: CachingSecretsReader,
cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
transient_replica_metadata: BTreeMap<ReplicaId, Option<ReplicaMetadata>>,
storage_usage_client: StorageUsageClient,
storage_usage_collection_interval: Duration,
segment_client: Option<Client>,
metrics: Metrics,
optimizer_metrics: OptimizerMetrics,
tracing_handle: TracingHandle,
statement_logging: StatementLogging,
webhook_concurrency_limit: WebhookConcurrencyLimiter,
pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,
check_cluster_scheduling_policies_interval: Interval,
cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>,
caught_up_check_interval: Interval,
caught_up_check: Option<CaughtUpCheckContext>,
installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>,
connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>,
cluster_replica_statuses: ClusterReplicaStatuses,
read_only_controllers: bool,
buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>,
}
Glues the external world to the Timely workers.
Fields
controller: Controller
The controller for the storage and compute layers.
catalog: Arc<Catalog>
The catalog in an Arc suitable for readonly references. The Arc allows us to hand out cheap copies of the catalog to functions that can use it off of the main coordinator thread. If the coordinator needs to mutate the catalog, call Self::catalog_mut, which will clone this struct member, allowing it to be mutated here while the other off-thread references can read their catalog as long as needed. In the future we would like this to be a pTVC, but for now this is sufficient.
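A minimal sketch of the clone-on-mutate pattern described above, using a placeholder Catalog type; the real catalog_mut has a different shape, this only illustrates how the Arc lets off-thread readers keep an old snapshot while the coordinator mutates its copy.

```rust
use std::sync::Arc;

// Placeholder for the real catalog type; it only needs to be Clone for this pattern.
#[derive(Clone, Default)]
struct Catalog {
    items: Vec<String>,
}

struct Coordinator {
    catalog: Arc<Catalog>,
}

impl Coordinator {
    // Off-thread readers get a cheap Arc clone and keep reading their snapshot.
    fn owned_catalog(&self) -> Arc<Catalog> {
        Arc::clone(&self.catalog)
    }

    // Mutation clones the inner catalog if other references are still alive,
    // so existing readers are unaffected.
    fn catalog_mut(&mut self) -> &mut Catalog {
        Arc::make_mut(&mut self.catalog)
    }
}
```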
internal_cmd_tx: UnboundedSender<Message>
Channel to manage internal commands from the coordinator to itself.
group_commit_tx: GroupCommitNotifier
Notification that triggers a group commit.
strict_serializable_reads_tx: UnboundedSender<(ConnectionId, PendingReadTxn)>
Channel for strict serializable reads ready to commit.
global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>
Mechanism for totally ordering write and read timestamps, so that all reads reflect exactly the set of writes that precede them, and no writes that follow.
transient_id_gen: Arc<TransientIdGen>
A generator for transient GlobalId
s, shareable with other threads.
active_conns: BTreeMap<ConnectionId, ConnMeta>
A map from connection ID to metadata about that connection for all active connections.
txn_read_holds: BTreeMap<ConnectionId, ReadHolds<Timestamp>>
For each transaction, the read holds taken to support any performed reads.
Upon completing a transaction, these read holds should be dropped.
pending_peeks: BTreeMap<Uuid, PendingPeek>
Access to the peek fields should be restricted to methods in the peek
API.
A map from pending peek ids to the queue into which responses are sent, and
the connection id of the client that initiated the peek.
client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>
A map from client connection ids to a set of all pending peeks for that client.
pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>
A map from client connection ids to pending linearize read transaction.
active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>
A map from the compute sink ID to its state description.
active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>
A map from active webhooks to their invalidation handle.
staged_cancellation: BTreeMap<ConnectionId, (Sender<bool>, Receiver<bool>)>
A map from connection ids to a watch channel that is set to true if the connection received a cancel request.
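A minimal sketch of this cancellation pattern, assuming a tokio watch channel (the concrete Sender/Receiver types are not shown in the field above):

```rust
use tokio::sync::watch;

struct StagedCancellation {
    tx: watch::Sender<bool>,
    rx: watch::Receiver<bool>,
}

impl StagedCancellation {
    fn new() -> Self {
        let (tx, rx) = watch::channel(false);
        Self { tx, rx }
    }

    // Called when a cancel request arrives for the connection.
    fn request_cancel(&self) {
        // Ignore the case where every receiver has been dropped.
        let _ = self.tx.send(true);
    }

    // Polled by staged work to decide whether to stop early.
    fn is_canceled(&self) -> bool {
        *self.rx.borrow()
    }
}
```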
introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>
Active introspection subscribes.
write_locks: BTreeMap<CatalogItemId, Arc<Mutex<()>>>
Locks that grant access to a specific object, populated lazily as objects are written to.
deferred_write_ops: BTreeMap<ConnectionId, DeferredWriteOp>
Plans that are currently deferred and waiting on a write lock.
pending_writes: Vec<PendingWriteTxn>
Pending writes waiting for a group commit.
advance_timelines_interval: Interval
For the realtime timeline, an explicit SELECT or INSERT on a table will bump the table's timestamps, but there are cases where timestamps are not bumped but we expect the closed timestamps to advance (AS OF X, SUBSCRIBEs of views over RT sources and tables). To address these, spawn a task that forces table timestamps to close on a regular interval. This roughly tracks the behavior of realtime sources that close off timestamps on an interval.
For non-realtime timelines, nothing pushes the timestamps forward, so we must do it manually.
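A hedged sketch of such an interval task, assuming a tokio runtime and a hypothetical internal message variant; the real message type and task wiring differ.

```rust
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::{self, Duration};

// Hypothetical internal message; the coordinator's real Message enum is much larger.
enum Message {
    AdvanceTimelines,
}

fn spawn_advance_timelines_task(internal_cmd_tx: UnboundedSender<Message>, period: Duration) {
    tokio::spawn(async move {
        let mut interval = time::interval(period);
        loop {
            interval.tick().await;
            // Stop the task once the coordinator (the receiver) has shut down.
            if internal_cmd_tx.send(Message::AdvanceTimelines).is_err() {
                break;
            }
        }
    });
}
```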
serialized_ddl: LockedVecDeque<DeferredPlanStatement>
Serialized DDL. DDL must be serialized because:
- Many of them do off-thread work and need to verify the catalog is in a valid state, but PlanValidity does not currently support tracking all changes. Doing that correctly seems to be more difficult than it's worth, so we would instead re-plan and re-sequence the statements.
- Re-planning a statement is hard because Coordinator and Session state is mutated at various points, and we would need to correctly reset those changes before re-planning and re-sequencing.
secrets_controller: Arc<dyn SecretsController>
Handle to secret manager that can create and delete secrets from an arbitrary secret storage engine.
caching_secrets_reader: CachingSecretsReader
A secrets reader that maintains an in-memory cache, where values have a set TTL.
cloud_resource_controller: Option<Arc<dyn CloudResourceController>>
Handle to a manager that can create and delete kubernetes resources (i.e., VpcEndpoint objects).
transient_replica_metadata: BTreeMap<ReplicaId, Option<ReplicaMetadata>>
Metadata about replicas that doesn’t need to be persisted. Intended for inclusion in system tables.
None is used as a tombstone value for replicas that have been dropped and for which no further updates should be recorded.
storage_usage_client: StorageUsageClient
Persist client for fetching storage metadata such as size metrics.
storage_usage_collection_interval: Duration
The interval at which to collect storage usage information.
segment_client: Option<Client>
Segment analytics client.
metrics: Metrics
Coordinator metrics.
optimizer_metrics: OptimizerMetrics
Optimizer metrics.
tracing_handle: TracingHandle
Tracing handle.
statement_logging: StatementLogging
Data used by the statement logging feature.
webhook_concurrency_limit: WebhookConcurrencyLimiter
Limit for how many concurrent webhook requests we allow.
pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>
Optional config for the Postgres-backed timestamp oracle. This is required when the timestamp_oracle system variable is set to postgres.
check_cluster_scheduling_policies_interval: Interval
Periodically asks cluster scheduling policies to make their decisions.
cluster_scheduling_decisions: BTreeMap<ClusterId, BTreeMap<&'static str, SchedulingDecision>>
This keeps the last On/Off decision for each cluster and each scheduling policy. (Clusters that have been dropped or are otherwise out of scope for automatic scheduling are periodically cleaned up from this Map.)
caught_up_check_interval: Interval
When doing 0dt upgrades/in read-only mode, periodically ask all known clusters/collections whether they are caught up.
caught_up_check: Option<CaughtUpCheckContext>
Context needed to check whether all clusters/collections have caught up. Only used during 0dt deployment, while in read-only mode.
installed_watch_sets: BTreeMap<WatchSetId, (ConnectionId, WatchSetResponse)>
Tracks the state associated with the currently installed watchsets.
connection_watch_sets: BTreeMap<ConnectionId, BTreeSet<WatchSetId>>
Tracks the currently installed watchsets for each connection.
cluster_replica_statuses: ClusterReplicaStatuses
Tracks the statuses of all cluster replicas.
read_only_controllers: bool
Whether or not to start controllers in read-only mode. This is only meant for use during development of read-only clusters and 0dt upgrades and should go away once we have proper orchestration during upgrades.
buffered_builtin_table_updates: Option<Vec<BuiltinTableUpdate>>
Updates to builtin tables that are being buffered while we are in read-only mode. We apply these all at once when coming out of read-only mode.
This is a Some while in read-only mode and will be replaced by a None when we transition out of read-only mode and write out any buffered updates.
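A minimal sketch of this Some/None buffering pattern with placeholder types (not the real append path):

```rust
struct BuiltinTableUpdate;

struct Buffer {
    // Some(..) while in read-only mode, None otherwise.
    buffered: Option<Vec<BuiltinTableUpdate>>,
}

impl Buffer {
    fn stage(&mut self, update: BuiltinTableUpdate) {
        match &mut self.buffered {
            // Read-only mode: hold the update until we become writable.
            Some(buffer) => buffer.push(update),
            // Writable mode: the update would be appended immediately (elided).
            None => drop(update),
        }
    }

    // Called once when leaving read-only mode: drain and apply everything.
    fn leave_read_only_mode(&mut self) -> Vec<BuiltinTableUpdate> {
        self.buffered.take().unwrap_or_default()
    }
}
```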
Implementations
impl Coordinator
pub fn resolve_collection_id_bundle_names( &self, session: &Session, id_bundle: &CollectionIdBundle, ) -> Vec<String>
Resolves the full name from the corresponding catalog entry for each item in id_bundle.
If an item in the bundle does not exist in the catalog, it's not included in the result.
impl Coordinator
pub async fn implement_peek_plan( &mut self, ctx_extra: &mut ExecuteContextExtra, plan: PlannedPeek, finishing: RowSetFinishing, compute_instance: ComputeInstanceId, target_replica: Option<ReplicaId>, max_result_size: u64, max_returned_query_size: Option<u64>, ) -> Result<ExecuteResponse, AdapterError>
Implements a peek plan produced by create_plan above.
pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId)
Cancel and remove all pending peeks that were initiated by the client with conn_id.
pub(crate) fn handle_peek_notification( &mut self, uuid: Uuid, notification: PeekNotification, otel_ctx: OpenTelemetryContext, )
Handle a peek notification and retire the corresponding execution. Does nothing for already-removed peeks.
pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek>
Clean up a peek's state.
pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
Constructs an ExecuteResponse that will send some rows to the client immediately, as opposed to asking the dataflow layer to send along the rows after some computation.
impl Coordinator
pub(crate) fn spawn_statement_logging_task(&self)
pub(crate) fn drain_statement_log(&mut self)
fn statement_logging_throttling_check(&mut self, cost: usize) -> Option<usize>
Check whether we need to do throttling (i.e., whether STATEMENT_LOGGING_TARGET_DATA_RATE is set). If so, actually do the check.
Returns None if we must throttle this statement, and Some(n) otherwise, where n is the number of statements that were dropped due to throttling before this one.
pub(crate) fn log_prepared_statement( &mut self, session: &mut Session, logging: &Arc<QCell<PreparedStatementLoggingInfo>>, ) -> Option<(Option<(StatementPreparedRecord, PreparedStatementEvent)>, Uuid)>
Returns any statement logging events needed for a particular prepared statement. Possibly mutates the PreparedStatementLoggingInfo metadata.
This function does not do a sampling check, and assumes we did so in a higher layer. It does do a throttling check, and returns None if we must not log due to throttling.
pub fn statement_execution_sample_rate(&self, session: &Session) -> f64
The rate at which statement execution should be sampled.
This is the value of the session var statement_logging_sample_rate, constrained by the system var statement_logging_max_sample_rate.
pub fn end_statement_execution( &mut self, id: StatementLoggingId, reason: StatementEndedExecutionReason, )
Record the end of statement execution for a statement whose beginning was logged.
It is an error to call this function for a statement whose beginning was not logged (because it was not sampled). Requiring the opaque StatementLoggingId type, which is only instantiated by begin_statement_execution if the statement is actually logged, should prevent this.
fn pack_statement_execution_inner( record: &StatementBeganExecutionRecord, packer: &mut RowPacker<'_>, )
fn pack_statement_began_execution_update( record: &StatementBeganExecutionRecord, ) -> Row
fn pack_statement_prepared_update( record: &StatementPreparedRecord, packer: &mut RowPacker<'_>, )
fn pack_session_history_update(event: &SessionHistoryEvent) -> Row
fn pack_statement_lifecycle_event( StatementLoggingId: &StatementLoggingId, event: &StatementLifecycleEvent, when: EpochMillis, ) -> Row
pub fn pack_full_statement_execution_update( began_record: &StatementBeganExecutionRecord, ended_record: &StatementEndedExecutionRecord, ) -> Row
pub fn pack_statement_ended_execution_updates( began_record: &StatementBeganExecutionRecord, ended_record: &StatementEndedExecutionRecord, ) -> [(Row, Diff); 2]
fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>( &mut self, StatementLoggingId: StatementLoggingId, f: F, )
Mutate a statement execution record via the given function f.
pub fn set_statement_execution_cluster( &mut self, id: StatementLoggingId, cluster_id: ClusterId, )
Set the cluster_id for a statement, once it's known.
pub fn set_statement_execution_timestamp( &mut self, id: StatementLoggingId, timestamp: Timestamp, )
Set the execution_timestamp for a statement, once it's known.
pub fn set_transient_index_id( &mut self, id: StatementLoggingId, transient_index_id: GlobalId, )
pub fn begin_statement_execution( &mut self, session: &mut Session, params: &Params, logging: &Arc<QCell<PreparedStatementLoggingInfo>>, ) -> Option<StatementLoggingId>
Possibly record the beginning of statement execution, depending on a randomly-chosen value. If the execution beginning was indeed logged, returns a StatementLoggingId that must be passed to end_statement_execution to record when it ends.
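A hedged sketch of the sampling decision implied by begin_statement_execution and statement_execution_sample_rate (clamping the session rate by the system maximum, then drawing a random number); names here are illustrative, not the real API.

```rust
use rand::Rng;

fn should_log_statement(session_sample_rate: f64, system_max_sample_rate: f64) -> bool {
    // The effective rate is the session setting, capped by the system maximum.
    let rate = session_sample_rate.min(system_max_sample_rate);
    // Log the statement if a uniform draw in [0, 1) falls under the rate.
    rand::thread_rng().gen::<f64>() < rate
}
```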
pub fn begin_session_for_statement_logging(&mut self, session: &ConnMeta)
Record a new connection event
pub fn end_session_for_statement_logging(&mut self, uuid: Uuid)
pub fn record_statement_lifecycle_event( &mut self, id: &StatementLoggingId, event: &StatementLifecycleEvent, when: EpochMillis, )
impl Coordinator
pub(crate) fn now(&self) -> EpochMillis
pub(crate) fn now_datetime(&self) -> DateTime<Utc>
pub(crate) fn get_timestamp_oracle( &self, timeline: &Timeline, ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync>
pub(crate) fn get_local_timestamp_oracle( &self, ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync>
Returns a TimestampOracle used for reads and writes from/to a local input.
pub(crate) async fn get_local_read_ts(&self) -> Timestamp
Assign a timestamp for a read from a local input. Reads following writes must be at a time >= the write's timestamp; we choose "equal to" for simplicity's sake and to open as few new timestamps as possible.
pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp
Assign a timestamp for a write to a local input and increase the local ts. Writes following reads must ensure that they are assigned a strictly larger timestamp to ensure they are not visible to any real-time earlier reads.
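A toy illustration of the ordering rules described by get_local_read_ts and get_local_write_ts; this is not the coordinator's timestamp oracle, just the read-equal-to-last-write / write-strictly-greater discipline in miniature.

```rust
struct ToyOracle {
    last_write_ts: u64,
}

impl ToyOracle {
    // Reads happen at the last write timestamp, so they see exactly the
    // writes that precede them and nothing that follows.
    fn read_ts(&self) -> u64 {
        self.last_write_ts
    }

    // Writes get a timestamp strictly greater than all prior reads and
    // writes, and at least the current wall clock in epoch milliseconds.
    fn write_ts(&mut self, now_millis: u64) -> u64 {
        self.last_write_ts = (self.last_write_ts + 1).max(now_millis);
        self.last_write_ts
    }
}
```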
pub(crate) async fn peek_local_write_ts(&self) -> Timestamp
Peek the current timestamp used for operations on local inputs. Used to determine how much to block group commits by.
pub(crate) fn apply_local_write( &self, timestamp: Timestamp, ) -> impl Future<Output = ()> + Send + 'static
Marks a write at timestamp as completed, using a TimestampOracle.
pub(crate) async fn get_catalog_write_ts(&mut self) -> Timestamp
Assign a timestamp for a write to the catalog. This timestamp should have the following properties:
- Monotonically increasing.
- Greater than or equal to the current catalog upper.
- Greater than the largest write timestamp used in the epoch millisecond timeline.
In general this is fully satisfied by getting the current write timestamp in the epoch millisecond timeline from the timestamp oracle; however, in read-only mode we cannot modify the timestamp oracle.
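A hedged sketch of a timestamp satisfying the three properties listed above; the actual logic (especially in read-only mode) differs.

```rust
fn candidate_catalog_write_ts(
    oracle_write_ts: u64,        // current write ts from the epoch-ms timestamp oracle
    catalog_upper: u64,          // current catalog upper
    largest_epoch_ms_write: u64, // largest write ts used in the epoch-ms timeline
) -> u64 {
    oracle_write_ts
        .max(catalog_upper)
        .max(largest_epoch_ms_write + 1)
}
```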
pub(crate) async fn ensure_timeline_state<'a>( &'a mut self, timeline: &'a Timeline, ) -> &mut TimelineState<Timestamp>
Ensures that a global timeline state exists for timeline.
pub(crate) async fn ensure_timeline_state_with_initial_time<'a>( timeline: &'a Timeline, initially: Timestamp, now: NowFn, pg_oracle_config: Option<PostgresTimestampOracleConfig>, global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>>, read_only: bool, ) -> &'a mut TimelineState<Timestamp>
Ensures that a global timeline state exists for timeline, with an initial time of initially.
pub(crate) fn build_collection_id_bundle( &self, storage_ids: impl IntoIterator<Item = GlobalId>, compute_ids: impl IntoIterator<Item = (ComputeInstanceId, GlobalId)>, clusters: impl IntoIterator<Item = ComputeInstanceId>, ) -> CollectionIdBundle
Groups together storage and compute resources into a CollectionIdBundle.
pub(crate) fn remove_resources_associated_with_timeline( &mut self, timeline: Timeline, ids: CollectionIdBundle, ) -> bool
Given a Timeline and a CollectionIdBundle, removes all of the "storage ids" and "compute ids" in the bundle from the timeline.
pub(crate) fn remove_compute_ids_from_timeline<I>( &mut self, ids: I, ) -> Vec<Timeline>
pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle
pub(crate) fn validate_timeline_context<I>( &self, ids: I, ) -> Result<TimelineContext, AdapterError> where I: IntoIterator<Item = GlobalId>
Return an error if the ids are from incompatible TimelineContexts. This should be used to prevent users from doing things that are either meaningless (joining data from timelines that have similar numbers with different meanings, like two separate debezium topics) or will never complete (joining cdcv2 and realtime data).
pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext
Return the TimelineContext belonging to a CatalogItemId, if one exists.
pub(crate) fn get_timeline_context_for_global_id( &self, id: GlobalId, ) -> TimelineContext
Return the TimelineContext belonging to a GlobalId, if one exists.
fn get_timeline_contexts<I>(&self, ids: I) -> BTreeSet<TimelineContext> where I: IntoIterator<Item = CatalogItemId>
Return the TimelineContexts belonging to a list of CatalogItemIds, if any exist.
pub fn partition_ids_by_timeline_context( &self, id_bundle: &CollectionIdBundle, ) -> impl Iterator<Item = (TimelineContext, CollectionIdBundle)>
Returns an iterator that partitions an id bundle by the TimelineContext that each id belongs to.
pub(crate) fn timedomain_for<'a, I>( &self, uses_ids: I, timeline_context: &TimelineContext, conn_id: &ConnectionId, compute_instance: ComputeInstanceId, ) -> Result<CollectionIdBundle, AdapterError> where I: IntoIterator<Item = &'a GlobalId>
Return the set of ids in a timedomain and verify timeline correctness.
When a user starts a transaction, we need to prevent compaction of anything they might read from. We use a heuristic of “anything in the same database schemas with the same timeline as whatever the first query is”.
pub(crate) async fn advance_timelines(&mut self)
impl Coordinator
pub(crate) async fn oracle_read_ts( &self, session: &Session, timeline_ctx: &TimelineContext, when: &QueryWhen, ) -> Option<Timestamp>
pub(crate) fn determine_timestamp( &self, session: &Session, id_bundle: &CollectionIdBundle, when: &QueryWhen, compute_instance: ComputeInstanceId, timeline_context: &TimelineContext, oracle_read_ts: Option<Timestamp>, real_time_recency_ts: Option<Timestamp>, ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError>
Determines the timestamp for a query, acquires read holds that ensure the query remains executable at that time, and returns those.
The caller is responsible for eventually dropping those read holds.
pub(crate) fn largest_not_in_advance_of_upper( upper: &Antichain<Timestamp>, ) -> Timestamp
The largest element not in advance of any object in the collection. Times that are not greater than this frontier are complete for all collections identified as arguments.
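A hedged sketch for totally ordered u64 timestamps (the real function takes an Antichain<Timestamp>): the largest time not in advance of the upper is one step back from its minimum element, or the maximum timestamp when the upper is empty.

```rust
fn largest_not_in_advance_of_upper(upper: &[u64]) -> u64 {
    match upper.iter().min() {
        // One step back from the frontier: this time is complete.
        Some(ts) => ts.saturating_sub(1),
        // An empty upper means the collection is closed; every time is complete.
        None => u64::MAX,
    }
}
```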
pub(crate) fn evaluate_when( catalog: &CatalogState, timestamp: MirScalarExpr, session: &Session, ) -> Result<Timestamp, AdapterError>
impl Coordinator
pub(crate) fn trigger_group_commit(&mut self)
Send a message to the Coordinator to start a group commit.
pub(crate) async fn try_deferred( &mut self, conn_id: ConnectionId, acquired_lock: Option<(CatalogItemId, OwnedMutexGuard<()>)>, )
Tries to execute a previously deferred DeferredWriteOp that requires write locks.
If we can't acquire all of the write locks then we'll defer the plan again and wait for the necessary locks to become available.
pub(crate) async fn try_group_commit( &mut self, permit: Option<GroupCommitPermit>, )
Attempts to commit all pending write transactions in a group commit. If the timestamp chosen for the writes is not ahead of now(), then we can execute and commit the writes immediately. Otherwise we must wait for now() to advance past the timestamp chosen for the writes.
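A hedged sketch of "wait until now() has passed the chosen write timestamp", assuming epoch-millisecond timestamps and a tokio sleep; the coordinator's actual scheduling differs.

```rust
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{sleep, Duration};

fn now_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("wall clock before UNIX epoch")
        .as_millis() as u64
}

async fn wait_until_past(write_ts: u64) {
    let now = now_millis();
    if write_ts >= now {
        // Sleep just long enough for the wall clock to move past the timestamp.
        sleep(Duration::from_millis(write_ts - now + 1)).await;
    }
}
```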
pub(crate) async fn group_commit( &mut self, permit: Option<GroupCommitPermit>, ) -> Timestamp
Tries to commit all pending write transactions at the same timestamp.
If the caller of this function has the write_lock acquired, then they can optionally pass it in to this method. If the caller does not have the write_lock acquired and the write_lock is currently locked by another operation, then only writes to system tables and table advancements will be applied. If the caller does not have the write_lock acquired and the write_lock is not currently locked by another operation, then group commit will acquire it and all writes will be applied.
All applicable pending writes will be combined into a single Append command and sent to STORAGE as a single batch. All applicable writes will happen at the same timestamp and all involved tables will be advanced to some timestamp larger than the timestamp of the write.
Returns the timestamp of the write.
pub(crate) fn submit_write(&mut self, pending_write_txn: PendingWriteTxn)
Submit a write to be executed during the next group commit and trigger a group commit.
pub(crate) fn builtin_table_update<'a>(&'a mut self) -> BuiltinTableAppend<'a>
Append some BuiltinTableUpdates, with various degrees of waiting and blocking.
pub(crate) fn defer_op<F>(&mut self, acquire_future: F, op: DeferredWriteOp)
pub(crate) fn grant_object_write_lock( &mut self, object_id: CatalogItemId, ) -> impl Future<Output = (CatalogItemId, OwnedMutexGuard<()>)> + 'static
Returns a future that waits until it can get an exclusive lock on the specified collection.
pub(crate) fn try_grant_object_write_lock( &mut self, object_id: CatalogItemId, ) -> Option<OwnedMutexGuard<()>>
Lazily creates the lock for the provided object_id and grants it if possible; returns None if the lock is already held.
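A hedged sketch of lazily created per-object write locks, assuming tokio's async Mutex and its owned guards; the types here are stand-ins for the real ones.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::{Mutex, OwnedMutexGuard};

type ObjectId = u64; // stand-in for CatalogItemId

#[derive(Default)]
struct WriteLocks {
    locks: BTreeMap<ObjectId, Arc<Mutex<()>>>,
}

impl WriteLocks {
    // Create the lock on first use, then try to grab it without waiting;
    // None means some other operation currently holds it.
    fn try_grant(&mut self, object_id: ObjectId) -> Option<OwnedMutexGuard<()>> {
        let lock = self
            .locks
            .entry(object_id)
            .or_insert_with(|| Arc::new(Mutex::new(())));
        Arc::clone(lock).try_lock_owned().ok()
    }
}
```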
impl Coordinator
pub async fn maybe_check_caught_up(&mut self)
Checks that all clusters/collections are caught up. If so, this will trigger self.catchup_check.trigger.
This method is a no-op when the trigger has already been fired.
async fn maybe_check_caught_up_new(&mut self)
async fn clusters_caught_up( &self, allowed_lag: Timestamp, cutoff: Timestamp, now: Timestamp, live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>, exclude_collections: &BTreeSet<GlobalId>, ) -> bool
Returns true if all non-transient, non-excluded collections have their write frontier (aka. upper) within allowed_lag of the "live" frontier reported in live_frontiers. The "live" frontiers are frontiers as reported by a currently running environmentd deployment, during a 0dt upgrade.
Collections whose write frontier is behind now by more than the cutoff are ignored.
For this check, zero-replica clusters are always considered caught up. Their collections would never normally be considered caught up, but it's clearly intentional that they have no replicas.
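A hedged sketch of the per-collection test this description implies, for u64 epoch-millisecond timestamps; the real check works on frontiers and has additional exclusions.

```rust
fn collection_caught_up(upper: u64, live_upper: u64, allowed_lag: u64, cutoff: u64, now: u64) -> bool {
    // Collections whose write frontier is behind `now` by more than the
    // cutoff are ignored, i.e. treated as caught up.
    if now.saturating_sub(upper) > cutoff {
        return true;
    }
    // Otherwise the upper must be within `allowed_lag` of the live frontier.
    live_upper.saturating_sub(upper) <= allowed_lag
}
```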
async fn collections_caught_up( &self, cluster: &Cluster, allowed_lag: Timestamp, cutoff: Timestamp, now: Timestamp, live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>, exclude_collections: &BTreeSet<GlobalId>, ) -> Result<bool, Error>
Returns true if all non-transient, non-excluded collections have their write frontier (aka. upper) within allowed_lag of the "live" frontier reported in live_frontiers. The "live" frontiers are frontiers as reported by a currently running environmentd deployment, during a 0dt upgrade.
Collections whose write frontier is behind now by more than the cutoff are ignored.
This also returns true in case this cluster does not have any replicas.
async fn maybe_check_caught_up_legacy(&mut self)
impl Coordinator
pub(crate) async fn check_scheduling_policies(&mut self)
Call each scheduling policy.
fn check_refresh_policy(&self)
Runs the SCHEDULE = ON REFRESH cluster scheduling policy, which makes cluster On/Off decisions based on REFRESH materialized view write frontiers and the current time (the local oracle read ts), and sends Message::SchedulingDecisions with these decisions. (Queries the timestamp oracle on a background task.)
pub(crate) async fn handle_scheduling_decisions( &mut self, decisions: Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>, )
Handles SchedulingDecisions:
- Adds the newly made decisions to cluster_scheduling_decisions.
- Cleans up old decisions that are for clusters no longer in scope of automated scheduling decisions.
- For each cluster, it sums up cluster_scheduling_decisions, checks the summed-up decision against the cluster state, and turns the cluster On/Off if needed.
fn get_managed_cluster_config( &self, cluster_id: ClusterId, ) -> Option<ClusterVariantManaged>
Returns the managed config for a cluster. Returns None if the cluster doesn't exist or if it's an unmanaged cluster.
impl Coordinator
pub(crate) fn handle_command(&mut self, cmd: Command) -> LocalBoxFuture<'_, ()>
BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would get stored on the stack which is bad for runtime performance, and blow up our stack usage. Because of that we purposefully move this Future onto the heap (i.e. Box it).
async fn handle_startup( &mut self, tx: Sender<Result<StartupResponse, AdapterError>>, user: User, conn_id: ConnectionId, secret_key: u32, uuid: Uuid, client_ip: Option<IpAddr>, application_name: String, notice_tx: UnboundedSender<AdapterNotice>, )
async fn handle_startup_inner( &mut self, user: &User, conn_id: &ConnectionId, client_ip: &Option<IpAddr>, ) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError>
pub(crate) async fn handle_execute( &mut self, portal_name: String, session: Session, tx: ClientTransmitter<ExecuteResponse>, outer_context: Option<ExecuteContextExtra>, )
Handles an execute command.
pub(crate) async fn handle_execute_inner( &mut self, stmt: Arc<Statement<Raw>>, params: Params, ctx: ExecuteContext, )
fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool
Whether the statement must be serialized and is DDL.
fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool
Whether the statement must be purified off of the Coordinator thread.
async fn resolve_mz_now_for_create_materialized_view<'a>( &mut self, cmvs: &CreateMaterializedViewStatement<Aug>, resolved_ids: &ResolvedIds, session: &Session, acquire_read_holds: bool, ) -> Result<Option<Timestamp>, AdapterError>
Chooses a timestamp for mz_now(), if mz_now() occurs in a REFRESH option of the materialized view. Additionally, if acquire_read_holds is true and the MV has any REFRESH option, this function grabs read holds at the earliest possible time on input collections that might be involved in the MV.
Note that this is NOT what handles mz_now() in the query part of the MV; this handles it only in with_options.
(Note that the chosen timestamp won't be the same timestamp as the system table inserts, unfortunately.)
async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32)
Instruct the dataflow layer to cancel any ongoing, interactive work for the named conn_id if the correct secret key is specified.
Note: Here we take a ConnectionIdType as opposed to an owned ConnectionId because this method gets called by external clients when they request to cancel a request.
pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId)
Unconditionally instructs the dataflow layer to cancel any ongoing, interactive work for the named conn_id.
async fn handle_terminate(&mut self, conn_id: ConnectionId)
Handle termination of a client session.
This cleans up any state in the coordinator associated with the session.
fn handle_get_webhook( &mut self, database: String, schema: String, name: String, tx: Sender<Result<AppendWebhookResponse, AppendWebhookError>>, )
Returns the necessary metadata for appending to a webhook source, and a channel to send rows.
impl Coordinator
pub fn check_consistency(&self) -> Result<(), CoordinatorInconsistencies>
Checks the Coordinator to make sure we're internally consistent.
fn check_read_holds(&self) -> Result<(), Vec<ReadHoldsInconsistency>>
Invariants:
- Read holds should reference known objects.
fn check_active_webhooks(&self) -> Result<(), Vec<ActiveWebhookInconsistency>>
fn check_cluster_statuses(&self) -> Result<(), Vec<ClusterStatusInconsistency>>
impl Coordinator
pub(crate) async fn catalog_transact( &mut self, session: Option<&Session>, ops: Vec<Op>, ) -> Result<(), AdapterError>
Same as Self::catalog_transact_conn but takes a Session.
pub(crate) async fn catalog_transact_with_side_effects<'c, F, Fut>( &'c mut self, session: Option<&Session>, ops: Vec<Op>, side_effect: F, ) -> Result<(), AdapterError>
Same as Self::catalog_transact_conn but takes a Session and runs builtin table updates concurrently with any side effects (e.g. creating collections).
pub(crate) async fn catalog_transact_conn( &mut self, conn_id: Option<&ConnectionId>, ops: Vec<Op>, ) -> Result<(), AdapterError>
Same as Self::catalog_transact_inner but awaits the table updates.
pub(crate) async fn catalog_transact_with_ddl_transaction( &mut self, session: &mut Session, ops: Vec<Op>, ) -> Result<(), AdapterError>
Executes a Catalog transaction with handling if the provided Session is in a SQL transaction that is executing DDL.
pub(crate) async fn catalog_transact_inner<'a>( &mut self, conn_id: Option<&ConnectionId>, ops: Vec<Op>, ) -> Result<BoxFuture<'static, ()>, AdapterError>
Perform a catalog transaction. Coordinator::ship_dataflow must be called after this function successfully returns on any built DataflowDesc.
fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId)
fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>)
A convenience method for dropping sources.
fn drop_tables(&mut self, table_gids: Vec<GlobalId>, ts: Timestamp)
fn restart_webhook_sources( &mut self, sources: impl IntoIterator<Item = CatalogItemId>, )
pub async fn drop_compute_sink( &mut self, sink_id: GlobalId, ) -> Option<ActiveComputeSink>
Like drop_compute_sinks, but for a single compute sink.
Returns the controller's state for the compute sink if the identified sink was known to the controller. It is the caller's responsibility to retire the returned sink. Consider using retire_compute_sinks instead.
pub async fn drop_compute_sinks( &mut self, sink_ids: impl IntoIterator<Item = GlobalId>, ) -> BTreeMap<GlobalId, ActiveComputeSink>
Drops a batch of compute sinks.
For each sink that exists, the coordinator and controller's state associated with the sink is removed.
Returns a map containing the controller's state for each sink that was removed. It is the caller's responsibility to retire the returned sinks. Consider using retire_compute_sinks instead.
pub async fn retire_compute_sinks( &mut self, reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>, )
Retires a batch of sinks with disparate reasons for retirement.
Each sink identified in reasons is dropped (see drop_compute_sinks), then retired with its corresponding reason.
pub async fn drop_reconfiguration_replicas( &mut self, cluster_ids: BTreeSet<ClusterId>, ) -> Result<(), AdapterError>
Drops all pending replicas for a set of clusters that are undergoing reconfiguration.
pub(crate) async fn cancel_compute_sinks_for_conn( &mut self, conn_id: &ConnectionId, )
Cancels all active compute sinks for the identified connection.
pub(crate) async fn cancel_cluster_reconfigurations_for_conn( &mut self, conn_id: &ConnectionId, )
Cancels all active cluster reconfigurations for the identified connection.
pub(crate) async fn retire_compute_sinks_for_conn( &mut self, conn_id: &ConnectionId, reason: ActiveComputeSinkRetireReason, )
Retires all active compute sinks for the identified connection with the specified reason.
pub(crate) async fn retire_cluster_reconfigurations_for_conn( &mut self, conn_id: &ConnectionId, )
Cleans up pending cluster reconfigurations for the identified connection.
pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>)
pub(crate) fn drop_indexes(&mut self, indexes: Vec<(ClusterId, GlobalId)>)
fn drop_materialized_views(&mut self, mviews: Vec<(ClusterId, GlobalId)>)
A convenience method for dropping materialized views.
fn drop_continual_tasks( &mut self, cts: Vec<(CatalogItemId, ClusterId, GlobalId)>, )
A convenience method for dropping continual tasks.
fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>)
pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId)
Removes all temporary items created by the specified connection, though not the temporary schema itself.
fn update_cluster_scheduling_config(&self)
fn update_secrets_caching_config(&self)
fn update_tracing_config(&self)
fn update_compute_config(&mut self)
fn update_storage_config(&mut self)
fn update_pg_timestamp_oracle_config(&self)
fn update_metrics_retention(&mut self)
fn update_arrangement_exert_proportionality(&mut self)
fn update_http_config(&mut self)
pub(crate) async fn create_storage_export( &mut self, id: GlobalId, sink: &Sink, ) -> Result<(), AdapterError>
fn validate_resource_limits( &self, ops: &Vec<Op>, conn_id: &ConnectionId, ) -> Result<(), AdapterError>
Validate all resource limits in a catalog transaction and return an error if any limit is exceeded.
pub(crate) fn validate_resource_limit<F>( &self, current_amount: usize, new_instances: i64, resource_limit: F, resource_type: &str, limit_name: &str, ) -> Result<(), AdapterError>
Validate a specific type of resource limit and return an error if that limit is exceeded.
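A hedged sketch of the shape of such a check (current amount plus new instances compared against a configured limit); the real method takes the limit via a closure and reports a structured AdapterError, so names and parameters here are illustrative.

```rust
fn check_resource_limit(
    current_amount: usize,
    new_instances: i64,
    limit: u32,
    resource_type: &str,
) -> Result<(), String> {
    let desired = current_amount as i64 + new_instances;
    if new_instances > 0 && desired > i64::from(limit) {
        Err(format!(
            "creating {new_instances} new {resource_type}(s) would exceed the limit of {limit}"
        ))
    } else {
        Ok(())
    }
}
```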
fn validate_resource_limit_numeric<F>( &self, current_amount: Numeric, new_amount: Numeric, resource_limit: F, resource_type: &str, limit_name: &str, ) -> Result<(), AdapterError>
Validate a specific type of float resource limit and return an error if that limit is exceeded.
This is very similar to Self::validate_resource_limit but for numerics.
impl Coordinator
pub fn index_oracle(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_>
Creates a new index oracle for the specified compute instance.
impl Coordinator
pub(super) async fn bootstrap_introspection_subscribes(&mut self)
Installs introspection subscribes on all existing replicas.
Meant to be invoked during coordinator bootstrapping.
pub(super) async fn install_introspection_subscribes( &mut self, cluster_id: ClusterId, replica_id: ReplicaId, )
Installs introspection subscribes on the given replica.
async fn install_introspection_subscribe( &mut self, cluster_id: ClusterId, replica_id: ReplicaId, spec: &'static SubscribeSpec, )
async fn sequence_introspection_subscribe( &mut self, subscribe_id: GlobalId, spec: &'static SubscribeSpec, cluster_id: ClusterId, replica_id: ReplicaId, )
fn sequence_introspection_subscribe_optimize_mir( &self, stage: IntrospectionSubscribeOptimizeMir, ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError>
fn sequence_introspection_subscribe_timestamp_optimize_lir( &self, stage: IntrospectionSubscribeTimestampOptimizeLir, ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError>
async fn sequence_introspection_subscribe_finish( &mut self, stage: IntrospectionSubscribeFinish, ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError>
pub(super) fn drop_introspection_subscribes(&mut self, replica_id: ReplicaId)
Drops the introspection subscribes installed on the given replica.
Dropping an introspection subscribe entails:
- removing it from Coordinator::introspection_subscribes
- dropping its compute collection
- retracting any rows previously emitted by it from its corresponding storage-managed collection
fn drop_introspection_subscribe(&mut self, id: GlobalId)
async fn reinstall_introspection_subscribe(&mut self, id: GlobalId)
pub(super) async fn handle_introspection_subscribe_batch( &mut self, id: GlobalId, batch: SubscribeBatch, )
Processes a batch returned by an introspection subscribe.
Depending on the contents of the batch, this either appends received updates to the corresponding storage-managed collection, or reinstalls a disconnected subscribe.
impl Coordinator
pub(crate) async fn handle_message(&mut self, msg: Message)
BOXED FUTURE: As of Nov 2023 the returned Future from this function was 74KB. This would get stored on the stack which is bad for runtime performance, and blow up our stack usage. Because of that we purposefully move Futures of inner function calls onto the heap (i.e. Box it).
pub async fn storage_usage_fetch(&mut self)
async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced)
async fn storage_usage_prune(&mut self, expired: Vec<BuiltinTableUpdate>)
pub async fn schedule_storage_usage_collection(&self)
async fn message_command(&mut self, cmd: Command)
async fn message_controller(&mut self, message: ControllerResponse)
async fn message_purified_statement_ready( &mut self, __arg1: BackgroundWorkResult<PurifiedStatement>, )
async fn message_create_connection_validation_ready( &mut self, __arg1: ValidationReady<CreateConnectionPlan>, )
async fn message_alter_connection_validation_ready( &mut self, __arg1: ValidationReady<Connection>, )
async fn message_cluster_event(&mut self, event: ClusterEvent)
async fn message_linearize_reads(&mut self)
Linearizes sending the results of a read transaction by:
- Holding back any results that were executed at some point in the future, until the containing timeline has advanced to that point in the future.
- Confirming that we are still the current leader before sending results to the client.
impl Coordinator
pub(crate) fn spawn_privatelink_vpc_endpoints_watch_task(&self)
impl Coordinator
pub(crate) async fn initialize_storage_read_policies( &mut self, ids: BTreeSet<CatalogItemId>, compaction_window: CompactionWindow, )
Initialize the storage read policies.
This should be called only after a storage collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.
pub(crate) async fn initialize_compute_read_policies( &mut self, ids: Vec<GlobalId>, instance: ComputeInstanceId, compaction_window: CompactionWindow, )
Initialize the compute read policies.
This should be called only after a compute collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.
pub(crate) async fn initialize_read_policies( &mut self, id_bundle: &CollectionIdBundle, compaction_window: CompactionWindow, )
Initialize the storage and compute read policies.
This should be called only after a collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.
pub(crate) fn update_storage_read_policies( &mut self, policies: Vec<(CatalogItemId, ReadPolicy<Timestamp>)>, )
pub(crate) fn update_compute_read_policies( &self, policies: Vec<(ComputeInstanceId, CatalogItemId, ReadPolicy<Timestamp>)>, )
pub(crate) fn update_compute_read_policy( &self, compute_instance: ComputeInstanceId, item_id: CatalogItemId, base_policy: ReadPolicy<Timestamp>, )
pub(crate) fn acquire_read_holds( &self, id_bundle: &CollectionIdBundle, ) -> ReadHolds<Timestamp>
Attempt to acquire read holds on the indicated collections at the earliest available time.
Panics
Will panic if any of the referenced collections in id_bundle don't exist.
pub(crate) fn store_transaction_read_holds( &mut self, session: &Session, read_holds: ReadHolds<Timestamp>, )
Stash transaction read holds. They will be released when the transaction is cleaned up.
impl Coordinator
pub(crate) async fn sequence_alter_cluster_staged( &mut self, ctx: ExecuteContext, plan: AlterClusterPlan, )
async fn alter_cluster_validate( &mut self, session: &Session, plan: AlterClusterPlan, ) -> Result<ClusterStage, AdapterError>
async fn sequence_alter_cluster_stage( &mut self, session: &Session, plan: AlterClusterPlan, validity: PlanValidity, ) -> Result<StageResult<Box<ClusterStage>>, AdapterError>
async fn finalize_alter_cluster_stage( &mut self, session: &Session, __arg2: AlterClusterPlan, new_config: ClusterVariantManaged, ) -> Result<StageResult<Box<ClusterStage>>, AdapterError>
async fn check_if_pending_replicas_hydrated_stage( &mut self, session: &Session, plan: AlterClusterPlan, new_config: ClusterVariantManaged, timeout_time: Instant, on_timeout: OnTimeoutAction, validity: PlanValidity, ) -> Result<StageResult<Box<ClusterStage>>, AdapterError>
pub(crate) async fn sequence_create_cluster( &mut self, session: &Session, __arg2: CreateClusterPlan, ) -> Result<ExecuteResponse, AdapterError>
async fn sequence_create_managed_cluster( &mut self, session: &Session, __arg2: CreateClusterManagedPlan, cluster_id: ClusterId, ops: Vec<Op>, ) -> Result<ExecuteResponse, AdapterError>
fn create_managed_cluster_replica_op( &self, cluster_id: ClusterId, name: String, compute: &ComputeReplicaConfig, size: &String, ops: &mut Vec<Op>, azs: Option<&[String]>, disk: bool, pending: bool, owner_id: RoleId, reason: ReplicaCreateDropReason, ) -> Result<(), AdapterError>
fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>( &self, azs: I, ) -> Result<(), AdapterError>
async fn sequence_create_unmanaged_cluster( &mut self, session: &Session, __arg2: CreateClusterUnmanagedPlan, id: ClusterId, ops: Vec<Op>, ) -> Result<ExecuteResponse, AdapterError>
async fn create_cluster(&mut self, cluster_id: ClusterId)
pub(crate) async fn sequence_create_cluster_replica( &mut self, session: &Session, __arg2: CreateClusterReplicaPlan, ) -> Result<ExecuteResponse, AdapterError>
async fn create_cluster_replica( &mut self, cluster_id: ClusterId, replica_id: ReplicaId, )
pub(crate) async fn sequence_alter_cluster_managed_to_managed( &mut self, session: Option<&Session>, cluster_id: ClusterId, new_config: ClusterConfig, reason: ReplicaCreateDropReason, strategy: AlterClusterPlanStrategy, ) -> Result<NeedsFinalization, AdapterError>
When this is called by the automated cluster scheduling, scheduling_decision_reason should contain information on why a cluster is being turned On/Off. It will be forwarded to the details field of the audit log event that records creating or dropping replicas.
Panics
Panics if the identified cluster is not a managed cluster.
Panics if new_config is not a configuration for a managed cluster.
async fn sequence_alter_cluster_unmanaged_to_managed( &mut self, session: &Session, cluster_id: ClusterId, new_config: ClusterConfig, options: PlanClusterOption, ) -> Result<(), AdapterError>
Panics
Panics if new_config is not a configuration for a managed cluster.
async fn sequence_alter_cluster_managed_to_unmanaged( &mut self, session: &Session, cluster_id: ClusterId, new_config: ClusterConfig, ) -> Result<(), AdapterError>
async fn sequence_alter_cluster_unmanaged_to_unmanaged( &mut self, session: &Session, cluster_id: ClusterId, new_config: ClusterConfig, replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>, ) -> Result<(), AdapterError>
pub(crate) async fn sequence_alter_cluster_rename( &mut self, session: &mut Session, __arg2: AlterClusterRenamePlan, ) -> Result<ExecuteResponse, AdapterError>
pub(crate) async fn sequence_alter_cluster_swap( &mut self, session: &mut Session, __arg2: AlterClusterSwapPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(crate) async fn sequence_alter_cluster_replica_rename( &mut self, session: &Session, __arg2: AlterClusterReplicaRenamePlan, ) -> Result<ExecuteResponse, AdapterError>
pub(crate) async fn sequence_alter_set_cluster( &self, _session: &Session, __arg2: AlterSetClusterPlan, ) -> Result<ExecuteResponse, AdapterError>
Convert an AlterSetClusterPlan to a sequence of catalog operators and adjust state.
impl Coordinator
pub(crate) async fn sequence_create_continual_task( &mut self, session: &Session, plan: CreateContinualTaskPlan, resolved_ids: ResolvedIds, ) -> Result<ExecuteResponse, AdapterError>
pub fn optimize_create_continual_task( &self, ct: &ContinualTask, output_id: GlobalId, catalog: Arc<dyn OptimizerCatalog>, debug_name: String, ) -> Result<(DataflowDescription<OptimizedMirRelationExpr>, DataflowDescription<Plan>, DataflowMetainfo<Arc<OptimizerNotice>>), AdapterError>
impl Coordinator
pub(crate) async fn sequence_create_index( &mut self, ctx: ExecuteContext, plan: CreateIndexPlan, resolved_ids: ResolvedIds, )
pub(crate) async fn explain_create_index( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )
pub(crate) async fn explain_replan_index( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )
pub(crate) fn explain_index( &mut self, ctx: &ExecuteContext, _: ExplainPlanPlan, ) -> Result<ExecuteResponse, AdapterError>
fn create_index_validate( &mut self, plan: CreateIndexPlan, resolved_ids: ResolvedIds, explain_ctx: ExplainContext, ) -> Result<CreateIndexStage, AdapterError>
async fn create_index_optimize( &mut self, __arg1: CreateIndexOptimize, ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError>
async fn create_index_finish( &mut self, session: &Session, __arg2: CreateIndexFinish, ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError>
async fn create_index_explain( &mut self, session: &Session, __arg2: CreateIndexExplain, ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError>
impl Coordinator
pub(crate) async fn sequence_create_materialized_view( &mut self, ctx: ExecuteContext, plan: CreateMaterializedViewPlan, resolved_ids: ResolvedIds, )
pub(crate) async fn explain_create_materialized_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )
pub(crate) async fn explain_replan_materialized_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )
pub(super) fn explain_materialized_view( &mut self, ctx: &ExecuteContext, _: ExplainPlanPlan, ) -> Result<ExecuteResponse, AdapterError>
fn create_materialized_view_validate( &mut self, session: &Session, plan: CreateMaterializedViewPlan, resolved_ids: ResolvedIds, explain_ctx: ExplainContext, ) -> Result<CreateMaterializedViewStage, AdapterError>
async fn create_materialized_view_optimize( &mut self, __arg1: CreateMaterializedViewOptimize, ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError>
async fn create_materialized_view_finish( &mut self, session: &Session, __arg2: CreateMaterializedViewFinish, ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError>
fn select_timestamps( &self, id_bundle: CollectionIdBundle, refresh_schedule: Option<&RefreshSchedule>, read_holds: &ReadHolds<Timestamp>, ) -> Result<(Antichain<Timestamp>, Antichain<Timestamp>, Antichain<Timestamp>), AdapterError>
Select the initial dataflow_as_of, storage_as_of, and until frontiers for a materialized view.
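For intuition, here is a minimal sketch of this kind of frontier selection using plain u64 timestamps; pick_frontiers, input_sinces, and last_refresh are hypothetical names for illustration only, not Materialize's API.

```rust
// Illustrative sketch only: choose an as_of no earlier than the inputs'
// read frontiers, and an until bounded by a hypothetical last refresh.
fn pick_frontiers(
    input_sinces: &[u64],      // read frontiers of the inputs (assumption)
    last_refresh: Option<u64>, // hypothetical end of a refresh schedule
) -> (u64, Option<u64>) {
    // The dataflow cannot read its inputs before their sinces, so the
    // as_of must be at least the maximum of them.
    let as_of = input_sinces.iter().copied().max().unwrap_or(0);
    // If the view stops refreshing at some point, nothing after that time
    // needs to be computed, so one tick past it can serve as the until.
    let until = last_refresh.map(|t| t.saturating_add(1));
    (as_of, until)
}

fn main() {
    let (as_of, until) = pick_frontiers(&[10, 42, 7], Some(100));
    assert_eq!((as_of, until), (42, Some(101)));
    println!("as_of={as_of}, until={until:?}");
}
```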
async fn create_materialized_view_explain( &mut self, session: &Session, __arg2: CreateMaterializedViewExplain, ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError>
pub(crate) async fn explain_pushdown_materialized_view( &self, ctx: ExecuteContext, item_id: CatalogItemId, )
impl Coordinator
pub(crate) async fn sequence_create_view( &mut self, ctx: ExecuteContext, plan: CreateViewPlan, resolved_ids: ResolvedIds, )
pub(crate) async fn explain_create_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )
pub(crate) async fn explain_replan_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, )
pub(crate) fn explain_view( &mut self, ctx: &ExecuteContext, _: ExplainPlanPlan, ) -> Result<ExecuteResponse, AdapterError>
fn create_view_validate( &mut self, plan: CreateViewPlan, resolved_ids: ResolvedIds, explain_ctx: ExplainContext, ) -> Result<CreateViewStage, AdapterError>
async fn create_view_optimize( &mut self, __arg1: CreateViewOptimize, ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError>
async fn create_view_finish( &mut self, session: &Session, __arg2: CreateViewFinish, ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError>
async fn create_view_explain( &mut self, session: &Session, __arg2: CreateViewExplain, ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError>
impl Coordinator
pub async fn sequence_explain_timestamp( &mut self, ctx: ExecuteContext, plan: ExplainTimestampPlan, target_cluster: TargetCluster, )
fn explain_timestamp_validity( &self, session: &Session, plan: ExplainTimestampPlan, target_cluster: TargetCluster, ) -> Result<ExplainTimestampStage, AdapterError>
fn explain_timestamp_optimize( &self, _: ExplainTimestampOptimize, ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError>
async fn explain_timestamp_real_time_recency( &self, session: &Session, __arg2: ExplainTimestampRealTimeRecency, ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError>
pub(crate) fn explain_timestamp( &self, session: &Session, cluster_id: ClusterId, id_bundle: &CollectionIdBundle, determination: TimestampDetermination<Timestamp>, ) -> TimestampExplanation<Timestamp>
async fn explain_timestamp_finish( &mut self, session: &mut Session, __arg2: ExplainTimestampFinish, ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError>
impl Coordinator
pub(crate) async fn sequence_peek( &mut self, ctx: ExecuteContext, plan: SelectPlan, target_cluster: TargetCluster, max_query_result_size: Option<u64>, )
Sequence a peek, determining a timestamp and the most efficient dataflow interaction.
Peeks are sequenced by assigning a timestamp for evaluation, and then determining and deploying the most efficient evaluation plan. The peek could evaluate to a constant, be a simple read out of an existing arrangement, or require a new dataflow to build the results to return.
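The three outcomes mentioned above can be illustrated with a hypothetical enum; PeekStrategy and its variants are made up for illustration and are not the adapter's actual peek-plan types.

```rust
// Illustrative sketch of the three ways a peek's results can be produced.
enum PeekStrategy {
    /// The query optimized down to a constant collection of rows.
    Constant(Vec<String>),
    /// The query can be answered by reading an existing arrangement (index).
    ExistingArrangement { index_name: String },
    /// A new, transient dataflow must be built to compute the answer.
    NewDataflow { debug_name: String },
}

fn describe(strategy: &PeekStrategy) -> String {
    match strategy {
        PeekStrategy::Constant(rows) => format!("return {} constant rows", rows.len()),
        PeekStrategy::ExistingArrangement { index_name } => {
            format!("read out of index {index_name}")
        }
        PeekStrategy::NewDataflow { debug_name } => {
            format!("ship transient dataflow {debug_name}")
        }
    }
}

fn main() {
    let s = PeekStrategy::ExistingArrangement { index_name: "t_idx".into() };
    println!("{}", describe(&s));
}
```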
pub(crate) async fn sequence_copy_to( &mut self, ctx: ExecuteContext, __arg2: CopyToPlan, target_cluster: TargetCluster, )
pub(crate) async fn explain_peek( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, target_cluster: TargetCluster, )
pub fn peek_validate( &self, session: &Session, plan: SelectPlan, target_cluster: TargetCluster, copy_to_ctx: Option<CopyToContext>, explain_ctx: ExplainContext, max_query_result_size: Option<u64>, ) -> Result<PeekStage, AdapterError>
Do some simple validation. We must defer most of it until after any off-thread work.
async fn peek_linearize_timestamp( &self, session: &Session, __arg2: PeekStageLinearizeTimestamp, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>
Possibly linearize a timestamp from a TimestampOracle.
fn peek_timestamp_read_hold( &mut self, session: &mut Session, _: PeekStageTimestampReadHold, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>
Determine a read timestamp and create appropriate read holds.
async fn peek_optimize( &self, session: &Session, __arg2: PeekStageOptimize, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>
async fn peek_real_time_recency( &self, session: &Session, __arg2: PeekStageRealTimeRecency, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>
async fn peek_finish( &mut self, ctx: &mut ExecuteContext, __arg2: PeekStageFinish, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>
async fn peek_copy_to_dataflow( &mut self, ctx: &ExecuteContext, __arg2: PeekStageCopyTo, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>
async fn peek_explain_plan( &self, session: &Session, __arg2: PeekStageExplainPlan, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>
async fn peek_explain_pushdown( &self, session: &Session, stage: PeekStageExplainPushdown, ) -> Result<StageResult<Box<PeekStage>>, AdapterError>
pub(super) fn sequence_peek_timestamp( &mut self, session: &mut Session, when: &QueryWhen, cluster_id: ClusterId, timeline_context: TimelineContext, oracle_read_ts: Option<Timestamp>, source_bundle: &CollectionIdBundle, source_ids: &BTreeSet<GlobalId>, real_time_recency_ts: Option<Timestamp>, requires_linearization: RequireLinearization, ) -> Result<TimestampDetermination<Timestamp>, AdapterError>
Determines the query timestamp and acquires read holds on dependent sources if necessary.
impl Coordinator
pub(crate) async fn sequence_create_secret( &mut self, ctx: ExecuteContext, plan: CreateSecretPlan, )
async fn create_secret_validate( &mut self, session: &Session, plan: CreateSecretPlan, ) -> Result<SecretStage, AdapterError>
async fn create_secret_ensure( &mut self, session: &Session, __arg2: CreateSecretEnsure, ) -> Result<StageResult<Box<SecretStage>>, AdapterError>
fn extract_secret( &self, session: &Session, secret_as: &mut MirScalarExpr, ) -> Result<Vec<u8>, AdapterError>
async fn create_secret_finish( &mut self, session: &Session, __arg2: CreateSecretFinish, ) -> Result<StageResult<Box<SecretStage>>, AdapterError>
pub(crate) async fn sequence_alter_secret( &mut self, ctx: ExecuteContext, plan: AlterSecretPlan, )
fn alter_secret( &mut self, session: &Session, plan: AlterSecretPlan, ) -> Result<StageResult<Box<SecretStage>>, AdapterError>
pub(crate) async fn sequence_rotate_keys( &mut self, ctx: ExecuteContext, id: CatalogItemId, )
fn rotate_keys_ensure( &mut self, _: RotateKeysSecretEnsure, ) -> Result<StageResult<Box<SecretStage>>, AdapterError>
async fn rotate_keys_finish( &mut self, session: &Session, __arg2: RotateKeysSecretFinish, ) -> Result<StageResult<Box<SecretStage>>, AdapterError>
impl Coordinator
pub(crate) async fn sequence_subscribe( &mut self, ctx: ExecuteContext, plan: SubscribePlan, target_cluster: TargetCluster, )
fn subscribe_validate( &mut self, session: &mut Session, plan: SubscribePlan, target_cluster: TargetCluster, ) -> Result<SubscribeStage, AdapterError>
fn subscribe_optimize_mir( &mut self, session: &Session, _: SubscribeOptimizeMir, ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError>
async fn subscribe_timestamp_optimize_lir( &mut self, ctx: &ExecuteContext, __arg2: SubscribeTimestampOptimizeLir, ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError>
async fn subscribe_finish( &mut self, ctx: &mut ExecuteContext, __arg2: SubscribeFinish, ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError>
impl Coordinator
pub(crate) async fn sequence_staged<S>( &mut self, ctx: S::Ctx, parent_span: Span, stage: S, )
fn handle_spawn<C, T, F>( &self, ctx: C, handle: JoinHandle<Result<T, AdapterError>>, cancel_enabled: bool, f: F, )
async fn create_source_inner( &self, session: &Session, plans: Vec<CreateSourcePlanBundle>, ) -> Result<CreateSourceInner, AdapterError>
pub(crate) async fn plan_subsource( &mut self, session: &Session, params: &Params, subsource_stmt: CreateSubsourceStatement<Aug>, ) -> Result<CreateSourcePlanBundle, AdapterError>
Subsources are planned differently from other statements because they are typically synthesized from other statements, e.g. CREATE SOURCE. Because of this, we have usually “missed” the opportunity to plan them through the normal statement execution life cycle (the exception being during bootstrapping).
pub(crate) async fn plan_purified_alter_source_add_subsource( &mut self, session: &Session, params: Params, source_name: ResolvedItemName, options: Vec<AlterSourceAddSubsourceOption<Aug>>, subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>, ) -> Result<(Plan, ResolvedIds), AdapterError>
Prepares an ALTER SOURCE...ADD SUBSOURCE.
pub(crate) fn plan_purified_alter_source_refresh_references( &self, _session: &Session, _params: Params, source_name: ResolvedItemName, available_source_references: SourceReferences, ) -> Result<(Plan, ResolvedIds), AdapterError>
Prepares an ALTER SOURCE...REFRESH REFERENCES.
pub(crate) async fn plan_purified_create_source( &mut self, ctx: &ExecuteContext, params: Params, progress_stmt: CreateSubsourceStatement<Aug>, source_stmt: CreateSourceStatement<Aug>, subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>, available_source_references: SourceReferences, ) -> Result<(Plan, ResolvedIds), AdapterError>
Prepares a CREATE SOURCE statement to create its progress subsource, the primary source, and any ingestion export subsources (e.g. PG tables).
pub(super) async fn sequence_create_source( &mut self, session: &mut Session, plans: Vec<CreateSourcePlanBundle>, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_connection( &mut self, ctx: ExecuteContext, plan: CreateConnectionPlan, resolved_ids: ResolvedIds, )
pub(crate) async fn sequence_create_connection_stage_finish( &mut self, session: &mut Session, connection_id: CatalogItemId, connection_gid: GlobalId, plan: CreateConnectionPlan, resolved_ids: ResolvedIds, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_database( &mut self, session: &mut Session, plan: CreateDatabasePlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_schema( &mut self, session: &mut Session, plan: CreateSchemaPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_role( &mut self, conn_id: Option<&ConnectionId>, __arg2: CreateRolePlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_network_policy( &mut self, session: &Session, __arg2: CreateNetworkPolicyPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_network_policy( &mut self, session: &Session, __arg2: AlterNetworkPolicyPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_table( &mut self, ctx: &mut ExecuteContext, plan: CreateTablePlan, resolved_ids: ResolvedIds, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_sink( &mut self, ctx: ExecuteContext, plan: CreateSinkPlan, resolved_ids: ResolvedIds, )
pub(super) fn validate_system_column_references( &self, uses_ambiguous_columns: bool, depends_on: &BTreeSet<GlobalId>, ) -> Result<(), AdapterError>
Validates that a view definition does not contain any expressions that may lead to ambiguous column references to system tables, for example NATURAL JOIN or SELECT *.
We prevent these expressions so that we can add columns to system tables without changing the definition of the view.
Here is a somewhat hand-wavy proof of why we only need to check the immediate view definition for system objects and ambiguous column references, and not the entire dependency tree:
- A view with no object references cannot have any ambiguous column references to a system object, because it has no system objects.
- A view with a direct reference to a system object and a * or NATURAL JOIN will be rejected due to ambiguous column references.
- A view with system objects but no * or NATURAL JOINs cannot have any ambiguous column references to a system object, because all column references are explicitly named.
- A view with * or NATURAL JOINs, that doesn’t directly reference a system object cannot have any ambiguous column references to a system object, because there are no system objects in the top level view and all sub-views are guaranteed to have no ambiguous column references to system objects.
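For illustration, a hedged sketch of this check with u64 ids standing in for GlobalId; is_system_id and the id cutoff are assumptions made up here, not the real catalog's rules.

```rust
use std::collections::BTreeSet;

// Hypothetical predicate: pretend low ids are reserved for system objects.
fn is_system_id(id: u64) -> bool {
    id < 1_000
}

// Reject a view only when it both uses an ambiguous construct
// (e.g. SELECT * or NATURAL JOIN) and directly depends on a system object.
fn check_system_column_references(
    uses_ambiguous_columns: bool,
    depends_on: &BTreeSet<u64>,
) -> Result<(), String> {
    if uses_ambiguous_columns && depends_on.iter().copied().any(is_system_id) {
        Err("views that use * or NATURAL JOIN cannot directly reference system objects".into())
    } else {
        Ok(())
    }
}

fn main() {
    let deps: BTreeSet<u64> = [42, 5_000].into_iter().collect();
    assert!(check_system_column_references(true, &deps).is_err());
    assert!(check_system_column_references(false, &deps).is_ok());
}
```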
pub(super) async fn sequence_create_type( &mut self, session: &Session, plan: CreateTypePlan, resolved_ids: ResolvedIds, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_comment_on( &mut self, session: &Session, plan: CommentPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_drop_objects( &mut self, session: &Session, __arg2: DropObjectsPlan, ) -> Result<ExecuteResponse, AdapterError>
fn validate_dropped_role_ownership( &self, session: &Session, dropped_roles: &BTreeMap<RoleId, &str>, ) -> Result<(), AdapterError>
pub(super) async fn sequence_drop_owned( &mut self, session: &Session, plan: DropOwnedPlan, ) -> Result<ExecuteResponse, AdapterError>
fn sequence_drop_common( &self, session: &Session, ids: Vec<ObjectId>, ) -> Result<DropOps, AdapterError>
pub(super) fn sequence_explain_schema( &self, _: ExplainSinkSchemaPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) fn sequence_show_all_variables( &self, session: &Session, ) -> Result<ExecuteResponse, AdapterError>
pub(super) fn sequence_show_variable( &self, session: &Session, plan: ShowVariablePlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_inspect_shard( &self, session: &Session, plan: InspectShardPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) fn sequence_set_variable( &self, session: &mut Session, plan: SetVariablePlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) fn sequence_reset_variable( &self, session: &mut Session, plan: ResetVariablePlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) fn sequence_set_transaction( &self, session: &mut Session, plan: SetTransactionPlan, ) -> Result<ExecuteResponse, AdapterError>
fn validate_set_isolation_level( &self, session: &Session, ) -> Result<(), AdapterError>
fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError>
pub(super) async fn sequence_end_transaction( &mut self, ctx: ExecuteContext, action: EndTransactionAction, )
async fn sequence_end_transaction_inner( &mut self, session: &mut Session, action: EndTransactionAction, ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError>
pub(super) async fn sequence_side_effecting_func( &mut self, ctx: ExecuteContext, plan: SideEffectingFunc, )
pub(super) async fn determine_real_time_recent_timestamp( &self, session: &Session, source_ids: impl Iterator<Item = CatalogItemId>, ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
Checks to see if the session needs a real time recency timestamp and if so returns a future that will return the timestamp.
pub(super) async fn sequence_explain_plan( &mut self, ctx: ExecuteContext, plan: ExplainPlanPlan, target_cluster: TargetCluster, )
pub(super) async fn sequence_explain_pushdown( &mut self, ctx: ExecuteContext, plan: ExplainPushdownPlan, target_cluster: TargetCluster, )
async fn render_explain_pushdown( &self, ctx: ExecuteContext, as_of: Antichain<Timestamp>, mz_now: ResultSpec<'static>, read_holds: Option<ReadHolds<Timestamp>>, imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static, )
async fn render_explain_pushdown_prepare( &self, session: &Session, as_of: Antichain<Timestamp>, mz_now: ResultSpec<'static>, imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)>, ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>>
pub(super) async fn sequence_insert( &mut self, ctx: ExecuteContext, plan: InsertPlan, )
pub(super) async fn sequence_read_then_write( &mut self, ctx: ExecuteContext, plan: ReadThenWritePlan, )
ReadThenWrite is a plan whose writes depend on the results of a read. This works by doing a Peek and then queuing a SendDiffs. No writes or read-then-writes can occur between the Peek and the SendDiffs; otherwise a serializability violation could occur.
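As a loose analogy only, the no-interleaving requirement can be pictured as holding a single lock across the read and the write. The real coordinator queues a SendDiffs message rather than holding a mutex, so the sketch below is not its implementation; the table and helper names are invented.

```rust
use std::sync::{Arc, Mutex};

// Stand-in "table": a Vec<i64> behind a Mutex. The read, the diff
// computation, and the write all happen under one lock, so no other write
// can slip in between the "peek" and the "send diffs" step.
fn read_then_write(table: &Arc<Mutex<Vec<i64>>>, delta: i64) {
    let mut guard = table.lock().unwrap();
    // "Peek": read the current contents and compute the new values.
    let updated: Vec<i64> = guard.iter().map(|v| v + delta).collect();
    // "SendDiffs": apply the computed updates while still holding the lock.
    *guard = updated;
}

fn main() {
    let table = Arc::new(Mutex::new(vec![1, 2, 3]));
    read_then_write(&table, 10);
    assert_eq!(*table.lock().unwrap(), vec![11, 12, 13]);
}
```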
pub(super) async fn sequence_alter_item_rename( &mut self, session: &mut Session, plan: AlterItemRenamePlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_retain_history( &mut self, session: &mut Session, plan: AlterRetainHistoryPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_schema_rename( &mut self, session: &mut Session, plan: AlterSchemaRenamePlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_schema_swap( &mut self, session: &mut Session, plan: AlterSchemaSwapPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_role( &mut self, session: &Session, __arg2: AlterRolePlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_sink_prepare( &mut self, ctx: ExecuteContext, plan: AlterSinkPlan, )
pub async fn sequence_alter_sink_finish(&mut self, ctx: AlterSinkReadyContext)
pub(super) async fn sequence_alter_connection( &mut self, ctx: ExecuteContext, __arg2: AlterConnectionPlan, )
async fn sequence_alter_connection_options( &mut self, ctx: ExecuteContext, id: CatalogItemId, set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>, drop_options: BTreeSet<ConnectionOptionName>, validate: bool, )
pub(crate) async fn sequence_alter_connection_stage_finish( &mut self, session: &mut Session, id: CatalogItemId, connection: Connection, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_source( &mut self, session: &Session, __arg2: AlterSourcePlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_system_set( &mut self, session: &Session, __arg2: AlterSystemSetPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_system_reset( &mut self, session: &Session, __arg2: AlterSystemResetPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_system_reset_all( &mut self, session: &Session, _: AlterSystemResetAllPlan, ) -> Result<ExecuteResponse, AdapterError>
fn is_user_allowed_to_alter_system( &self, session: &Session, var_name: Option<&str>, ) -> Result<(), AdapterError>
fn validate_alter_system_network_policy( &self, session: &Session, policy_value: &VariableValue, ) -> Result<(), AdapterError>
fn validate_alter_network_policy( &self, session: &Session, policy_rules: &Vec<NetworkPolicyRule>, ) -> Result<(), AdapterError>
Validates that a set of NetworkPolicyRules is valid for the current Session.
This helps prevent users from modifying network policies in a way that would lock out their current connection.
pub(super) fn sequence_execute( &mut self, session: &mut Session, plan: ExecutePlan, ) -> Result<String, AdapterError>
pub(super) async fn sequence_grant_privileges( &mut self, session: &Session, __arg2: GrantPrivilegesPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_revoke_privileges( &mut self, session: &Session, __arg2: RevokePrivilegesPlan, ) -> Result<ExecuteResponse, AdapterError>
async fn sequence_update_privileges( &mut self, session: &Session, update_privileges: Vec<UpdatePrivilege>, grantees: Vec<RoleId>, variant: UpdatePrivilegeVariant, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_default_privileges( &mut self, session: &Session, __arg2: AlterDefaultPrivilegesPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_grant_role( &mut self, session: &Session, __arg2: GrantRolePlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_revoke_role( &mut self, session: &Session, __arg2: RevokeRolePlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_owner( &mut self, session: &Session, __arg2: AlterOwnerPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_reassign_owned( &mut self, session: &Session, __arg2: ReassignOwnedPlan, ) -> Result<ExecuteResponse, AdapterError>
pub(crate) async fn handle_deferred_statement(&mut self)
pub(super) async fn sequence_alter_table( &mut self, _session: &Session, _plan: AlterTablePlan, ) -> Result<ExecuteResponse, AdapterError>
impl Coordinator
pub(super) async fn statistics_oracle( &self, session: &Session, source_ids: &BTreeSet<GlobalId>, query_as_of: &Antichain<Timestamp>, is_oneshot: bool, ) -> Result<Box<dyn StatisticsOracle>, AdapterError>
impl Coordinator
fn emit_optimizer_notices( &self, session: &Session, notices: &Vec<RawOptimizerNotice>, )
Forward notices that we got from the optimizer.
async fn process_dataflow_metainfo( &mut self, df_meta: DataflowMetainfo, export_id: GlobalId, session: &Session, notice_ids: Vec<GlobalId>, ) -> Option<BoxFuture<'static, ()>>
Process the metainfo from a newly created non-transient dataflow.
impl Coordinator
pub(crate) fn sequence_plan( &mut self, ctx: ExecuteContext, plan: Plan, resolved_ids: ResolvedIds, ) -> LocalBoxFuture<'_, ()>
BOXED FUTURE: As of Nov 2023 the returned Future from this function was 34KB. This would get stored on the stack which is bad for runtime performance, and blow up our stack usage. Because of that we purposefully move this Future onto the heap (i.e. Box it).
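A minimal sketch of the boxing pattern referred to here, using the futures crate; sequence_something and the 32 KB array are made-up stand-ins for a large generated future, not this method's actual body.

```rust
use futures::future::{FutureExt, LocalBoxFuture};

// A large future is moved to the heap so that only a pointer lives on the
// caller's stack.
fn sequence_something() -> LocalBoxFuture<'static, usize> {
    let big_state = [0u8; 32 * 1024]; // stand-in for bulky captured state
    async move {
        // Work that keeps `big_state` in the future's state machine.
        big_state.len()
    }
    .boxed_local() // Box::pin under the hood; the future now lives on the heap
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    assert_eq!(sequence_something().await, 32 * 1024);
}
```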
pub(crate) async fn sequence_execute_single_statement_transaction( &mut self, ctx: ExecuteContext, stmt: Arc<Statement<Raw>>, params: Params, )
pub(crate) async fn sequence_create_role_for_startup( &mut self, plan: CreateRolePlan, ) -> Result<ExecuteResponse, AdapterError>
Creates a role during connection startup.
This should not be called from anywhere except connection startup.
pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId)
fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice>
pub(crate) fn insert_constant( catalog: &Catalog, session: &mut Session, id: CatalogItemId, constants: MirRelationExpr, ) -> Result<ExecuteResponse, AdapterError>
pub(crate) fn send_diffs( session: &mut Session, plan: SendDiffsPlan, ) -> Result<ExecuteResponse, AdapterError>
impl Coordinator
pub(crate) fn plan_statement( &self, session: &Session, stmt: Statement<Aug>, params: &Params, resolved_ids: &ResolvedIds, ) -> Result<Plan, AdapterError>
pub(crate) fn declare( &self, ctx: ExecuteContext, name: String, stmt: Statement<Raw>, sql: String, params: Params, )
fn declare_inner( session: &mut Session, catalog: &Catalog, name: String, stmt: Statement<Raw>, sql: String, params: Params, now: EpochMillis, ) -> Result<(), AdapterError>
pub(crate) fn describe( catalog: &Catalog, session: &Session, stmt: Option<Statement<Raw>>, param_types: Vec<Option<ScalarType>>, ) -> Result<StatementDesc, AdapterError>
pub(crate) fn verify_prepared_statement( catalog: &Catalog, session: &mut Session, name: &str, ) -> Result<(), AdapterError>
Verify a prepared statement is still valid. This will return an error if the catalog’s revision has changed and the statement now produces a different type than its original.
pub(crate) fn verify_portal( &self, session: &mut Session, name: &str, ) -> Result<(), AdapterError>
Verify a portal is still valid.
fn verify_statement_revision( catalog: &Catalog, session: &Session, stmt: Option<&Statement<Raw>>, desc: &StatementDesc, catalog_revision: u64, ) -> Result<Option<u64>, AdapterError>
If the catalog and portal revisions don’t match, re-describe the statement and ensure its result type has not changed. Return Some(x) with the new (valid) revision if its plan has changed. Return None if the revisions match. Return an error if the plan has changed.
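A hedged sketch of this revision check with simplified stand-in types (Cached, a string "description"); it mirrors the described control flow, not the actual StatementDesc machinery.

```rust
struct Cached {
    catalog_revision: u64,
    desc: String,
}

fn verify_revision(
    cached: &Cached,
    current_catalog_revision: u64,
    redescribe: impl Fn() -> String,
) -> Result<Option<u64>, String> {
    if cached.catalog_revision == current_catalog_revision {
        // Revisions match: the cached description is still valid.
        return Ok(None);
    }
    // The catalog changed underneath us: re-describe and compare.
    if redescribe() == cached.desc {
        // Same result shape; remember the new revision to skip this next time.
        Ok(Some(current_catalog_revision))
    } else {
        Err("cached statement now describes a different result type".into())
    }
}

fn main() {
    let cached = Cached { catalog_revision: 7, desc: "int4, text".into() };
    assert_eq!(verify_revision(&cached, 7, || "int4, text".into()), Ok(None));
    assert_eq!(verify_revision(&cached, 8, || "int4, text".into()), Ok(Some(8)));
    assert!(verify_revision(&cached, 8, || "int8, text".into()).is_err());
}
```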
pub(crate) async fn clear_transaction( &mut self, session: &mut Session, ) -> TransactionStatus<Timestamp>
Handle removing in-progress transaction state regardless of the end action of the transaction.
pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId)
Clears coordinator state for a connection.
pub(crate) async fn add_active_compute_sink( &mut self, id: GlobalId, active_sink: ActiveComputeSink, ) -> BoxFuture<'static, ()>
Adds coordinator bookkeeping for an active compute sink.
This is a low-level method. The caller is responsible for installing the sink in the controller.
pub(crate) async fn remove_active_compute_sink( &mut self, id: GlobalId, ) -> Option<ActiveComputeSink>
Removes coordinator bookkeeping for an active compute sink.
This is a low-level method. The caller is responsible for dropping the sink from the controller. Consider calling drop_compute_sink or retire_compute_sink instead.
impl Coordinator
pub(crate) async fn bootstrap( &mut self, boot_ts: Timestamp, migrated_storage_collections_0dt: BTreeSet<CatalogItemId>, builtin_table_updates: Vec<BuiltinTableUpdate>, cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>, uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>, audit_logs_iterator: AuditLogIterator, ) -> Result<(), AdapterError>
Initializes coordinator state based on the contained catalog. Must be called after creating the coordinator and before calling the Coordinator::serve method.
async fn bootstrap_tables( &mut self, entries: &[CatalogEntry], builtin_table_updates: Vec<BuiltinTableUpdate>, audit_logs_iterator: AuditLogIterator, )
Prepares tables for writing by resetting them to a known state and appending the given builtin table updates. The timestamp oracle will be advanced to the write timestamp of the append when this method returns.
fn bootstrap_audit_log_table<'a>( &mut self, table_id: CatalogItemId, name: &'a QualifiedItemName, table: &'a Table, audit_logs_iterator: AuditLogIterator, read_ts: Timestamp, ) -> JoinHandle<Vec<StateUpdate>>
Prepare updates to the audit log table. The audit log table is append-only and very large, so we only need to find the events present in audit_logs_iterator but not in the audit log table.
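The set-difference idea reads roughly like the following sketch, with u64 event ids as stand-ins; missing_events is a hypothetical helper, not the coordinator's code.

```rust
use std::collections::BTreeSet;

// Since the table is append-only, only events not already present need to
// be appended during bootstrap.
fn missing_events(
    durable_events: impl IntoIterator<Item = u64>,
    already_in_table: &BTreeSet<u64>,
) -> Vec<u64> {
    durable_events
        .into_iter()
        .filter(|id| !already_in_table.contains(id))
        .collect()
}

fn main() {
    let in_table: BTreeSet<u64> = [1, 2, 3].into_iter().collect();
    assert_eq!(missing_events([2, 3, 4, 5], &in_table), vec![4, 5]);
}
```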
async fn bootstrap_storage_collections( &mut self, migrated_storage_collections: &BTreeSet<CatalogItemId>, )
Initializes all storage collections required by catalog objects in the storage controller.
This method takes care of collection creation, as well as migration of existing collections.
Creating all storage collections in a single create_collections call, rather than on demand, is more efficient as it reduces the number of writes to durable storage. It also allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary storage collections, without needing to worry about dependency order.
migrated_storage_collections is a set of builtin storage collections that have been migrated and should be handled specially.
async fn bootstrap_builtin_continual_tasks( &mut self, collections: Vec<(GlobalId, CollectionDescription<Timestamp>)>, )
Make as_of selection happy for builtin CTs. Ideally we’d write the initial as_of down in the durable catalog, but that’s hard because of boot ordering. Instead, we set the since of the storage collection to something that’s a reasonable lower bound for the as_of. Then, if the upper is 0, the as_of selection code will allow us to jump it forward to this since.
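A small sketch of that as_of rule with plain u64 timestamps; choose_as_of and the upper == 0 test are illustrative assumptions standing in for frontier comparisons on antichains.

```rust
// The collection's since acts as a lower bound for the as_of, and if the
// collection has never been written to (upper still at the minimum), the
// as_of may jump forward to that since.
fn choose_as_of(since: u64, upper: u64, proposed_as_of: u64) -> u64 {
    if upper == 0 {
        // Unwritten collection: safe to start at the since we wrote down.
        proposed_as_of.max(since)
    } else {
        // Otherwise the as_of must stay within [since, upper).
        proposed_as_of.clamp(since, upper.saturating_sub(1))
    }
}

fn main() {
    assert_eq!(choose_as_of(100, 0, 5), 100);   // unwritten: jump to since
    assert_eq!(choose_as_of(100, 150, 5), 100); // clamp up to since
    assert_eq!(choose_as_of(100, 150, 500), 149); // clamp below upper
}
```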
fn bootstrap_dataflow_plans( &mut self, ordered_catalog_entries: &[CatalogEntry], cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>, ) -> Result<BTreeMap<GlobalId, GlobalExpressions>, AdapterError>
Invokes the optimizer on all indexes and materialized views in the catalog and inserts the resulting dataflow plans into the catalog state.
ordered_catalog_entries must be sorted in dependency order, with dependencies ordered before their dependants.
This method does not perform timestamp selection for the dataflows, nor does it create them in the compute controller. Both of these steps happen later during bootstrapping.
Returns a map of expressions that were not cached.
async fn bootstrap_dataflow_as_ofs( &mut self, ) -> BTreeMap<GlobalId, ReadHold<Timestamp>>
Selects for each compute dataflow an as-of suitable for bootstrapping it.
Returns a set of ReadHolds that ensures the read frontiers of involved collections stay in place and that must not be dropped before all compute dataflows have been created with the compute controller.
This method expects all storage collections and dataflow plans to be available, so it must run after Coordinator::bootstrap_storage_collections and Coordinator::bootstrap_dataflow_plans.
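To illustrate the read-hold contract (a hold keeps a collection readable at its time until the hold is dropped), here is a toy RAII sketch; ReadHoldSketch and Frontier are invented for illustration and are not the controller's ReadHold type.

```rust
use std::sync::{Arc, Mutex};

struct Frontier {
    since: u64,
    holds: Vec<u64>, // times currently held back by read holds
}

struct ReadHoldSketch {
    frontier: Arc<Mutex<Frontier>>,
    time: u64,
}

impl Drop for ReadHoldSketch {
    fn drop(&mut self) {
        // Release this hold. (A real implementation would also allow the
        // since to advance past times that are no longer held.)
        let mut f = self.frontier.lock().unwrap();
        if let Some(pos) = f.holds.iter().position(|t| *t == self.time) {
            f.holds.remove(pos);
        }
    }
}

fn acquire_hold(frontier: &Arc<Mutex<Frontier>>, time: u64) -> ReadHoldSketch {
    let mut f = frontier.lock().unwrap();
    assert!(time >= f.since, "cannot hold back an already-advanced since");
    f.holds.push(time);
    ReadHoldSketch { frontier: Arc::clone(frontier), time }
}

fn main() {
    let frontier = Arc::new(Mutex::new(Frontier { since: 10, holds: vec![] }));
    let hold = acquire_hold(&frontier, 10);
    assert_eq!(frontier.lock().unwrap().holds, vec![10]);
    drop(hold); // only after all dataflows have been created
    assert!(frontier.lock().unwrap().holds.is_empty());
}
```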
fn serve( self, internal_cmd_rx: UnboundedReceiver<Message>, strict_serializable_reads_rx: UnboundedReceiver<(ConnectionId, PendingReadTxn)>, cmd_rx: UnboundedReceiver<(OpenTelemetryContext, Command)>, group_commit_rx: GroupCommitWaiter, ) -> LocalBoxFuture<'static, ()>
Serves the coordinator, receiving commands from users over cmd_rx and feedback from dataflow workers over feedback_rx.
You must call bootstrap before calling this method.
BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would get stored on the stack which is bad for runtime performance, and blow up our stack usage. Because of that we purposefully move this Future onto the heap (i.e. Box it).
fn owned_catalog(&self) -> Arc<Catalog>
Obtain a read-only Catalog snapshot, suitable for giving out to non-Coordinator thread tasks.
fn optimizer_metrics(&self) -> OptimizerMetrics
Obtain a handle to the optimizer metrics, suitable for giving out to non-Coordinator thread tasks.
fn catalog_mut(&mut self) -> &mut Catalog
Obtain a writeable Catalog reference.
fn connection_context(&self) -> &ConnectionContext
Obtain a reference to the coordinator’s connection context.
fn secrets_reader(&self) -> &Arc<dyn SecretsReader>
Obtain a reference to the coordinator’s secret reader, in an Arc.
pub(crate) fn broadcast_notice(&self, notice: AdapterNotice)
Publishes a notice message to all sessions.
pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta>
pub(crate) fn retire_execution( &mut self, reason: StatementEndedExecutionReason, ctx_extra: ExecuteContextExtra, )
pub fn dataflow_builder( &self, instance: ComputeInstanceId, ) -> DataflowBuilder<'_>
Creates a new dataflow builder from the catalog and indexes in self.
pub fn instance_snapshot( &self, id: ComputeInstanceId, ) -> Result<ComputeInstanceSnapshot, InstanceMissing>
Return a reference-less snapshot to the indicated compute instance.
pub(crate) async fn ship_dataflow( &mut self, dataflow: DataflowDescription<Plan>, instance: ComputeInstanceId, subscribe_target_replica: Option<ReplicaId>, )
Call into the compute controller to install a finalized dataflow, and initialize the read policies for its exported readable objects.
pub(crate) async fn ship_dataflow_and_notice_builtin_table_updates( &mut self, dataflow: DataflowDescription<Plan>, instance: ComputeInstanceId, notice_builtin_updates_fut: Option<BoxFuture<'static, ()>>, )
Like ship_dataflow, but also awaits builtin table updates.
pub fn install_compute_watch_set( &mut self, conn_id: ConnectionId, objects: BTreeSet<GlobalId>, t: Timestamp, state: WatchSetResponse, )
Install a watch set in the controller that is automatically associated with the given connection id. The watchset will be automatically cleared if the connection terminates before the watchset completes.
pub fn install_storage_watch_set( &mut self, conn_id: ConnectionId, objects: BTreeSet<GlobalId>, t: Timestamp, state: WatchSetResponse, )
Install a watch set in the controller that is automatically associated with the given connection id. The watchset will be automatically cleared if the connection terminates before the watchset completes.
pub fn cancel_pending_watchsets(&mut self, conn_id: &ConnectionId)
Cancels pending watchsets associated with the provided connection id.
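The install/cancel lifecycle described by these methods can be sketched with two maps; WatchSets, u64 watch-set ids, and string connection ids are illustrative stand-ins, not the coordinator's actual bookkeeping.

```rust
use std::collections::{BTreeMap, BTreeSet};

#[derive(Default)]
struct WatchSets {
    installed: BTreeMap<u64, String>,               // watch set -> owning connection
    by_connection: BTreeMap<String, BTreeSet<u64>>, // connection -> its watch sets
}

impl WatchSets {
    fn install(&mut self, ws: u64, conn: &str) {
        self.installed.insert(ws, conn.to_string());
        self.by_connection.entry(conn.to_string()).or_default().insert(ws);
    }

    // Cancelling a connection clears every watch set installed on its behalf.
    fn cancel_for_connection(&mut self, conn: &str) {
        if let Some(sets) = self.by_connection.remove(conn) {
            for ws in sets {
                self.installed.remove(&ws);
            }
        }
    }
}

fn main() {
    let mut w = WatchSets::default();
    w.install(1, "conn-a");
    w.install(2, "conn-a");
    w.install(3, "conn-b");
    w.cancel_for_connection("conn-a");
    assert_eq!(w.installed.len(), 1);
    assert!(w.by_connection.get("conn-a").is_none());
}
```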
pub async fn dump(&self) -> Result<Value, Error>
Returns the state of the Coordinator formatted as JSON.
The returned value is not guaranteed to be stable and may change at any point in time.
async fn prune_storage_usage_events_on_startup( &self, retention_period: Duration, )
Prune all storage usage events from the MZ_STORAGE_USAGE_BY_SHARD table that are older than retention_period.
This method will read the entire contents of MZ_STORAGE_USAGE_BY_SHARD into memory, which can be expensive.
DO NOT call this method outside of startup. The safety of reading at the current oracle read timestamp and then writing at whatever the current write timestamp is (instead of read_ts + 1) relies on the fact that there are no outstanding writes during startup. Group commit, which this method uses to write the retractions, has builtin fencing, and we never commit retractions to MZ_STORAGE_USAGE_BY_SHARD outside of this method, which is only called once during startup. So we don’t have to worry about double/invalid retractions.
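A sketch of the retention arithmetic with (id, timestamp) pairs as stand-in rows; retractions_for_expired and the millisecond units are assumptions for illustration, not the actual table schema.

```rust
// Events older than the retention window are turned into retractions
// (diff -1) to be written back, conceptually, via group commit.
fn retractions_for_expired(
    events: &[(u64, u64)], // (event id, event timestamp in ms)
    read_ts: u64,          // current oracle read timestamp in ms
    retention_ms: u64,
) -> Vec<(u64, i64)> {
    let cutoff = read_ts.saturating_sub(retention_ms);
    events
        .iter()
        .filter(|(_, ts)| *ts < cutoff)
        .map(|(id, _)| (*id, -1i64)) // retraction diff
        .collect()
}

fn main() {
    let events = [(1, 1_000), (2, 5_000), (3, 9_000)];
    // With a 4s retention window at read_ts = 10_000, only events older
    // than t = 6_000 are retracted.
    assert_eq!(retractions_for_expired(&events, 10_000, 4_000), vec![(1, -1), (2, -1)]);
}
```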
Trait Implementations§
impl Debug for Coordinator
impl TimestampProvider for Coordinator
fn compute_read_frontier( &self, instance: ComputeInstanceId, id: GlobalId, ) -> Antichain<Timestamp>
Reports a collection’s current read frontier.
fn compute_write_frontier( &self, instance: ComputeInstanceId, id: GlobalId, ) -> Antichain<Timestamp>
Reports a collection’s current write frontier.
fn storage_frontiers( &self, ids: Vec<GlobalId>, ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>
fn acquire_read_holds( &self, id_bundle: &CollectionIdBundle, ) -> ReadHolds<Timestamp>
Acquires read holds on the collections in id_bundle at the earliest possible times.
fn catalog_state(&self) -> &CatalogState
fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline>
fn needs_linearized_read_ts( isolation_level: &IsolationLevel, when: &QueryWhen, ) -> bool
fn determine_timestamp_for( &self, session: &Session, id_bundle: &CollectionIdBundle, when: &QueryWhen, compute_instance: ComputeInstanceId, timeline_context: &TimelineContext, oracle_read_ts: Option<Timestamp>, real_time_recency_ts: Option<Timestamp>, isolation_level: &IsolationLevel, ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError>
fn least_valid_read( &self, read_holds: &ReadHolds<Timestamp>, ) -> Antichain<Timestamp>
fn least_valid_write( &self, id_bundle: &CollectionIdBundle, ) -> Antichain<Timestamp>
fn greatest_available_read( &self, id_bundle: &CollectionIdBundle, ) -> Antichain<Timestamp>
Returns least_valid_write - 1, i.e., each time in least_valid_write stepped back in a saturating way.
Auto Trait Implementations§
impl !Freeze for Coordinator
impl !RefUnwindSafe for Coordinator
impl !Send for Coordinator
impl !Sync for Coordinator
impl Unpin for Coordinator
impl !UnwindSafe for Coordinator
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized
impl<T> FmtForward for T
impl<T> FutureExt for T
impl<T> Instrument for T
impl<T> IntoRequest<T> for T
impl<T, U> OverrideFrom<Option<&T>> for U where U: OverrideFrom<T>
impl<T> Pipe for T where T: ?Sized
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P where R: RustType<P>
impl<'a, S, T> Semigroup<&'a S> for T where T: Semigroup<S>
impl<T> Tap for T