Struct mz_controller::Controller
pub struct Controller<T: ComputeControllerTimestamp = Timestamp> {
pub storage: Box<dyn StorageController<Timestamp = T>>,
pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
pub compute: ComputeController<T>,
pub(crate) clusterd_image: String,
pub(crate) init_container_image: Option<String>,
pub(crate) deploy_generation: u64,
pub(crate) read_only: bool,
pub(crate) orchestrator: Arc<dyn NamespacedOrchestrator>,
pub(crate) readiness: Readiness<T>,
pub(crate) metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>,
pub(crate) metrics_tx: UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>,
pub(crate) metrics_rx: UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>,
pub(crate) now: NowFn,
pub(crate) persist_pubsub_url: String,
pub(crate) secrets_args: SecretsReaderCliArgs,
pub(crate) unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>,
pub(crate) unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>,
pub(crate) watch_set_id_gen: Gen<WatchSetId>,
pub(crate) immediate_watch_sets: Vec<WatchSetId>,
}
A client that maintains soft state and validates commands, in addition to forwarding them.
Fields
storage: Box<dyn StorageController<Timestamp = T>>
storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>
compute: ComputeController<T>
clusterd_image: String
The clusterd image to use when starting new cluster processes.
init_container_image: Option<String>
The init container image to use for clusterd.
deploy_generation: u64
A number representing the environment’s generation.
read_only: bool
Whether or not this controller is in read-only mode.
When in read-only mode, neither this controller nor the instances controlled by it are allowed to effect changes to external systems (largely persist).
orchestrator: Arc<dyn NamespacedOrchestrator>
The cluster orchestrator.
readiness: Readiness<T>
Tracks the readiness of the underlying controllers.
metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>
Tasks for collecting replica metrics.
metrics_tx: UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>
Sender for the channel over which replica metrics are sent.
metrics_rx: UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>
Receiver for the channel over which replica metrics are sent.
now: NowFn
A function providing the current wallclock time.
persist_pubsub_url: String
The URL for Persist PubSub.
secrets_args: SecretsReaderCliArgs
Arguments for secrets readers.
unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>
A map associating a global ID to the set of all the unfulfilled watch set ids that include it.
See self.install_compute_watch_set and self.install_storage_watch_set for a description of watch sets.
unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>
A map of installed watch sets indexed by id.
watch_set_id_gen: Gen<WatchSetId>
A sequence of numbers used to mint unique WatchSetIds.
immediate_watch_sets: Vec<WatchSetId>
A list of watch sets that were already fulfilled as soon as they were installed, and thus must be returned to the client on the next call to self.process.
See self.install_compute_watch_set and self.install_storage_watch_set for a description of watch sets.
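The metrics_tx and metrics_rx fields described above form one channel that several per-replica tasks feed. The following is a minimal sketch of that fan-in pattern, not the controller's implementation: it uses std::sync::mpsc and threads as stand-ins for the unbounded async channel and the abortable tasks, and ProcessMetrics, its fields, and collect_once are hypothetical names chosen for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for `ServiceProcessMetrics`.
#[derive(Debug, Clone, PartialEq)]
struct ProcessMetrics {
    cpu_nano_cores: Option<u64>,
    memory_bytes: Option<u64>,
}

/// Spawns one collector per replica, lets each send a single sample over a
/// shared channel, and returns everything the receiver saw, sorted by replica.
fn collect_once(replica_ids: &[u64]) -> Vec<(u64, Vec<ProcessMetrics>)> {
    let (tx, rx) = mpsc::channel();
    let handles: Vec<_> = replica_ids
        .iter()
        .map(|&id| {
            // Each collector owns its own clone of the sender.
            let tx = tx.clone();
            thread::spawn(move || {
                let sample = vec![ProcessMetrics {
                    cpu_nano_cores: Some(id * 1_000),
                    memory_bytes: None,
                }];
                tx.send((id, sample)).expect("receiver is alive");
            })
        })
        .collect();
    // Drop the original sender so the receiver ends once all collectors finish.
    drop(tx);
    for handle in handles {
        handle.join().expect("collector panicked");
    }
    let mut received: Vec<_> = rx.iter().collect();
    received.sort_by_key(|(id, _)| *id);
    received
}
```

Tagging every message with the replica id is what lets a single receiver serve any number of collectors.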
Implementations
impl<T> Controller<T>
pub fn create_cluster(
    &mut self,
    id: ClusterId,
    config: ClusterConfig,
) -> Result<(), Error>
Creates a cluster with the specified identifier and configuration.
A cluster is a combination of a storage instance and a compute instance. A cluster has zero or more replicas; each replica colocates the storage and compute layers on the same physical resources.
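The relationship described above (a cluster owning zero or more replicas) can be pictured with a small in-memory model. This is only a sketch under stated assumptions: Clusters and replica_count are hypothetical, ids are plain u64 values rather than ClusterId and ReplicaId, errors are strings, and the rule that duplicates or missing clusters produce an error is an assumption of the sketch, not documented behavior.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical model: each cluster id maps to the set of its replica ids.
#[derive(Default)]
struct Clusters {
    replicas: BTreeMap<u64, BTreeSet<u64>>,
}

impl Clusters {
    /// A new cluster starts with zero replicas.
    fn create_cluster(&mut self, id: u64) -> Result<(), String> {
        if self.replicas.contains_key(&id) {
            return Err(format!("cluster {id} already exists"));
        }
        self.replicas.insert(id, BTreeSet::new());
        Ok(())
    }

    /// A replica can only be created inside an existing cluster.
    fn create_replica(&mut self, cluster_id: u64, replica_id: u64) -> Result<(), String> {
        let replicas = self
            .replicas
            .get_mut(&cluster_id)
            .ok_or_else(|| format!("cluster {cluster_id} does not exist"))?;
        if !replicas.insert(replica_id) {
            return Err(format!("replica {replica_id} already exists"));
        }
        Ok(())
    }

    /// Dropping a replica leaves the cluster itself in place.
    fn drop_replica(&mut self, cluster_id: u64, replica_id: u64) -> Result<(), String> {
        let replicas = self
            .replicas
            .get_mut(&cluster_id)
            .ok_or_else(|| format!("cluster {cluster_id} does not exist"))?;
        if replicas.remove(&replica_id) {
            Ok(())
        } else {
            Err(format!("replica {replica_id} does not exist"))
        }
    }

    fn replica_count(&self, cluster_id: u64) -> Option<usize> {
        self.replicas.get(&cluster_id).map(BTreeSet::len)
    }
}
```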
pub fn update_cluster_workload_class(
    &mut self,
    id: ClusterId,
    workload_class: Option<String>,
) -> Result<(), Error>
Updates the workload class for a cluster.
pub fn drop_cluster(&mut self, id: ClusterId)
pub fn create_replica(
    &mut self,
    cluster_id: ClusterId,
    replica_id: ReplicaId,
    role: ClusterRole,
    config: ReplicaConfig,
    enable_worker_core_affinity: bool,
) -> Result<(), Error>
Creates a replica of the specified cluster with the specified identifier and configuration.
pub fn drop_replica(
    &mut self,
    cluster_id: ClusterId,
    replica_id: ReplicaId,
) -> Result<(), Error>
Drops the specified replica of the specified cluster.
pub(crate) fn remove_past_generation_replicas_in_background(&self)
Removes replicas from past generations in a background task.
pub async fn remove_orphaned_replicas(
    &mut self,
    next_user_replica_id: u64,
    next_system_replica_id: u64,
) -> Result<(), Error>
Remove replicas that are orphaned in the current generation.
pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent>
fn provision_replica(
    &self,
    cluster_id: ClusterId,
    replica_id: ReplicaId,
    role: ClusterRole,
    location: ManagedReplicaLocation,
    enable_worker_core_affinity: bool,
) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), Error>
Provisions a replica with the service orchestrator.
impl<T: ComputeControllerTimestamp> Controller<T>
pub fn set_arrangement_exert_proportionality(&mut self, value: u32)
pub fn connection_context(&self) -> &ConnectionContext
Returns the connection context installed in the controller.
This is purely a helper, and can be obtained from self.storage.
pub fn storage_configuration(&self) -> &StorageConfiguration
Returns the storage configuration installed in the storage controller.
This is purely a helper, and can be obtained from self.storage.
impl<T> Controller<T>
pub fn update_orchestrator_scheduling_config(
    &self,
    config: ServiceSchedulingConfig,
)
pub fn initialization_complete(&mut self)
Marks the end of any initialization commands.
The implementor may wait for this method to be called before implementing prior commands, and so it is important for a user to invoke this method as soon as it is comfortable. This method can be invoked immediately, at the potential expense of performance.
pub(crate) fn take_internal_response(&mut self) -> Option<ControllerResponse<T>>
Returns Some if there is an immediately available internally-generated response that we need to return to the client (as opposed to waiting for a response from compute or storage).
pub async fn ready(&mut self)
Waits until the controller is ready to process a response.
This method may block for an arbitrarily long time.
When the method returns, the owner should call Controller::process to process the ready message.
This method is cancellation safe.
pub fn install_compute_watch_set(
    &mut self,
    objects: BTreeSet<GlobalId>,
    t: T,
) -> WatchSetId
Install a watch set in the controller.
A watch set is a request to be informed by the controller when all of the frontiers of a particular set of objects have advanced at least to a particular timestamp.
When all the objects in objects have advanced to t, the watch set id is returned to the client on the next call to Self::process.
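The bookkeeping behind watch sets can be sketched with the same two-map layout as the unfulfilled_watch_sets and unfulfilled_watch_sets_by_object fields. This is a simplified model, not the controller's code: WatchSets, install, and advance are hypothetical names, ids and timestamps are plain u64 values, an unknown object is assumed to sit at frontier 0, and "advanced to t" is taken to mean frontier >= t, which is an assumption of this sketch.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical model of watch-set bookkeeping over `u64` ids and timestamps.
#[derive(Default)]
struct WatchSets {
    /// Last known frontier of each object (assumed 0 when unknown).
    frontiers: BTreeMap<u64, u64>,
    /// Unfulfilled watch sets: the objects still lagging, and the target time.
    unfulfilled: BTreeMap<u64, (BTreeSet<u64>, u64)>,
    /// Reverse index: object id to the unfulfilled watch sets that include it.
    by_object: BTreeMap<u64, BTreeSet<u64>>,
    /// Watch sets that were already fulfilled when they were installed.
    immediate: Vec<u64>,
    next_id: u64,
}

impl WatchSets {
    fn install(&mut self, objects: BTreeSet<u64>, t: u64) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        // Only objects whose frontier has not reached `t` need watching.
        let pending: BTreeSet<u64> = objects
            .into_iter()
            .filter(|o| self.frontiers.get(o).copied().unwrap_or(0) < t)
            .collect();
        if pending.is_empty() {
            self.immediate.push(id);
        } else {
            for o in &pending {
                self.by_object.entry(*o).or_default().insert(id);
            }
            self.unfulfilled.insert(id, (pending, t));
        }
        id
    }

    /// Records a frontier advance and returns the watch sets it completes.
    fn advance(&mut self, object: u64, frontier: u64) -> Vec<u64> {
        self.frontiers.insert(object, frontier);
        let candidates: Vec<u64> = self
            .by_object
            .get(&object)
            .map(|ids| ids.iter().copied().collect())
            .unwrap_or_default();
        let mut finished = Vec::new();
        for id in candidates {
            let (pending, t) = self.unfulfilled.get_mut(&id).expect("index is consistent");
            if frontier >= *t {
                pending.remove(&object);
                if let Some(ids) = self.by_object.get_mut(&object) {
                    ids.remove(&id);
                }
                if pending.is_empty() {
                    self.unfulfilled.remove(&id);
                    finished.push(id);
                }
            }
        }
        finished
    }
}
```

The reverse index means a frontier update only touches the watch sets that mention the updated object, rather than scanning every installed watch set.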
pub fn install_storage_watch_set(
    &mut self,
    objects: BTreeSet<GlobalId>,
    t: T,
) -> WatchSetId
Install a watch set in the controller.
A watch set is a request to be informed by the controller when all of the frontiers of a particular set of objects have advanced at least to a particular timestamp.
When all the objects in objects have advanced to t, the watch set id is returned to the client on the next call to Self::process.
pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId)
Uninstalls a previously installed WatchSetId. The method is a no-op if the watch set has already finished and therefore it’s safe to call this function unconditionally.
Panics
This method panics if called with a WatchSetId that was never returned by one of the watch set installation methods.
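The contract above (a no-op for finished watch sets, a panic for ids that were never handed out) can be illustrated with a small model. This is a sketch only: WatchRegistry and its methods are hypothetical, and detecting "never returned" by comparing against a monotonically increasing counter is an assumption of the sketch rather than a statement about how the controller does it.

```rust
use std::collections::BTreeSet;

/// Hypothetical registry that mints sequential watch set ids.
#[derive(Default)]
struct WatchRegistry {
    next_id: u64,
    unfulfilled: BTreeSet<u64>,
}

impl WatchRegistry {
    /// Mints a fresh id and tracks it as unfulfilled.
    fn install(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.unfulfilled.insert(id);
        id
    }

    /// Marks a watch set as finished.
    fn complete(&mut self, id: u64) {
        self.unfulfilled.remove(&id);
    }

    /// Safe to call unconditionally for any id that `install` returned.
    fn uninstall(&mut self, id: u64) {
        // Ids at or above the counter were never minted: treat as a caller bug.
        assert!(id < self.next_id, "watch set {id} was never installed");
        // Removing an already finished watch set is a no-op.
        self.unfulfilled.remove(&id);
    }
}
```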
pub(crate) fn process_storage_response(
    &mut self,
    storage_metadata: &StorageMetadata,
) -> Result<Option<ControllerResponse<T>>, Error>
Process a pending response from the storage controller. If necessary, return a higher-level response to our client.
pub(crate) fn process_compute_response(
    &mut self,
) -> Result<Option<ControllerResponse<T>>, Error>
Process a pending response from the compute controller. If necessary, return a higher-level response to our client.
pub fn process(
    &mut self,
    storage_metadata: &StorageMetadata,
) -> Result<Option<ControllerResponse<T>>, Error>
Processes the work queued by Controller::ready.
This method is guaranteed to return “quickly” unless doing so would compromise the correctness of the system.
This method is not guaranteed to be cancellation safe. It must be awaited to completion.
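The ready/process pairing can be sketched as a two-phase loop: one call notes which source has work, the next consumes it. The model below is a synchronous simplification and not the real API: MiniController, drain, the string responses, the Readiness variants, and the order in which queues are checked are all hypothetical. The real ready is async and may wait arbitrarily long, whereas this stand-in returns immediately.

```rust
use std::collections::VecDeque;

/// Hypothetical readiness marker recording where pending work was found.
#[derive(Debug, PartialEq)]
enum Readiness {
    NotReady,
    Storage,
    Compute,
    Internal,
}

struct MiniController {
    readiness: Readiness,
    storage: VecDeque<&'static str>,
    compute: VecDeque<&'static str>,
    internal: VecDeque<&'static str>,
}

impl MiniController {
    /// Stand-in for `ready`: notes which queue has work without consuming it.
    fn ready(&mut self) {
        if self.readiness != Readiness::NotReady {
            return; // Earlier readiness has not been processed yet.
        }
        self.readiness = if !self.internal.is_empty() {
            Readiness::Internal
        } else if !self.storage.is_empty() {
            Readiness::Storage
        } else if !self.compute.is_empty() {
            Readiness::Compute
        } else {
            Readiness::NotReady
        };
    }

    /// Stand-in for `process`: consumes the work that `ready` found.
    fn process(&mut self) -> Option<&'static str> {
        match std::mem::replace(&mut self.readiness, Readiness::NotReady) {
            Readiness::NotReady => None,
            Readiness::Storage => self.storage.pop_front(),
            Readiness::Compute => self.compute.pop_front(),
            Readiness::Internal => self.internal.pop_front(),
        }
    }
}

/// Drives the ready/process cycle until no work remains.
fn drain(controller: &mut MiniController) -> Vec<&'static str> {
    let mut responses = Vec::new();
    loop {
        controller.ready();
        match controller.process() {
            Some(response) => responses.push(response),
            None => break,
        }
    }
    responses
}
```

Splitting the wait from the work keeps the waiting half safe to cancel, since it does not consume anything from the queues.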
pub(crate) fn handle_frontier_updates(
    &mut self,
    updates: &[(GlobalId, Antichain<T>)],
) -> Option<ControllerResponse<T>>
Record updates to frontiers, and propagate any necessary responses.
As of this writing (2/29/2024), the only response that can be generated from a frontier update is WatchSetCompleted.
pub(crate) fn process_replica_metrics(
    &mut self,
    id: ReplicaId,
    metrics: Vec<ServiceProcessMetrics>,
) -> Result<Option<ControllerResponse<T>>, Error>
pub(crate) fn record_replica_metrics(
    &mut self,
    replica_id: ReplicaId,
    metrics: &[ServiceProcessMetrics],
)
pub async fn determine_real_time_recent_timestamp(
    &self,
    ids: BTreeSet<GlobalId>,
    timeout: Duration,
) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>>
Determine the “real-time recency” timestamp for all ids.
Real-time recency is defined as the minimum value of T at which all objects can be queried to return all data visible in the upstream systems at the time the query was issued. In this case, “the upstream systems” are any user sources that connect to objects outside of Materialize, such as Kafka sources.
If no items in ids connect to external systems, this function will return Ok(T::minimum).
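One way to read the definition above: each externally connected object has some earliest timestamp by which everything currently visible upstream has been ingested, and the real-time recency timestamp is the smallest value that satisfies all of them, that is, their maximum. The sketch below models only that arithmetic over u64 timestamps. The function name matches nothing in the crate, the required map and its Option encoding are hypothetical, and how the per-object timestamps are obtained is deliberately left out.

```rust
use std::collections::BTreeMap;

/// `required` maps each queried object id to the timestamp by which all data
/// currently visible upstream has been ingested, or `None` when the object
/// does not connect to an external system (an encoding assumed by this sketch).
fn real_time_recent_timestamp(required: &BTreeMap<u64, Option<u64>>) -> u64 {
    required
        .values()
        .flatten() // Skip objects without an external upstream.
        .copied()
        .max() // The smallest timestamp that satisfies every object.
        .unwrap_or(u64::MIN) // No external objects: fall back to the minimum.
}
```

With required timestamps of 40 and 55 the result is 55, since querying at 40 could miss data from the second source.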
impl<T> Controller<T>
where
    T: Timestamp + Codec64 + From<EpochMillis> + TimestampManipulation + Display + Into<Timestamp> + ComputeControllerTimestamp,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
    ComputeGrpcClient: ComputeClient<T>,
pub async fn new(
    config: ControllerConfig,
    envd_epoch: NonZeroI64,
    read_only: bool,
    storage_txn: &dyn StorageTxn<T>,
) -> Self
Creates a new controller.
For correctness, this function expects to have access to the mutations to the storage_txn that occurred in prepare_initialization.
Panics
If this function is called before prepare_initialization.
Auto Trait Implementations
impl<T = Timestamp> !Freeze for Controller<T>
impl<T = Timestamp> !RefUnwindSafe for Controller<T>
impl<T = Timestamp> !Send for Controller<T>
impl<T = Timestamp> !Sync for Controller<T>
impl<T> Unpin for Controller<T>
where
    T: Unpin,
impl<T = Timestamp> !UnwindSafe for Controller<T>
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self>
where
    Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self>
where
    Self: Display,
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where
    Self: LowerExp,
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where
    Self: LowerHex,
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self>
where
    Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self>
where
    Self: Pointer,
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where
    Self: UpperExp,
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where
    Self: UpperHex,
Causes self to use its UpperHex implementation when Debug-formatted.
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wraps the input message T in a tonic::Request.
impl<T, U> OverrideFrom<Option<&T>> for U
where
    U: OverrideFrom<T>,
impl<T> Pipe for T
where
    T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where
    Self: Sized,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where
    R: 'a,
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where
    R: 'a,
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(
    &'a mut self,
    func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P
where
    R: RustType<P>,
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.
impl<'a, S, T> Semigroup<&'a S> for T
where
    T: Semigroup<S>,
fn plus_equals(&mut self, rhs: &&'a S)
The method of std::ops::AddAssign, for types that do not implement AddAssign.
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.