pub(super) struct Instance<T> {
Show 15 fields build_info: &'static BuildInfo, initialized: bool, replicas: BTreeMap<ReplicaId, ReplicaState<T>>, collections: BTreeMap<GlobalId, CollectionState<T>>, log_sources: BTreeMap<LogVariant, GlobalId>, peeks: BTreeMap<Uuid, PendingPeek<T>>, subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>, copy_tos: BTreeSet<GlobalId>, history: ComputeCommandHistory<UIntGauge, T>, response_tx: Sender<ComputeControllerResponse<T>>, introspection_tx: Sender<(IntrospectionType, Vec<(Row, Diff)>)>, envd_epoch: NonZeroI64, replica_epochs: BTreeMap<ReplicaId, u64>, metrics: InstanceMetrics, enable_aggressive_readhold_downgrades: bool,
}
Expand description

The state we keep for a compute instance.

Fields§

§build_info: &'static BuildInfo

Build info for spawning replicas

§initialized: bool

Whether instance initialization has been completed.

§replicas: BTreeMap<ReplicaId, ReplicaState<T>>

The replicas of this compute instance.

§collections: BTreeMap<GlobalId, CollectionState<T>>

Currently installed compute collections.

New entries are added for all collections exported from dataflows created through ActiveInstance::create_dataflow.

Entries are removed by Instance::cleanup_collections. See that method’s documentation about the conditions for removing collection state.

§log_sources: BTreeMap<LogVariant, GlobalId>

IDs of log sources maintained by this compute instance.

§peeks: BTreeMap<Uuid, PendingPeek<T>>

Currently outstanding peeks.

New entries are added for all peeks initiated through ActiveInstance::peek.

The entry for a peek is only removed once all replicas have responded to the peek. This is currently required to ensure all replicas have stopped reading from the peeked collection’s inputs before we allow them to compact. #16641 tracks changing this so we only have to wait for the first peek response.

§subscribes: BTreeMap<GlobalId, ActiveSubscribe<T>>

Currently in-progress subscribes.

New entries are added for all subscribes exported from dataflows created through ActiveInstance::create_dataflow.

The entry for a subscribe is removed once at least one replica has reported the subscribe to have advanced to the empty frontier or to have been dropped, implying that no further updates will be emitted for this subscribe.

Note that subscribes are tracked both in collections and subscribes. collections keeps track of the subscribe’s upper and since frontiers and ensures appropriate read holds on the subscribe’s input. subscribes is only used to track which updates have been emitted, to decide if new ones should be emitted or suppressed.

§copy_tos: BTreeSet<GlobalId>

Tracks all in-progress COPY TOs.

New entries are added for all s3 oneshot sinks (corresponding to a COPY TO) exported from dataflows created through ActiveInstance::create_dataflow.

The entry for a copy to is removed once at least one replica has finished or the exporting collection is dropped.

§history: ComputeCommandHistory<UIntGauge, T>

The command history, used when introducing new replicas or restarting existing replicas.

§response_tx: Sender<ComputeControllerResponse<T>>

Sender for responses to be delivered.

§introspection_tx: Sender<(IntrospectionType, Vec<(Row, Diff)>)>

Sender for introspection updates to be recorded.

§envd_epoch: NonZeroI64

A number that increases with each restart of environmentd.

§replica_epochs: BTreeMap<ReplicaId, u64>

Numbers that increase with each restart of a replica.

§metrics: InstanceMetrics

The registry the controller uses to report metrics.

§enable_aggressive_readhold_downgrades: bool

Whether to aggressively downgrade read holds for sink dataflows.

This flag exists to derisk the rollout of the aggressive downgrading approach. TODO(teskje): Remove this after a couple weeks.

Implementations§

source§

impl<T: Timestamp> Instance<T>

source

pub fn collection( &self, id: GlobalId ) -> Result<&CollectionState<T>, CollectionMissing>

Acquire a handle to the collection state associated with id.

source

