pub trait StorageCollections: Debug {
    type Timestamp: TimelyTimestamp;

Show 19 methods // Required methods fn initialize_state<'life0, 'life1, 'async_trait>( &'life0 self, txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send), init_ids: BTreeSet<GlobalId>, drop_ids: BTreeSet<GlobalId>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn update_parameters(&self, config_params: StorageParameters); fn collection_metadata( &self, id: GlobalId, ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>; fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>; fn collections_frontiers( &self, id: Vec<GlobalId>, ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>; fn active_collection_frontiers( &self, ) -> Vec<CollectionFrontiers<Self::Timestamp>>; fn check_exists( &self, id: GlobalId, ) -> Result<(), StorageError<Self::Timestamp>>; fn snapshot_stats<'life0, 'async_trait>( &'life0 self, id: GlobalId, as_of: Antichain<Self::Timestamp>, ) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn snapshot_parts_stats<'life0, 'async_trait>( &'life0 self, id: GlobalId, as_of: Antichain<Self::Timestamp>, ) -> Pin<Box<dyn Future<Output = BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn prepare_state<'life0, 'life1, 'async_trait>( &'life0 self, txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send), ids_to_add: BTreeSet<GlobalId>, ids_to_drop: BTreeSet<GlobalId>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn create_collections_for_bootstrap<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, storage_metadata: &'life1 StorageMetadata, 
register_ts: Option<Self::Timestamp>, collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>, migrated_storage_collections: &'life2 BTreeSet<GlobalId>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn alter_ingestion_source_desc<'life0, 'async_trait>( &'life0 self, ingestion_id: GlobalId, source_desc: SourceDesc, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn alter_ingestion_connections<'life0, 'async_trait>( &'life0 self, source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn alter_table_desc<'life0, 'async_trait>( &'life0 self, table_id: GlobalId, new_desc: RelationDesc, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn drop_collections_unvalidated( &self, storage_metadata: &StorageMetadata, identifiers: Vec<GlobalId>, ); fn set_read_policies( &self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>, ); fn acquire_read_holds( &self, desired_holds: Vec<GlobalId>, ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>; // Provided methods fn collection_frontiers( &self, id: GlobalId, ) -> Result<CollectionFrontiers<Self::Timestamp>, StorageError<Self::Timestamp>> { ... 
} fn create_collections<'life0, 'life1, 'async_trait>( &'life0 self, storage_metadata: &'life1 StorageMetadata, register_ts: Option<Self::Timestamp>, collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>> where Self: Sync + 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... }
}
Expand description

An abstraction for keeping track of storage collections and managing access to them.

Responsibilities:

  • Keeps a critical persist handle for holding the since of collections where it needs to be.

  • Drives the since forward based on the upper of a collection and a ReadPolicy.

  • Hands out ReadHolds that prevent a collection’s since from advancing while it needs to be read at a specific time.

Required Associated Types§

Required Methods§

source

fn initialize_state<'life0, 'life1, 'async_trait>( &'life0 self, txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send), init_ids: BTreeSet<GlobalId>, drop_ids: BTreeSet<GlobalId>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

On boot, reconcile this StorageCollections with outside state. We get a StorageTxn where we can record any durable state that we need.

We get init_ids, which tells us about all collections that currently exist, so that we can record durable state for those that we don’t know yet about.

We also get drop_ids, which tells us about all collections that we might have known about before and have now been dropped.

source

fn update_parameters(&self, config_params: StorageParameters)

Update storage configuration with new parameters.

source

