/// A storage controller for a storage instance.
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation> {
    /// The build information for this process.
    build_info: &'static BuildInfo,
    /// The state for the storage controller.
    // TODO(benesch): why is this a separate struct?
    state: StorageControllerState<T>,
    /// Mechanism for returning frontier advancement for tables.
    internal_response_queue: UnboundedReceiver<StorageResponse<T>>,
    /// The persist location where all storage collections are written.
    persist_location: PersistLocation,
    /// A persist client (cache) used to write to storage collections.
    persist: Arc<PersistClientCache>,
    /// Metrics for the storage controller.
    metrics: StorageControllerMetrics,
    /// Mechanism for the storage controller to send itself feedback, potentially emulating
    /// the responses we expect from clusters.
    ///
    /// Note: this is used for finalizing shards of webhook sources; once webhook sources are
    /// installed on a clusterd, this can likely be refactored away.
    internal_response_sender: UnboundedSender<StorageResponse<T>>,
    /// Frontiers that have been recorded in the Frontiers collection, kept so that old rows
    /// can be retracted.
    ///
    /// Keyed by collection ID and an optional replica ID.
    recorded_frontiers: BTreeMap<(GlobalId, Option<ReplicaId>), Antichain<T>>,
}
Expand description

A storage controller for a storage instance.

Fields§

§build_info: &'static BuildInfo

The build information for this process.

§state: StorageControllerState<T>

The state for the storage controller. TODO(benesch): why is this a separate struct?

§internal_response_queue: UnboundedReceiver<StorageResponse<T>>

Mechanism for returning frontier advancement for tables.

§persist_location: PersistLocation

The persist location where all storage collections are written.

§persist: Arc<PersistClientCache>

A persist client used to write to storage collections.

§metrics: StorageControllerMetrics

Metrics for the storage controller.

§internal_response_sender: UnboundedSender<StorageResponse<T>>

Mechanism for the storage controller to send itself feedback, potentially emulating the responses we expect from clusters.

Note: This is used for finalizing shards of webhook sources; once webhook sources are installed on a clusterd, this can likely be refactored away.

§recorded_frontiers: BTreeMap<(GlobalId, Option<ReplicaId>), Antichain<T>>

Frontiers that have been recorded in the Frontiers collection, kept to be able to retract old rows.

Implementations§

source§

impl<T> Controller<T>where T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation, StorageCommand<T>: RustType<ProtoStorageCommand>, StorageResponse<T>: RustType<ProtoStorageResponse>, Self: StorageController<Timestamp = T>,

source

pub(super) async fn register_shards_for_finalization<I>(&mut self, entries: I)where I: IntoIterator<Item = ShardId>,

Register shards for finalization. This must be called if you intend to finalize shards, before you perform any work to e.g. replace one shard with another.

The reasoning behind this is that we need to identify the intent to finalize a shard so we can perform the finalization on reboot if we crash and do not find the shard in use in any collection.

source

pub(super) async fn clear_from_shard_finalization_register( &mut self, shards: BTreeSet<ShardId> )

Removes the given shards from the finalization register.

This is appropriate to do if you can guarantee that a shard has been finalized, or if you find that the shard is still in use by some collection.

source

pub(super) async fn reconcile_state_inner(&mut self)

source§

impl<T> Controller<T>where T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation, StorageCommand<T>: RustType<ProtoStorageCommand>, StorageResponse<T>: RustType<ProtoStorageResponse>, Self: StorageController<Timestamp = T>,

source

