Struct mz_adapter::coord::Coordinator
source · pub struct Coordinator {Show 34 fields
controller: Controller,
catalog: Arc<Catalog>,
internal_cmd_tx: UnboundedSender<Message>,
group_commit_tx: GroupCommitNotifier,
strict_serializable_reads_tx: UnboundedSender<(ConnectionId, PendingReadTxn)>,
global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
transient_id_counter: u64,
active_conns: BTreeMap<ConnectionId, ConnMeta>,
storage_read_capabilities: BTreeMap<GlobalId, ReadCapability<Timestamp>>,
compute_read_capabilities: BTreeMap<GlobalId, ReadCapability<Timestamp>>,
txn_read_holds: BTreeMap<ConnectionId, Vec<ReadHolds<Timestamp>>>,
pending_peeks: BTreeMap<Uuid, PendingPeek>,
client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>,
pending_real_time_recency_timestamp: BTreeMap<ConnectionId, RealTimeRecencyContext>,
pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>,
active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
active_webhooks: BTreeMap<GlobalId, WebhookAppenderInvalidator>,
write_lock: Arc<Mutex<()>>,
write_lock_wait_group: VecDeque<Deferred>,
pending_writes: Vec<PendingWriteTxn>,
advance_timelines_interval: Interval,
secrets_controller: Arc<dyn SecretsController>,
caching_secrets_reader: CachingSecretsReader,
cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
transient_replica_metadata: BTreeMap<ReplicaId, Option<ReplicaMetadata>>,
storage_usage_client: StorageUsageClient,
storage_usage_collection_interval: Duration,
segment_client: Option<Client>,
metrics: Metrics,
tracing_handle: TracingHandle,
statement_logging: StatementLogging,
webhook_concurrency_limit: WebhookConcurrencyLimiter,
timestamp_oracle_impl: TimestampOracleImpl,
pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>,
}
Expand description
Glues the external world to the Timely workers.
Fields§
§controller: Controller
The controller for the storage and compute layers.
catalog: Arc<Catalog>
The catalog in an Arc suitable for readonly references. The Arc allows
us to hand out cheap copies of the catalog to functions that can use it
off of the main coordinator thread. If the coordinator needs to mutate
the catalog, call Self::catalog_mut
, which will clone this struct member,
allowing it to be mutated here while the other off-thread references can
read their catalog as long as needed. In the future we would like this
to be a pTVC, but for now this is sufficient.
internal_cmd_tx: UnboundedSender<Message>
Channel to manage internal commands from the coordinator to itself.
group_commit_tx: GroupCommitNotifier
Notification that triggers a group commit.
strict_serializable_reads_tx: UnboundedSender<(ConnectionId, PendingReadTxn)>
Channel for strict serializable reads ready to commit.
global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>
Mechanism for totally ordering write and read timestamps, so that all reads reflect exactly the set of writes that precede them, and no writes that follow.
transient_id_counter: u64
§active_conns: BTreeMap<ConnectionId, ConnMeta>
A map from connection ID to metadata about that connection for all active connections.
storage_read_capabilities: BTreeMap<GlobalId, ReadCapability<Timestamp>>
For each identifier in STORAGE, its read policy and any read holds on time.
Transactions should introduce and remove constraints through the methods
acquire_read_holds
and release_read_holds
, respectively. The base
policy can also be updated, though one should be sure to communicate this
to the controller for it to have an effect.
Access to this field should be restricted to methods in the read_policy
API.
compute_read_capabilities: BTreeMap<GlobalId, ReadCapability<Timestamp>>
For each identifier in COMPUTE, its read policy and any read holds on time.
Transactions should introduce and remove constraints through the methods
acquire_read_holds
and release_read_holds
, respectively. The base
policy can also be updated, though one should be sure to communicate this
to the controller for it to have an effect.
Access to this field should be restricted to methods in the read_policy
API.
txn_read_holds: BTreeMap<ConnectionId, Vec<ReadHolds<Timestamp>>>
For each transaction, the pinned storage and compute identifiers and time at which they are pinned.
Upon completing a transaction, this timestamp should be removed from the holds
in self.read_capability[id]
, using the release_read_holds
method.
We use a Vec because ReadHolds
doesn’t have a way of tracking multiplicity.
pending_peeks: BTreeMap<Uuid, PendingPeek>
Access to the peek fields should be restricted to methods in the peek
API.
A map from pending peek ids to the queue into which responses are sent, and
the connection id of the client that initiated the peek.
client_pending_peeks: BTreeMap<ConnectionId, BTreeMap<Uuid, ClusterId>>
A map from client connection ids to a set of all pending peeks for that client.
pending_real_time_recency_timestamp: BTreeMap<ConnectionId, RealTimeRecencyContext>
A map from client connection ids to a pending real time recency timestamps.
pending_linearize_read_txns: BTreeMap<ConnectionId, PendingReadTxn>
A map from client connection ids to pending linearize read transaction.
active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>
A map from the compute sink ID to it’s state description.
active_webhooks: BTreeMap<GlobalId, WebhookAppenderInvalidator>
A map from active webhooks to their invalidation handle.
write_lock: Arc<Mutex<()>>
Serializes accesses to write critical sections.
write_lock_wait_group: VecDeque<Deferred>
Holds plans deferred due to write lock.
pending_writes: Vec<PendingWriteTxn>
Pending writes waiting for a group commit.
advance_timelines_interval: Interval
For the realtime timeline, an explicit SELECT or INSERT on a table will bump the
table’s timestamps, but there are cases where timestamps are not bumped but
we expect the closed timestamps to advance (AS OF X
, SUBSCRIBing views over
RT sources and tables). To address these, spawn a task that forces table
timestamps to close on a regular interval. This roughly tracks the behavior
of realtime sources that close off timestamps on an interval.
For non-realtime timelines, nothing pushes the timestamps forward, so we must do it manually.
secrets_controller: Arc<dyn SecretsController>
Handle to secret manager that can create and delete secrets from an arbitrary secret storage engine.
caching_secrets_reader: CachingSecretsReader
A secrets reader than maintains an in-memory cache, where values have a set TTL.
cloud_resource_controller: Option<Arc<dyn CloudResourceController>>
Handle to a manager that can create and delete kubernetes resources (ie: VpcEndpoint objects)
transient_replica_metadata: BTreeMap<ReplicaId, Option<ReplicaMetadata>>
Metadata about replicas that doesn’t need to be persisted. Intended for inclusion in system tables.
None
is used as a tombstone value for replicas that have been
dropped and for which no further updates should be recorded.
storage_usage_client: StorageUsageClient
Persist client for fetching storage metadata such as size metrics.
storage_usage_collection_interval: Duration
The interval at which to collect storage usage information.
segment_client: Option<Client>
Segment analytics client.
metrics: Metrics
Coordinator metrics.
tracing_handle: TracingHandle
Tracing handle.
statement_logging: StatementLogging
Data used by the statement logging feature.
webhook_concurrency_limit: WebhookConcurrencyLimiter
Limit for how many conncurrent webhook requests we allow.
timestamp_oracle_impl: TimestampOracleImpl
Implementation of
TimestampOracle
to
use.
pg_timestamp_oracle_config: Option<PostgresTimestampOracleConfig>
Optional config for the Postgres-backed timestamp oracle. This is
required when postgres
is configured using the timestamp_oracle
system variable.
Implementations§
source§impl Coordinator
impl Coordinator
sourcepub fn resolve_collection_id_bundle_names(
&self,
session: &Session,
id_bundle: &CollectionIdBundle
) -> Vec<String>
pub fn resolve_collection_id_bundle_names( &self, session: &Session, id_bundle: &CollectionIdBundle ) -> Vec<String>
Resolves the full name from the corresponding catalog entry for each item in id_bundle
.
If an item in the bundle does not exist in the catalog, it’s not included in the result.
source§impl Coordinator
impl Coordinator
sourcepub async fn implement_peek_plan(
&mut self,
ctx_extra: &mut ExecuteContextExtra,
plan: PlannedPeek,
finishing: RowSetFinishing,
compute_instance: ComputeInstanceId,
target_replica: Option<ReplicaId>,
max_result_size: u64
) -> Result<ExecuteResponse, AdapterError>
pub async fn implement_peek_plan( &mut self, ctx_extra: &mut ExecuteContextExtra, plan: PlannedPeek, finishing: RowSetFinishing, compute_instance: ComputeInstanceId, target_replica: Option<ReplicaId>, max_result_size: u64 ) -> Result<ExecuteResponse, AdapterError>
Implements a peek plan produced by create_plan
above.
sourcepub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId)
pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId)
Cancel and remove all pending peeks that were initiated by the client with conn_id
.
pub(crate) fn send_peek_response( &mut self, uuid: Uuid, response: PeekResponse, otel_ctx: OpenTelemetryContext )
sourcepub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek>
pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek>
Clean up a peek’s state.
sourcepub(crate) fn send_immediate_rows(rows: Vec<Row>) -> ExecuteResponse
pub(crate) fn send_immediate_rows(rows: Vec<Row>) -> ExecuteResponse
Constructs an ExecuteResponse
that that will send some rows to the
client immediately, as opposed to asking the dataflow layer to send along
the rows after some computation.
source§impl Coordinator
impl Coordinator
pub(crate) fn spawn_statement_logging_task(&self)
pub(crate) async fn drain_statement_log(&mut self)
sourcefn statement_logging_throttling_check(&mut self, cost: usize) -> Option<usize>
fn statement_logging_throttling_check(&mut self, cost: usize) -> Option<usize>
Check whether we need to do throttling (i.e., whether STATEMENT_LOGGING_TARGET_DATA_RATE
is set).
If so, actually do the check.
Returns None
if we must throttle this statement, and Some(n)
otherwise, where n
is the number of statements that were dropped due to throttling before this one.
sourcepub(crate) fn log_prepared_statement(
&mut self,
session: &mut Session,
logging: &Arc<QCell<PreparedStatementLoggingInfo>>
) -> Option<(Option<(StatementPreparedRecord, PreparedStatementEvent)>, Uuid)>
pub(crate) fn log_prepared_statement( &mut self, session: &mut Session, logging: &Arc<QCell<PreparedStatementLoggingInfo>> ) -> Option<(Option<(StatementPreparedRecord, PreparedStatementEvent)>, Uuid)>
Returns any statement logging events needed for a particular
prepared statement. Possibly mutates the PreparedStatementLoggingInfo
metadata.
This function does not do a sampling check, and assumes we did so in a higher layer.
It does do a throttling check, and returns None
if we must not log due to throttling.
sourcepub fn statement_execution_sample_rate(&self, session: &Session) -> f64
pub fn statement_execution_sample_rate(&self, session: &Session) -> f64
The rate at which statement execution should be sampled.
This is the value of the session var statement_logging_sample_rate
,
constrained by the system var statement_logging_max_sample_rate
.
sourcepub fn end_statement_execution(
&mut self,
id: StatementLoggingId,
reason: StatementEndedExecutionReason
)
pub fn end_statement_execution( &mut self, id: StatementLoggingId, reason: StatementEndedExecutionReason )
Record the end of statement execution for a statement whose beginning was logged.
It is an error to call this function for a statement whose beginning was not logged
(because it was not sampled). Requiring the opaque StatementLoggingId
type,
which is only instantiated by begin_statement_execution
if the statement is actually logged,
should prevent this.
fn pack_statement_execution_inner( record: &StatementBeganExecutionRecord, packer: &mut RowPacker<'_> )
fn pack_statement_began_execution_update( record: &StatementBeganExecutionRecord ) -> Row
fn pack_statement_prepared_update( record: &StatementPreparedRecord, packer: &mut RowPacker<'_> )
fn pack_session_history_update(event: &SessionHistoryEvent) -> Row
fn pack_statement_lifecycle_event( StatementLoggingId: &StatementLoggingId, event: &StatementLifecycleEvent, when: EpochMillis ) -> Row
pub fn pack_full_statement_execution_update( began_record: &StatementBeganExecutionRecord, ended_record: &StatementEndedExecutionRecord ) -> Row
pub fn pack_statement_ended_execution_updates( began_record: &StatementBeganExecutionRecord, ended_record: &StatementEndedExecutionRecord ) -> Vec<(Row, Diff)>
sourcefn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>(
&mut self,
StatementLoggingId: StatementLoggingId,
f: F
)
fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>( &mut self, StatementLoggingId: StatementLoggingId, f: F )
Mutate a statement execution record via the given function f
.
sourcepub fn set_statement_execution_cluster(
&mut self,
id: StatementLoggingId,
cluster_id: ClusterId
)
pub fn set_statement_execution_cluster( &mut self, id: StatementLoggingId, cluster_id: ClusterId )
Set the cluster_id
for a statement, once it’s known.
sourcepub fn set_statement_execution_timestamp(
&mut self,
id: StatementLoggingId,
timestamp: Timestamp
)
pub fn set_statement_execution_timestamp( &mut self, id: StatementLoggingId, timestamp: Timestamp )
Set the execution_timestamp
for a statement, once it’s known
pub fn set_transient_index_id( &mut self, id: StatementLoggingId, transient_index_id: GlobalId )
sourcepub fn begin_statement_execution(
&mut self,
session: &mut Session,
params: Params,
logging: &Arc<QCell<PreparedStatementLoggingInfo>>
) -> Option<StatementLoggingId>
pub fn begin_statement_execution( &mut self, session: &mut Session, params: Params, logging: &Arc<QCell<PreparedStatementLoggingInfo>> ) -> Option<StatementLoggingId>
Possibly record the beginning of statement execution, depending on a randomly-chosen value.
If the execution beginning was indeed logged, returns a StatementLoggingId
that must be
passed to end_statement_execution
to record when it ends.
sourcepub fn begin_session_for_statement_logging(&mut self, session: &ConnMeta)
pub fn begin_session_for_statement_logging(&mut self, session: &ConnMeta)
Record a new connection event
pub fn end_session_for_statement_logging(&mut self, uuid: Uuid)
pub fn record_statement_lifecycle_event( &mut self, id: &StatementLoggingId, event: &StatementLifecycleEvent )
source§impl Coordinator
impl Coordinator
pub(crate) fn now(&self) -> EpochMillis
pub(crate) fn now_datetime(&self) -> DateTime<Utc>
pub(crate) fn get_timestamp_oracle( &self, timeline: &Timeline ) -> &dyn TimestampOracle<Timestamp>
sourcefn get_local_timestamp_oracle(&self) -> &dyn TimestampOracle<Timestamp>
fn get_local_timestamp_oracle(&self) -> &dyn TimestampOracle<Timestamp>
Returns a reference to the timestamp oracle used for reads and writes from/to a local input.
Returns a ShareableTimestampOracle
used for reads and writes from/to a local input.
sourcepub(crate) async fn get_local_read_ts(&self) -> Timestamp
pub(crate) async fn get_local_read_ts(&self) -> Timestamp
Assign a timestamp for a read from a local input. Reads following writes must be at a time >= the write’s timestamp; we choose “equal to” for simplicity’s sake and to open as few new timestamps as possible.
sourcepub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp
pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp
Assign a timestamp for a write to a local input and increase the local ts. Writes following reads must ensure that they are assigned a strictly larger timestamp to ensure they are not visible to any real-time earlier reads.
sourcepub(crate) async fn peek_local_write_ts(&self) -> Timestamp
pub(crate) async fn peek_local_write_ts(&self) -> Timestamp
Peek the current timestamp used for operations on local inputs. Used to determine how much to block group commits by.
sourcepub(crate) async fn apply_local_write(&mut self, timestamp: Timestamp)
pub(crate) async fn apply_local_write(&mut self, timestamp: Timestamp)
Peek the current timestamp used for operations on local inputs. Used to determine how much to block group commits by.
Marks a write at timestamp
as completed, using a ShareableTimestampOracle
.
TODO(parkmycar): ShareableTimestampOracle
is a new concept and not always available.
When we move entirely to the shareable model, this method and
Coordinator::apply_local_write
should get merged.
sourcepub(crate) async fn ensure_timeline_state<'a>(
&'a mut self,
timeline: &'a Timeline
) -> &mut TimelineState<Timestamp>
pub(crate) async fn ensure_timeline_state<'a>( &'a mut self, timeline: &'a Timeline ) -> &mut TimelineState<Timestamp>
Ensures that a global timeline state exists for timeline
.
sourcepub(crate) async fn ensure_timeline_state_with_initial_time<'a, P>(
timeline: &'a Timeline,
initially: Timestamp,
now: NowFn,
timestamp_oracle_impl: TimestampOracleImpl,
timestamp_persistence: P,
pg_oracle_config: Option<PostgresTimestampOracleConfig>,
global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>>
) -> &'a mut TimelineState<Timestamp>where
P: TimestampPersistence<Timestamp> + 'static,
pub(crate) async fn ensure_timeline_state_with_initial_time<'a, P>( timeline: &'a Timeline, initially: Timestamp, now: NowFn, timestamp_oracle_impl: TimestampOracleImpl, timestamp_persistence: P, pg_oracle_config: Option<PostgresTimestampOracleConfig>, global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>> ) -> &'a mut TimelineState<Timestamp>where P: TimestampPersistence<Timestamp> + 'static,
Ensures that a global timeline state exists for timeline
, with an initial time
of initially
.
sourcepub(crate) fn build_collection_id_bundle(
&self,
storage_ids: impl IntoIterator<Item = GlobalId>,
compute_ids: impl IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
clusters: impl IntoIterator<Item = ComputeInstanceId>
) -> CollectionIdBundle
pub(crate) fn build_collection_id_bundle( &self, storage_ids: impl IntoIterator<Item = GlobalId>, compute_ids: impl IntoIterator<Item = (ComputeInstanceId, GlobalId)>, clusters: impl IntoIterator<Item = ComputeInstanceId> ) -> CollectionIdBundle
Groups together storage and compute resources into a CollectionIdBundle
sourcepub(crate) fn remove_resources_associated_with_timeline(
&mut self,
timeline: Timeline,
ids: CollectionIdBundle
) -> bool
pub(crate) fn remove_resources_associated_with_timeline( &mut self, timeline: Timeline, ids: CollectionIdBundle ) -> bool
Given a Timeline
and a CollectionIdBundle
, removes all of the “storage ids”
and “compute ids” in the bundle, from the timeline.
pub(crate) fn remove_compute_ids_from_timeline<I>( &mut self, ids: I ) -> Vec<Timeline>where I: IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle
sourcepub(crate) fn validate_timeline_context<I>(
&self,
ids: I
) -> Result<TimelineContext, AdapterError>where
I: IntoIterator<Item = GlobalId>,
pub(crate) fn validate_timeline_context<I>( &self, ids: I ) -> Result<TimelineContext, AdapterError>where I: IntoIterator<Item = GlobalId>,
Return an error if the ids are from incompatible TimelineContext
s. This should
be used to prevent users from doing things that are either meaningless
(joining data from timelines that have similar numbers with different
meanings like two separate debezium topics) or will never complete (joining
cdcv2 and realtime data).
sourcepub(crate) fn get_timeline_context(&self, id: GlobalId) -> TimelineContext
pub(crate) fn get_timeline_context(&self, id: GlobalId) -> TimelineContext
Return the TimelineContext
belonging to a GlobalId, if one exists.
sourcefn get_timeline_contexts<I>(&self, ids: I) -> BTreeSet<TimelineContext>where
I: IntoIterator<Item = GlobalId>,
fn get_timeline_contexts<I>(&self, ids: I) -> BTreeSet<TimelineContext>where I: IntoIterator<Item = GlobalId>,
Return the TimelineContext
s belonging to a list of GlobalIds, if any exist.
sourcepub fn partition_ids_by_timeline_context(
&self,
id_bundle: &CollectionIdBundle
) -> impl Iterator<Item = (TimelineContext, CollectionIdBundle)>
pub fn partition_ids_by_timeline_context( &self, id_bundle: &CollectionIdBundle ) -> impl Iterator<Item = (TimelineContext, CollectionIdBundle)>
Returns an iterator that partitions an id bundle by the TimelineContext
that each id
belongs to.
sourcepub(crate) fn timedomain_for<'a, I>(
&self,
uses_ids: I,
timeline_context: &TimelineContext,
conn_id: &ConnectionId,
compute_instance: ComputeInstanceId
) -> Result<CollectionIdBundle, AdapterError>where
I: IntoIterator<Item = &'a GlobalId>,
pub(crate) fn timedomain_for<'a, I>( &self, uses_ids: I, timeline_context: &TimelineContext, conn_id: &ConnectionId, compute_instance: ComputeInstanceId ) -> Result<CollectionIdBundle, AdapterError>where I: IntoIterator<Item = &'a GlobalId>,
Return the set of ids in a timedomain and verify timeline correctness.
When a user starts a transaction, we need to prevent compaction of anything they might read from. We use a heuristic of “anything in the same database schemas with the same timeline as whatever the first query is”.
pub(crate) async fn advance_timelines(&mut self)
source§impl Coordinator
impl Coordinator
pub(crate) async fn oracle_read_ts( &self, session: &Session, timeline_ctx: &TimelineContext, when: &QueryWhen ) -> Option<Timestamp>
sourcepub(crate) async fn determine_timestamp(
&self,
session: &Session,
id_bundle: &CollectionIdBundle,
when: &QueryWhen,
compute_instance: ComputeInstanceId,
timeline_context: &TimelineContext,
oracle_read_ts: Option<Timestamp>,
real_time_recency_ts: Option<Timestamp>
) -> Result<TimestampDetermination<Timestamp>, AdapterError>
pub(crate) async fn determine_timestamp( &self, session: &Session, id_bundle: &CollectionIdBundle, when: &QueryWhen, compute_instance: ComputeInstanceId, timeline_context: &TimelineContext, oracle_read_ts: Option<Timestamp>, real_time_recency_ts: Option<Timestamp> ) -> Result<TimestampDetermination<Timestamp>, AdapterError>
Determines the timestamp for a query.
sourcepub(crate) fn largest_not_in_advance_of_upper(
upper: &Antichain<Timestamp>
) -> Timestamp
pub(crate) fn largest_not_in_advance_of_upper( upper: &Antichain<Timestamp> ) -> Timestamp
The largest element not in advance of any object in the collection.
Times that are not greater to this frontier are complete for all collections identified as arguments.
pub(crate) fn evaluate_when( catalog: &CatalogState, timestamp: MirScalarExpr, session: &Session ) -> Result<Timestamp, AdapterError>
source§impl Coordinator
impl Coordinator
sourcepub(crate) fn trigger_group_commit(&mut self)
pub(crate) fn trigger_group_commit(&mut self)
Send a message to the Coordinate to start a group commit.
sourcepub(crate) async fn try_group_commit(
&mut self,
permit: Option<GroupCommitPermit>
)
pub(crate) async fn try_group_commit( &mut self, permit: Option<GroupCommitPermit> )
Attempts to commit all pending write transactions in a group commit. If the timestamp
chosen for the writes is not ahead of now()
, then we can execute and commit the writes
immediately. Otherwise we must wait for now()
to advance past the timestamp chosen for the
writes.
sourcepub(crate) async fn group_commit_initiate(
&mut self,
write_lock_guard: Option<OwnedMutexGuard<()>>,
permit: Option<GroupCommitPermit>
)
pub(crate) async fn group_commit_initiate( &mut self, write_lock_guard: Option<OwnedMutexGuard<()>>, permit: Option<GroupCommitPermit> )
Tries to commit all pending writes transactions at the same timestamp.
If the caller of this function has the write_lock
acquired, then they can optionally pass
it in to this method. If the caller does not have the write_lock
acquired and the
write_lock
is currently locked by another operation, then only writes to system tables
and table advancements will be applied. If the caller does not have the write_lock
acquired and the write_lock
is not currently locked by another operation, then group
commit will acquire it and all writes will be applied.
All applicable pending writes will be combined into a single Append command and sent to STORAGE as a single batch. All applicable writes will happen at the same timestamp and all involved tables will be advanced to some timestamp larger than the timestamp of the write.
sourcepub(crate) async fn group_commit_apply(
&mut self,
timestamp: Timestamp,
responses: Vec<CompletedClientTransmitter>,
_write_lock_guard: Option<OwnedMutexGuard<()>>,
_permit: Option<GroupCommitPermit>
)
pub(crate) async fn group_commit_apply( &mut self, timestamp: Timestamp, responses: Vec<CompletedClientTransmitter>, _write_lock_guard: Option<OwnedMutexGuard<()>>, _permit: Option<GroupCommitPermit> )
Applies the results of a completed group commit. The read timestamp of the timeline containing user tables will be advanced to the timestamp of the completed write, the read hold on the timeline containing user tables is advanced to the new time, and responses are sent to all waiting clients.
It’s important that the timeline is advanced before responses are sent so that the client is guaranteed to see the write.
We also advance all other timelines and update the read holds of non-realtime timelines.
sourcepub(crate) fn submit_write(&mut self, pending_write_txn: PendingWriteTxn)
pub(crate) fn submit_write(&mut self, pending_write_txn: PendingWriteTxn)
Submit a write to be executed during the next group commit and trigger a group commit.
sourcepub(crate) fn builtin_table_update<'a>(&'a mut self) -> BuiltinTableAppend<'a>
pub(crate) fn builtin_table_update<'a>(&'a mut self) -> BuiltinTableAppend<'a>
Append some BuiltinTableUpdate
s, with various degrees of waiting and blocking.
sourcepub(crate) fn defer_write(&mut self, deferred: Deferred)
pub(crate) fn defer_write(&mut self, deferred: Deferred)
Defers executing deferred
until the write lock becomes available; waiting
occurs in a green-thread, so callers of this function likely want to
return after calling it.
sourcepub(crate) fn try_grant_session_write_lock(
&self,
session: &mut Session
) -> Result<(), TryLockError>
pub(crate) fn try_grant_session_write_lock( &self, session: &mut Session ) -> Result<(), TryLockError>
Attempts to immediately grant session
access to the write lock or
errors if the lock is currently held.
source§impl Coordinator
impl Coordinator
sourcepub(crate) fn handle_command<'a>(
&'a mut self,
cmd: Command
) -> LocalBoxFuture<'a, ()>
pub(crate) fn handle_command<'a>( &'a mut self, cmd: Command ) -> LocalBoxFuture<'a, ()>
BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would get stored on the stack which is bad for runtime performance, and blow up our stack usage. Because of that we purposefully move this Future onto the heap (i.e. Box it).
async fn handle_startup( &mut self, tx: Sender<Result<StartupResponse, AdapterError>>, user: User, conn_id: ConnectionId, secret_key: u32, uuid: Uuid, application_name: String, notice_tx: UnboundedSender<AdapterNotice> )
async fn handle_startup_inner( &mut self, user: &User, conn_id: &ConnectionId ) -> Result<RoleId, AdapterError>
sourcepub(crate) async fn handle_execute(
&mut self,
portal_name: String,
session: Session,
tx: ClientTransmitter<ExecuteResponse>,
outer_context: Option<ExecuteContextExtra>
)
pub(crate) async fn handle_execute( &mut self, portal_name: String, session: Session, tx: ClientTransmitter<ExecuteResponse>, outer_context: Option<ExecuteContextExtra> )
Handles an execute command.
pub(crate) async fn handle_execute_inner( &mut self, stmt: Arc<Statement<Raw>>, params: Params, ctx: ExecuteContext )
sourceasync fn resolve_mz_now_for_create_materialized_view<'a>(
&mut self,
cmvs: &CreateMaterializedViewStatement<Aug>,
resolved_ids: &ResolvedIds,
session: &mut Session,
acquire_read_holds: bool
) -> Result<Option<Timestamp>, AdapterError>
async fn resolve_mz_now_for_create_materialized_view<'a>( &mut self, cmvs: &CreateMaterializedViewStatement<Aug>, resolved_ids: &ResolvedIds, session: &mut Session, acquire_read_holds: bool ) -> Result<Option<Timestamp>, AdapterError>
Chooses a timestamp for mz_now()
, if mz_now()
occurs in the with_options
of the materialized view.
If acquire_read_holds
is true, it also grabs read holds on input collections that might possibly be involved
in the MV.
Note that this is NOT what handles mz_now()
in the query part of the MV. (handles it only in with_options
).
sourceasync fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32)
async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32)
Instruct the dataflow layer to cancel any ongoing, interactive work for
the named conn_id
if the correct secret key is specified.
Note: Here we take a ConnectionIdType
as opposed to an owned
ConnectionId
because this method gets called by external clients when
they request to cancel a request.
sourcepub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId)
pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId)
Unconditionally instructs the dataflow layer to cancel any ongoing,
interactive work for the named conn_id
.
sourceasync fn handle_terminate(&mut self, conn_id: ConnectionId)
async fn handle_terminate(&mut self, conn_id: ConnectionId)
Handle termination of a client session.
This cleans up any state in the coordinator associated with the session.
sourcefn handle_get_webhook(
&mut self,
database: String,
schema: String,
name: String,
tx: Sender<Result<AppendWebhookResponse, AppendWebhookError>>
)
fn handle_get_webhook( &mut self, database: String, schema: String, name: String, tx: Sender<Result<AppendWebhookResponse, AppendWebhookError>> )
Returns the necessary metadata for appending to a webhook source, and a channel to send rows.
source§impl Coordinator
impl Coordinator
sourcepub fn check_consistency(&self) -> Result<(), CoordinatorInconsistencies>
pub fn check_consistency(&self) -> Result<(), CoordinatorInconsistencies>
Checks the Coordinator
to make sure we’re internally consistent.
sourcefn check_read_capabilities(
&self
) -> Result<(), Vec<ReadCapabilitiesInconsistency>>
fn check_read_capabilities( &self ) -> Result<(), Vec<ReadCapabilitiesInconsistency>>
Invariants:
- Read capabilities should reference known objects.
sourcefn check_active_webhooks(&self) -> Result<(), Vec<ActiveWebhookInconsistency>>
fn check_active_webhooks(&self) -> Result<(), Vec<ActiveWebhookInconsistency>>
Invariants
- All
GlobalId
s in theactive_webhooks
map should reference known webhook sources.
source§impl Coordinator
impl Coordinator
sourcepub(crate) async fn catalog_transact(
&mut self,
session: Option<&Session>,
ops: Vec<Op>
) -> Result<(), AdapterError>
pub(crate) async fn catalog_transact( &mut self, session: Option<&Session>, ops: Vec<Op> ) -> Result<(), AdapterError>
Same as Self::catalog_transact_with
without a closure passed in.
sourcepub(crate) async fn catalog_transact_with_side_effects<'c, F, Fut>(
&'c mut self,
session: Option<&Session>,
ops: Vec<Op>,
side_effect: F
) -> Result<(), AdapterError>where
F: FnOnce(&'c mut Coordinator) -> Fut,
Fut: Future<Output = ()>,
pub(crate) async fn catalog_transact_with_side_effects<'c, F, Fut>( &'c mut self, session: Option<&Session>, ops: Vec<Op>, side_effect: F ) -> Result<(), AdapterError>where F: FnOnce(&'c mut Coordinator) -> Fut, Fut: Future<Output = ()>,
Same as Self::catalog_transact_with
but runs builtin table updates concurrently with
any side effects (e.g. creating collections).
sourcepub(crate) async fn catalog_transact_conn(
&mut self,
conn_id: Option<&ConnectionId>,
ops: Vec<Op>
) -> Result<(), AdapterError>
pub(crate) async fn catalog_transact_conn( &mut self, conn_id: Option<&ConnectionId>, ops: Vec<Op> ) -> Result<(), AdapterError>
Same as Self::catalog_transact_with
without a closure passed in.
sourcepub(crate) async fn catalog_transact_with_ddl_transaction(
&mut self,
session: &mut Session,
ops: Vec<Op>
) -> Result<(), AdapterError>
pub(crate) async fn catalog_transact_with_ddl_transaction( &mut self, session: &mut Session, ops: Vec<Op> ) -> Result<(), AdapterError>
Executes a Catalog transaction with handling if the provided Session
is in a SQL
transaction that is executing DDL.
sourcepub(crate) async fn catalog_transact_with<'a, F, R>(
&mut self,
conn_id: Option<&ConnectionId>,
ops: Vec<Op>,
f: F
) -> Result<(R, BoxFuture<'static, ()>), AdapterError>where
F: FnOnce(CatalogTxn<'_, Timestamp>) -> Result<R, AdapterError>,
pub(crate) async fn catalog_transact_with<'a, F, R>( &mut self, conn_id: Option<&ConnectionId>, ops: Vec<Op>, f: F ) -> Result<(R, BoxFuture<'static, ()>), AdapterError>where F: FnOnce(CatalogTxn<'_, Timestamp>) -> Result<R, AdapterError>,
Perform a catalog transaction. The closure is passed a CatalogTxn
made from the prospective CatalogState
(i.e., the Catalog
with ops
applied but before the transaction is committed). The closure can return
an error to abort the transaction, or otherwise return a value that is
returned by this function. This allows callers to error while building
DataflowDesc
s. Coordinator::ship_dataflow
must be called after this
function successfully returns on any built DataflowDesc
.
async fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId)
fn drop_sources(&mut self, sources: Vec<GlobalId>)
fn restart_webhook_sources( &mut self, sources: impl IntoIterator<Item = GlobalId> )
sourcepub async fn drop_compute_sink(
&mut self,
sink_id: GlobalId
) -> Option<ActiveComputeSink>
pub async fn drop_compute_sink( &mut self, sink_id: GlobalId ) -> Option<ActiveComputeSink>
Like drop_compute_sinks
, but for a single compute sink.
Returns the controller’s state for the compute sink if the identified
sink was known to the controller. It is the caller’s responsibility to
retire the returned sink. Consider using retire_compute_sinks
instead.
sourcepub async fn drop_compute_sinks(
&mut self,
sink_ids: impl IntoIterator<Item = GlobalId>
) -> BTreeMap<GlobalId, ActiveComputeSink>
pub async fn drop_compute_sinks( &mut self, sink_ids: impl IntoIterator<Item = GlobalId> ) -> BTreeMap<GlobalId, ActiveComputeSink>
Drops a batch of compute sinks.
For each sink that exists, the coordinator and controller’s state associated with the sink is removed.
Returns a map containing the controller’s state for each sink that was
removed. It is the caller’s responsibility to retire the returned sinks.
Consider using retire_compute_sinks
instead.
sourcepub async fn retire_compute_sinks(
&mut self,
reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>
)
pub async fn retire_compute_sinks( &mut self, reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason> )
Retires a batch of sinks with disparate reasons for retirement.
Each sink identified in reasons
is dropped (see drop_compute_sinks
),
then retired with its corresponding reason.
sourcepub(crate) async fn cancel_compute_sinks_for_conn(
&mut self,
conn_id: &ConnectionId
)
pub(crate) async fn cancel_compute_sinks_for_conn( &mut self, conn_id: &ConnectionId )
Cancels all active compute sinks for the identified connection.
sourcepub(crate) async fn retire_compute_sinks_for_conn(
&mut self,
conn_id: &ConnectionId,
reason: ActiveComputeSinkRetireReason
)
pub(crate) async fn retire_compute_sinks_for_conn( &mut self, conn_id: &ConnectionId, reason: ActiveComputeSinkRetireReason )
Retires all active compute sinks for the identified connection with the specified reason.
pub(crate) fn drop_storage_sinks(&mut self, sinks: Vec<GlobalId>)
pub(crate) fn drop_indexes(&mut self, indexes: Vec<(ClusterId, GlobalId)>)
fn drop_materialized_views(&mut self, mviews: Vec<(ClusterId, GlobalId)>)
async fn drop_secrets(&mut self, secrets: Vec<GlobalId>)
async fn drop_vpc_endpoints(&mut self, vpc_endpoints: Vec<GlobalId>)
sourcepub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId)
pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId)
Removes all temporary items created by the specified connection, though not the temporary schema itself.
fn update_cluster_scheduling_config(&mut self)
fn update_secrets_caching_config(&mut self)
fn update_tracing_config(&mut self)
fn update_compute_config(&mut self)
fn update_storage_config(&mut self)
fn update_pg_timestamp_oracle_config(&mut self)
fn update_metrics_retention(&mut self)
async fn update_jemalloc_profiling_config(&mut self)
fn update_default_arrangement_merge_options(&mut self)
fn update_http_config(&mut self)
pub(crate) async fn create_storage_export( &mut self, id: GlobalId, sink: &Sink ) -> Result<(), AdapterError>
sourcefn validate_resource_limits(
&self,
ops: &Vec<Op>,
conn_id: &ConnectionId
) -> Result<(), AdapterError>
fn validate_resource_limits( &self, ops: &Vec<Op>, conn_id: &ConnectionId ) -> Result<(), AdapterError>
Validate all resource limits in a catalog transaction and return an error if that limit is exceeded.
sourcepub(crate) fn validate_resource_limit<F>(
&self,
current_amount: usize,
new_instances: i64,
resource_limit: F,
resource_type: &str,
limit_name: &str
) -> Result<(), AdapterError>where
F: Fn(&SystemVars) -> u32,
pub(crate) fn validate_resource_limit<F>( &self, current_amount: usize, new_instances: i64, resource_limit: F, resource_type: &str, limit_name: &str ) -> Result<(), AdapterError>where F: Fn(&SystemVars) -> u32,
Validate a specific type of resource limit and return an error if that limit is exceeded.
sourcefn validate_resource_limit_numeric<F>(
&self,
current_amount: Numeric,
new_amount: Numeric,
resource_limit: F,
resource_type: &str,
limit_name: &str
) -> Result<(), AdapterError>where
F: Fn(&SystemVars) -> Numeric,
fn validate_resource_limit_numeric<F>( &self, current_amount: Numeric, new_amount: Numeric, resource_limit: F, resource_type: &str, limit_name: &str ) -> Result<(), AdapterError>where F: Fn(&SystemVars) -> Numeric,
Validate a specific type of float resource limit and return an error if that limit is exceeded.
This is very similar to Self::validate_resource_limit
but for numerics.
source§impl Coordinator
impl Coordinator
sourcepub fn index_oracle(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_>
pub fn index_oracle(&self, instance: ComputeInstanceId) -> DataflowBuilder<'_>
Creates a new index oracle for the specified compute instance.
source§impl Coordinator
impl Coordinator
sourcepub(crate) fn handle_message<'a>(
&'a mut self,
span: Span,
msg: Message
) -> LocalBoxFuture<'a, ()>
pub(crate) fn handle_message<'a>( &'a mut self, span: Span, msg: Message ) -> LocalBoxFuture<'a, ()>
BOXED FUTURE: As of Nov 2023 the returned Future from this function was 74KB. This would get stored on the stack which is bad for runtime performance, and blow up our stack usage. Because of that we purposefully move this Future onto the heap (i.e. Box it).
We pass in a span from the outside, rather than instrumenting this
method using #instrument[...]
or calling .instrument()
at the
callsite so that we can correctly instrument the boxed future here and
so that we can stitch up the OpenTelemetryContext when we’re processing
a Message::Command
or other commands that pass around a context.
pub async fn storage_usage_fetch(&mut self)
async fn storage_usage_update(&mut self, shards_usage: ShardsUsageReferenced)
pub async fn schedule_storage_usage_collection(&self)
async fn message_command(&mut self, cmd: Command)
async fn message_controller(&mut self, message: ControllerResponse)
async fn message_purified_statement_ready( &mut self, __arg1: BackgroundWorkResult<(Vec<(GlobalId, CreateSubsourceStatement<Aug>)>, Statement<Aug>)> )
async fn message_create_connection_validation_ready( &mut self, __arg1: ValidationReady<CreateConnectionPlan> )
async fn message_alter_connection_validation_ready( &mut self, __arg1: ValidationReady<Connection> )
async fn message_write_lock_grant( &mut self, write_lock_guard: OwnedMutexGuard<()> )
async fn message_cluster_event(&mut self, event: ClusterEvent)
sourceasync fn message_linearize_reads(&mut self)
async fn message_linearize_reads(&mut self)
Linearizes sending the results of a read transaction by,
- Holding back any results that were executed at some point in the future, until the containing timeline has advanced to that point in the future.
- Confirming that we are still the current leader before sending results to the client.
sourceasync fn message_real_time_recency_timestamp(
&mut self,
conn_id: ConnectionId,
real_time_recency_ts: Timestamp,
validity: PlanValidity
)
async fn message_real_time_recency_timestamp( &mut self, conn_id: ConnectionId, real_time_recency_ts: Timestamp, validity: PlanValidity )
Finishes sequencing a command that was waiting on a real time recency timestamp.
source§impl Coordinator
impl Coordinator
pub(crate) fn spawn_privatelink_vpc_endpoints_watch_task(&self)
source§impl Coordinator
impl Coordinator
sourcepub(crate) async fn initialize_storage_read_policies(
&mut self,
ids: Vec<GlobalId>,
compaction_window: CompactionWindow
)
pub(crate) async fn initialize_storage_read_policies( &mut self, ids: Vec<GlobalId>, compaction_window: CompactionWindow )
Initialize the storage read policies.
This should be called only after a storage collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.
sourcepub(crate) async fn initialize_compute_read_policies(
&mut self,
ids: Vec<GlobalId>,
instance: ComputeInstanceId,
compaction_window: CompactionWindow
)
pub(crate) async fn initialize_compute_read_policies( &mut self, ids: Vec<GlobalId>, instance: ComputeInstanceId, compaction_window: CompactionWindow )
Initialize the compute read policies.
This should be called only after a compute collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.
sourcepub(crate) async fn initialize_read_policies(
&mut self,
id_bundle: &CollectionIdBundle,
compaction_window: CompactionWindow
)
pub(crate) async fn initialize_read_policies( &mut self, id_bundle: &CollectionIdBundle, compaction_window: CompactionWindow )
Initialize the storage and compute read policies.
This should be called only after a collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.
pub(crate) fn update_storage_base_read_policies( &mut self, base_policies: Vec<(GlobalId, ReadPolicy<Timestamp>)> )
pub(crate) fn update_compute_base_read_policies( &mut self, base_policies: Vec<(ComputeInstanceId, GlobalId, ReadPolicy<Timestamp>)> )
pub(crate) fn update_compute_base_read_policy( &mut self, compute_instance: ComputeInstanceId, id: GlobalId, base_policy: ReadPolicy<Timestamp> )
sourcepub(crate) fn drop_storage_read_policy(&mut self, id: &GlobalId) -> bool
pub(crate) fn drop_storage_read_policy(&mut self, id: &GlobalId) -> bool
Drop read policy in STORAGE for id
.
Returns true if id
had a read policy and false otherwise.
sourcepub(crate) fn drop_compute_read_policy(&mut self, id: &GlobalId) -> bool
pub(crate) fn drop_compute_read_policy(&mut self, id: &GlobalId) -> bool
Drop read policy in COMPUTE for id
.
Returns true if id
had a read policy and false otherwise.
sourcefn initialize_read_holds(
&mut self,
time: Timestamp,
id_bundle: &CollectionIdBundle
) -> ReadHolds<Timestamp>
fn initialize_read_holds( &mut self, time: Timestamp, id_bundle: &CollectionIdBundle ) -> ReadHolds<Timestamp>
Creates a ReadHolds
struct that creates a read hold for each id in
id_bundle
. The time of each read holds is at time
, if possible
otherwise it is at the lowest possible time.
This does not apply the read holds in STORAGE or COMPUTE. It is up to the caller to apply the read holds.
sourcepub(crate) fn acquire_read_holds(
&mut self,
time: Timestamp,
id_bundle: &CollectionIdBundle,
precise: bool
) -> Result<ReadHolds<Timestamp>, Vec<(Antichain<Timestamp>, CollectionIdBundle)>>
pub(crate) fn acquire_read_holds( &mut self, time: Timestamp, id_bundle: &CollectionIdBundle, precise: bool ) -> Result<ReadHolds<Timestamp>, Vec<(Antichain<Timestamp>, CollectionIdBundle)>>
Attempt to acquire read holds on the indicated collections at the indicated time
.
If we are unable to acquire a read hold at the provided time
for a specific id, then
depending on the precise
argument, we either fall back to acquiring a read hold at
the lowest possible time for that id, or return an error. The returned error contains
those collection sinces that were later than the specified time.
sourcepub(crate) fn acquire_read_holds_auto_cleanup(
&mut self,
session: &Session,
time: Timestamp,
id_bundle: &CollectionIdBundle,
precise: bool
) -> Result<(), Vec<(Antichain<Timestamp>, CollectionIdBundle)>>
pub(crate) fn acquire_read_holds_auto_cleanup( &mut self, session: &Session, time: Timestamp, id_bundle: &CollectionIdBundle, precise: bool ) -> Result<(), Vec<(Antichain<Timestamp>, CollectionIdBundle)>>
Attempt to acquire read holds on the indicated collections at the indicated time
.
This is similar to Self::acquire_read_holds, but instead of returning the read holds,
it arranges for them to be automatically released at the end of the transaction.
If we are unable to acquire a read hold at the provided time
for a specific id, then
depending on the precise
argument, we either fall back to acquiring a read hold at
the lowest possible time for that id, or return an error. The returned error contains
those collection sinces that were later than the specified time.
sourcepub(super) fn update_read_holds(
&mut self,
read_holds: ReadHolds<Timestamp>,
new_time: Timestamp
) -> ReadHolds<Timestamp>
pub(super) fn update_read_holds( &mut self, read_holds: ReadHolds<Timestamp>, new_time: Timestamp ) -> ReadHolds<Timestamp>
Attempt to update the timestamp of the read holds on the indicated collections from the
indicated times within read_holds
to new_time
.
If we are unable to update a read hold at the provided time
for a specific id, then we
leave it unchanged.
This method relies on a previous call to
initialize_read_holds
, acquire_read_holds
, or update_read_hold
that returned
read_holds
, and its behavior will be erratic if called on anything else.
sourcepub(super) fn release_read_holds(
&mut self,
read_holdses: Vec<ReadHolds<Timestamp>>
)
pub(super) fn release_read_holds( &mut self, read_holdses: Vec<ReadHolds<Timestamp>> )
Release the given read holds.
This method relies on a previous call to
initialize_read_holds
, acquire_read_holds
, or update_read_hold
that returned
ReadHolds
, and its behavior will be erratic if called on anything else,
or if called more than once on the same bundle of read holds.
source§impl Coordinator
impl Coordinator
sourcepub(super) async fn sequence_alter_set_cluster(
&mut self,
_session: &Session,
__arg2: AlterSetClusterPlan
) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_set_cluster( &mut self, _session: &Session, __arg2: AlterSetClusterPlan ) -> Result<ExecuteResponse, AdapterError>
Convert a AlterSetClusterPlan
to a sequence of catalog operators and adjust state.
source§impl Coordinator
impl Coordinator
pub(super) async fn sequence_create_cluster( &mut self, session: &Session, __arg2: CreateClusterPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_managed_cluster( &mut self, session: &Session, __arg2: CreateClusterManagedPlan, cluster_id: ClusterId, ops: Vec<Op> ) -> Result<ExecuteResponse, AdapterError>
fn create_managed_cluster_replica_op( &mut self, cluster_id: ClusterId, id: ReplicaId, name: String, compute: &ComputeReplicaConfig, size: &String, ops: &mut Vec<Op>, azs: Option<&[String]>, disk: bool, owner_id: RoleId ) -> Result<(), AdapterError>
fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>( &self, azs: I ) -> Result<(), AdapterError>
pub(super) async fn sequence_create_unmanaged_cluster( &mut self, session: &Session, __arg2: CreateClusterUnmanagedPlan, id: ClusterId, ops: Vec<Op> ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn create_cluster(&mut self, cluster_id: ClusterId)
pub(super) async fn sequence_create_cluster_replica( &mut self, session: &Session, __arg2: CreateClusterReplicaPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn create_cluster_replicas( &mut self, replicas: &[(ClusterId, ReplicaId)] )
pub(super) async fn sequence_alter_cluster( &mut self, session: &Session, __arg2: AlterClusterPlan ) -> Result<ExecuteResponse, AdapterError>
async fn sequence_alter_cluster_managed_to_managed( &mut self, session: &Session, cluster_id: ClusterId, config: &ClusterVariantManaged, new_config: ClusterVariantManaged ) -> Result<(), AdapterError>
async fn sequence_alter_cluster_unmanaged_to_managed( &mut self, session: &Session, cluster_id: ClusterId, new_config: ClusterVariantManaged, options: PlanClusterOption ) -> Result<(), AdapterError>
async fn sequence_alter_cluster_managed_to_unmanaged( &mut self, session: &Session, cluster_id: ClusterId ) -> Result<(), AdapterError>
fn sequence_alter_cluster_unmanaged_to_unmanaged( &self, _session: &Session, _cluster_id: ClusterId, _replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>> ) -> Result<(), AdapterError>
pub(super) async fn sequence_alter_cluster_rename( &mut self, session: &mut Session, __arg2: AlterClusterRenamePlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_cluster_swap( &mut self, session: &mut Session, __arg2: AlterClusterSwapPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_cluster_replica_rename( &mut self, session: &Session, __arg2: AlterClusterReplicaRenamePlan ) -> Result<ExecuteResponse, AdapterError>
source§impl Coordinator
impl Coordinator
pub(crate) async fn sequence_create_index( &mut self, ctx: ExecuteContext, plan: CreateIndexPlan, resolved_ids: ResolvedIds )
pub(crate) async fn explain_create_index( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan )
pub(crate) async fn explain_replan_index( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan )
fn create_index_validate( &mut self, session: &Session, plan: CreateIndexPlan, resolved_ids: ResolvedIds, explain_ctx: ExplainContext ) -> Result<CreateIndexStage, AdapterError>
async fn create_index_optimize( &mut self, __arg1: CreateIndexOptimize ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError>
async fn create_index_finish( &mut self, session: &mut Session, __arg2: CreateIndexFinish ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError>
fn create_index_explain( &mut self, session: &Session, _: CreateIndexExplain ) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError>
source§impl Coordinator
impl Coordinator
pub(crate) async fn sequence_create_materialized_view( &mut self, ctx: ExecuteContext, plan: CreateMaterializedViewPlan, resolved_ids: ResolvedIds )
pub(crate) async fn explain_create_materialized_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan )
pub(crate) async fn explain_replan_materialized_view( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan )
fn create_materialized_view_validate( &mut self, session: &Session, plan: CreateMaterializedViewPlan, resolved_ids: ResolvedIds, explain_ctx: ExplainContext ) -> Result<CreateMaterializedViewStage, AdapterError>
async fn create_materialized_view_optimize( &mut self, __arg1: CreateMaterializedViewOptimize ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError>
async fn create_materialized_view_finish( &mut self, session: &Session, __arg2: CreateMaterializedViewFinish ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError>
sourcefn select_timestamps(
&self,
id_bundle: CollectionIdBundle,
refresh_schedule: Option<&RefreshSchedule>
) -> Result<(Antichain<Timestamp>, Antichain<Timestamp>), AdapterError>
fn select_timestamps( &self, id_bundle: CollectionIdBundle, refresh_schedule: Option<&RefreshSchedule> ) -> Result<(Antichain<Timestamp>, Antichain<Timestamp>), AdapterError>
Select the initial as_of
and until
frontiers for a materialized view.
fn create_materialized_view_explain( &mut self, session: &Session, _: CreateMaterializedViewExplain ) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError>
source§impl Coordinator
impl Coordinator
pub(crate) async fn sequence_create_view( &mut self, ctx: ExecuteContext, plan: CreateViewPlan, resolved_ids: ResolvedIds )
fn create_view_validate( &mut self, session: &Session, plan: CreateViewPlan, resolved_ids: ResolvedIds ) -> Result<CreateViewStage, AdapterError>
async fn create_view_optimize( &mut self, __arg1: CreateViewOptimize ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError>
async fn create_view_finish( &mut self, session: &Session, __arg2: CreateViewFinish ) -> Result<StageResult<Box<CreateViewStage>>, AdapterError>
source§impl Coordinator
impl Coordinator
sourcepub(crate) async fn sequence_peek(
&mut self,
ctx: ExecuteContext,
plan: SelectPlan,
target_cluster: TargetCluster
)
pub(crate) async fn sequence_peek( &mut self, ctx: ExecuteContext, plan: SelectPlan, target_cluster: TargetCluster )
Sequence a peek, determining a timestamp and the most efficient dataflow interaction.
Peeks are sequenced by assigning a timestamp for evaluation, and then determining and deploying the most efficient evaluation plan. The peek could evaluate to a constant, be a simple read out of an existing arrangement, or required a new dataflow to build the results to return.
pub(crate) async fn sequence_copy_to( &mut self, ctx: ExecuteContext, __arg2: CopyToPlan, target_cluster: TargetCluster )
pub(crate) async fn explain_peek( &mut self, ctx: ExecuteContext, __arg2: ExplainPlanPlan, target_cluster: TargetCluster )
sourcepub(crate) async fn execute_peek_stage(
&mut self,
ctx: ExecuteContext,
root_otel_ctx: OpenTelemetryContext,
stage: PeekStage
)
pub(crate) async fn execute_peek_stage( &mut self, ctx: ExecuteContext, root_otel_ctx: OpenTelemetryContext, stage: PeekStage )
Processes as many peek
stages as possible.
sourcefn peek_stage_validate(
&mut self,
session: &Session,
_: PeekStageValidate
) -> Result<PeekStageLinearizeTimestamp, AdapterError>
fn peek_stage_validate( &mut self, session: &Session, _: PeekStageValidate ) -> Result<PeekStageLinearizeTimestamp, AdapterError>
Do some simple validation. We must defer most of it until after any off-thread work.
sourceasync fn peek_stage_linearize_timestamp(
&mut self,
ctx: ExecuteContext,
root_otel_ctx: OpenTelemetryContext,
__arg3: PeekStageLinearizeTimestamp
)
async fn peek_stage_linearize_timestamp( &mut self, ctx: ExecuteContext, root_otel_ctx: OpenTelemetryContext, __arg3: PeekStageLinearizeTimestamp )
Possibly linearize a timestamp from a TimestampOracle
.
sourceasync fn peek_stage_timestamp_read_hold(
&mut self,
session: &mut Session,
__arg2: PeekStageTimestampReadHold
) -> Result<PeekStageOptimize, AdapterError>
async fn peek_stage_timestamp_read_hold( &mut self, session: &mut Session, __arg2: PeekStageTimestampReadHold ) -> Result<PeekStageOptimize, AdapterError>
Determine a read timestamp and create appropriate read holds.
async fn peek_stage_optimize( &mut self, ctx: ExecuteContext, root_otel_ctx: OpenTelemetryContext, __arg3: PeekStageOptimize )
fn peek_stage_real_time_recency( &mut self, ctx: ExecuteContext, root_otel_ctx: OpenTelemetryContext, _: PeekStageRealTimeRecency ) -> Option<(ExecuteContext, PeekStageTimestampReadHold)>
async fn peek_stage_finish( &mut self, ctx: &mut ExecuteContext, __arg2: PeekStageFinish ) -> Result<ExecuteResponse, AdapterError>
async fn peek_stage_copy_to_dataflow( &mut self, ctx: ExecuteContext, __arg2: PeekStageCopyTo )
fn peek_stage_explain_plan( &mut self, ctx: &mut ExecuteContext, _: PeekStageExplainPlan ) -> Result<ExecuteResponse, AdapterError>
sourcepub(super) async fn sequence_peek_timestamp(
&mut self,
session: &mut Session,
when: &QueryWhen,
cluster_id: ClusterId,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
source_bundle: &CollectionIdBundle,
source_ids: &BTreeSet<GlobalId>,
real_time_recency_ts: Option<Timestamp>,
requires_linearization: RequireLinearization
) -> Result<TimestampDetermination<Timestamp>, AdapterError>
pub(super) async fn sequence_peek_timestamp( &mut self, session: &mut Session, when: &QueryWhen, cluster_id: ClusterId, timeline_context: TimelineContext, oracle_read_ts: Option<Timestamp>, source_bundle: &CollectionIdBundle, source_ids: &BTreeSet<GlobalId>, real_time_recency_ts: Option<Timestamp>, requires_linearization: RequireLinearization ) -> Result<TimestampDetermination<Timestamp>, AdapterError>
Determines the query timestamp and acquires read holds on dependent sources if necessary.
source§impl Coordinator
impl Coordinator
pub(crate) async fn sequence_subscribe( &mut self, ctx: ExecuteContext, plan: SubscribePlan, target_cluster: TargetCluster )
fn subscribe_validate( &mut self, session: &mut Session, plan: SubscribePlan, target_cluster: TargetCluster ) -> Result<SubscribeStage, AdapterError>
fn subscribe_optimize_mir( &mut self, session: &Session, _: SubscribeOptimizeMir ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError>
async fn subscribe_timestamp_optimize_lir( &mut self, ctx: &ExecuteContext, __arg2: SubscribeTimestampOptimizeLir ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError>
async fn subscribe_finish( &mut self, ctx: &mut ExecuteContext, __arg2: SubscribeFinish ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError>
source§impl Coordinator
impl Coordinator
sourcepub(crate) async fn sequence_staged<S: Staged + 'static>(
&mut self,
ctx: ExecuteContext,
parent_span: Span,
stage: S
)
pub(crate) async fn sequence_staged<S: Staged + 'static>( &mut self, ctx: ExecuteContext, parent_span: Span, stage: S )
async fn create_source_inner( &mut self, session: &Session, plans: Vec<CreateSourcePlans> ) -> Result<CreateSourceInner, AdapterError>
pub(super) async fn sequence_create_source( &mut self, session: &mut Session, plans: Vec<CreateSourcePlans> ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_connection( &mut self, ctx: ExecuteContext, plan: CreateConnectionPlan, resolved_ids: ResolvedIds )
pub(crate) async fn sequence_create_connection_stage_finish( &mut self, session: &mut Session, connection_gid: GlobalId, plan: CreateConnectionPlan, resolved_ids: ResolvedIds ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_database( &mut self, session: &mut Session, plan: CreateDatabasePlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_schema( &mut self, session: &mut Session, plan: CreateSchemaPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_role( &mut self, conn_id: Option<&ConnectionId>, __arg2: CreateRolePlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_table( &mut self, ctx: &mut ExecuteContext, plan: CreateTablePlan, resolved_ids: ResolvedIds ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_secret( &mut self, session: &mut Session, plan: CreateSecretPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_create_sink( &mut self, ctx: ExecuteContext, plan: CreateSinkPlan, resolved_ids: ResolvedIds )
sourcepub(super) fn validate_system_column_references(
&self,
uses_ambiguous_columns: bool,
depends_on: &BTreeSet<GlobalId>
) -> Result<(), AdapterError>
pub(super) fn validate_system_column_references( &self, uses_ambiguous_columns: bool, depends_on: &BTreeSet<GlobalId> ) -> Result<(), AdapterError>
Validates that a view definition does not contain any expressions that may lead to
ambiguous column references to system tables. For example NATURAL JOIN
or SELECT *
.
We prevent these expressions so that we can add columns to system tables without changing the definition of the view.
Here is a bit of a hand wavy proof as to why we only need to check the immediate view definition for system objects and ambiguous column references, and not the entire dependency tree:
- A view with no object references cannot have any ambiguous column references to a system object, because it has no system objects.
- A view with a direct reference to a system object and a * or NATURAL JOIN will be rejected due to ambiguous column references.
- A view with system objects but no * or NATURAL JOINs cannot have any ambiguous column references to a system object, because all column references are explicitly named.
- A view with * or NATURAL JOINs, that doesn’t directly reference a system object cannot have any ambiguous column references to a system object, because there are no system objects in the top level view and all sub-views are guaranteed to have no ambiguous column references to system objects.
pub(super) async fn sequence_create_type( &mut self, session: &Session, plan: CreateTypePlan, resolved_ids: ResolvedIds ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_comment_on( &mut self, session: &Session, plan: CommentPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_drop_objects( &mut self, session: &Session, __arg2: DropObjectsPlan ) -> Result<ExecuteResponse, AdapterError>
fn validate_dropped_role_ownership( &self, session: &Session, dropped_roles: &BTreeMap<RoleId, &str> ) -> Result<(), AdapterError>
pub(super) async fn sequence_drop_owned( &mut self, session: &Session, plan: DropOwnedPlan ) -> Result<ExecuteResponse, AdapterError>
fn sequence_drop_common( &self, session: &Session, ids: Vec<ObjectId> ) -> Result<DropOps, AdapterError>
pub(super) fn sequence_explain_schema( &mut self, _: ExplainSinkSchemaPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) fn sequence_show_all_variables( &mut self, session: &Session ) -> Result<ExecuteResponse, AdapterError>
pub(super) fn sequence_show_variable( &mut self, session: &Session, plan: ShowVariablePlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_inspect_shard( &self, session: &Session, plan: InspectShardPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) fn sequence_set_variable( &self, session: &mut Session, plan: SetVariablePlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) fn sequence_reset_variable( &self, session: &mut Session, plan: ResetVariablePlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) fn sequence_set_transaction( &self, session: &mut Session, plan: SetTransactionPlan ) -> Result<ExecuteResponse, AdapterError>
fn validate_set_isolation_level( &self, session: &Session ) -> Result<(), AdapterError>
fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError>
pub(super) async fn sequence_end_transaction( &mut self, ctx: ExecuteContext, action: EndTransactionAction )
async fn sequence_end_transaction_inner( &mut self, session: &mut Session, action: EndTransactionAction ) -> Result<(Option<TransactionOps<Timestamp>>, Option<OwnedMutexGuard<()>>), AdapterError>
pub(super) async fn sequence_side_effecting_func( &mut self, ctx: ExecuteContext, plan: SideEffectingFunc )
sourcepub(super) fn recent_timestamp(
&self,
session: &Session,
source_ids: impl Iterator<Item = GlobalId>
) -> Option<BoxFuture<'static, Timestamp>>
pub(super) fn recent_timestamp( &self, session: &Session, source_ids: impl Iterator<Item = GlobalId> ) -> Option<BoxFuture<'static, Timestamp>>
Checks to see if the session needs a real time recency timestamp and if so returns a future that will return the timestamp.
pub(super) async fn sequence_explain_plan( &mut self, ctx: ExecuteContext, plan: ExplainPlanPlan, target_cluster: TargetCluster )
fn explain_materialized_view( &mut self, ctx: &ExecuteContext, _: ExplainPlanPlan ) -> Result<ExecuteResponse, AdapterError>
fn explain_index( &mut self, ctx: &ExecuteContext, _: ExplainPlanPlan ) -> Result<ExecuteResponse, AdapterError>
pub async fn sequence_explain_timestamp( &mut self, ctx: ExecuteContext, plan: ExplainTimestampPlan, target_cluster: TargetCluster )
fn sequence_explain_timestamp_begin_inner( &mut self, session: &Session, plan: ExplainTimestampPlan, target_cluster: TargetCluster ) -> Result<(ExplainFormat, BTreeSet<GlobalId>, OptimizedMirRelationExpr, ClusterId, CollectionIdBundle), AdapterError>
pub(crate) fn explain_timestamp( &self, session: &Session, cluster_id: ClusterId, id_bundle: &CollectionIdBundle, determination: TimestampDetermination<Timestamp> ) -> TimestampExplanation<Timestamp>
pub(super) async fn sequence_explain_timestamp_finish_inner( &mut self, session: &mut Session, format: ExplainFormat, cluster_id: ClusterId, source: OptimizedMirRelationExpr, id_bundle: CollectionIdBundle, when: QueryWhen, real_time_recency_ts: Option<Timestamp> ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_insert( &mut self, ctx: ExecuteContext, plan: InsertPlan )
sourcepub(super) async fn sequence_read_then_write(
&mut self,
ctx: ExecuteContext,
plan: ReadThenWritePlan
)
pub(super) async fn sequence_read_then_write( &mut self, ctx: ExecuteContext, plan: ReadThenWritePlan )
ReadThenWrite is a plan whose writes depend on the results of a read. This works by doing a Peek then queuing a SendDiffs. No writes or read-then-writes can occur between the Peek and SendDiff otherwise a serializability violation could occur.
pub(super) async fn sequence_alter_item_rename( &mut self, session: &mut Session, plan: AlterItemRenamePlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_schema_rename( &mut self, session: &mut Session, plan: AlterSchemaRenamePlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_schema_swap( &mut self, session: &mut Session, plan: AlterSchemaSwapPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) fn sequence_alter_index_set_options( &mut self, plan: AlterIndexSetOptionsPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) fn sequence_alter_index_reset_options( &mut self, plan: AlterIndexResetOptionsPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) fn set_index_compaction_window( &mut self, id: GlobalId, window: CompactionWindow ) -> Result<(), AdapterError>
pub(super) async fn sequence_alter_role( &mut self, session: &Session, __arg2: AlterRolePlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_secret( &mut self, session: &Session, plan: AlterSecretPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_connection( &mut self, ctx: ExecuteContext, __arg2: AlterConnectionPlan )
async fn sequence_rotate_keys( &mut self, session: &Session, id: GlobalId ) -> Result<ExecuteResponse, AdapterError>
async fn sequence_alter_connection_options( &mut self, ctx: ExecuteContext, id: GlobalId, set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>, drop_options: BTreeSet<ConnectionOptionName>, validate: bool )
pub(crate) async fn sequence_alter_connection_stage_finish( &mut self, session: &mut Session, id: GlobalId, connection: Connection ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_source( &mut self, session: &Session, __arg2: AlterSourcePlan, to_create_subsources: Vec<CreateSourcePlans> ) -> Result<ExecuteResponse, AdapterError>
fn extract_secret( &mut self, session: &Session, secret_as: &mut MirScalarExpr ) -> Result<Vec<u8>, AdapterError>
pub(super) async fn sequence_alter_system_set( &mut self, session: &Session, __arg2: AlterSystemSetPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_system_reset( &mut self, session: &Session, __arg2: AlterSystemResetPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_system_reset_all( &mut self, session: &Session, _: AlterSystemResetAllPlan ) -> Result<ExecuteResponse, AdapterError>
fn is_user_allowed_to_alter_system( &self, session: &Session, var_name: Option<&str> ) -> Result<(), AdapterError>
pub(super) fn sequence_execute( &mut self, session: &mut Session, plan: ExecutePlan ) -> Result<String, AdapterError>
pub(super) async fn sequence_grant_privileges( &mut self, session: &Session, __arg2: GrantPrivilegesPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_revoke_privileges( &mut self, session: &Session, __arg2: RevokePrivilegesPlan ) -> Result<ExecuteResponse, AdapterError>
async fn sequence_update_privileges( &mut self, session: &Session, update_privileges: Vec<UpdatePrivilege>, grantees: Vec<RoleId>, variant: UpdatePrivilegeVariant ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_default_privileges( &mut self, session: &Session, __arg2: AlterDefaultPrivilegesPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_grant_role( &mut self, session: &Session, __arg2: GrantRolePlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_revoke_role( &mut self, session: &Session, __arg2: RevokeRolePlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_alter_owner( &mut self, session: &Session, __arg2: AlterOwnerPlan ) -> Result<ExecuteResponse, AdapterError>
pub(super) async fn sequence_reassign_owned( &mut self, session: &Session, __arg2: ReassignOwnedPlan ) -> Result<ExecuteResponse, AdapterError>
source§impl Coordinator
impl Coordinator
pub(super) async fn statistics_oracle( &self, session: &Session, source_ids: &BTreeSet<GlobalId>, query_as_of: &Antichain<Timestamp>, is_oneshot: bool ) -> Result<Box<dyn StatisticsOracle>, AdapterError>
source§impl Coordinator
impl Coordinator
sourcepub(super) fn emit_optimizer_notices(
&mut self,
session: &Session,
notices: &Vec<RawOptimizerNotice>
)
pub(super) fn emit_optimizer_notices( &mut self, session: &Session, notices: &Vec<RawOptimizerNotice> )
Forward notices that we got from the optimizer.
source§impl Coordinator
impl Coordinator
sourcepub(crate) fn sequence_plan(
&mut self,
ctx: ExecuteContext,
plan: Plan,
resolved_ids: ResolvedIds
) -> LocalBoxFuture<'_, ()>
pub(crate) fn sequence_plan( &mut self, ctx: ExecuteContext, plan: Plan, resolved_ids: ResolvedIds ) -> LocalBoxFuture<'_, ()>
BOXED FUTURE: As of Nov 2023 the returned Future from this function was 34KB. This would get stored on the stack which is bad for runtime performance, and blow up our stack usage. Because of that we purposefully move this Future onto the heap (i.e. Box it).
pub(crate) async fn sequence_execute_single_statement_transaction( &mut self, ctx: ExecuteContext, stmt: Arc<Statement<Raw>>, params: Params )
sourcepub(crate) async fn sequence_create_role_for_startup(
&mut self,
plan: CreateRolePlan
) -> Result<ExecuteResponse, AdapterError>
pub(crate) async fn sequence_create_role_for_startup( &mut self, plan: CreateRolePlan ) -> Result<ExecuteResponse, AdapterError>
Creates a role during connection startup.
This should not be called from anywhere except connection startup.
pub(crate) async fn sequence_explain_timestamp_finish( &mut self, ctx: &mut ExecuteContext, format: ExplainFormat, cluster_id: ClusterId, optimized_plan: OptimizedMirRelationExpr, id_bundle: CollectionIdBundle, when: QueryWhen, real_time_recency_ts: Option<Timestamp> ) -> Result<ExecuteResponse, AdapterError>
pub(crate) fn allocate_transient_id(&mut self) -> Result<GlobalId, AdapterError>
fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice>
pub(crate) fn insert_constant( catalog: &Catalog, session: &mut Session, id: GlobalId, constants: MirRelationExpr ) -> Result<ExecuteResponse, AdapterError>
pub(crate) fn send_diffs( session: &mut Session, plan: SendDiffsPlan ) -> Result<ExecuteResponse, AdapterError>
source§impl Coordinator
impl Coordinator
pub(crate) fn plan_statement( &self, session: &Session, stmt: Statement<Aug>, params: &Params, resolved_ids: &ResolvedIds ) -> Result<Plan, AdapterError>
pub(crate) fn declare( &self, ctx: ExecuteContext, name: String, stmt: Statement<Raw>, sql: String, params: Params )
fn declare_inner( session: &mut Session, catalog: &Catalog, name: String, stmt: Statement<Raw>, sql: String, params: Params, now: EpochMillis ) -> Result<(), AdapterError>
pub(crate) fn describe( catalog: &Catalog, session: &Session, stmt: Option<Statement<Raw>>, param_types: Vec<Option<ScalarType>> ) -> Result<StatementDesc, AdapterError>
sourcepub(crate) fn verify_prepared_statement(
catalog: &Catalog,
session: &mut Session,
name: &str
) -> Result<(), AdapterError>
pub(crate) fn verify_prepared_statement( catalog: &Catalog, session: &mut Session, name: &str ) -> Result<(), AdapterError>
Verify a prepared statement is still valid. This will return an error if the catalog’s revision has changed and the statement now produces a different type than its original.
sourcepub(crate) fn verify_portal(
&self,
session: &mut Session,
name: &str
) -> Result<(), AdapterError>
pub(crate) fn verify_portal( &self, session: &mut Session, name: &str ) -> Result<(), AdapterError>
Verify a portal is still valid.
sourcefn verify_statement_revision(
catalog: &Catalog,
session: &Session,
stmt: Option<&Statement<Raw>>,
desc: &StatementDesc,
catalog_revision: u64
) -> Result<Option<u64>, AdapterError>
fn verify_statement_revision( catalog: &Catalog, session: &Session, stmt: Option<&Statement<Raw>>, desc: &StatementDesc, catalog_revision: u64 ) -> Result<Option<u64>, AdapterError>
If the catalog and portal revisions don’t match, re-describe the statement
and ensure its result type has not changed. Return Some(x)
with the new
(valid) revision if its plan has changed. Return None
if the revisions
match. Return an error if the plan has changed.
sourcepub(crate) async fn clear_transaction(
&mut self,
session: &mut Session
) -> TransactionStatus<Timestamp>
pub(crate) async fn clear_transaction( &mut self, session: &mut Session ) -> TransactionStatus<Timestamp>
Handle removing in-progress transaction state regardless of the end action of the transaction.
sourcepub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId)
pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId)
Clears coordinator state for a connection.
sourcepub(crate) async fn add_active_compute_sink(
&mut self,
id: GlobalId,
active_sink: ActiveComputeSink
) -> BoxFuture<'static, ()>
pub(crate) async fn add_active_compute_sink( &mut self, id: GlobalId, active_sink: ActiveComputeSink ) -> BoxFuture<'static, ()>
Adds coordinator bookkeeping for an active compute sink.
This is a low-level method. The caller is responsible for installing the sink in the controller.
sourcepub(crate) async fn remove_active_compute_sink(
&mut self,
id: GlobalId
) -> Option<ActiveComputeSink>
pub(crate) async fn remove_active_compute_sink( &mut self, id: GlobalId ) -> Option<ActiveComputeSink>
Removes coordinator bookkeeping for an active compute sink.
This is a low-level method. The caller is responsible for dropping the
sink from the controller. Consider calling drop_compute_sink
or
retire_compute_sink
instead.
source§impl Coordinator
impl Coordinator
sourcepub(crate) async fn bootstrap(
&mut self,
builtin_migration_metadata: BuiltinMigrationMetadata,
builtin_table_updates: Vec<BuiltinTableUpdate>
) -> Result<(), AdapterError>
pub(crate) async fn bootstrap( &mut self, builtin_migration_metadata: BuiltinMigrationMetadata, builtin_table_updates: Vec<BuiltinTableUpdate> ) -> Result<(), AdapterError>
Initializes coordinator state based on the contained catalog. Must be
called after creating the coordinator and before calling the
Coordinator::serve
method.
sourceasync fn bootstrap_storage_collections(&mut self)
async fn bootstrap_storage_collections(&mut self)
Initializes all storage collections required by catalog objects in the storage controller.
This method takes care of collection creation, as well as migration of existing collections.
Creating all storage collections in a single create_collections
call, rather than on
demand, is more efficient as it reduces the number of writes to durable storage. It also
allows subsequent bootstrap logic to fetch metadata (such as frontiers) of arbitrary
storage collections, without needing to worry about dependency order.
sourcefn bootstrap_dataflow_plans(
&mut self,
ordered_catalog_entries: &[CatalogEntry]
) -> Result<(), AdapterError>
fn bootstrap_dataflow_plans( &mut self, ordered_catalog_entries: &[CatalogEntry] ) -> Result<(), AdapterError>
Invokes the optimizer on all indexes and materialized views in the catalog and inserts the resulting dataflow plans into the catalog state.
ordered_catalog_entries
must by sorted in dependency order, with dependencies ordered
before their dependants.
This method does not perform timestamp selection for the dataflows, nor does it create them in the compute controller. Both of these steps happen later during bootstrapping.
sourcefn collect_index_dependent_matviews(
&self
) -> BTreeMap<GlobalId, BTreeSet<GlobalId>>
fn collect_index_dependent_matviews( &self ) -> BTreeMap<GlobalId, BTreeSet<GlobalId>>
Collects for each index the materialized views that depend on it, either directly or transitively through other indexes (but not through other MVs).
The returned information is required during coordinator bootstrap for index as-of selection, to ensure that selected as-ofs satisfy the requirements of downstream MVs.
This method expects all dataflow plans to be available, so it must run after
Coordinator::bootstrap_dataflow_plans
.
sourcefn bootstrap_index_as_of(
&self,
dataflow: &DataflowDescription<Plan>,
cluster_id: ComputeInstanceId,
dependent_matviews: BTreeSet<GlobalId>,
compaction_window: CompactionWindow
) -> Antichain<Timestamp>
fn bootstrap_index_as_of( &self, dataflow: &DataflowDescription<Plan>, cluster_id: ComputeInstanceId, dependent_matviews: BTreeSet<GlobalId>, compaction_window: CompactionWindow ) -> Antichain<Timestamp>
Returns an as_of
suitable for bootstrapping the given index dataflow.
sourcefn bootstrap_materialized_view_as_of(
&self,
dataflow: &DataflowDescription<Plan>,
cluster_id: ComputeInstanceId
) -> Antichain<Timestamp>
fn bootstrap_materialized_view_as_of( &self, dataflow: &DataflowDescription<Plan>, cluster_id: ComputeInstanceId ) -> Antichain<Timestamp>
Returns an as_of
suitable for bootstrapping the given materialized view dataflow.
sourcefn serve(
self,
internal_cmd_rx: UnboundedReceiver<Message>,
strict_serializable_reads_rx: UnboundedReceiver<(ConnectionId, PendingReadTxn)>,
cmd_rx: UnboundedReceiver<(OpenTelemetryContext, Command)>,
group_commit_rx: GroupCommitWaiter
) -> LocalBoxFuture<'static, ()>
fn serve( self, internal_cmd_rx: UnboundedReceiver<Message>, strict_serializable_reads_rx: UnboundedReceiver<(ConnectionId, PendingReadTxn)>, cmd_rx: UnboundedReceiver<(OpenTelemetryContext, Command)>, group_commit_rx: GroupCommitWaiter ) -> LocalBoxFuture<'static, ()>
Serves the coordinator, receiving commands from users over cmd_rx
and feedback from dataflow workers over feedback_rx
.
You must call bootstrap
before calling this method.
BOXED FUTURE: As of Nov 2023 the returned Future from this function was 92KB. This would get stored on the stack which is bad for runtime performance, and blow up our stack usage. Because of that we purposefully move this Future onto the heap (i.e. Box it).
sourcefn owned_catalog(&self) -> Arc<Catalog>
fn owned_catalog(&self) -> Arc<Catalog>
Obtain a read-only Catalog snapshot, suitable for giving out to non-Coordinator thread tasks.
sourcefn catalog_mut(&mut self) -> &mut Catalog
fn catalog_mut(&mut self) -> &mut Catalog
Obtain a writeable Catalog reference.
sourcefn connection_context(&self) -> &ConnectionContext
fn connection_context(&self) -> &ConnectionContext
Obtain a reference to the coordinator’s connection context.
sourcefn secrets_reader(&self) -> &dyn SecretsReader
fn secrets_reader(&self) -> &dyn SecretsReader
Obtain a reference to the coordinator’s secret reader.
sourcepub(crate) fn broadcast_notice(&mut self, notice: AdapterNotice)
pub(crate) fn broadcast_notice(&mut self, notice: AdapterNotice)
Publishes a notice message to all sessions.
pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta>
pub(crate) fn retire_execution( &mut self, reason: StatementEndedExecutionReason, ctx_extra: ExecuteContextExtra )
sourcepub fn dataflow_builder(
&self,
instance: ComputeInstanceId
) -> DataflowBuilder<'_>
pub fn dataflow_builder( &self, instance: ComputeInstanceId ) -> DataflowBuilder<'_>
Creates a new dataflow builder from the catalog and indexes in self
.
sourcepub fn instance_snapshot(
&self,
id: ComputeInstanceId
) -> Result<ComputeInstanceSnapshot, InstanceMissing>
pub fn instance_snapshot( &self, id: ComputeInstanceId ) -> Result<ComputeInstanceSnapshot, InstanceMissing>
Return a reference-less snapshot to the indicated compute instance.
sourcepub(crate) async fn ship_dataflow(
&mut self,
dataflow: DataflowDescription<Plan>,
instance: ComputeInstanceId
)
pub(crate) async fn ship_dataflow( &mut self, dataflow: DataflowDescription<Plan>, instance: ComputeInstanceId )
Call into the compute controller to install a finalized dataflow, and initialize the read policies for its exported readable objects.
Trait Implementations§
source§impl Debug for Coordinator
impl Debug for Coordinator
source§impl TimestampProvider for Coordinator
impl TimestampProvider for Coordinator
source§fn compute_read_frontier<'a>(
&'a self,
instance: ComputeInstanceId,
id: GlobalId
) -> AntichainRef<'a, Timestamp>
fn compute_read_frontier<'a>( &'a self, instance: ComputeInstanceId, id: GlobalId ) -> AntichainRef<'a, Timestamp>
Reports a collection’s current read frontier.
source§fn compute_read_capability<'a>(
&'a self,
instance: ComputeInstanceId,
id: GlobalId
) -> &'a Antichain<Timestamp>
fn compute_read_capability<'a>( &'a self, instance: ComputeInstanceId, id: GlobalId ) -> &'a Antichain<Timestamp>
Reports a collection’s current read capability.
source§fn compute_write_frontier<'a>(
&'a self,
instance: ComputeInstanceId,
id: GlobalId
) -> AntichainRef<'a, Timestamp>
fn compute_write_frontier<'a>( &'a self, instance: ComputeInstanceId, id: GlobalId ) -> AntichainRef<'a, Timestamp>
Reports a collection’s current write frontier.
source§fn storage_read_capabilities<'a>(
&'a self,
id: GlobalId
) -> AntichainRef<'a, Timestamp>
fn storage_read_capabilities<'a>( &'a self, id: GlobalId ) -> AntichainRef<'a, Timestamp>
Accumulation of read capabilities for the collection.
source§fn storage_implied_capability<'a>(
&'a self,
id: GlobalId
) -> &'a Antichain<Timestamp>
fn storage_implied_capability<'a>( &'a self, id: GlobalId ) -> &'a Antichain<Timestamp>
The implicit capability associated with collection creation.
source§fn storage_write_frontier<'a>(
&'a self,
id: GlobalId
) -> &'a Antichain<Timestamp>
fn storage_write_frontier<'a>( &'a self, id: GlobalId ) -> &'a Antichain<Timestamp>
Reported write frontier.
fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline>
source§fn get_linearized_timeline(
isolation_level: &IsolationLevel,
when: &QueryWhen,
timeline_context: &TimelineContext
) -> Option<Timeline>
fn get_linearized_timeline( isolation_level: &IsolationLevel, when: &QueryWhen, timeline_context: &TimelineContext ) -> Option<Timeline>
Timeline
whose timestamp oracle we have to use to get a
linearized read timestamp, iff linearization is needed.source§fn determine_timestamp_for<'life0, 'life1, 'life2, 'life3, 'life4, 'life5, 'life6, 'async_trait>(
&'life0 self,
catalog: &'life1 CatalogState,
session: &'life2 Session,
id_bundle: &'life3 CollectionIdBundle,
when: &'life4 QueryWhen,
compute_instance: ComputeInstanceId,
timeline_context: &'life5 TimelineContext,
oracle_read_ts: Option<Timestamp>,
real_time_recency_ts: Option<Timestamp>,
isolation_level: &'life6 IsolationLevel
) -> Pin<Box<dyn Future<Output = Result<TimestampDetermination<Timestamp>, AdapterError>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
'life4: 'async_trait,
'life5: 'async_trait,
'life6: 'async_trait,
fn determine_timestamp_for<'life0, 'life1, 'life2, 'life3, 'life4, 'life5, 'life6, 'async_trait>( &'life0 self, catalog: &'life1 CatalogState, session: &'life2 Session, id_bundle: &'life3 CollectionIdBundle, when: &'life4 QueryWhen, compute_instance: ComputeInstanceId, timeline_context: &'life5 TimelineContext, oracle_read_ts: Option<Timestamp>, real_time_recency_ts: Option<Timestamp>, isolation_level: &'life6 IsolationLevel ) -> Pin<Box<dyn Future<Output = Result<TimestampDetermination<Timestamp>, AdapterError>> + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait, 'life4: 'async_trait, 'life5: 'async_trait, 'life6: 'async_trait,
source§fn least_valid_read(
&self,
id_bundle: &CollectionIdBundle
) -> Antichain<Timestamp>
fn least_valid_read( &self, id_bundle: &CollectionIdBundle ) -> Antichain<Timestamp>
source§fn least_valid_write(
&self,
id_bundle: &CollectionIdBundle
) -> Antichain<Timestamp>
fn least_valid_write( &self, id_bundle: &CollectionIdBundle ) -> Antichain<Timestamp>
source§fn greatest_available_read(
&self,
id_bundle: &CollectionIdBundle
) -> Antichain<Timestamp>
fn greatest_available_read( &self, id_bundle: &CollectionIdBundle ) -> Antichain<Timestamp>
least_valid_write
- 1, i.e., each time in least_valid_write
stepped back in a
saturating way.fn generate_timestamp_not_valid_error_msg( &self, id_bundle: &CollectionIdBundle, compute_instance: ComputeInstanceId, candidate: Timestamp ) -> String
Auto Trait Implementations§
impl !RefUnwindSafe for Coordinator
impl !Send for Coordinator
impl !Sync for Coordinator
impl Unpin for Coordinator
impl !UnwindSafe for Coordinator
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> FmtForward for T
impl<T> FmtForward for T
source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where Self: Display,
self
to use its Display
implementation when
Debug
-formatted.source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.source§impl<T> FutureExt for T
impl<T> FutureExt for T
source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere T: ?Sized,
source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere Self: Sized,
source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> Rwhere
Self: Borrow<B>,
B: 'a + ?Sized,
R: 'a,
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> Rwhere Self: Borrow<B>, B: 'a + ?Sized, R: 'a,
source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R
) -> Rwhere
Self: BorrowMut<B>,
B: 'a + ?Sized,
R: 'a,
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R ) -> Rwhere Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,
source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> Rwhere
Self: AsRef<U>,
U: 'a + ?Sized,
R: 'a,
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> Rwhere Self: AsRef<U>, U: 'a + ?Sized, R: 'a,
self
, then passes self.as_ref()
into the pipe function.source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> Rwhere
Self: AsMut<U>,
U: 'a + ?Sized,
R: 'a,
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> Rwhere Self: AsMut<U>, U: 'a + ?Sized, R: 'a,
self
, then passes self.as_mut()
into the pipe
function.source§impl<T> Pointable for T
impl<T> Pointable for T
source§impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere R: RustType<P>,
source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto
.source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto
.source§impl<T> Tap for T
impl<T> Tap for T
source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Selfwhere
Self: Borrow<B>,
B: ?Sized,
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Selfwhere Self: Borrow<B>, B: ?Sized,
Borrow<B>
of a value. Read moresource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere
Self: BorrowMut<B>,
B: ?Sized,
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere Self: BorrowMut<B>, B: ?Sized,
BorrowMut<B>
of a value. Read moresource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Selfwhere
Self: AsRef<R>,
R: ?Sized,
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Selfwhere Self: AsRef<R>, R: ?Sized,
AsRef<R>
view of a value. Read moresource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere
Self: AsMut<R>,
R: ?Sized,
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere Self: AsMut<R>, R: ?Sized,
AsMut<R>
view of a value. Read moresource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Selfwhere
Self: Deref<Target = T>,
T: ?Sized,
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Selfwhere Self: Deref<Target = T>, T: ?Sized,
Deref::Target
of a value. Read moresource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Selfwhere
Self: DerefMut<Target = T> + Deref,
T: ?Sized,
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Selfwhere Self: DerefMut<Target = T> + Deref, T: ?Sized,
Deref::Target
of a value. Read moresource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Selfwhere
Self: Borrow<B>,
B: ?Sized,
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Selfwhere Self: Borrow<B>, B: ?Sized,
.tap_borrow()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere
Self: BorrowMut<B>,
B: ?Sized,
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere Self: BorrowMut<B>, B: ?Sized,
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Selfwhere
Self: AsRef<R>,
R: ?Sized,
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Selfwhere Self: AsRef<R>, R: ?Sized,
.tap_ref()
only in debug builds, and is erased in release
builds.source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere
Self: AsMut<R>,
R: ?Sized,
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere Self: AsMut<R>, R: ?Sized,
.tap_ref_mut()
only in debug builds, and is erased in release
builds.