fn collection_metadata( &self, id: GlobalId, ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>

Returns the CollectionMetadata of the collection identified by id.

source

fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>

Returns the GlobalId and CollectionMetadata of every active collection.

A collection is “active” when it has a non-empty frontier of read capabilities.

source

fn collections_frontiers( &self, id: Vec<GlobalId>, ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>

Atomically gets and returns the frontiers of all the identified collections.

source

fn active_collection_frontiers( &self, ) -> Vec<CollectionFrontiers<Self::Timestamp>>

Atomically gets and returns the frontiers of all active collections.

A collection is “active” when it has a non-empty frontier of read capabilities.

source

fn check_exists( &self, id: GlobalId, ) -> Result<(), StorageError<Self::Timestamp>>

Checks whether a collection exists under the given GlobalId. Returns an error if the collection does not exist.

source

fn snapshot_stats<'life0, 'async_trait>( &'life0 self, id: GlobalId, as_of: Antichain<Self::Timestamp>, ) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns aggregate statistics about the contents of the local input named id at as_of.

source

fn snapshot_parts_stats<'life0, 'async_trait>( &'life0 self, id: GlobalId, as_of: Antichain<Self::Timestamp>, ) -> Pin<Box<dyn Future<Output = BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns aggregate statistics about the contents of the local input named id at as_of.

Note that this async function itself returns a future. We may need to block on the stats being available, but don’t want to hold a reference to the controller for too long… so the outer future holds a reference to the controller but returns quickly, and the inner future is slow but does not reference the controller.

source

fn prepare_state<'life0, 'life1, 'async_trait>( &'life0 self, txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send), ids_to_add: BTreeSet<GlobalId>, ids_to_drop: BTreeSet<GlobalId>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Update the given StorageTxn with the appropriate metadata given the IDs to add and drop.

The data modified in the StorageTxn must be made available in all subsequent calls that require StorageMetadata as a parameter.

source

fn create_collections_for_bootstrap<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, storage_metadata: &'life1 StorageMetadata, register_ts: Option<Self::Timestamp>, collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>, migrated_storage_collections: &'life2 BTreeSet<GlobalId>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Like Self::create_collections, except used specifically for bootstrap.

migrated_storage_collections is a set of migrated storage collections to be excluded from the txn-wal sub-system.

source

fn alter_ingestion_source_desc<'life0, 'async_trait>( &'life0 self, ingestion_id: GlobalId, source_desc: SourceDesc, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Alters the identified ingestion to use the provided SourceDesc.

NOTE: Ideally, StorageCollections would not care about these, but we have to learn about changes such that when new subsources are created we can correctly determine a since based on its dependencies’ sinces. This is really only relevant because newly created subsources depend on the remap shard, and we can’t just have them start at since 0.

source

fn alter_ingestion_connections<'life0, 'async_trait>( &'life0 self, source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Alters each identified collection to use the correlated GenericSourceConnection.

See NOTE on StorageCollections::alter_ingestion_source_desc.

source

fn alter_table_desc<'life0, 'async_trait>( &'life0 self, table_id: GlobalId, new_desc: RelationDesc, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Updates the RelationDesc for the specified table.

source

fn drop_collections_unvalidated( &self, storage_metadata: &StorageMetadata, identifiers: Vec<GlobalId>, )

Drops the read capability for the sources and allows their resources to be reclaimed.

TODO(jkosh44): This method does not validate the provided identifiers. Currently when the controller starts/restarts it has no durable state. That means that it has no way of remembering any past commands sent. In the future we plan on persisting state for the controller so that it is aware of past commands. Therefore this method is for dropping sources that we know to have been previously created, but have been forgotten by the controller due to a restart. Once command history becomes durable we can remove this method and use the normal drop_sources.

source

fn set_read_policies( &self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>, )

Assigns a read policy to specific identifiers.

The policies are assigned in the order presented, and repeated identifiers should conclude with the last policy. Changing a policy will immediately downgrade the read capability if appropriate, but it will not “recover” the read capability if the prior capability is already ahead of it.

This StorageCollections may include its own overrides on these policies.

Identifiers not present in policies retain their existing read policies.

source

fn acquire_read_holds( &self, desired_holds: Vec<GlobalId>, ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>

Acquires and returns the earliest possible read holds for the specified collections.

Provided Methods§

source

fn collection_frontiers( &self, id: GlobalId, ) -> Result<CollectionFrontiers<Self::Timestamp>, StorageError<Self::Timestamp>>

Returns the frontiers of the identified collection.

source

fn create_collections<'life0, 'life1, 'async_trait>( &'life0 self, storage_metadata: &'life1 StorageMetadata, register_ts: Option<Self::Timestamp>, collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + Send + 'async_trait>>
where Self: Sync + 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Create the collections described by the individual CollectionDescriptions.

Each command carries the source id, the source description, and any associated metadata needed to ingest the particular source.

This command installs collection state for the indicated sources, and they are now valid to use in queries at times beyond the initial since frontiers. Each collection also acquires a read capability at this frontier, which will need to be repeatedly downgraded with allow_compaction() to permit compaction.

This method is NOT idempotent; it can fail between processing of different collections and leave the StorageCollections in an inconsistent state. It is almost always wrong to do anything but abort the process on Err.

The register_ts is used as the initial timestamp at which tables are available for reads. (We might later give non-tables the same treatment, but hold off on that initially.) Callers must provide a Some if any of the collections is a table. A None may be given if none of the collections is a table (i.e. all materialized views, sources, etc).

Implementors§