pub trait StorageController: Debug + Send {
    type Timestamp;

Show 21 methods fn initialization_complete(&mut self); fn collection(
        &self,
        id: GlobalId
    ) -> Result<&CollectionState<Self::Timestamp>, StorageError>; fn collection_mut(
        &mut self,
        id: GlobalId
    ) -> Result<&mut CollectionState<Self::Timestamp>, StorageError>; fn create_collections<'life0, 'async_trait>(
        &'life0 mut self,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn alter_collections<'life0, 'async_trait>(
        &'life0 mut self,
        collections: Vec<(GlobalId, StorageHostConfig)>
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn export(
        &self,
        id: GlobalId
    ) -> Result<&ExportState<Self::Timestamp>, StorageError>; fn export_mut(
        &mut self,
        id: GlobalId
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError>; fn create_exports<'life0, 'async_trait>(
        &'life0 mut self,
        exports: Vec<(CreateExportToken, ExportDescription<Self::Timestamp>)>
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn prepare_export<'life0, 'async_trait>(
        &'life0 mut self,
        id: GlobalId,
        from_id: GlobalId
    ) -> Pin<Box<dyn Future<Output = Result<CreateExportToken, StorageError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn cancel_prepare_export<'life0, 'async_trait>(
        &'life0 mut self,
        token: CreateExportToken
    ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn drop_sources<'life0, 'async_trait>(
        &'life0 mut self,
        identifiers: Vec<GlobalId>
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn drop_sinks<'life0, 'async_trait>(
        &'life0 mut self,
        identifiers: Vec<GlobalId>
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn drop_sinks_unvalidated<'life0, 'async_trait>(
        &'life0 mut self,
        identifiers: Vec<GlobalId>
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn drop_sources_unvalidated<'life0, 'async_trait>(
        &'life0 mut self,
        identifiers: Vec<GlobalId>
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn append(
        &mut self,
        commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)>
    ) -> Result<Receiver<Result<(), StorageError>>, StorageError>; fn snapshot<'life0, 'async_trait>(
        &'life0 mut self,
        id: GlobalId,
        as_of: Self::Timestamp
    ) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn set_read_policy<'life0, 'async_trait>(
        &'life0 mut self,
        policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn update_write_frontiers<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        updates: &'life1 [(GlobalId, Antichain<Self::Timestamp>)]
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        'life1: 'async_trait,
        Self: 'async_trait
; fn update_read_capabilities<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        updates: &'life1 mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>>
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
    where
        'life0: 'async_trait,
        'life1: 'async_trait,
        Self: 'async_trait
; fn ready<'life0, 'async_trait>(
        &'life0 mut self
    ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
; fn process<'life0, 'async_trait>(
        &'life0 mut self
    ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
;
}

Required Associated Types

Required Methods

Marks the end of any initialization commands.

The implementor may wait for this method to be called before implementing prior commands, and so it is important for a user to invoke this method as soon as it is comfortable. This method can be invoked immediately, at the potential expense of performance.

Acquire an immutable reference to the collection state, should it exist.

Acquire a mutable reference to the collection state, should it exist.

Create the sources described in the individual CreateSourceCommand commands.

Each command carries the source id, the source description, and any associated metadata needed to ingest the particular source.

This command installs collection state for the indicated sources, and they are now valid to use in queries at times beyond the initial since frontiers. Each collection also acquires a read capability at this frontier, which will need to be repeatedly downgraded with allow_compaction() to permit compaction.

Acquire an immutable reference to the export state, should it exist.

Acquire a mutable reference to the export state, should it exist.

Create the sinks described by the ExportDescription.

Notify the storage controller to prepare for an export to be created.

Cancel the pending export.

Drops the read capability for the sources and allows their resources to be reclaimed.

Drops the read capability for the sinks and allows their resources to be reclaimed.

Drops the read capability for the sinks and allows their resources to be reclaimed.

TODO(jkosh44): This method does not validate the provided identifiers. Currently when the controller starts/restarts it has no durable state. That means that it has no way of remembering any past commands sent. In the future we plan on persisting state for the controller so that it is aware of past commands. Therefore this method is for dropping sinks that we know to have been previously created, but have been forgotten by the controller due to a restart. Once command history becomes durable we can remove this method and use the normal drop_sinks.

Drops the read capability for the sources and allows their resources to be reclaimed.

TODO(jkosh44): This method does not validate the provided identifiers. Currently when the controller starts/restarts it has no durable state. That means that it has no way of remembering any past commands sent. In the future we plan on persisting state for the controller so that it is aware of past commands. Therefore this method is for dropping sources that we know to have been previously created, but have been forgotten by the controller due to a restart. Once command history becomes durable we can remove this method and use the normal drop_sources.

Append updates into the local input named id and advance its upper to upper.

The method returns a oneshot that can be awaited to indicate completion of the write. The method may return an error, indicating an immediately visible error, and also the oneshot may return an error if one is encountered during the write.

Returns the snapshot of the contents of the local input named id at as_of.

Assigns a read policy to specific identifiers.

The policies are assigned in the order presented, and repeated identifiers should conclude with the last policy. Changing a policy will immediately downgrade the read capability if appropriate, but it will not “recover” the read capability if the prior capability is already ahead of it.

The StorageController may include its own overrides on these policies.

Identifiers not present in policies retain their existing read policies.

Ingests write frontier updates for collections that this controller maintains and potentially generates updates to read capabilities, which are passed on to StorageController::update_read_capabilities.

These updates come from the entity that is responsible for writing to the collection, and in turn advancing its upper (aka write_frontier). The most common such “writers” are:

  • storaged instances, for source ingestions

  • introspection collections (which this controller writes to)

  • Tables (which are written to by this controller)

  • Materialized Views, which are running inside COMPUTE, and for which COMPUTE sends updates to this storage controller

The so-called “implied capability” is a read capability for a collection that is updated based on the write frontier and the collection's ReadPolicy. Advancing the write frontier might change this implied capability, which in turn might change the overall since (a combination of all read capabilities) of a collection.

Applies updates and sends any appropriate compaction command.

Waits until the controller is ready to process a response.

This method may block for an arbitrarily long time.

When the method returns, the owner should call StorageController::process to process the ready message.

This method is cancellation safe.

Processes the work queued by StorageController::ready.

This method is guaranteed to return “quickly” unless doing so would compromise the correctness of the system.

This method is not guaranteed to be cancellation safe. It must be awaited to completion.

Implementors