Struct mz_storage_controller::Controller

pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation> {
    pub(crate) build_info: &'static BuildInfo,
    pub(crate) now: NowFn,
    pub(crate) envd_epoch: NonZeroI64,
    pub(crate) read_only: bool,
    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,
    pub(crate) exports: BTreeMap<GlobalId, ExportState<T>>,
    pub(crate) persist_table_worker: PersistTableWriteWorker<T>,
    pub(crate) persist_monotonic_worker: PersistMonotonicWriteWorker<T>,
    pub(crate) txns_read: TxnsRead<T>,
    pub(crate) txns_metrics: Arc<Metrics>,
    pub(crate) stashed_response: Option<StorageResponse<T>>,
    pub(crate) pending_compaction_commands: Vec<PendingCompactionCommand<T>>,
    pub(crate) pending_table_handle_drops_tx: UnboundedSender<GlobalId>,
    pub(crate) pending_table_handle_drops_rx: UnboundedReceiver<GlobalId>,
    pub(crate) collection_manager: CollectionManager<T>,
    pub(crate) collection_status_manager: CollectionStatusManager<T>,
    pub(crate) introspection_ids: Arc<Mutex<BTreeMap<IntrospectionType, GlobalId>>>,
    pub(crate) introspection_tokens: BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>,
    pub(crate) source_statistics: Arc<Mutex<SourceStatistics>>,
    pub(crate) sink_statistics: Arc<Mutex<BTreeMap<GlobalId, StatsState<SinkStatisticsUpdate>>>>,
    pub(crate) statistics_interval_sender: Sender<Duration>,
    pub(crate) clients: BTreeMap<StorageInstanceId, RehydratingStorageClient<T>>,
    pub(crate) replicas: BTreeMap<StorageInstanceId, ReplicaId>,
    pub(crate) initialized: bool,
    pub(crate) config: StorageConfiguration,
    pub(crate) internal_response_queue: UnboundedReceiver<StorageResponse<T>>,
    pub(crate) persist_location: PersistLocation,
    pub(crate) persist: Arc<PersistClientCache>,
    pub(crate) metrics: StorageControllerMetrics,
    pub(crate) internal_response_sender: UnboundedSender<StorageResponse<T>>,
    pub(crate) recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    pub(crate) recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,
    pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
}

A storage controller for a storage instance.

Fields§

§build_info: &'static BuildInfo

The build information for this process.

§now: NowFn

A function that returns the current time.

§envd_epoch: NonZeroI64

The fencing token for this instance of the controller.
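
A fencing token lets shared state reject a controller that has been superseded. The following is only a minimal sketch of that idea, assuming a hypothetical `FencedState` type and `Fenced` error (neither is part of this crate) that remember the highest epoch seen so far:

```rust
use std::num::NonZeroI64;

/// Hypothetical shared state guarded by an epoch; not part of this crate.
#[derive(Debug)]
pub struct FencedState {
    current_epoch: NonZeroI64,
}

/// Hypothetical error returned to a writer with a stale epoch.
#[derive(Debug, PartialEq, Eq)]
pub struct Fenced {
    pub current: NonZeroI64,
    pub attempted: NonZeroI64,
}

impl FencedState {
    pub fn new(epoch: NonZeroI64) -> Self {
        Self { current_epoch: epoch }
    }

    /// Admits a writer whose epoch is at least the highest seen so far and
    /// records that epoch; writers with an older epoch are rejected.
    pub fn check(&mut self, epoch: NonZeroI64) -> Result<(), Fenced> {
        if epoch < self.current_epoch {
            return Err(Fenced {
                current: self.current_epoch,
                attempted: epoch,
            });
        }
        self.current_epoch = epoch;
        Ok(())
    }
}
```

In this sketch, once a writer with epoch 2 has been admitted, a writer still holding epoch 1 gets `Err(Fenced { .. })` on its next attempt.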

§read_only: bool

Whether or not this controller is in read-only mode.

When in read-only mode, neither this controller nor the instances controlled by it are allowed to effect changes to external systems (largely persist).
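
As a rough illustration of read-only mode, the sketch below gates writes to an external system behind a flag that is cleared by an `allow_writes`-style call. `WriteGate` and its `external_log` field are hypothetical stand-ins and do not reflect how the controller is implemented:

```rust
/// Hypothetical guard illustrating read-only mode; not the crate's implementation.
#[derive(Debug)]
pub struct WriteGate {
    read_only: bool,
    /// Stand-in for an external system such as a durable log.
    external_log: Vec<String>,
}

impl WriteGate {
    pub fn new(read_only: bool) -> Self {
        Self {
            read_only,
            external_log: Vec::new(),
        }
    }

    /// Leaves read-only mode; later writes are applied.
    pub fn allow_writes(&mut self) {
        self.read_only = false;
    }

    /// Applies the write unless the gate is still read-only.
    pub fn write(&mut self, entry: &str) -> Result<(), &'static str> {
        if self.read_only {
            return Err("read-only: write rejected");
        }
        self.external_log.push(entry.to_string());
        Ok(())
    }

    /// Returns everything written to the external system so far.
    pub fn log(&self) -> &[String] {
        &self.external_log
    }
}
```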

§collections: BTreeMap<GlobalId, CollectionState<T>>

Collections maintained by the storage controller.

This collection only grows, although individual collections may be rendered unusable. This is to prevent the re-binding of identifiers to other descriptions.
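
One way to picture a map that only grows is a registry that tombstones dropped entries instead of removing them, so an identifier can never be bound to a second description. This is a sketch under that assumption; `Registry`, `Entry`, and the `u64` identifiers are hypothetical and simpler than `CollectionState<T>` and `GlobalId`:

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a collection's state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Entry {
    Active(String),
    /// The collection was dropped; the identifier stays reserved.
    Dropped,
}

/// Hypothetical grow-only registry keyed by a numeric identifier.
#[derive(Debug, Default)]
pub struct Registry {
    entries: BTreeMap<u64, Entry>,
}

impl Registry {
    /// Binds `id` to `description`, failing if `id` was ever bound before.
    pub fn create(&mut self, id: u64, description: &str) -> Result<(), String> {
        if self.entries.contains_key(&id) {
            return Err(format!("identifier {} is already bound", id));
        }
        self.entries
            .insert(id, Entry::Active(description.to_string()));
        Ok(())
    }

    /// Marks `id` unusable without removing it; returns false for unknown ids.
    pub fn drop_collection(&mut self, id: u64) -> bool {
        match self.entries.get_mut(&id) {
            Some(entry) => {
                *entry = Entry::Dropped;
                true
            }
            None => false,
        }
    }

    /// Number of identifiers ever bound, including dropped ones.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```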

§exports: BTreeMap<GlobalId, ExportState<T>>

§persist_table_worker: PersistTableWriteWorker<T>

Write handle for table shards.

§persist_monotonic_worker: PersistMonotonicWriteWorker<T>

Write handle for monotonic shards.

§txns_read: TxnsRead<T>

A shared TxnsCache running in a task and communicated with over a channel.

§txns_metrics: Arc<Metrics>

§stashed_response: Option<StorageResponse<T>>

§pending_compaction_commands: Vec<PendingCompactionCommand<T>>

Compaction commands to send during the next call to StorageController::process.

§pending_table_handle_drops_tx: UnboundedSender<GlobalId>

Channel for sending table handle drops.

§pending_table_handle_drops_rx: UnboundedReceiver<GlobalId>

Channel for receiving table handle drops.

§collection_manager: CollectionManager<T>

Interface for managed collections.

§collection_status_manager: CollectionStatusManager<T>

Facility for appending status updates for sources/sinks.

§introspection_ids: Arc<Mutex<BTreeMap<IntrospectionType, GlobalId>>>

Tracks which collection is responsible for which IntrospectionType.

§introspection_tokens: BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>

Tokens for tasks that drive updating introspection collections. Dropping this will make sure that any tasks (or other resources) will stop when needed.
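
The drop-token pattern can be sketched with the standard library alone. Here the token is the sending half of a channel, type-erased to `Box<dyn Any + Send + Sync>`; the worker exits once the token is dropped. `spawn_with_token` is a hypothetical helper, and the real tokens may be built differently:

```rust
use std::any::Any;
use std::sync::mpsc::sync_channel;
use std::thread::{self, JoinHandle};

/// Spawns a worker that runs until its token is dropped.
/// Returns the type-erased token and a handle for observing the worker.
pub fn spawn_with_token() -> (Box<dyn Any + Send + Sync>, JoinHandle<&'static str>) {
    // No message is ever sent; the worker only watches for disconnection.
    let (tx, rx) = sync_channel::<()>(0);
    let handle = thread::spawn(move || {
        // `recv` returns Err once every sender has been dropped.
        while rx.recv().is_ok() {}
        "stopped"
    });
    // Erase the concrete type so tokens of different kinds can share a map.
    let token: Box<dyn Any + Send + Sync> = Box::new(tx);
    (token, handle)
}
```

Storing such tokens in a map keyed by identifier means that removing an entry is enough to stop the associated worker.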

§source_statistics: Arc<Mutex<SourceStatistics>>

Consolidated metrics updates to periodically write. We do not eagerly initialize this, and its contents are entirely driven by StorageResponse::StatisticsUpdates responses, as well as webhook statistics.

§sink_statistics: Arc<Mutex<BTreeMap<GlobalId, StatsState<SinkStatisticsUpdate>>>>

Consolidated metrics updates to periodically write. We do not eagerly initialize this, and its contents are entirely driven by StorageResponse::StatisticsUpdates responses.

§statistics_interval_sender: Sender<Duration>

A way to update the statistics interval in the statistics tasks.

§clients: BTreeMap<StorageInstanceId, RehydratingStorageClient<T>>

Clients for all known storage instances.

§replicas: BTreeMap<StorageInstanceId, ReplicaId>

For each storage instance the ID of its replica, if any.

§initialized: bool

Set to true once initialization_complete has been called.

§config: StorageConfiguration

Storage configuration to apply to newly provisioned instances, and use during purification.

§internal_response_queue: UnboundedReceiver<StorageResponse<T>>

Mechanism for returning frontier advancement for tables.

§persist_location: PersistLocation

The persist location where all storage collections are being written to.

§persist: Arc<PersistClientCache>

A persist client used to write to storage collections.

§metrics: StorageControllerMetrics

Metrics of the storage controller.

§internal_response_sender: UnboundedSender<StorageResponse<T>>

Mechanism for the storage controller to send itself feedback, potentially emulating the responses we expect from clusters.

Note: This is used for finalizing shards of webhook sources. Once webhook sources are installed on a clusterd, this can likely be refactored away.

§recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>

(read, write) frontiers that have been recorded in the Frontiers collection, kept to be able to retract old rows.

§recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>

Write frontiers that have been recorded in the ReplicaFrontiers collection, kept to be able to retract old rows.
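
Remembering what was last recorded is what makes retraction possible: the old row has to be reproduced exactly and written with a diff of -1. A minimal sketch, assuming `u64` stand-ins for identifiers and frontiers and a hypothetical `frontier_updates` helper:

```rust
use std::collections::BTreeMap;

/// Hypothetical row of a frontiers collection: (object id, write frontier).
pub type Row = (u64, u64);

/// Computes the updates that bring the collection from `recorded` to `current`,
/// then remembers `current` as the new recorded state.
pub fn frontier_updates(
    recorded: &mut BTreeMap<u64, u64>,
    current: BTreeMap<u64, u64>,
) -> Vec<(Row, i64)> {
    let mut updates = Vec::new();
    // Retract rows that changed or disappeared.
    for (id, old) in recorded.iter() {
        if current.get(id) != Some(old) {
            updates.push(((*id, *old), -1));
        }
    }
    // Insert rows that are new or changed.
    for (id, new) in current.iter() {
        if recorded.get(id) != Some(new) {
            updates.push(((*id, *new), 1));
        }
    }
    *recorded = current;
    updates
}
```

Rows whose value did not change produce no updates, so repeated calls with the same input write nothing.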

§storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>

Handle to a StorageCollections.

Implementations§

impl<T> Controller<T>

pub async fn new( build_info: &'static BuildInfo, persist_location: PersistLocation, persist_clients: Arc<PersistClientCache>, now: NowFn, txns_metrics: Arc<TxnMetrics>, envd_epoch: NonZeroI64, read_only: bool, metrics_registry: MetricsRegistry, connection_context: ConnectionContext, txn: &dyn StorageTxn<T>, storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync> ) -> Self

Create a new storage controller from a client it should wrap.

Note that when creating a new storage controller, you must also reconcile it with the previous state.

§Panics

If this function is called before prepare_initialization.

pub(crate) fn set_hold_policies( &mut self, policies: Vec<(GlobalId, ReadPolicy<T>)> )

pub(crate) fn update_write_frontiers( &mut self, updates: &[(GlobalId, Antichain<T>)] )

pub(crate) fn update_hold_capabilities( &mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>> )

pub(crate) fn validate_collection_ids( &self, ids: impl Iterator<Item = GlobalId> ) -> Result<(), StorageError<T>>

Validate that a collection exists for all identifiers, and error if any do not.
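
The check amounts to a fail-fast membership test over a map. The sketch below assumes `u64` identifiers and a hypothetical `ValidationError` type in place of `GlobalId` and `StorageError<T>`:

```rust
use std::collections::BTreeMap;

/// Hypothetical error type standing in for the crate's storage error.
#[derive(Debug, PartialEq, Eq)]
pub enum ValidationError {
    IdentifierMissing(u64),
}

/// Returns Ok if every id is a key of `known`, otherwise the first missing id.
pub fn validate_ids<V>(
    known: &BTreeMap<u64, V>,
    ids: impl Iterator<Item = u64>,
) -> Result<(), ValidationError> {
    for id in ids {
        if !known.contains_key(&id) {
            return Err(ValidationError::IdentifierMissing(id));
        }
    }
    Ok(())
}
```

Because the function returns on the first miss, only one missing identifier is reported even when several are absent.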

pub(crate) fn validate_export_ids( &self, ids: impl Iterator<Item = GlobalId> ) -> Result<(), StorageError<T>>

Validate that an export exists for all identifiers, and error if any do not.

pub(crate) fn active_exports( &self ) -> impl Iterator<Item = (GlobalId, &ExportState<T>)>

Iterate over exports that have not been dropped.

pub(crate) async fn open_data_handles( &self, id: &GlobalId, shard: ShardId, relation_desc: RelationDesc, persist_client: &PersistClient ) -> WriteHandle<SourceData, (), T, Diff>

Opens write and critical since handles for the given shard.

since is an optional since that the read handle will be forwarded to if it is less than its current since.

This will halt! the process if we cannot successfully acquire a critical handle with our current epoch.

pub(crate) async fn register_introspection_collection( &mut self, id: GlobalId, introspection_type: IntrospectionType ) -> Result<(), StorageError<T>>

Registers the given introspection collection and does any preparatory work that we have to do before we start writing to it. This preparatory work will include partial truncation or other cleanup schemes, depending on introspection type.

pub(crate) async fn prepare_introspection_collection( &mut self, id: GlobalId, introspection_type: IntrospectionType ) -> Result<(), StorageError<T>>

Does any work that is required before this controller instance starts writing to the given introspection collection.

This might include consolidation, deleting older entries, or seeding in-memory state of, say, scrapers, with current collection contents.

pub(crate) async fn snapshot_statistics(&mut self, id: GlobalId) -> Vec<Row>

Get the current rows in the given statistics table. This is used to bootstrap the statistics tasks.

pub(crate) fn reconcile_dangling_statistics(&mut self)

Remove statistics for sources/sinks that were dropped but still have statistics rows hanging around.

pub(crate) async fn initialize_shard_mapping(&mut self)

Initializes the data expressing which global IDs correspond to which shards. Necessary because we cannot write any of these mappings that we discover before the shard mapping collection exists.

§Panics
  • If IntrospectionType::ShardMapping is not associated with a GlobalId in self.introspection_ids.
  • If IntrospectionType::ShardMapping’s GlobalId is not registered as a managed collection.

pub(crate) async fn partially_truncate_status_history( &mut self, collection: IntrospectionType ) -> BTreeMap<GlobalId, Row>

Effectively truncates the status history shard except for the most recent updates from each ID.

NOTE: The history collections are really append-only collections, but every now and then we want to retract old updates so that the collection does not grow without bound. Crucially, these are not incremental collections: they are not derived from a state at some time t, and we cannot maintain a desired state for them.

Returns a map with latest unpacked row per id.
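
The shape of such a truncation can be sketched as follows: group events by id, keep the newest few, and emit a retraction for each older one. The `Event` tuple, the `keep` parameter, and `truncate_history` are hypothetical; the actual retention rule depends on the introspection type:

```rust
use std::collections::BTreeMap;

/// Hypothetical status event: (object id, event time, status text).
pub type Event = (u64, u64, String);

/// Splits `events` into retractions for everything but the `keep` most recent
/// events per id, and a map holding the latest event per id.
pub fn truncate_history(
    events: &[Event],
    keep: usize,
) -> (Vec<(Event, i64)>, BTreeMap<u64, Event>) {
    // Group events by id.
    let mut by_id: BTreeMap<u64, Vec<&Event>> = BTreeMap::new();
    for event in events {
        by_id.entry(event.0).or_default().push(event);
    }

    let mut retractions = Vec::new();
    let mut latest = BTreeMap::new();
    for (id, mut group) in by_id {
        // Newest first, so the first `keep` entries survive.
        group.sort_by(|a, b| b.1.cmp(&a.1));
        if let Some(newest) = group.first() {
            latest.insert(id, (*newest).clone());
        }
        // Everything past the first `keep` entries is retracted with diff -1.
        for old in group.into_iter().skip(keep) {
            retractions.push((old.clone(), -1));
        }
    }
    (retractions, latest)
}
```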

pub(crate) async fn append_shard_mappings<I>(&self, global_ids: I, diff: i64)
where I: Iterator<Item = GlobalId>,

Appends a new global ID, shard ID pair to the appropriate collection. Use a diff of 1 to append a new entry; -1 to retract an existing entry.

However, data is written iff we know of the GlobalId of the IntrospectionType::ShardMapping collection; in other cases, data is dropped on the floor. In these cases, the data is later written by Self::initialize_shard_mapping.

§Panics
  • If self.collections does not have an entry for global_id.
  • If IntrospectionType::ShardMapping’s GlobalId is not registered as a managed collection.
  • If diff is any value other than 1 or -1.
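
The interplay between appending and later initialization can be modeled in a few lines. In this sketch, appends are skipped until the mapping collection exists, and initialization then writes every known mapping. `ShardMappings` and all of its fields are hypothetical simplifications, with `u64` and `String` standing in for `GlobalId` and shard IDs:

```rust
use std::collections::BTreeMap;

/// Hypothetical model of a shard-mapping writer; names are illustrative only.
#[derive(Debug, Default)]
pub struct ShardMappings {
    /// Known id-to-shard assignments (stand-in for the controller's collections).
    shards: BTreeMap<u64, String>,
    /// Set once the mapping collection exists.
    mapping_collection_ready: bool,
    /// Updates written so far: ((id, shard), diff).
    written: Vec<((u64, String), i64)>,
}

impl ShardMappings {
    pub fn register(&mut self, id: u64, shard: &str) {
        self.shards.insert(id, shard.to_string());
    }

    /// Writes one update per id, or skips silently if the mapping collection
    /// does not exist yet. Panics on an unknown id or a diff other than 1 or -1.
    pub fn append(&mut self, ids: impl Iterator<Item = u64>, diff: i64) {
        assert!(diff == 1 || diff == -1, "diff must be 1 or -1");
        if !self.mapping_collection_ready {
            return;
        }
        for id in ids {
            let shard = self.shards.get(&id).expect("unknown id").clone();
            self.written.push(((id, shard), diff));
        }
    }

    /// Marks the mapping collection as available and writes every known mapping.
    pub fn initialize(&mut self) {
        self.mapping_collection_ready = true;
        let ids: Vec<u64> = self.shards.keys().copied().collect();
        self.append(ids.into_iter(), 1);
    }

    pub fn written(&self) -> &[((u64, String), i64)] {
        &self.written
    }
}
```

An append made before `initialize` leaves `written` empty, and the same mapping then appears exactly once after initialization.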

pub(crate) fn determine_collection_dependencies( &self, self_id: GlobalId, data_source: &DataSource ) -> Result<Vec<GlobalId>, StorageError<T>>

Determines and returns this collection’s dependencies, if any.

pub(crate) async fn read_handle_for_snapshot( &self, id: GlobalId ) -> Result<ReadHandle<SourceData, (), T, Diff>, StorageError<T>>

pub(crate) async fn snapshot_and_stream( &self, id: GlobalId, as_of: T ) -> Result<BoxStream<'_, (SourceData, T, Diff)>, StorageError<T>>

pub(crate) async fn record_status_updates(&mut self, updates: Vec<StatusUpdate>)

Handles writing of status updates for sources/sinks to the appropriate status relation.

pub(crate) fn collection( &self, id: GlobalId ) -> Result<&CollectionState<T>, StorageError<T>>

pub(crate) fn run_ingestion( &mut self, id: GlobalId ) -> Result<(), StorageError<T>>

Runs the identified ingestion using the current definition of the ingestion in-memory.

Trait Implementations§

impl<T> Debug for Controller<T>

fn fmt(&self, __f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<'a, T> StorageController for Controller<T>

fn config(&self) -> &StorageConfiguration

Get the current configuration.

fn create_collections<'life0, 'life1, 'async_trait>( &'life0 mut self, storage_metadata: &'life1 StorageMetadata, register_ts: Option<Self::Timestamp>, collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Create and “execute” the described collection.

“Execute” is in scare quotes because what executing a collection means varies widely based on the type of collection you’re creating.

The general process creating a collection undergoes is:

  1. Enrich the description we get from the user with the metadata that only the storage controller has. This is mostly a matter of separating concerns.
  2. Generate write and read persist handles for the collection.
  3. Store the collection’s metadata in the appropriate field.
  4. “Execute” the collection. What that means is contingent on the type of collection, so consult the code for more details.

fn alter_export_connections<'life0, 'async_trait>( &'life0 mut self, exports: BTreeMap<GlobalId, StorageSinkConnection> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Alters each identified export to use the correlated StorageSinkConnection.

fn drop_sinks( &mut self, identifiers: Vec<GlobalId> ) -> Result<(), StorageError<Self::Timestamp>>

Drops the read capability for the sinks and allows their resources to be reclaimed.

type Timestamp = T

fn initialization_complete(&mut self)

Marks the end of any initialization commands.

fn allow_writes<'life0, 'async_trait>( &'life0 mut self, register_ts: Option<Self::Timestamp> ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Allow this controller and instances controlled by it to write to external systems.

fn update_parameters(&mut self, config_params: StorageParameters)

Update storage configuration with new parameters.

fn collection_metadata( &self, id: GlobalId ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>

Returns the CollectionMetadata of the collection identified by id.

fn collection_frontiers( &self, id: GlobalId ) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), StorageError<Self::Timestamp>>

Returns the since/upper frontiers of the identified collection.

fn collections_frontiers( &self, ids: Vec<GlobalId> ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>>

Returns the since/upper frontiers of the identified collections.

fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>

Acquire an iterator over CollectionMetadata for all active collections.

fn check_exists( &self, id: GlobalId ) -> Result<(), StorageError<Self::Timestamp>>

Checks whether a collection exists under the given GlobalId. Returns an error if the collection does not exist.

fn create_instance(&mut self, id: StorageInstanceId)

Creates a storage instance with the specified ID.

fn drop_instance(&mut self, id: StorageInstanceId)

Drops the storage instance with the given ID.

fn connect_replica( &mut self, instance_id: StorageInstanceId, replica_id: ReplicaId, location: ClusterReplicaLocation )

Connects the storage instance to the specified replica.

fn drop_replica( &mut self, instance_id: StorageInstanceId, _replica_id: ReplicaId )

Disconnects the storage instance from the specified replica.

fn check_alter_ingestion_source_desc( &mut self, ingestion_id: GlobalId, source_desc: &SourceDesc ) -> Result<(), StorageError<Self::Timestamp>>

Check that the ingestion associated with id can use the provided SourceDesc.

fn alter_ingestion_source_desc<'life0, 'async_trait>( &'life0 mut self, ingestion_id: GlobalId, source_desc: SourceDesc ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Alters the identified collection to use the provided SourceDesc.

fn alter_ingestion_connections<'life0, 'async_trait>( &'life0 mut self, source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Alters each identified collection to use the correlated GenericSourceConnection.

fn export( &self, id: GlobalId ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>

Acquire an immutable reference to the export state, should it exist.

fn export_mut( &mut self, id: GlobalId ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>

Acquire a mutable reference to the export state, should it exist.

fn create_exports<'life0, 'async_trait>( &'life0 mut self, exports: Vec<(GlobalId, ExportDescription<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Create the sinks described by the ExportDescription.

fn alter_export<'life0, 'async_trait>( &'life0 mut self, id: GlobalId, new_description: ExportDescription<Self::Timestamp> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Alter the sink identified by the given id to match the provided ExportDescription.

fn drop_tables( &mut self, identifiers: Vec<GlobalId>, ts: Self::Timestamp ) -> Result<(), StorageError<Self::Timestamp>>

Drops the read capability for the tables and allows their resources to be reclaimed.

fn drop_sources( &mut self, storage_metadata: &StorageMetadata, identifiers: Vec<GlobalId> ) -> Result<(), StorageError<Self::Timestamp>>

Drops the read capability for the sources and allows their resources to be reclaimed.

fn drop_sources_unvalidated( &mut self, storage_metadata: &StorageMetadata, ids: Vec<GlobalId> ) -> Result<(), StorageError<Self::Timestamp>>

Drops the read capability for the sources and allows their resources to be reclaimed.

fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>)

Drops the read capability for the sinks and allows their resources to be reclaimed.

fn append_table( &mut self, write_ts: Self::Timestamp, advance_to: Self::Timestamp, commands: Vec<(GlobalId, Vec<TimestamplessUpdate>)> ) -> Result<Receiver<Result<(), StorageError<Self::Timestamp>>>, StorageError<Self::Timestamp>>

Append updates into the local input named id and advance its upper to upper.

fn monotonic_appender( &self, id: GlobalId ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>

Returns a MonotonicAppender which is a channel that can be used to monotonically append to the specified GlobalId.

fn webhook_statistics( &self, id: GlobalId ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>

Returns a shared WebhookStatistics which can be used to report user-facing statistics for the webhook identified by the given GlobalId.

fn snapshot<'life0, 'async_trait>( &'life0 mut self, id: GlobalId, as_of: Self::Timestamp ) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns the snapshot of the contents of the local input named id at as_of.

fn snapshot_cursor<'life0, 'async_trait>( &'life0 mut self, id: GlobalId, as_of: Self::Timestamp ) -> Pin<Box<dyn Future<Output = Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self::Timestamp: Timestamp + Lattice + Codec64, Self: 'async_trait, 'life0: 'async_trait,

Returns the snapshot of the contents of the local input named id at as_of.

fn snapshot_stats<'life0, 'async_trait>( &'life0 self, id: GlobalId, as_of: Antichain<Self::Timestamp> ) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns aggregate statistics about the contents of the local input named id at as_of.

fn snapshot_parts_stats<'life0, 'async_trait>( &'life0 self, id: GlobalId, as_of: Antichain<Self::Timestamp> ) -> Pin<Box<dyn Future<Output = BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns aggregate statistics about the contents of the local input named id at as_of.

fn set_read_policy( &mut self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)> )

Assigns a read policy to specific identifiers.

fn acquire_read_holds( &mut self, desired_holds: Vec<GlobalId> ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>

Acquires and returns the desired read holds, advancing them to the since frontier when necessary.

fn update_read_capabilities( &mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>> )

Applies updates and sends any appropriate compaction command.

fn ready<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Waits until the controller is ready to process a response.

fn process<'life0, 'life1, 'async_trait>( &'life0 mut self, storage_metadata: &'life1 StorageMetadata ) -> Pin<Box<dyn Future<Output = Result<Option<Response<T>>, Error>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Processes the work queued by StorageController::ready.

fn inspect_persist_state<'life0, 'async_trait>( &'life0 self, id: GlobalId ) -> Pin<Box<dyn Future<Output = Result<Value, Error>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Exposes the internal state of the data shard for debugging and QA.

fn record_frontiers<'life0, 'async_trait>( &'life0 mut self, external_frontiers: BTreeMap<GlobalId, (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Records the current read and write frontiers of all known storage objects.

fn record_replica_frontiers<'life0, 'async_trait>( &'life0 mut self, external_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<Self::Timestamp>> ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Records the current per-replica write frontiers of all known storage objects.

fn append_introspection_updates<'life0, 'async_trait>( &'life0 mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)> ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Records append-only updates for the given introspection type.

fn update_introspection_collection<'life0, 'async_trait>( &'life0 mut self, type_: IntrospectionType, op: StorageWriteOp ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Updates the desired state of the given introspection type.

fn initialize_state<'life0, 'life1, 'async_trait>( &'life0 mut self, txn: &'life1 mut (dyn StorageTxn<T> + Send), init_ids: BTreeSet<GlobalId>, drop_ids: BTreeSet<GlobalId> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<T>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

On boot, seed the controller’s metadata/state.

fn prepare_state<'life0, 'life1, 'async_trait>( &'life0 self, txn: &'life1 mut (dyn StorageTxn<T> + Send), ids_to_add: BTreeSet<GlobalId>, ids_to_drop: BTreeSet<GlobalId> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<T>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Update the implementor of StorageTxn with the appropriate metadata given the IDs to add and drop.

fn real_time_recent_timestamp<'life0, 'async_trait>( &'life0 mut self, timestamp_objects: BTreeSet<GlobalId>, timeout: Duration ) -> Pin<Box<dyn Future<Output = Result<BoxFuture<'static, Result<Self::Timestamp, StorageError<Self::Timestamp>>>, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

fn acquire_read_hold( &mut self, id: GlobalId ) -> Result<ReadHold<Self::Timestamp>, ReadHoldError>

Acquires and returns the earliest legal read hold.

Auto Trait Implementations§

impl<T> !Freeze for Controller<T>

impl<T> !RefUnwindSafe for Controller<T>

impl<T> Send for Controller<T>

impl<T> Sync for Controller<T>

impl<T> Unpin for Controller<T>
where T: Unpin,

impl<T> !UnwindSafe for Controller<T>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

source§

fn cast_into(self) -> U

Performs the cast.
source§

impl<T> Conv for T

source§

fn conv<T>(self) -> T
where Self: Into<T>,

Converts self into T using Into<T>. Read more
source§

impl<T> FmtForward for T

source§

fn fmt_binary(self) -> FmtBinary<Self>
where Self: Binary,

Causes self to use its Binary implementation when Debug-formatted.
source§

fn fmt_display(self) -> FmtDisplay<Self>
where Self: Display,

Causes self to use its Display implementation when Debug-formatted.
source§

fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where Self: LowerExp,

Causes self to use its LowerExp implementation when Debug-formatted.
source§

fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where Self: LowerHex,

Causes self to use its LowerHex implementation when Debug-formatted.
source§

fn fmt_octal(self) -> FmtOctal<Self>
where Self: Octal,

Causes self to use its Octal implementation when Debug-formatted.
source§

fn fmt_pointer(self) -> FmtPointer<Self>
where Self: Pointer,

Causes self to use its Pointer implementation when Debug-formatted.
source§

fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where Self: UpperExp,

Causes self to use its UpperExp implementation when Debug-formatted.
source§

fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where Self: UpperHex,

Causes self to use its UpperHex implementation when Debug-formatted.
source§

fn fmt_list(self) -> FmtList<Self>
where &'a Self: for<'a> IntoIterator,

Formats each item in a sequence. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

source§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
source§

impl<T, U> OverrideFrom<Option<&T>> for U
where U: OverrideFrom<T>,

source§

fn override_from(self, layer: &Option<&T>) -> U

Override the configuration represented by Self with values from the given layer.
source§

impl<T> Pipe for T
where T: ?Sized,

source§

fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where Self: Sized,

Pipes by value. This is generally the method you want to use. Read more
source§

fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where R: 'a,

Borrows self and passes that borrow into the pipe function. Read more
source§

fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where R: 'a,

Mutably borrows self and passes that borrow into the pipe function. Read more
source§

fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
where Self: Borrow<B>, B: 'a + ?Sized, R: 'a,

Borrows self, then passes self.borrow() into the pipe function. Read more
source§

fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R ) -> R
where Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.borrow_mut() into the pipe function. Read more
source§

fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
where Self: AsRef<U>, U: 'a + ?Sized, R: 'a,

Borrows self, then passes self.as_ref() into the pipe function.
source§

fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
where Self: AsMut<U>, U: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.as_mut() into the pipe function.
source§

fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
where Self: Deref<Target = T>, T: 'a + ?Sized, R: 'a,

Borrows self, then passes self.deref() into the pipe function.
source§

fn pipe_deref_mut<'a, T, R>( &'a mut self, func: impl FnOnce(&'a mut T) -> R ) -> R
where Self: DerefMut<Target = T> + Deref, T: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.deref_mut() into the pipe function.
source§

impl<T> Pointable for T

source§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

source§

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.
source§

impl<T> Tap for T

source§

fn tap(self, func: impl FnOnce(&Self)) -> Self

Immutable access to a value. Read more
source§

fn tap_mut(self, func: impl FnOnce(&mut Self)) -> Self

Mutable access to a value. Read more
source§

fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Immutable access to the Borrow<B> of a value. Read more
source§

fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Mutable access to the BorrowMut<B> of a value. Read more
source§

fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Immutable access to the AsRef<R> view of a value. Read more
source§

fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Mutable access to the AsMut<R> view of a value. Read more
source§

fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Immutable access to the Deref::Target of a value. Read more
source§

fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Mutable access to the Deref::Target of a value. Read more
source§

fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self

Calls .tap() only in debug builds, and is erased in release builds.
source§

fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self

Calls .tap_mut() only in debug builds, and is erased in release builds.
source§

fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Calls .tap_borrow() only in debug builds, and is erased in release builds.
source§

fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
source§

fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Calls .tap_ref() only in debug builds, and is erased in release builds.
source§

fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
source§

fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Calls .tap_deref() only in debug builds, and is erased in release builds.
source§

fn tap_deref_mut_dbg<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Calls .tap_deref_mut() only in debug builds, and is erased in release builds.
source§

impl<T> TryConv for T

source§

fn try_conv<T>(self) -> Result<T, Self::Error>
where Self: TryInto<T>,

Attempts to convert self into T using TryInto<T>. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more