Struct mz_controller::Controller

source ·
pub struct Controller<T: ComputeControllerTimestamp = Timestamp> {
Show 19 fields pub storage: Box<dyn StorageController<Timestamp = T>>, pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>, pub compute: ComputeController<T>, pub(crate) clusterd_image: String, pub(crate) init_container_image: Option<String>, pub(crate) deploy_generation: u64, pub(crate) read_only: bool, pub(crate) orchestrator: Arc<dyn NamespacedOrchestrator>, pub(crate) readiness: Readiness<T>, pub(crate) metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>, pub(crate) metrics_tx: UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>, pub(crate) metrics_rx: UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>, pub(crate) now: NowFn, pub(crate) persist_pubsub_url: String, pub(crate) secrets_args: SecretsReaderCliArgs, pub(crate) unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>, pub(crate) unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>, pub(crate) watch_set_id_gen: Gen<WatchSetId>, pub(crate) immediate_watch_sets: Vec<WatchSetId>,
}
Expand description

A client that maintains soft state and validates commands, in addition to forwarding them.

Fields§

§storage: Box<dyn StorageController<Timestamp = T>>§storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>§compute: ComputeController<T>§clusterd_image: String

The clusterd image to use when starting new cluster processes.

§init_container_image: Option<String>

The init container image to use for clusterd.

§deploy_generation: u64

A number representing the environment’s generation.

§read_only: bool

Whether or not this controller is in read-only mode.

When in read-only mode, neither this controller nor the instances controlled by it are allowed to affect changes to external systems (largely persist).

§orchestrator: Arc<dyn NamespacedOrchestrator>

The cluster orchestrator.

§readiness: Readiness<T>

Tracks the readiness of the underlying controllers.

§metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>

Tasks for collecting replica metrics.

§metrics_tx: UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>

Sender for the channel over which replica metrics are sent.

§metrics_rx: UnboundedReceiver<(ReplicaId, Vec<ServiceProcessMetrics>)>

Receiver for the channel over which replica metrics are sent.

§now: NowFn

A function providing the current wallclock time.

§persist_pubsub_url: String

The URL for Persist PubSub.

§secrets_args: SecretsReaderCliArgs

Arguments for secrets readers.

§unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>

A map associating a global ID to the set of all the unfulfilled watch set ids that include it.

See [self.install_watch_set] for a description of watch sets.

§unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>

A map of installed watch sets indexed by id.

§watch_set_id_gen: Gen<WatchSetId>

A sequence of numbers used to mint unique WatchSetIds.

§immediate_watch_sets: Vec<WatchSetId>

A list of watch sets that were already fulfilled as soon as they were installed, and thus that must be returned to the client on the next call to [self.process].

See [self.install_watch_set] for a description of watch sets.

Implementations§

source§

impl<T> Controller<T>

source

pub fn create_cluster( &mut self, id: ClusterId, config: ClusterConfig, ) -> Result<(), Error>

Creates a cluster with the specified identifier and configuration.

A cluster is a combination of a storage instance and a compute instance. A cluster has zero or more replicas; each replica colocates the storage and compute layers on the same physical resources.

source

pub fn update_cluster_workload_class( &mut self, id: ClusterId, workload_class: Option<String>, ) -> Result<(), Error>

Updates the workload class for a cluster.

source

pub fn drop_cluster(&mut self, id: ClusterId)

Drops the specified cluster.

§Panics

Panics if the cluster still has replicas.

source

pub fn create_replica( &mut self, cluster_id: ClusterId, replica_id: ReplicaId, role: ClusterRole, config: ReplicaConfig, enable_worker_core_affinity: bool, ) -> Result<(), Error>

Creates a replica of the specified cluster with the specified identifier and configuration.

source

pub fn drop_replica( &mut self, cluster_id: ClusterId, replica_id: ReplicaId, ) -> Result<(), Error>

Drops the specified replica of the specified cluster.

source

pub(crate) fn remove_past_generation_replicas_in_background(&self)

Removes replicas from past generations in a background task.

source

pub async fn remove_orphaned_replicas( &mut self, next_user_replica_id: u64, next_system_replica_id: u64, ) -> Result<(), Error>

Remove replicas that are orphaned in the current generation.

source

pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent>

source

fn provision_replica( &self, cluster_id: ClusterId, replica_id: ReplicaId, role: ClusterRole, location: ManagedReplicaLocation, enable_worker_core_affinity: bool, ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), Error>

Provisions a replica with the service orchestrator.

source

fn deprovision_replica( &self, cluster_id: ClusterId, replica_id: ReplicaId, generation: u64, ) -> Result<(), Error>

Deprovisions a replica with the service orchestrator.

source§

impl<T: ComputeControllerTimestamp> Controller<T>

source

