pub trait StorageController: Debug {
    type Timestamp;

Show 40 methods // Required methods fn initialization_complete(&mut self); fn update_parameters(&mut self, config_params: StorageParameters); fn config(&self) -> &StorageConfiguration; fn collection( &self, id: GlobalId ) -> Result<&CollectionState<Self::Timestamp>, StorageError<Self::Timestamp>>; fn create_instance(&mut self, id: StorageInstanceId); fn drop_instance(&mut self, id: StorageInstanceId); fn connect_replica( &mut self, instance_id: StorageInstanceId, replica_id: ReplicaId, location: ClusterReplicaLocation ); fn drop_replica( &mut self, instance_id: StorageInstanceId, replica_id: ReplicaId ); fn collection_mut( &mut self, id: GlobalId ) -> Result<&mut CollectionState<Self::Timestamp>, StorageError<Self::Timestamp>>; fn collections( &self ) -> Box<dyn Iterator<Item = (&GlobalId, &CollectionState<Self::Timestamp>)> + '_>; fn migrate_collections<'life0, 'async_trait>( &'life0 mut self, collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn create_collections<'life0, 'async_trait>( &'life0 mut self, register_ts: Option<Self::Timestamp>, collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn check_alter_collection( &mut self, collections: &BTreeMap<GlobalId, IngestionDescription> ) -> Result<(), StorageError<Self::Timestamp>>; fn alter_collection<'life0, 'async_trait>( &'life0 mut self, collections: BTreeMap<GlobalId, IngestionDescription> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn export( &self, id: GlobalId ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>; fn export_mut( &mut self, id: GlobalId ) -> Result<&mut 
ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>; fn create_exports<'life0, 'async_trait>( &'life0 mut self, exports: Vec<(GlobalId, ExportDescription<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn update_export_connection<'life0, 'async_trait>( &'life0 mut self, exports: BTreeMap<GlobalId, StorageSinkConnection> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn drop_sources( &mut self, identifiers: Vec<GlobalId> ) -> Result<(), StorageError<Self::Timestamp>>; fn drop_sinks( &mut self, identifiers: Vec<GlobalId> ) -> Result<(), StorageError<Self::Timestamp>>; fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>); fn drop_sources_unvalidated(&mut self, identifiers: Vec<GlobalId>); fn append_table( &mut self, write_ts: Self::Timestamp, advance_to: Self::Timestamp, commands: Vec<(GlobalId, Vec<TimestamplessUpdate>)> ) -> Result<Receiver<Result<(), StorageError<Self::Timestamp>>>, StorageError<Self::Timestamp>>; fn monotonic_appender( &self, id: GlobalId ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>; fn webhook_statistics( &self, id: GlobalId ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>; fn snapshot<'life0, 'async_trait>( &'life0 mut self, id: GlobalId, as_of: Self::Timestamp ) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError<Self::Timestamp>>> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn snapshot_cursor<'life0, 'async_trait>( &'life0 mut self, id: GlobalId, as_of: Self::Timestamp ) -> Pin<Box<dyn Future<Output = Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>> + 'async_trait>> where Self::Timestamp: Codec64 + Timestamp + Lattice, Self: 'async_trait, 'life0: 'async_trait; fn snapshot_stats<'life0, 'async_trait>( &'life0 
self, id: GlobalId, as_of: Antichain<Self::Timestamp> ) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn snapshot_parts_stats<'life0, 'async_trait>( &'life0 self, id: GlobalId, as_of: Antichain<Self::Timestamp> ) -> Pin<Box<dyn Future<Output = Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn set_read_policy( &mut self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)> ); fn update_write_frontiers( &mut self, updates: &[(GlobalId, Antichain<Self::Timestamp>)] ); fn update_read_capabilities( &mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>> ); fn ready<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn process<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = Result<Option<Response<Self::Timestamp>>, Error>> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn reconcile_state<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn inspect_persist_state<'life0, 'async_trait>( &'life0 self, id: GlobalId ) -> Pin<Box<dyn Future<Output = Result<Value, Error>> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn record_frontiers<'life0, 'async_trait>( &'life0 mut self, external_frontiers: BTreeMap<GlobalId, (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn record_replica_frontiers<'life0, 'async_trait>( &'life0 mut self, external_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<Self::Timestamp>> ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn 
record_introspection_updates<'life0, 'async_trait>( &'life0 mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)> ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn init_txns<'life0, 'async_trait>( &'life0 mut self, init_ts: Self::Timestamp ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait;
}

Required Associated Types

type Timestamp

Required Methods

source

fn initialization_complete(&mut self)

Marks the end of any initialization commands.

The implementor may defer acting on earlier commands until this method is called, so callers should invoke it as soon as they are able. It may be invoked immediately, at the potential expense of performance.

source

fn update_parameters(&mut self, config_params: StorageParameters)

Update storage configuration with new parameters.

source

fn config(&self) -> &StorageConfiguration

Get the current configuration, including parameters updated with update_parameters.

source

fn collection( &self, id: GlobalId ) -> Result<&CollectionState<Self::Timestamp>, StorageError<Self::Timestamp>>

Acquire an immutable reference to the collection state, should it exist.

source

fn create_instance(&mut self, id: StorageInstanceId)

Creates a storage instance with the specified ID.

A storage instance can have zero or one replicas. The instance is created with zero replicas.

Panics if a storage instance with the given ID already exists.

source

fn drop_instance(&mut self, id: StorageInstanceId)

Drops the storage instance with the given ID.

If you call this method while the storage instance has a replica attached, that replica will be leaked. Call drop_replica first.

Panics if a storage instance with the given ID does not exist.

source

fn connect_replica( &mut self, instance_id: StorageInstanceId, replica_id: ReplicaId, location: ClusterReplicaLocation )

Connects the storage instance to the specified replica.

If the storage instance is already attached to a replica, communication with that replica is severed in favor of the new replica.

In the future, this API will be adjusted to support active replication of storage instances (i.e., multiple replicas attached to a given storage instance).

source

fn drop_replica( &mut self, instance_id: StorageInstanceId, replica_id: ReplicaId )

Disconnects the storage instance from the specified replica.

source

fn collection_mut( &mut self, id: GlobalId ) -> Result<&mut CollectionState<Self::Timestamp>, StorageError<Self::Timestamp>>

Acquire a mutable reference to the collection state, should it exist.

source

fn collections( &self ) -> Box<dyn Iterator<Item = (&GlobalId, &CollectionState<Self::Timestamp>)> + '_>

Acquire an iterator over all collection states.

source

fn migrate_collections<'life0, 'async_trait>( &'life0 mut self, collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Migrate any storage controller state from previous versions to this version’s expectations.

This function must “see” the GlobalId of every collection you plan to create, but can be called with all of the catalog’s collections at once.

source

fn create_collections<'life0, 'async_trait>( &'life0 mut self, register_ts: Option<Self::Timestamp>, collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Create the sources described in the individual RunIngestionCommand commands.

Each command carries the source id, the source description, and any associated metadata needed to ingest the particular source.

This command installs collection state for the indicated sources, and they are now valid to use in queries at times beyond the initial since frontiers. Each collection also acquires a read capability at this frontier, which will need to be repeatedly downgraded with allow_compaction() to permit compaction.

This method is NOT idempotent; it can fail between processing of different collections and leave the controller in an inconsistent state. It is almost always wrong to do anything but abort the process on Err.

The register_ts is used as the initial timestamp at which tables are available for reads. (We might later give non-tables the same treatment, but hold off on that initially.) Callers must provide a Some if any of the collections is a table. A None may be given if none of the collections are a table (i.e. all materialized views, sources, etc.).

source

fn check_alter_collection( &mut self, collections: &BTreeMap<GlobalId, IngestionDescription> ) -> Result<(), StorageError<Self::Timestamp>>

Checks that each collection in collections can be altered to represent its given ingestion.

Note that this check is optimistic: a return of Ok(()) does not guarantee that a subsequent call to alter_collection will succeed.

source

fn alter_collection<'life0, 'async_trait>( &'life0 mut self, collections: BTreeMap<GlobalId, IngestionDescription> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Alters each identified collection to use its described ingestion.

source

fn export( &self, id: GlobalId ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>

Acquire an immutable reference to the export state, should it exist.

source

fn export_mut( &mut self, id: GlobalId ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>

Acquire a mutable reference to the export state, should it exist.

source

fn create_exports<'life0, 'async_trait>( &'life0 mut self, exports: Vec<(GlobalId, ExportDescription<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Create the sinks described by the ExportDescription.

source

fn update_export_connection<'life0, 'async_trait>( &'life0 mut self, exports: BTreeMap<GlobalId, StorageSinkConnection> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

For each identified export, update its StorageSinkConnection.

source

fn drop_sources( &mut self, identifiers: Vec<GlobalId> ) -> Result<(), StorageError<Self::Timestamp>>

Drops the read capability for the sources and allows their resources to be reclaimed.

source

fn drop_sinks( &mut self, identifiers: Vec<GlobalId> ) -> Result<(), StorageError<Self::Timestamp>>

Drops the read capability for the sinks and allows their resources to be reclaimed.

source

fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>)

Drops the read capability for the sinks and allows their resources to be reclaimed.

TODO(jkosh44): This method does not validate the provided identifiers. Currently when the controller starts/restarts it has no durable state. That means that it has no way of remembering any past commands sent. In the future we plan on persisting state for the controller so that it is aware of past commands. Therefore this method is for dropping sinks that we know to have been previously created, but have been forgotten by the controller due to a restart. Once command history becomes durable we can remove this method and use the normal drop_sinks.

source

fn drop_sources_unvalidated(&mut self, identifiers: Vec<GlobalId>)

Drops the read capability for the sources and allows their resources to be reclaimed.

TODO(jkosh44): This method does not validate the provided identifiers. Currently when the controller starts/restarts it has no durable state. That means that it has no way of remembering any past commands sent. In the future we plan on persisting state for the controller so that it is aware of past commands. Therefore this method is for dropping sources that we know to have been previously created, but have been forgotten by the controller due to a restart. Once command history becomes durable we can remove this method and use the normal drop_sources.

source

fn append_table( &mut self, write_ts: Self::Timestamp, advance_to: Self::Timestamp, commands: Vec<(GlobalId, Vec<TimestamplessUpdate>)> ) -> Result<Receiver<Result<(), StorageError<Self::Timestamp>>>, StorageError<Self::Timestamp>>

Appends the updates in commands to the identified collections at write_ts and advances their uppers to advance_to.

The method returns a oneshot that can be awaited to indicate completion of the write. The method may return an error, indicating an immediately visible error, and also the oneshot may return an error if one is encountered during the write.

source

fn monotonic_appender( &self, id: GlobalId ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>

Returns a MonotonicAppender which is a channel that can be used to monotonically append to the specified GlobalId.

source

fn webhook_statistics( &self, id: GlobalId ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>

Returns a shared WebhookStatistics which can be used to report user-facing statistics for the given webhook, specified by the GlobalId.

source

fn snapshot<'life0, 'async_trait>( &'life0 mut self, id: GlobalId, as_of: Self::Timestamp ) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns the snapshot of the contents of the local input named id at as_of.

source

fn snapshot_cursor<'life0, 'async_trait>( &'life0 mut self, id: GlobalId, as_of: Self::Timestamp ) -> Pin<Box<dyn Future<Output = Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self::Timestamp: Codec64 + Timestamp + Lattice, Self: 'async_trait, 'life0: 'async_trait,

Returns a cursor over the snapshot of the contents of the local input named id at as_of.

source

fn snapshot_stats<'life0, 'async_trait>( &'life0 self, id: GlobalId, as_of: Antichain<Self::Timestamp> ) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns aggregate statistics about the contents of the local input named id at as_of.

source

fn snapshot_parts_stats<'life0, 'async_trait>( &'life0 self, id: GlobalId, as_of: Antichain<Self::Timestamp> ) -> Pin<Box<dyn Future<Output = Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns aggregate statistics about the contents of the local input named id at as_of.

source

fn set_read_policy( &mut self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)> )

Assigns a read policy to specific identifiers.

The policies are assigned in the order presented, and repeated identifiers should conclude with the last policy. Changing a policy will immediately downgrade the read capability if appropriate, but it will not “recover” the read capability if the prior capability is already ahead of it.

The StorageController may include its own overrides on these policies.

Identifiers not present in policies retain their existing read policies.

source

fn update_write_frontiers( &mut self, updates: &[(GlobalId, Antichain<Self::Timestamp>)] )

Ingests write frontier updates for collections that this controller maintains and potentially generates updates to read capabilities, which are passed on to StorageController::update_read_capabilities.

These updates come from the entity that is responsible for writing to the collection, and in turn advancing its upper (aka write_frontier). The most common such “writers” are:

  • clusterd instances, for source ingestions

  • introspection collections (which this controller writes to)

  • Tables (which are written to by this controller)

  • Materialized Views, which are running inside COMPUTE, and for which COMPUTE sends updates to this storage controller

The so-called “implied capability” is a read capability for a collection that is updated based on the write frontier and the collection's ReadPolicy. Advancing the write frontier might change this implied capability, which in turn might change the overall since (a combination of all read capabilities) of a collection.

source

fn update_read_capabilities( &mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>> )

Applies updates and sends any appropriate compaction command.

source

fn ready<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Waits until the controller is ready to process a response.

This method may block for an arbitrarily long time.

When the method returns, the owner should call StorageController::process to process the ready message.

This method is cancellation safe.

source

fn process<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = Result<Option<Response<Self::Timestamp>>, Error>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Processes the work queued by StorageController::ready.

This method is guaranteed to return “quickly” unless doing so would compromise the correctness of the system.

This method is not guaranteed to be cancellation safe. It must be awaited to completion.

source

fn reconcile_state<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Signal to the controller that the adapter has populated all of its initial state and the controller can reconcile (i.e. drop) any unclaimed resources.

source

fn inspect_persist_state<'life0, 'async_trait>( &'life0 self, id: GlobalId ) -> Pin<Box<dyn Future<Output = Result<Value, Error>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Exposes the internal state of the data shard for debugging and QA.

We’ll try to avoid making unnecessary changes, but the output of this method needs to be gated from users, so that it’s not subject to our backward compatibility guarantees.

TODO: Ideally this would return impl Serialize so the caller can do with it what they like, but that doesn’t work in traits yet. The workaround (an associated type) doesn’t work because persist doesn’t want to make the type public. In the meantime, the serde_json call has been moved from its single user into this method.

source

fn record_frontiers<'life0, 'async_trait>( &'life0 mut self, external_frontiers: BTreeMap<GlobalId, (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Records the current read and write frontiers of all known storage objects.

The provided external_frontiers are merged with the frontiers known to the storage controller. If external_frontiers contains entries with object IDs that are known to the storage controller, the storage controller’s frontiers take precedence. The rationale is that the storage controller should be the authority on frontiers of storage objects, not the caller of this method.

source

fn record_replica_frontiers<'life0, 'async_trait>( &'life0 mut self, external_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<Self::Timestamp>> ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Records the current per-replica write frontiers of all known storage objects.

The provided external_frontiers are merged with the frontiers known to the storage controller. If external_frontiers contains entries with object IDs that are known to the storage controller, the storage controller’s frontiers take precedence. The rationale is that the storage controller should be the authority on frontiers of storage objects, not the caller of this method.

source

fn record_introspection_updates<'life0, 'async_trait>( &'life0 mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)> ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Records updates for the given introspection type.

Rows passed in updates MUST have the correct schema for the given introspection type, as readers rely on this and might panic otherwise.

source

fn init_txns<'life0, 'async_trait>( &'life0 mut self, init_ts: Self::Timestamp ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Resets the txns system to a set of invariants necessary for correctness.

Must be called on boot before create_collections or the various appends. This is true regardless of whether the persist-txn feature is on or not. See the big comment in the impl of the method for details. Ideally, this would have just been folded into Controller::new, but it needs the timestamp and there are boot dependency issues.

TODO: This can be removed once we’ve flipped to the new txns system for good and there is no possibility of the old code running concurrently with the new code.

Implementors§