Struct mz_dataflow_types::client::controller::storage::Controller
source · [−]pub struct Controller<T: Timestamp + Lattice + Codec64 + Unpin> {
state: StorageControllerState<T>,
persist_location: PersistLocation,
persist_client: PersistClient,
orchestrator: Arc<dyn NamespacedOrchestrator>,
storaged_image: String,
}
Expand description
A storage controller for a storage instance.
Fields
state: StorageControllerState<T>
persist_location: PersistLocation
The persist location to which all storage collections are written.
persist_client: PersistClient
A persist client used to write to storage collections.
orchestrator: Arc<dyn NamespacedOrchestrator>
An orchestrator to start and stop storage processes.
storaged_image: String
The storaged image to use when starting new storage processes.
Implementations
sourceimpl<T> Controller<T> where
T: Timestamp + Lattice + TotalOrder + TryInto<i64> + TryFrom<i64> + Codec64 + Unpin,
<T as TryInto<i64>>::Error: Debug,
<T as TryFrom<i64>>::Error: Debug,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
impl<T> Controller<T> where
T: Timestamp + Lattice + TotalOrder + TryInto<i64> + TryFrom<i64> + Codec64 + Unpin,
<T as TryInto<i64>>::Error: Debug,
<T as TryFrom<i64>>::Error: Debug,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
sourcepub async fn new(
postgres_url: String,
persist_location: PersistLocation,
persist_clients: Arc<Mutex<PersistClientCache>>,
orchestrator: Arc<dyn NamespacedOrchestrator>,
storaged_image: String
) -> Self
pub async fn new(
postgres_url: String,
persist_location: PersistLocation,
persist_clients: Arc<Mutex<PersistClientCache>>,
orchestrator: Arc<dyn NamespacedOrchestrator>,
storaged_image: String
) -> Self
Create a new storage controller from the given Postgres URL, persist location, persist client cache, orchestrator, and storaged image.
sourcefn validate_ids(
&self,
ids: impl Iterator<Item = GlobalId>
) -> Result<(), StorageError>
fn validate_ids(
&self,
ids: impl Iterator<Item = GlobalId>
) -> Result<(), StorageError>
Validate that a collection exists for all identifiers, and error if any do not.
Trait Implementations
sourceimpl<T> StorageController for Controller<T> where
T: Timestamp + Lattice + TotalOrder + TryInto<i64> + TryFrom<i64> + Codec64 + Unpin,
<T as TryInto<i64>>::Error: Debug,
<T as TryFrom<i64>>::Error: Debug,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
impl<T> StorageController for Controller<T> where
T: Timestamp + Lattice + TotalOrder + TryInto<i64> + TryFrom<i64> + Codec64 + Unpin,
<T as TryInto<i64>>::Error: Debug,
<T as TryFrom<i64>>::Error: Debug,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
sourcefn linearize_sources<'life0, 'async_trait>(
&'life0 mut self,
_peek_id: Uuid,
_source_ids: Vec<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
fn linearize_sources<'life0, 'async_trait>(
&'life0 mut self,
_peek_id: Uuid,
_source_ids: Vec<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
“Linearize” the listed sources.
If these sources are valid and “linearizable”, then the response will contain timestamps that are guaranteed to be up-to-date with the max offset found at the time the command was issued.
Note: “linearizable” in this context may not represent true linearizability in all cases.
type Timestamp = T
sourcefn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError>
fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError>
Acquire an immutable reference to the collection state, should it exist.
sourcefn collection_mut(
&mut self,
id: GlobalId
) -> Result<&mut CollectionState<T>, StorageError>
fn collection_mut(
&mut self,
id: GlobalId
) -> Result<&mut CollectionState<T>, StorageError>
Acquire a mutable reference to the collection state, should it exist.
sourcefn create_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
fn create_collections<'life0, 'async_trait>(
&'life0 mut self,
collections: Vec<(GlobalId, CollectionDescription)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
Create the sources described in the individual CreateSourceCommand commands. Read more
sourcefn drop_sources<'life0, 'async_trait>(
&'life0 mut self,
identifiers: Vec<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
fn drop_sources<'life0, 'async_trait>(
&'life0 mut self,
identifiers: Vec<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
Drops the read capability for the sources and allows their resources to be reclaimed.
sourcefn append<'life0, 'async_trait>(
&'life0 mut self,
commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
fn append<'life0, 'async_trait>(
&'life0 mut self,
commands: Vec<(GlobalId, Vec<Update<Self::Timestamp>>, Self::Timestamp)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
Append `updates` into the local input named `id` and advance its upper to `upper`.
sourcefn snapshot<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
as_of: Self::Timestamp
) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
fn snapshot<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
as_of: Self::Timestamp
) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
Returns the snapshot of the contents of the local input named `id` at `as_of`.
sourcefn set_read_policy<'life0, 'async_trait>(
&'life0 mut self,
policies: Vec<(GlobalId, ReadPolicy<T>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
fn set_read_policy<'life0, 'async_trait>(
&'life0 mut self,
policies: Vec<(GlobalId, ReadPolicy<T>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
Assigns a read policy to specific identifiers. Read more
sourcefn update_write_frontiers<'life0, 'life1, 'async_trait>(
&'life0 mut self,
updates: &'life1 [(GlobalId, ChangeBatch<T>)]
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn update_write_frontiers<'life0, 'life1, 'async_trait>(
&'life0 mut self,
updates: &'life1 [(GlobalId, ChangeBatch<T>)]
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Accept write frontier updates from the compute layer.
sourcefn update_read_capabilities<'life0, 'life1, 'async_trait>(
&'life0 mut self,
updates: &'life1 mut BTreeMap<GlobalId, ChangeBatch<T>>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn update_read_capabilities<'life0, 'life1, 'async_trait>(
&'life0 mut self,
updates: &'life1 mut BTreeMap<GlobalId, ChangeBatch<T>>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>> where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Applies `updates` and sends any appropriate compaction command.
fn recv<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = Result<Option<StorageResponse<Self::Timestamp>>, Error>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
Auto Trait Implementations
impl<T> !RefUnwindSafe for Controller<T>
impl<T> Send for Controller<T>
impl<T> !Sync for Controller<T>
impl<T> Unpin for Controller<T>
impl<T> !UnwindSafe for Controller<T>
Blanket Implementations
sourceimpl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
const: unstable · sourcefn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
sourceimpl<T> FutureExt for T
impl<T> FutureExt for T
sourcefn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
sourcefn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
sourceimpl<T> Instrument for T
impl<T> Instrument for T
sourcefn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
sourcefn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
sourceimpl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
sourcefn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message `T` in a `tonic::Request`
sourceimpl<P, R> ProtoType<R> for P where
R: RustType<P>,
impl<P, R> ProtoType<R> for P where
R: RustType<P>,
sourcefn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
See `RustType::from_proto`.
sourcefn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
See `RustType::into_proto`.
sourceimpl<T> WithSubscriber for T
impl<T> WithSubscriber for T
sourcefn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where
S: Into<Dispatch>,
fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where
S: Into<Dispatch>,
Attaches the provided `Subscriber` to this type, returning a `WithDispatch` wrapper. Read more
sourcefn with_current_subscriber(self) -> WithDispatch<Self>
fn with_current_subscriber(self) -> WithDispatch<Self>
Attaches the current default `Subscriber` to this type, returning a `WithDispatch` wrapper. Read more