Struct mz_compute_client::controller::ComputeController
source · pub struct ComputeController<T> {Show 19 fields
instances: BTreeMap<ComputeInstanceId, Instance<T>>,
instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>,
build_info: &'static BuildInfo,
storage_collections: Arc<dyn StorageCollections<Timestamp = T>>,
initialized: bool,
read_only: bool,
config: ComputeParameters,
arrangement_exert_proportionality: u32,
stashed_replica_response: Option<(ComputeInstanceId, ReplicaId, ComputeResponse<T>)>,
envd_epoch: NonZeroI64,
metrics: ComputeControllerMetrics,
wallclock_lag: Arc<dyn Fn(&T) -> Duration>,
dyncfg: Arc<ConfigSet>,
response_rx: Receiver<ComputeControllerResponse<T>>,
response_tx: Sender<ComputeControllerResponse<T>>,
introspection_rx: Receiver<(IntrospectionType, Vec<(Row, Diff)>)>,
introspection_tx: Sender<(IntrospectionType, Vec<(Row, Diff)>)>,
maintenance_ticker: Interval,
maintenance_scheduled: bool,
}
Expand description
A controller for the compute layer.
Fields§
§instances: BTreeMap<ComputeInstanceId, Instance<T>>
§instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>
A map from an instance ID to an arbitrary string that describes the
class of the workload that compute instance is running (e.g.,
production
or staging
).
build_info: &'static BuildInfo
§storage_collections: Arc<dyn StorageCollections<Timestamp = T>>
A handle providing access to storage collections.
initialized: bool
Set to true
once initialization_complete
has been called.
read_only: bool
Whether or not this controller is in read-only mode.
When in read-only mode, neither this controller nor the instances controlled by it are allowed to affect changes to external systems (largely persist).
config: ComputeParameters
Compute configuration to apply to new instances.
arrangement_exert_proportionality: u32
arrangement_exert_proportionality
value passed to new replicas.
stashed_replica_response: Option<(ComputeInstanceId, ReplicaId, ComputeResponse<T>)>
A replica response to be handled by the corresponding Instance
on a subsequent call to
ComputeController::process
.
envd_epoch: NonZeroI64
A number that increases on every environmentd
restart.
metrics: ComputeControllerMetrics
The compute controller metrics.
wallclock_lag: Arc<dyn Fn(&T) -> Duration>
A function that compute the lag between the given time and wallclock time.
dyncfg: Arc<ConfigSet>
Dynamic system configuration.
Updated through ComputeController::update_configuration
calls and shared with all
subcomponents of the compute controller.
response_rx: Receiver<ComputeControllerResponse<T>>
Receiver for responses produced by Instance
s, to be delivered on subsequent calls to
ComputeController::process
.
response_tx: Sender<ComputeControllerResponse<T>>
Response sender that’s passed to new Instance
s.
introspection_rx: Receiver<(IntrospectionType, Vec<(Row, Diff)>)>
Receiver for introspection updates produced by Instance
s.
introspection_tx: Sender<(IntrospectionType, Vec<(Row, Diff)>)>
Introspection updates sender that’s passed to new Instance
s.
maintenance_ticker: Interval
Ticker for scheduling periodic maintenance work.
maintenance_scheduled: bool
Whether maintenance work was scheduled.
Implementations§
source§impl<T: ComputeControllerTimestamp> ComputeController<T>
impl<T: ComputeControllerTimestamp> ComputeController<T>
sourcepub fn new(
build_info: &'static BuildInfo,
storage_collections: Arc<dyn StorageCollections<Timestamp = T>>,
envd_epoch: NonZeroI64,
read_only: bool,
metrics_registry: MetricsRegistry,
wallclock_lag: Arc<dyn Fn(&T) -> Duration>
) -> Self
pub fn new( build_info: &'static BuildInfo, storage_collections: Arc<dyn StorageCollections<Timestamp = T>>, envd_epoch: NonZeroI64, read_only: bool, metrics_registry: MetricsRegistry, wallclock_lag: Arc<dyn Fn(&T) -> Duration> ) -> Self
Construct a new ComputeController
.
sourcepub fn instance_exists(&self, id: ComputeInstanceId) -> bool
pub fn instance_exists(&self, id: ComputeInstanceId) -> bool
TODO(#25239): Add documentation.
sourcefn instance(
&self,
id: ComputeInstanceId
) -> Result<&Instance<T>, InstanceMissing>
fn instance( &self, id: ComputeInstanceId ) -> Result<&Instance<T>, InstanceMissing>
Return a reference to the indicated compute instance.
sourcefn instance_mut(
&mut self,
id: ComputeInstanceId
) -> Result<&mut Instance<T>, InstanceMissing>
fn instance_mut( &mut self, id: ComputeInstanceId ) -> Result<&mut Instance<T>, InstanceMissing>
Return a mutable reference to the indicated compute instance.
sourcepub fn instance_ref(
&self,
id: ComputeInstanceId
) -> Result<ComputeInstanceRef<'_, T>, InstanceMissing>
pub fn instance_ref( &self, id: ComputeInstanceId ) -> Result<ComputeInstanceRef<'_, T>, InstanceMissing>
Return a read-only handle to the indicated compute instance.
sourcepub fn collection(
&self,
instance_id: ComputeInstanceId,
collection_id: GlobalId
) -> Result<&CollectionState<T>, CollectionLookupError>
pub fn collection( &self, instance_id: ComputeInstanceId, collection_id: GlobalId ) -> Result<&CollectionState<T>, CollectionLookupError>
Return a read-only handle to the indicated collection.
sourcepub fn find_collection(
&self,
collection_id: GlobalId
) -> Result<&CollectionState<T>, CollectionLookupError>
pub fn find_collection( &self, collection_id: GlobalId ) -> Result<&CollectionState<T>, CollectionLookupError>
Return a read-only handle to the indicated collection.
sourcepub fn collection_reverse_dependencies(
&self,
instance_id: ComputeInstanceId,
id: GlobalId
) -> Result<impl Iterator<Item = &GlobalId>, InstanceMissing>
pub fn collection_reverse_dependencies( &self, instance_id: ComputeInstanceId, id: GlobalId ) -> Result<impl Iterator<Item = &GlobalId>, InstanceMissing>
List compute collections that depend on the given collection.
sourcepub fn set_arrangement_exert_proportionality(&mut self, value: u32)
pub fn set_arrangement_exert_proportionality(&mut self, value: u32)
Set the arrangement_exert_proportionality
value to be passed to new replicas.
sourcepub fn clusters_hydrated(&self) -> bool
pub fn clusters_hydrated(&self) -> bool
Returns true
iff all collections on all clusters have been hydrated.
For this check, zero-replica clusters are always considered hydrated. Their collections would never normally be considered hydrated but it’s clearly intentional that they have no replicas.
sourcepub fn collection_frontiers(
&self
) -> BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>
pub fn collection_frontiers( &self ) -> BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>
Returns the read and write frontiers for each collection.
source§impl<T> ComputeController<T>
impl<T> ComputeController<T>
sourcepub fn create_instance(
&mut self,
id: ComputeInstanceId,
arranged_logs: BTreeMap<LogVariant, GlobalId>,
workload_class: Option<String>
) -> Result<(), InstanceExists>
pub fn create_instance( &mut self, id: ComputeInstanceId, arranged_logs: BTreeMap<LogVariant, GlobalId>, workload_class: Option<String> ) -> Result<(), InstanceExists>
Create a compute instance.
sourcepub fn update_instance_workload_class(
&mut self,
id: ComputeInstanceId,
workload_class: Option<String>
) -> Result<(), InstanceMissing>
pub fn update_instance_workload_class( &mut self, id: ComputeInstanceId, workload_class: Option<String> ) -> Result<(), InstanceMissing>
Updates a compute instance’s workload class.
sourcepub fn drop_instance(&mut self, id: ComputeInstanceId)
pub fn drop_instance(&mut self, id: ComputeInstanceId)
sourcepub fn update_configuration(&mut self, config_params: ComputeParameters)
pub fn update_configuration(&mut self, config_params: ComputeParameters)
Update compute configuration.
sourcepub fn initialization_complete(&mut self)
pub fn initialization_complete(&mut self)
Mark the end of any initialization commands.
The implementor may wait for this method to be called before implementing prior commands, and so it is important for a user to invoke this method as soon as it is comfortable. This method can be invoked immediately, at the potential expense of performance.
sourcepub fn allow_writes(&mut self)
pub fn allow_writes(&mut self)
Allow this controller and instances controller by it to write to external systems.
sourcepub async fn ready(&mut self)
pub async fn ready(&mut self)
Wait until the controller is ready to do some processing.
This method may block for an arbitrarily long time.
When the method returns, the caller should call ComputeController::process
.
This method is cancellation safe.
sourcepub fn set_subscribe_target_replica(
&mut self,
instance_id: ComputeInstanceId,
subscribe_id: GlobalId,
target_replica: ReplicaId
) -> Result<(), SubscribeTargetError>
pub fn set_subscribe_target_replica( &mut self, instance_id: ComputeInstanceId, subscribe_id: GlobalId, target_replica: ReplicaId ) -> Result<(), SubscribeTargetError>
Assign a target replica to the identified subscribe.
If a subscribe has a target replica assigned, only subscribe responses sent by that replica are considered.
sourcepub fn add_replica_to_instance(
&mut self,
instance_id: ComputeInstanceId,
replica_id: ReplicaId,
location: ClusterReplicaLocation,
config: ComputeReplicaConfig
) -> Result<(), ReplicaCreationError>
pub fn add_replica_to_instance( &mut self, instance_id: ComputeInstanceId, replica_id: ReplicaId, location: ClusterReplicaLocation, config: ComputeReplicaConfig ) -> Result<(), ReplicaCreationError>
Adds replicas of an instance.
sourcepub fn drop_replica(
&mut self,
instance_id: ComputeInstanceId,
replica_id: ReplicaId
) -> Result<(), ReplicaDropError>
pub fn drop_replica( &mut self, instance_id: ComputeInstanceId, replica_id: ReplicaId ) -> Result<(), ReplicaDropError>
Removes a replica from an instance, including its service in the orchestrator.
sourcepub fn create_dataflow(
&mut self,
instance_id: ComputeInstanceId,
dataflow: DataflowDescription<Plan<T>, (), T>
) -> Result<(), DataflowCreationError>
pub fn create_dataflow( &mut self, instance_id: ComputeInstanceId, dataflow: DataflowDescription<Plan<T>, (), T> ) -> Result<(), DataflowCreationError>
Create and maintain the described dataflows, and initialize state for their output.
This method creates dataflows whose inputs are still readable at the dataflow as_of
frontier, and initializes the outputs as readable from that frontier onward.
It installs read dependencies from the outputs to the inputs, so that the input read
capabilities will be held back to the output read capabilities, ensuring that we are
always able to return to a state that can serve the output read capabilities.
sourcepub fn drop_collections(
&mut self,
instance_id: ComputeInstanceId,
collection_ids: Vec<GlobalId>
) -> Result<(), CollectionUpdateError>
pub fn drop_collections( &mut self, instance_id: ComputeInstanceId, collection_ids: Vec<GlobalId> ) -> Result<(), CollectionUpdateError>
Drop the read capability for the given collections and allow their resources to be reclaimed.
sourcepub fn peek(
&mut self,
instance_id: ComputeInstanceId,
collection_id: GlobalId,
literal_constraints: Option<Vec<Row>>,
uuid: Uuid,
timestamp: T,
finishing: RowSetFinishing,
map_filter_project: SafeMfpPlan,
target_replica: Option<ReplicaId>,
peek_target: PeekTarget
) -> Result<(), PeekError>
pub fn peek( &mut self, instance_id: ComputeInstanceId, collection_id: GlobalId, literal_constraints: Option<Vec<Row>>, uuid: Uuid, timestamp: T, finishing: RowSetFinishing, map_filter_project: SafeMfpPlan, target_replica: Option<ReplicaId>, peek_target: PeekTarget ) -> Result<(), PeekError>
Initiate a peek request for the contents of the given collection at timestamp
.
sourcepub fn cancel_peek(
&mut self,
instance_id: ComputeInstanceId,
uuid: Uuid
) -> Result<(), InstanceMissing>
pub fn cancel_peek( &mut self, instance_id: ComputeInstanceId, uuid: Uuid ) -> Result<(), InstanceMissing>
Cancel an existing peek request.
Canceling a peek is best effort. The caller may see any of the following after canceling a peek request:
- A
PeekResponse::Rows
indicating that the cancellation request did not take effect in time and the query succeeded. - A
PeekResponse::Canceled
affirming that the peek was canceled. - No
PeekResponse
at all.
sourcepub fn set_read_policy(
&mut self,
instance_id: ComputeInstanceId,
policies: Vec<(GlobalId, ReadPolicy<T>)>
) -> Result<(), ReadPolicyError>
pub fn set_read_policy( &mut self, instance_id: ComputeInstanceId, policies: Vec<(GlobalId, ReadPolicy<T>)> ) -> Result<(), ReadPolicyError>
Assign a read policy to specific identifiers.
The policies are assigned in the order presented, and repeated identifiers should conclude with the last policy. Changing a policy will immediately downgrade the read capability if appropriate, but it will not “recover” the read capability if the prior capability is already ahead of it.
Identifiers not present in policies
retain their existing read policies.
It is an error to attempt to set a read policy for a collection that is not readable in the context of compute. At this time, only indexes are readable compute collections.
async fn record_introspection_updates( &mut self, storage: &mut dyn StorageController<Timestamp = T> )
sourcepub async fn process(
&mut self,
storage: &mut dyn StorageController<Timestamp = T>
) -> Option<ComputeControllerResponse<T>>
pub async fn process( &mut self, storage: &mut dyn StorageController<Timestamp = T> ) -> Option<ComputeControllerResponse<T>>
Processes the work queued by ComputeController::ready
.
async fn maintain(&mut self, storage: &mut dyn StorageController<Timestamp = T>)
Auto Trait Implementations§
impl<T> !Freeze for ComputeController<T>
impl<T> !RefUnwindSafe for ComputeController<T>
impl<T> !Send for ComputeController<T>
impl<T> !Sync for ComputeController<T>
impl<T> Unpin for ComputeController<T>where
T: Unpin,
impl<T> !UnwindSafe for ComputeController<T>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> FmtForward for T
impl<T> FmtForward for T
source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.source§impl<T> FutureExt for T
impl<T> FutureExt for T
source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
source§impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R ) -> R
source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.source§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.source§impl<T> Pointable for T
impl<T> Pointable for T
source§impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto
.source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto
.source§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign
, for types that do not implement AddAssign
.source§impl<T> Tap for T
impl<T> Tap for T
source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read moresource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read moresource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read moresource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read moresource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read moresource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read moresource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.source§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.