pub fn set_arrangement_exert_proportionality(&mut self, value: u32)

source

pub fn connection_context(&self) -> &ConnectionContext

Returns the connection context installed in the controller.

This is purely a helper, and can be obtained from self.storage.

source

pub fn storage_configuration(&self) -> &StorageConfiguration

Returns the storage configuration installed in the storage controller.

This is purely a helper, and can be obtained from self.storage.

source

pub async fn dump(&self) -> Result<Value, Error>

Returns the state of the Controller formatted as JSON.

The returned value is not guaranteed to be stable and may change at any point in time.

source§

impl<T> Controller<T>

source

pub fn update_orchestrator_scheduling_config( &self, config: ServiceSchedulingConfig, )

source

pub fn initialization_complete(&mut self)

Marks the end of any initialization commands.

The implementor may wait for this method to be called before implementing prior commands, and so it is important for a user to invoke this method as soon as it is comfortable. This method can be invoked immediately, at the potential expense of performance.

source

pub fn read_only(&self) -> bool

Reports whether the controller is in read only mode.

source

pub(crate) fn take_internal_response(&mut self) -> Option<ControllerResponse<T>>

Returns Some if there is an immediately available internally-generated response that we need to return to the client (as opposed to waiting for a response from compute or storage).

source

pub async fn ready(&mut self)

Waits until the controller is ready to process a response.

This method may block for an arbitrarily long time.

When the method returns, the owner should call Controller::ready to process the ready message.

This method is cancellation safe.

source

pub fn install_compute_watch_set( &mut self, objects: BTreeSet<GlobalId>, t: T, ) -> WatchSetId

Install a watch set in the controller.

A watch set is a request to be informed by the controller when all of the frontiers of a particular set of objects have advanced at least to a particular timestamp.

When all the objects in objects have advanced to t, the watchset id is returned to the client on the next call to Self::process.

source

pub fn install_storage_watch_set( &mut self, objects: BTreeSet<GlobalId>, t: T, ) -> WatchSetId

Install a watch set in the controller.

A watch set is a request to be informed by the controller when all of the frontiers of a particular set of objects have advanced at least to a particular timestamp.

When all the objects in objects have advanced to t, the watchset id is returned to the client on the next call to Self::process.

source

pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId)

Uninstalls a previously installed WatchSetId. The method is a no-op if the watch set has already finished and therefore it’s safe to call this function unconditionally.

§Panics

This method panics if called with a WatchSetId that was never returned by the function.

source

pub(crate) fn process_storage_response( &mut self, storage_metadata: &StorageMetadata, ) -> Result<Option<ControllerResponse<T>>, Error>

Process a pending response from the storage controller. If necessary, return a higher-level response to our client.

source

pub(crate) fn process_compute_response( &mut self, ) -> Result<Option<ControllerResponse<T>>, Error>

Process a pending response from the compute controller. If necessary, return a higher-level response to our client.

source

pub fn process( &mut self, storage_metadata: &StorageMetadata, ) -> Result<Option<ControllerResponse<T>>, Error>

Processes the work queued by Controller::ready.

This method is guaranteed to return “quickly” unless doing so would compromise the correctness of the system.

This method is not guaranteed to be cancellation safe. It must be awaited to completion.

source

pub(crate) fn handle_frontier_updates( &mut self, updates: &[(GlobalId, Antichain<T>)], ) -> Option<ControllerResponse<T>>

Record updates to frontiers, and propagate any necessary responses. As of this writing (2/29/2024), the only response that can be generated from a frontier update is WatchSetCompleted.

source

pub(crate) fn process_replica_metrics( &mut self, id: ReplicaId, metrics: Vec<ServiceProcessMetrics>, ) -> Result<Option<ControllerResponse<T>>, Error>

source

pub(crate) fn record_replica_metrics( &mut self, replica_id: ReplicaId, metrics: &[ServiceProcessMetrics], )

source

