pub trait StorageCollections: Debug {
type Timestamp: TimelyTimestamp;
Show 21 methods
// Required methods
fn initialize_state<'life0, 'life1, 'async_trait>(
&'life0 self,
txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send),
init_ids: BTreeSet<GlobalId>,
drop_ids: BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn update_parameters(&self, config_params: StorageParameters);
fn collection_metadata(
&self,
id: GlobalId,
) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>;
fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;
fn collections_frontiers(
&self,
id: Vec<GlobalId>,
) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>;
fn active_collection_frontiers(
&self,
) -> Vec<CollectionFrontiers<Self::Timestamp>>;
fn check_exists(
&self,
id: GlobalId,
) -> Result<(), StorageError<Self::Timestamp>>;
fn snapshot_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn snapshot_parts_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn prepare_state<'life0, 'life1, 'async_trait>(
&'life0 self,
txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send),
ids_to_add: BTreeSet<GlobalId>,
ids_to_drop: BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn create_collections_for_bootstrap<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
migrated_storage_collections: &'life2 BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn alter_ingestion_source_desc<'life0, 'async_trait>(
&'life0 self,
ingestion_id: GlobalId,
source_desc: SourceDesc,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn alter_ingestion_export_data_configs<'life0, 'async_trait>(
&'life0 self,
source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn alter_ingestion_connections<'life0, 'async_trait>(
&'life0 self,
source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn alter_table_desc(
&self,
table_id: GlobalId,
new_desc: RelationDesc,
) -> Result<(), StorageError<Self::Timestamp>>;
fn drop_collections_unvalidated(
&self,
storage_metadata: &StorageMetadata,
identifiers: Vec<GlobalId>,
);
fn set_read_policies(
&self,
policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>,
);
fn acquire_read_holds(
&self,
desired_holds: Vec<GlobalId>,
) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>;
fn determine_time_dependence(
&self,
id: GlobalId,
) -> Result<Option<TimeDependence>, TimeDependenceError>;
// Provided methods
fn collection_frontiers(
&self,
id: GlobalId,
) -> Result<CollectionFrontiers<Self::Timestamp>, StorageError<Self::Timestamp>> { ... }
fn create_collections<'life0, 'life1, 'async_trait>(
&'life0 self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: Sync + 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
}
Expand description
An abstraction for keeping track of storage collections and managing access to them.
Responsibilities:
- Keeps a critical persist handle for holding the since of collections where it needs to be.
- Drives the since forward based on the upper of a collection and a `ReadPolicy`.
- Hands out `ReadHold`s that prevent a collection’s since from advancing while it needs to be read at a specific time.
Required Associated Types§
Required Methods§
sourcefn initialize_state<'life0, 'life1, 'async_trait>(
&'life0 self,
txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send),
init_ids: BTreeSet<GlobalId>,
drop_ids: BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn initialize_state<'life0, 'life1, 'async_trait>(
&'life0 self,
txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send),
init_ids: BTreeSet<GlobalId>,
drop_ids: BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
On boot, reconcile this StorageCollections with outside state. We get a StorageTxn where we can record any durable state that we need.
We get `init_ids`, which tells us about all collections that currently exist, so that we can record durable state for those that we don’t yet know about.
We also get `drop_ids`, which tells us about all collections that we might have known about before and that have now been dropped.
sourcefn update_parameters(&self, config_params: StorageParameters)
fn update_parameters(&self, config_params: StorageParameters)
Update storage configuration with new parameters.
sourcefn collection_metadata(
&self,
id: GlobalId,
) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>
fn collection_metadata( &self, id: GlobalId, ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>
Returns the `CollectionMetadata` of the collection identified by `id`.
sourcefn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>
fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>
Returns the `CollectionMetadata` of all active collections, each paired with its `GlobalId`.
A collection is “active” when it has a non-empty frontier of read capabilities.
sourcefn collections_frontiers(
&self,
id: Vec<GlobalId>,
) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>
fn collections_frontiers( &self, id: Vec<GlobalId>, ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>
Atomically gets and returns the frontiers of all the identified collections.
sourcefn active_collection_frontiers(
&self,
) -> Vec<CollectionFrontiers<Self::Timestamp>>
fn active_collection_frontiers( &self, ) -> Vec<CollectionFrontiers<Self::Timestamp>>
Atomically gets and returns the frontiers of all active collections.
A collection is “active” when it has a non-empty frontier of read capabilities.
sourcefn check_exists(
&self,
id: GlobalId,
) -> Result<(), StorageError<Self::Timestamp>>
fn check_exists( &self, id: GlobalId, ) -> Result<(), StorageError<Self::Timestamp>>
Checks whether a collection exists under the given `GlobalId`. Returns an error if the collection does not exist.
sourcefn snapshot_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn snapshot_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Returns aggregate statistics about the contents of the local input named `id` at `as_of`.
sourcefn snapshot_parts_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn snapshot_parts_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Returns aggregate statistics about the contents of the local input named `id` at `as_of`.
Note that this async function itself returns a future. We may need to block on the stats being available, but don’t want to hold a reference to the controller for too long… so the outer future holds a reference to the controller but returns quickly, and the inner future is slow but does not reference the controller.
sourcefn prepare_state<'life0, 'life1, 'async_trait>(
&'life0 self,
txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send),
ids_to_add: BTreeSet<GlobalId>,
ids_to_drop: BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn prepare_state<'life0, 'life1, 'async_trait>(
&'life0 self,
txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send),
ids_to_add: BTreeSet<GlobalId>,
ids_to_drop: BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Update the given `StorageTxn` with the appropriate metadata given the IDs to add and drop.
The data modified in the `StorageTxn` must be made available in all subsequent calls that require `StorageMetadata` as a parameter.
sourcefn create_collections_for_bootstrap<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
migrated_storage_collections: &'life2 BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn create_collections_for_bootstrap<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
migrated_storage_collections: &'life2 BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Like `Self::create_collections`, except used specifically for bootstrap.
`migrated_storage_collections` is a set of migrated storage collections to be excluded from the txn-wal sub-system.
sourcefn alter_ingestion_source_desc<'life0, 'async_trait>(
&'life0 self,
ingestion_id: GlobalId,
source_desc: SourceDesc,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn alter_ingestion_source_desc<'life0, 'async_trait>(
&'life0 self,
ingestion_id: GlobalId,
source_desc: SourceDesc,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Alters the identified ingestion to use the provided `SourceDesc`.
NOTE: Ideally, `StorageCollections` would not care about these, but we have to learn about changes such that when new subsources are created we can correctly determine a since based on their dependencies’ sinces. This is really only relevant because newly created subsources depend on the remap shard, and we can’t just have them start at since 0.
sourcefn alter_ingestion_export_data_configs<'life0, 'async_trait>(
&'life0 self,
source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn alter_ingestion_export_data_configs<'life0, 'async_trait>(
&'life0 self,
source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Alters the data config for the specified source exports of the specified ingestions.
sourcefn alter_ingestion_connections<'life0, 'async_trait>(
&'life0 self,
source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn alter_ingestion_connections<'life0, 'async_trait>(
&'life0 self,
source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Alters each identified collection to use the correlated `GenericSourceConnection`.
See NOTE on StorageCollections::alter_ingestion_source_desc.
sourcefn alter_table_desc(
&self,
table_id: GlobalId,
new_desc: RelationDesc,
) -> Result<(), StorageError<Self::Timestamp>>
fn alter_table_desc( &self, table_id: GlobalId, new_desc: RelationDesc, ) -> Result<(), StorageError<Self::Timestamp>>
Updates the `RelationDesc` for the specified table.
sourcefn drop_collections_unvalidated(
&self,
storage_metadata: &StorageMetadata,
identifiers: Vec<GlobalId>,
)
fn drop_collections_unvalidated( &self, storage_metadata: &StorageMetadata, identifiers: Vec<GlobalId>, )
Drops the read capability for the sources and allows their resources to be reclaimed.
TODO(jkosh44): This method does not validate the provided identifiers.
Currently when the controller starts/restarts it has no durable state.
That means that it has no way of remembering any past commands sent. In
the future we plan on persisting state for the controller so that it is
aware of past commands. Therefore this method is for dropping sources
that we know to have been previously created, but have been forgotten by
the controller due to a restart. Once command history becomes durable we
can remove this method and use the normal `drop_sources`.
sourcefn set_read_policies(
&self,
policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>,
)
fn set_read_policies( &self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>, )
Assigns a read policy to specific identifiers.
The policies are assigned in the order presented, and repeated identifiers should conclude with the last policy. Changing a policy will immediately downgrade the read capability if appropriate, but it will not “recover” the read capability if the prior capability is already ahead of it.
This StorageCollections may include its own overrides on these policies.
Identifiers not present in `policies` retain their existing read policies.
sourcefn acquire_read_holds(
&self,
desired_holds: Vec<GlobalId>,
) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>
fn acquire_read_holds( &self, desired_holds: Vec<GlobalId>, ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>
Acquires and returns the earliest possible read holds for the specified collections.
sourcefn determine_time_dependence(
&self,
id: GlobalId,
) -> Result<Option<TimeDependence>, TimeDependenceError>
fn determine_time_dependence( &self, id: GlobalId, ) -> Result<Option<TimeDependence>, TimeDependenceError>
Get the time dependence for a storage collection. Returns no value if unknown or if the object isn’t managed by storage.
Provided Methods§
sourcefn collection_frontiers(
&self,
id: GlobalId,
) -> Result<CollectionFrontiers<Self::Timestamp>, StorageError<Self::Timestamp>>
fn collection_frontiers( &self, id: GlobalId, ) -> Result<CollectionFrontiers<Self::Timestamp>, StorageError<Self::Timestamp>>
Returns the frontiers of the identified collection.
sourcefn create_collections<'life0, 'life1, 'async_trait>(
&'life0 self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: Sync + 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn create_collections<'life0, 'life1, 'async_trait>(
&'life0 self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>where
Self: Sync + 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Create the collections described by the individual CollectionDescriptions.
Each command carries the source id, the source description, and any associated metadata needed to ingest the particular source.
This command installs collection state for the indicated sources, and they are now valid to use in queries at times beyond the initial since frontiers. Each collection also acquires a read capability at this frontier, which will need to be repeatedly downgraded with `allow_compaction()` to permit compaction.
This method is NOT idempotent; it can fail between processing of different collections and leave the `StorageCollections` in an inconsistent state. It is almost always wrong to do anything but abort the process on `Err`.
The `register_ts` is used as the initial timestamp at which tables are available for reads. (We might later give non-tables the same treatment, but hold off on that initially.) Callers must provide a `Some` if any of the collections is a table. A `None` may be given if none of the collections is a table (i.e. all materialized views, sources, etc).