fn collection_mut( &mut self, id: GlobalId ) -> Result<&mut CollectionState<T>, CollectionMissing>

Acquire a mutable handle to the collection state associated with id.

source

pub fn expect_collection(&self, id: GlobalId) -> &CollectionState<T>

Acquire a handle to the collection state associated with id.

Panics

Panics if the identified collection does not exist.

source

fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState<T>

Acquire a mutable handle to the collection state associated with id.

Panics

Panics if the identified collection does not exist.

source

pub fn collections_iter( &self ) -> impl Iterator<Item = (&GlobalId, &CollectionState<T>)>

source

fn add_collection( &mut self, id: GlobalId, as_of: Antichain<T>, storage_dependencies: Vec<GlobalId>, compute_dependencies: Vec<GlobalId>, write_only: bool )

Add a collection to the instance state.

Panics

Panics if a collection with the same ID exists already.

source

fn remove_collection(&mut self, id: GlobalId)

source

fn add_replica_state( &mut self, id: ReplicaId, client: ReplicaClient<T>, config: ReplicaConfig )

source

fn remove_replica_state(&mut self, id: ReplicaId) -> Option<ReplicaState<T>>

source

fn deliver_response(&mut self, response: ComputeControllerResponse<T>)

Enqueue the given response for delivery to the controller clients.

source

fn deliver_introspection_updates( &mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)> )

Enqueue the given introspection updates for recording.

source

pub fn activate<'a>( &'a mut self, storage_controller: &'a mut dyn StorageController<Timestamp = T> ) -> ActiveInstance<'a, T>

Acquire an ActiveInstance by providing a storage controller.

source

pub fn replica_exists(&self, id: ReplicaId) -> bool

Returns whether the identified replica exists.

source

pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_

Returns the ids of all replicas of this instance.

source

fn peeks_targeting( &self, replica_id: ReplicaId ) -> impl Iterator<Item = (Uuid, &PendingPeek<T>)>

Return the IDs of pending peeks targeting the specified replica.

source

fn subscribes_targeting( &self, replica_id: ReplicaId ) -> impl Iterator<Item = GlobalId> + '_

Return the IDs of in-progress subscribes targeting the specified replica.

source

fn refresh_state_metrics(&self)

Refresh the controller state metrics for this instance.

We could also do state metric updates directly in response to state changes, but that would mean littering the code with metric update calls. Encapsulating state metric maintenance in a single method is less noisy.

This method is invoked by ActiveComputeController::process, which we expect to be periodically called during normal operation.

source

fn report_dependency_updates(&mut self, id: GlobalId, diff: i64)

Report updates (inserts or retractions) to the identified collection’s dependencies.

Panics

Panics if the identified collection does not exist.

source

fn update_hydration_status( &mut self, id: GlobalId, replica_id: ReplicaId, frontier: &Antichain<T> )

Update the tracked hydration status for the given collection and replica according to an observed frontier update.

source

fn update_operator_hydration_status( &mut self, replica_id: ReplicaId, status: OperatorHydrationStatus )

Update the tracked hydration status for an operator according to a received status update.

source

fn cleanup_collections(&mut self)

Clean up collection state that is not needed anymore.

Three conditions need to be true before we can remove state for a collection:

  1. A client must have explicitly dropped the collection. If that is not the case, clients can still reasonably assume that the controller knows about the collection and can answer queries about it.
  2. There must be no outstanding read capabilities on the collection. As long as someone still holds read capabilities on a collection, we need to keep it around to be able to properly handle downgrading of said capabilities.
  3. All replica write frontiers for the collection must have advanced to the empty frontier. Advancement to the empty frontier signals that replicas are done computing the collection and that they won’t send more ComputeResponses for it. As long as we might receive responses for a collection we want to keep it around to be able to validate and handle these responses.
source

pub fn collection_reverse_dependencies( &self, id: GlobalId ) -> impl Iterator<Item = &GlobalId>

List compute collections that depend on the given collection.

source§

