pub struct Controller<T = Timestamp> {
Show 19 fields pub storage: Box<dyn StorageController<Timestamp = T>>, pub storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>, pub compute: ComputeController<T>, pub(crate) clusterd_image: String, pub(crate) init_container_image: Option<String>, pub(crate) deploy_generation: u64, pub(crate) read_only: bool, pub(crate) orchestrator: Arc<dyn NamespacedOrchestrator>, pub(crate) readiness: Readiness<T>, pub(crate) metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>, pub(crate) metrics_tx: UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>, pub(crate) metrics_rx: Peekable<UnboundedReceiverStream<(ReplicaId, Vec<ServiceProcessMetrics>)>>, pub(crate) frontiers_ticker: Interval, pub(crate) persist_pubsub_url: String, pub(crate) secrets_args: SecretsReaderCliArgs, pub(crate) unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>, pub(crate) unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>, pub(crate) watch_set_id_gen: Gen<WatchSetId>, pub(crate) immediate_watch_sets: Vec<WatchSetId>,
A client that maintains soft state and validates commands, in addition to forwarding them.


§storage: Box<dyn StorageController<Timestamp = T>>

§storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>

§compute: ComputeController<T>

§clusterd_image: String

The clusterd image to use when starting new cluster processes.

§init_container_image: Option<String>

The init container image to use for clusterd.

§deploy_generation: u64

A number representing the environment’s generation.

§read_only: bool

Whether or not this controller is in read-only mode.

When in read-only mode, neither this controller nor the instances controlled by it are allowed to effect changes to external systems (largely persist).

§orchestrator: Arc<dyn NamespacedOrchestrator>

The cluster orchestrator.

§readiness: Readiness<T>

Tracks the readiness of the underlying controllers.

§metrics_tasks: BTreeMap<ReplicaId, AbortOnDropHandle<()>>

Tasks for collecting replica metrics.

§metrics_tx: UnboundedSender<(ReplicaId, Vec<ServiceProcessMetrics>)>

Sender for the channel over which replica metrics are sent.

§metrics_rx: Peekable<UnboundedReceiverStream<(ReplicaId, Vec<ServiceProcessMetrics>)>>

Receiver for the channel over which replica metrics are sent.

§frontiers_ticker: Interval

Periodic notification to record frontiers.

§persist_pubsub_url: String

The URL for Persist PubSub.

§secrets_args: SecretsReaderCliArgs

Arguments for secrets readers.

§unfulfilled_watch_sets_by_object: BTreeMap<GlobalId, BTreeSet<WatchSetId>>

A map associating a global ID to the set of all the unfulfilled watch set ids that include it.

See [Self::install_compute_watch_set] and [Self::install_storage_watch_set] for a description of watch sets.

§unfulfilled_watch_sets: BTreeMap<WatchSetId, (BTreeSet<GlobalId>, T)>

A map of installed watch sets indexed by id.

§watch_set_id_gen: Gen<WatchSetId>

A sequence of numbers used to mint unique WatchSetIds.

§immediate_watch_sets: Vec<WatchSetId>

A list of watch sets that were already fulfilled as soon as they were installed, and thus must be returned to the client on the next call to [Self::process].

See [Self::install_compute_watch_set] and [Self::install_storage_watch_set] for a description of watch sets.



impl<T> Controller<T>


pub fn create_cluster( &mut self, id: ClusterId, config: ClusterConfig ) -> Result<(), Error>

Creates a cluster with the specified identifier and configuration.

A cluster is a combination of a storage instance and a compute instance. A cluster has zero or more replicas; each replica colocates the storage and compute layers on the same physical resources.


pub fn update_cluster_workload_class( &mut self, id: ClusterId, workload_class: Option<String> ) -> Result<(), Error>

Updates the workload class for a cluster.


pub fn drop_cluster(&mut self, id: ClusterId)

Drops the specified cluster.


Panics if the cluster still has replicas.


pub fn create_replica( &mut self, cluster_id: ClusterId, replica_id: ReplicaId, role: ClusterRole, config: ReplicaConfig, enable_worker_core_affinity: bool ) -> Result<(), Error>

Creates a replica of the specified cluster with the specified identifier and configuration.


pub fn drop_replica( &mut self, cluster_id: ClusterId, replica_id: ReplicaId ) -> Result<(), Error>

Drops the specified replica of the specified cluster.


pub(crate) fn remove_past_generation_replicas_in_background(&self)

Removes replicas from past generations in a background task.


pub async fn remove_orphaned_replicas( &mut self, next_user_replica_id: u64, next_system_replica_id: u64 ) -> Result<(), Error>

Remove replicas that are orphaned in the current generation.


pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent>


fn provision_replica( &self, cluster_id: ClusterId, replica_id: ReplicaId, role: ClusterRole, location: ManagedReplicaLocation, enable_worker_core_affinity: bool ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), Error>

Provisions a replica with the service orchestrator.


fn deprovision_replica( &mut self, cluster_id: ClusterId, replica_id: ReplicaId, generation: u64 ) -> Result<(), Error>

Deprovisions a replica with the service orchestrator.


impl<T: ComputeControllerTimestamp> Controller<T>


pub fn set_arrangement_exert_proportionality(&mut self, value: u32)


pub fn connection_context(&self) -> &ConnectionContext

Returns the connection context installed in the controller.

This is purely a helper, and can also be obtained from the storage controller (self.storage).


pub fn storage_configuration(&self) -> &StorageConfiguration

Returns the storage configuration installed in the storage controller.

This is purely a helper, and can also be obtained from the storage controller (self.storage).


pub fn dump(&self) -> Result<Value, Error>

Returns the state of the Controller formatted as JSON.

The returned value is not guaranteed to be stable and may change at any point in time.


impl<T> Controller<T>


pub fn update_orchestrator_scheduling_config( &mut self, config: ServiceSchedulingConfig )


pub fn initialization_complete(&mut self)

Marks the end of any initialization commands.

The implementor may wait for this method to be called before implementing prior commands, and so it is important for a user to invoke this method as soon as it is comfortable. This method can be invoked immediately, at the potential expense of performance.


pub fn read_only(&self) -> bool

Reports whether the controller is in read only mode.


pub async fn allow_writes(&mut self, register_ts: Option<T>)

Allow this controller and instances controlled by it to write to external systems.

If the controller has previously been told about tables (via StorageController::create_collections), the caller must provide a register_ts, the timestamp at which any tables that are known to the controller should be registered in the txn system.


Panics when the controller knows about tables but no register_ts is provided.


pub(crate) fn take_internal_response(&mut self) -> Option<ControllerResponse<T>>

Returns Some if there is an immediately available internally-generated response that we need to return to the client (as opposed to waiting for a response from compute or storage).


pub async fn ready(&mut self)

Waits until the controller is ready to process a response.

This method may block for an arbitrarily long time.

When the method returns, the owner should call Controller::process to process the ready message.

This method is cancellation safe.


pub fn install_compute_watch_set( &mut self, objects: BTreeSet<GlobalId>, t: T ) -> WatchSetId

Install a watch set in the controller.

A watch set is a request to be informed by the controller when all of the frontiers of a particular set of objects have advanced at least to a particular timestamp.

When all the objects in `objects` have advanced to `t`, the watch set ID is returned to the client on the next call to Self::process.


pub fn install_storage_watch_set( &mut self, objects: BTreeSet<GlobalId>, t: T ) -> WatchSetId

Install a watch set in the controller.

A watch set is a request to be informed by the controller when all of the frontiers of a particular set of objects have advanced at least to a particular timestamp.

When all the objects in `objects` have advanced to `t`, the watch set ID is returned to the client on the next call to Self::process.


pub fn uninstall_watch_set(&mut self, ws_id: &WatchSetId)

Uninstalls a previously installed WatchSetId. The method is a no-op if the watch set has already finished and therefore it’s safe to call this function unconditionally.


This method panics if called with a WatchSetId that was never returned by install_compute_watch_set or install_storage_watch_set.


pub(crate) async fn process_storage_response( &mut self, storage_metadata: &StorageMetadata ) -> Result<Option<ControllerResponse<T>>, Error>

Process a pending response from the storage controller. If necessary, return a higher-level response to our client.


pub(crate) async fn process_compute_response( &mut self ) -> Result<Option<ControllerResponse<T>>, Error>

Process a pending response from the compute controller. If necessary, return a higher-level response to our client.


pub async fn process( &mut self, storage_metadata: &StorageMetadata ) -> Result<Option<ControllerResponse<T>>, Error>

Processes the work queued by Controller::ready.

This method is guaranteed to return “quickly” unless doing so would compromise the correctness of the system.

This method is not guaranteed to be cancellation safe. It must be awaited to completion.


pub(crate) fn handle_frontier_updates( &mut self, updates: &[(GlobalId, Antichain<T>)] ) -> Option<ControllerResponse<T>>

Record updates to frontiers, and propagate any necessary responses. As of this writing (2/29/2024), the only response that can be generated from a frontier update is WatchSetCompleted.


pub(crate) async fn record_frontiers(&mut self)


pub async fn determine_real_time_recent_timestamp( &mut self, ids: BTreeSet<GlobalId>, timeout: Duration ) -> Result<BoxFuture<'static, Result<T, StorageError<T>>>, StorageError<T>>

Determine the “real-time recency” timestamp for all ids.

Real-time recency is defined as the minimum value of T at which all objects can be queried to return all data visible in the upstream systems at the time the query was issued. In this case, “the upstream systems” are any user sources that connect to objects outside of Materialize, such as Kafka sources.

If no items in ids connect to external systems, this function will return Ok(T::minimum).


impl<T> Controller<T>


pub async fn new( config: ControllerConfig, envd_epoch: NonZeroI64, read_only: bool, storage_txn: &dyn StorageTxn<T> ) -> Self

Creates a new controller.

For correctness, this function expects to have access to the mutations to the storage_txn that occurred in prepare_initialization.


Panics if this function is called before prepare_initialization.

