Struct mz_compute_client::controller::instance::Instance
pub(super) struct Instance<T> {
build_info: &'static BuildInfo,
initialized: bool,
replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
collections: BTreeMap<GlobalId, CollectionState<T>>,
log_sources: BTreeMap<LogVariant, GlobalId>,
peeks: BTreeMap<Uuid, PendingPeek<T>>,
subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>,
copy_tos: BTreeSet<GlobalId>,
history: ComputeCommandHistory<UIntGauge, T>,
response_tx: Sender<ComputeControllerResponse<T>>,
introspection_tx: Sender<(IntrospectionType, Vec<(Row, Diff)>)>,
envd_epoch: NonZeroI64,
replica_epochs: BTreeMap<ReplicaId, u64>,
metrics: InstanceMetrics,
enable_aggressive_readhold_downgrades: bool,
}
The state we keep for a compute instance.
Fields
build_info: &'static BuildInfo
Build info for spawning replicas.
initialized: bool
Whether instance initialization has been completed.
replicas: BTreeMap<ReplicaId, ReplicaState<T>>
The replicas of this compute instance.
collections: BTreeMap<GlobalId, CollectionState<T>>
Currently installed compute collections.
New entries are added for all collections exported from dataflows created through `ActiveInstance::create_dataflow`.
Entries are removed by `Instance::cleanup_collections`. See that method’s documentation about the conditions for removing collection state.
log_sources: BTreeMap<LogVariant, GlobalId>
IDs of log sources maintained by this compute instance.
peeks: BTreeMap<Uuid, PendingPeek<T>>
Currently outstanding peeks.
New entries are added for all peeks initiated through `ActiveInstance::peek`.
The entry for a peek is only removed once all replicas have responded to the peek. This is currently required to ensure all replicas have stopped reading from the peeked collection’s inputs before we allow them to compact. #16641 tracks changing this so we only have to wait for the first peek response.
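The removal rule described above can be illustrated with a toy model. This is a minimal sketch under stated assumptions, not the controller's implementation: `PeekId`, the `unfinished` set, and `record_response` are hypothetical names, and plain integers stand in for `Uuid` and `ReplicaId`.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Hypothetical stand-ins for `ReplicaId` and the peek's `Uuid`.
type ReplicaId = u64;
type PeekId = u32;

/// Toy pending-peek entry: the replicas that have not responded yet.
#[derive(Debug)]
struct PendingPeek {
    unfinished: BTreeSet<ReplicaId>,
}

#[derive(Debug, Default)]
struct Peeks {
    pending: BTreeMap<PeekId, PendingPeek>,
}

impl Peeks {
    /// Register a peek that every listed replica is expected to answer.
    fn add(&mut self, id: PeekId, replicas: impl IntoIterator<Item = ReplicaId>) {
        let unfinished = replicas.into_iter().collect();
        self.pending.insert(id, PendingPeek { unfinished });
    }

    /// Record a response from `replica`. Returns `true` if this was the
    /// last outstanding response and the entry was removed.
    fn record_response(&mut self, id: PeekId, replica: ReplicaId) -> bool {
        let Some(peek) = self.pending.get_mut(&id) else {
            return false; // unknown peek: nothing to do
        };
        peek.unfinished.remove(&replica);
        if peek.unfinished.is_empty() {
            self.pending.remove(&id);
            true
        } else {
            false
        }
    }
}
```

In this model, the entry survives the first response and disappears only after the last replica has answered, which mirrors the "all replicas have responded" condition.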
subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>
Currently in-progress subscribes.
New entries are added for all subscribes exported from dataflows created through `ActiveInstance::create_dataflow`.
The entry for a subscribe is removed once at least one replica has reported the subscribe to have advanced to the empty frontier or to have been dropped, implying that no further updates will be emitted for this subscribe.
Note that subscribes are tracked both in `collections` and `subscribes`. `collections` keeps track of the subscribe’s upper and since frontiers and ensures appropriate read holds on the subscribe’s input. `subscribes` is only used to track which updates have been emitted, to decide if new ones should be emitted or suppressed.
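One way to picture the "emit or suppress" decision is a per-subscribe frontier that records how far updates have already been emitted. The sketch below is an assumption-laden illustration: the real `ActiveSubscribe<T>` is not shown on this page, so the single `u64` frontier, the `emitted_upper` field, and `filter_new_updates` are hypothetical.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for `GlobalId`.
type GlobalId = u64;

/// Toy subscribe state: updates at times below `emitted_upper`
/// have already been emitted.
#[derive(Debug, Default)]
struct ActiveSubscribe {
    emitted_upper: u64,
}

/// Given a batch of `(time, value)` updates reported by some replica as
/// complete up to `upper`, return only the updates that have not been
/// emitted yet and advance the tracked frontier.
fn filter_new_updates(
    subscribes: &mut BTreeMap<GlobalId, ActiveSubscribe>,
    id: GlobalId,
    upper: u64,
    updates: Vec<(u64, &'static str)>,
) -> Vec<(u64, &'static str)> {
    let Some(sub) = subscribes.get_mut(&id) else {
        return Vec::new(); // unknown or finished subscribe: suppress
    };
    if upper <= sub.emitted_upper {
        return Vec::new(); // another replica already covered this range
    }
    let lower = sub.emitted_upper;
    sub.emitted_upper = upper;
    updates
        .into_iter()
        .filter(|(time, _)| *time >= lower && *time < upper)
        .collect()
}
```

With two replicas reporting overlapping batches, only the portion beyond the tracked frontier is passed on, so clients would see each update once.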
copy_tos: BTreeSet<GlobalId>
Tracks all in-progress COPY TOs.
New entries are added for all S3 oneshot sinks (corresponding to a COPY TO) exported from dataflows created through `ActiveInstance::create_dataflow`.
The entry for a COPY TO is removed once at least one replica has finished or the exporting collection is dropped.
history: ComputeCommandHistory<UIntGauge, T>
The command history, used when introducing new replicas or restarting existing replicas.
response_tx: Sender<ComputeControllerResponse<T>>
Sender for responses to be delivered.
introspection_tx: Sender<(IntrospectionType, Vec<(Row, Diff)>)>
Sender for introspection updates to be recorded.
envd_epoch: NonZeroI64
A number that increases with each restart of `environmentd`.
replica_epochs: BTreeMap<ReplicaId, u64>
Numbers that increase with each restart of a replica.
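As a rough illustration of a per-replica restart counter, the following sketch bumps an entry in a map shaped like `replica_epochs`. The helper name `next_replica_epoch` and the choice to start at 1 are assumptions made for this example only.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for `ReplicaId`.
type ReplicaId = u64;

/// Bump and return the epoch for `id`.
/// A replica seen for the first time gets epoch 1 (an assumption of this sketch).
fn next_replica_epoch(epochs: &mut BTreeMap<ReplicaId, u64>, id: ReplicaId) -> u64 {
    let epoch = epochs.entry(id).or_default();
    *epoch += 1;
    *epoch
}
```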
metrics: InstanceMetrics
The registry the controller uses to report metrics.
enable_aggressive_readhold_downgrades: bool
Whether to aggressively downgrade read holds for sink dataflows.
This flag exists to derisk the rollout of the aggressive downgrading approach. TODO(teskje): Remove this after a couple weeks.
Implementations
impl<T: Timestamp> Instance<T>
pub fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, CollectionMissing>
Acquire a handle to the collection state associated with `id`.
fn collection_mut(&mut self, id: GlobalId) -> Result<&mut CollectionState<T>, CollectionMissing>
Acquire a mutable handle to the collection state associated with `id`.
pub fn expect_collection(&self, id: GlobalId) -> &CollectionState<T>
Acquire a handle to the collection state associated with `id`.
Panics
Panics if the identified collection does not exist.
fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T>
Acquire a mutable handle to the collection state associated with `id`.
Panics
Panics if the identified collection does not exist.
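The fallible and panicking accessors above follow a common pairing. The sketch below shows that pattern on a simplified, non-generic stand-in; the struct contents and the shape of `CollectionMissing` are assumptions, and only the method names are taken from this page.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for `GlobalId`.
type GlobalId = u64;

/// Simplified collection state; the real type has more fields.
#[derive(Debug, PartialEq)]
struct CollectionState {
    write_only: bool,
}

/// Error carrying the ID that was looked up (assumed shape).
#[derive(Debug, PartialEq)]
struct CollectionMissing(GlobalId);

struct Instance {
    collections: BTreeMap<GlobalId, CollectionState>,
}

impl Instance {
    /// Fallible lookup: callers decide how to handle a missing collection.
    fn collection(&self, id: GlobalId) -> Result<&CollectionState, CollectionMissing> {
        self.collections.get(&id).ok_or(CollectionMissing(id))
    }

    /// Panicking lookup: for call sites where absence is a bug.
    fn expect_collection(&self, id: GlobalId) -> &CollectionState {
        self.collection(id).expect("collection must exist")
    }
}
```

Building the panicking variant on top of the fallible one keeps the lookup logic in a single place.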
pub fn collections_iter(&self) -> impl Iterator<Item = (&GlobalId, &CollectionState<T>)>
fn add_collection(&mut self, id: GlobalId, as_of: Antichain<T>, storage_dependencies: Vec<GlobalId>, compute_dependencies: Vec<GlobalId>, write_only: bool)
Add a collection to the instance state.
Panics
Panics if a collection with the same ID exists already.
fn remove_collection(&mut self, id: GlobalId)
fn add_replica_state(&mut self, id: ReplicaId, client: ReplicaClient<T>, config: ReplicaConfig)
fn remove_replica_state(&mut self, id: ReplicaId) -> Option<ReplicaState<T>>
fn deliver_response(&mut self, response: ComputeControllerResponse<T>)
Enqueue the given response for delivery to the controller clients.
fn deliver_introspection_updates(&mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)>)
Enqueue the given introspection updates for recording.
pub fn activate<'a>(&'a mut self, storage_controller: &'a mut dyn StorageController<Timestamp = T>) -> ActiveInstance<'a, T>
Acquire an `ActiveInstance` by providing a storage controller.
pub fn replica_exists(&self, id: ReplicaId) -> bool
Returns whether the identified replica exists.
pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_
Returns the IDs of all replicas of this instance.
fn peeks_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)>
Return the pending peeks targeting the specified replica, as pairs of peek ID and peek state.
fn subscribes_targeting(&self, replica_id: ReplicaId) -> impl Iterator<Item = GlobalId> + '_
Return the IDs of in-progress subscribes targeting the specified replica.
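Both `peeks_targeting` and `subscribes_targeting` amount to filtering a map by target replica. A minimal sketch of that shape follows, assuming each entry stores an optional `target_replica` field (hypothetical here) and using integers in place of `Uuid` and `ReplicaId`.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for `ReplicaId` and the peek's `Uuid`.
type ReplicaId = u64;
type PeekId = u32;

/// Toy pending peek; `target_replica` is an assumed field.
struct PendingPeek {
    target_replica: Option<ReplicaId>,
}

/// Lazily yield the peeks whose target is exactly `replica_id`.
/// Untargeted peeks (`None`) are not included.
fn peeks_targeting(
    peeks: &BTreeMap<PeekId, PendingPeek>,
    replica_id: ReplicaId,
) -> impl Iterator<Item = (PeekId, &PendingPeek)> {
    peeks.iter().filter_map(move |(id, peek)| {
        (peek.target_replica == Some(replica_id)).then_some((*id, peek))
    })
}
```

Returning an iterator rather than a collected `Vec` leaves the allocation decision to the caller, which matches the `impl Iterator` return types in the signatures above.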
fn refresh_state_metrics(&self)
Refresh the controller state metrics for this instance.
We could also do state metric updates directly in response to state changes, but that would mean littering the code with metric update calls. Encapsulating state metric maintenance in a single method is less noisy.
This method is invoked by `ActiveComputeController::process`, which we expect to be periodically called during normal operation.
fn report_dependency_updates(&mut self, id: GlobalId, diff: i64)
Report updates (inserts or retractions) to the identified collection’s dependencies.
Panics
Panics if the identified collection does not exist.
fn update_hydration_status(&mut self, id: GlobalId, replica_id: ReplicaId, frontier: &Antichain<T>)
Update the tracked hydration status for the given collection and replica according to an observed frontier update.
fn update_operator_hydration_status(&mut self, replica_id: ReplicaId, status: OperatorHydrationStatus)
Update the tracked hydration status for an operator according to a received status update.
fn cleanup_collections(&mut self)
Clean up collection state that is not needed anymore.
Three conditions need to be true before we can remove state for a collection:
- A client must have explicitly dropped the collection. If that is not the case, clients can still reasonably assume that the controller knows about the collection and can answer queries about it.
- There must be no outstanding read capabilities on the collection. As long as someone still holds read capabilities on a collection, we need to keep it around to be able to properly handle downgrading of said capabilities.
- All replica write frontiers for the collection must have advanced to the empty frontier. Advancement to the empty frontier signals that replicas are done computing the collection and that they won’t send more `ComputeResponse`s for it. As long as we might receive responses for a collection we want to keep it around to be able to validate and handle these responses.
pub fn collection_reverse_dependencies(&self, id: GlobalId) -> impl Iterator<Item = &GlobalId>
List compute collections that depend on the given collection.
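A reverse-dependency lookup of this kind can be sketched as a scan over all collections, keeping those whose dependency list contains the given ID. The `compute_dependencies` field on the toy state below is an assumption borrowed from the `add_collection` parameter name; the actual storage layout is not shown on this page.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for `GlobalId`.
type GlobalId = u64;

/// Toy collection state holding only its compute dependencies.
struct CollectionState {
    compute_dependencies: Vec<GlobalId>,
}

/// Yield the IDs of all collections that list `id` as a dependency.
fn collection_reverse_dependencies(
    collections: &BTreeMap<GlobalId, CollectionState>,
    id: GlobalId,
) -> impl Iterator<Item = &GlobalId> {
    collections.iter().filter_map(move |(other, state)| {
        state.compute_dependencies.contains(&id).then_some(other)
    })
}
```

This scan is linear in the number of collections; a maintained reverse index would be an alternative if lookups were frequent.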
impl<T> Instance<T> where T: Timestamp + Lattice, ComputeGrpcClient: ComputeClient<T>,
pub fn new(build_info: &'static BuildInfo, arranged_logs: BTreeMap<LogVariant, GlobalId>, envd_epoch: NonZeroI64, metrics: InstanceMetrics, response_tx: Sender<ComputeControllerResponse<T>>, introspection_tx: Sender<(IntrospectionType, Vec<(Row, Diff)>)>, enable_aggressive_readhold_downgrades: bool) -> Self
pub fn update_configuration(&mut self, config_params: ComputeParameters)
Update instance configuration.
pub fn initialization_complete(&mut self)
Marks the end of any initialization commands.
Intended to be called by `Controller`, rather than by other code.
Calling this method repeatedly has no effect.
pub fn drop(self)
Drop this compute instance.
Panics
Panics if the compute instance still has active replicas.
Panics if the compute instance still has collections installed.
pub fn send(&mut self, cmd: ComputeCommand<T>)
Sends a command to all replicas of this instance.
pub async fn recv(&mut self) -> Result<(ReplicaId, ComputeResponse<T>), ReplicaId>
Receives the next response from any replica of this instance.
Returns `Err` if receiving from a replica has failed, to signal that it is in need of rehydration.
This method is cancellation safe.
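A caller of `recv` has to distinguish a delivered response from a failed replica. The synchronous sketch below shows one way to branch on that `Result` shape; the `Action` enum, the `on_recv` helper, and the single `FrontierUpper` variant are hypothetical and exist only for this illustration.

```rust
// Hypothetical stand-in for `ReplicaId`.
type ReplicaId = u64;

/// Toy response type with a single assumed variant.
#[derive(Debug, PartialEq)]
enum ComputeResponse {
    FrontierUpper(u64),
}

/// What the caller decides to do next (hypothetical).
#[derive(Debug, PartialEq)]
enum Action {
    /// Process a response from a healthy replica.
    Handle(ReplicaId, ComputeResponse),
    /// Receiving failed; the replica needs rehydration.
    Rehydrate(ReplicaId),
}

/// Map the result of a `recv`-style call to the next action.
fn on_recv(result: Result<(ReplicaId, ComputeResponse), ReplicaId>) -> Action {
    match result {
        Ok((replica_id, response)) => Action::Handle(replica_id, response),
        Err(failed_replica) => Action::Rehydrate(failed_replica),
    }
}
```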
fn register_replica_heartbeat(&mut self, replica_id: ReplicaId)
pub fn set_subscribe_target_replica(&mut self, id: GlobalId, target_replica: ReplicaId) -> Result<(), SubscribeTargetError>
Assign a target replica to the identified subscribe.
If a subscribe has a target replica assigned, only subscribe responses sent by that replica are considered.
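The effect of a target replica can be sketched as a filter on incoming responses. In the toy model below, the `target_replica` field, the `accepts_response` helper, and the `SubscribeMissing` variant are assumptions; the real `SubscribeTargetError` variants are not listed on this page.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for `GlobalId` and `ReplicaId`.
type GlobalId = u64;
type ReplicaId = u64;

/// Toy subscribe state; `None` means responses from any replica count.
#[derive(Debug, Default)]
struct ActiveSubscribe {
    target_replica: Option<ReplicaId>,
}

/// Assumed error shape for this sketch.
#[derive(Debug, PartialEq)]
enum SubscribeTargetError {
    SubscribeMissing(GlobalId),
}

/// Assign a target replica to the subscribe identified by `id`.
fn set_subscribe_target_replica(
    subscribes: &mut BTreeMap<GlobalId, ActiveSubscribe>,
    id: GlobalId,
    target_replica: ReplicaId,
) -> Result<(), SubscribeTargetError> {
    let sub = subscribes
        .get_mut(&id)
        .ok_or(SubscribeTargetError::SubscribeMissing(id))?;
    sub.target_replica = Some(target_replica);
    Ok(())
}

/// Decide whether a response from replica `from` should be considered.
fn accepts_response(
    subscribes: &BTreeMap<GlobalId, ActiveSubscribe>,
    id: GlobalId,
    from: ReplicaId,
) -> bool {
    match subscribes.get(&id) {
        Some(sub) => sub.target_replica.map_or(true, |target| target == from),
        None => false,
    }
}
```

Before a target is set, responses from every replica are accepted in this model; afterwards only the target's responses pass the filter.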
Trait Implementations
Auto Trait Implementations
impl<T> !RefUnwindSafe for Instance<T>
impl<T> Send for Instance<T> where T: Send,
impl<T> Sync for Instance<T> where T: Send + Sync,
impl<T> Unpin for Instance<T> where T: Unpin,
impl<T> !UnwindSafe for Instance<T>
Blanket Implementations
impl<T> BorrowMut<T> for T where T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self> where Self: Binary,
Causes `self` to use its `Binary` implementation when `Debug`-formatted.
fn fmt_display(self) -> FmtDisplay<Self> where Self: Display,
Causes `self` to use its `Display` implementation when `Debug`-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self> where Self: LowerExp,
Causes `self` to use its `LowerExp` implementation when `Debug`-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self> where Self: LowerHex,
Causes `self` to use its `LowerHex` implementation when `Debug`-formatted.
fn fmt_octal(self) -> FmtOctal<Self> where Self: Octal,
Causes `self` to use its `Octal` implementation when `Debug`-formatted.
fn fmt_pointer(self) -> FmtPointer<Self> where Self: Pointer,
Causes `self` to use its `Pointer` implementation when `Debug`-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self> where Self: UpperExp,
Causes `self` to use its `UpperExp` implementation when `Debug`-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self> where Self: UpperHex,
Causes `self` to use its `UpperHex` implementation when `Debug`-formatted.
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message `T` in a `tonic::Request`.
impl<T> Pipe for T where T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R where Self: Sized,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R where R: 'a,
Borrows `self` and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R where R: 'a,
Mutably borrows `self` and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R where Self: Borrow<B>, B: 'a + ?Sized, R: 'a,
fn pipe_borrow_mut<'a, B, R>(&'a mut self, func: impl FnOnce(&'a mut B) -> R) -> R where Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R where Self: AsRef<U>, U: 'a + ?Sized, R: 'a,
Borrows `self`, then passes `self.as_ref()` into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R where Self: AsMut<U>, U: 'a + ?Sized, R: 'a,
Mutably borrows `self`, then passes `self.as_mut()` into the pipe function.
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P where R: RustType<P>,
fn into_rust(self) -> Result<R, TryFromProtoError>
See `RustType::from_proto`.
fn from_rust(rust: &R) -> P
See `RustType::into_proto`.
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self where Self: Borrow<B>, B: ?Sized,
Immutable access to the `Borrow<B>` of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self where Self: BorrowMut<B>, B: ?Sized,
Mutable access to the `BorrowMut<B>` of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self where Self: AsRef<R>, R: ?Sized,
Immutable access to the `AsRef<R>` view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self where Self: AsMut<R>, R: ?Sized,
Mutable access to the `AsMut<R>` view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self where Self: Deref<Target = T>, T: ?Sized,
Immutable access to the `Deref::Target` of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self where Self: DerefMut<Target = T> + Deref, T: ?Sized,
Mutable access to the `Deref::Target` of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls `.tap()` only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls `.tap_mut()` only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self where Self: Borrow<B>, B: ?Sized,
Calls `.tap_borrow()` only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self where Self: BorrowMut<B>, B: ?Sized,
Calls `.tap_borrow_mut()` only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self where Self: AsRef<R>, R: ?Sized,
Calls `.tap_ref()` only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self where Self: AsMut<R>, R: ?Sized,
Calls `.tap_ref_mut()` only in debug builds, and is erased in release builds.