Struct mz_storage_controller::Controller
source · pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation> { /* private fields */ }
Expand description
A storage controller for a storage instance.
Implementations§
source§impl<T> Controller<T>where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
Self: StorageController<Timestamp = T>,
impl<T> Controller<T>where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
Self: StorageController<Timestamp = T>,
sourcepub async fn new(
build_info: &'static BuildInfo,
persist_location: PersistLocation,
persist_clients: Arc<PersistClientCache>,
now: NowFn,
envd_epoch: NonZeroI64,
metrics_registry: MetricsRegistry,
persist_txn_tables: PersistTxnTablesImpl,
connection_context: ConnectionContext,
txn: &dyn StorageTxn<T>
) -> Self
pub async fn new( build_info: &'static BuildInfo, persist_location: PersistLocation, persist_clients: Arc<PersistClientCache>, now: NowFn, envd_epoch: NonZeroI64, metrics_registry: MetricsRegistry, persist_txn_tables: PersistTxnTablesImpl, connection_context: ConnectionContext, txn: &dyn StorageTxn<T> ) -> Self
Create a new storage controller from a client it should wrap.
Note that when creating a new storage controller, you must also reconcile it with the previous state.
§Panics
If this function is called before prepare_initialization
.
Trait Implementations§
source§impl<T> Debug for Controller<T>
impl<T> Debug for Controller<T>
source§impl<'a, T> StorageController for Controller<T>where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation + Into<Datum<'a>> + Display,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
impl<'a, T> StorageController for Controller<T>where
T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation + Into<Datum<'a>> + Display,
StorageCommand<T>: RustType<ProtoStorageCommand>,
StorageResponse<T>: RustType<ProtoStorageResponse>,
source§fn config(&self) -> &StorageConfiguration
fn config(&self) -> &StorageConfiguration
Get the current configuration
source§fn create_collections<'life0, 'life1, 'async_trait>(
&'life0 mut self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn create_collections<'life0, 'life1, 'async_trait>(
&'life0 mut self,
storage_metadata: &'life1 StorageMetadata,
register_ts: Option<Self::Timestamp>,
collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Create and “execute” the described collection.
“Execute” is in scare quotes because what executing a collection means varies widely based on the type of collection you’re creating.
The general process creating a collection undergoes is:
- Enrich the description we get from the user with the metadata only the storage controller’s metadata. This is mostly a matter of separating concerns.
- Generate write and read persist handles for the collection.
- Store the collection’s metadata in the appropriate field.
- “Execte” the collection. What that means is contingent on the type of collection. so consult the code for more details.
source§fn alter_export_connections<'life0, 'async_trait>(
&'life0 mut self,
exports: BTreeMap<GlobalId, StorageSinkConnection>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn alter_export_connections<'life0, 'async_trait>(
&'life0 mut self,
exports: BTreeMap<GlobalId, StorageSinkConnection>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create the sinks described by the ExportDescription
.
source§fn drop_sinks(
&mut self,
identifiers: Vec<GlobalId>
) -> Result<(), StorageError<Self::Timestamp>>
fn drop_sinks( &mut self, identifiers: Vec<GlobalId> ) -> Result<(), StorageError<Self::Timestamp>>
Drops the read capability for the sinks and allows their resources to be reclaimed.
source§fn init_txns<'life0, 'async_trait>(
&'life0 mut self,
init_ts: T
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn init_txns<'life0, 'async_trait>(
&'life0 mut self,
init_ts: T
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
With the CRDB based timestamp oracle, there is no longer write timestamp
fencing. As in, when a new Coordinator, B
, starts up, there is nothing
that prevents an old Coordinator, A
, from getting a new write
timestamp that is higher than B
’s boot timestamp. Below is the
implications for all persist transaction scenarios, on
means a
Coordinator turning the persist txn flag on, off
means a Coordinator
turning the persist txn flag off.
The following series of events is a concern:
A
writes att_0
, s.t.t_0
>B
’s boot timestamp.B
writes att_1
, s.t.t_1
>t_0
.A
writes att_2
, s.t.t_2
>t_1
.- etc.
off
->off
: IfB`` manages to append
t_1before A appends
t_0then the
t_0append will panic and we won't acknowledge the write to the user (or similarly
t_2and
t_1`). Before persist-txn, appends are not atomic, so we might get a partial append. This is fine because we only support single table transactions.on
->on
: The txn-shard is meant to correctly handle two writers so this should be fine. Note it’s possible that we have two Coordinators interleaving write transactions without the leadership check described below, but that should be fine.off
->on
: IfA
gets a write timestamp higher thanB
’s boot timestamp, thenA
can write directly to a data shard after it’s been registered with a txn-shard, breaking the invariant that no data shard is written to directly while it’s registered to a transaction shard. To mitigate this, we must do a leadership check AFTER getting the write timestamp. In order forB
to register a data shard in the txn shard, it must first become the leader then second get a register timestamp. So ifA
gets a write timestamp higher thanB
’s register timestamp, it will fail the leadership check before attempting the append.on
->off
: IfA
tries to write to the txn-shard at a timestamp higher thanB
’s boot timestamp, it will fail because the shards have been forgotten. So everything should be ok.
In general, all transitions make the following steps:
- Get write timestamp,
ts
. - Apply all transactions to all data shards up to
ts
. - Register/forget all data shards. So if we crash at any point in these steps, for example after only applying some transactions, then the next Coordinator can pick up where we left off and finish whatever needs finishing.
H/t jkosh44 for the above notes from the discussion in which we hashed this all out.
type Timestamp = T
source§fn initialization_complete(&mut self)
fn initialization_complete(&mut self)
source§fn update_parameters(&mut self, config_params: StorageParameters)
fn update_parameters(&mut self, config_params: StorageParameters)
source§fn collection_metadata(
&self,
id: GlobalId
) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>
fn collection_metadata( &self, id: GlobalId ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>
id
.source§fn collection_frontiers(
&self,
id: GlobalId
) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), StorageError<Self::Timestamp>>
fn collection_frontiers( &self, id: GlobalId ) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), StorageError<Self::Timestamp>>
source§fn collections_frontiers(
&self,
ids: Vec<GlobalId>
) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>>
fn collections_frontiers( &self, ids: Vec<GlobalId> ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>>
source§fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>
fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>
source§fn check_exists(
&self,
id: GlobalId
) -> Result<(), StorageError<Self::Timestamp>>
fn check_exists( &self, id: GlobalId ) -> Result<(), StorageError<Self::Timestamp>>
GlobalId
. Returns
an error if the collection does not exist.source§fn create_instance(&mut self, id: StorageInstanceId)
fn create_instance(&mut self, id: StorageInstanceId)
source§fn drop_instance(&mut self, id: StorageInstanceId)
fn drop_instance(&mut self, id: StorageInstanceId)
source§fn connect_replica(
&mut self,
instance_id: StorageInstanceId,
replica_id: ReplicaId,
location: ClusterReplicaLocation
)
fn connect_replica( &mut self, instance_id: StorageInstanceId, replica_id: ReplicaId, location: ClusterReplicaLocation )
source§fn drop_replica(
&mut self,
instance_id: StorageInstanceId,
_replica_id: ReplicaId
)
fn drop_replica( &mut self, instance_id: StorageInstanceId, _replica_id: ReplicaId )
source§fn check_alter_ingestion_source_desc(
&mut self,
ingestion_id: GlobalId,
source_desc: &SourceDesc
) -> Result<(), StorageError<Self::Timestamp>>
fn check_alter_ingestion_source_desc( &mut self, ingestion_id: GlobalId, source_desc: &SourceDesc ) -> Result<(), StorageError<Self::Timestamp>>
source§fn alter_ingestion_source_desc<'life0, 'async_trait>(
&'life0 mut self,
ingestion_id: GlobalId,
source_desc: SourceDesc
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn alter_ingestion_source_desc<'life0, 'async_trait>(
&'life0 mut self,
ingestion_id: GlobalId,
source_desc: SourceDesc
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
SourceDesc
.source§fn alter_ingestion_connections<'life0, 'async_trait>(
&'life0 mut self,
source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn alter_ingestion_connections<'life0, 'async_trait>(
&'life0 mut self,
source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
GenericSourceConnection
.source§fn export(
&self,
id: GlobalId
) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>
fn export( &self, id: GlobalId ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>
source§fn export_mut(
&mut self,
id: GlobalId
) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>
fn export_mut( &mut self, id: GlobalId ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>
source§fn create_exports<'life0, 'async_trait>(
&'life0 mut self,
exports: Vec<(GlobalId, ExportDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_exports<'life0, 'async_trait>(
&'life0 mut self,
exports: Vec<(GlobalId, ExportDescription<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
ExportDescription
.source§fn drop_tables(
&mut self,
identifiers: Vec<GlobalId>,
ts: Self::Timestamp
) -> Result<(), StorageError<Self::Timestamp>>
fn drop_tables( &mut self, identifiers: Vec<GlobalId>, ts: Self::Timestamp ) -> Result<(), StorageError<Self::Timestamp>>
source§fn drop_sources(
&mut self,
storage_metadata: &StorageMetadata,
identifiers: Vec<GlobalId>
) -> Result<(), StorageError<Self::Timestamp>>
fn drop_sources( &mut self, storage_metadata: &StorageMetadata, identifiers: Vec<GlobalId> ) -> Result<(), StorageError<Self::Timestamp>>
source§fn drop_sources_unvalidated(
&mut self,
storage_metadata: &StorageMetadata,
ids: Vec<GlobalId>
) -> Result<(), StorageError<Self::Timestamp>>
fn drop_sources_unvalidated( &mut self, storage_metadata: &StorageMetadata, ids: Vec<GlobalId> ) -> Result<(), StorageError<Self::Timestamp>>
source§fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>)
fn drop_sinks_unvalidated(&mut self, identifiers: Vec<GlobalId>)
source§fn append_table(
&mut self,
write_ts: Self::Timestamp,
advance_to: Self::Timestamp,
commands: Vec<(GlobalId, Vec<TimestamplessUpdate>)>
) -> Result<Receiver<Result<(), StorageError<Self::Timestamp>>>, StorageError<Self::Timestamp>>
fn append_table( &mut self, write_ts: Self::Timestamp, advance_to: Self::Timestamp, commands: Vec<(GlobalId, Vec<TimestamplessUpdate>)> ) -> Result<Receiver<Result<(), StorageError<Self::Timestamp>>>, StorageError<Self::Timestamp>>
source§fn monotonic_appender(
&self,
id: GlobalId
) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>
fn monotonic_appender( &self, id: GlobalId ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>
MonotonicAppender
which is a channel that can be used to monotonically
append to the specified GlobalId
.source§fn webhook_statistics(
&self,
id: GlobalId
) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>
fn webhook_statistics( &self, id: GlobalId ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>
WebhookStatistics
which can be used to report user-facing
statistics for this given webhhook, specified by the GlobalId
.source§fn snapshot<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
as_of: Self::Timestamp
) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn snapshot<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
as_of: Self::Timestamp
) -> Pin<Box<dyn Future<Output = Result<Vec<(Row, Diff)>, StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
id
at as_of
.source§fn snapshot_cursor<'life0, 'async_trait>(
&'life0 mut self,
id: GlobalId,
as_of: Self::Timestamp
) -> Pin<Box<dyn Future<Output = Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>> + 'async_trait>>
fn snapshot_cursor<'life0, 'async_trait>( &'life0 mut self, id: GlobalId, as_of: Self::Timestamp ) -> Pin<Box<dyn Future<Output = Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>> + 'async_trait>>
id
at as_of
.source§fn snapshot_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>
) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn snapshot_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>
) -> Pin<Box<dyn Future<Output = Result<SnapshotStats, StorageError<Self::Timestamp>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
id
at as_of
.source§fn snapshot_parts_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>
) -> Pin<Box<dyn Future<Output = BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn snapshot_parts_stats<'life0, 'async_trait>(
&'life0 self,
id: GlobalId,
as_of: Antichain<Self::Timestamp>
) -> Pin<Box<dyn Future<Output = BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
source§fn set_read_policy(
&mut self,
policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>
)
fn set_read_policy( &mut self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)> )
source§fn acquire_read_holds(
&mut self,
desired_holds: Vec<GlobalId>
) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>
fn acquire_read_holds( &mut self, desired_holds: Vec<GlobalId> ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>
source§fn update_write_frontiers(
&mut self,
updates: &[(GlobalId, Antichain<Self::Timestamp>)]
)
fn update_write_frontiers( &mut self, updates: &[(GlobalId, Antichain<Self::Timestamp>)] )
StorageController::update_read_capabilities
. Read moresource§fn update_read_capabilities(
&mut self,
updates: &mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>>
)
fn update_read_capabilities( &mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<Self::Timestamp>> )
updates
and sends any appropriate compaction command.source§fn ready<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn ready<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
source§fn process<'life0, 'life1, 'async_trait>(
&'life0 mut self,
storage_metadata: &'life1 StorageMetadata
) -> Pin<Box<dyn Future<Output = Result<Option<Response<T>>, Error>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn process<'life0, 'life1, 'async_trait>(
&'life0 mut self,
storage_metadata: &'life1 StorageMetadata
) -> Pin<Box<dyn Future<Output = Result<Option<Response<T>>, Error>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
StorageController::ready
. Read moresource§fn inspect_persist_state<'life0, 'async_trait>(
&'life0 self,
id: GlobalId
) -> Pin<Box<dyn Future<Output = Result<Value, Error>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn inspect_persist_state<'life0, 'async_trait>(
&'life0 self,
id: GlobalId
) -> Pin<Box<dyn Future<Output = Result<Value, Error>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
source§fn record_frontiers<'life0, 'async_trait>(
&'life0 mut self,
external_frontiers: BTreeMap<GlobalId, (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn record_frontiers<'life0, 'async_trait>(
&'life0 mut self,
external_frontiers: BTreeMap<GlobalId, (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>)>
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
source§fn record_replica_frontiers<'life0, 'async_trait>(
&'life0 mut self,
external_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<Self::Timestamp>>
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn record_replica_frontiers<'life0, 'async_trait>(
&'life0 mut self,
external_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<Self::Timestamp>>
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
source§fn record_introspection_updates<'life0, 'async_trait>(
&'life0 mut self,
type_: IntrospectionType,
updates: Vec<(Row, Diff)>
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn record_introspection_updates<'life0, 'async_trait>(
&'life0 mut self,
type_: IntrospectionType,
updates: Vec<(Row, Diff)>
) -> Pin<Box<dyn Future<Output = ()> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
source§fn initialize_state<'life0, 'life1, 'async_trait>(
&'life0 mut self,
txn: &'life1 mut dyn StorageTxn<T>,
init_ids: BTreeSet<GlobalId>,
drop_ids: BTreeSet<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<T>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn initialize_state<'life0, 'life1, 'async_trait>(
&'life0 mut self,
txn: &'life1 mut dyn StorageTxn<T>,
init_ids: BTreeSet<GlobalId>,
drop_ids: BTreeSet<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<T>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§fn prepare_state<'life0, 'life1, 'async_trait>(
&'life0 mut self,
txn: &'life1 mut dyn StorageTxn<T>,
ids_to_add: BTreeSet<GlobalId>,
ids_to_drop: BTreeSet<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<T>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn prepare_state<'life0, 'life1, 'async_trait>(
&'life0 mut self,
txn: &'life1 mut dyn StorageTxn<T>,
ids_to_add: BTreeSet<GlobalId>,
ids_to_drop: BTreeSet<GlobalId>
) -> Pin<Box<dyn Future<Output = Result<(), StorageError<T>>> + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
StorageTxn
with the appropriate metadata
given the IDs to add and drop. Read moresource§fn acquire_read_hold(
&mut self,
id: GlobalId
) -> Result<ReadHold<Self::Timestamp>, ReadHoldError>
fn acquire_read_hold( &mut self, id: GlobalId ) -> Result<ReadHold<Self::Timestamp>, ReadHoldError>
source§fn acquire_read_hold_at_time(
&mut self,
id: GlobalId,
desired_hold: Antichain<Self::Timestamp>
) -> Result<ReadHold<Self::Timestamp>, ReadHoldError>
fn acquire_read_hold_at_time( &mut self, id: GlobalId, desired_hold: Antichain<Self::Timestamp> ) -> Result<ReadHold<Self::Timestamp>, ReadHoldError>
desired_time
. Returns
ReadHoldError::SinceViolation when that is not possible.Auto Trait Implementations§
impl<T> !Freeze for Controller<T>
impl<T> !RefUnwindSafe for Controller<T>
impl<T> Send for Controller<T>
impl<T> !Sync for Controller<T>
impl<T> Unpin for Controller<T>where
T: Unpin,
impl<T> !UnwindSafe for Controller<T>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<R, O, T> CopyOnto<ConsecutiveOffsetPairs<R, O>> for T
impl<R, O, T> CopyOnto<ConsecutiveOffsetPairs<R, O>> for T
source§fn copy_onto(
self,
target: &mut ConsecutiveOffsetPairs<R, O>
) -> <ConsecutiveOffsetPairs<R, O> as Region>::Index
fn copy_onto( self, target: &mut ConsecutiveOffsetPairs<R, O> ) -> <ConsecutiveOffsetPairs<R, O> as Region>::Index
source§impl<T> FmtForward for T
impl<T> FmtForward for T
source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.source§impl<T> FutureExt for T
impl<T> FutureExt for T
source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
source§impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R ) -> R
source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.source§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.source§impl<T> Pointable for T
impl<T> Pointable for T
source§impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto
.source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto
.source§impl<T> Tap for T
impl<T> Tap for T
source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read moresource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read moresource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read moresource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read moresource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read moresource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read moresource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.source§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.