use differential_dataflow::consolidation::ConsolidatingContainerBuilder;

use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::lattice::Lattice;
use futures::{StreamExt, future::Either};
use mz_expr::{ColumnSpecs, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::{PersistConfig, RetryParameters};
use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
use mz_persist_client::operators::shard_source::{
    ErrorHandler, FilterResult, SnapshotMode, shard_source,
};
use mz_persist_client::stats::STATS_AUDIT_PANIC;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_repr::{Datum, DatumVec, Diff, GlobalId, RelationDesc, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_storage_types::stats::RelationPartStats;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::probe::ProbeNotify;
use mz_txn_wal::operator::{TxnsContext, txns_progress};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::ScopeParent;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
use timely::dataflow::operators::{Capability, Leave, OkErr};
use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use timely::order::TotalOrder;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::timestamp::PathSummary;
use timely::scheduling::Activator;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, trace};

use crate::metrics::BackpressureMetrics;

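/// The second component of the refined timestamps used in the granular
/// backpressure scope: a per-part sequence number that gives each emitted
/// batch part its own distinct time.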
#[derive(
    Copy,
    Clone,
    PartialEq,
    Default,
    Eq,
    PartialOrd,
    Ord,
    Debug,
    Serialize,
    Deserialize,
    Hash,
)]
pub struct Subtime(u64);

impl PartialOrder for Subtime {
    fn less_equal(&self, other: &Self) -> bool {
        self.0.less_equal(&other.0)
    }
}

impl TotalOrder for Subtime {}

impl PathSummary<Subtime> for Subtime {
    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
        self.0.results_in(&src.0).map(Subtime)
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Subtime)
    }
}

impl TimelyTimestamp for Subtime {
    type Summary = Subtime;

    fn minimum() -> Self {
        Subtime(0)
    }
}

impl Subtime {
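    /// The smallest amount a `Subtime` can advance by: a summary that steps
    /// the part counter forward by one.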
    pub const fn least_summary() -> Self {
        Subtime(1)
    }
}

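/// Constructs a source that reads from the persist shard described by
/// `metadata`, yielding a stream of `Row` updates, a stream of
/// `DataflowError`s, and shutdown tokens for the operators involved.
///
/// Updates are emitted as of the `as_of` frontier, and updates at times
/// greater or equal to `until` are suppressed. A provided `map_filter_project`
/// plan is evaluated during decoding, and its statistics are used to filter
/// out whole parts where possible. When `max_inflight_bytes` is set, decoding
/// is flow-controlled inside an inner scope whose timestamps are refined with
/// a [`Subtime`] component, bounding the bytes held between fetch and decode.
/// Shards managed by txn-wal additionally pass through a `txns_progress`
/// operator so their frontiers advance even when no new data arrives.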
pub fn persist_source<G>(
    scope: &mut G,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<G, (Row, Timestamp, Diff)>,
    Stream<G, (DataflowError, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let shard_metrics =
        persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        let cfg = Arc::clone(&persist_clients.cfg().configs);
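        // Shards managed by txn-wal only receive writes at the cadence of the
        // txns shard, so use the txn-wal retry parameters when subscribing.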
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave()
    });

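    // If the shard is managed by txn-wal, wrap the stream in a `txns_progress`
    // operator, which keeps its frontier advancing even when no transactions
    // commit to this particular data shard.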
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}

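/// The scope of the inner, granular dataflow: the parent timestamp refined
/// with a [`Subtime`] component that distinguishes individual batch parts.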
type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, Subtime)>;

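/// Creates a source that reads from a persist shard within a refined scope.
///
/// Unlike [`persist_source`], this function does not handle txn-wal shards and
/// emits updates at `(Timestamp, Subtime)` granularity, which allows the
/// optional `flow_control` to throttle individual batch parts between fetching
/// and decoding.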
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, G>(
    scope: &RefinedScope<'g, G>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
    // If Some, an override for the default listen sleep retry parameters.
    listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<
        RefinedScope<'g, G>,
        (
            Result<Row, DataflowError>,
            (mz_repr::Timestamp, Subtime),
            Diff,
        ),
    >,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

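    // Read the shard with the caller-provided schema if one was given;
    // otherwise use the schema from the collection metadata.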
    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |mut scope: _, descs: &Stream<_, _>, chosen_worker| {
            let (stream, token) = backpressure(
                &mut scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

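    // Set up the `shard_source` operator, pushing the MFP-derived filter down
    // to part statistics: parts whose time bounds lie entirely beyond `until`,
    // or whose column statistics prove the filter can never match, are skipped
    // without being fetched.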
    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        &mut scope.clone(),
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                // If the part's frontier is empty, it contains no updates.
                return FilterResult::Discard;
            };

            if lower > upper {
                // The part lies entirely beyond `until`: none of its updates
                // are of interest.
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, &fetched, &name, until, map_filter_project);
    (rows, token)
}

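/// Determines from part statistics whether a part can be skipped entirely,
/// must be fetched, or (for empty projections) can be replaced with a
/// synthesized constant part.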
fn filter_result(
    relation_desc: &RelationDesc,
    time_range: ResultSpec,
    stats: RelationPartStats,
    plan: &MfpPlan,
) -> FilterResult {
    let arena = RowArena::new();
    let mut ranges = ColumnSpecs::new(relation_desc.typ(), &arena);
    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);

    // If we have no statistics for the error column, we must assume the part
    // may contain errors.
    let may_error = stats.err_count().map_or(true, |count| count > 0);

    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
        // Push the statistics for each column, where we have them.
        let result_spec = stats.col_stats(idx, &arena);
        ranges.push_column(pos, result_spec);
    }
    let result = ranges.mfp_plan_filter(plan).range;
    let may_error = may_error || result.may_fail();
    let may_keep = result.may_contain(Datum::True);
    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
    if relation_desc.len() == 0 && !may_error && !may_skip {
        // With no projected columns and a filter that certainly passes, the
        // part can be replaced with a synthesized empty row, avoiding the fetch.
        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
            return FilterResult::Keep;
        };
        key.append(&SourceData(Ok(Row::default())));
        let key = key.finish();
        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
            return FilterResult::Keep;
        };
        val.append(&());
        let val = val.finish();

        FilterResult::ReplaceWith {
            key: Arc::new(key),
            val: Arc::new(val),
        }
    } else if may_error || may_keep {
        FilterResult::Keep
    } else {
        FilterResult::Discard
    }
}

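/// Decodes the blobs fetched by `shard_source`, applies the
/// `map_filter_project` plan to each update, and emits the resulting rows (or
/// errors), consolidated, at the refined `(Timestamp, Subtime)` time.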
pub fn decode_and_mfp<G>(
    cfg: PersistConfig,
    fetched: &Stream<G, FetchedBlob<SourceData, (), Timestamp, StorageDiff>>,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> Stream<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
where
    G: Scope<Timestamp = (mz_repr::Timestamp, Subtime)>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Re-used allocations for decoding rows and evaluating the MFP.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Take the MFP out of the argument, so the closure below can own it.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        // Acquire an activator so the operator can reschedule itself when it
        // has unfinished work.
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        // Maintain a queue of parts that still need decoding.
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            fetched_input.for_each(|time, data| {
                let capability = time.retain();
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Decode only a fuel's worth of updates per activation, so the
            // operator yields the worker to others regularly.
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}

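/// Pending work to read from a fetched part.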
struct PendingWork {
    /// Whether to panic if a filter-pushdown audit fails.
    panic_on_audit_failure: bool,
    /// The capability at which the decoded updates should be emitted.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// The part to read from, parsed lazily.
    part: PendingPart,
}

enum PendingPart {
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}

impl PendingPart {
    /// Returns the contained part, parsing it first if necessary.
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                *self = PendingPart::Parsed { part: x.parse() };
                // Won't recurse again: we just parsed it.
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}

impl PendingWork {
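    /// Performs work: reads from the fetched part, decodes, and sends outputs,
    /// while `yield_fn` returns false. Returns true once the part is exhausted.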
    fn do_work<YFn>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        // Count decoding as work, so the fuel-based yield check
                        // sees progress even when the MFP filters everything out.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            if let Some(stats) = &is_filter_pushdown_audit {
                                // The pushdown filter claimed this part could be
                                // skipped, yet the MFP produced output: report
                                // the correctness violation.
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            ?mfp,
                                            ?result,
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        drop(datums_local);
                        // Give the decoded row back so its allocation can be reused.
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(err), emit_time, diff.into()));
                    *work += 1;
                }
            }
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}

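/// A trait representing a type that can be passed through `backpressure`.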
pub trait Backpressureable: Clone + 'static {
    /// The size of the object, in bytes.
    fn byte_size(&self) -> usize;
}

impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
    fn byte_size(&self) -> usize {
        self.1.encoded_size_bytes()
    }
}

/// Flow control configuration.
#[derive(Debug)]
pub struct FlowControl<G: Scope> {
    /// Stream on which the consumer reports its progress. As implied by its
    /// type, this stream never carries data, only frontier advancements.
    pub progress_stream: Stream<G, Infallible>,
    /// Maximum number of in-flight bytes.
    pub max_inflight_bytes: usize,
    /// A summary describing, for a part emitted at some time, the earliest
    /// feedback time at which that part counts as processed.
    pub summary: <G::Timestamp as TimelyTimestamp>::Summary,

    /// Optional metrics for the `backpressure` operator to keep up-to-date.
    pub metrics: Option<BackpressureMetrics>,
}

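/// Applies flow control to the `data` stream, limiting the bytes in flight
/// between this operator and the consumer whose progress is reported on
/// `flow_control.progress_stream`.
///
/// Each part is emitted at a distinct `(time, Subtime)` timestamp and is
/// considered in flight until the flow-control frontier passes the time
/// derived from `flow_control.summary`. Once `max_inflight_bytes` is reached,
/// no further parts are emitted until some are retired. Only the
/// `chosen_worker` emits data; all other workers exit immediately. The
/// optional `probe` reports, for each backpressure event, the new flow-control
/// frontier together with the in-flight and retired part counts.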
pub fn backpressure<T, G, O>(
    scope: &mut G,
    name: &str,
    data: &Stream<G, O>,
    flow_control: FlowControl<G>,
    chosen_worker: usize,
    // A probe used only by tests to inspect the operator's behavior.
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (Stream<G, O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    G: Scope<Timestamp = (T, Subtime)>,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Feed the consumer's progress back to this operator, delayed by the
    // configured summary.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(&summaried_flow, Pipeline);

    // Helper used to synthesize the current and next frontiers for ordered parts.
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // Refine the input timestamps so every part gets its own `Subtime`, and
    // interleave the parts with the frontiers they introduce.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // The output frontier of this operator.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // The frontier of the consumer, as reported on the feedback stream.
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // Parts we have emitted but the consumer has not yet retired.
        let mut inflight_parts = Vec::new();
        // Parts we have received but not yet emitted.
        let mut pending_parts = std::collections::VecDeque::new();

        // Only the chosen worker receives data; all others exit immediately.
        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            // Total bytes currently in flight.
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // Emit while we are under the byte budget, or while the
            // flow-control frontier is already beyond our output frontier, in
            // which case there is no retirement left to wait for.
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                // Downgrade the output frontier before emitting,
                                // so the consumer learns about the part's time.
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // Re-check the budget: if we are still over it
                                // and the flow-control frontier has caught up to
                                // the new output frontier, hold the part back.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Record the part as in flight at the feedback time it will be
                // retired at, as determined by the configured summary. Times
                // with no successor under the summary can never be retired, so
                // such parts are not tracked.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                // We are over the byte budget: wait for the consumer to make
                // progress before emitting anything else.
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                // Update the `flow_control_frontier` with the new progress.
                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire every part the consumer has now processed.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                // Report the event to the test probe, if any.
                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}

#[cfg(test)]
mod tests {
    use timely::container::CapacityContainerBuilder;
    use timely::dataflow::operators::{Enter, Probe};
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::sync::oneshot;

    use super::*;

    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertOutputFrontier((51, Subtime(3))),
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }

    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((50, Subtime(1))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }

    type Time = (u64, Subtime);
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }

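    /// A step in a backpressure test scenario.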
    enum Step {
        /// Assert that the output frontier of the `backpressure` operator has
        /// advanced (at least) to this time.
        AssertOutputFrontier(Time),
        /// Assert that the operator has backpressured, reporting the given
        /// flow-control frontier and in-flight/retired part counts.
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
        /// Have the consumer process the given number of parts.
        ProcessXParts(usize),
    }

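    /// Runs the given `steps` against a `backpressure` operator fed with
    /// `input`, limited to `max_inflight_bytes` and using `summary` on the
    /// feedback edge. When `non_granular_consumer` is true, the consumer runs
    /// outside the granular (`Subtime`) scope.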
    fn backpressure_runner(
        // The (timestamp, part) pairs fed to the operator.
        input: Vec<(u64, Part)>,
        // The maximum number of in-flight bytes.
        max_inflight_bytes: usize,
        // The feedback summary to use.
        summary: Time,
        // The steps to run.
        steps: Vec<Step>,
        // Whether the consumer runs outside the granular (Subtime) scope.
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (
                backpressure_probe,
                consumer_tx,
                mut backpressure_status_rx,
                finalizer_tx,
                _token,
            ) =
                worker.dataflow::<u64, _, _>(|scope| {
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            &input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // A granular consumer must live inside this scope.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                &backpressured,
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        (
                            backpressured.probe(),
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(),
                            finalizer_tx,
                        )
                    });

                    // A non-granular consumer lives outside the inner scope.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            scope.clone(),
                            &backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Unblock the iterator operator so the dataflow can shut down.
            let _ = finalizer_tx.send(());
        });
    }

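    /// An operator that emits the input parts (each at `Subtime(0)`) and then
    /// holds its capability open until the returned sender is signalled.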
    fn iterator_operator<
        G: Scope<Timestamp = (u64, Subtime)>,
        I: Iterator<Item = (u64, Part)> + 'static,
    >(
        scope: G,
        mut input: I,
    ) -> (Stream<G, Part>, oneshot::Sender<()>) {
        let (finalizer_tx, finalizer_rx) = oneshot::channel();
        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();

        iterator.build(|mut caps| async move {
            let mut capability = Some(caps.pop().unwrap());
            let mut last = None;
            while let Some(element) = input.next() {
                let time = element.0.clone();
                let part = element.1;
                last = Some((time, Subtime(0)));
                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
            }
            if let Some(last) = last {
                capability
                    .as_mut()
                    .unwrap()
                    .downgrade(&(last.0 + 1, last.1));
            }

            let _ = finalizer_rx.await;
            capability.take();
        });

        (output, finalizer_tx)
    }

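    /// An operator that consumes one batch from its input for each `()` sent
    /// on the returned channel, and feeds its progress back through `feedback`.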
    fn consumer_operator<G: Scope, O: Backpressureable + std::fmt::Debug>(
        scope: G,
        input: &Stream<G, O>,
        feedback: timely::dataflow::operators::feedback::Handle<G, Vec<std::convert::Infallible>>,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                // Skip progress events; stop after consuming one data event.
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
        output.connect_loop(feedback);

        tx
    }
}