pub struct Postgres {
txn_mode: TransactionMode,
url: String,
schema: Option<String>,
tls: MakeTlsConnector,
client: Option<Client>,
statements: Option<PreparedStatements>,
epoch: Option<NonZeroI64>,
nonce: [u8; 16],
sinces_tx: UnboundedSender<(Id, Antichain<Timestamp>)>,
metrics: Arc<Metrics>,
}
Expand description
A Stash whose data is stored in a Postgres database. The format of the tables are not specified and should not be relied upon. The only promise is stability. Any changes to the table schemas will be accompanied by a clear migration path.
Fields§
§txn_mode: TransactionMode
§url: String
§schema: Option<String>
§tls: MakeTlsConnector
§client: Option<Client>
§statements: Option<PreparedStatements>
§epoch: Option<NonZeroI64>
§nonce: [u8; 16]
§sinces_tx: UnboundedSender<(Id, Antichain<Timestamp>)>
§metrics: Arc<Metrics>
Implementations§
source§impl Postgres
impl Postgres
sourcepub async fn clear(url: &str, tls: MakeTlsConnector) -> Result<(), StashError>
pub async fn clear(url: &str, tls: MakeTlsConnector) -> Result<(), StashError>
Drops all tables associated with the stash if they exist.
sourcepub async fn verify(&self) -> Result<(), StashError>
pub async fn verify(&self) -> Result<(), StashError>
Verifies stash invariants. Should only be called by tests.
sourceasync fn connect(&mut self) -> Result<(), StashError>
async fn connect(&mut self) -> Result<(), StashError>
Sets client
to a new connection to the Postgres server.
sourceasync fn transact<F, T>(&mut self, f: F) -> Result<T, StashError>where
F: for<'a> Fn(&'a CountedStatements<'a>, &'a Client) -> BoxFuture<'a, Result<T, StashError>>,
async fn transact<F, T>(&mut self, f: F) -> Result<T, StashError>where
F: for<'a> Fn(&'a CountedStatements<'a>, &'a Client) -> BoxFuture<'a, Result<T, StashError>>,
Construct a fenced transaction, which will cause this Stash to fail if
another connection is opened to it. f
may be called multiple times in a
backoff-retry loop if the Postgres server is unavailable, so it should only
call functions on its Transaction argument.
Examples
async fn x(&mut self) -> Result<(), StashError> {
self.transact(move |stmts, tx| {
Box::pin(async move {
// Use tx.
})
})
.await
async fn transact_inner<F, T>(&mut self, f: &F) -> Result<T, StashError>where
F: for<'a> Fn(&'a CountedStatements<'a>, &'a Client) -> BoxFuture<'a, Result<T, StashError>>,
async fn since_tx(
stmts: &CountedStatements<'_>,
tx: &Client,
collection_id: Id
) -> Result<Antichain<Timestamp>, StashError>
async fn upper_tx(
stmts: &CountedStatements<'_>,
tx: &Client,
collection_id: Id
) -> Result<Antichain<Timestamp>, StashError>
sourceasync fn seal_batch_tx<'a, I>(
stmts: &CountedStatements<'_>,
tx: &Client,
seals: I
) -> Result<(), StashError>where
I: Iterator<Item = (Id, &'a Antichain<Timestamp>, Option<Antichain<Timestamp>>)>,
async fn seal_batch_tx<'a, I>(
stmts: &CountedStatements<'_>,
tx: &Client,
seals: I
) -> Result<(), StashError>where
I: Iterator<Item = (Id, &'a Antichain<Timestamp>, Option<Antichain<Timestamp>>)>,
seals
has tuples of (collection id, new upper, Option<current upper>)
. The
current upper can be Some
if it is already known.
sourceasync fn update_many_tx(
stmts: &CountedStatements<'_>,
tx: &Client,
collection_id: Id,
entries: &[((Value, Value), Timestamp, Diff)],
upper: Option<Antichain<Timestamp>>
) -> Result<(), StashError>
async fn update_many_tx(
stmts: &CountedStatements<'_>,
tx: &Client,
collection_id: Id,
entries: &[((Value, Value), Timestamp, Diff)],
upper: Option<Antichain<Timestamp>>
) -> Result<(), StashError>
upper
can be Some
if the collection’s upper is already known.
sourceasync fn compact_batch_tx<'a, I>(
stmts: &CountedStatements<'_>,
tx: &Client,
compactions: I
) -> Result<(), StashError>where
I: Iterator<Item = (Id, &'a Antichain<Timestamp>, Option<Antichain<Timestamp>>)>,
async fn compact_batch_tx<'a, I>(
stmts: &CountedStatements<'_>,
tx: &Client,
compactions: I
) -> Result<(), StashError>where
I: Iterator<Item = (Id, &'a Antichain<Timestamp>, Option<Antichain<Timestamp>>)>,
compactions
has tuples of (collection id, new since, Option<current upper>)
. The current upper can be Some
if it is already known.
sourceasync fn sinces_batch_tx(
stmts: &CountedStatements<'_>,
tx: &Client,
collections: &[Id]
) -> Result<HashMap<Id, Antichain<Timestamp>>, StashError>
async fn sinces_batch_tx(
stmts: &CountedStatements<'_>,
tx: &Client,
collections: &[Id]
) -> Result<HashMap<Id, Antichain<Timestamp>>, StashError>
Returns sinces for the requested collections.
Trait Implementations§
source§impl Append for Postgres
impl Append for Postgres
source§fn append_batch<'life0, 'life1, 'async_trait>(
&'life0 mut self,
batches: &'life1 [AppendBatch]
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn append_batch<'life0, 'life1, 'async_trait>(
&'life0 mut self,
batches: &'life1 [AppendBatch]
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
append
, but does not consolidate batches.source§fn append<'life0, 'life1, 'async_trait>(
&'life0 mut self,
batches: &'life1 [AppendBatch]
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
Self: Send + 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn append<'life0, 'life1, 'async_trait>(
&'life0 mut self,
batches: &'life1 [AppendBatch]
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
Self: Send + 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§impl Stash for Postgres
impl Stash for Postgres
source§fn since<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>
) -> Pin<Box<dyn Future<Output = Result<Antichain<Timestamp>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
fn since<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>
) -> Pin<Box<dyn Future<Output = Result<Antichain<Timestamp>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
Reports the current since frontier.
source§fn upper<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>
) -> Pin<Box<dyn Future<Output = Result<Antichain<Timestamp>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
fn upper<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>
) -> Pin<Box<dyn Future<Output = Result<Antichain<Timestamp>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
Reports the current upper frontier.
source§fn collection<'life0, 'life1, 'async_trait, K, V>(
&'life0 mut self,
name: &'life1 str
) -> Pin<Box<dyn Future<Output = Result<StashCollection<K, V>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn collection<'life0, 'life1, 'async_trait, K, V>(
&'life0 mut self,
name: &'life1 str
) -> Pin<Box<dyn Future<Output = Result<StashCollection<K, V>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§fn collections<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = Result<BTreeSet<String>, StashError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn collections<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = Result<BTreeSet<String>, StashError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
source§fn iter<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>
) -> Pin<Box<dyn Future<Output = Result<Vec<((K, V), Timestamp, Diff)>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
fn iter<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>
) -> Pin<Box<dyn Future<Output = Result<Vec<((K, V), Timestamp, Diff)>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
source§fn iter_key<'life0, 'life1, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>,
key: &'life1 K
) -> Pin<Box<dyn Future<Output = Result<Vec<(V, Timestamp, Diff)>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn iter_key<'life0, 'life1, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>,
key: &'life1 K
) -> Pin<Box<dyn Future<Output = Result<Vec<(V, Timestamp, Diff)>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§fn update_many<'life0, 'async_trait, K, V, I>(
&'life0 mut self,
collection: StashCollection<K, V>,
entries: I
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
I: IntoIterator<Item = ((K, V), Timestamp, Diff)> + Send + 'async_trait,
I::IntoIter: Send,
Self: 'async_trait,
'life0: 'async_trait,
fn update_many<'life0, 'async_trait, K, V, I>(
&'life0 mut self,
collection: StashCollection<K, V>,
entries: I
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
I: IntoIterator<Item = ((K, V), Timestamp, Diff)> + Send + 'async_trait,
I::IntoIter: Send,
Self: 'async_trait,
'life0: 'async_trait,
source§fn seal<'life0, 'life1, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>,
new_upper: AntichainRef<'life1, Timestamp>
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn seal<'life0, 'life1, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>,
new_upper: AntichainRef<'life1, Timestamp>
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§fn seal_batch<'life0, 'life1, 'async_trait, K, V>(
&'life0 mut self,
seals: &'life1 [(StashCollection<K, V>, Antichain<Timestamp>)]
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn seal_batch<'life0, 'life1, 'async_trait, K, V>(
&'life0 mut self,
seals: &'life1 [(StashCollection<K, V>, Antichain<Timestamp>)]
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§fn compact<'a, 'async_trait, K, V>(
&'a mut self,
collection: StashCollection<K, V>,
new_since: AntichainRef<'a, Timestamp>
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'a: 'async_trait,
fn compact<'a, 'async_trait, K, V>(
&'a mut self,
collection: StashCollection<K, V>,
new_since: AntichainRef<'a, Timestamp>
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'a: 'async_trait,
source§fn compact_batch<'life0, 'life1, 'async_trait, K, V>(
&'life0 mut self,
compactions: &'life1 [(StashCollection<K, V>, Antichain<Timestamp>)]
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn compact_batch<'life0, 'life1, 'async_trait, K, V>(
&'life0 mut self,
compactions: &'life1 [(StashCollection<K, V>, Antichain<Timestamp>)]
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§fn consolidate<'life0, 'async_trait>(
&'life0 mut self,
collection: Id
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn consolidate<'life0, 'async_trait>(
&'life0 mut self,
collection: Id
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
source§fn consolidate_batch<'life0, 'life1, 'async_trait>(
&'life0 mut self,
collections: &'life1 [Id]
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn consolidate_batch<'life0, 'life1, 'async_trait>(
&'life0 mut self,
collections: &'life1 [Id]
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§fn confirm_leadership<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn confirm_leadership<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Ok
if this stash instance was the leader at some
point from the invocation of this method to the return of this
method. Otherwise, returns Err
.source§fn is_readonly(&self) -> bool
fn is_readonly(&self) -> bool
source§fn epoch(&self) -> Option<NonZeroI64>
fn epoch(&self) -> Option<NonZeroI64>
Some
, it is a positive number that
increases with each start of a stash.source§fn peek_timestamp<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>
) -> Pin<Box<dyn Future<Output = Result<Timestamp, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
fn peek_timestamp<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>
) -> Pin<Box<dyn Future<Output = Result<Timestamp, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
source§fn peek<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>
) -> Pin<Box<dyn Future<Output = Result<Vec<(K, V, Diff)>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
fn peek<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>
) -> Pin<Box<dyn Future<Output = Result<Vec<(K, V, Diff)>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
source§fn peek_one<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>
) -> Pin<Box<dyn Future<Output = Result<BTreeMap<K, V>, StashError>> + Send + 'async_trait>>where
K: Data + Hash + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
fn peek_one<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>
) -> Pin<Box<dyn Future<Output = Result<BTreeMap<K, V>, StashError>> + Send + 'async_trait>>where
K: Data + Hash + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
source§fn peek_key_one<'life0, 'life1, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>,
key: &'life1 K
) -> Pin<Box<dyn Future<Output = Result<Option<V>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn peek_key_one<'life0, 'life1, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>,
key: &'life1 K
) -> Pin<Box<dyn Future<Output = Result<Option<V>, StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
source§fn update<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>,
data: (K, V),
time: Timestamp,
diff: Diff
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
fn update<'life0, 'async_trait, K, V>(
&'life0 mut self,
collection: StashCollection<K, V>,
data: (K, V),
time: Timestamp,
diff: Diff
) -> Pin<Box<dyn Future<Output = Result<(), StashError>> + Send + 'async_trait>>where
K: Data + 'async_trait,
V: Data + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
Auto Trait Implementations§
impl !RefUnwindSafe for Postgres
impl Send for Postgres
impl Sync for Postgres
impl Unpin for Postgres
impl !UnwindSafe for Postgres
Blanket Implementations§
source§impl<T> FutureExt for T
impl<T> FutureExt for T
source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request