mz_storage_controller::instance

Struct Instance

pub(crate) struct Instance<T> {
    pub workload_class: Option<String>,
    replicas: BTreeMap<ReplicaId, Replica<T>>,
    active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    active_exports: BTreeMap<GlobalId, ActiveExport>,
    history: CommandHistory<T>,
    epoch: ClusterStartupEpoch,
    metrics: InstanceMetrics,
    now: NowFn,
    response_tx: UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}

A controller for a storage instance.

Encapsulates communication with replicas in this instance, and their rehydration.

Note that storage objects (sources and sinks) don’t currently support replication (database-issues#5051). An instance can have multiple replicas connected, but only if it has no storage objects installed. Attempting to install storage objects on multi-replica instances, or attempting to add more than one replica to instances that have storage objects installed, is illegal and will lead to panics.

Fields§

§workload_class: Option<String>

The workload class of this instance.

This is currently only used to annotate metrics.

§replicas: BTreeMap<ReplicaId, Replica<T>>

The replicas connected to this storage instance.

§active_ingestions: BTreeMap<GlobalId, ActiveIngestion>

The ingestions currently running on this instance.

While this is derivable from history on demand, keeping a denormalized list of running ingestions is quite a bit more convenient in the implementation of StorageController::active_ingestions.

§ingestion_exports: BTreeMap<GlobalId, GlobalId>

A map from ingestion export ID to the ingestion that is producing it.

§active_exports: BTreeMap<GlobalId, ActiveExport>

The exports currently running on this instance.

While this is derivable from history on demand, keeping a denormalized list of running exports is quite a bit more convenient for the controller.

§history: CommandHistory<T>

The command history, used to replay past commands when introducing new replicas or reconnecting to existing replicas.
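
As a rough illustration of replay, the sketch below keeps an append-only list of commands and copies the whole list to a newly connected replica. `HistoryModel`, `Command`, and the `Vec`-backed replica inbox are hypothetical stand-ins and not the real `CommandHistory` API, which may behave differently.

```rust
/// Hypothetical command type; the real `StorageCommand<T>` has more variants.
#[derive(Debug, Clone, PartialEq)]
enum Command {
    RunIngestion(u64),
    AllowCompaction(u64),
}

/// Hypothetical stand-in for a command history: an append-only log.
#[derive(Default)]
struct HistoryModel {
    commands: Vec<Command>,
}

impl HistoryModel {
    /// Records a command so it can be replayed later.
    fn push(&mut self, command: Command) {
        self.commands.push(command);
    }

    /// Replays every recorded command, in order, into a replica's inbox.
    fn replay_into(&self, inbox: &mut Vec<Command>) {
        inbox.extend(self.commands.iter().cloned());
    }
}

fn main() {
    let mut history = HistoryModel::default();
    history.push(Command::RunIngestion(1));
    history.push(Command::AllowCompaction(1));

    // A replica that connects late still receives both commands, in order.
    let mut new_replica_inbox = Vec::new();
    history.replay_into(&mut new_replica_inbox);
    println!("{:?}", new_replica_inbox);
}
```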

§epoch: ClusterStartupEpoch

The current cluster startup epoch.

The replica value of the epoch is increased every time a replica is (re)connected, allowing the distinction of different replica incarnations.
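
The idea of distinguishing replica incarnations can be sketched with a two-part epoch whose replica part is bumped on every (re)connection. `EpochModel` and its field layout are assumptions made for illustration; the real `ClusterStartupEpoch` may be represented differently.

```rust
/// Hypothetical two-part epoch; not the real `ClusterStartupEpoch`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct EpochModel {
    /// Identifies the controller process incarnation.
    envd: i64,
    /// Counts replica (re)connections within that incarnation.
    replica: u64,
}

impl EpochModel {
    /// Bumps the replica part and returns the epoch for the new connection.
    fn bump_replica(&mut self) -> EpochModel {
        self.replica += 1;
        *self
    }
}

fn main() {
    let mut epoch = EpochModel { envd: 7, replica: 0 };
    let first = epoch.bump_replica();
    let second = epoch.bump_replica();
    // Derived ordering compares `envd` first, then `replica`, so a later
    // incarnation compares greater than an earlier one.
    assert!(second > first);
    println!("{:?} -> {:?}", first, second);
}
```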

§metrics: InstanceMetrics

Metrics tracked for this storage instance.

§now: NowFn

A function that returns the current time.

§response_tx: UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>

A sender for responses from replicas.

Responses are tagged with the ReplicaId of the replica that sent the response. Responses that don’t originate from a replica (e.g. a “paused” status update, when no replicas are connected) are tagged with None.
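
A minimal sketch of the tagging scheme, using the standard library's unbounded `mpsc` channel in place of the `UnboundedSender` named in the field type. The `Response` enum and the `origin_label` helper are hypothetical and only illustrate how a consumer might tell replica responses from controller-generated ones.

```rust
use std::sync::mpsc;

type ReplicaId = u64;

/// Hypothetical response type; the real `StorageResponse<T>` differs.
#[derive(Debug, PartialEq)]
enum Response {
    FrontierAdvanced(u64),
    StatusPaused(u64),
}

/// Describes where a tagged response came from.
fn origin_label(origin: Option<ReplicaId>) -> String {
    match origin {
        Some(id) => format!("replica {}", id),
        None => "controller".to_string(),
    }
}

fn main() {
    // `mpsc::channel` is unbounded, so `send` never blocks.
    let (tx, rx) = mpsc::channel::<(Option<ReplicaId>, Response)>();

    // A response produced by replica 1.
    tx.send((Some(1), Response::FrontierAdvanced(42))).unwrap();
    // A synthetic "paused" status emitted while no replica is connected.
    tx.send((None, Response::StatusPaused(7))).unwrap();
    drop(tx);

    for (origin, response) in rx {
        println!("{}: {:?}", origin_label(origin), response);
    }
}
```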

Implementations§

impl<T> Instance<T>

pub fn new( envd_epoch: NonZeroI64, metrics: InstanceMetrics, now: NowFn, instance_response_tx: UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>, enable_snapshot_frontier: ConfigValHandle<bool>, ) -> Self

Creates a new Instance.

pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_

Returns the IDs of all replicas connected to this storage instance.

pub fn add_replica(&mut self, id: ReplicaId, config: ReplicaConfig)

Adds a new replica to this storage instance.

pub fn replay_commands(&mut self, replica_id: ReplicaId)

Replays commands to the specified replica.

pub fn drop_replica(&mut self, id: ReplicaId)

Removes the identified replica from this storage instance.

pub fn rehydrate_failed_replicas(&mut self)

Rehydrates any failed replicas of this storage instance.

pub fn active_ingestions(&self) -> impl Iterator<Item = &GlobalId>

Returns the ingestions running on this instance.

pub fn active_exports(&self) -> impl Iterator<Item = &GlobalId>

Returns the exports running on this instance.

fn update_paused_statuses(&mut self)

Sets the status to paused for all sources/sinks in the history.

pub fn send(&mut self, command: StorageCommand<T>)

Sends a command to this storage instance.

fn absorb_ingestions(&mut self, ingestions: Vec<RunIngestionCommand>)

Updates internal state based on incoming ingestion commands.

This does not send commands to replicas; it only records the ingestion in state and potentially updates scheduling decisions.

fn absorb_exports(&mut self, exports: Vec<RunSinkCommand<T>>)

Updates internal state based on incoming export commands.

This does not send commands to replicas; it only records the export in state and potentially updates scheduling decisions.

fn update_scheduling(&mut self, send_commands: bool)

Updates scheduling decisions, that is, which replicas should be running a given object, if needed.

An important property of this scheduling algorithm is that we never change the scheduling decision for single-replica objects unless we have to, that is, unless the replica they are running on goes away. We do this so that we don’t send a mix of “run”/“allow compaction”/“run” messages to replicas, which would not deal well with it. When we do have to make a scheduling decision, we schedule a single-replica ingestion on the first replica, according to the sort order of ReplicaId. We do the latter so that the scheduling decision is stable across restarts of environmentd/the controller.

For multi-replica objects (e.g. Kafka ingestions), each active object is scheduled on all replicas.

If send_commands is true, sends commands for newly scheduled single-replica objects.
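
The scheduling rules described above can be sketched as follows. This is a simplified model under stated assumptions, not the actual implementation: `Object`, its `multi_replica` flag, and the free function `update_scheduling` are hypothetical, and the returned list stands in for the commands that would be sent when `send_commands` is true.

```rust
use std::collections::{BTreeMap, BTreeSet};

type ReplicaId = u64;
type ObjectId = u64;

/// Hypothetical per-object scheduling state.
struct Object {
    multi_replica: bool,
    scheduled_on: BTreeSet<ReplicaId>,
}

/// Recomputes scheduling and returns the newly made single-replica
/// assignments, i.e. the ones a caller might send commands for.
fn update_scheduling(
    replicas: &BTreeSet<ReplicaId>,
    objects: &mut BTreeMap<ObjectId, Object>,
) -> Vec<(ObjectId, ReplicaId)> {
    let mut newly_scheduled = Vec::new();
    for (id, object) in objects.iter_mut() {
        if object.multi_replica {
            // Multi-replica objects run on every connected replica.
            object.scheduled_on = replicas.clone();
            continue;
        }
        // Keep an existing decision unless its replica went away.
        object.scheduled_on.retain(|r| replicas.contains(r));
        if object.scheduled_on.is_empty() {
            // `BTreeSet` iterates in sort order, so this picks the smallest ID.
            if let Some(first) = replicas.iter().next() {
                object.scheduled_on.insert(*first);
                newly_scheduled.push((*id, *first));
            }
        }
    }
    newly_scheduled
}

fn main() {
    let mut replicas = BTreeSet::new();
    replicas.insert(2);
    replicas.insert(3);

    let mut objects = BTreeMap::new();
    objects.insert(10, Object { multi_replica: false, scheduled_on: BTreeSet::new() });
    objects.insert(11, Object { multi_replica: true, scheduled_on: BTreeSet::new() });

    println!("{:?}", update_scheduling(&replicas, &mut objects));
    // Adding a smaller replica ID does not move the single-replica object.
    replicas.insert(1);
    println!("{:?}", update_scheduling(&replicas, &mut objects));
}
```

Because an existing assignment is only dropped when its replica disappears, repeated calls with an unchanged replica set make no new decisions in this model.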

pub fn get_ingestion_description( &self, id: &GlobalId, ) -> Option<IngestionDescription<CollectionMetadata>>

Returns the ingestion description for the given ingestion ID, if it exists.

This function searches through the command history to find the most recent RunIngestionCommand for the specified ingestion ID and returns its description. Returns None if no ingestion with the given ID is found.
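
A minimal sketch of the "most recent command wins" lookup, assuming the history is an ordered list with the newest commands at the end. `RunIngestion` and `latest_description` are hypothetical simplifications; the real command and description types carry far more data.

```rust
/// Hypothetical, simplified command carrying an ingestion description.
#[derive(Debug, Clone, PartialEq)]
struct RunIngestion {
    id: u64,
    description: String,
}

/// Returns the description from the most recent command for `id`, if any.
fn latest_description(history: &[RunIngestion], id: u64) -> Option<&str> {
    history
        .iter()
        .rev() // assumed: newest commands are at the end of the history
        .find(|cmd| cmd.id == id)
        .map(|cmd| cmd.description.as_str())
}

fn main() {
    let history = vec![
        RunIngestion { id: 1, description: "v1".to_string() },
        RunIngestion { id: 2, description: "other".to_string() },
        RunIngestion { id: 1, description: "v2".to_string() },
    ];
    println!("{:?}", latest_description(&history, 1));
    println!("{:?}", latest_description(&history, 9));
}
```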

pub fn get_export_description( &self, id: &GlobalId, ) -> Option<StorageSinkDesc<CollectionMetadata, T>>

Returns the export description for the given export ID, if it exists.

This function searches through the command history to find the most recent RunSinkCommand for the specified export ID and returns its description. Returns None if no export with the given ID is found.

fn absorb_compactions(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>)

Updates internal state based on incoming compaction commands.

fn active_replicas( &mut self, id: &GlobalId, ) -> Box<dyn Iterator<Item = &mut Replica<T>> + '_>

Returns the replicas that are actively running the given object (ingestion or export).

fn is_active_replica(&self, id: &GlobalId, replica_id: &ReplicaId) -> bool

Returns whether the given replica is actively running the given object (ingestion or export).

pub(crate) fn refresh_state_metrics(&self)

Refresh the controller state metrics for this instance.

We could also do state metric updates directly in response to state changes, but that would mean littering the code with metric update calls. Encapsulating state metric maintenance in a single method is less noisy.

This method is invoked by Controller::maintain, which we expect to be called once per second during normal operation.

pub fn get_active_replicas_for_object( &self, id: &GlobalId, ) -> BTreeSet<ReplicaId>

Returns the set of replica IDs that are actively running the given object (ingestion, ingestion export (a.k.a. subsource), or export).
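
One plausible way to serve all three kinds of ID is to first map an ingestion export to the ingestion that produces it and then look up where that object is scheduled. The sketch below assumes exactly that; `active_replicas_for` and the `scheduled_on` map are hypothetical and may not match how the real method is implemented.

```rust
use std::collections::{BTreeMap, BTreeSet};

type GlobalId = u64;
type ReplicaId = u64;

/// Resolves `id` to the object that is actually scheduled, then returns
/// the replicas running it. Unknown IDs yield an empty set.
fn active_replicas_for(
    id: GlobalId,
    ingestion_exports: &BTreeMap<GlobalId, GlobalId>,
    scheduled_on: &BTreeMap<GlobalId, BTreeSet<ReplicaId>>,
) -> BTreeSet<ReplicaId> {
    // Assumption: an ingestion export runs wherever its ingestion runs.
    let owner = ingestion_exports.get(&id).copied().unwrap_or(id);
    scheduled_on.get(&owner).cloned().unwrap_or_default()
}

fn main() {
    // Export 101 is produced by ingestion 100.
    let mut ingestion_exports = BTreeMap::new();
    ingestion_exports.insert(101, 100);

    // Ingestion 100 is scheduled on replica 1.
    let mut on_replica_one = BTreeSet::new();
    on_replica_one.insert(1);
    let mut scheduled_on = BTreeMap::new();
    scheduled_on.insert(100, on_replica_one);

    println!("{:?}", active_replicas_for(101, &ingestion_exports, &scheduled_on));
    println!("{:?}", active_replicas_for(999, &ingestion_exports, &scheduled_on));
}
```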

Trait Implementations§

impl<T: Debug> Debug for Instance<T>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

impl<T> Freeze for Instance<T>

impl<T> !RefUnwindSafe for Instance<T>

impl<T> Send for Instance<T>
where T: Send,

impl<T> Sync for Instance<T>
where T: Send + Sync,

impl<T> Unpin for Instance<T>
where T: Unpin,

impl<T> !UnwindSafe for Instance<T>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

fn cast_into(self) -> U

Performs the cast.

impl<T> Conv for T

fn conv<T>(self) -> T
where Self: Into<T>,

Converts self into T using Into<T>.

impl<T> CopyAs<T> for T

fn copy_as(self) -> T

impl<T> FmtForward for T

fn fmt_binary(self) -> FmtBinary<Self>
where Self: Binary,

Causes self to use its Binary implementation when Debug-formatted.

fn fmt_display(self) -> FmtDisplay<Self>
where Self: Display,

Causes self to use its Display implementation when Debug-formatted.

fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where Self: LowerExp,

Causes self to use its LowerExp implementation when Debug-formatted.

fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where Self: LowerHex,

Causes self to use its LowerHex implementation when Debug-formatted.

fn fmt_octal(self) -> FmtOctal<Self>
where Self: Octal,

Causes self to use its Octal implementation when Debug-formatted.

fn fmt_pointer(self) -> FmtPointer<Self>
where Self: Pointer,

Causes self to use its Pointer implementation when Debug-formatted.

fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where Self: UpperExp,

Causes self to use its UpperExp implementation when Debug-formatted.

fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where Self: UpperHex,

Causes self to use its UpperHex implementation when Debug-formatted.

fn fmt_list(self) -> FmtList<Self>
where for<'a> &'a Self: IntoIterator,

Formats each item in a sequence.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.

impl<T, U> OverrideFrom<Option<&T>> for U
where U: OverrideFrom<T>,

fn override_from(self, layer: &Option<&T>) -> U

Override the configuration represented by Self with values from the given layer.

impl<T> Pipe for T
where T: ?Sized,

fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where Self: Sized,

Pipes by value. This is generally the method you want to use.

fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where R: 'a,

Borrows self and passes that borrow into the pipe function.

fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where R: 'a,

Mutably borrows self and passes that borrow into the pipe function.

fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
where Self: Borrow<B>, B: 'a + ?Sized, R: 'a,

Borrows self, then passes self.borrow() into the pipe function.

fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
where Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.borrow_mut() into the pipe function.

fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
where Self: AsRef<U>, U: 'a + ?Sized, R: 'a,

Borrows self, then passes self.as_ref() into the pipe function.

fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
where Self: AsMut<U>, U: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.as_mut() into the pipe function.

fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
where Self: Deref<Target = T>, T: 'a + ?Sized, R: 'a,

Borrows self, then passes self.deref() into the pipe function.

fn pipe_deref_mut<'a, T, R>( &'a mut self, func: impl FnOnce(&'a mut T) -> R, ) -> R
where Self: DerefMut<Target = T> + Deref, T: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.deref_mut() into the pipe function.

impl<T> Pointable for T

const ALIGN: usize = _

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

impl<T> Same for T

type Output = T

Should always be Self

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.

impl<T> Tap for T

fn tap(self, func: impl FnOnce(&Self)) -> Self

Immutable access to a value.

fn tap_mut(self, func: impl FnOnce(&mut Self)) -> Self

Mutable access to a value.

fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Immutable access to the Borrow<B> of a value.

fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Mutable access to the BorrowMut<B> of a value.

fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Immutable access to the AsRef<R> view of a value.

fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Mutable access to the AsMut<R> view of a value.

fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Immutable access to the Deref::Target of a value.

fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Mutable access to the Deref::Target of a value.

fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self

Calls .tap() only in debug builds, and is erased in release builds.

fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self

Calls .tap_mut() only in debug builds, and is erased in release builds.

fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Calls .tap_borrow() only in debug builds, and is erased in release builds.

fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.

fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Calls .tap_ref() only in debug builds, and is erased in release builds.

fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Calls .tap_ref_mut() only in debug builds, and is erased in release builds.

fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Calls .tap_deref() only in debug builds, and is erased in release builds.

fn tap_deref_mut_dbg<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Calls .tap_deref_mut() only in debug builds, and is erased in release builds.

impl<T> TryConv for T

fn try_conv<T>(self) -> Result<T, Self::Error>
where Self: TryInto<T>,

Attempts to convert self into T using TryInto<T>.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.