pub struct ComputeController<T> {
Show 19 fields instances: BTreeMap<ComputeInstanceId, Instance<T>>, instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>, build_info: &'static BuildInfo, storage_collections: Arc<dyn StorageCollections<Timestamp = T>>, initialized: bool, read_only: bool, config: ComputeParameters, arrangement_exert_proportionality: u32, stashed_replica_response: Option<(ComputeInstanceId, ReplicaId, ComputeResponse<T>)>, envd_epoch: NonZeroI64, metrics: ComputeControllerMetrics, wallclock_lag: Arc<dyn Fn(&T) -> Duration>, dyncfg: Arc<ConfigSet>, response_rx: Receiver<ComputeControllerResponse<T>>, response_tx: Sender<ComputeControllerResponse<T>>, introspection_rx: Receiver<(IntrospectionType, Vec<(Row, Diff)>)>, introspection_tx: Sender<(IntrospectionType, Vec<(Row, Diff)>)>, maintenance_ticker: Interval, maintenance_scheduled: bool,
A controller for the compute layer.


§instances: BTreeMap<ComputeInstanceId, Instance<T>>§instance_workload_classes: Arc<Mutex<BTreeMap<ComputeInstanceId, Option<String>>>>

A map from an instance ID to an arbitrary string that describes the class of the workload that compute instance is running (e.g., production or staging).

§build_info: &'static BuildInfo§storage_collections: Arc<dyn StorageCollections<Timestamp = T>>

A handle providing access to storage collections.

§initialized: bool

Set to true once initialization_complete has been called.

§read_only: bool

Whether or not this controller is in read-only mode.

When in read-only mode, neither this controller nor the instances controlled by it are allowed to affect changes to external systems (largely persist).

§config: ComputeParameters

Compute configuration to apply to new instances.

§arrangement_exert_proportionality: u32

arrangement_exert_proportionality value passed to new replicas.

§stashed_replica_response: Option<(ComputeInstanceId, ReplicaId, ComputeResponse<T>)>

A replica response to be handled by the corresponding Instance on a subsequent call to ComputeController::process.

§envd_epoch: NonZeroI64

A number that increases on every environmentd restart.

§metrics: ComputeControllerMetrics

The compute controller metrics.

§wallclock_lag: Arc<dyn Fn(&T) -> Duration>

A function that compute the lag between the given time and wallclock time.

§dyncfg: Arc<ConfigSet>

Dynamic system configuration.

Updated through ComputeController::update_configuration calls and shared with all subcomponents of the compute controller.

§response_rx: Receiver<ComputeControllerResponse<T>>

Receiver for responses produced by Instances, to be delivered on subsequent calls to ComputeController::process.

§response_tx: Sender<ComputeControllerResponse<T>>

Response sender that’s passed to new Instances.

§introspection_rx: Receiver<(IntrospectionType, Vec<(Row, Diff)>)>

Receiver for introspection updates produced by Instances.

§introspection_tx: Sender<(IntrospectionType, Vec<(Row, Diff)>)>

Introspection updates sender that’s passed to new Instances.

§maintenance_ticker: Interval

Ticker for scheduling periodic maintenance work.

§maintenance_scheduled: bool

Whether maintenance work was scheduled.



impl<T: ComputeControllerTimestamp> ComputeController<T>


pub fn new( build_info: &'static BuildInfo, storage_collections: Arc<dyn StorageCollections<Timestamp = T>>, envd_epoch: NonZeroI64, read_only: bool, metrics_registry: MetricsRegistry, wallclock_lag: Arc<dyn Fn(&T) -> Duration> ) -> Self

Construct a new ComputeController.


pub fn instance_exists(&self, id: ComputeInstanceId) -> bool

TODO(#25239): Add documentation.


fn instance( &self, id: ComputeInstanceId ) -> Result<&Instance<T>, InstanceMissing>

Return a reference to the indicated compute instance.


fn instance_mut( &mut self, id: ComputeInstanceId ) -> Result<&mut Instance<T>, InstanceMissing>

Return a mutable reference to the indicated compute instance.


pub fn instance_ref( &self, id: ComputeInstanceId ) -> Result<ComputeInstanceRef<'_, T>, InstanceMissing>

Return a read-only handle to the indicated compute instance.


pub fn collection( &self, instance_id: ComputeInstanceId, collection_id: GlobalId ) -> Result<&CollectionState<T>, CollectionLookupError>

Return a read-only handle to the indicated collection.


pub fn find_collection( &self, collection_id: GlobalId ) -> Result<&CollectionState<T>, CollectionLookupError>

Return a read-only handle to the indicated collection.


pub fn collection_reverse_dependencies( &self, instance_id: ComputeInstanceId, id: GlobalId ) -> Result<impl Iterator<Item = &GlobalId>, InstanceMissing>

List compute collections that depend on the given collection.


pub fn set_arrangement_exert_proportionality(&mut self, value: u32)

Set the arrangement_exert_proportionality value to be passed to new replicas.


pub fn clusters_hydrated(&self) -> bool

Returns true iff all collections on all clusters have been hydrated.

For this check, zero-replica clusters are always considered hydrated. Their collections would never normally be considered hydrated but it’s clearly intentional that they have no replicas.


pub fn collection_frontiers( &self ) -> BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>

Returns the read and write frontiers for each collection.


pub fn replica_write_frontiers( &self ) -> BTreeMap<(GlobalId, ReplicaId), Antichain<T>>

Returns the write frontier for each collection installed on each replica.


pub fn dump(&self) -> Result<Value, Error>

Returns the state of the ComputeController formatted as JSON.

The returned value is not guaranteed to be stable and may change at any point in time.


impl<T> ComputeController<T>


pub fn create_instance( &mut self, id: ComputeInstanceId, arranged_logs: BTreeMap<LogVariant, GlobalId>, workload_class: Option<String> ) -> Result<(), InstanceExists>

Create a compute instance.


pub fn update_instance_workload_class( &mut self, id: ComputeInstanceId, workload_class: Option<String> ) -> Result<(), InstanceMissing>

Updates a compute instance’s workload class.


pub fn drop_instance(&mut self, id: ComputeInstanceId)

Remove a compute instance.


Panics if the identified instance still has active replicas.


pub fn dyncfg(&self) -> &Arc<ConfigSet>

Returns the compute controller’s config set.


pub fn update_configuration(&mut self, config_params: ComputeParameters)

Update compute configuration.


pub fn initialization_complete(&mut self)

Mark the end of any initialization commands.

The implementor may wait for this method to be called before implementing prior commands, and so it is important for a user to invoke this method as soon as it is comfortable. This method can be invoked immediately, at the potential expense of performance.


pub fn allow_writes(&mut self)

Allow this controller and instances controller by it to write to external systems.


pub async fn ready(&mut self)

Wait until the controller is ready to do some processing.

This method may block for an arbitrarily long time.

When the method returns, the caller should call ComputeController::process.

This method is cancellation safe.


pub fn set_subscribe_target_replica( &mut self, instance_id: ComputeInstanceId, subscribe_id: GlobalId, target_replica: ReplicaId ) -> Result<(), SubscribeTargetError>

Assign a target replica to the identified subscribe.

If a subscribe has a target replica assigned, only subscribe responses sent by that replica are considered.


pub fn add_replica_to_instance( &mut self, instance_id: ComputeInstanceId, replica_id: ReplicaId, location: ClusterReplicaLocation, config: ComputeReplicaConfig ) -> Result<(), ReplicaCreationError>

Adds replicas of an instance.


pub fn drop_replica( &mut self, instance_id: ComputeInstanceId, replica_id: ReplicaId ) -> Result<(), ReplicaDropError>

Removes a replica from an instance, including its service in the orchestrator.


pub fn create_dataflow( &mut self, instance_id: ComputeInstanceId, dataflow: DataflowDescription<Plan<T>, (), T> ) -> Result<(), DataflowCreationError>

Create and maintain the described dataflows, and initialize state for their output.

This method creates dataflows whose inputs are still readable at the dataflow as_of frontier, and initializes the outputs as readable from that frontier onward. It installs read dependencies from the outputs to the inputs, so that the input read capabilities will be held back to the output read capabilities, ensuring that we are always able to return to a state that can serve the output read capabilities.


pub fn drop_collections( &mut self, instance_id: ComputeInstanceId, collection_ids: Vec<GlobalId> ) -> Result<(), CollectionUpdateError>

Drop the read capability for the given collections and allow their resources to be reclaimed.


pub fn peek( &mut self, instance_id: ComputeInstanceId, collection_id: GlobalId, literal_constraints: Option<Vec<Row>>, uuid: Uuid, timestamp: T, finishing: RowSetFinishing, map_filter_project: SafeMfpPlan, target_replica: Option<ReplicaId>, peek_target: PeekTarget ) -> Result<(), PeekError>

Initiate a peek request for the contents of the given collection at timestamp.


pub fn cancel_peek( &mut self, instance_id: ComputeInstanceId, uuid: Uuid ) -> Result<(), InstanceMissing>

Cancel an existing peek request.

Canceling a peek is best effort. The caller may see any of the following after canceling a peek request:

  • A PeekResponse::Rows indicating that the cancellation request did not take effect in time and the query succeeded.
  • A PeekResponse::Canceled affirming that the peek was canceled.
  • No PeekResponse at all.

pub fn set_read_policy( &mut self, instance_id: ComputeInstanceId, policies: Vec<(GlobalId, ReadPolicy<T>)> ) -> Result<(), ReadPolicyError>

Assign a read policy to specific identifiers.

The policies are assigned in the order presented, and repeated identifiers should conclude with the last policy. Changing a policy will immediately downgrade the read capability if appropriate, but it will not “recover” the read capability if the prior capability is already ahead of it.

Identifiers not present in policies retain their existing read policies.

It is an error to attempt to set a read policy for a collection that is not readable in the context of compute. At this time, only indexes are readable compute collections.


async fn record_introspection_updates( &mut self, storage: &mut dyn StorageController<Timestamp = T> )


pub async fn process( &mut self, storage: &mut dyn StorageController<Timestamp = T> ) -> Option<ComputeControllerResponse<T>>

Processes the work queued by ComputeController::ready.


async fn maintain(&mut self, storage: &mut dyn StorageController<Timestamp = T>)

