pub trait StorageController: Debug + Send {
type Timestamp;
Show 27 methods
fn initialization_complete(&mut self);
fn update_configuration(&mut self, config_params: StorageParameters);
fn collection(
&self,
id: GlobalId
) -> Result<&CollectionState<Self::Timestamp>, StorageError>;
fn create_instance(&mut self, id: StorageInstanceId);
fn drop_instance(&mut self, id: StorageInstanceId);
fn connect_replica(
&mut self,
id: StorageInstanceId,
location: ClusterReplicaLocation
);
fn collection_mut(
&mut self,
id: GlobalId
) -> Result<&mut CollectionState<Self::Timestamp>, StorageError>;
fn collections(
&self
) -> Box<dyn Iterator<Item = (&GlobalId, &CollectionState<Self::Timestamp>)> + '_>;
fn migrate_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait;
fn create_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait;
fn export(
&self,
id: GlobalId
) -> Result<&ExportState<Self::Timestamp>, StorageError>;
fn export_mut(
&mut self,
id: GlobalId
) -> Result<&mut ExportState<Self::Timestamp>, StorageError>;
fn create_exports<'life0, 'async_trait>(
&'life0 mut self,
exports: Vec<(CreateExportToken<Self::Timestamp>, ExportDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait;
fn prepare_export(
&mut self,
id: GlobalId,
from_id: GlobalId
) -> Result<CreateExportToken<Self::Timestamp>, StorageError>;
fn cancel_prepare_export(
&mut self,
token: CreateExportToken<Self::Timestamp>
);
fn drop_sources(
&mut self,
identifiers: Vec<GlobalId>
) -> Result<(), StorageError>;
fn drop_sinks(
&mut self,
identifiers: Vec<GlobalId>
) -> Result<(), StorageError>;
fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>);
fn drop_sources_unvalidated(&mut self, identifiers: Vec<GlobalId>);
fn append(
&mut self,
commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)>
) -> Result<Receiver<Result<(), StorageError>>, StorageError>;
fn snapshot<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Self::Timestamp
) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError>> + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait;
fn set_read_policy(
&mut self,
policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>
);
fn update_write_frontiers(
&mut self,
updates: &[(GlobalId, Antichain<Self::Timestamp>)]
);
fn update_read_capabilities(
&mut self,
updates: &mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>>
);
fn ready<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait;
fn process<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait;
fn reconcile_state<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait;
}
Required Associated Types§
Required Methods§
sourcefn initialization_complete(&mut self)
fn initialization_complete(&mut self)
Marks the end of any initialization commands.
The implementor may wait for this method to be called before implementing prior commands, and so it is important for a user to invoke this method as soon as it is comfortable. This method can be invoked immediately, at the potential expense of performance.
sourcefn update_configuration(&mut self, config_params: StorageParameters)
fn update_configuration(&mut self, config_params: StorageParameters)
Update storage configuration.
sourcefn collection(
&self,
id: GlobalId
) -> Result<&CollectionState<Self::Timestamp>, StorageError>
fn collection(
&self,
id: GlobalId
) -> Result<&CollectionState<Self::Timestamp>, StorageError>
Acquire an immutable reference to the collection state, should it exist.
sourcefn create_instance(&mut self, id: StorageInstanceId)
fn create_instance(&mut self, id: StorageInstanceId)
Creates a storage instance with the specified ID.
A storage instance can have zero or one replicas. The instance is created with zero replicas.
Panics if a storage instance with the given ID already exists.
sourcefn drop_instance(&mut self, id: StorageInstanceId)
fn drop_instance(&mut self, id: StorageInstanceId)
Drops the storage instance with the given ID.
If you call this method while the storage instance has a replica attached, that replica will be leaked. Call `drop_replica` first.
Panics if a storage instance with the given ID does not exist.
sourcefn connect_replica(
&mut self,
id: StorageInstanceId,
location: ClusterReplicaLocation
)
fn connect_replica(
&mut self,
id: StorageInstanceId,
location: ClusterReplicaLocation
)
Connects the storage instance to the specified replica.
If the storage instance is already attached to a replica, communication with that replica is severed in favor of the new replica.
In the future, this API will be adjusted to support active replication of storage instances (i.e., multiple replicas attached to a given storage instance).
sourcefn collection_mut(
&mut self,
id: GlobalId
) -> Result<&mut CollectionState<Self::Timestamp>, StorageError>
fn collection_mut(
&mut self,
id: GlobalId
) -> Result<&mut CollectionState<Self::Timestamp>, StorageError>
Acquire a mutable reference to the collection state, should it exist.
sourcefn collections(
&self
) -> Box<dyn Iterator<Item = (&GlobalId, &CollectionState<Self::Timestamp>)> + '_>
fn collections(
&self
) -> Box<dyn Iterator<Item = (&GlobalId, &CollectionState<Self::Timestamp>)> + '_>
Acquire an iterator over all collection states.
sourcefn migrate_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn migrate_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Migrate any storage controller state from previous versions to this version’s expectations.
This function must “see” the GlobalId of every collection you plan to create, but can be called with all of the catalog’s collections at once.
sourcefn create_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create the collections described by the entries of `collections`.
Each entry carries the collection id and the collection description, including any associated metadata needed to ingest the particular source.
This command installs collection state for the indicated sources, and they are now valid to use in queries at times beyond the initial `since` frontiers. Each collection also acquires a read capability at this frontier, which will need to be repeatedly downgraded with `allow_compaction()` to permit compaction.
This method is NOT idempotent; it can fail between processing of different collections and leave the controller in an inconsistent state. It is almost always wrong to do anything but abort the process on `Err`.
sourcefn export(
&self,
id: GlobalId
) -> Result<&ExportState<Self::Timestamp>, StorageError>
fn export(
&self,
id: GlobalId
) -> Result<&ExportState<Self::Timestamp>, StorageError>
Acquire an immutable reference to the export state, should it exist.
sourcefn export_mut(
&mut self,
id: GlobalId
) -> Result<&mut ExportState<Self::Timestamp>, StorageError>
fn export_mut(
&mut self,
id: GlobalId
) -> Result<&mut ExportState<Self::Timestamp>, StorageError>
Acquire a mutable reference to the export state, should it exist.
sourcefn create_exports<'life0, 'async_trait>(
&'life0 mut self,
exports: Vec<(CreateExportToken<Self::Timestamp>, ExportDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_exports<'life0, 'async_trait>(
&'life0 mut self,
exports: Vec<(CreateExportToken<Self::Timestamp>, ExportDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create the sinks described by the `ExportDescription`.
sourcefn prepare_export(
&mut self,
id: GlobalId,
from_id: GlobalId
) -> Result<CreateExportToken<Self::Timestamp>, StorageError>
fn prepare_export(
&mut self,
id: GlobalId,
from_id: GlobalId
) -> Result<CreateExportToken<Self::Timestamp>, StorageError>
Notify the storage controller to prepare for an export to be created.
sourcefn cancel_prepare_export(&mut self, token: CreateExportToken<Self::Timestamp>)
fn cancel_prepare_export(&mut self, token: CreateExportToken<Self::Timestamp>)
Cancel the pending export.
sourcefn drop_sources(
&mut self,
identifiers: Vec<GlobalId>
) -> Result<(), StorageError>
fn drop_sources(
&mut self,
identifiers: Vec<GlobalId>
) -> Result<(), StorageError>
Drops the read capability for the sources and allows their resources to be reclaimed.
sourcefn drop_sinks(&mut self, identifiers: Vec<GlobalId>) -> Result<(), StorageError>
fn drop_sinks(&mut self, identifiers: Vec<GlobalId>) -> Result<(), StorageError>
Drops the read capability for the sinks and allows their resources to be reclaimed.
sourcefn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>)
fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>)
Drops the read capability for the sinks and allows their resources to be reclaimed.
TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
controller starts/restarts it has no durable state. That means that it has no way of
remembering any past commands sent. In the future we plan on persisting state for the
controller so that it is aware of past commands.
Therefore this method is for dropping sinks that we know to have been previously
created, but have been forgotten by the controller due to a restart.
Once command history becomes durable we can remove this method and use the normal `drop_sinks`.
sourcefn drop_sources_unvalidated(&mut self, identifiers: Vec<GlobalId>)
fn drop_sources_unvalidated(&mut self, identifiers: Vec<GlobalId>)
Drops the read capability for the sources and allows their resources to be reclaimed.
TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
controller starts/restarts it has no durable state. That means that it has no way of
remembering any past commands sent. In the future we plan on persisting state for the
controller so that it is aware of past commands.
Therefore this method is for dropping sources that we know to have been previously
created, but have been forgotten by the controller due to a restart.
Once command history becomes durable we can remove this method and use the normal `drop_sources`.
sourcefn append(
&mut self,
commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)>
) -> Result<Receiver<Result<(), StorageError>>, StorageError>
fn append(
&mut self,
commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)>
) -> Result<Receiver<Result<(), StorageError>>, StorageError>
For each `(id, updates, upper)` entry in `commands`, append `updates` into the local input named `id` and advance its upper to `upper`.
The method returns a oneshot that can be awaited to indicate completion of the write. The method may return an error, indicating an immediately visible error, and also the oneshot may return an error if one is encountered during the write.
sourcefn snapshot<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Self::Timestamp
) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn snapshot<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Self::Timestamp
) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Returns the snapshot of the contents of the local input named `id` at `as_of`.
sourcefn set_read_policy(
&mut self,
policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>
)
fn set_read_policy(
&mut self,
policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>
)
Assigns a read policy to specific identifiers.
The policies are assigned in the order presented, and repeated identifiers should conclude with the last policy. Changing a policy will immediately downgrade the read capability if appropriate, but it will not “recover” the read capability if the prior capability is already ahead of it.
The `StorageController` may include its own overrides on these policies.
Identifiers not present in `policies` retain their existing read policies.
sourcefn update_write_frontiers(
&mut self,
updates: &[(GlobalId, Antichain<Self::Timestamp>)]
)
fn update_write_frontiers(
&mut self,
updates: &[(GlobalId, Antichain<Self::Timestamp>)]
)
Ingests write frontier updates for collections that this controller maintains and potentially generates updates to read capabilities, which are passed on to `StorageController::update_read_capabilities`.
These updates come from the entity that is responsible for writing to the collection, and in turn advancing its `upper` (aka `write_frontier`). The most common such “writers” are:
- `clusterd` instances, for source ingestions
- introspection collections (which this controller writes to)
- Tables (which are written to by this controller)
- Materialized Views, which are running inside COMPUTE, and for which COMPUTE sends updates to this storage controller
The so-called “implied capability” is a read capability for a collection that is updated based on the write frontier and the collection’s `ReadPolicy`. Advancing the write frontier might change this implied capability, which in turn might change the overall `since` (a combination of all read capabilities) of a collection.
sourcefn update_read_capabilities(
&mut self,
updates: &mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>>
)
fn update_read_capabilities(
&mut self,
updates: &mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>>
)
Applies `updates` and sends any appropriate compaction command.
sourcefn ready<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn ready<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Waits until the controller is ready to process a response.
This method may block for an arbitrarily long time.
When the method returns, the owner should call `StorageController::process` to process the ready message.
This method is cancellation safe.
sourcefn process<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn process<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Processes the work queued by `StorageController::ready`.
This method is guaranteed to return “quickly” unless doing so would compromise the correctness of the system.
This method is not guaranteed to be cancellation safe. It must be awaited to completion.
sourcefn reconcile_state<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn reconcile_state<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Signal to the controller that the adapter has populated all of its initial state and the controller can reconcile (i.e. drop) any unclaimed resources.