pub(crate) struct Instance<T> {
    pub workload_class: Option<String>,
    replicas: BTreeMap<ReplicaId, Replica<T>>,
    active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    active_exports: BTreeMap<GlobalId, ActiveExport>,
    history: CommandHistory<T>,
    epoch: ClusterStartupEpoch,
    metrics: InstanceMetrics,
    now: NowFn,
    response_tx: UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}
A controller for a storage instance.
Encapsulates communication with replicas in this instance, and their rehydration.
Note that storage objects (sources and sinks) don’t currently support replication (database-issues#5051). An instance can have multiple replicas connected, but only if it has no storage objects installed. Attempting to install storage objects on multi-replica instances, or attempting to add more than one replica to instances that have storage objects installed, is illegal and will lead to panics.
Fields
workload_class: Option<String>
The workload class of this instance.
This is currently only used to annotate metrics.
replicas: BTreeMap<ReplicaId, Replica<T>>
The replicas connected to this storage instance.
active_ingestions: BTreeMap<GlobalId, ActiveIngestion>
The ingestions currently running on this instance.
While this is derivable from history on demand, keeping a denormalized list of running ingestions is quite a bit more convenient in the implementation of StorageController::active_ingestions.
ingestion_exports: BTreeMap<GlobalId, GlobalId>
A map from ingestion export ID to the ingestion that is producing it.
active_exports: BTreeMap<GlobalId, ActiveExport>
The exports currently running on this instance.
While this is derivable from history on demand, keeping a denormalized list of running exports is quite a bit more convenient for the controller.
history: CommandHistory<T>
The command history, used to replay past commands when introducing new replicas or reconnecting to existing replicas.
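To illustrate why a history is kept, here is a minimal sketch of replaying recorded commands to a newly connected replica. `HistorySketch`, `Command`, and the `Vec`-based replica inbox are hypothetical simplifications introduced for this example; they are not the real `CommandHistory<T>` API.

```rust
/// Hypothetical, simplified command type (not the real `StorageCommand<T>`).
#[derive(Clone, Debug, PartialEq)]
enum Command {
    RunIngestion(u64),
    AllowCompaction(u64),
}

/// Hypothetical stand-in for a command history.
#[derive(Default)]
struct HistorySketch {
    commands: Vec<Command>,
}

impl HistorySketch {
    /// Records a command so that it can be replayed later.
    fn push(&mut self, cmd: Command) {
        self.commands.push(cmd);
    }

    /// Replays every recorded command, in order, into a replica's inbox.
    fn replay_into(&self, inbox: &mut Vec<Command>) {
        inbox.extend(self.commands.iter().cloned());
    }
}
```

In this sketch a replica that connects late still observes the same command sequence as one that was connected from the start.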
epoch: ClusterStartupEpoch
The current cluster startup epoch.
The replica value of the epoch is increased every time a replica is (re)connected, allowing the distinction of different replica incarnations.
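The idea of bumping the replica part of the epoch can be sketched as follows. `EpochSketch` and its two fields are assumptions made for illustration; the actual layout of `ClusterStartupEpoch` is not shown on this page.

```rust
use std::num::NonZeroI64;

/// Hypothetical stand-in for an epoch with an environment part and a
/// replica part. Field order matters: the derived `Ord` compares `envd`
/// first and `replica` second.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct EpochSketch {
    envd: NonZeroI64,
    replica: u64,
}

impl EpochSketch {
    /// Starts a new epoch for the given environment epoch.
    fn new(envd: NonZeroI64) -> Self {
        Self { envd, replica: 0 }
    }

    /// Bumps the replica part; in this sketch it is called on every
    /// replica (re)connection.
    fn bump_replica(&mut self) {
        self.replica += 1;
    }
}
```

Because every (re)connection yields a strictly larger epoch, two incarnations of the same replica never compare as equal.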
metrics: InstanceMetrics
Metrics tracked for this storage instance.
now: NowFn
A function that returns the current time.
response_tx: UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>
A sender for responses from replicas.
Responses are tagged with the ReplicaId of the replica that sent the response. Responses that don’t originate from a replica (e.g. a “paused” status update, when no replicas are connected) are tagged with None.
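The tagging scheme described for response_tx can be sketched with a standard-library channel. This example substitutes `std::sync::mpsc` for the unbounded sender used by the real type, and `Response`, `forward_from_replica`, and `emit_paused` are hypothetical names introduced only for illustration.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for `ReplicaId`.
type ReplicaId = u64;

/// Hypothetical stand-in for `StorageResponse<T>`.
#[derive(Debug, PartialEq)]
enum Response {
    FrontierUpper(u64),
    Paused,
}

/// Forwards a response produced by a replica, tagged with that replica's ID.
fn forward_from_replica(
    tx: &Sender<(Option<ReplicaId>, Response)>,
    id: ReplicaId,
    resp: Response,
) {
    // Ignoring the send error is a simplification; it only fails if the
    // receiving side has been dropped.
    let _ = tx.send((Some(id), resp));
}

/// Emits a controller-generated status update, which has no originating
/// replica and is therefore tagged with `None`.
fn emit_paused(tx: &Sender<(Option<ReplicaId>, Response)>) {
    let _ = tx.send((None, Response::Paused));
}
```

The receiver can then match on the first tuple element to tell replica-originated responses apart from controller-generated ones.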
Implementations
impl<T> Instance<T>
pub fn new(
    envd_epoch: NonZeroI64,
    metrics: InstanceMetrics,
    now: NowFn,
    instance_response_tx: UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    enable_snapshot_frontier: ConfigValHandle<bool>,
) -> Self
Creates a new Instance.
pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_
Returns the IDs of all replicas connected to this storage instance.
pub fn add_replica(&mut self, id: ReplicaId, config: ReplicaConfig)
Adds a new replica to this storage instance.
pub fn replay_commands(&mut self, replica_id: ReplicaId)
Replays commands to the specified replica.
pub fn drop_replica(&mut self, id: ReplicaId)
Removes the identified replica from this storage instance.
pub fn rehydrate_failed_replicas(&mut self)
Rehydrates any failed replicas of this storage instance.
pub fn active_ingestions(&self) -> impl Iterator<Item = &GlobalId>
Returns the ingestions running on this instance.
pub fn active_exports(&self) -> impl Iterator<Item = &GlobalId>
Returns the exports running on this instance.
fn update_paused_statuses(&mut self)
Sets the status to paused for all sources/sinks in the history.
pub fn send(&mut self, command: StorageCommand<T>)
Sends a command to this storage instance.
fn absorb_ingestions(&mut self, ingestions: Vec<RunIngestionCommand>)
Updates internal state based on incoming ingestion commands.
This does not send commands to replicas; it only records the ingestion in state and potentially updates scheduling decisions.
fn absorb_exports(&mut self, exports: Vec<RunSinkCommand<T>>)
Updates internal state based on incoming export commands.
This does not send commands to replicas; it only records the export in state and potentially updates scheduling decisions.
fn update_scheduling(&mut self, send_commands: bool)
Updates scheduling decisions, that is, which replicas should be running a given object, if needed.
An important property of this scheduling algorithm is that we never change the scheduling decision for single-replica objects unless we have to, that is, unless the replica that they are running on goes away. We do this so that we don’t send a mix of “run”/“allow compaction”/“run” messages to replicas, which wouldn’t deal well with this. When we do have to make a scheduling decision, we schedule a single-replica ingestion on the first replica, according to the sort order of ReplicaId. We do the latter so that the scheduling decision is stable across restarts of environmentd/the controller.
For multi-replica objects (e.g. Kafka ingestions), each active object is scheduled on all replicas.
If send_commands is true, this method sends commands for newly scheduled single-replica objects.
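The scheduling rule described above can be sketched as a standalone function. This is an illustration under assumptions, not the method's actual body: the `u64` IDs, the `multi_replica` set, and the `assignments` map are hypothetical simplifications, and the returned list stands in for the objects that would receive commands when send_commands is true.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical stand-ins for `ReplicaId` and `GlobalId`.
type ReplicaId = u64;
type ObjectId = u64;

/// Recomputes where each object runs and returns the IDs of single-replica
/// objects that were newly scheduled.
fn update_scheduling(
    replicas: &BTreeSet<ReplicaId>,
    multi_replica: &BTreeSet<ObjectId>,
    assignments: &mut BTreeMap<ObjectId, BTreeSet<ReplicaId>>,
) -> Vec<ObjectId> {
    let mut newly_scheduled = Vec::new();
    for (object, active) in assignments.iter_mut() {
        if multi_replica.contains(object) {
            // Multi-replica objects run on every connected replica.
            *active = replicas.clone();
            continue;
        }
        // Keep the current decision while its replica still exists.
        if active.iter().any(|r| replicas.contains(r)) {
            continue;
        }
        active.clear();
        // Otherwise pick the first replica in sort order, if there is one.
        // BTreeSet iterates in ascending order, so the choice is stable.
        if let Some(first) = replicas.iter().next() {
            active.insert(*first);
            newly_scheduled.push(*object);
        }
    }
    newly_scheduled
}
```

Note that in this sketch, adding a replica with a smaller ID does not move an already scheduled single-replica object; only the loss of its current replica triggers a new decision.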
pub fn get_ingestion_description(
    &self,
    id: &GlobalId,
) -> Option<IngestionDescription<CollectionMetadata>>
Returns the ingestion description for the given ingestion ID, if it exists.
This function searches through the command history to find the most recent RunIngestionCommand for the specified ingestion ID and returns its description. Returns None if no ingestion with the given ID is found.
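A "most recent matching command" lookup of this kind can be sketched as a reverse scan over the history. `HistoryEntry` and the `String` description are hypothetical simplifications; the real method operates on the instance's command history and returns an `IngestionDescription`.

```rust
/// Hypothetical, simplified history entries.
#[derive(Clone, Debug, PartialEq)]
enum HistoryEntry {
    RunIngestion { id: u64, description: String },
    AllowCompaction { id: u64 },
}

/// Returns the description carried by the most recent `RunIngestion`
/// entry for `id`, or `None` if the history contains no such entry.
fn ingestion_description(history: &[HistoryEntry], id: u64) -> Option<String> {
    // Scanning in reverse means the first hit is the latest command.
    history.iter().rev().find_map(|entry| match entry {
        HistoryEntry::RunIngestion { id: entry_id, description } if *entry_id == id => {
            Some(description.clone())
        }
        _ => None,
    })
}
```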
pub fn get_export_description(
    &self,
    id: &GlobalId,
) -> Option<StorageSinkDesc<CollectionMetadata, T>>
Returns the export description for the given export ID, if it exists.
This function searches through the command history to find the most recent RunSinkCommand for the specified export ID and returns its description. Returns None if no export with the given ID is found.
fn absorb_compactions(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>)
Updates internal state based on incoming compaction commands.
fn active_replicas(
    &mut self,
    id: &GlobalId,
) -> Box<dyn Iterator<Item = &mut Replica<T>> + '_>
Returns the replicas that are actively running the given object (ingestion or export).
fn is_active_replica(&self, id: &GlobalId, replica_id: &ReplicaId) -> bool
Returns whether the given replica is actively running the given object (ingestion or export).
pub(crate) fn refresh_state_metrics(&self)
Refresh the controller state metrics for this instance.
We could also do state metric updates directly in response to state changes, but that would mean littering the code with metric update calls. Encapsulating state metric maintenance in a single method is less noisy.
This method is invoked by Controller::maintain, which we expect to be called once per second during normal operation.
pub fn get_active_replicas_for_object(
    &self,
    id: &GlobalId,
) -> BTreeSet<ReplicaId>
Returns the set of replica IDs that are actively running the given object (ingestion, ingestion export (a.k.a. subsource), or export).
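One way such a lookup could work is to resolve an ingestion export to its parent ingestion first and then read that ingestion's replica set. The sketch below assumes exactly that; `StateSketch`, its fields, and the idea that each active object stores its replica set directly are hypothetical and may not match the real implementation.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical stand-ins for `GlobalId` and `ReplicaId`.
type GlobalId = u64;
type ReplicaId = u64;

/// Hypothetical, simplified view of the instance state.
#[derive(Default)]
struct StateSketch {
    /// Ingestion ID to the replicas running it.
    active_ingestions: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,
    /// Ingestion export ID to the ingestion producing it.
    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    /// Export ID to the replicas running it.
    active_exports: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,
}

impl StateSketch {
    /// Returns the replicas running `id`, or an empty set if `id` is unknown.
    fn active_replicas_for_object(&self, id: GlobalId) -> BTreeSet<ReplicaId> {
        // Assumption: an ingestion export runs wherever its parent runs.
        let ingestion_id = self.ingestion_exports.get(&id).copied().unwrap_or(id);
        self.active_ingestions
            .get(&ingestion_id)
            .or_else(|| self.active_exports.get(&id))
            .cloned()
            .unwrap_or_default()
    }
}
```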
Trait Implementations
Auto Trait Implementations
impl<T> Freeze for Instance<T>
impl<T> !RefUnwindSafe for Instance<T>
impl<T> Send for Instance<T> where T: Send
impl<T> Sync for Instance<T>
impl<T> Unpin for Instance<T> where T: Unpin
impl<T> !UnwindSafe for Instance<T>
Blanket Implementations
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self> where Self: Binary
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self> where Self: Display
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self> where Self: LowerExp
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self> where Self: LowerHex
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self> where Self: Octal
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self> where Self: Pointer
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self> where Self: UpperExp
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self> where Self: UpperHex
Causes self to use its UpperHex implementation when Debug-formatted.
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request.
impl<T, U> OverrideFrom<Option<&T>> for U where U: OverrideFrom<T>
impl<T> Pipe for T where T: ?Sized
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R where Self: Sized
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R where R: 'a
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R where R: 'a
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(&'a mut self, func: impl FnOnce(&'a mut B) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P where R: RustType<P>
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.
impl<'a, S, T> Semigroup<&'a S> for T where T: Semigroup<S>
fn plus_equals(&mut self, rhs: &&'a S)
The method of std::ops::AddAssign, for types that do not implement AddAssign.
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.