pub trait StorageController: Debug {
type Timestamp: TimelyTimestamp;
Show 51 methods
// Required methods
fn initialization_complete(&mut self);
fn update_parameters(&mut self, config_params: StorageParameters);
fn config(&self) -> &StorageConfiguration;
fn collection_metadata(
&self,
id: GlobalId,
) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>;
fn collection_hydrated(
&self,
collection_id: GlobalId,
) -> Result<bool, StorageError<Self::Timestamp>>;
fn collection_frontiers(
&self,
id: GlobalId,
) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), StorageError<Self::Timestamp>>;
fn collections_frontiers(
&self,
id: Vec<GlobalId>,
) -> Result<Vec<(GlobalId, Antichain<Self::Timestamp>, Antichain<Self::Timestamp>)>, StorageError<Self::Timestamp>>;
fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;
fn active_ingestions(
&self,
instance_id: StorageInstanceId,
) -> &BTreeSet<GlobalId>;
fn check_exists(
&self,
id: GlobalId,
) -> Result<(), StorageError<Self::Timestamp>>;
fn create_instance(&mut self, id: StorageInstanceId);
fn drop_instance(&mut self, id: StorageInstanceId);
fn connect_replica(
&mut self,
instance_id: StorageInstanceId,
replica_id: ReplicaId,
location: ClusterReplicaLocation,
);
fn drop_replica(
&mut self,
instance_id: StorageInstanceId,
replica_id: ReplicaId,
);
fn create_collections_for_bootstrap<'life0, 'life1, 'life2, 'async_trait>(
&'life0 mut self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
migrated_storage_collections: &'life2 BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn check_alter_ingestion_source_desc(
&mut self,
ingestion_id: GlobalId,
source_desc: &SourceDesc,
) -> Result<(), StorageError<Self::Timestamp>>;
fn alter_ingestion_source_desc<'life0, 'async_trait>(
&'life0 mut self,
ingestion_id: GlobalId,
source_desc: SourceDesc,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn alter_ingestion_connections<'life0, 'async_trait>(
&'life0 mut self,
source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn alter_ingestion_export_data_configs<'life0, 'async_trait>(
&'life0 mut self,
source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn alter_table_desc<'life0, 'async_trait>(
&'life0 mut self,
table_id: GlobalId,
new_desc: RelationDesc,
expected_schema: SchemaId,
forget_ts: Self::Timestamp,
register_ts: Self::Timestamp,
) -> Pin<Box<dyn Future<Output = Result<SchemaId, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn export(
&self,
id: GlobalId,
) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;
fn export_mut(
&mut self,
id: GlobalId,
) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;
fn create_exports<'life0, 'async_trait>(
&'life0 mut self,
exports: Vec<(GlobalId, ExportDescription<Self::Timestamp>)>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn alter_export<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
export: ExportDescription<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn alter_export_connections<'life0, 'async_trait>(
&'life0 mut self,
exports: BTreeMap<GlobalId, StorageSinkConnection>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn drop_tables(
&mut self,
storage_metadata: &StorageMetadata,
identifiers: Vec<GlobalId>,
ts: Self::Timestamp,
) -> Result<(), StorageError<Self::Timestamp>>;
fn drop_sources(
&mut self,
storage_metadata: &StorageMetadata,
identifiers: Vec<GlobalId>,
) -> Result<(), StorageError<Self::Timestamp>>;
fn drop_sinks(
&mut self,
identifiers: Vec<GlobalId>,
) -> Result<(), StorageError<Self::Timestamp>>;
fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>);
fn drop_sources_unvalidated(
&mut self,
storage_metadata: &StorageMetadata,
identifiers: Vec<GlobalId>,
) -> Result<(), StorageError<Self::Timestamp>>;
fn append_table(
&mut self,
write_ts: Self::Timestamp,
advance_to: Self::Timestamp,
commands: Vec<(GlobalId, Vec<TimestamplessUpdate>)>,
) -> Result<Receiver<Result<(), StorageError<Self::Timestamp>>>, StorageError<Self::Timestamp>>;
fn monotonic_appender(
&self,
id: GlobalId,
) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>;
fn webhook_statistics(
&self,
id: GlobalId,
) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>;
fn snapshot(
&self,
id: GlobalId,
as_of: Self::Timestamp,
) -> BoxFuture<Result<Vec<(Row, Diff)>, StorageError<Self::Timestamp>>>;
fn snapshot_latest<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
) -> Pin<Box<dyn Future<Output = Result<Vec<Row>, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn snapshot_cursor<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
as_of: Self::Timestamp,
) -> Pin<Box<dyn Future<Output = Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self::Timestamp: Codec64 + Timestamp + Lattice,
Self: 'async_trait,
'life0: 'async_trait;
fn snapshot_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn snapshot_parts_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = BoxFuture<Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn set_read_policy(
&mut self,
policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>,
);
fn acquire_read_holds(
&self,
desired_holds: Vec<GlobalId>,
) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>;
fn ready<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn process(
&mut self,
storage_metadata: &StorageMetadata,
) -> Result<Option<Response<Self::Timestamp>>, Error>;
fn inspect_persist_state<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
) -> Pin<Box<dyn Future<Output = Result<Value, Error>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn append_introspection_updates(
&mut self,
type_: IntrospectionType,
updates: Vec<(Row, Diff)>,
);
fn append_status_introspection_updates(
&mut self,
type_: IntrospectionType,
updates: Vec<StatusUpdate>,
);
fn update_introspection_collection(
&mut self,
type_: IntrospectionType,
op: StorageWriteOp,
);
fn initialize_state<'life0, 'life1, 'async_trait>(
&'life0 mut self,
txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send),
init_ids: BTreeSet<GlobalId>,
drop_ids: BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn prepare_state<'life0, 'life1, 'async_trait>(
&'life0 self,
txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send),
ids_to_add: BTreeSet<GlobalId>,
ids_to_drop: BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn real_time_recent_timestamp<'life0, 'async_trait>(
&'life0 self,
source_ids: BTreeSet<GlobalId>,
timeout: Duration,
) -> Pin<Box<dyn Future<Output = Result<BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
// Provided methods
fn create_collections<'life0, 'life1, 'async_trait>(
&'life0 mut self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn acquire_read_hold(
&mut self,
id: GlobalId,
) -> Result<ReadHold<Self::Timestamp>, ReadHoldError> { ... }
}
Required Associated Types§
Required Methods§
sourcefn initialization_complete(&mut self)
fn initialization_complete(&mut self)
Marks the end of any initialization commands.
The implementor may wait for this method to be called before carrying out prior commands, so it is important for a user to invoke this method as soon as it is comfortable doing so. This method can be invoked immediately, at the potential expense of performance.
sourcefn update_parameters(&mut self, config_params: StorageParameters)
fn update_parameters(&mut self, config_params: StorageParameters)
Update storage configuration with new parameters.
sourcefn config(&self) -> &StorageConfiguration
fn config(&self) -> &StorageConfiguration
Get the current configuration, including parameters updated with `update_parameters`.
sourcefn collection_metadata(
&self,
id: GlobalId,
) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>
fn collection_metadata( &self, id: GlobalId, ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>
Returns the `CollectionMetadata` of the collection identified by `id`.
sourcefn collection_hydrated(
&self,
collection_id: GlobalId,
) -> Result<bool, StorageError<Self::Timestamp>>
fn collection_hydrated( &self, collection_id: GlobalId, ) -> Result<bool, StorageError<Self::Timestamp>>
Returns `true` iff the given collection/ingestion has been hydrated.
For this check, zero-replica clusters are always considered hydrated. Their collections would never normally be considered hydrated but it’s clearly intentional that they have no replicas.
sourcefn collection_frontiers(
&self,
id: GlobalId,
) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), StorageError<Self::Timestamp>>
fn collection_frontiers( &self, id: GlobalId, ) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), StorageError<Self::Timestamp>>
Returns the since/upper frontiers of the identified collection.
sourcefn collections_frontiers(
&self,
id: Vec<GlobalId>,
) -> Result<Vec<(GlobalId, Antichain<Self::Timestamp>, Antichain<Self::Timestamp>)>, StorageError<Self::Timestamp>>
fn collections_frontiers( &self, id: Vec<GlobalId>, ) -> Result<Vec<(GlobalId, Antichain<Self::Timestamp>, Antichain<Self::Timestamp>)>, StorageError<Self::Timestamp>>
Returns the since/upper frontiers of the identified collections.
Having a single method that returns both frontiers at the same time, for all requested collections, ensures that we get a consistent “snapshot” of collection state. With separate methods, or if frontiers were fetched one collection at a time, collection state could change concurrently while the information is being gathered.
sourcefn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>
fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>
Returns the `CollectionMetadata` of all active collections, paired with their `GlobalId`s.
A collection is “active” when it has a non-empty frontier of read capabilities.
sourcefn active_ingestions(
&self,
instance_id: StorageInstanceId,
) -> &BTreeSet<GlobalId>
fn active_ingestions( &self, instance_id: StorageInstanceId, ) -> &BTreeSet<GlobalId>
Returns the IDs of all active ingestions for the given storage instance.
sourcefn check_exists(
&self,
id: GlobalId,
) -> Result<(), StorageError<Self::Timestamp>>
fn check_exists( &self, id: GlobalId, ) -> Result<(), StorageError<Self::Timestamp>>
Checks whether a collection exists under the given `GlobalId`. Returns an error if the collection does not exist.
sourcefn create_instance(&mut self, id: StorageInstanceId)
fn create_instance(&mut self, id: StorageInstanceId)
Creates a storage instance with the specified ID.
A storage instance can have zero or one replicas. The instance is created with zero replicas.
Panics if a storage instance with the given ID already exists.
sourcefn drop_instance(&mut self, id: StorageInstanceId)
fn drop_instance(&mut self, id: StorageInstanceId)
Drops the storage instance with the given ID.
If you call this method while the storage instance has a replica attached, that replica will be leaked. Call `drop_replica` first.
Panics if a storage instance with the given ID does not exist.
sourcefn connect_replica(
&mut self,
instance_id: StorageInstanceId,
replica_id: ReplicaId,
location: ClusterReplicaLocation,
)
fn connect_replica( &mut self, instance_id: StorageInstanceId, replica_id: ReplicaId, location: ClusterReplicaLocation, )
Connects the storage instance to the specified replica.
If the storage instance is already attached to a replica, communication with that replica is severed in favor of the new replica.
In the future, this API will be adjusted to support active replication of storage instances (i.e., multiple replicas attached to a given storage instance).
sourcefn drop_replica(
&mut self,
instance_id: StorageInstanceId,
replica_id: ReplicaId,
)
fn drop_replica( &mut self, instance_id: StorageInstanceId, replica_id: ReplicaId, )
Disconnects the storage instance from the specified replica.
sourcefn create_collections_for_bootstrap<'life0, 'life1, 'life2, 'async_trait>(
&'life0 mut self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
migrated_storage_collections: &'life2 BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn create_collections_for_bootstrap<'life0, 'life1, 'life2, 'async_trait>(
&'life0 mut self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
migrated_storage_collections: &'life2 BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Like `Self::create_collections`, except used specifically for bootstrap.
`migrated_storage_collections` is a set of migrated storage collections to be excluded from the txn-wal sub-system.
sourcefn check_alter_ingestion_source_desc(
&mut self,
ingestion_id: GlobalId,
source_desc: &SourceDesc,
) -> Result<(), StorageError<Self::Timestamp>>
fn check_alter_ingestion_source_desc( &mut self, ingestion_id: GlobalId, source_desc: &SourceDesc, ) -> Result<(), StorageError<Self::Timestamp>>
Check that the ingestion associated with `ingestion_id` can use the provided `SourceDesc`.
Note that this check is optimistic: its return of `Ok(())` does not guarantee that subsequent calls to `alter_ingestion_source_desc` will succeed.
sourcefn alter_ingestion_source_desc<'life0, 'async_trait>(
&'life0 mut self,
ingestion_id: GlobalId,
source_desc: SourceDesc,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn alter_ingestion_source_desc<'life0, 'async_trait>(
&'life0 mut self,
ingestion_id: GlobalId,
source_desc: SourceDesc,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Alters the identified collection to use the provided `SourceDesc`.
sourcefn alter_ingestion_connections<'life0, 'async_trait>(
&'life0 mut self,
source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn alter_ingestion_connections<'life0, 'async_trait>(
&'life0 mut self,
source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Alters each identified collection to use the correlated `GenericSourceConnection`.
sourcefn alter_ingestion_export_data_configs<'life0, 'async_trait>(
&'life0 mut self,
source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn alter_ingestion_export_data_configs<'life0, 'async_trait>(
&'life0 mut self,
source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Alters the data config for the specified source exports of the specified ingestions.
fn alter_table_desc<'life0, 'async_trait>(
&'life0 mut self,
table_id: GlobalId,
new_desc: RelationDesc,
expected_schema: SchemaId,
forget_ts: Self::Timestamp,
register_ts: Self::Timestamp,
) -> Pin<Box<dyn Future<Output = Result<SchemaId, StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
sourcefn export(
&self,
id: GlobalId,
) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>
fn export( &self, id: GlobalId, ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>
Acquire an immutable reference to the export state, should it exist.
sourcefn export_mut(
&mut self,
id: GlobalId,
) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>
fn export_mut( &mut self, id: GlobalId, ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>
Acquire a mutable reference to the export state, should it exist.
sourcefn create_exports<'life0, 'async_trait>(
&'life0 mut self,
exports: Vec<(GlobalId, ExportDescription<Self::Timestamp>)>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_exports<'life0, 'async_trait>(
&'life0 mut self,
exports: Vec<(GlobalId, ExportDescription<Self::Timestamp>)>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create the sinks described by the `ExportDescription`.
sourcefn alter_export<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
export: ExportDescription<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn alter_export<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
export: ExportDescription<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Alter the sink identified by the given `id` to match the provided `ExportDescription`.
sourcefn alter_export_connections<'life0, 'async_trait>(
&'life0 mut self,
exports: BTreeMap<GlobalId, StorageSinkConnection>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn alter_export_connections<'life0, 'async_trait>(
&'life0 mut self,
exports: BTreeMap<GlobalId, StorageSinkConnection>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
For each identified export, alter its `StorageSinkConnection`.
sourcefn drop_tables(
&mut self,
storage_metadata: &StorageMetadata,
identifiers: Vec<GlobalId>,
ts: Self::Timestamp,
) -> Result<(), StorageError<Self::Timestamp>>
fn drop_tables( &mut self, storage_metadata: &StorageMetadata, identifiers: Vec<GlobalId>, ts: Self::Timestamp, ) -> Result<(), StorageError<Self::Timestamp>>
Drops the read capability for the tables and allows their resources to be reclaimed.
sourcefn drop_sources(
&mut self,
storage_metadata: &StorageMetadata,
identifiers: Vec<GlobalId>,
) -> Result<(), StorageError<Self::Timestamp>>
fn drop_sources( &mut self, storage_metadata: &StorageMetadata, identifiers: Vec<GlobalId>, ) -> Result<(), StorageError<Self::Timestamp>>
Drops the read capability for the sources and allows their resources to be reclaimed.
sourcefn drop_sinks(
&mut self,
identifiers: Vec<GlobalId>,
) -> Result<(), StorageError<Self::Timestamp>>
fn drop_sinks( &mut self, identifiers: Vec<GlobalId>, ) -> Result<(), StorageError<Self::Timestamp>>
Drops the read capability for the sinks and allows their resources to be reclaimed.
sourcefn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>)
fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>)
Drops the read capability for the sinks and allows their resources to be reclaimed.
TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
controller starts/restarts it has no durable state. That means that it has no way of
remembering any past commands sent. In the future we plan on persisting state for the
controller so that it is aware of past commands.
Therefore this method is for dropping sinks that we know to have been previously
created, but have been forgotten by the controller due to a restart.
Once command history becomes durable we can remove this method and use the normal `drop_sinks`.
sourcefn drop_sources_unvalidated(
&mut self,
storage_metadata: &StorageMetadata,
identifiers: Vec<GlobalId>,
) -> Result<(), StorageError<Self::Timestamp>>
fn drop_sources_unvalidated( &mut self, storage_metadata: &StorageMetadata, identifiers: Vec<GlobalId>, ) -> Result<(), StorageError<Self::Timestamp>>
Drops the read capability for the sources and allows their resources to be reclaimed.
TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
controller starts/restarts it has no durable state. That means that it has no way of
remembering any past commands sent. In the future we plan on persisting state for the
controller so that it is aware of past commands.
Therefore this method is for dropping sources that we know to have been previously
created, but have been forgotten by the controller due to a restart.
Once command history becomes durable we can remove this method and use the normal `drop_sources`.
sourcefn append_table(
&mut self,
write_ts: Self::Timestamp,
advance_to: Self::Timestamp,
commands: Vec<(GlobalId, Vec<TimestamplessUpdate>)>,
) -> Result<Receiver<Result<(), StorageError<Self::Timestamp>>>, StorageError<Self::Timestamp>>
fn append_table( &mut self, write_ts: Self::Timestamp, advance_to: Self::Timestamp, commands: Vec<(GlobalId, Vec<TimestamplessUpdate>)>, ) -> Result<Receiver<Result<(), StorageError<Self::Timestamp>>>, StorageError<Self::Timestamp>>
Appends the updates in `commands` to the tables identified by their `GlobalId`s at `write_ts`, and advances their uppers to `advance_to`.
The method returns a oneshot that can be awaited to indicate completion of the write. The method may return an error, indicating an immediately visible error, and also the oneshot may return an error if one is encountered during the write.
All updates in `commands` are applied atomically.
sourcefn monotonic_appender(
&self,
id: GlobalId,
) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>
fn monotonic_appender( &self, id: GlobalId, ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>
Returns a `MonotonicAppender`, which is a channel that can be used to monotonically append to the specified `GlobalId`.
sourcefn webhook_statistics(
&self,
id: GlobalId,
) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>
fn webhook_statistics( &self, id: GlobalId, ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>
Returns a shared `WebhookStatistics`, which can be used to report user-facing statistics for the given webhook, specified by the `GlobalId`.
sourcefn snapshot(
&self,
id: GlobalId,
as_of: Self::Timestamp,
) -> BoxFuture<Result<Vec<(Row, Diff)>, StorageError<Self::Timestamp>>>
fn snapshot( &self, id: GlobalId, as_of: Self::Timestamp, ) -> BoxFuture<Result<Vec<(Row, Diff)>, StorageError<Self::Timestamp>>>
Returns the snapshot of the contents of the local input named `id` at `as_of`.
sourcefn snapshot_latest<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
) -> Pin<Box<dyn Future<Output = Result<Vec<Row>, StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn snapshot_latest<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
) -> Pin<Box<dyn Future<Output = Result<Vec<Row>, StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Returns the snapshot of the contents of the local input named `id` at the largest readable `as_of`.
sourcefn snapshot_cursor<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
as_of: Self::Timestamp,
) -> Pin<Box<dyn Future<Output = Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>> + 'async_trait>>
fn snapshot_cursor<'life0, 'async_trait>( &'life0 mut self, id: GlobalId, as_of: Self::Timestamp, ) -> Pin<Box<dyn Future<Output = Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>> + 'async_trait>>
Returns a `SnapshotCursor` over the snapshot of the contents of the local input named `id` at `as_of`.
sourcefn snapshot_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn snapshot_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Returns aggregate statistics about the contents of the local input named `id` at `as_of`.
sourcefn snapshot_parts_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = BoxFuture<Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn snapshot_parts_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>,
) -> Pin<Box<dyn Future<Output = BoxFuture<Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Returns aggregate statistics about the contents of the local input named `id` at `as_of`.
Note that this async function itself returns a future. We may need to block on the stats being available, but don’t want to hold a reference to the controller for too long… so the outer future holds a reference to the controller but returns quickly, and the inner future is slow but does not reference the controller.
sourcefn set_read_policy(
&mut self,
policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>,
)
fn set_read_policy( &mut self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>, )
Assigns a read policy to specific identifiers.
The policies are assigned in the order presented, and repeated identifiers should conclude with the last policy. Changing a policy will immediately downgrade the read capability if appropriate, but it will not “recover” the read capability if the prior capability is already ahead of it.
The `StorageController` may include its own overrides on these policies.
Identifiers not present in `policies` retain their existing read policies.
sourcefn acquire_read_holds(
&self,
desired_holds: Vec<GlobalId>,
) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>
fn acquire_read_holds( &self, desired_holds: Vec<GlobalId>, ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>
Acquires and returns the desired read holds, advancing them to the since frontier when necessary.
sourcefn ready<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn ready<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Waits until the controller is ready to process a response.
This method may block for an arbitrarily long time.
When the method returns, the owner should call `StorageController::process` to process the ready message.
This method is cancellation safe.
sourcefn process(
&mut self,
storage_metadata: &StorageMetadata,
) -> Result<Option<Response<Self::Timestamp>>, Error>
fn process( &mut self, storage_metadata: &StorageMetadata, ) -> Result<Option<Response<Self::Timestamp>>, Error>
Processes the work queued by `StorageController::ready`.
sourcefn inspect_persist_state<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
) -> Pin<Box<dyn Future<Output = Result<Value, Error>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn inspect_persist_state<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
) -> Pin<Box<dyn Future<Output = Result<Value, Error>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Exposes the internal state of the data shard for debugging and QA.
We’ll be thoughtful about making unnecessary changes, but the output of this method needs to be gated from users, so that it’s not subject to our backward compatibility guarantees.
TODO: Ideally this would return `impl Serialize` so the caller can do with it what they like, but that doesn’t work in traits yet. The workaround (an associated type) doesn’t work because persist doesn’t want to make the type public. In the meantime, move the `serde_json` call from the single user into this method.
sourcefn append_introspection_updates(
&mut self,
type_: IntrospectionType,
updates: Vec<(Row, Diff)>,
)
fn append_introspection_updates( &mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)>, )
Records append-only updates for the given introspection type.
Rows passed in `updates` MUST have the correct schema for the given introspection type, as readers rely on this and might panic otherwise.
sourcefn append_status_introspection_updates(
&mut self,
type_: IntrospectionType,
updates: Vec<StatusUpdate>,
)
fn append_status_introspection_updates( &mut self, type_: IntrospectionType, updates: Vec<StatusUpdate>, )
Records append-only status updates for the given introspection type.
sourcefn update_introspection_collection(
&mut self,
type_: IntrospectionType,
op: StorageWriteOp,
)
fn update_introspection_collection( &mut self, type_: IntrospectionType, op: StorageWriteOp, )
Updates the desired state of the given introspection type.
Rows passed in `op` MUST have the correct schema for the given introspection type, as readers rely on this and might panic otherwise.
sourcefn initialize_state<'life0, 'life1, 'async_trait>(
&'life0 mut self,
txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send),
init_ids: BTreeSet<GlobalId>,
drop_ids: BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn initialize_state<'life0, 'life1, 'async_trait>(
&'life0 mut self,
txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send),
init_ids: BTreeSet<GlobalId>,
drop_ids: BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
On boot, seed the controller’s metadata/state.
sourcefn prepare_state<'life0, 'life1, 'async_trait>(
&'life0 self,
txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send),
ids_to_add: BTreeSet<GlobalId>,
ids_to_drop: BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn prepare_state<'life0, 'life1, 'async_trait>(
&'life0 self,
txn: &'life1 mut (dyn StorageTxn<Self::Timestamp> + Send),
ids_to_add: BTreeSet<GlobalId>,
ids_to_drop: BTreeSet<GlobalId>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Update the implementor of `StorageTxn` with the appropriate metadata given the IDs to add and drop.
The data modified in the `StorageTxn` must be made available in all subsequent calls that require `StorageMetadata` as a parameter.
fn real_time_recent_timestamp<'life0, 'async_trait>(
&'life0 self,
source_ids: BTreeSet<GlobalId>,
timeout: Duration,
) -> Pin<Box<dyn Future<Output = Result<BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>, StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Provided Methods§
sourcefn create_collections<'life0, 'life1, 'async_trait>(
&'life0 mut self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn create_collections<'life0, 'life1, 'async_trait>(
&'life0 mut self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Create the sources described in the individual RunIngestionCommand commands.
Each command carries the source id, the source description, and any associated metadata needed to ingest the particular source.
This command installs collection state for the indicated sources, and they are now valid to use in queries at times beyond the initial `since` frontiers. Each collection also acquires a read capability at this frontier, which will need to be repeatedly downgraded with `allow_compaction()` to permit compaction.
This method is NOT idempotent; it can fail between processing of different collections and leave the controller in an inconsistent state. It is almost always wrong to do anything but abort the process on `Err`.
The `register_ts` is used as the initial timestamp that tables are available for reads. (We might later give non-tables the same treatment, but hold off on that initially.) Callers must provide a `Some` if any of the collections is a table. A `None` may be given if none of the collections is a table (i.e. all materialized views, sources, etc).
sourcefn acquire_read_hold(
&mut self,
id: GlobalId,
) -> Result<ReadHold<Self::Timestamp>, ReadHoldError>
fn acquire_read_hold( &mut self, id: GlobalId, ) -> Result<ReadHold<Self::Timestamp>, ReadHoldError>
Acquires and returns the earliest legal read hold.