impl<T> Instance<T>where T: Timestamp + Lattice, ComputeGrpcClient: ComputeClient<T>,

source

pub fn new( build_info: &'static BuildInfo, arranged_logs: BTreeMap<LogVariant, GlobalId>, envd_epoch: NonZeroI64, metrics: InstanceMetrics, response_tx: Sender<ComputeControllerResponse<T>>, introspection_tx: Sender<(IntrospectionType, Vec<(Row, Diff)>)>, enable_aggressive_readhold_downgrades: bool ) -> Self

source

pub fn update_configuration(&mut self, config_params: ComputeParameters)

Update instance configuration.

source

pub fn initialization_complete(&mut self)

Marks the end of any initialization commands.

Intended to be called by Controller, rather than by other code. Calling this method repeatedly has no effect.

source

pub fn drop(self)

Drop this compute instance.

Panics

Panics if the compute instance still has active replicas. Panics if the compute instance still has collections installed.

source

pub fn send(&mut self, cmd: ComputeCommand<T>)

Sends a command to all replicas of this instance.

source

pub async fn recv( &mut self ) -> Result<(ReplicaId, ComputeResponse<T>), ReplicaId>

Receives the next response from any replica of this instance.

Returns Err if receiving from a replica has failed, to signal that it is in need of rehydration.

This method is cancellation safe.

source

fn register_replica_heartbeat(&mut self, replica_id: ReplicaId)

Register a heartbeat from the given replica.

Panics

Panics if the specified replica does not exist.

source

pub fn set_subscribe_target_replica( &mut self, id: GlobalId, target_replica: ReplicaId ) -> Result<(), SubscribeTargetError>

Assign a target replica to the identified subscribe.

If a subscribe has a target replica assigned, only subscribe responses sent by that replica are considered.

Trait Implementations§

source§

impl<T: Debug> Debug for Instance<T>

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<T> !RefUnwindSafe for Instance<T>

§

impl<T> Send for Instance<T>where T: Send,

§

impl<T> Sync for Instance<T>where T: Send + Sync,

§

impl<T> Unpin for Instance<T>where T: Unpin,

§

impl<T> !UnwindSafe for Instance<T>

Blanket Implementations§

source§

impl<T> Any for Twhere T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for Twhere T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for Twhere T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T, U> CastInto<U> for Twhere U: CastFrom<T>,

source§

fn cast_into(self) -> U

Performs the cast.
source§

impl<T> Conv for T

source§

fn conv<T>(self) -> Twhere Self: Into<T>,

Converts self into T using Into<T>. Read more
source§

impl<T> FmtForward for T

source§

fn fmt_binary(self) -> FmtBinary<Self>where Self: Binary,

Causes self to use its Binary implementation when Debug-formatted.
source§

fn fmt_display(self) -> FmtDisplay<Self>where Self: Display,

Causes self to use its Display implementation when Debug-formatted.
source§

fn fmt_lower_exp(self) -> FmtLowerExp<Self>where Self: LowerExp,

Causes self to use its LowerExp implementation when Debug-formatted.
source§

fn fmt_lower_hex(self) -> FmtLowerHex<Self>where Self: LowerHex,

Causes self to use its LowerHex implementation when Debug-formatted.
source§

fn fmt_octal(self) -> FmtOctal<Self>where Self: Octal,

Causes self to use its Octal implementation when Debug-formatted.
source§

fn fmt_pointer(self) -> FmtPointer<Self>where Self: Pointer,

Causes self to use its Pointer implementation when Debug-formatted.
source§

fn fmt_upper_exp(self) -> FmtUpperExp<Self>where Self: UpperExp,

Causes self to use its UpperExp implementation when Debug-formatted.
source§

fn fmt_upper_hex(self) -> FmtUpperHex<Self>where Self: UpperHex,

Causes self to use its UpperHex implementation when Debug-formatted.
source§

fn fmt_list(self) -> FmtList<Self>where &'a Self: for<'a> IntoIterator,

Formats each item in a sequence. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for Twhere U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<Unshared, Shared> IntoShared<Shared> for Unsharedwhere Shared: FromUnshared<Unshared>,

source§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
source§

impl<T> Pipe for Twhere T: ?Sized,

source§

fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere Self: Sized,

Pipes by value. This is generally the method you want to use. Read more
source§

fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere R: 'a,

Borrows self and passes that borrow into the pipe function. Read more
source§

fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere R: 'a,

Mutably borrows self and passes that borrow into the pipe function. Read more
source§

fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> Rwhere Self: Borrow<B>, B: 'a + ?Sized, R: 'a,

Borrows self, then passes self.borrow() into the pipe function. Read more
source§

fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R ) -> Rwhere Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.borrow_mut() into the pipe function. Read more
source§

fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> Rwhere Self: AsRef<U>, U: 'a + ?Sized, R: 'a,

Borrows self, then passes self.as_ref() into the pipe function.
source§

fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> Rwhere Self: AsMut<U>, U: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.as_mut() into the pipe function.
source§

fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> Rwhere Self: Deref<Target = T>, T: 'a + ?Sized, R: 'a,

Borrows self, then passes self.deref() into the pipe function.
source§

fn pipe_deref_mut<'a, T, R>( &'a mut self, func: impl FnOnce(&'a mut T) -> R ) -> Rwhere Self: DerefMut<Target = T> + Deref, T: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.deref_mut() into the pipe function.
source§

impl<T> Pointable for T

source§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<P, R> ProtoType<R> for Pwhere R: RustType<P>,

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<T> Tap for T

source§

fn tap(self, func: impl FnOnce(&Self)) -> Self

Immutable access to a value. Read more
source§

fn tap_mut(self, func: impl FnOnce(&mut Self)) -> Self

Mutable access to a value. Read more
source§

fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Selfwhere Self: Borrow<B>, B: ?Sized,

Immutable access to the Borrow<B> of a value. Read more
source§

fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere Self: BorrowMut<B>, B: ?Sized,

Mutable access to the BorrowMut<B> of a value. Read more
source§

fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Selfwhere Self: AsRef<R>, R: ?Sized,

Immutable access to the AsRef<R> view of a value. Read more
source§

fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere Self: AsMut<R>, R: ?Sized,

Mutable access to the AsMut<R> view of a value. Read more
source§

fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Selfwhere Self: Deref<Target = T>, T: ?Sized,

Immutable access to the Deref::Target of a value. Read more
source§

fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Selfwhere Self: DerefMut<Target = T> + Deref, T: ?Sized,

Mutable access to the Deref::Target of a value. Read more
source§

fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self

Calls .tap() only in debug builds, and is erased in release builds.
source§

fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self

Calls .tap_mut() only in debug builds, and is erased in release builds.
source§

fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Selfwhere Self: Borrow<B>, B: ?Sized,

Calls .tap_borrow() only in debug builds, and is erased in release builds.
source§

fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere Self: BorrowMut<B>, B: ?Sized,

Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
source§

fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Selfwhere Self: AsRef<R>, R: ?Sized,

Calls .tap_ref() only in debug builds, and is erased in release builds.
source§

fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere Self: AsMut<R>, R: ?Sized,

Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
source§

fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Selfwhere Self: Deref<Target = T>, T: ?Sized,

Calls .tap_deref() only in debug builds, and is erased in release builds.
source§

fn tap_deref_mut_dbg<T>(self, func: impl FnOnce(&mut T)) -> Selfwhere Self: DerefMut<Target = T> + Deref, T: ?Sized,

Calls .tap_deref_mut() only in debug builds, and is erased in release builds.
source§

impl<T> TryConv for T

source§

fn try_conv<T>(self) -> Result<T, Self::Error>where Self: TryInto<T>,

Attempts to convert self into T using TryInto<T>. Read more
source§

impl<T, U> TryFrom<U> for Twhere U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for Twhere U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for Twhere V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more