Struct mz_compute_client::controller::instance::ActiveInstance
pub(super) struct ActiveInstance<'a, T> {
    compute: &'a mut Instance<T>,
    storage_controller: &'a mut dyn StorageController<Timestamp = T>,
}
A wrapper around Instance with a live storage controller.
Fields
compute: &'a mut Instance<T>
storage_controller: &'a mut dyn StorageController<Timestamp = T>
Implementations
impl<'a, T> ActiveInstance<'a, T> where T: Timestamp + Lattice, ComputeGrpcClient: ComputeClient<T>,
pub fn add_replica(&mut self, id: ReplicaId, config: ReplicaConfig) -> Result<(), ReplicaExists>
Add a new instance replica, by ID.
pub fn remove_replica(&mut self, id: ReplicaId) -> Result<(), ReplicaMissing>
Remove an existing instance replica, by ID.
fn rehydrate_replica(&mut self, id: ReplicaId)
fn rehydrate_failed_replicas(&mut self)
Rehydrate any failed replicas of this instance.
pub fn create_dataflow(&mut self, dataflow: DataflowDescription<Plan<T>, (), T>) -> Result<(), DataflowCreationError>
Creates the described dataflows and initializes state for their output.
pub fn drop_collections(&mut self, ids: Vec<GlobalId>) -> Result<(), CollectionMissing>
Drops the read capability for the given collections and allows their resources to be reclaimed.
pub fn peek(&mut self, id: GlobalId, literal_constraints: Option<Vec<Row>>, uuid: Uuid, timestamp: T, finishing: RowSetFinishing, map_filter_project: SafeMfpPlan, target_replica: Option<ReplicaId>, peek_target: PeekTarget) -> Result<(), PeekError>
Initiate a peek request for the contents of id at timestamp.
pub fn cancel_peek(&mut self, uuid: Uuid)
Cancels an existing peek request.
pub fn set_read_policy(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) -> Result<(), ReadPolicyError>
Assigns a read policy to specific identifiers.
The policies are assigned in the order presented, and repeated identifiers should conclude with the last policy. Changing a policy will immediately downgrade the read capability if appropriate, but it will not “recover” the read capability if the prior capability is already ahead of it.
Identifiers not present in policies retain their existing read policies.
It is an error to attempt to set a read policy for a collection that is not readable in the context of compute. At this time, only indexes are readable compute collections.
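The ordering and "downgrade but never recover" rules above can be illustrated with a small model. This is only a sketch under simplifying assumptions: times are plain u64 values rather than antichains, and the Policy, Collection, and Missing types as well as the free function below are hypothetical stand-ins, not the crate's actual ReadPolicy<T> or ReadPolicyError.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified read policy (the real `ReadPolicy<T>` is richer).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Policy {
    /// Hold the read capability at a fixed time.
    ValidFrom(u64),
    /// Trail the write frontier by a fixed lag.
    LagWriteFrontier(u64),
}

impl Policy {
    /// The read time this policy asks for, given the current write frontier.
    fn desired_since(&self, write_frontier: u64) -> u64 {
        match *self {
            Policy::ValidFrom(t) => t,
            Policy::LagWriteFrontier(lag) => write_frontier.saturating_sub(lag),
        }
    }
}

/// Hypothetical per-collection state.
#[derive(Debug)]
struct Collection {
    policy: Policy,
    since: u64,
    write_frontier: u64,
}

/// Hypothetical error: the identifier does not name a readable collection.
#[derive(Debug, PartialEq)]
struct Missing(u32);

fn set_read_policy(
    collections: &mut BTreeMap<u32, Collection>,
    policies: Vec<(u32, Policy)>,
) -> Result<(), Missing> {
    // Validate every identifier first so that an error changes nothing.
    for (id, _) in &policies {
        if !collections.contains_key(id) {
            return Err(Missing(*id));
        }
    }
    // Apply in order; for a repeated identifier the last policy is the one kept.
    for (id, policy) in policies {
        let c = collections.get_mut(&id).expect("validated above");
        c.policy = policy;
        // The read capability only moves forward, never back.
        c.since = c.since.max(policy.desired_since(c.write_frontier));
    }
    Ok(())
}
```

In this model, installing ValidFrom(2) on a collection whose capability is already at 15 changes the stored policy but leaves the capability at 15.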
fn update_write_frontiers(&mut self, replica_id: ReplicaId, updates: &BTreeMap<GlobalId, Antichain<T>>)
Accept write frontier updates from the compute layer.
Panics
Panics if any of the updates references an absent collection.
Panics if any of the updates regresses an existing write frontier.
fn update_replica_write_frontiers(&mut self, replica_id: ReplicaId, updates: &BTreeMap<GlobalId, Antichain<T>>)
Apply replica write frontier updates.
Panics
Panics if any of the updates references an absent collection.
Panics if any of the updates regresses an existing replica write frontier.
fn remove_replica_write_frontiers(&mut self, replica_id: ReplicaId)
Remove frontier tracking state for the given replica.
fn maybe_update_global_write_frontiers(&mut self, updates: &BTreeMap<GlobalId, Antichain<T>>)
Apply global write frontier updates.
Frontier regressions are gracefully ignored.
Panics
Panics if any of the updates references an absent collection.
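The difference between the strict per-replica updates and the lenient global update can be sketched as follows. This is an illustration only: frontiers are modeled as single u64 values instead of Antichain<T>, collection ids are u32, and both function names are hypothetical.

```rust
use std::collections::BTreeMap;

/// Strict variant: panics on an absent collection or a regressing frontier.
fn update_strict(frontiers: &mut BTreeMap<u32, u64>, updates: &BTreeMap<u32, u64>) {
    for (id, new) in updates {
        let current = frontiers
            .get_mut(id)
            .expect("update references an absent collection");
        assert!(*new >= *current, "write frontier regression");
        *current = *new;
    }
}

/// Lenient variant: still panics on an absent collection, but silently skips
/// updates that would not advance the frontier. Returns the ids that advanced.
fn maybe_update(frontiers: &mut BTreeMap<u32, u64>, updates: &BTreeMap<u32, u64>) -> Vec<u32> {
    let mut advanced = Vec::new();
    for (id, new) in updates {
        let current = frontiers
            .get_mut(id)
            .expect("update references an absent collection");
        if *new > *current {
            *current = *new;
            advanced.push(*id);
        }
    }
    advanced
}
```

A plausible reason for the lenient behavior is that several replicas report frontiers for the same collection, so a slower replica can legitimately report a frontier behind the global one; the sketch does not model replicas explicitly.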
fn update_read_capabilities(&mut self, updates: BTreeMap<GlobalId, ChangeBatch<T>>)
Applies updates, propagates consequences through other read capabilities, and sends appropriate compaction commands.
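As a rough illustration of how a batch of (time, diff) changes can move a read frontier and thereby allow compaction, consider the sketch below. It assumes totally ordered u64 times and a hypothetical ReadCapabilities type; it does not model the propagation to dependency collections that the method also performs.

```rust
use std::collections::BTreeMap;

/// Hypothetical accumulator of read capabilities for one collection:
/// maps each held time to the number of outstanding holds at that time.
#[derive(Default)]
struct ReadCapabilities {
    counts: BTreeMap<u64, i64>,
}

impl ReadCapabilities {
    /// The earliest time that is still held, if any.
    fn frontier(&self) -> Option<u64> {
        self.counts.keys().next().copied()
    }

    /// Applies `(time, diff)` changes. Returns `Some(new_frontier)` when the
    /// frontier advanced, which is the point at which a compaction command
    /// could be sent in this model.
    fn update(&mut self, updates: &[(u64, i64)]) -> Option<u64> {
        let before = self.frontier();
        for &(time, diff) in updates {
            *self.counts.entry(time).or_insert(0) += diff;
        }
        // Drop times whose holds have all been released.
        self.counts.retain(|_, count| *count != 0);
        match (before, self.frontier()) {
            (Some(b), Some(a)) if a > b => Some(a),
            _ => None,
        }
    }
}
```

Releasing the only hold at time 5 while a hold at time 9 remains moves the frontier to 9, and the method reports that advance.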
fn remove_peek(&mut self, uuid: Uuid)
Removes a registered peek and cleans up associated state.
As part of this we:
- Emit a CancelPeek command to instruct replicas to stop spending resources on this peek, and to allow the ComputeCommandHistory to reduce away the corresponding Peek command.
- Remove the read hold for this peek, unblocking compaction that might have waited on it.
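The two cleanup steps listed above can be modeled with a small state machine. This is a sketch under stated assumptions: the State, PendingPeek, and Command types are hypothetical, UUIDs are u128 values, times are u64, and treating an unknown UUID as a no-op is a choice of this sketch rather than documented behavior.

```rust
use std::collections::BTreeMap;

/// Hypothetical commands sent to replicas.
#[derive(Debug, PartialEq)]
enum Command {
    Peek { uuid: u128 },
    CancelPeek { uuid: u128 },
}

/// Hypothetical bookkeeping for one in-flight peek.
struct PendingPeek {
    target: u32,
    time: u64,
}

#[derive(Default)]
struct State {
    peeks: BTreeMap<u128, PendingPeek>,
    /// Outstanding read holds, counted per (collection, time).
    read_holds: BTreeMap<(u32, u64), usize>,
    commands: Vec<Command>,
}

impl State {
    /// Registers a peek: takes a read hold and emits a `Peek` command.
    fn peek(&mut self, uuid: u128, target: u32, time: u64) {
        *self.read_holds.entry((target, time)).or_insert(0) += 1;
        self.peeks.insert(uuid, PendingPeek { target, time });
        self.commands.push(Command::Peek { uuid });
    }

    /// Removes a peek: emits `CancelPeek` and releases the read hold.
    fn remove_peek(&mut self, uuid: u128) {
        let Some(peek) = self.peeks.remove(&uuid) else {
            return; // assumption of this sketch: unknown peeks are ignored
        };
        self.commands.push(Command::CancelPeek { uuid });
        let key = (peek.target, peek.time);
        let remaining = self.read_holds.get_mut(&key).map(|count| {
            *count -= 1;
            *count
        });
        if remaining == Some(0) {
            self.read_holds.remove(&key);
        }
    }

    /// Earliest time still held on a collection; compaction must not pass it.
    fn min_hold(&self, target: u32) -> Option<u64> {
        self.read_holds
            .keys()
            .filter(|(collection, _)| *collection == target)
            .map(|(_, time)| *time)
            .min()
    }
}
```

Once the peek holding the earliest time is removed, min_hold moves forward, which is the "unblocking compaction" effect described in the second bullet.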
pub fn handle_response( &mut self, response: ComputeResponse<T>, replica_id: ReplicaId ) -> Option<ComputeControllerResponse<T>>
fn handle_frontier_upper( &mut self, id: GlobalId, new_frontier: Antichain<T>, replica_id: ReplicaId )
fn handle_peek_response( &mut self, uuid: Uuid, response: PeekResponse, otel_ctx: OpenTelemetryContext, replica_id: ReplicaId ) -> Option<ComputeControllerResponse<T>>
fn handle_copy_to_response( &mut self, sink_id: GlobalId, response: CopyToResponse, replica_id: ReplicaId ) -> Option<ComputeControllerResponse<T>>
fn handle_subscribe_response( &mut self, subscribe_id: GlobalId, response: SubscribeResponse<T>, replica_id: ReplicaId ) -> Option<ComputeControllerResponse<T>>
fn handle_status_response( &mut self, response: StatusResponse, replica_id: ReplicaId )
impl<'a, T> ActiveInstance<'a, T> where T: TimestampManipulation, ComputeGrpcClient: ComputeClient<T>,
fn downgrade_warmup_capabilities(&mut self)
Downgrade the warmup capabilities of collections as much as possible.
The only requirement we have for a collection’s warmup capability is that it is for a time that is available in all of the collection’s inputs. For each input, the latest time for which that is the case is write_frontier - 1. So the farthest we can downgrade a collection’s warmup capability is the minimum of write_frontier - 1 across all its inputs.
This method expects to be periodically called as part of instance maintenance work. We would like to instead update the warmup capabilities synchronously in response to frontier updates of dependency collections, but that is not generally possible because we don’t learn about frontier updates of storage collections synchronously. We could do synchronous updates for compute dependencies, but we refrain from doing so for simplicity.
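The "minimum of write_frontier - 1" rule can be written out as a short calculation. This is a minimal sketch assuming totally ordered u64 times in place of antichains; both function names are hypothetical, and clamping at time 0 with saturating_sub is a simplification of this sketch.

```rust
/// Latest time available in all inputs: the minimum of `write_frontier - 1`
/// over the inputs. Returns `None` when there are no inputs.
fn warmup_time(input_write_frontiers: &[u64]) -> Option<u64> {
    input_write_frontiers
        .iter()
        .map(|write_frontier| write_frontier.saturating_sub(1))
        .min()
}

/// Downgrades a warmup capability as far as the inputs allow. A capability
/// only ever moves forward, so the current value is a lower bound.
fn downgrade_warmup_capability(current: u64, input_write_frontiers: &[u64]) -> u64 {
    match warmup_time(input_write_frontiers) {
        Some(time) => current.max(time),
        None => current,
    }
}
```

With input write frontiers 10, 7, and 12, the latest commonly available time is 6, so a capability at 4 is downgraded to 6 while a capability already at 8 stays where it is.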
pub fn maintain(&mut self)
Process pending maintenance work.
This method is invoked periodically by the global controller. It is a good place to perform maintenance work that arises from various controller state changes and that cannot conveniently be handled synchronously with those state changes.
Auto Trait Implementations
impl<'a, T> !RefUnwindSafe for ActiveInstance<'a, T>
impl<'a, T> !Send for ActiveInstance<'a, T>
impl<'a, T> !Sync for ActiveInstance<'a, T>
impl<'a, T> Unpin for ActiveInstance<'a, T>
impl<'a, T> !UnwindSafe for ActiveInstance<'a, T>
Blanket Implementations
impl<T> BorrowMut<T> for T where T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self> where Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self> where Self: Display,
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self> where Self: LowerExp,
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self> where Self: LowerHex,
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self> where Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self> where Self: Pointer,
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self> where Self: UpperExp,
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self> where Self: UpperHex,
Causes self to use its UpperHex implementation when Debug-formatted.
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request.
impl<T> Pipe for T where T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R where Self: Sized,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R where R: 'a,
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R where R: 'a,
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R where Self: Borrow<B>, B: 'a + ?Sized, R: 'a,
fn pipe_borrow_mut<'a, B, R>(&'a mut self, func: impl FnOnce(&'a mut B) -> R) -> R where Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R where Self: AsRef<U>, U: 'a + ?Sized, R: 'a,
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R where Self: AsMut<U>, U: 'a + ?Sized, R: 'a,
Mutably borrows self, then passes self.as_mut() into the pipe function.
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P where R: RustType<P>,
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self where Self: Borrow<B>, B: ?Sized,
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self where Self: BorrowMut<B>, B: ?Sized,
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self where Self: AsRef<R>, R: ?Sized,
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self where Self: AsMut<R>, R: ?Sized,
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self where Self: Deref<Target = T>, T: ?Sized,
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self where Self: DerefMut<Target = T> + Deref, T: ?Sized,
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self where Self: Borrow<B>, B: ?Sized,
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self where Self: BorrowMut<B>, B: ?Sized,
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self where Self: AsRef<R>, R: ?Sized,
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self where Self: AsMut<R>, R: ?Sized,
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.