use std::cell::RefCell;
use std::cmp::Reverse;
use std::convert::{AsRef, Infallible};
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use differential_dataflow::hashable::Hashable;
use differential_dataflow::{AsCollection, Collection};
use futures::future::FutureExt;
use futures::StreamExt;
use indexmap::map::Entry;
use itertools::Itertools;
use mz_ore::error::ErrorExt;
use mz_repr::{Datum, DatumVec, Diff, Row};
use mz_rocksdb::ValueIterator;
use mz_storage_operators::metrics::BackpressureMetrics;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
use mz_storage_types::sources::envelope::UpsertEnvelope;
use mz_timely_util::builder_async::{
AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
PressOnDropButton,
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::{Capability, InputCapability, Operator};
use timely::dataflow::{Scope, ScopeParent, Stream};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::{Antichain, Timestamp};
use crate::healthcheck::HealthStatusUpdate;
use crate::metrics::upsert::UpsertMetrics;
use crate::render::sources::OutputIndex;
use crate::storage_state::StorageInstanceContext;
use crate::upsert_continual_feedback;
use autospill::AutoSpillBackend;
use memory::InMemoryHashMap;
use types::{
    consolidating_merge_function, upsert_bincode_opts, BincodeOpts, StateValue, UpsertState,
    UpsertStateBackend, Value, ValueMetadata,
};
mod autospill;
mod memory;
mod rocksdb;
pub(crate) mod types;
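/// The value produced by an upsert source for a given key: either the decoded
/// row or an error explaining why decoding failed.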
pub type UpsertValue = Result<Row, UpsertError>;
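/// A SHA-256 digest that stands in for an upsert key, giving every key a
/// fixed-size, uniformly distributed identifier.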
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct UpsertKey([u8; 32]);
impl AsRef<[u8]> for UpsertKey {
#[inline(always)]
fn as_ref(&self) -> &[u8] {
&self.0
}
}
impl From<&[u8]> for UpsertKey {
fn from(bytes: &[u8]) -> Self {
UpsertKey(bytes.try_into().expect("invalid key length"))
}
}
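// The digest used to hash upsert keys. SHA-256 is collision-resistant, so
// distinct keys receive distinct `UpsertKey`s with overwhelming probability.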
type KeyHash = Sha256;
impl UpsertKey {
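    /// Hashes a decoded key (or the error produced while decoding it).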
pub fn from_key(key: Result<&Row, &UpsertError>) -> Self {
Self::from_iter(key.map(|r| r.iter()))
}
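    /// Hashes the key embedded in a decoded value by extracting the datums at
    /// `key_indices`. Errors are handed to `from_iter`, which falls back to
    /// whatever key material the error carries.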
pub fn from_value(value: Result<&Row, &UpsertError>, key_indices: &[usize]) -> Self {
thread_local! {
static VALUE_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
}
VALUE_DATUMS.with(|value_datums| {
let mut value_datums = value_datums.borrow_mut();
let value = value.map(|v| value_datums.borrow_with(v));
let key = match value {
Ok(ref datums) => Ok(key_indices.iter().map(|&idx| datums[idx])),
Err(err) => Err(err),
};
Self::from_iter(key)
})
}
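    /// Hashes a key supplied as an iterator of datums, or an error in its
    /// place. Value errors hash the key stashed in the error, key-decode
    /// errors hash the raw key bytes, and null-key errors hash a null datum,
    /// so every representation of the "same" key maps to the same hash.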
pub fn from_iter<'a, 'b>(
key: Result<impl Iterator<Item = Datum<'a>> + 'b, &UpsertError>,
) -> Self {
thread_local! {
static KEY_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
}
KEY_DATUMS.with(|key_datums| {
let mut key_datums = key_datums.borrow_mut();
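            // The second borrow is `DatumVec::borrow`, which yields a scratch
            // buffer of datums that empties itself when dropped; the shadowing
            // is intentional.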
let mut key_datums = key_datums.borrow();
let key: Result<&[Datum], Datum> = match key {
Ok(key) => {
for datum in key {
key_datums.push(datum);
}
Ok(&*key_datums)
}
Err(UpsertError::Value(err)) => {
key_datums.extend(err.for_key.iter());
Ok(&*key_datums)
}
Err(UpsertError::KeyDecode(err)) => Err(Datum::Bytes(&err.raw)),
Err(UpsertError::NullKey(_)) => Err(Datum::Null),
};
let mut hasher = DigestHasher(KeyHash::new());
key.hash(&mut hasher);
Self(hasher.0.finalize().into())
})
}
}
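/// An adapter that drives a cryptographic [`Digest`] through the
/// [`std::hash::Hasher`] interface. Only `write` is supported; `finish`
/// panics, because callers extract the full digest via `finalize` instead of
/// a 64-bit hash.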
struct DigestHasher<H: Digest>(H);
impl<H: Digest> Hasher for DigestHasher<H> {
fn write(&mut self, bytes: &[u8]) {
self.0.update(bytes);
}
fn finish(&self) -> u64 {
panic!("digest wrapper used to produce a hash");
}
}
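/// Waits until the frontier of `input` passes `resume_upper`, indicating that
/// rehydration has finished on all workers, then logs that fact and drops
/// `token`.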
pub fn rehydration_finished<G, T>(
scope: G,
source_config: &crate::source::RawSourceCreationConfig,
token: impl std::any::Any + 'static,
resume_upper: Antichain<T>,
input: &Stream<G, Infallible>,
) where
G: Scope<Timestamp = T>,
T: Timestamp,
{
let worker_id = source_config.worker_id;
let id = source_config.id;
    let mut builder = AsyncOperatorBuilder::new(format!("rehydration_finished({id})"), scope);
let mut input = builder.new_disconnected_input(input, Pipeline);
builder.build(move |_capabilities| async move {
let mut input_upper = Antichain::from_elem(Timestamp::minimum());
while !PartialOrder::less_equal(&resume_upper, &input_upper) {
let Some(event) = input.next().await else {
break;
};
if let AsyncEvent::Progress(upper) = event {
input_upper = upper;
}
}
tracing::info!(
%worker_id,
source_id = %id,
"upsert source has downgraded past the resume upper ({resume_upper:?}) across all workers",
);
drop(token);
});
}
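/// Renders an operator that maps a stream of upsert commands (the latest
/// value, or deletion, for each key) into a differential collection of
/// upserted values, resuming from `resume_upper` by rehydrating state from the
/// `previous` (persisted) output. State is backed by RocksDB when a scratch
/// directory is available (either always, or only after spilling past a
/// memory threshold when auto-spill is enabled), and by an in-memory map
/// otherwise.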
pub(crate) fn upsert<G: Scope, FromTime>(
input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
upsert_envelope: UpsertEnvelope,
resume_upper: Antichain<G::Timestamp>,
previous: Collection<G, Result<Row, DataflowError>, Diff>,
previous_token: Option<Vec<PressOnDropButton>>,
source_config: crate::source::SourceExportCreationConfig,
instance_context: &StorageInstanceContext,
storage_configuration: &StorageConfiguration,
    dataflow_parameters: &crate::internal_control::DataflowParameters,
backpressure_metrics: Option<BackpressureMetrics>,
) -> (
Collection<G, Result<Row, DataflowError>, Diff>,
Stream<G, (OutputIndex, HealthStatusUpdate)>,
Stream<G, Infallible>,
PressOnDropButton,
)
where
G::Timestamp: TotalOrder + Sync,
FromTime: Timestamp + Sync,
{
let upsert_metrics = source_config.metrics.get_upsert_metrics(
source_config.id,
source_config.worker_id,
backpressure_metrics,
);
let rocksdb_cleanup_tries =
dyncfgs::STORAGE_ROCKSDB_CLEANUP_TRIES.get(storage_configuration.config_set());
let prevent_snapshot_buffering =
dyncfgs::STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING.get(storage_configuration.config_set());
let snapshot_buffering_max = dyncfgs::STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING
.get(storage_configuration.config_set());
let rocksdb_use_native_merge_operator =
dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());
let upsert_config = UpsertConfig {
shrink_upsert_unused_buffers_by_ratio: storage_configuration
.parameters
.shrink_upsert_unused_buffers_by_ratio,
};
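    // Discard all but the most recent update for each key at each timestamp
    // before the data reaches the (stateful, more expensive) upsert operator.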
let thin_input = upsert_thinning(input);
if let Some(scratch_directory) = instance_context.scratch_directory.as_ref() {
        let tuning = dataflow_parameters.upsert_rocksdb_tuning_config.clone();
let allow_auto_spill = storage_configuration
.parameters
.upsert_auto_spill_config
.allow_spilling_to_disk;
let spill_threshold = storage_configuration
.parameters
.upsert_auto_spill_config
.spill_to_disk_threshold_bytes;
tracing::info!(
worker_id = %source_config.worker_id,
source_id = %source_config.id,
?tuning,
?storage_configuration.parameters.upsert_auto_spill_config,
?rocksdb_use_native_merge_operator,
"rendering upsert source with rocksdb-backed upsert state"
);
let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);
let rocksdb_dir = scratch_directory
.join("storage")
.join("upsert")
.join(source_config.id.to_string())
.join(source_config.worker_id.to_string());
let env = instance_context.rocksdb_env.clone();
let rocksdb_in_use_metric = Arc::clone(&upsert_metrics.rocksdb_autospill_in_use);
let rocksdb_init_fn = move || async move {
let merge_operator =
if rocksdb_use_native_merge_operator {
Some((
"upsert_state_snapshot_merge_v1".to_string(),
|a: &[u8],
b: ValueIterator<
BincodeOpts,
StateValue<G::Timestamp, Option<FromTime>>,
>| {
consolidating_merge_function::<G::Timestamp, Option<FromTime>>(
a.into(),
b,
)
},
))
} else {
None
};
rocksdb::RocksDB::new(
mz_rocksdb::RocksDBInstance::new(
&rocksdb_dir,
mz_rocksdb::InstanceOptions::new(
env,
rocksdb_cleanup_tries,
merge_operator,
upsert_bincode_opts(),
),
tuning,
rocksdb_shared_metrics,
rocksdb_instance_metrics,
)
.await
.unwrap(),
)
};
if allow_auto_spill {
upsert_operator(
&thin_input,
upsert_envelope.key_indices,
resume_upper,
previous,
previous_token,
upsert_metrics,
source_config,
move || async move {
AutoSpillBackend::new(rocksdb_init_fn, spill_threshold, rocksdb_in_use_metric)
},
upsert_config,
storage_configuration,
prevent_snapshot_buffering,
snapshot_buffering_max,
)
} else {
upsert_operator(
&thin_input,
upsert_envelope.key_indices,
resume_upper,
previous,
previous_token,
upsert_metrics,
source_config,
rocksdb_init_fn,
upsert_config,
storage_configuration,
prevent_snapshot_buffering,
snapshot_buffering_max,
)
}
} else {
tracing::info!(
worker_id = %source_config.worker_id,
source_id = %source_config.id,
"rendering upsert source with memory-backed upsert state",
);
upsert_operator(
&thin_input,
upsert_envelope.key_indices,
resume_upper,
previous,
previous_token,
upsert_metrics,
source_config,
|| async { InMemoryHashMap::default() },
upsert_config,
storage_configuration,
prevent_snapshot_buffering,
snapshot_buffering_max,
)
}
}
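/// Dispatches to one of the two upsert implementations, as selected by the
/// `STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT` dyncfg: the continual-feedback
/// variant in `upsert_continual_feedback`, or `upsert_classic` below.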
fn upsert_operator<G: Scope, FromTime, F, Fut, US>(
input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
key_indices: Vec<usize>,
resume_upper: Antichain<G::Timestamp>,
persist_input: Collection<G, Result<Row, DataflowError>, Diff>,
persist_token: Option<Vec<PressOnDropButton>>,
upsert_metrics: UpsertMetrics,
source_config: crate::source::SourceExportCreationConfig,
state: F,
upsert_config: UpsertConfig,
storage_configuration: &StorageConfiguration,
prevent_snapshot_buffering: bool,
snapshot_buffering_max: Option<usize>,
) -> (
Collection<G, Result<Row, DataflowError>, Diff>,
Stream<G, (OutputIndex, HealthStatusUpdate)>,
Stream<G, Infallible>,
PressOnDropButton,
)
where
G::Timestamp: TotalOrder + Sync,
F: FnOnce() -> Fut + 'static,
Fut: std::future::Future<Output = US>,
US: UpsertStateBackend<G::Timestamp, Option<FromTime>>,
FromTime: Debug + timely::ExchangeData + Ord + Sync,
{
let use_continual_feedback_upsert =
dyncfgs::STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT.get(storage_configuration.config_set());
tracing::info!(id = %source_config.id, %use_continual_feedback_upsert, "upsert operator implementation");
if use_continual_feedback_upsert {
upsert_continual_feedback::upsert_inner(
input,
key_indices,
resume_upper,
persist_input,
persist_token,
upsert_metrics,
source_config,
state,
upsert_config,
prevent_snapshot_buffering,
snapshot_buffering_max,
)
} else {
upsert_classic(
input,
key_indices,
resume_upper,
persist_input,
persist_token,
upsert_metrics,
source_config,
state,
upsert_config,
prevent_snapshot_buffering,
snapshot_buffering_max,
)
}
}
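/// Reduces the input to at most one update per key and timestamp, keeping
/// only the update with the greatest `FromTime`, since that is the one that
/// would win the upsert anyway. An illustrative sketch (`k`, `v1`, and `v2`
/// are hypothetical values):
///
/// ```text
/// input:  ((k, v1, from=1), t, +1), ((k, v2, from=2), t, +1)
/// output: ((k, v2, from=2), t, +1)
/// ```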
fn upsert_thinning<G, K, V, FromTime>(
input: &Collection<G, (K, V, FromTime), Diff>,
) -> Collection<G, (K, V, FromTime), Diff>
where
G: Scope,
G::Timestamp: TotalOrder,
K: timely::Data + Eq + Ord,
V: timely::Data,
FromTime: Timestamp,
{
input
.inner
.unary(Pipeline, "UpsertThinning", |_, _| {
let mut capability: Option<InputCapability<G::Timestamp>> = None;
let mut updates = Vec::new();
move |input, output| {
while let Some((cap, data)) = input.next() {
assert!(
data.iter().all(|(_, _, diff)| *diff > 0),
"invalid upsert input"
);
updates.append(data);
match capability.as_mut() {
Some(capability) => {
if cap.time() <= capability.time() {
*capability = cap;
}
}
None => capability = Some(cap),
}
}
if let Some(capability) = capability.take() {
updates.sort_unstable_by(|a, b| {
let ((key1, _, from_time1), time1, _) = a;
let ((key2, _, from_time2), time2, _) = b;
Ord::cmp(
&(key1, time1, Reverse(from_time1)),
&(key2, time2, Reverse(from_time2)),
)
});
let mut session = output.session(&capability);
session.give_iterator(updates.drain(..).dedup_by(|a, b| {
let ((key1, _, _), time1, _) = a;
let ((key2, _, _), time2, _) = b;
(key1, time1) == (key2, time2)
}))
}
}
})
.as_collection()
}
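/// Moves a batch of ready updates into the stash, tagging each with its
/// `FromTime` wrapped in `Reverse` so that later sorting and deduplication
/// prefer the most recent update for each key. While the input is still
/// replaying data from before the resume frontier (`input_upper` not beyond
/// `resume_upper`), updates whose times are not beyond `resume_upper` are
/// dropped. Optionally shrinks the stash's spare capacity by the configured
/// ratio.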
fn stage_input<T, FromTime>(
stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
input_upper: &Antichain<T>,
resume_upper: &Antichain<T>,
storage_shrink_upsert_unused_buffers_by_ratio: usize,
) where
T: PartialOrder,
FromTime: Ord,
{
if PartialOrder::less_equal(input_upper, resume_upper) {
data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
}
stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
assert!(diff > 0, "invalid upsert input");
(time, key, Reverse(order), value)
}));
if storage_shrink_upsert_unused_buffers_by_ratio > 0 {
let reduced_capacity = stash.capacity() / storage_shrink_upsert_unused_buffers_by_ratio;
if reduced_capacity > stash.len() {
stash.shrink_to(reduced_capacity);
}
}
}
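/// How much of the stash `drain_staged_input` should process: everything not
/// beyond the given frontier, or everything at or before a single time (used
/// to partially drain while a snapshot is still being read).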
#[derive(Debug)]
enum DrainStyle<'a, T> {
ToUpper(&'a Antichain<T>),
AtTime(T),
}
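/// Drains the eligible prefix of the stash: sorts and deduplicates the staged
/// commands (keeping, per key and time, the one with the greatest `FromTime`),
/// fetches existing state for all touched keys with a single `multi_get`,
/// applies insertions and deletions while emitting retractions for any values
/// they replace, and writes the new state back with a single `multi_put`.
/// State errors are routed through `error_emitter`, which never returns.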
async fn drain_staged_input<S, G, T, FromTime, E>(
stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
commands_state: &mut indexmap::IndexMap<
UpsertKey,
types::UpsertValueAndSize<T, Option<FromTime>>,
>,
output_updates: &mut Vec<(Result<Row, UpsertError>, T, Diff)>,
multi_get_scratch: &mut Vec<UpsertKey>,
drain_style: DrainStyle<'_, T>,
error_emitter: &mut E,
state: &mut UpsertState<'_, S, T, Option<FromTime>>,
) where
S: UpsertStateBackend<T, Option<FromTime>>,
G: Scope,
T: PartialOrder + Ord + Clone + Send + Sync + Serialize + Debug + 'static,
FromTime: timely::ExchangeData + Ord + Sync,
E: UpsertErrorEmitter<G>,
{
stash.sort_unstable();
let idx = stash.partition_point(|(ts, _, _, _)| match &drain_style {
DrainStyle::ToUpper(upper) => !upper.less_equal(ts),
DrainStyle::AtTime(time) => ts <= time,
});
tracing::trace!(?drain_style, updates = idx, "draining stash in upsert");
commands_state.clear();
for (_, key, _, _) in stash.iter().take(idx) {
commands_state.entry(*key).or_default();
}
multi_get_scratch.clear();
multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
match state
.multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
.await
{
Ok(_) => {}
Err(e) => {
error_emitter
.emit("Failed to fetch records from state".to_string(), e)
.await;
}
}
let mut commands = stash.drain(..idx).dedup_by(|a, b| {
let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
a_ts == b_ts && a_key == b_key
});
let bincode_opts = types::upsert_bincode_opts();
while let Some((ts, key, from_time, value)) = commands.next() {
let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
command_state
} else {
panic!("key missing from commands_state");
};
let existing_value = &mut command_state.get_mut().value;
if let Some(cs) = existing_value.as_mut() {
cs.ensure_decoded(bincode_opts);
}
let existing_order = existing_value.as_ref().and_then(|cs| cs.order().as_ref());
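        // Skip commands that do not supersede the value already in the state.
        // `None` (no recorded order) compares below any `Some`, so a command
        // always supersedes absent state.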
if existing_order >= Some(&from_time.0) {
continue;
}
match value {
Some(value) => {
if let Some(old_value) = existing_value.replace(StateValue::finalized_value(
value.clone(),
Some(from_time.0.clone()),
)) {
if let Value::FinalizedValue(old_value, _) = old_value.into_decoded() {
output_updates.push((old_value, ts.clone(), -1));
}
}
output_updates.push((value, ts, 1));
}
None => {
if let Some(old_value) = existing_value.take() {
if let Value::FinalizedValue(old_value, _) = old_value.into_decoded() {
output_updates.push((old_value, ts, -1));
}
}
*existing_value = Some(StateValue::tombstone(Some(from_time.0.clone())));
}
}
}
match state
.multi_put(
            true,
            commands_state.drain(..).map(|(k, cv)| {
(
k,
types::PutValue {
value: cv.value.map(|cv| cv.into_decoded()),
previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
size: v.size.try_into().expect("less than i64 size"),
is_tombstone: v.is_tombstone,
}),
},
)
}),
)
.await
{
Ok(_) => {}
Err(e) => {
error_emitter
.emit("Failed to update records in state".to_string(), e)
.await;
}
}
}
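/// Tuning knobs for the upsert operator, derived from storage parameters.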
pub(crate) struct UpsertConfig {
pub shrink_upsert_unused_buffers_by_ratio: usize,
}
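/// The classic upsert implementation: rehydrates state from the `previous`
/// (persisted) output up to `resume_upper`, then stages incoming updates and
/// drains them against the state backend whenever the input frontier advances
/// (or eagerly, at a single time, when snapshot buffering is prevented).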
fn upsert_classic<G: Scope, FromTime, F, Fut, US>(
input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
key_indices: Vec<usize>,
resume_upper: Antichain<G::Timestamp>,
previous: Collection<G, Result<Row, DataflowError>, Diff>,
previous_token: Option<Vec<PressOnDropButton>>,
upsert_metrics: UpsertMetrics,
source_config: crate::source::SourceExportCreationConfig,
state: F,
upsert_config: UpsertConfig,
prevent_snapshot_buffering: bool,
snapshot_buffering_max: Option<usize>,
) -> (
Collection<G, Result<Row, DataflowError>, Diff>,
Stream<G, (OutputIndex, HealthStatusUpdate)>,
Stream<G, Infallible>,
PressOnDropButton,
)
where
G::Timestamp: TotalOrder + Sync,
F: FnOnce() -> Fut + 'static,
Fut: std::future::Future<Output = US>,
US: UpsertStateBackend<G::Timestamp, Option<FromTime>>,
FromTime: timely::ExchangeData + Ord + Sync,
{
let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());
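    // Re-key the persisted output with the same `UpsertKey` hash as the
    // input. Only rows and upsert errors are relevant for rehydration; other
    // dataflow errors are skipped.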
let previous = previous.flat_map(move |result| {
let value = match result {
Ok(ok) => Ok(ok),
Err(DataflowError::EnvelopeError(err)) => match *err {
EnvelopeError::Upsert(err) => Err(err),
_ => return None,
},
Err(_) => return None,
};
Some((UpsertKey::from_value(value.as_ref(), &key_indices), value))
});
let (output_handle, output) = builder.new_output();
let (_snapshot_handle, snapshot_stream) =
builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
let (mut health_output, health_stream) = builder.new_output();
let mut input = builder.new_input_for(
&input.inner,
Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
&output_handle,
);
let mut previous = builder.new_input_for(
&previous.inner,
Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
&output_handle,
);
let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
let shutdown_button = builder.build(move |caps| async move {
let [mut output_cap, mut snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();
let mut state = UpsertState::<_, _, Option<FromTime>>::new(
state().await,
upsert_shared_metrics,
&upsert_metrics,
source_config.source_statistics,
upsert_config.shrink_upsert_unused_buffers_by_ratio,
);
let mut events = vec![];
let mut snapshot_upper = Antichain::from_elem(Timestamp::minimum());
let mut stash = vec![];
let mut error_emitter = (&mut health_output, &health_cap);
tracing::info!(
?resume_upper,
?snapshot_upper,
"timely-{} upsert source {} starting rehydration",
source_config.worker_id,
source_config.id
);
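        // Rehydration: feed the previously-written output into the state
        // backend, chunk by chunk, until its frontier passes the resume
        // upper.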
while !PartialOrder::less_equal(&resume_upper, &snapshot_upper) {
previous.ready().await;
while let Some(event) = previous.next_sync() {
match event {
AsyncEvent::Data(_cap, data) => {
events.extend(data.into_iter().filter_map(|((key, value), ts, diff)| {
if !resume_upper.less_equal(&ts) {
Some((key, value, diff))
} else {
None
}
}))
}
AsyncEvent::Progress(upper) => {
snapshot_upper = upper;
}
};
}
match state
.consolidate_chunk(
events.drain(..),
PartialOrder::less_equal(&resume_upper, &snapshot_upper),
)
.await
{
Ok(_) => {
if let Some(ts) = snapshot_upper.clone().into_option() {
if !resume_upper.less_equal(&ts) {
snapshot_cap.downgrade(&ts);
output_cap.downgrade(&ts);
}
}
}
Err(e) => {
UpsertErrorEmitter::<G>::emit(
&mut error_emitter,
"Failed to rehydrate state".to_string(),
e,
)
.await;
}
}
}
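        // Rehydration is complete: release everything held only for the
        // snapshot, drain the rest of the `previous` input, and open the
        // output at the resume frontier.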
drop(events);
drop(previous_token);
drop(snapshot_cap);
while let Some(_event) = previous.next().await {}
if let Some(ts) = resume_upper.as_option() {
output_cap.downgrade(ts);
}
tracing::info!(
"timely-{} upsert source {} finished rehydration",
source_config.worker_id,
source_config.id
);
let mut commands_state: indexmap::IndexMap<
_,
types::UpsertValueAndSize<G::Timestamp, Option<FromTime>>,
> = indexmap::IndexMap::new();
let mut multi_get_scratch = Vec::new();
let mut output_updates = vec![];
let mut input_upper = Antichain::from_elem(Timestamp::minimum());
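        // Main loop: greedily batch together all immediately available input
        // events (bounded by `snapshot_buffering_max`) so that a state
        // round-trip is amortized over as many updates as possible.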
while let Some(event) = input.next().await {
let events = [event]
.into_iter()
.chain(std::iter::from_fn(|| input.next().now_or_never().flatten()))
.enumerate();
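            // If data arrives at the operator's current output time while
            // `prevent_snapshot_buffering` is set, remember that time so we
            // can partially drain below rather than buffer the entire
            // snapshot.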
let mut partial_drain_time = None;
for (i, event) in events {
match event {
AsyncEvent::Data(cap, mut data) => {
tracing::trace!(
time=?cap.time(),
updates=%data.len(),
"received data in upsert"
);
stage_input(
&mut stash,
&mut data,
&input_upper,
&resume_upper,
upsert_config.shrink_upsert_unused_buffers_by_ratio,
);
let event_time = cap.time();
if prevent_snapshot_buffering && output_cap.time() == event_time {
partial_drain_time = Some(event_time.clone());
}
}
AsyncEvent::Progress(upper) => {
tracing::trace!(?upper, "received progress in upsert");
if PartialOrder::less_than(&upper, &resume_upper) {
continue;
}
partial_drain_time = None;
drain_staged_input::<_, G, _, _, _>(
&mut stash,
&mut commands_state,
&mut output_updates,
&mut multi_get_scratch,
DrainStyle::ToUpper(&upper),
&mut error_emitter,
&mut state,
)
.await;
output_handle.give_container(&output_cap, &mut output_updates);
if let Some(ts) = upper.as_option() {
output_cap.downgrade(ts);
}
input_upper = upper;
}
}
let events_processed = i + 1;
if let Some(max) = snapshot_buffering_max {
if events_processed >= max {
break;
}
}
}
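            // Partially drain everything at or before the recorded time. Any
            // progress event in the batch above clears `partial_drain_time`,
            // since draining to the new frontier has already happened.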
if let Some(partial_drain_time) = partial_drain_time {
drain_staged_input::<_, G, _, _, _>(
&mut stash,
&mut commands_state,
&mut output_updates,
&mut multi_get_scratch,
DrainStyle::AtTime(partial_drain_time),
&mut error_emitter,
&mut state,
)
.await;
output_handle.give_container(&output_cap, &mut output_updates);
}
}
});
(
output.as_collection().map(|result| match result {
Ok(ok) => Ok(ok),
Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(err))),
}),
health_stream,
snapshot_stream,
shutdown_button.press_on_drop(),
)
}
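/// A sink for fatal upsert state errors. `emit` reports the error and never
/// returns.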
#[async_trait::async_trait(?Send)]
pub(crate) trait UpsertErrorEmitter<G> {
async fn emit(&mut self, context: String, e: anyhow::Error);
}
#[async_trait::async_trait(?Send)]
impl<G: Scope> UpsertErrorEmitter<G>
for (
&mut AsyncOutputHandle<
<G as ScopeParent>::Timestamp,
CapacityContainerBuilder<Vec<(OutputIndex, HealthStatusUpdate)>>,
Tee<<G as ScopeParent>::Timestamp, Vec<(OutputIndex, HealthStatusUpdate)>>,
>,
&Capability<<G as ScopeParent>::Timestamp>,
)
{
async fn emit(&mut self, context: String, e: anyhow::Error) {
process_upsert_state_error::<G>(context, e, self.0, self.1).await
}
}
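/// Reports a fatal upsert state error as a halting health status update, then
/// awaits forever: the dataflow is expected to be torn down in response, so
/// this future must never complete.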
async fn process_upsert_state_error<G: Scope>(
context: String,
e: anyhow::Error,
health_output: &AsyncOutputHandle<
<G as ScopeParent>::Timestamp,
CapacityContainerBuilder<Vec<(OutputIndex, HealthStatusUpdate)>>,
Tee<<G as ScopeParent>::Timestamp, Vec<(OutputIndex, HealthStatusUpdate)>>,
>,
health_cap: &Capability<<G as ScopeParent>::Timestamp>,
) {
let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None);
health_output.give(health_cap, (0, update));
std::future::pending::<()>().await;
unreachable!("pending future never returns");
}