pub async fn determine_real_time_recent_timestamp( &self, ids: BTreeSet<GlobalId>, timeout: Duration, ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>>

Determine the “real-time recency” timestamp for all ids.

Real-time recency is defined as the minimum value of T that all objects can be queried at to return all data visible in the upstream system the query was issued. In this case, “the upstream systems” are any user sources that connect to objects outside of Materialize, such as Kafka sources.

If no items in ids connect to external systems, this function will return Ok(T::minimum).

source§

impl<T> Controller<T>

source

pub async fn new( config: ControllerConfig, envd_epoch: NonZeroI64, read_only: bool, storage_txn: &dyn StorageTxn<T>, ) -> Self

Creates a new controller.

For correctness, this function expects to have access to the mutations to the storage_txn that occurred in prepare_initialization.

§Panics

If this function is called before prepare_initialization.

Auto Trait Implementations§

§

impl<T = Timestamp> !Freeze for Controller<T>

§

impl<T = Timestamp> !RefUnwindSafe for Controller<T>

§

impl<T = Timestamp> !Send for Controller<T>

§

impl<T = Timestamp> !Sync for Controller<T>

§

impl<T> Unpin for Controller<T>
where T: Unpin,

§

impl<T = Timestamp> !UnwindSafe for Controller<T>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

source§

fn cast_into(self) -> U

Performs the cast.
source§

impl<T> Conv for T

source§

fn conv<T>(self) -> T
where Self: Into<T>,

Converts self into T using Into<T>. Read more
source§

impl<T> FmtForward for T

source§

fn fmt_binary(self) -> FmtBinary<Self>
where Self: Binary,

Causes self to use its Binary implementation when Debug-formatted.
source§

fn fmt_display(self) -> FmtDisplay<Self>
where Self: Display,

Causes self to use its Display implementation when Debug-formatted.
source§

fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where Self: LowerExp,

Causes self to use its LowerExp implementation when Debug-formatted.
source§

fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where Self: LowerHex,

Causes self to use its LowerHex implementation when Debug-formatted.
source§

fn fmt_octal(self) -> FmtOctal<Self>
where Self: Octal,

Causes self to use its Octal implementation when Debug-formatted.
source§

fn fmt_pointer(self) -> FmtPointer<Self>
where Self: Pointer,

Causes self to use its Pointer implementation when Debug-formatted.
source§

fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where Self: UpperExp,

Causes self to use its UpperExp implementation when Debug-formatted.
source§

fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where Self: UpperHex,

Causes self to use its UpperHex implementation when Debug-formatted.
source§

fn fmt_list(self) -> FmtList<Self>
where &'a Self: for<'a> IntoIterator,

Formats each item in a sequence. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

source§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
source§

impl<T, U> OverrideFrom<Option<&T>> for U
where U: OverrideFrom<T>,

source§

fn override_from(self, layer: &Option<&T>) -> U

Override the configuration represented by Self with values from the given layer.
source§

impl<T> Pipe for T
where T: ?Sized,

source§

fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where Self: Sized,

Pipes by value. This is generally the method you want to use. Read more
source§

fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where R: 'a,

Borrows self and passes that borrow into the pipe function. Read more
source§

fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where R: 'a,

Mutably borrows self and passes that borrow into the pipe function. Read more
source§

fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
where Self: Borrow<B>, B: 'a + ?Sized, R: 'a,

Borrows self, then passes self.borrow() into the pipe function. Read more
source§

fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
where Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.borrow_mut() into the pipe function. Read more
source§

fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
where Self: AsRef<U>, U: 'a + ?Sized, R: 'a,

Borrows self, then passes self.as_ref() into the pipe function.
source§

fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
where Self: AsMut<U>, U: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.as_mut() into the pipe function.
source§

fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
where Self: Deref<Target = T>, T: 'a + ?Sized, R: 'a,

Borrows self, then passes self.deref() into the pipe function.
source§

fn pipe_deref_mut<'a, T, R>( &'a mut self, func: impl FnOnce(&'a mut T) -> R, ) -> R
where Self: DerefMut<Target = T> + Deref, T: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.deref_mut() into the pipe function.
source§

impl<T> Pointable for T

source§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

source§

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.
source§

impl<T> Tap for T

source§

fn tap(self, func: impl FnOnce(&Self)) -> Self

Immutable access to a value. Read more
source§

fn tap_mut(self, func: impl FnOnce(&mut Self)) -> Self

Mutable access to a value. Read more
source§

fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Immutable access to the Borrow<B> of a value. Read more
source§

fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Mutable access to the BorrowMut<B> of a value. Read more
source§

fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Immutable access to the AsRef<R> view of a value. Read more
source§

fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Mutable access to the AsMut<R> view of a value. Read more
source§

fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Immutable access to the Deref::Target of a value. Read more
source§

fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Mutable access to the Deref::Target of a value. Read more
source§

fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self

Calls .tap() only in debug builds, and is erased in release builds.
source§

fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self

Calls .tap_mut() only in debug builds, and is erased in release builds.
source§

fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Calls .tap_borrow() only in debug builds, and is erased in release builds.
source§

fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
source§

fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Calls .tap_ref() only in debug builds, and is erased in release builds.
source§

fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
source§

fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Calls .tap_deref() only in debug builds, and is erased in release builds.
source§

fn tap_deref_mut_dbg<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Calls .tap_deref_mut() only in debug builds, and is erased in release builds.
source§

impl<T> TryConv for T

source§

fn try_conv<T>(self) -> Result<T, Self::Error>
where Self: TryInto<T>,

Attempts to convert self into T using TryInto<T>. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more