Struct mz_storage_controller::Controller

source ·
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation> {
Show 38 fields pub(crate) build_info: &'static BuildInfo, pub(crate) now: NowFn, pub(crate) envd_epoch: NonZeroI64, pub(crate) read_only: bool, pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>, pub(crate) exports: BTreeMap<GlobalId, ExportState<T>>, pub(crate) persist_table_worker: PersistTableWriteWorker<T>, pub(crate) txns_read: TxnsRead<T>, pub(crate) txns_metrics: Arc<Metrics>, pub(crate) stashed_response: Option<StorageResponse<T>>, pub(crate) pending_compaction_commands: Vec<PendingCompactionCommand<T>>, pub(crate) pending_table_handle_drops_tx: UnboundedSender<GlobalId>, pub(crate) pending_table_handle_drops_rx: UnboundedReceiver<GlobalId>, pub(crate) collection_manager: CollectionManager<T>, pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>, pub(crate) introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>, pub(crate) source_statistics: Arc<Mutex<SourceStatistics>>, pub(crate) sink_statistics: Arc<Mutex<BTreeMap<GlobalId, StatsState<SinkStatisticsUpdate>>>>, pub(crate) statistics_interval_sender: Sender<Duration>, pub(crate) instances: BTreeMap<StorageInstanceId, Instance<T>>, pub(crate) initialized: bool, pub(crate) config: StorageConfiguration, pub(crate) internal_response_queue: UnboundedReceiver<StorageResponse<T>>, pub(crate) persist_location: PersistLocation, pub(crate) persist: Arc<PersistClientCache>, pub(crate) metrics: StorageControllerMetrics, pub(crate) internal_response_sender: UnboundedSender<StorageResponse<T>>, pub(crate) recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>, pub(crate) recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>, pub(crate) wallclock_lag: WallclockLagFn<T>, pub(crate) wallclock_lag_last_refresh: Instant, pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>, pub(crate) migrated_storage_collections: BTreeSet<GlobalId>, pub(crate) maintenance_ticker: Interval, pub(crate) maintenance_scheduled: bool, pub(crate) instance_response_tx: UnboundedSender<StorageResponse<T>>, pub(crate) instance_response_rx: UnboundedReceiver<StorageResponse<T>>, pub(crate) persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}
Expand description

A storage controller for a storage instance.

Fields§

§build_info: &'static BuildInfo

The build information for this process.

§now: NowFn

A function that returns the current time.

§envd_epoch: NonZeroI64

The fencing token for this instance of the controller.

§read_only: bool

Whether or not this controller is in read-only mode.

When in read-only mode, neither this controller nor the instances controlled by it are allowed to affect changes to external systems (largely persist).

§collections: BTreeMap<GlobalId, CollectionState<T>>

Collections maintained by the storage controller.

This collection only grows, although individual collections may be rendered unusable. This is to prevent the re-binding of identifiers to other descriptions.

§exports: BTreeMap<GlobalId, ExportState<T>>§persist_table_worker: PersistTableWriteWorker<T>

Write handle for table shards.

§txns_read: TxnsRead<T>

A shared TxnsCache running in a task and communicated with over a channel.

§txns_metrics: Arc<Metrics>§stashed_response: Option<StorageResponse<T>>§pending_compaction_commands: Vec<PendingCompactionCommand<T>>

Compaction commands to send during the next call to StorageController::process.

§pending_table_handle_drops_tx: UnboundedSender<GlobalId>

Channel for sending table handle drops.

§pending_table_handle_drops_rx: UnboundedReceiver<GlobalId>

Channel for receiving table handle drops.

§collection_manager: CollectionManager<T>

Interface for managed collections

§introspection_ids: BTreeMap<IntrospectionType, GlobalId>

Tracks which collection is responsible for which IntrospectionType.

§introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>

Tokens for tasks that drive updating introspection collections. Dropping this will make sure that any tasks (or other resources) will stop when needed.

§source_statistics: Arc<Mutex<SourceStatistics>>

Consolidated metrics updates to periodically write. We do not eagerly initialize this, and its contents are entirely driven by StorageResponse::StatisticsUpdates’s, as well as webhook statistics.

§sink_statistics: Arc<Mutex<BTreeMap<GlobalId, StatsState<SinkStatisticsUpdate>>>>

Consolidated metrics updates to periodically write. We do not eagerly initialize this, and its contents are entirely driven by StorageResponse::StatisticsUpdates’s.

§statistics_interval_sender: Sender<Duration>

A way to update the statistics interval in the statistics tasks.

§instances: BTreeMap<StorageInstanceId, Instance<T>>

Clients for all known storage instances.

§initialized: bool

Set to true once initialization_complete has been called.

§config: StorageConfiguration

Storage configuration to apply to newly provisioned instances, and use during purification.

§internal_response_queue: UnboundedReceiver<StorageResponse<T>>

Mechanism for returning frontier advancement for tables.

§persist_location: PersistLocation

The persist location where all storage collections are being written to

§persist: Arc<PersistClientCache>

A persist client used to write to storage collections

§metrics: StorageControllerMetrics

Metrics of the Storage controller

§internal_response_sender: UnboundedSender<StorageResponse<T>>

Mechanism for the storage controller to send itself feedback, potentially emulating the responses we expect from clusters.

Note: This is used for finalizing shards of webhook sources, once webhook sources are installed on a clusterd this can likely be refactored away.

§recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>

(read, write) frontiers that have been recorded in the Frontiers collection, kept to be able to retract old rows.

§recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>

Write frontiers that have been recorded in the ReplicaFrontiers collection, kept to be able to retract old rows.

§wallclock_lag: WallclockLagFn<T>

A function that computes the lag between the given time and wallclock time.

§wallclock_lag_last_refresh: Instant

The last time wallclock lag introspection was refreshed.

§storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>

Handle to a StorageCollections.

§migrated_storage_collections: BTreeSet<GlobalId>

Migrated storage collections that can be written even in read only mode.

§maintenance_ticker: Interval

Ticker for scheduling periodic maintenance work.

§maintenance_scheduled: bool

Whether maintenance work was scheduled.

§instance_response_tx: UnboundedSender<StorageResponse<T>>

Shared transmit channel for replicas to send responses.

§instance_response_rx: UnboundedReceiver<StorageResponse<T>>

Receive end for replica responses.

§persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>

Background task run at startup to warm persist state.

Implementations§

source§

impl<T> Controller<T>

source

pub async fn new( build_info: &'static BuildInfo, persist_location: PersistLocation, persist_clients: Arc<PersistClientCache>, now: NowFn, wallclock_lag: WallclockLagFn<T>, txns_metrics: Arc<TxnMetrics>, envd_epoch: NonZeroI64, read_only: bool, metrics_registry: MetricsRegistry, connection_context: ConnectionContext, txn: &dyn StorageTxn<T>, storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>, ) -> Self

Create a new storage controller from a client it should wrap.

Note that when creating a new storage controller, you must also reconcile it with the previous state.

§Panics

If this function is called before prepare_initialization.

source

pub(crate) fn set_hold_policies( &mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>, )

source

pub(crate) fn update_write_frontiers( &mut self, updates: &[(GlobalId, Antichain<T>)], )

source

pub(crate) fn update_hold_capabilities( &mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>, )

source

pub(crate) fn validate_collection_ids( &self, ids: impl Iterator<Item = GlobalId>, ) -> Result<(), StorageError<T>>

Validate that a collection exists for all identifiers, and error if any do not.

source

pub(crate) fn validate_export_ids( &self, ids: impl Iterator<Item = GlobalId>, ) -> Result<(), StorageError<T>>

Validate that a collection exists for all identifiers, and error if any do not.

source

pub(crate) fn active_exports( &self, ) -> impl Iterator<Item = (GlobalId, &ExportState<T>)>

Iterate over exports that have not been dropped.

source

pub(crate) async fn recent_upper( &self, id: GlobalId, ) -> Result<Antichain<T>, StorageError<T>>

source

pub(crate) async fn open_data_handles( &self, id: &GlobalId, shard: ShardId, relation_desc: RelationDesc, persist_client: &PersistClient, ) -> WriteHandle<SourceData, (), T, Diff>

Opens a write and critical since handles for the given shard.

since is an optional since that the read handle will be forwarded to if it is less than its current since.

This will halt! the process if we cannot successfully acquire a critical handle with our current epoch.

source

pub(crate) fn register_introspection_collection( &mut self, id: GlobalId, introspection_type: IntrospectionType, write_handle: WriteHandle<SourceData, (), T, Diff>, persist_client: PersistClient, ) -> Result<(), StorageError<T>>

Registers the given introspection collection and does any preparatory work that we have to do before we start writing to it. This preparatory work will include partial truncation or other cleanup schemes, depending on introspection type.

source

pub(crate) fn reconcile_dangling_statistics(&self)

Remove statistics for sources/sinks that were dropped but still have statistics rows hanging around.

source

pub(crate) fn append_shard_mappings<I>(&self, global_ids: I, diff: i64)
where I: Iterator<Item = GlobalId>,

Appends a new global ID, shard ID pair to the appropriate collection. Use a diff of 1 to append a new entry; -1 to retract an existing entry.

§Panics
  • If self.collections does not have an entry for global_id.
  • If IntrospectionType::ShardMapping’s GlobalId is not registered as a managed collection.
  • If diff is any value other than 1 or -1.
source

pub(crate) fn determine_collection_dependencies( &self, self_id: GlobalId, data_source: &DataSource, ) -> Result<Vec<GlobalId>, StorageError<T>>

Determines and returns this collection’s dependencies, if any.

source

pub(crate) async fn read_handle_for_snapshot( &self, id: GlobalId, ) -> Result<ReadHandle<SourceData, (), T, Diff>, StorageError<T>>

source

pub(crate) async fn snapshot_and_stream( &self, id: GlobalId, as_of: T, ) -> Result<BoxStream<'_, (SourceData, T, Diff)>, StorageError<T>>

source

pub(crate) fn record_status_updates(&mut self, updates: Vec<StatusUpdate>)

Handles writing of status updates for sources/sinks to the appropriate status relation

source

pub(crate) fn collection( &self, id: GlobalId, ) -> Result<&CollectionState<T>, StorageError<T>>

source

pub(crate) fn run_ingestion( &mut self, id: GlobalId, ) -> Result<(), StorageError<T>>

Runs the identified ingestion using the current definition of the ingestion in-memory.

source

pub(crate) fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>>

Runs the identified export using the current definition of the export that we have in memory.

source

pub(crate) fn update_frontier_introspection(&mut self)

Update introspection with the current frontiers of storage objects.

This method is invoked by Controller::maintain, which we expect to be called once per second during normal operation.

source

pub(crate) fn update_wallclock_lag_introspection(&mut self)

Update introspection with the current wallclock lag values.

We measure the lag of write frontiers behind the wallclock time every second and track the maximum over 60 measurements (i.e., one minute). Every minute, we emit a new lag event to the WallclockLagHistory introspection with the current maximum.

This method is invoked by ComputeController::maintain, which we expect to be called once per second during normal operation.

source

pub(crate) fn maintain(&mut self)

Run periodic tasks.

This method is invoked roughly once per second during normal operation. It is a good place for tasks that need to run periodically, such as state cleanup or updating of metrics.

Trait Implementations§

source§

impl<T> Debug for Controller<T>

source§

fn fmt(&self, __f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
source§

impl<T> StorageController for Controller<T>

source§

fn config(&self) -> &StorageConfiguration

Get the current configuration

source§

fn create_collections_for_bootstrap<'life0, 'life1, 'life2, 'async_trait>( &'life0 mut self, storage_metadata: &'life1 StorageMetadata, register_ts: Option<Self::Timestamp>, collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>, migrated_storage_collections: &'life2 BTreeSet<GlobalId>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Create and “execute” the described collection.

“Execute” is in scare quotes because what executing a collection means varies widely based on the type of collection you’re creating.

The general process creating a collection undergoes is:

  1. Enrich the description we get from the user with the metadata only the storage controller’s metadata. This is mostly a matter of separating concerns.
  2. Generate write and read persist handles for the collection.
  3. Store the collection’s metadata in the appropriate field.
  4. “Execute” the collection. What that means is contingent on the type of collection. so consult the code for more details.
source§

fn alter_export_connections<'life0, 'async_trait>( &'life0 mut self, exports: BTreeMap<GlobalId, StorageSinkConnection>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Create the sinks described by the ExportDescription.

source§

fn drop_sinks( &mut self, identifiers: Vec<GlobalId>, ) -> Result<(), StorageError<Self::Timestamp>>

Drops the read capability for the sinks and allows their resources to be reclaimed.

§

type Timestamp = T

source§

fn initialization_complete(&mut self)

Marks the end of any initialization commands. Read more
source§

fn update_parameters(&mut self, config_params: StorageParameters)

Update storage configuration with new parameters.
source§

fn collection_metadata( &self, id: GlobalId, ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>

Returns the CollectionMetadata of the collection identified by id.
source§

fn collection_hydrated( &self, collection_id: GlobalId, ) -> Result<bool, StorageError<Self::Timestamp>>

Returns true iff the given collection/ingestion has been hydrated. Read more
source§

fn collection_frontiers( &self, id: GlobalId, ) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), StorageError<Self::Timestamp>>

Returns the since/upper frontiers of the identified collection.
source§

fn collections_frontiers( &self, ids: Vec<GlobalId>, ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>>

Returns the since/upper frontiers of the identified collections. Read more
source§

fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>

Acquire an iterator over CollectionMetadata for all active collections. Read more
source§

fn active_ingestions( &self, instance_id: StorageInstanceId, ) -> &BTreeSet<GlobalId>

Returns the IDs of all active ingestions for the given storage instance.
source§

fn check_exists( &self, id: GlobalId, ) -> Result<(), StorageError<Self::Timestamp>>

Checks whether a collection exists under the given GlobalId. Returns an error if the collection does not exist.
source§

fn create_instance(&mut self, id: StorageInstanceId)

Creates a storage instance with the specified ID. Read more
source§

fn drop_instance(&mut self, id: StorageInstanceId)

Drops the storage instance with the given ID. Read more
source§

fn connect_replica( &mut self, instance_id: StorageInstanceId, replica_id: ReplicaId, location: ClusterReplicaLocation, )

Connects the storage instance to the specified replica. Read more
source§

fn drop_replica( &mut self, instance_id: StorageInstanceId, replica_id: ReplicaId, )

Disconnects the storage instance from the specified replica.
source§

fn check_alter_ingestion_source_desc( &mut self, ingestion_id: GlobalId, source_desc: &SourceDesc, ) -> Result<(), StorageError<Self::Timestamp>>

Check that the ingestion associated with id can use the provided SourceDesc. Read more
source§

fn alter_ingestion_source_desc<'life0, 'async_trait>( &'life0 mut self, ingestion_id: GlobalId, source_desc: SourceDesc, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Alters the identified collection to use the provided SourceDesc.
source§

fn alter_ingestion_connections<'life0, 'async_trait>( &'life0 mut self, source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Alters each identified collection to use the correlated GenericSourceConnection.
source§

fn alter_ingestion_export_data_configs<'life0, 'async_trait>( &'life0 mut self, source_exports: BTreeMap<GlobalId, SourceExportDataConfig>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Alters the data config for the specified source exports of the specified ingestions.
source§

fn alter_table_desc<'life0, 'async_trait>( &'life0 mut self, table_id: GlobalId, new_desc: RelationDesc, expected_schema: SchemaId, forget_ts: Self::Timestamp, register_ts: Self::Timestamp, ) -> Pin<Box<dyn Future<Output = Result<SchemaId, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

source§

fn export( &self, id: GlobalId, ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>

Acquire an immutable reference to the export state, should it exist.
source§

fn export_mut( &mut self, id: GlobalId, ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>

Acquire a mutable reference to the export state, should it exist.
source§

fn create_exports<'life0, 'async_trait>( &'life0 mut self, exports: Vec<(GlobalId, ExportDescription<Self::Timestamp>)>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Create the sinks described by the ExportDescription.
source§

fn alter_export<'life0, 'async_trait>( &'life0 mut self, id: GlobalId, new_description: ExportDescription<Self::Timestamp>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Alter the sink identified by the given id to match the provided ExportDescription.
source§

fn drop_tables( &mut self, storage_metadata: &StorageMetadata, identifiers: Vec<GlobalId>, ts: Self::Timestamp, ) -> Result<(), StorageError<Self::Timestamp>>

Drops the read capability for the tables and allows their resources to be reclaimed.
source§

fn drop_sources( &mut self, storage_metadata: &StorageMetadata, identifiers: Vec<GlobalId>, ) -> Result<(), StorageError<Self::Timestamp>>

Drops the read capability for the sources and allows their resources to be reclaimed.
source§

fn drop_sources_unvalidated( &mut self, storage_metadata: &StorageMetadata, ids: Vec<GlobalId>, ) -> Result<(), StorageError<Self::Timestamp>>

Drops the read capability for the sources and allows their resources to be reclaimed. Read more
source§

fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>)

Drops the read capability for the sinks and allows their resources to be reclaimed. Read more
source§

fn append_table( &mut self, write_ts: Self::Timestamp, advance_to: Self::Timestamp, commands: Vec<(GlobalId, Vec<TimestamplessUpdate>)>, ) -> Result<Receiver<Result<(), StorageError<Self::Timestamp>>>, StorageError<Self::Timestamp>>

Append updates into the local input named id and advance its upper to upper. Read more
source§

fn monotonic_appender( &self, id: GlobalId, ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>

Returns a MonotonicAppender which is a channel that can be used to monotonically append to the specified GlobalId.
source§

fn webhook_statistics( &self, id: GlobalId, ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>

Returns a shared WebhookStatistics which can be used to report user-facing statistics for this given webhhook, specified by the GlobalId.
source§

fn snapshot( &self, id: GlobalId, as_of: Self::Timestamp, ) -> BoxFuture<Result<Vec<(Row, Diff)>, StorageError<Self::Timestamp>>>

Returns the snapshot of the contents of the local input named id at as_of.
source§

fn snapshot_latest<'life0, 'async_trait>( &'life0 self, id: GlobalId, ) -> Pin<Box<dyn Future<Output = Result<Vec<Row>, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns the snapshot of the contents of the local input named id at the largest readable as_of.
source§

fn snapshot_cursor<'life0, 'async_trait>( &'life0 mut self, id: GlobalId, as_of: Self::Timestamp, ) -> Pin<Box<dyn Future<Output = Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self::Timestamp: Timestamp + Lattice + Codec64, Self: 'async_trait, 'life0: 'async_trait,

Returns the snapshot of the contents of the local input named id at as_of.
source§

fn snapshot_stats<'life0, 'async_trait>( &'life0 self, id: GlobalId, as_of: Antichain<Self::Timestamp>, ) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns aggregate statistics about the contents of the local input named id at as_of.
source§

fn snapshot_parts_stats<'life0, 'async_trait>( &'life0 self, id: GlobalId, as_of: Antichain<Self::Timestamp>, ) -> Pin<Box<dyn Future<Output = BoxFuture<Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns aggregate statistics about the contents of the local input named id at as_of. Read more
source§

fn set_read_policy( &mut self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>, )

Assigns a read policy to specific identifiers. Read more
source§

fn acquire_read_holds( &self, desired_holds: Vec<GlobalId>, ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>

Acquires and returns the desired read holds, advancing them to the since frontier when necessary.
source§

fn ready<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Waits until the controller is ready to process a response. Read more
source§

fn process( &mut self, storage_metadata: &StorageMetadata, ) -> Result<Option<Response<T>>, Error>

Processes the work queued by StorageController::ready.
source§

fn inspect_persist_state<'life0, 'async_trait>( &'life0 self, id: GlobalId, ) -> Pin<Box<dyn Future<Output = Result<Value, Error>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Exposes the internal state of the data shard for debugging and QA. Read more
source§

fn append_introspection_updates( &mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)>, )

Records append-only updates for the given introspection type. Read more
source§

fn append_status_introspection_updates( &mut self, type_: IntrospectionType, updates: Vec<StatusUpdate>, )

Records append-only status updates for the given introspection type.
source§

fn update_introspection_collection( &mut self, type_: IntrospectionType, op: StorageWriteOp, )

Updates the desired state of the given introspection type. Read more
source§

fn initialize_state<'life0, 'life1, 'async_trait>( &'life0 mut self, txn: &'life1 mut (dyn StorageTxn<T> + Send), init_ids: BTreeSet<GlobalId>, drop_ids: BTreeSet<GlobalId>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<T>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

On boot, seed the controller’s metadata/state.
source§

fn prepare_state<'life0, 'life1, 'async_trait>( &'life0 self, txn: &'life1 mut (dyn StorageTxn<T> + Send), ids_to_add: BTreeSet<GlobalId>, ids_to_drop: BTreeSet<GlobalId>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<T>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Update the implementor of StorageTxn with the appropriate metadata given the IDs to add and drop. Read more
source§

fn real_time_recent_timestamp<'life0, 'async_trait>( &'life0 self, timestamp_objects: BTreeSet<GlobalId>, timeout: Duration, ) -> Pin<Box<dyn Future<Output = Result<BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>, StorageError<Self::Timestamp>>> + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

source§

fn create_collections<'life0, 'life1, 'async_trait>( &'life0 mut self, storage_metadata: &'life1 StorageMetadata, register_ts: Option<Self::Timestamp>, collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Create the sources described in the individual RunIngestionCommand commands. Read more
source§

fn acquire_read_hold( &mut self, id: GlobalId, ) -> Result<ReadHold<Self::Timestamp>, ReadHoldError>

Acquires and returns the earliest legal read hold.

Auto Trait Implementations§

§

impl<T> !Freeze for Controller<T>

§

impl<T> !RefUnwindSafe for Controller<T>

§

impl<T> Send for Controller<T>

§

impl<T> Sync for Controller<T>

§

impl<T> Unpin for Controller<T>
where T: Unpin,

§

impl<T> !UnwindSafe for Controller<T>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

source§

fn cast_into(self) -> U

Performs the cast.
source§

impl<T> Conv for T

source§

fn conv<T>(self) -> T
where Self: Into<T>,

Converts self into T using Into<T>. Read more
source§

impl<T> FmtForward for T

source§

fn fmt_binary(self) -> FmtBinary<Self>
where Self: Binary,

Causes self to use its Binary implementation when Debug-formatted.
source§

fn fmt_display(self) -> FmtDisplay<Self>
where Self: Display,

Causes self to use its Display implementation when Debug-formatted.
source§

fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where Self: LowerExp,

Causes self to use its LowerExp implementation when Debug-formatted.
source§

fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where Self: LowerHex,

Causes self to use its LowerHex implementation when Debug-formatted.
source§

fn fmt_octal(self) -> FmtOctal<Self>
where Self: Octal,

Causes self to use its Octal implementation when Debug-formatted.
source§

fn fmt_pointer(self) -> FmtPointer<Self>
where Self: Pointer,

Causes self to use its Pointer implementation when Debug-formatted.
source§

fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where Self: UpperExp,

Causes self to use its UpperExp implementation when Debug-formatted.
source§

fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where Self: UpperHex,

Causes self to use its UpperHex implementation when Debug-formatted.
source§

fn fmt_list(self) -> FmtList<Self>
where &'a Self: for<'a> IntoIterator,

Formats each item in a sequence. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

source§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
source§

impl<T, U> OverrideFrom<Option<&T>> for U
where U: OverrideFrom<T>,

source§

fn override_from(self, layer: &Option<&T>) -> U

Override the configuration represented by Self with values from the given layer.
source§

impl<T> Pipe for T
where T: ?Sized,

source§

fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where Self: Sized,

Pipes by value. This is generally the method you want to use. Read more
source§

fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where R: 'a,

Borrows self and passes that borrow into the pipe function. Read more
source§

fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where R: 'a,

Mutably borrows self and passes that borrow into the pipe function. Read more
source§

fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
where Self: Borrow<B>, B: 'a + ?Sized, R: 'a,

Borrows self, then passes self.borrow() into the pipe function. Read more
source§

fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
where Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.borrow_mut() into the pipe function. Read more
source§

fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
where Self: AsRef<U>, U: 'a + ?Sized, R: 'a,

Borrows self, then passes self.as_ref() into the pipe function.
source§

fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
where Self: AsMut<U>, U: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.as_mut() into the pipe function.
source§

fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
where Self: Deref<Target = T>, T: 'a + ?Sized, R: 'a,

Borrows self, then passes self.deref() into the pipe function.
source§

fn pipe_deref_mut<'a, T, R>( &'a mut self, func: impl FnOnce(&'a mut T) -> R, ) -> R
where Self: DerefMut<Target = T> + Deref, T: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.deref_mut() into the pipe function.
source§

impl<T> Pointable for T

source§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

source§

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.
source§

impl<T> Tap for T

source§

fn tap(self, func: impl FnOnce(&Self)) -> Self

Immutable access to a value. Read more
source§

fn tap_mut(self, func: impl FnOnce(&mut Self)) -> Self

Mutable access to a value. Read more
source§

fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Immutable access to the Borrow<B> of a value. Read more
source§

fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Mutable access to the BorrowMut<B> of a value. Read more
source§

fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Immutable access to the AsRef<R> view of a value. Read more
source§

fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Mutable access to the AsMut<R> view of a value. Read more
source§

fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Immutable access to the Deref::Target of a value. Read more
source§

fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Mutable access to the Deref::Target of a value. Read more
source§

fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self

Calls .tap() only in debug builds, and is erased in release builds.
source§

fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self

Calls .tap_mut() only in debug builds, and is erased in release builds.
source§

fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Calls .tap_borrow() only in debug builds, and is erased in release builds.
source§

fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
source§

fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Calls .tap_ref() only in debug builds, and is erased in release builds.
source§

fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
source§

fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Calls .tap_deref() only in debug builds, and is erased in release builds.
source§

fn tap_deref_mut_dbg<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Calls .tap_deref_mut() only in debug builds, and is erased in release builds.
source§

impl<T> TryConv for T

source§

fn try_conv<T>(self) -> Result<T, Self::Error>
where Self: TryInto<T>,

Attempts to convert self into T using TryInto<T>. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more