pub(super) struct Instance<T: ComputeControllerTimestamp> {Show 24 fields
build_info: &'static BuildInfo,
storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
initialized: bool,
read_only: bool,
workload_class: Option<String>,
replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
collections: BTreeMap<GlobalId, CollectionState<T>>,
log_sources: BTreeMap<LogVariant, GlobalId>,
peeks: BTreeMap<Uuid, PendingPeek<T>>,
subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
copy_tos: BTreeSet<GlobalId>,
history: ComputeCommandHistory<DeleteOnDropGauge<AtomicU64, Vec<String>>, T>,
command_rx: UnboundedReceiver<Box<dyn FnOnce(&mut Instance<T>) + Send>>,
response_tx: UnboundedSender<ComputeControllerResponse<T>>,
introspection_tx: UnboundedSender<(IntrospectionType, Vec<(Row, Diff)>)>,
metrics: InstanceMetrics,
dyncfg: Arc<ConfigSet>,
peek_stash_persist_location: PersistLocation,
now: NowFn,
wallclock_lag: WallclockLagFn<T>,
wallclock_lag_last_recorded: DateTime<Utc>,
read_hold_tx: ChangeTx<T>,
replica_tx: InstrumentedUnboundedSender<(ReplicaId, u64, ComputeResponse<T>), DeleteOnDropCounter<AtomicU64, Vec<String>>>,
replica_rx: InstrumentedUnboundedReceiver<(ReplicaId, u64, ComputeResponse<T>), DeleteOnDropCounter<AtomicU64, Vec<String>>>,
}Expand description
The state we keep for a compute instance.
Fields§
§build_info: &'static BuildInfoBuild info for spawning replicas
storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>A handle providing access to storage collections.
initialized: boolWhether instance initialization has been completed.
read_only: boolWhether or not this instance is in read-only mode.
When in read-only mode, neither the controller nor the instances controlled by it are allowed to affect changes to external systems (largely persist).
workload_class: Option<String>The workload class of this instance.
This is currently only used to annotate metrics.
replicas: BTreeMap<ReplicaId, ReplicaState<T>>The replicas of this compute instance.
collections: BTreeMap<GlobalId, CollectionState<T>>Currently installed compute collections.
New entries are added for all collections exported from dataflows created through
Instance::create_dataflow.
Entries are removed by Instance::cleanup_collections. See that method’s documentation
about the conditions for removing collection state.
log_sources: BTreeMap<LogVariant, GlobalId>IDs of log sources maintained by this compute instance.
peeks: BTreeMap<Uuid, PendingPeek<T>>Currently outstanding peeks.
New entries are added for all peeks initiated through Instance::peek.
The entry for a peek is only removed once all replicas have responded to the peek. This is currently required to ensure all replicas have stopped reading from the peeked collection’s inputs before we allow them to compact. database-issues#4822 tracks changing this so we only have to wait for the first peek response.
subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>Currently in-progress subscribes.
New entries are added for all subscribes exported from dataflows created through
Instance::create_dataflow.
The entry for a subscribe is removed once at least one replica has reported the subscribe to have advanced to the empty frontier or to have been dropped, implying that no further updates will be emitted for this subscribe.
Note that subscribes are tracked both in collections and subscribes. collections
keeps track of the subscribe’s upper and since frontiers and ensures appropriate read holds
on the subscribe’s input. subscribes is only used to track which updates have been
emitted, to decide if new ones should be emitted or suppressed.
copy_tos: BTreeSet<GlobalId>Tracks all in-progress COPY TOs.
New entries are added for all s3 oneshot sinks (corresponding to a COPY TO) exported from
dataflows created through Instance::create_dataflow.
The entry for a copy to is removed once at least one replica has finished or the exporting collection is dropped.
history: ComputeCommandHistory<DeleteOnDropGauge<AtomicU64, Vec<String>>, T>The command history, used when introducing new replicas or restarting existing replicas.
command_rx: UnboundedReceiver<Box<dyn FnOnce(&mut Instance<T>) + Send>>Receiver for commands to be executed.
response_tx: UnboundedSender<ComputeControllerResponse<T>>Sender for responses to be delivered.
introspection_tx: UnboundedSender<(IntrospectionType, Vec<(Row, Diff)>)>Sender for introspection updates to be recorded.
metrics: InstanceMetricsThe registry the controller uses to report metrics.
dyncfg: Arc<ConfigSet>Dynamic system configuration.
peek_stash_persist_location: PersistLocationThe persist location where we can stash large peek results.
now: NowFnA function that produces the current wallclock time.
wallclock_lag: WallclockLagFn<T>A function that computes the lag between the given time and wallclock time.
wallclock_lag_last_recorded: DateTime<Utc>The last time wallclock lag introspection was recorded.
read_hold_tx: ChangeTx<T>Sender for updates to collection read holds.
Copies of this sender are given to ReadHolds that are created in
CollectionState::new.
replica_tx: InstrumentedUnboundedSender<(ReplicaId, u64, ComputeResponse<T>), DeleteOnDropCounter<AtomicU64, Vec<String>>>A sender for responses from replicas.
replica_rx: InstrumentedUnboundedReceiver<(ReplicaId, u64, ComputeResponse<T>), DeleteOnDropCounter<AtomicU64, Vec<String>>>A receiver for responses from replicas.
Implementations§
Source§impl<T: ComputeControllerTimestamp> Instance<T>
impl<T: ComputeControllerTimestamp> Instance<T>
Sourcefn collection(
&self,
id: GlobalId,
) -> Result<&CollectionState<T>, CollectionMissing>
fn collection( &self, id: GlobalId, ) -> Result<&CollectionState<T>, CollectionMissing>
Acquire a handle to the collection state associated with id.
Sourcefn collection_mut(
&mut self,
id: GlobalId,
) -> Result<&mut CollectionState<T>, CollectionMissing>
fn collection_mut( &mut self, id: GlobalId, ) -> Result<&mut CollectionState<T>, CollectionMissing>
Acquire a mutable handle to the collection state associated with id.
Sourcefn expect_collection(&self, id: GlobalId) -> &CollectionState<T>
fn expect_collection(&self, id: GlobalId) -> &CollectionState<T>
Acquire a handle to the collection state associated with id.
§Panics
Panics if the identified collection does not exist.
Sourcefn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T>
fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T>
Acquire a mutable handle to the collection state associated with id.
§Panics
Panics if the identified collection does not exist.
fn collections_iter( &self, ) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)>
Sourcefn add_collection(
&mut self,
id: GlobalId,
as_of: Antichain<T>,
shared: SharedCollectionState<T>,
storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>,
replica_input_read_holds: Vec<ReadHold<T>>,
write_only: bool,
storage_sink: bool,
initial_as_of: Option<Antichain<T>>,
refresh_schedule: Option<RefreshSchedule>,
)
fn add_collection( &mut self, id: GlobalId, as_of: Antichain<T>, shared: SharedCollectionState<T>, storage_dependencies: BTreeMap<GlobalId, ReadHold<T>>, compute_dependencies: BTreeMap<GlobalId, ReadHold<T>>, replica_input_read_holds: Vec<ReadHold<T>>, write_only: bool, storage_sink: bool, initial_as_of: Option<Antichain<T>>, refresh_schedule: Option<RefreshSchedule>, )
Add a collection to the instance state.
§Panics
Panics if a collection with the same ID exists already.
fn remove_collection(&mut self, id: GlobalId)
fn add_replica_state( &mut self, id: ReplicaId, client: ReplicaClient<T>, config: ReplicaConfig, epoch: u64, )
Sourcefn deliver_response(&self, response: ComputeControllerResponse<T>)
fn deliver_response(&self, response: ComputeControllerResponse<T>)
Enqueue the given response for delivery to the controller clients.
Sourcefn deliver_introspection_updates(
&self,
type_: IntrospectionType,
updates: Vec<(Row, Diff)>,
)
fn deliver_introspection_updates( &self, type_: IntrospectionType, updates: Vec<(Row, Diff)>, )
Enqueue the given introspection updates for recording.
Sourcefn replica_exists(&self, id: ReplicaId) -> bool
fn replica_exists(&self, id: ReplicaId) -> bool
Returns whether the identified replica exists.
Sourcefn peeks_targeting(
&self,
replica_id: ReplicaId,
) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)>
fn peeks_targeting( &self, replica_id: ReplicaId, ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)>
Return the IDs of pending peeks targeting the specified replica.
Sourcefn subscribes_targeting(
&self,
replica_id: ReplicaId,
) -> impl Iterator<Item = GlobalId> + '_
fn subscribes_targeting( &self, replica_id: ReplicaId, ) -> impl Iterator<Item = GlobalId> + '_
Return the IDs of in-progress subscribes targeting the specified replica.
Sourcefn update_frontier_introspection(&mut self)
fn update_frontier_introspection(&mut self)
Update introspection with the current collection frontiers.
We could also do this directly in response to frontier changes, but doing it periodically lets us avoid emitting some introspection updates that can be consolidated (e.g. a write frontier updated immediately followed by a read frontier update).
This method is invoked by ComputeController::maintain, which we expect to be called once
per second during normal operation.
Sourcefn refresh_state_metrics(&self)
fn refresh_state_metrics(&self)
Refresh the controller state metrics for this instance.
We could also do state metric updates directly in response to state changes, but that would mean littering the code with metric update calls. Encapsulating state metric maintenance in a single method is less noisy.
This method is invoked by ComputeController::maintain, which we expect to be called once
per second during normal operation.
Sourcefn refresh_wallclock_lag(&mut self)
fn refresh_wallclock_lag(&mut self)
Refresh the wallclock lag introspection and metrics with the current lag values.
This method produces wallclock lag metrics of two different shapes:
- Histories: For each replica and each collection, we measure the lag of the write frontier behind the wallclock time every second. Every minute we emit the maximum lag observed over the last minute, together with the current time.
- Histograms: For each collection, we measure the lag of the write frontier behind wallclock time every second. Every minute we emit all lags observed over the last minute, together with the current histogram period.
Histories are emitted to both Mz introspection and Prometheus, histograms only to
introspection. We treat lags of unreadable collections (i.e. collections that contain no
readable times) as undefined and set them to NULL in introspection and u64::MAX in
Prometheus.
This method is invoked by ComputeController::maintain, which we expect to be called once
per second during normal operation.
Sourcefn maybe_record_wallclock_lag(&mut self)
fn maybe_record_wallclock_lag(&mut self)
Produce new wallclock lag introspection updates, provided enough time has passed since the last recording. We emit new introspection updates if the system time has passed into a new multiple of the recording interval (typically 1 minute) since the last refresh. The storage controller uses the same approach, ensuring that both controllers commit their lags at roughly the same time, avoiding confusion caused by inconsistencies.
Sourcefn report_dependency_updates(&self, id: GlobalId, diff: Diff)
fn report_dependency_updates(&self, id: GlobalId, diff: Diff)
Report updates (inserts or retractions) to the identified collection’s dependencies.
§Panics
Panics if the identified collection does not exist.
Sourcepub fn collection_hydrated(
&self,
collection_id: GlobalId,
) -> Result<bool, CollectionLookupError>
pub fn collection_hydrated( &self, collection_id: GlobalId, ) -> Result<bool, CollectionLookupError>
Returns true if the given collection is hydrated on at least one
replica.
This also returns true in case this cluster does not have any
replicas.
Sourcepub fn collections_hydrated_on_replicas(
&self,
target_replica_ids: Option<Vec<ReplicaId>>,
exclude_collections: &BTreeSet<GlobalId>,
) -> Result<bool, HydrationCheckBadTarget>
pub fn collections_hydrated_on_replicas( &self, target_replica_ids: Option<Vec<ReplicaId>>, exclude_collections: &BTreeSet<GlobalId>, ) -> Result<bool, HydrationCheckBadTarget>
Returns true if each non-transient, non-excluded collection is hydrated on at
least one replica.
This also returns true in case this cluster does not have any
replicas.
Sourcepub fn collections_hydrated(
&self,
exclude_collections: &BTreeSet<GlobalId>,
) -> bool
pub fn collections_hydrated( &self, exclude_collections: &BTreeSet<GlobalId>, ) -> bool
Returns true if all non-transient, non-excluded collections are hydrated on at least one
replica.
This also returns true in case this cluster does not have any
replicas.
Sourcefn cleanup_collections(&mut self)
fn cleanup_collections(&mut self)
Clean up collection state that is not needed anymore.
Three conditions need to be true before we can remove state for a collection:
- A client must have explicitly dropped the collection. If that is not the case, clients can still reasonably assume that the controller knows about the collection and can answer queries about it.
- There must be no outstanding read capabilities on the collection. As long as someone still holds read capabilities on a collection, we need to keep it around to be able to properly handle downgrading of said capabilities.
- All replica frontiers for the collection must have advanced to the empty frontier.
Advancement to the empty frontiers signals that replicas are done computing the
collection and that they won’t send more
ComputeResponses for it. As long as we might receive responses for a collection we want to keep it around to be able to validate and handle these responses.
Source§impl<T> Instance<T>where
T: ComputeControllerTimestamp,
impl<T> Instance<T>where
T: ComputeControllerTimestamp,
fn new( build_info: &'static BuildInfo, storage: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>, peek_stash_persist_location: PersistLocation, arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>, metrics: InstanceMetrics, now: NowFn, wallclock_lag: WallclockLagFn<T>, dyncfg: Arc<ConfigSet>, command_rx: UnboundedReceiver<Box<dyn FnOnce(&mut Instance<T>) + Send>>, response_tx: UnboundedSender<ComputeControllerResponse<T>>, read_hold_tx: ChangeTx<T>, introspection_tx: UnboundedSender<(IntrospectionType, Vec<(Row, Diff)>)>, ) -> Self
async fn run(self)
Sourcepub fn update_configuration(&mut self, config_params: ComputeParameters)
pub fn update_configuration(&mut self, config_params: ComputeParameters)
Update instance configuration.
Sourcepub fn initialization_complete(&mut self)
pub fn initialization_complete(&mut self)
Marks the end of any initialization commands.
Intended to be called by Controller, rather than by other code.
Calling this method repeatedly has no effect.
Sourcepub fn allow_writes(&mut self)
pub fn allow_writes(&mut self)
Allows this instance to affect writes to external systems (persist).
Calling this method repeatedly has no effect.
Sourcepub fn shutdown(&mut self)
pub fn shutdown(&mut self)
Shut down this instance.
This method runs various assertions ensuring the instance state is empty. It exists to help us find bugs where the client drops a compute instance that still has replicas or collections installed, and later assumes that said replicas/collections still exists.
§Panics
Panics if the compute instance still has active replicas. Panics if the compute instance still has collections installed.
Sourcefn send(&mut self, cmd: ComputeCommand<T>)
fn send(&mut self, cmd: ComputeCommand<T>)
Sends a command to all replicas of this instance.
Sourcepub fn add_replica(
&mut self,
id: ReplicaId,
config: ReplicaConfig,
epoch: Option<u64>,
) -> Result<(), ReplicaExists>
pub fn add_replica( &mut self, id: ReplicaId, config: ReplicaConfig, epoch: Option<u64>, ) -> Result<(), ReplicaExists>
Add a new instance replica, by ID.
Sourcepub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing>
pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing>
Remove an existing instance replica, by ID.
Sourcefn rehydrate_replica(&mut self, id: ReplicaId)
fn rehydrate_replica(&mut self, id: ReplicaId)
Sourcefn rehydrate_failed_replicas(&mut self)
fn rehydrate_failed_replicas(&mut self)
Rehydrate any failed replicas of this instance.
Sourcepub fn create_dataflow(
&mut self,
dataflow: DataflowDescription<Plan<T>, (), T>,
import_read_holds: Vec<ReadHold<T>>,
subscribe_target_replica: Option<ReplicaId>,
shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>,
) -> Result<(), DataflowCreationError>
pub fn create_dataflow( &mut self, dataflow: DataflowDescription<Plan<T>, (), T>, import_read_holds: Vec<ReadHold<T>>, subscribe_target_replica: Option<ReplicaId>, shared_collection_state: BTreeMap<GlobalId, SharedCollectionState<T>>, ) -> Result<(), DataflowCreationError>
Creates the described dataflow and initializes state for its output.
This method expects a DataflowDescription with an as_of frontier specified, as well as
for each imported collection a read hold in import_read_holds at at least the as_of.
If a subscribe_target_replica is given, any subscribes exported by the dataflow are
configured to target that replica, i.e., only subscribe responses sent by that replica are
considered.
Sourcefn maybe_schedule_collection(&mut self, id: GlobalId)
fn maybe_schedule_collection(&mut self, id: GlobalId)
Schedule the identified collection if all its inputs are available.
§Panics
Panics if the identified collection does not exist.
Sourcefn schedule_collections(&mut self)
fn schedule_collections(&mut self)
Schedule any unscheduled collections that are ready.
Sourcepub fn drop_collections(
&mut self,
ids: Vec<GlobalId>,
) -> Result<(), CollectionMissing>
pub fn drop_collections( &mut self, ids: Vec<GlobalId>, ) -> Result<(), CollectionMissing>
Drops the read capability for the given collections and allows their resources to be reclaimed.
Sourcepub fn peek(
&mut self,
peek_target: PeekTarget,
literal_constraints: Option<Vec<Row>>,
uuid: Uuid,
timestamp: T,
result_desc: RelationDesc,
finishing: RowSetFinishing,
map_filter_project: SafeMfpPlan,
read_hold: ReadHold<T>,
target_replica: Option<ReplicaId>,
peek_response_tx: Sender<PeekResponse>,
) -> Result<(), PeekError>
pub fn peek( &mut self, peek_target: PeekTarget, literal_constraints: Option<Vec<Row>>, uuid: Uuid, timestamp: T, result_desc: RelationDesc, finishing: RowSetFinishing, map_filter_project: SafeMfpPlan, read_hold: ReadHold<T>, target_replica: Option<ReplicaId>, peek_response_tx: Sender<PeekResponse>, ) -> Result<(), PeekError>
Initiate a peek request for the contents of id at timestamp.
Sourcepub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse)
pub fn cancel_peek(&mut self, uuid: Uuid, reason: PeekResponse)
Cancels an existing peek request.
Sourcepub fn set_read_policy(
&mut self,
policies: Vec<(GlobalId, ReadPolicy<T>)>,
) -> Result<(), ReadPolicyError>
pub fn set_read_policy( &mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>, ) -> Result<(), ReadPolicyError>
Assigns a read policy to specific identifiers.
The policies are assigned in the order presented, and repeated identifiers should conclude with the last policy. Changing a policy will immediately downgrade the read capability if appropriate, but it will not “recover” the read capability if the prior capability is already ahead of it.
Identifiers not present in policies retain their existing read policies.
It is an error to attempt to set a read policy for a collection that is not readable in the context of compute. At this time, only indexes are readable compute collections.
Sourcefn maybe_update_global_write_frontier(
&mut self,
id: GlobalId,
new_frontier: Antichain<T>,
)
fn maybe_update_global_write_frontier( &mut self, id: GlobalId, new_frontier: Antichain<T>, )
Advance the global write frontier of the given collection.
Frontier regressions are gracefully ignored.
§Panics
Panics if the identified collection does not exist.
Sourcefn apply_read_hold_change(&mut self, id: GlobalId, update: ChangeBatch<T>)
fn apply_read_hold_change(&mut self, id: GlobalId, update: ChangeBatch<T>)
Apply a collection read hold change.
Sourcefn finish_peek(&mut self, uuid: Uuid, response: PeekResponse)
fn finish_peek(&mut self, uuid: Uuid, response: PeekResponse)
Fulfills a registered peek and cleans up associated state.
As part of this we:
- Send a
PeekResponsethrough the peek’s response channel. - Emit a
CancelPeekcommand to instruct replicas to stop spending resources on this peek, and to allow theComputeCommandHistoryto reduce away the correspondingPeekcommand. - Remove the read hold for this peek, unblocking compaction that might have waited on it.
Sourcefn handle_response(
&mut self,
(replica_id, epoch, response): (ReplicaId, u64, ComputeResponse<T>),
)
fn handle_response( &mut self, (replica_id, epoch, response): (ReplicaId, u64, ComputeResponse<T>), )
Handles a response from a replica. Replica IDs are re-used across replica restarts, so we use the replica epoch to drop stale responses.
Sourcefn handle_frontiers_response(
&mut self,
id: GlobalId,
frontiers: FrontiersResponse<T>,
replica_id: ReplicaId,
)
fn handle_frontiers_response( &mut self, id: GlobalId, frontiers: FrontiersResponse<T>, replica_id: ReplicaId, )
Handle new frontiers, returning any compute response that needs to be sent to the client.
fn handle_peek_response( &mut self, uuid: Uuid, response: PeekResponse, otel_ctx: OpenTelemetryContext, replica_id: ReplicaId, )
fn handle_copy_to_response( &mut self, sink_id: GlobalId, response: CopyToResponse, replica_id: ReplicaId, )
fn handle_subscribe_response( &mut self, subscribe_id: GlobalId, response: SubscribeResponse<T>, replica_id: ReplicaId, )
fn handle_status_response( &self, response: StatusResponse, _replica_id: ReplicaId, )
Sourcefn dependency_write_frontiers<'b>(
&'b self,
collection: &'b CollectionState<T>,
) -> impl Iterator<Item = Antichain<T>> + 'b
fn dependency_write_frontiers<'b>( &'b self, collection: &'b CollectionState<T>, ) -> impl Iterator<Item = Antichain<T>> + 'b
Return the write frontiers of the dependencies of the given collection.
Sourcefn transitive_storage_dependency_write_frontiers<'b>(
&'b self,
collection: &'b CollectionState<T>,
) -> impl Iterator<Item = Antichain<T>> + 'b
fn transitive_storage_dependency_write_frontiers<'b>( &'b self, collection: &'b CollectionState<T>, ) -> impl Iterator<Item = Antichain<T>> + 'b
Return the write frontiers of transitive storage dependencies of the given collection.
Sourcefn downgrade_warmup_capabilities(&mut self)
fn downgrade_warmup_capabilities(&mut self)
Downgrade the warmup capabilities of collections as much as possible.
The only requirement we have for a collection’s warmup capability is that it is for a time
that is available in all of the collection’s inputs. For each input the latest time that is
the case for is write_frontier - 1. So the farthest we can downgrade a collection’s
warmup capability is the minimum of write_frontier - 1 of all its inputs.
This method expects to be periodically called as part of instance maintenance work. We would like to instead update the warmup capabilities synchronously in response to frontier updates of dependency collections, but that is not generally possible because we don’t learn about frontier updates of storage collections synchronously. We could do synchronous updates for compute dependencies, but we refrain from doing for simplicity.
Sourcefn forward_implied_capabilities(&mut self)
fn forward_implied_capabilities(&mut self)
Forward the implied capabilities of collections, if possible.
The implied capability of a collection controls (a) which times are still readable (for indexes) and (b) with which as-of the collection gets installed on a new replica. We are usually not allowed to advance an implied capability beyond the frontier that follows from the collection’s read policy applied to its write frontier:
- For sink collections, some external consumer might rely on seeing all distinct times in the input reflected in the output. If we’d forward the implied capability of a sink, we’d risk skipping times in the output across replica restarts.
- For index collections, we might make the index unreadable by advancing its read frontier beyond its write frontier.
There is one case where forwarding an implied capability is fine though: an index installed on a cluster that has no replicas. Such indexes are not readable anyway until a new replica is added, so advancing its read frontier can’t make it unreadable. We can thus advance the implied capability as long as we make sure that when a new replica is added, the expected relationship between write frontier, read policy, and implied capability can be restored immediately (modulo computation time).
Forwarding implied capabilities is not necessary for the correct functioning of the controller but an optimization that is beneficial in two ways:
- It relaxes read holds on inputs to forwarded collections, allowing their compaction.
- It reduces the amount of historical detail new replicas need to process when computing forwarded collections, as forwarding the implied capability also forwards the corresponding dataflow as-of.
Sourcepub fn maintain(&mut self)
pub fn maintain(&mut self)
Process pending maintenance work.
This method is invoked periodically by the global controller. It is a good place to perform maintenance work that arises from various controller state changes and that cannot conveniently be handled synchronously with those state changes.
Auto Trait Implementations§
impl<T> Freeze for Instance<T>
impl<T> !RefUnwindSafe for Instance<T>
impl<T> Send for Instance<T>
impl<T> Sync for Instance<T>
impl<T> Unpin for Instance<T>where
T: Unpin,
impl<T> !UnwindSafe for Instance<T>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Downcast for T
impl<T> Downcast for T
Source§impl<T> FmtForward for T
impl<T> FmtForward for T
Source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self to use its Binary implementation when Debug-formatted.Source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self to use its Display implementation when
Debug-formatted.Source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self to use its LowerExp implementation when
Debug-formatted.Source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self to use its LowerHex implementation when
Debug-formatted.Source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self to use its Octal implementation when Debug-formatted.Source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self to use its Pointer implementation when
Debug-formatted.Source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self to use its UpperExp implementation when
Debug-formatted.Source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self to use its UpperHex implementation when
Debug-formatted.Source§impl<T> FutureExt for T
impl<T> FutureExt for T
Source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
Source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T in a tonic::RequestSource§impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
Source§impl<T> Paint for Twhere
T: ?Sized,
impl<T> Paint for Twhere
T: ?Sized,
Source§fn fg(&self, value: Color) -> Painted<&T>
fn fg(&self, value: Color) -> Painted<&T>
Returns a styled value derived from self with the foreground set to
value.
This method should be used rarely. Instead, prefer to use color-specific
builder methods like red() and
green(), which have the same functionality but are
pithier.
§Example
Set foreground color to white using fg():
use yansi::{Paint, Color};
painted.fg(Color::White);Set foreground color to white using white().
use yansi::Paint;
painted.white();Source§fn bright_black(&self) -> Painted<&T>
fn bright_black(&self) -> Painted<&T>
Source§fn bright_red(&self) -> Painted<&T>
fn bright_red(&self) -> Painted<&T>
Source§fn bright_green(&self) -> Painted<&T>
fn bright_green(&self) -> Painted<&T>
Source§fn bright_yellow(&self) -> Painted<&T>
fn bright_yellow(&self) -> Painted<&T>
Source§fn bright_blue(&self) -> Painted<&T>
fn bright_blue(&self) -> Painted<&T>
Source§fn bright_magenta(&self) -> Painted<&T>
fn bright_magenta(&self) -> Painted<&T>
Source§fn bright_cyan(&self) -> Painted<&T>
fn bright_cyan(&self) -> Painted<&T>
Source§fn bright_white(&self) -> Painted<&T>
fn bright_white(&self) -> Painted<&T>
Source§fn bg(&self, value: Color) -> Painted<&T>
fn bg(&self, value: Color) -> Painted<&T>
Returns a styled value derived from self with the background set to
value.
This method should be used rarely. Instead, prefer to use color-specific
builder methods like on_red() and
on_green(), which have the same functionality but
are pithier.
§Example
Set background color to red using fg():
use yansi::{Paint, Color};
painted.bg(Color::Red);Set background color to red using on_red().
use yansi::Paint;
painted.on_red();Source§fn on_primary(&self) -> Painted<&T>
fn on_primary(&self) -> Painted<&T>
Source§fn on_magenta(&self) -> Painted<&T>
fn on_magenta(&self) -> Painted<&T>
Source§fn on_bright_black(&self) -> Painted<&T>
fn on_bright_black(&self) -> Painted<&T>
Source§fn on_bright_red(&self) -> Painted<&T>
fn on_bright_red(&self) -> Painted<&T>
Source§fn on_bright_green(&self) -> Painted<&T>
fn on_bright_green(&self) -> Painted<&T>
Source§fn on_bright_yellow(&self) -> Painted<&T>
fn on_bright_yellow(&self) -> Painted<&T>
Source§fn on_bright_blue(&self) -> Painted<&T>
fn on_bright_blue(&self) -> Painted<&T>
Source§fn on_bright_magenta(&self) -> Painted<&T>
fn on_bright_magenta(&self) -> Painted<&T>
Source§fn on_bright_cyan(&self) -> Painted<&T>
fn on_bright_cyan(&self) -> Painted<&T>
Source§fn on_bright_white(&self) -> Painted<&T>
fn on_bright_white(&self) -> Painted<&T>
Source§fn attr(&self, value: Attribute) -> Painted<&T>
fn attr(&self, value: Attribute) -> Painted<&T>
Enables the styling Attribute value.
This method should be used rarely. Instead, prefer to use
attribute-specific builder methods like bold() and
underline(), which have the same functionality
but are pithier.
§Example
Make text bold using attr():
use yansi::{Paint, Attribute};
painted.attr(Attribute::Bold);Make text bold using using bold().
use yansi::Paint;
painted.bold();Source§fn rapid_blink(&self) -> Painted<&T>
fn rapid_blink(&self) -> Painted<&T>
Source§fn quirk(&self, value: Quirk) -> Painted<&T>
fn quirk(&self, value: Quirk) -> Painted<&T>
Enables the yansi Quirk value.
This method should be used rarely. Instead, prefer to use quirk-specific
builder methods like mask() and
wrap(), which have the same functionality but are
pithier.
§Example
Enable wrapping using .quirk():
use yansi::{Paint, Quirk};
painted.quirk(Quirk::Wrap);Enable wrapping using wrap().
use yansi::Paint;
painted.wrap();Source§fn clear(&self) -> Painted<&T>
👎Deprecated since 1.0.1: renamed to resetting() due to conflicts with Vec::clear().
The clear() method will be removed in a future release.
fn clear(&self) -> Painted<&T>
resetting() due to conflicts with Vec::clear().
The clear() method will be removed in a future release.Source§fn whenever(&self, value: Condition) -> Painted<&T>
fn whenever(&self, value: Condition) -> Painted<&T>
Conditionally enable styling based on whether the Condition value
applies. Replaces any previous condition.
See the crate level docs for more details.
§Example
Enable styling painted only when both stdout and stderr are TTYs:
use yansi::{Paint, Condition};
painted.red().on_yellow().whenever(Condition::STDOUTERR_ARE_TTY);Source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
Source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
Source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read moreSource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read moreSource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
Source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
Source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self, then passes self.as_ref() into the pipe function.Source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self, then passes self.as_mut() into the pipe
function.Source§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self, then passes self.deref() into the pipe function.Source§impl<T> Pointable for T
impl<T> Pointable for T
Source§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
Source§impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
Source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto.Source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto.Source§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
Source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign, for types that do not implement AddAssign.Source§impl<T> ServiceExt for T
impl<T> ServiceExt for T
Source§fn map_response_body<F>(self, f: F) -> MapResponseBody<Self, F>where
Self: Sized,
fn map_response_body<F>(self, f: F) -> MapResponseBody<Self, F>where
Self: Sized,
Source§fn decompression(self) -> Decompression<Self>where
Self: Sized,
fn decompression(self) -> Decompression<Self>where
Self: Sized,
Source§fn trace_for_http(self) -> Trace<Self, SharedClassifier<ServerErrorsAsFailures>>where
Self: Sized,
fn trace_for_http(self) -> Trace<Self, SharedClassifier<ServerErrorsAsFailures>>where
Self: Sized,
Source§fn trace_for_grpc(self) -> Trace<Self, SharedClassifier<GrpcErrorsAsFailures>>where
Self: Sized,
fn trace_for_grpc(self) -> Trace<Self, SharedClassifier<GrpcErrorsAsFailures>>where
Self: Sized,
Source§fn follow_redirects(self) -> FollowRedirect<Self>where
Self: Sized,
fn follow_redirects(self) -> FollowRedirect<Self>where
Self: Sized,
Source§impl<T> Tap for T
impl<T> Tap for T
Source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B> of a value. Read moreSource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B> of a value. Read moreSource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R> view of a value. Read moreSource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R> view of a value. Read moreSource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target of a value. Read moreSource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target of a value. Read moreSource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap() only in debug builds, and is erased in release builds.Source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut() only in debug builds, and is erased in release
builds.Source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref() only in debug builds, and is erased in release
builds.Source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut() only in debug builds, and is erased in release
builds.Source§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref() only in debug builds, and is erased in release
builds.