Struct mz_storage_client::controller::Controller
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation> {
    build_info: &'static BuildInfo,
    state: StorageControllerState<T>,
    internal_response_queue: UnboundedReceiver<StorageResponse<T>>,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    metrics: StorageControllerMetrics,
}
A storage controller for a storage instance.
Fields

build_info: &'static BuildInfo
The build information for this process.

state: StorageControllerState<T>
The state for the storage controller. TODO(benesch): why is this a separate struct?

internal_response_queue: UnboundedReceiver<StorageResponse<T>>
Mechanism for returning frontier advancement for tables.

persist_location: PersistLocation
The persist location to which all storage collections are written.

persist: Arc<PersistClientCache>
A cache of persist clients used to write to storage collections.

metrics: StorageControllerMetrics
Metrics for the storage controller.
Implementations
impl<T> Controller<T>
where
    T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
    Self: StorageController<Timestamp = T>,
pub(super) async fn is_shard_registered_for_finalization(
    &mut self,
    shard: ShardId
) -> bool

Returns true if shard is in the register of shards marked for finalization.
pub(super) async fn register_shards_for_finalization<I>(&mut self, entries: I)
where
    I: IntoIterator<Item = ShardId>,

Register shards for finalization. If you intend to finalize shards, this must be called before you perform any work to e.g. replace one shard with another.
The reasoning behind this is that we need to record the intent to finalize a shard so we can perform the finalization on reboot if we crash and do not find the shard in use in any collection.
pub(super) async fn clear_from_shard_finalization_register(
    &mut self,
    shards: BTreeSet<ShardId>
)

Removes the given shards from the finalization register.
This is appropriate to do if you can guarantee that the shards have been finalized, or if you find that they are still in use by some collection.
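The safe replacement protocol implied by these two methods can be sketched with a plain BTreeSet standing in for the controller's durable register; the method names mirror the ones above, but the types and bodies here are illustrative only:

use std::collections::BTreeSet;

// Stand-in for persist's ShardId; illustrative only.
type ShardId = u64;

struct FinalizationRegister {
    marked: BTreeSet<ShardId>,
}

impl FinalizationRegister {
    // Record the intent to finalize `entries` *before* doing any replacement
    // work, so that a crash between steps can be recovered from on reboot.
    fn register_shards_for_finalization<I: IntoIterator<Item = ShardId>>(&mut self, entries: I) {
        self.marked.extend(entries);
    }

    // Clear shards once they have been finalized, or were found to still be
    // in use by some collection.
    fn clear_from_shard_finalization_register(&mut self, shards: &BTreeSet<ShardId>) {
        self.marked.retain(|s| !shards.contains(s));
    }
}

fn main() {
    let mut reg = FinalizationRegister { marked: BTreeSet::new() };
    let old_shard: ShardId = 1;

    // 1. Declare intent before swapping the shard out.
    reg.register_shards_for_finalization([old_shard]);
    // 2. ...replace the old shard with a new one and finalize the old one...
    // 3. Only then remove the entry from the register.
    reg.clear_from_shard_finalization_register(&BTreeSet::from([old_shard]));
    assert!(reg.marked.is_empty());
}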
pub(super) async fn reconcile_shards(&mut self)

Reconcile the state of SHARD_FINALIZATION_WAL with super::METADATA_COLLECTION on boot.
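A sketch of the boot-time reconciliation decision, modeling both the WAL and the set of in-use shards as plain sets (an assumption; the real state is durable): intents whose shards are unreferenced lead to finalization, while intents for shards still in use are simply cleared.

use std::collections::BTreeSet;

type ShardId = u64;

// Returns (shards to finalize now, register entries to clear as still in use).
fn reconcile_shards(
    finalization_wal: &BTreeSet<ShardId>,
    shards_in_use: &BTreeSet<ShardId>,
) -> (Vec<ShardId>, Vec<ShardId>) {
    let finalize: Vec<_> = finalization_wal.difference(shards_in_use).copied().collect();
    let clear: Vec<_> = finalization_wal.intersection(shards_in_use).copied().collect();
    (finalize, clear)
}

fn main() {
    let wal = BTreeSet::from([1, 2, 3]);
    let in_use = BTreeSet::from([2]);
    let (finalize, clear) = reconcile_shards(&wal, &in_use);
    assert_eq!(finalize, vec![1, 3]);
    assert_eq!(clear, vec![2]);
}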
impl<T> Controller<T>
where
    T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
    Self: StorageController<Timestamp = T>,
pub(super) fn remap_shard_migration(
    &mut self,
    durable_metadata: &BTreeMap<GlobalId, DurableCollectionMetadata>,
    collections: &[(GlobalId, CollectionDescription<T>)]
) -> BTreeMap<GlobalId, DurableCollectionMetadata>

Determine the delta between durable_metadata and collections such that:

- Each remap collection's data shard is its parent collection's remap shard.
- No collection contains a Some value for its remap shard.

Apply this delta using Controller::upsert_collection_metadata(durable_metadata, <this function's return value>).
This approach is safe/backward compatible because:

- Every ingestion collection previously had a remap shard.
- Every ingestion collection now has a progress collection subsource, whose data shard should be the remap shard.
- No other type of collection used its remap shard, so dropping the values entirely is essentially a no-op.
MIGRATION: v0.44
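A sketch of the delta computation under assumed, simplified types (integers for GlobalId and ShardId; a two-field stand-in for DurableCollectionMetadata). The pairing of progress subsources with parent ingestions is taken as an input here, whereas the real method derives it from collections:

use std::collections::BTreeMap;

type GlobalId = u64;
type ShardId = u64;

// Simplified stand-in for DurableCollectionMetadata; illustrative only.
#[derive(Clone, PartialEq)]
struct DurableCollectionMetadata {
    data_shard: ShardId,
    remap_shard: Option<ShardId>,
}

// Returns only the entries that change: progress subsources adopt their
// parent's remap shard as their data shard, and every remap_shard is cleared.
fn remap_shard_migration(
    durable_metadata: &BTreeMap<GlobalId, DurableCollectionMetadata>,
    progress_parents: &[(GlobalId, GlobalId)], // (progress subsource, parent ingestion)
) -> BTreeMap<GlobalId, DurableCollectionMetadata> {
    let mut delta = BTreeMap::new();
    for (progress_id, parent_id) in progress_parents {
        if let (Some(progress), Some(parent)) =
            (durable_metadata.get(progress_id), durable_metadata.get(parent_id))
        {
            if let Some(remap_shard) = parent.remap_shard {
                let updated = DurableCollectionMetadata {
                    data_shard: remap_shard,
                    remap_shard: None,
                };
                if updated != *progress {
                    delta.insert(*progress_id, updated);
                }
            }
        }
    }
    // Clear the remap shard on every collection that still has one.
    for (id, meta) in durable_metadata {
        if meta.remap_shard.is_some() && !delta.contains_key(id) {
            let mut updated = meta.clone();
            updated.remap_shard = None;
            delta.insert(*id, updated);
        }
    }
    delta
}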
impl<T> Controller<T>
where
    T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
    Self: StorageController<Timestamp = T>,
pub async fn new(
    build_info: &'static BuildInfo,
    postgres_url: String,
    persist_location: PersistLocation,
    persist_clients: Arc<PersistClientCache>,
    now: NowFn,
    postgres_factory: &StashFactory,
    envd_epoch: NonZeroI64,
    metrics_registry: MetricsRegistry
) -> Self
Create a new storage controller from a client it should wrap.
Note that when creating a new storage controller, you must also reconcile it with the previous state.
fn validate_collection_ids(
    &self,
    ids: impl Iterator<Item = GlobalId>
) -> Result<(), StorageError>
Validate that a collection exists for all identifiers, and error if any do not.
fn validate_export_ids(
    &self,
    ids: impl Iterator<Item = GlobalId>
) -> Result<(), StorageError>
Validate that an export exists for all identifiers, and error if any do not.
fn determine_collection_since_joins(
    &mut self,
    collections: &[GlobalId]
) -> Result<Antichain<T>, StorageError>
Return the since frontier at which we can read from all the given collections.
The outer error is a potentially recoverable internal error, while the inner error is appropriate to return to the adapter.
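Because T is required to be TotalOrder, the join of several since frontiers degenerates to a maximum: a time is readable across all collections only if every collection can still serve it. A toy illustration with u64 timestamps standing in for T:

// Join of since frontiers for a totally ordered timestamp: the maximum.
fn determine_since_join(sinces: impl IntoIterator<Item = u64>) -> u64 {
    sinces.into_iter().max().unwrap_or(0)
}

fn main() {
    // Three collections compacted to 3, 7, and 5: reads spanning all three
    // are only valid at times >= 7.
    assert_eq!(determine_since_join([3, 7, 5]), 7);
}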
fn install_read_capabilities(
    &mut self,
    from_id: GlobalId,
    storage_dependencies: &[GlobalId],
    read_capability: Antichain<T>
) -> Result<(), StorageError>

Install read capabilities on the given storage_dependencies.
fn remove_read_capabilities(
    &mut self,
    capability: Antichain<T>,
    storage_dependencies: &[GlobalId]
)

Removes read holds that were previously acquired via install_read_capabilities.

Panics

This panics if there are no read capabilities at capability for all depended-upon collections.
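An illustrative model of the hold bookkeeping, using a per-(collection, time) reference count in place of the controller's real accounting; removal panics when no matching hold exists, mirroring the panic documented above:

use std::collections::BTreeMap;

type GlobalId = u64;

struct ReadHolds {
    // (collection, time) -> number of outstanding holds at that time.
    holds: BTreeMap<(GlobalId, u64), usize>,
}

impl ReadHolds {
    // Installing a capability bumps the count on every dependency.
    fn install(&mut self, deps: &[GlobalId], time: u64) {
        for dep in deps {
            *self.holds.entry((*dep, time)).or_insert(0) += 1;
        }
    }

    // Removing it decrements the same counts; a missing entry means the
    // caller never held a capability at this time, which is a logic error.
    fn remove(&mut self, deps: &[GlobalId], time: u64) {
        for dep in deps {
            let count = self
                .holds
                .get_mut(&(*dep, time))
                .expect("no read capability at this time for a depended-upon collection");
            *count -= 1;
            if *count == 0 {
                self.holds.remove(&(*dep, time));
            }
        }
    }
}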
async fn open_data_handles(
    &self,
    purpose: &str,
    shard: ShardId,
    relation_desc: RelationDesc,
    persist_client: &PersistClient
) -> (WriteHandle<SourceData, (), T, Diff>, SinceHandle<SourceData, (), T, Diff, PersistEpoch>)

Opens write and critical since handles for the given shard.
This will halt! the process if we cannot successfully acquire a critical handle with our current epoch.
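A sketch of the fencing decision, with epochs reduced to plain integers (an assumption; the real code compares PersistEpoch values and uses the halt! macro): if a newer epoch already owns the critical handle, continuing would risk split-brain writes, so the process exits.

// Exit if another process with a newer epoch has fenced us out of the shard.
fn ensure_current_epoch(our_epoch: i64, shard_epoch: i64) {
    if shard_epoch > our_epoch {
        eprintln!("epoch {our_epoch} fenced out by newer epoch {shard_epoch}; halting");
        std::process::exit(1);
    }
}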
async fn reconcile_managed_collection(
    &self,
    id: GlobalId,
    updates: Vec<(Row, Diff)>
)

Effectively truncates the data_shard associated with id, effective as of the system time.

Panics

- If id does not belong to a collection or is not registered as a managed collection.
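One plausible reading of "effectively truncates", sketched with String rows and i64 diffs standing in for (Row, Diff): retract whatever the shard currently contains and add the desired updates, all in one atomic append.

// Build a single batch of updates that retracts `current` and adds `desired`.
fn reconcile_updates(
    current: &[(String, i64)],
    desired: &[(String, i64)],
) -> Vec<(String, i64)> {
    let mut updates: Vec<(String, i64)> = current
        .iter()
        .map(|(row, diff)| (row.clone(), -diff)) // retract existing contents
        .collect();
    updates.extend(desired.iter().cloned()); // then add the desired rows
    updates
}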
async fn append_to_managed_collection(
    &self,
    id: GlobalId,
    updates: Vec<(Row, Diff)>
)

Append updates to the data_shard associated with id, effective as of the system time.

Panics

- If id is not registered as a managed collection.
async fn initialize_shard_mapping(&mut self)

Initializes the data expressing which global IDs correspond to which shards. Necessary because we cannot write any of these mappings that we discover before the shard mapping collection exists.

Panics

- If IntrospectionType::ShardMapping is not associated with a GlobalId in self.state.introspection_ids.
- If IntrospectionType::ShardMapping's GlobalId is not registered as a managed collection.
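A sketch of the update construction, with integer stand-ins for GlobalId and ShardId: one (global ID, shard) row per known collection, each with a +1 diff, destined for the shard-mapping introspection collection.

use std::collections::BTreeMap;

// Emit one +1 update per (global id, data shard) pair.
fn shard_mapping_updates(collections: &BTreeMap<u64, u64>) -> Vec<((u64, u64), i64)> {
    collections
        .iter()
        .map(|(gid, shard)| ((*gid, *shard), 1))
        .collect()
}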
async fn register_shard_mappings<I>(&self, global_ids: I)
where
    I: Iterator<Item = GlobalId>,

Writes a new global ID, shard ID pair to the appropriate collection. However, data is written iff we know of the GlobalId of the IntrospectionType::ShardMapping collection; in other cases, data is dropped on the floor. In these cases, the data is later written by Self::initialize_shard_mapping.

Panics

- If self.state.collections does not have an entry for global_id.
- If IntrospectionType::ShardMapping's GlobalId is not registered as a managed collection.
async fn upsert_collection_metadata(
    &mut self,
    all_current_metadata: &mut BTreeMap<GlobalId, DurableCollectionMetadata>,
    upsert_state: BTreeMap<GlobalId, DurableCollectionMetadata>
)

Updates the on-disk and in-memory representation of DurableCollectionMetadata (i.e. the KV pairs in METADATA_COLLECTION on disk, with all_current_metadata as the in-memory representation) to include that of upsert_state, i.e. upserting the KV pairs of upsert_state into all_current_metadata as well as METADATA_COLLECTION.

Any shards no longer referenced after the upsert will be finalized.

Note that this function expects to be called:

- While no source is currently using the shards identified in the current metadata.
- Before any source begins using the shards identified in upsert_state.
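A sketch of the upsert and of the "no longer referenced" computation, with the metadata reduced to a single data shard per collection (an assumption; the real DurableCollectionMetadata carries more):

use std::collections::{BTreeMap, BTreeSet};

type GlobalId = u64;
type ShardId = u64;

// Upsert new shard assignments and return the shards left unreferenced,
// which the caller would then register and finalize.
fn upsert_metadata(
    all_current_metadata: &mut BTreeMap<GlobalId, ShardId>,
    upsert_state: BTreeMap<GlobalId, ShardId>,
) -> BTreeSet<ShardId> {
    let mut dropped = BTreeSet::new();
    for (id, new_shard) in upsert_state {
        if let Some(old_shard) = all_current_metadata.insert(id, new_shard) {
            if old_shard != new_shard {
                dropped.insert(old_shard);
            }
        }
    }
    // Only shards that nothing references any more are finalizable.
    let still_used: BTreeSet<_> = all_current_metadata.values().copied().collect();
    dropped.difference(&still_used).copied().collect()
}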
async fn finalize_shards<I>(&mut self, shards: I)
where
    I: IntoIterator<Item = ShardId> + Clone,

Closes the identified shards from further reads or writes.
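Conceptually, finalization advances a shard's frontiers so that no time remains readable or writable. A toy model, using Option where persist uses (possibly empty) antichains:

// None models the empty frontier: no time lies beyond it.
struct ShardFrontiers {
    since: Option<u64>, // earliest readable time, if any
    upper: Option<u64>, // next writable time, if any
}

// After finalization the shard can serve no reads and accept no writes.
fn finalize(shard: &mut ShardFrontiers) {
    shard.since = None;
    shard.upper = None;
}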
Trait Implementations
impl<T: Debug + Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation> Debug for Controller<T>
impl<T> StorageController for Controller<T>
where
    T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
    MetadataExportFetcher: MetadataExport<T>,
    DurableExportMetadata<T>: Data,
fn drop_sinks(&mut self, identifiers: Vec<GlobalId>) -> Result<(), StorageError>
Drops the read capability for the sinks and allows their resources to be reclaimed.
type Timestamp = T
fn initialization_complete(&mut self)
fn update_configuration(&mut self, config_params: StorageParameters)
fn collection(
    &self,
    id: GlobalId
) -> Result<&CollectionState<Self::Timestamp>, StorageError>
fn collection_mut(
    &mut self,
    id: GlobalId
) -> Result<&mut CollectionState<Self::Timestamp>, StorageError>
fn collections(
    &self
) -> Box<dyn Iterator<Item = (&GlobalId, &CollectionState<Self::Timestamp>)> + '_>
fn create_instance(&mut self, id: StorageInstanceId)
fn drop_instance(&mut self, id: StorageInstanceId)
fn connect_replica(
    &mut self,
    id: StorageInstanceId,
    location: ClusterReplicaLocation
)
fn migrate_collections<'life0, 'async_trait>(
    &'life0 mut self,
    collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
fn create_collections<'life0, 'async_trait>(
    &'life0 mut self,
    collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
fn export(
    &self,
    id: GlobalId
) -> Result<&ExportState<Self::Timestamp>, StorageError>
fn export_mut(
    &mut self,
    id: GlobalId
) -> Result<&mut ExportState<Self::Timestamp>, StorageError>
fn prepare_export(
    &mut self,
    id: GlobalId,
    from_id: GlobalId
) -> Result<CreateExportToken<T>, StorageError>
fn cancel_prepare_export(&mut self, _: CreateExportToken<T>)
fn create_exports<'life0, 'async_trait>(
    &'life0 mut self,
    exports: Vec<(CreateExportToken<Self::Timestamp>, ExportDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,

Create the sinks described by the ExportDescription.
fn drop_sources(
    &mut self,
    identifiers: Vec<GlobalId>
) -> Result<(), StorageError>
fn drop_sources_unvalidated(&mut self, identifiers: Vec<GlobalId>)
fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>)
fn append(
    &mut self,
    commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)>
) -> Result<Receiver<Result<(), StorageError>>, StorageError>
fn snapshot<'life0, 'async_trait>(
    &'life0 self,
    id: GlobalId,
    as_of: Self::Timestamp
) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError>> + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,

Returns the snapshot of the contents of the collection identified by id at as_of.
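The returned (Row, Diff) pairs are differential updates; consolidating them, i.e. summing the diffs for each row and dropping rows with a net count of zero, yields the collection's contents at as_of. A toy consolidation with String rows standing in for Row:

use std::collections::BTreeMap;

// Sum diffs per row and drop rows whose net count is zero.
fn consolidate(updates: Vec<(String, i64)>) -> Vec<(String, i64)> {
    let mut counts: BTreeMap<String, i64> = BTreeMap::new();
    for (row, diff) in updates {
        *counts.entry(row).or_insert(0) += diff;
    }
    counts.into_iter().filter(|(_, diff)| *diff != 0).collect()
}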
fn set_read_policy(
    &mut self,
    policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>
)
fn update_write_frontiers(
    &mut self,
    updates: &[(GlobalId, Antichain<Self::Timestamp>)]
)

Ingests write frontier updates and translates them, as appropriate, into calls to StorageController::update_read_capabilities.

fn update_read_capabilities(
    &mut self,
    updates: &mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>>
)
Applies updates and sends any appropriate compaction command.

fn ready<'life0, 'async_trait>(
    &'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
fn process<'life0, 'async_trait>(
    &'life0 mut self
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,

Processes the work queued by StorageController::ready.

fn reconcile_state<'life0, 'async_trait>(
    &'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Auto Trait Implementations
impl<T> !RefUnwindSafe for Controller<T>
impl<T> Send for Controller<T>
impl<T> Sync for Controller<T>
impl<T> Unpin for Controller<T>
where
    T: Unpin,
impl<T> !UnwindSafe for Controller<T>
Blanket Implementations
impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request.
impl<T> Pointable for T
impl<P, R> ProtoType<R> for P
where
    R: RustType<P>,

fn into_rust(self) -> Result<R, TryFromProtoError>

See RustType::from_proto.

fn from_rust(rust: &R) -> P

See RustType::into_proto.