Struct mz_storage::controller::Controller
source · [−]pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation> {
state: StorageControllerState<T>,
hosts: StorageHosts<T>,
internal_response_queue: UnboundedReceiver<StorageResponse<T>>,
persist_location: PersistLocation,
persist: Arc<Mutex<PersistClientCache>>,
}
Expand description
A storage controller for a storage instance.
Fields
state: StorageControllerState<T>
hosts: StorageHosts<T>
Storage host provisioning and storage object assignment.
internal_response_queue: UnboundedReceiver<StorageResponse<T>>
Mechanism for returning frontier advancement for tables.
persist_location: PersistLocation
The persist location where all storage collections are being written to
persist: Arc<Mutex<PersistClientCache>>
A persist client used to write to storage collections
Implementations
sourceimpl<T> Controller<T>where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
impl<T> Controller<T>where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
sourcepub async fn new(
build_info: &'static BuildInfo,
postgres_url: String,
persist_location: PersistLocation,
persist_clients: Arc<Mutex<PersistClientCache>>,
orchestrator: Arc<dyn NamespacedOrchestrator>,
storaged_image: String,
now: NowFn
) -> Self
pub async fn new(
build_info: &'static BuildInfo,
postgres_url: String,
persist_location: PersistLocation,
persist_clients: Arc<Mutex<PersistClientCache>>,
orchestrator: Arc<dyn NamespacedOrchestrator>,
storaged_image: String,
now: NowFn
) -> Self
Create a new storage controller from a client it should wrap.
sourceimpl<T> Controller<T>where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
Self: StorageController<Timestamp = T>,
impl<T> Controller<T>where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
Self: StorageController<Timestamp = T>,
sourcefn validate_collection_ids(
&self,
ids: impl Iterator<Item = GlobalId>
) -> Result<(), StorageError>
fn validate_collection_ids(
&self,
ids: impl Iterator<Item = GlobalId>
) -> Result<(), StorageError>
Validate that a collection exists for all identifiers, and error if any do not.
sourcefn validate_export_ids(
&self,
ids: impl Iterator<Item = GlobalId>
) -> Result<(), StorageError>
fn validate_export_ids(
&self,
ids: impl Iterator<Item = GlobalId>
) -> Result<(), StorageError>
Validate that a collection exists for all identifiers, and error if any do not.
fn generate_new_capability_for_collection<F>(
&mut self,
id: GlobalId,
f: F
) -> Result<ChangeBatch<<Self as StorageController>::Timestamp>, StorageError>where
F: FnOnce(&mut CollectionState<<Self as StorageController>::Timestamp>),
Trait Implementations
sourceimpl<T> CollectionManagement for Controller<T>where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
MetadataExportFetcher: MetadataExport<T>,
DurableExportMetadata<T>: Data,
impl<T> CollectionManagement for Controller<T>where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
MetadataExportFetcher: MetadataExport<T>,
DurableExportMetadata<T>: Data,
sourcefn truncate_managed_collection<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn truncate_managed_collection<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Effectively truncates the data_shard
associated with global_id
effective as of the system time.
Panics
- If
id
does not belong to a collection or is not registered as a managed collection.
sourcefn append_to_managed_collection<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
updates: Vec<(Row, Diff)>
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn append_to_managed_collection<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
updates: Vec<(Row, Diff)>
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Append updates
to the data_shard
associated with global_id
effective as of the system time.
Panics
- If
id
is not registered as a managed collection.
sourcefn initialize_shard_mapping<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn initialize_shard_mapping<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Append updates
to the data_shard
correlated with global_id
effective as of the system time.
Panics
- If
IntrospectionType::ShardMapping
is not correlated with aGlobalId
. - If
IntrospectionType::ShardMapping
’sGlobalId
is not registered as a managed collection.
sourcefn register_shard_mapping<'life0, 'async_trait>(
&'life0 mut self,
global_id: GlobalId
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn register_shard_mapping<'life0, 'async_trait>(
&'life0 mut self,
global_id: GlobalId
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Tracks the mapping of GlobalId
to data shards in the collection at
self.state.shard_collection_global_id
.
However, data is written iff we know of the GlobalId
of the
IntrospectionType::ShardMapping
collection; in other cases, data is
dropped on the floor. In these cases, the data is later written by
Self::initialize_shard_mapping
.
Panics
- If
self.state.collections
does not have an entry forglobal_id
. - If
IntrospectionType::ShardMapping
’sGlobalId
is not registered as a managed collection.
sourceimpl<T: Debug + Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation> Debug for Controller<T>
impl<T: Debug + Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation> Debug for Controller<T>
sourceimpl<T> StorageController for Controller<T>where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
MetadataExportFetcher: MetadataExport<T>,
DurableExportMetadata<T>: Data,
impl<T> StorageController for Controller<T>where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
MetadataExportFetcher: MetadataExport<T>,
DurableExportMetadata<T>: Data,
sourcefn drop_sinks<'life0, 'async_trait>(
&'life0 mut self,
identifiers: Vec<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn drop_sinks<'life0, 'async_trait>(
&'life0 mut self,
identifiers: Vec<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Drops the read capability for the sinks and allows their resources to be reclaimed.
type Timestamp = T
sourcefn initialization_complete(&mut self)
fn initialization_complete(&mut self)
sourcefn collection(
&self,
id: GlobalId
) -> Result<&CollectionState<Self::Timestamp>, StorageError>
fn collection(
&self,
id: GlobalId
) -> Result<&CollectionState<Self::Timestamp>, StorageError>
sourcefn collection_mut(
&mut self,
id: GlobalId
) -> Result<&mut CollectionState<Self::Timestamp>, StorageError>
fn collection_mut(
&mut self,
id: GlobalId
) -> Result<&mut CollectionState<Self::Timestamp>, StorageError>
sourcefn create_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn create_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn alter_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, StorageHostConfig)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
sourcefn export(
&self,
id: GlobalId
) -> Result<&ExportState<Self::Timestamp>, StorageError>
fn export(
&self,
id: GlobalId
) -> Result<&ExportState<Self::Timestamp>, StorageError>
sourcefn export_mut(
&mut self,
id: GlobalId
) -> Result<&mut ExportState<Self::Timestamp>, StorageError>
fn export_mut(
&mut self,
id: GlobalId
) -> Result<&mut ExportState<Self::Timestamp>, StorageError>
sourcefn prepare_export<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
from_id: GlobalId
) -> Pin<Box<dyn Future<Output = Result<CreateExportToken, StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn prepare_export<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
from_id: GlobalId
) -> Pin<Box<dyn Future<Output = Result<CreateExportToken, StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
sourcefn cancel_prepare_export<'life0, 'async_trait>(
&'life0 mut self,
__arg1: CreateExportToken
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn cancel_prepare_export<'life0, 'async_trait>(
&'life0 mut self,
__arg1: CreateExportToken
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
sourcefn create_exports<'life0, 'async_trait>(
&'life0 mut self,
exports: Vec<(CreateExportToken, ExportDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn create_exports<'life0, 'async_trait>(
&'life0 mut self,
exports: Vec<(CreateExportToken, ExportDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
ExportDescription
.sourcefn drop_sources<'life0, 'async_trait>(
&'life0 mut self,
identifiers: Vec<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn drop_sources<'life0, 'async_trait>(
&'life0 mut self,
identifiers: Vec<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
sourcefn drop_sources_unvalidated<'life0, 'async_trait>(
&'life0 mut self,
identifiers: Vec<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn drop_sources_unvalidated<'life0, 'async_trait>(
&'life0 mut self,
identifiers: Vec<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
sourcefn drop_sinks_unvalidated<'life0, 'async_trait>(
&'life0 mut self,
identifiers: Vec<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn drop_sinks_unvalidated<'life0, 'async_trait>(
&'life0 mut self,
identifiers: Vec<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
sourcefn append(
&mut self,
commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)>
) -> Result<Receiver<Result<(), StorageError>>, StorageError>
fn append(
&mut self,
commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)>
) -> Result<Receiver<Result<(), StorageError>>, StorageError>
sourcefn snapshot<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
as_of: Self::Timestamp
) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn snapshot<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
as_of: Self::Timestamp
) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
id
at as_of
.sourcefn set_read_policy<'life0, 'async_trait>(
&'life0 mut self,
policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn set_read_policy<'life0, 'async_trait>(
&'life0 mut self,
policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
sourcefn update_write_frontiers<'life0, 'life1, 'async_trait>(
&'life0 mut self,
updates: &'life1 [(GlobalId, Antichain<Self::Timestamp>)]
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn update_write_frontiers<'life0, 'life1, 'async_trait>(
&'life0 mut self,
updates: &'life1 [(GlobalId, Antichain<Self::Timestamp>)]
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
StorageController::update_read_capabilities
. Read moresourcefn update_read_capabilities<'life0, 'life1, 'async_trait>(
&'life0 mut self,
updates: &'life1 mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn update_read_capabilities<'life0, 'life1, 'async_trait>(
&'life0 mut self,
updates: &'life1 mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
updates
and sends any appropriate compaction command.Auto Trait Implementations
impl<T> !RefUnwindSafe for Controller<T>
impl<T> Send for Controller<T>
impl<T> Sync for Controller<T>
impl<T> Unpin for Controller<T>where
T: Unpin,
impl<T> !UnwindSafe for Controller<T>
Blanket Implementations
sourceimpl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
const: unstable · sourcefn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
sourceimpl<T> FutureExt for T
impl<T> FutureExt for T
sourcefn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
sourcefn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
sourceimpl<T> Instrument for T
impl<T> Instrument for T
sourcefn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
sourcefn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
sourceimpl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
sourcefn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
sourceimpl<T> Pointable for T
impl<T> Pointable for T
sourceimpl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
sourcefn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto
.sourcefn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto
.