Struct mz_storage_client::controller::Controller
source · pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation> { /* private fields */ }
Expand description
A storage controller for a storage instance.
Implementations§
source§impl<T> Controller<T> where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
Self: StorageController<Timestamp = T>,
impl<T> Controller<T> where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
Self: StorageController<Timestamp = T>,
source§pub async fn new(
build_info: &'static BuildInfo,
postgres_url: String,
persist_location: PersistLocation,
persist_clients: Arc<PersistClientCache>,
now: NowFn,
postgres_factory: &StashFactory,
envd_epoch: NonZeroI64,
metrics_registry: MetricsRegistry
) -> Self
pub async fn new(
build_info: &'static BuildInfo,
postgres_url: String,
persist_location: PersistLocation,
persist_clients: Arc<PersistClientCache>,
now: NowFn,
postgres_factory: &StashFactory,
envd_epoch: NonZeroI64,
metrics_registry: MetricsRegistry
) -> Self
Create a new storage controller from a client it should wrap.
Note that when creating a new storage controller, you must also reconcile it with the previous state.
Trait Implementations§
source§impl<T: Debug + Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation> Debug for Controller<T>
impl<T: Debug + Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation> Debug for Controller<T>
source§impl<T> StorageController for Controller<T> where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
MetadataExportFetcher: MetadataExport<T>,
DurableExportMetadata<T>: Data,
impl<T> StorageController for Controller<T> where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
MetadataExportFetcher: MetadataExport<T>,
DurableExportMetadata<T>: Data,
source§fn drop_sinks(&mut self, identifiers: Vec<GlobalId>) -> Result<(), StorageError>
fn drop_sinks(&mut self, identifiers: Vec<GlobalId>) -> Result<(), StorageError>
Drops the read capability for the sinks and allows their resources to be reclaimed.
type Timestamp = T
source§fn initialization_complete(&mut self)
fn initialization_complete(&mut self)
Marks the end of any initialization commands. Read more
source§fn update_configuration(&mut self, config_params: StorageParameters)
fn update_configuration(&mut self, config_params: StorageParameters)
Update storage configuration.
source§fn collection(
&self,
id: GlobalId
) -> Result<&CollectionState<Self::Timestamp>, StorageError>
fn collection(
&self,
id: GlobalId
) -> Result<&CollectionState<Self::Timestamp>, StorageError>
Acquire an immutable reference to the collection state, should it exist.
source§fn collection_mut(
&mut self,
id: GlobalId
) -> Result<&mut CollectionState<Self::Timestamp>, StorageError>
fn collection_mut(
&mut self,
id: GlobalId
) -> Result<&mut CollectionState<Self::Timestamp>, StorageError>
Acquire a mutable reference to the collection state, should it exist.
source§fn collections(
&self
) -> Box<dyn Iterator<Item = (&GlobalId, &CollectionState<Self::Timestamp>)> + '_>
fn collections(
&self
) -> Box<dyn Iterator<Item = (&GlobalId, &CollectionState<Self::Timestamp>)> + '_>
Acquire an iterator over all collection states.
source§fn create_instance(&mut self, id: StorageInstanceId)
fn create_instance(&mut self, id: StorageInstanceId)
Creates a storage instance with the specified ID. Read more
source§fn drop_instance(&mut self, id: StorageInstanceId)
fn drop_instance(&mut self, id: StorageInstanceId)
Drops the storage instance with the given ID. Read more
source§fn connect_replica(
&mut self,
id: StorageInstanceId,
location: ClusterReplicaLocation
)
fn connect_replica(
&mut self,
id: StorageInstanceId,
location: ClusterReplicaLocation
)
Connects the storage instance to the specified replica. Read more
source§fn migrate_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
fn migrate_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
Migrate any storage controller state from previous versions to this
version’s expectations. Read more
source§fn create_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
fn create_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
Create the sources described in the individual CreateSourceCommand commands. Read more
source§fn export(
&self,
id: GlobalId
) -> Result<&ExportState<Self::Timestamp>, StorageError>
fn export(
&self,
id: GlobalId
) -> Result<&ExportState<Self::Timestamp>, StorageError>
Acquire an immutable reference to the export state, should it exist.
source§fn export_mut(
&mut self,
id: GlobalId
) -> Result<&mut ExportState<Self::Timestamp>, StorageError>
fn export_mut(
&mut self,
id: GlobalId
) -> Result<&mut ExportState<Self::Timestamp>, StorageError>
Acquire a mutable reference to the export state, should it exist.
source§fn prepare_export(
&mut self,
id: GlobalId,
from_id: GlobalId
) -> Result<CreateExportToken<T>, StorageError>
fn prepare_export(
&mut self,
id: GlobalId,
from_id: GlobalId
) -> Result<CreateExportToken<T>, StorageError>
Notify the storage controller to prepare for an export to be created
source§fn cancel_prepare_export(&mut self, _: CreateExportToken<T>)
fn cancel_prepare_export(&mut self, _: CreateExportToken<T>)
Cancel the pending export
source§fn create_exports<'life0, 'async_trait>(
&'life0 mut self,
exports: Vec<(CreateExportToken<Self::Timestamp>, ExportDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
fn create_exports<'life0, 'async_trait>(
&'life0 mut self,
exports: Vec<(CreateExportToken<Self::Timestamp>, ExportDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
Create the sinks described by the `ExportDescription`.
source§fn drop_sources(
&mut self,
identifiers: Vec<GlobalId>
) -> Result<(), StorageError>
fn drop_sources(
&mut self,
identifiers: Vec<GlobalId>
) -> Result<(), StorageError>
Drops the read capability for the sources and allows their resources to be reclaimed.
source§fn drop_sources_unvalidated(&mut self, identifiers: Vec<GlobalId>)
fn drop_sources_unvalidated(&mut self, identifiers: Vec<GlobalId>)
Drops the read capability for the sources and allows their resources to be reclaimed. Read more
source§fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>)
fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>)
Drops the read capability for the sinks and allows their resources to be reclaimed. Read more
source§fn append(
&mut self,
commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)>
) -> Result<Receiver<Result<(), StorageError>>, StorageError>
fn append(
&mut self,
commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)>
) -> Result<Receiver<Result<(), StorageError>>, StorageError>
source§fn snapshot<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Self::Timestamp
) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError>> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
fn snapshot<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Self::Timestamp
) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError>> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
Returns the snapshot of the contents of the local input named `id` at `as_of`.
source§fn set_read_policy(
&mut self,
policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>
)
fn set_read_policy(
&mut self,
policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>
)
Assigns a read policy to specific identifiers. Read more
source§fn update_write_frontiers(
&mut self,
updates: &[(GlobalId, Antichain<Self::Timestamp>)]
)
fn update_write_frontiers(
&mut self,
updates: &[(GlobalId, Antichain<Self::Timestamp>)]
)
Ingests write frontier updates for collections that this controller
maintains and potentially generates updates to read capabilities, which
are passed on to `StorageController::update_read_capabilities`. Read more
source§fn update_read_capabilities(
&mut self,
updates: &mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>>
)
fn update_read_capabilities(
&mut self,
updates: &mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>>
)
Applies `updates` and sends any appropriate compaction command.
source§fn ready<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
fn ready<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
Waits until the controller is ready to process a response. Read more
source§fn process<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
fn process<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
Processes the work queued by `StorageController::ready`. Read more
source§fn reconcile_state<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
fn reconcile_state<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
Signal to the controller that the adapter has populated all of its
initial state and the controller can reconcile (i.e. drop) any unclaimed
resources.
Auto Trait Implementations§
impl<T> !RefUnwindSafe for Controller<T>
impl<T> Send for Controller<T>
impl<T> Sync for Controller<T>
impl<T> Unpin for Controller<T> where
T: Unpin,
impl<T> !UnwindSafe for Controller<T>
Blanket Implementations§
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message `T` in a `tonic::Request`
source§impl<T> Pointable for T
impl<T> Pointable for T
source§impl<P, R> ProtoType<R> for P where
R: RustType<P>,
impl<P, R> ProtoType<R> for P where
R: RustType<P>,
source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
See `RustType::from_proto`.
source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
See `RustType::into_proto`.