pub async fn new( build_info: &'static BuildInfo, postgres_url: String, persist_location: PersistLocation, persist_clients: Arc<PersistClientCache>, now: NowFn, postgres_factory: &StashFactory, envd_epoch: NonZeroI64, metrics_registry: MetricsRegistry ) -> Self

Create a new storage controller from a client it should wrap.

Note that when creating a new storage controller, you must also reconcile it with the previous state.

source

fn validate_collection_ids( &self, ids: impl Iterator<Item = GlobalId> ) -> Result<(), StorageError>

Validate that a collection exists for all identifiers, and error if any do not.

source

fn validate_export_ids( &self, ids: impl Iterator<Item = GlobalId> ) -> Result<(), StorageError>

Validate that an export exists for all identifiers, and error if any do not.

source

fn active_collections( &self ) -> impl Iterator<Item = (GlobalId, &CollectionState<T>)>

Iterate over collections that have not been dropped.

source

fn determine_collection_since_joins( &self, collections: &[GlobalId] ) -> Result<Antichain<T>, StorageError>

Return the since frontier at which we can read from all the given collections.

The outer error is a potentially recoverable internal error, while the inner error is appropriate to return to the adapter.

source

fn install_read_capabilities( &mut self, from_id: GlobalId, storage_dependencies: &[GlobalId], read_capability: Antichain<T> ) -> Result<(), StorageError>

Install read capabilities on the given storage_dependencies.

source

fn remove_read_capabilities( &mut self, capability: Antichain<T>, storage_dependencies: &[GlobalId] )

Removes read holds that were previously acquired via install_read_capabilities.

Panics

This panics if there are no read capabilities at capability for all depended-upon collections.

source

async fn open_data_handles( &self, id: &GlobalId, shard: ShardId, since: Option<&Antichain<T>>, relation_desc: RelationDesc, persist_client: &PersistClient ) -> (WriteHandle<SourceData, (), T, Diff>, SinceHandle<SourceData, (), T, Diff, PersistEpoch>)

Opens a write handle and a critical since handle for the given shard.

since is an optional since that the read handle will be forwarded to if it is less than its current since.

This will halt! the process if we cannot successfully acquire a critical handle with our current epoch.

source

async fn reconcile_managed_collection( &self, id: GlobalId, updates: Vec<(Row, Diff)> )

Effectively truncates the data_shard associated with id, effective as of the system time.

Panics
  • If id does not belong to a collection or is not registered as a managed collection.
source

async fn append_to_managed_collection( &self, id: GlobalId, updates: Vec<(Row, Diff)> )

Append updates to the data_shard associated with id, effective as of the system time.

Panics
  • If id is not registered as a managed collection.
source

async fn initialize_shard_mapping(&mut self)

Initializes the data expressing which global IDs correspond to which shards. Necessary because we cannot write any of these mappings that we discover before the shard mapping collection exists.

Panics
  • If IntrospectionType::ShardMapping is not associated with a GlobalId in self.state.introspection_ids.
  • If IntrospectionType::ShardMapping’s GlobalId is not registered as a managed collection.
source

async fn partially_truncate_status_history( &mut self, collection: IntrospectionType )

Effectively truncates the source status history shard except for the most recent updates from each ID.

source

async fn append_shard_mappings<I>(&self, global_ids: I, diff: i64)where I: Iterator<Item = GlobalId>,

Appends a new global ID, shard ID pair to the appropriate collection. Use a diff of 1 to append a new entry; -1 to retract an existing entry.

However, data is written iff we know of the GlobalId of the IntrospectionType::ShardMapping collection; in other cases, data is dropped on the floor. In these cases, the data is later written by Self::initialize_shard_mapping.

Panics
  • If self.state.collections does not have an entry for some element of global_ids.
  • If IntrospectionType::ShardMapping’s GlobalId is not registered as a managed collection.
  • If diff is any value other than 1 or -1.
source

async fn upsert_collection_metadata( &mut self, all_current_metadata: &mut BTreeMap<GlobalId, DurableCollectionMetadata>, upsert_state: BTreeMap<GlobalId, DurableCollectionMetadata> )

Updates the on-disk and in-memory representation of DurableCollectionMetadata (i.e. KV pairs in METADATA_COLLECTION on-disk and all_current_metadata as its in-memory representation) to include that of upsert_state, i.e. upserting the KV pairs in upsert_state into all_current_metadata, as well as into METADATA_COLLECTION.

Any shards no longer referenced after the upsert will be finalized.

Note that this function expects to be called:

  • While no source is currently using the shards identified in the current metadata.
  • Before any source begins using the shards identified in upsert_state.

This is kept around as dead code because we might want to perform a similar migration in the future.

source

async fn finalize_shards(&mut self)

Attempts to close all shards marked for finalization.

source

fn check_alter_collection_inner( &self, id: GlobalId, ingestion: IngestionDescription ) -> Result<(), StorageError>

Determines if an ALTER is valid.

source

fn install_dependency_read_holds<I: Iterator<Item = GlobalId>>( &mut self, collections: I, storage_dependencies: &[GlobalId] ) -> Result<(), StorageError>

For each element of collections, install a read hold on all of the storage_dependencies.

Note that this adjustment is only guaranteed to be reflected in memory; downgrades to persist shards are not guaranteed to occur unless they close the shard.

Panics
  • If any identified collection’s since is less than the dependency since and:
    • Its read policy is not ReadPolicy::NoPolicy

    • Its read policy is ReadPolicy::NoPolicy(f) and the dependency since is <= f.

    • Its write frontier is neither T::minimum nor beyond the dependency since.

  • If any identified collection’s data source is not DataSource::Ingestion (primary source) or DataSource::Other (subsources).
source

fn enrich_ingestion( &self, id: GlobalId, ingestion: IngestionDescription ) -> Result<IngestionDescription<CollectionMetadata>, StorageError>

Converts an IngestionDescription<()> into IngestionDescription<CollectionMetadata>.

Trait Implementations§

source§

impl<T: Debug + Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation> Debug for Controller<T>

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
source§

impl<T> StorageController for Controller<T>where T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation + Into<Timestamp>, StorageCommand<T>: RustType<ProtoStorageCommand>, StorageResponse<T>: RustType<ProtoStorageResponse>, MetadataExportFetcher: MetadataExport<T>, DurableExportMetadata<T>: RustType<DurableExportMetadata>,

source§

fn drop_sinks(&mut self, identifiers: Vec<GlobalId>) -> Result<(), StorageError>

Drops the read capability for the sinks and allows their resources to be reclaimed.

§

type Timestamp = T

source§

fn initialization_complete(&mut self)

Marks the end of any initialization commands. Read more
source§

fn update_configuration(&mut self, config_params: StorageParameters)

Update storage configuration.
source§

fn collection( &self, id: GlobalId ) -> Result<&CollectionState<Self::Timestamp>, StorageError>

Acquire an immutable reference to the collection state, should it exist.
source§

fn collection_mut( &mut self, id: GlobalId ) -> Result<&mut CollectionState<Self::Timestamp>, StorageError>

Acquire a mutable reference to the collection state, should it exist.
source§

fn collections( &self ) -> Box<dyn Iterator<Item = (&GlobalId, &CollectionState<Self::Timestamp>)> + '_>

Acquire an iterator over all collection states.
source§

fn create_instance( &mut self, id: StorageInstanceId, variable_length_row_encoding: bool )

Creates a storage instance with the specified ID. Read more
source§

fn drop_instance(&mut self, id: StorageInstanceId)

Drops the storage instance with the given ID. Read more
source§

fn connect_replica( &mut self, instance_id: StorageInstanceId, replica_id: ReplicaId, location: ClusterReplicaLocation )

Connects the storage instance to the specified replica. Read more
source§

fn drop_replica( &mut self, instance_id: StorageInstanceId, _replica_id: ReplicaId )

Disconnects the storage instance from the specified replica.
source§

fn migrate_collections<'life0, 'async_trait>( &'life0 mut self, _collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Migrate any storage controller state from previous versions to this version’s expectations. Read more
source§

fn create_collections<'life0, 'async_trait>( &'life0 mut self, collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Create the sources described in the individual RunIngestionCommand commands. Read more
source§

fn check_alter_collection( &mut self, id: GlobalId, ingestion: IngestionDescription ) -> Result<(), StorageError>

Check that the collection associated with id can be altered to represent the given ingestion. Read more
source§

fn alter_collection<'life0, 'async_trait>( &'life0 mut self, id: GlobalId, ingestion: IngestionDescription ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Alter the identified collection to use the described ingestion.
source§

fn export( &self, id: GlobalId ) -> Result<&ExportState<Self::Timestamp>, StorageError>

Acquire an immutable reference to the export state, should it exist.
source§

fn export_mut( &mut self, id: GlobalId ) -> Result<&mut ExportState<Self::Timestamp>, StorageError>

Acquire a mutable reference to the export state, should it exist.
source§

fn prepare_export( &mut self, id: GlobalId, from_id: GlobalId ) -> Result<CreateExportToken<T>, StorageError>

Notify the storage controller to prepare for an export to be created.
source§

fn cancel_prepare_export(&mut self, _: CreateExportToken<T>)

Cancel the pending export.
source§

fn create_exports<'life0, 'async_trait>( &'life0 mut self, exports: Vec<(CreateExportToken<Self::Timestamp>, ExportDescription<Self::Timestamp>)> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Create the sinks described by the ExportDescription.
source§

fn drop_sources( &mut self, identifiers: Vec<GlobalId> ) -> Result<(), StorageError>

Drops the read capability for the sources and allows their resources to be reclaimed.
source§

fn drop_sources_unvalidated(&mut self, identifiers: Vec<GlobalId>)

Drops the read capability for the sources and allows their resources to be reclaimed. Read more
source§

fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>)

Drops the read capability for the sinks and allows their resources to be reclaimed. Read more
source§

fn append_table( &mut self, commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)> ) -> Result<Receiver<Result<(), StorageError>>, StorageError>

Append updates into the local input named id and advance its upper to upper. Read more
source§

fn monotonic_appender( &self, id: GlobalId ) -> Result<MonotonicAppender, StorageError>

Returns a MonotonicAppender which is a oneshot-esque struct that can be used to monotonically append to the specified GlobalId.
source§

fn snapshot<'life0, 'async_trait>( &'life0 self, id: GlobalId, as_of: Self::Timestamp ) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError>> + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Returns the snapshot of the contents of the local input named id at as_of.
source§

fn snapshot_stats<'life0, 'async_trait>( &'life0 self, id: GlobalId, as_of: Antichain<Self::Timestamp> ) -> Pin<Box<dyn Future<Output = Result<SnapshotStats<Self::Timestamp>, StorageError>> + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Returns aggregate statistics about the contents of the local input named id at as_of.
source§

fn set_read_policy( &mut self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)> )

Assigns a read policy to specific identifiers. Read more
source§

fn update_write_frontiers( &mut self, updates: &[(GlobalId, Antichain<Self::Timestamp>)] )

Ingests write frontier updates for collections that this controller maintains and potentially generates updates to read capabilities, which are passed on to StorageController::update_read_capabilities. Read more
source§

fn update_read_capabilities( &mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>> )

Applies updates and sends any appropriate compaction command.
source§

fn ready<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Waits until the controller is ready to process a response. Read more
source§

fn process<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Processes the work queued by StorageController::ready. Read more
source§

fn reconcile_state<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Signal to the controller that the adapter has populated all of its initial state and the controller can reconcile (i.e. drop) any unclaimed resources.
source§

fn inspect_persist_state<'life0, 'async_trait>( &'life0 self, id: GlobalId ) -> Pin<Box<dyn Future<Output = Result<Value, Error>> + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Exposes the internal state of the data shard for debugging and QA. Read more
source§

fn record_frontiers<'life0, 'async_trait>( &'life0 mut self, external_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<Self::Timestamp>> ) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Records the current frontiers of all known storage objects. Read more

Auto Trait Implementations§

§

impl<T> !RefUnwindSafe for Controller<T>

§

impl<T> Send for Controller<T>

§

impl<T> Sync for Controller<T>

§

impl<T> Unpin for Controller<T>where T: Unpin,

§

impl<T> !UnwindSafe for Controller<T>

Blanket Implementations§

source§

impl<T> Any for Twhere T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for Twhere T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for Twhere T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for Twhere U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<T> Pointable for T

source§

const ALIGN: usize = mem::align_of::<T>()

The alignment of pointer.
§

type Init = T

The type for initializers.
source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<P, R> ProtoType<R> for Pwhere R: RustType<P>,

source§

impl<T> Same<T> for T

§

type Output = T

Should always be Self
source§

impl<T, U> TryFrom<U> for Twhere U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for Twhere U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for Twhere V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more