use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::lattice::Lattice;
use futures::{StreamExt, future::Either};
use mz_expr::{ColumnSpecs, EvalError, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::str::redact;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::{PersistConfig, RetryParameters};
use mz_persist_client::fetch::{ExchangeableBatchPart, FetchedBlob, FetchedPart, ShardSourcePart};
use mz_persist_client::operators::shard_source::{
    ErrorHandler, FilterResult, SnapshotMode, shard_source,
};
use mz_persist_client::stats::STATS_AUDIT_PANIC;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_repr::{
    Datum, DatumVec, Diff, GlobalId, RelationDesc, ReprRelationType, Row, RowArena, Timestamp,
};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_storage_types::stats::RelationPartStats;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::probe::ProbeNotify;
use mz_txn_wal::operator::{TxnsContext, txns_progress};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
use timely::dataflow::operators::{Capability, CapabilitySet, ConnectLoop, Feedback, Leave, OkErr};
use timely::dataflow::{Scope, Stream, StreamVec};
use timely::order::TotalOrder;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::timestamp::PathSummary;
use timely::scheduling::Activator;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, trace};

use crate::metrics::BackpressureMetrics;

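/// A per-part "sub-timestamp": the inner backpressure scope runs at
/// `(mz_repr::Timestamp, Subtime)`, giving every fetched part a distinct time
/// so it can be emitted and retired individually.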
#[derive(
    Copy, Clone, PartialEq, Default, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, Hash,
)]
pub struct Subtime(u64);

impl PartialOrder for Subtime {
    fn less_equal(&self, other: &Self) -> bool {
        self.0.less_equal(&other.0)
    }
}

impl TotalOrder for Subtime {}

impl PathSummary<Subtime> for Subtime {
    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
        self.0.results_in(&src.0).map(Subtime)
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Subtime)
    }
}

impl TimelyTimestamp for Subtime {
    type Summary = Subtime;

    fn minimum() -> Self {
        Subtime(0)
    }
}

impl differential_dataflow::lattice::Lattice for Subtime {
    fn join(&self, other: &Self) -> Self {
        Subtime(std::cmp::max(self.0, other.0))
    }
    fn meet(&self, other: &Self) -> Self {
        Subtime(std::cmp::min(self.0, other.0))
    }
}

impl differential_dataflow::lattice::Maximum for Subtime {
    fn maximum() -> Self {
        Subtime(u64::MAX)
    }
}

impl Subtime {
    /// The smallest non-trivial summary for `Subtime`: advance by one part.
    pub const fn least_summary() -> Self {
        Subtime(1)
    }
}

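/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been advanced by the given `as_of` frontier, and
/// all updates at times greater or equal to `until` will be suppressed. The
/// `map_filter_project` argument, if supplied, may be partially applied, with
/// any un-applied portion left behind in the argument.
///
/// If `max_inflight_bytes` is given, an inner `Subtime`-granular scope applies
/// backpressure, limiting the bytes of fetched-but-undecoded parts in flight.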
pub fn persist_source<'scope, E>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + Send + 'static,
    error_handler: ErrorHandler,
) -> (
    StreamVec<'scope, mz_repr::Timestamp, (Row, Timestamp, Diff)>,
    StreamVec<'scope, mz_repr::Timestamp, (E, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let shard_metrics =
        persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    let outer = scope.clone();
    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        // If the shard is registered in a txns shard, reads may race ahead of
        // unapplied batches; configure the subscribe to back off and retry.
        let cfg = Arc::clone(&persist_clients.cfg().configs);
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            outer,
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave(outer)
    });

    // If the shard is registered in a txns shard, wrap the output in
    // `txns_progress` so its frontier advances past times at which the txns
    // shard guarantees no data can appear.
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}


/// A `Scope` whose timestamp refines `T` with a `Subtime` part counter, used
/// for granular backpressure inside `persist_source`.
type RefinedScope<'scope, T> = Scope<'scope, (T, Subtime)>;

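/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// Unlike `persist_source`, this expects the caller to already be inside the
/// `Subtime`-refined scope, and returns a single stream of `Result`s instead of
/// separate ok and err streams.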
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, 'outer, E>(
    outer: Scope<'outer, mz_repr::Timestamp>,
    scope: RefinedScope<'g, mz_repr::Timestamp>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<'g, (mz_repr::Timestamp, Subtime)>>,
    listen_sleep: Option<impl Fn() -> RetryParameters + Send + 'static>,
    start_signal: impl Future<Output = ()> + Send + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<
        'g,
        (mz_repr::Timestamp, Subtime),
        Vec<(Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff)>,
    >,
    Vec<PressOnDropButton>,
)
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    // Read with the caller-provided schema if one was given; otherwise use the
    // collection's own schema.
    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |scope, descs, chosen_worker| {
            let (stream, token) = backpressure(
                scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    // The largest timestamp `mz_now()` can take: the `until` element, or
    // `Timestamp::MAX` if the `until` is unbounded.
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        outer,
        scope,
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                // The empty frontier contains no times at all; nothing in the
                // part can ever be read.
                return FilterResult::Discard;
            };

            if lower > upper {
                // Everything in the part lies past the `until`, so none of its
                // updates can be visible.
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, fetched, &name, until, map_filter_project);
    (rows, token)
}

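/// Decides, from a part's stats, whether the part can be discarded, must be
/// fetched, or can be replaced with a synthesized constant part: the MFP's
/// filter is evaluated over the per-column ranges in `stats`, with `mz_now()`
/// constrained to `time_range`.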
fn filter_result(
    relation_desc: &RelationDesc,
    time_range: ResultSpec,
    stats: RelationPartStats,
    plan: &MfpPlan,
) -> FilterResult {
    let arena = RowArena::new();
    let relation = ReprRelationType::from(relation_desc.typ());
    let mut ranges = ColumnSpecs::new(&relation, &arena);
    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);

    // If we can't prove the part is error-free, we must keep it.
    let may_error = stats.err_count().map_or(true, |count| count > 0);

    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
        let result_spec = stats.col_stats(idx, &arena);
        ranges.push_column(pos, result_spec);
    }
    let result = ranges.mfp_plan_filter(plan).range;
    let may_error = may_error || result.may_fail();
    let may_keep = result.may_contain(Datum::True);
    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
    if relation_desc.len() == 0 && !may_error && !may_skip {
        // A zero-column relation whose filter can neither error nor skip: the
        // rows can be synthesized without fetching the part at all.
        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
            return FilterResult::Keep;
        };
        key.append(&SourceData(Ok(Row::default())));
        let key = key.finish();
        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
            return FilterResult::Keep;
        };
        val.append(&());
        let val = val.finish();

        FilterResult::ReplaceWith {
            key: Arc::new(key),
            val: Arc::new(val),
        }
    } else if may_error || may_keep {
        FilterResult::Keep
    } else {
        FilterResult::Discard
    }
}

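/// Decodes the blobs streamed out of `shard_source`, applies the `MfpPlan` if
/// one is given, and emits the resulting rows, yielding periodically according
/// to the configured decode fuel.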
pub fn decode_and_mfp<'scope, E>(
    cfg: PersistConfig,
    fetched: StreamVec<
        'scope,
        (mz_repr::Timestamp, Subtime),
        FetchedBlob<SourceData, (), Timestamp, StorageDiff>,
    >,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> StreamVec<
    'scope,
    (mz_repr::Timestamp, Subtime),
    (Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff),
>
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Re-usable state for decoding and building rows.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Take ownership of the MFP plan, if any, so the operator closure is self-contained.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        // Acquire an activator to reschedule the operator when it has unfinished work.
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        // Maintain a list of work to do.
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            fetched_input.for_each(|time, data| {
                let capability = time.retain(0);
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Process pending work until the fuel budget for this scheduling
            // is exhausted, then reactivate to pick up where we left off.
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}

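/// Pending work to read from fetched parts.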
struct PendingWork {
    /// Whether to panic if a filter-pushdown audit fails.
    panic_on_audit_failure: bool,
    /// The capability at which the decoded updates will be emitted.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// The part to be processed.
    part: PendingPart,
}

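/// A fetched part, parsed lazily the first time it is read.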
enum PendingPart {
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}

impl PendingPart {
    /// Returns the contained `FetchedPart`, first parsing the blob if it has
    /// not been parsed yet.
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                *self = PendingPart::Parsed { part: x.parse() };
                // Won't recurse again: the part is now parsed.
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}

impl PendingWork {
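    /// Perform work on `self`, decoding and emitting rows until `yield_fn`
    /// returns true, and returning whether the part was fully processed.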
    fn do_work<YFn, E>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff)>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
        E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        // Count the decoded row against the fuel budget, even
                        // if the MFP ends up producing no output for it.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            if let Some(stats) = &is_filter_pushdown_audit {
                                // This part was fetched only to audit filter
                                // pushdown: its stats claimed it could be
                                // skipped, yet the MFP produced output.
                                // Report the correctness violation.
                                sentry::with_scope(
                                    |scope| {
                                        scope.set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            mfp = ?redact(&mfp),
                                            result = ?redact(&result),
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // Additional `until` filtering, since the
                                    // MFP may have advanced the time.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    // Additional `until` filtering, since the
                                    // MFP may have advanced the time.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        drop(datums_local);
                        // Return the row to the buffer so its allocation can
                        // be reused for the next decoded row.
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(E::from(err)), emit_time, diff.into()));
                    *work += 1;
                }
            }
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}

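/// A trait representing a piece of data that the `backpressure` operator can
/// meter by size.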
pub trait Backpressureable: Clone + 'static {
    /// Return the weight of the object, in bytes.
    fn byte_size(&self) -> usize;
}

impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
    fn byte_size(&self) -> usize {
        self.1.encoded_size_bytes()
    }
}

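/// Flow control configuration for the `backpressure` operator.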
#[derive(Debug)]
pub struct FlowControl<'scope, T: timely::progress::Timestamp> {
    /// Stream providing in-flight frontier updates.
    ///
    /// As implied by its type, this stream never emits data, only progress
    /// updates.
    pub progress_stream: StreamVec<'scope, T, Infallible>,
    /// Maximum number of in-flight bytes.
    pub max_inflight_bytes: usize,
    /// The timestamp summary applied to `progress_stream` on its feedback
    /// path; at least this much progress is always allowed, regardless of
    /// `max_inflight_bytes`, to keep the dataflow moving.
    pub summary: T::Summary,
    /// Optional metrics for the `backpressure` operator to keep up-to-date.
    pub metrics: Option<BackpressureMetrics>,
}

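/// Apply flow control to the `data` input, based on the given `FlowControl`.
///
/// Parts are re-timestamped with unique, per-part `Subtime`s so each one can
/// be tracked and retired individually. The operator stops emitting once the
/// total size of in-flight parts (emitted but not yet reflected in the
/// fed-back `progress_stream` frontier) reaches `max_inflight_bytes`, except
/// that it always allows a `summary`'s worth of progress so the dataflow
/// cannot deadlock. The operator is active only on the single `chosen_worker`.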
pub fn backpressure<'scope, T, O>(
    scope: Scope<'scope, (T, Subtime)>,
    name: &str,
    data: StreamVec<'scope, (T, Subtime), O>,
    flow_control: FlowControl<'scope, (T, Subtime)>,
    chosen_worker: usize,
    // A probe used only in tests, reporting the flow-control frontier and the
    // in-flight and retired part counts.
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (StreamVec<'scope, (T, Subtime), O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Feed the flow-control stream back to this operator, delayed by the
    // configured summary, so downstream progress becomes visible here without
    // forming a timely cycle at an unchanged timestamp.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(summaried_flow, Pipeline);

    // Helper used to synthesize the current and next frontier for ordered parts.
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // Flatten the input into a stream of parts, each at a unique `Subtime`,
    // interleaved with the frontiers that bracket it.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // The output frontier of this operator.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        // The most recently seen flow-control (feedback) frontier.
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // Parts we have emitted but that have not yet been retired by the
        // flow-control frontier, along with their sizes in bytes.
        let mut inflight_parts = Vec::new();
        // Parts extracted from the input but not yet emitted.
        let mut pending_parts = std::collections::VecDeque::new();

        // Backpressure runs on a single worker.
        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // Emit while there is byte budget left, or while the flow-control
            // frontier still lags the output frontier, which guarantees at
            // least a summary's worth of forward progress.
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // Re-check the budget: downgrading the output
                                // may have let the flow-control frontier catch
                                // up, in which case this part must wait.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Track the part at the time it is expected to flow back
                // through the feedback edge, i.e. its emission time advanced
                // by the flow-control summary.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                // Block here: we cannot proceed until the flow-control
                // frontier advances.
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                // Update the flow-control frontier and retire every in-flight
                // part now behind it.
                flow_control_frontier.clone_from(&new_flow_control_frontier);

                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                // Report the new state to the test probe, if any.
                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}

#[cfg(test)]
mod tests {
    use timely::container::CapacityContainerBuilder;
    use timely::dataflow::operators::{Enter, Probe};
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::sync::oneshot;

    use super::*;

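    /// Backpressure with a consumer in the outer, non-granular scope, which
    /// retires all parts of a timestamp at once.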
    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertOutputFrontier((51, Subtime(3))),
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }

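    /// Backpressure with a consumer in the granular scope, which retires
    /// individual parts within a timestamp.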
    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((50, Subtime(1))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }

    type Time = (u64, Subtime);
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }

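    /// A step in a backpressure test scenario.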
    enum Step {
        /// Wait until the backpressure operator's output frontier has advanced
        /// to at least this time.
        AssertOutputFrontier(Time),
        /// Wait until the operator reports backpressuring with this
        /// flow-control frontier and these in-flight and retired part counts.
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
        /// Have the consumer operator process this many parts.
        ProcessXParts(usize),
    }

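    /// Runs a backpressure test scenario: feeds `input` through a
    /// `backpressure` operator configured with `max_inflight_bytes` and
    /// `summary`, driving the dataflow through the given `steps`.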
    fn backpressure_runner(
        // The input data: (time, part) pairs.
        input: Vec<(u64, Part)>,
        // The maximum number of in-flight bytes.
        max_inflight_bytes: usize,
        // The flow-control summary.
        summary: Time,
        // The steps to run.
        steps: Vec<Step>,
        // Whether the consumer runs in the outer (non-granular) scope rather
        // than the inner (granular) one.
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (
                backpressure_probe,
                consumer_tx,
                mut backpressure_status_rx,
                finalizer_tx,
                _token,
            ) = worker.dataflow::<u64, _, _>(|outer_scope| {
                let (non_granular_feedback_handle, non_granular_feedback) =
                    if non_granular_consumer {
                        let (h, f) = outer_scope.feedback(Default::default());
                        (Some(h), Some(f))
                    } else {
                        (None, None)
                    };
                let (
                    backpressure_probe,
                    consumer_tx,
                    backpressure_status_rx,
                    token,
                    backpressured,
                    finalizer_tx,
                ) = outer_scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                    let (input, finalizer_tx) =
                        iterator_operator(scope.clone(), input.into_iter());

                    let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                        (
                            FlowControl {
                                progress_stream: non_granular_feedback.unwrap().enter(scope),
                                max_inflight_bytes,
                                summary,
                                metrics: None,
                            },
                            None,
                        )
                    } else {
                        let (granular_feedback_handle, granular_feedback) =
                            scope.feedback(Default::default());
                        (
                            FlowControl {
                                progress_stream: granular_feedback,
                                max_inflight_bytes,
                                summary,
                                metrics: None,
                            },
                            Some(granular_feedback_handle),
                        )
                    };

                    let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                    let (backpressured, token) = backpressure(
                        scope,
                        "test",
                        input,
                        flow_control,
                        0,
                        Some(backpressure_status_tx),
                    );

                    // If the consumer is granular, set it up inside this scope.
                    let tx = if !non_granular_consumer {
                        Some(consumer_operator(
                            scope.clone(),
                            backpressured.clone(),
                            granular_feedback_handle.unwrap(),
                        ))
                    } else {
                        None
                    };

                    let (probe_handle, backpressured) = backpressured.probe();
                    (
                        probe_handle,
                        tx,
                        backpressure_status_rx,
                        token,
                        backpressured.leave(outer_scope),
                        finalizer_tx,
                    )
                });

                // If the consumer is non-granular, set it up in the outer scope.
                let consumer_tx = if non_granular_consumer {
                    consumer_operator(
                        outer_scope.clone(),
                        backpressured,
                        non_granular_feedback_handle.unwrap(),
                    )
                } else {
                    consumer_tx.unwrap()
                };

                (
                    backpressure_probe,
                    consumer_tx,
                    backpressure_status_rx,
                    finalizer_tx,
                    token,
                )
            });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Release the input operator's held capability so the dataflow can
            // shut down.
            let _ = finalizer_tx.send(());
        });
    }

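    /// An operator that emits the given parts at their times (with `Subtime(0)`),
    /// holding back its final capability until the returned sender fires.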
    fn iterator_operator<'scope, I: Iterator<Item = (u64, Part)> + 'static>(
        scope: Scope<'scope, (u64, Subtime)>,
        mut input: I,
    ) -> (StreamVec<'scope, (u64, Subtime), Part>, oneshot::Sender<()>) {
        let (finalizer_tx, finalizer_rx) = oneshot::channel();
        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();

        iterator.build(|mut caps| async move {
            let mut capability = Some(caps.pop().unwrap());
            let mut last = None;
            while let Some(element) = input.next() {
                let time = element.0;
                let part = element.1;
                last = Some((time, Subtime(0)));
                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
            }
            if let Some(last) = last {
                capability
                    .as_mut()
                    .unwrap()
                    .downgrade(&(last.0 + 1, last.1));
            }

            // Hold the final capability until the finalizer fires, keeping the
            // dataflow live for the duration of the test.
            let _ = finalizer_rx.await;
            capability.take();
        });

        (output, finalizer_tx)
    }

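    /// An operator that consumes one data event from `input` for each `()` sent
    /// on the returned channel, and feeds its progress back through `feedback`.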
    fn consumer_operator<
        'scope,
        T: timely::progress::Timestamp,
        O: Backpressureable + std::fmt::Debug,
    >(
        scope: Scope<'scope, T>,
        input: StreamVec<'scope, T, O>,
        feedback: timely::dataflow::operators::feedback::Handle<
            'scope,
            T,
            Vec<std::convert::Infallible>,
        >,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                // Consume one data event per request, skipping over any
                // progress events seen along the way.
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
        output.connect_loop(feedback);

        tx
    }
}