1use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use std::convert::Infallible;
14use std::fmt::Debug;
15use std::future::Future;
16use std::hash::Hash;
17use std::sync::Arc;
18use std::time::Instant;
19
20use differential_dataflow::lattice::Lattice;
21use futures::{StreamExt, future::Either};
22use mz_expr::{ColumnSpecs, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
23use mz_ore::cast::CastFrom;
24use mz_ore::collections::CollectionExt;
25use mz_persist_client::cache::PersistClientCache;
26use mz_persist_client::cfg::{PersistConfig, RetryParameters};
27use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
28use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
29use mz_persist_client::operators::shard_source::{
30 ErrorHandler, FilterResult, SnapshotMode, shard_source,
31};
32use mz_persist_client::stats::STATS_AUDIT_PANIC;
33use mz_persist_types::Codec64;
34use mz_persist_types::codec_impls::UnitSchema;
35use mz_persist_types::columnar::{ColumnEncoder, Schema};
36use mz_repr::{
37 Datum, DatumVec, Diff, GlobalId, RelationDesc, ReprRelationType, Row, RowArena, Timestamp,
38};
39use mz_storage_types::StorageDiff;
40use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
41use mz_storage_types::errors::DataflowError;
42use mz_storage_types::sources::SourceData;
43use mz_storage_types::stats::RelationPartStats;
44use mz_timely_util::builder_async::{
45 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
46};
47use mz_timely_util::probe::ProbeNotify;
48use mz_txn_wal::operator::{TxnsContext, txns_progress};
49use serde::{Deserialize, Serialize};
50use timely::PartialOrder;
51use timely::container::CapacityContainerBuilder;
52use timely::dataflow::ScopeParent;
53use timely::dataflow::channels::pact::Pipeline;
54use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
55use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
56use timely::dataflow::operators::{Capability, Leave, OkErr};
57use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
58use timely::dataflow::scopes::Child;
59use timely::dataflow::{Scope, Stream, StreamVec};
60use timely::order::TotalOrder;
61use timely::progress::Antichain;
62use timely::progress::Timestamp as TimelyTimestamp;
63use timely::progress::timestamp::PathSummary;
64use timely::scheduling::Activator;
65use tokio::sync::mpsc::UnboundedSender;
66use tracing::{error, trace};
67
68use crate::metrics::BackpressureMetrics;
69
/// The second component of the refined timestamps used inside this module's
/// backpressure scope: a per-part counter that subdivides a single
/// `mz_repr::Timestamp` tick so progress can be tracked at part granularity.
#[derive(
    Copy,
    Clone,
    PartialEq,
    Default,
    Eq,
    PartialOrd,
    Ord,
    Debug,
    Serialize,
    Deserialize,
    Hash
)]
pub struct Subtime(u64);
91
92impl PartialOrder for Subtime {
93 fn less_equal(&self, other: &Self) -> bool {
94 self.0.less_equal(&other.0)
95 }
96}
97
// The `u64`-derived order on `Subtime` is total, so any two elements are comparable.
impl TotalOrder for Subtime {}
99
impl PathSummary<Subtime> for Subtime {
    /// Advances `src` by this summary, delegating to `u64`'s path summary
    /// (saturating/overflow semantics are whatever `u64::results_in` provides).
    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
        self.0.results_in(&src.0).map(Subtime)
    }

    /// Composes two summaries, again delegating to the `u64` implementation.
    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Subtime)
    }
}
109
impl TimelyTimestamp for Subtime {
    // A `Subtime` acts as its own path summary (see `PathSummary` impl above).
    type Summary = Subtime;

    /// The minimum timestamp: counter zero.
    fn minimum() -> Self {
        Subtime(0)
    }
}
117
impl Subtime {
    /// The smallest meaningful path summary: advances the counter by one.
    /// Used as the `Subtime` component of feedback-edge summaries below.
    pub const fn least_summary() -> Self {
        Subtime(1)
    }
}
124
/// Renders a persist source: reads `metadata.data_shard` via persist, decodes
/// and applies the (optional) MFP, and returns streams of ok rows and errors
/// plus shutdown tokens.
///
/// The read itself happens inside a refined `(Timestamp, Subtime)` subscope so
/// that, when `max_inflight_bytes` is set, part-granular backpressure can be
/// applied; results are flattened back to `G`'s timestamp on the way out.
/// When `metadata.txns_shard` is set, the output is additionally gated through
/// `txns_progress` so it only advances as the txn system allows.
pub fn persist_source<G>(
    scope: &mut G,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    StreamVec<G, (Row, Timestamp, Diff)>,
    StreamVec<G, (DataflowError, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        // Only wire up flow control (and the probe that feeds its progress
        // stream) when a backpressure budget was requested.
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    // Advance only the `Subtime` component around the feedback
                    // loop: backpressure is per-part, not per-timestamp.
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        let cfg = Arc::clone(&persist_clients.cfg().configs);
        // Txn-backed shards may be written to rarely; retry the subscribe with
        // txn-specific parameters in that case.
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        // Let the downstream probe notify the backpressure operator of decode
        // progress, closing the flow-control loop.
        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave()
    });

    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    // Split oks from errs, dropping the `Subtime` component (`t.0`) now that
    // we are back in the outer, non-refined scope.
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}
272
/// A child scope of `G` whose timestamp is refined with a `Subtime` component.
type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, Subtime)>;
274
/// Core of `persist_source`: renders the `shard_source` read of
/// `metadata.data_shard` inside the refined scope, applies filter pushdown
/// based on part statistics, and decodes/MFPs the fetched parts.
///
/// Returns the stream of MFP results (oks and errs interleaved) at refined
/// `(Timestamp, Subtime)` times, plus shutdown tokens.
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, G>(
    scope: &RefinedScope<'g, G>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
    listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<
        RefinedScope<'g, G>,
        Vec<(
            Result<Row, DataflowError>,
            (mz_repr::Timestamp, Subtime),
            Diff,
        )>,
    >,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    // Clone the MFP for use in the (Send + 'static) filter closure below; the
    // original is consumed later by `decode_and_mfp`.
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    // Allow the caller to read with a schema other than the shard's current one.
    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    // When flow control is configured, interpose the backpressure operator on
    // the stream of part descriptions before they are fetched.
    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |mut scope: _, descs: Stream<_, _>, chosen_worker| {
            let (stream, token) = backpressure(
                &mut scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    // An empty `until` means "read forever"; treat that as an unbounded upper.
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        &mut scope.clone(),
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        // Filter-pushdown callback: decide per part, from its stats and time
        // bounds, whether to fetch, skip, or replace it.
        move |stats, frontier| {
            // An empty frontier means no further updates are interesting.
            let Some(lower) = frontier.as_option().copied() else {
                return FilterResult::Discard;
            };

            // Part lies entirely past `until`; nothing to read from it.
            if lower > upper {
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, fetched, &name, until, map_filter_project);
    (rows, token)
}
389
/// Decides, from a part's column statistics and the MFP's filter, whether a
/// part must be fetched (`Keep`), can be skipped (`Discard`), or can be
/// replaced by a synthesized constant part (`ReplaceWith`).
fn filter_result(
    relation_desc: &RelationDesc,
    time_range: ResultSpec,
    stats: RelationPartStats,
    plan: &MfpPlan,
) -> FilterResult {
    let arena = RowArena::new();
    let relation = ReprRelationType::from(relation_desc.typ());
    let mut ranges = ColumnSpecs::new(&relation, &arena);
    // Constrain `mz_now()` to the part's possible time range.
    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);

    // If error stats are absent we must conservatively assume errors exist.
    let may_error = stats.err_count().map_or(true, |count| count > 0);

    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
        let result_spec = stats.col_stats(idx, &arena);
        ranges.push_column(pos, result_spec);
    }
    let result = ranges.mfp_plan_filter(plan).range;
    let may_error = may_error || result.may_fail();
    let may_keep = result.may_contain(Datum::True);
    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
    if relation_desc.len() == 0 && !may_error && !may_skip {
        // Zero-column relation whose filter always passes: every update is the
        // empty row, so synthesize a constant key/val part instead of fetching.
        // If encoding fails for any reason, fall back to fetching the part.
        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
            return FilterResult::Keep;
        };
        key.append(&SourceData(Ok(Row::default())));
        let key = key.finish();
        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
            return FilterResult::Keep;
        };
        val.append(&());
        let val = val.finish();

        FilterResult::ReplaceWith {
            key: Arc::new(key),
            val: Arc::new(val),
        }
    } else if may_error || may_keep {
        FilterResult::Keep
    } else {
        FilterResult::Discard
    }
}
435
/// Builds the operator that parses fetched persist blobs, applies the MFP, and
/// emits `(Result<Row, DataflowError>, time, diff)` updates.
///
/// Decoding is fueled: the operator yields after `storage_source_decode_fuel`
/// units of work and re-activates itself, so one large part cannot starve the
/// worker.
pub fn decode_and_mfp<G>(
    cfg: PersistConfig,
    fetched: StreamVec<G, FetchedBlob<SourceData, (), Timestamp, StorageDiff>>,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> StreamVec<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
where
    G: Scope<Timestamp = (mz_repr::Timestamp, Subtime)>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Reused allocations for datum unpacking and row building across parts.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Take ownership of the MFP so it can move into the operator closure.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        let activations = scope.activations();
        // Used to reschedule ourselves when we yield with work outstanding.
        let activator = Activator::new(operator_info.address, activations);
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            // Enqueue newly fetched blobs, holding their capabilities.
            fetched_input.for_each(|time, data| {
                let capability = time.retain(0);
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            // Drain queued parts until done or out of fuel.
            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            if !pending_work.is_empty() {
                // Out of fuel with work left: ask to be scheduled again.
                activator.activate();
            }
        }
    });

    updates_stream
}
517
/// A fetched part queued for decoding, along with the capability at which its
/// updates may be emitted.
struct PendingWork {
    // Whether a filter-pushdown audit failure should panic (vs. just log).
    panic_on_audit_failure: bool,
    // Capability for the part's emission time; `Subtime` adjusted per update.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    // The part itself, parsed lazily on first access.
    part: PendingPart,
}
527
/// A part in one of two states: the raw fetched blob, or its parsed form.
enum PendingPart {
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}
534
535impl PendingPart {
536 fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
543 match self {
544 PendingPart::Unparsed(x) => {
545 *self = PendingPart::Parsed { part: x.parse() };
546 self.part_mut()
548 }
549 PendingPart::Parsed { part } => &mut part.part,
550 }
551 }
552}
553
impl PendingWork {
    /// Decodes some updates from this part, applying the MFP and emitting
    /// results into `output`. Returns `true` when the part is exhausted, or
    /// `false` if it yielded early because `yield_fn` said fuel ran out.
    ///
    /// `work` is incremented per decoded/emitted update so the caller can
    /// account fuel across parts.
    fn do_work<YFn>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        // `row_buf` lets the iterator reuse a previously-decoded row allocation.
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            // Updates at or past `until` are never needed.
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        // Count decode work before evaluation so that parts
                        // filtered down to nothing still consume fuel.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // This part was predicted filterable by stats but
                            // produced output: a pushdown correctness bug.
                            if let Some(stats) = &is_filter_pushdown_audit {
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            ?mfp,
                                            ?result,
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // The MFP may have mapped the update's time
                                    // (e.g. temporal filters); re-check `until`.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        // Release the borrow, then hand the row's allocation
                        // back for reuse by the next `next_with_storage` call.
                        drop(datums_local);
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(err), emit_time, diff.into()));
                    *work += 1;
                }
            }
            if yield_fn(start_time, *work) {
                // Out of fuel; the part is not yet exhausted.
                return false;
            }
        }
        true
    }
}
690
/// A stream element whose in-flight memory footprint can be accounted for by
/// the `backpressure` operator.
pub trait Backpressureable: Clone + 'static {
    /// The size, in bytes, charged against the backpressure budget.
    fn byte_size(&self) -> usize;
}
696
// Batch-part descriptions (paired with their target worker index) are charged
// by their encoded size.
impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
    fn byte_size(&self) -> usize {
        self.1.encoded_size_bytes()
    }
}
702
/// Configuration for the `backpressure` operator.
#[derive(Debug)]
pub struct FlowControl<G: Scope> {
    /// A frontier-only stream indicating how far the downstream consumer has
    /// progressed; parts up to this frontier are considered retired.
    pub progress_stream: StreamVec<G, Infallible>,
    /// Budget: maximum bytes of emitted-but-unretired parts before we stall.
    pub max_inflight_bytes: usize,
    /// Summary applied around the feedback loop; a part emitted at `t` is
    /// retired when the progress frontier passes `summary.results_in(t)`.
    pub summary: <G::Timestamp as TimelyTimestamp>::Summary,

    /// Optional metrics for emitted/backpressured/retired byte counts.
    pub metrics: Option<BackpressureMetrics>,
}
721
/// An operator that passes `data` through unchanged but limits how many bytes
/// of parts are "in flight" (emitted but not yet acknowledged by
/// `flow_control.progress_stream`) at once.
///
/// Each part is re-timestamped with a unique `Subtime` so the consumer's
/// progress identifies exactly which parts have been retired. Only
/// `chosen_worker` does any work; other workers exit immediately.
/// `probe` is a test hook reporting (new frontier, inflight count, retired
/// count) each time the operator unblocks.
pub fn backpressure<T, G, O>(
    scope: &mut G,
    name: &str,
    data: StreamVec<G, O>,
    flow_control: FlowControl<G>,
    chosen_worker: usize,
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (StreamVec<G, O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    G: Scope<Timestamp = (T, Subtime)>,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Route consumer progress back to this operator through a feedback edge
    // advanced by `summary`, so retirement lines up with emission times.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    // Both inputs are disconnected: our output frontier is managed manually
    // via `cap_set`, not derived from input frontiers.
    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(summaried_flow, Pipeline);

    // Assigns a part arriving at `time` a unique per-part `Subtime`, and
    // returns (the part's emission time, the frontier while emitting it, the
    // frontier after emitting it).
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // Flatten the input into an ordered stream of per-part events
    // (`Either::Right`) and pure progress events (`Either::Left`), releasing
    // parts only once the input frontier has passed them.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    // Input is complete: flush all remaining parts in time order.
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    // Release, in time order, all parts no longer beyond `prog`.
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // (retirement time, byte size) of parts emitted but not yet retired.
        let mut inflight_parts = Vec::new();
        // Parts read from the input while stalled, awaiting emission.
        let mut pending_parts = std::collections::VecDeque::new();

        // This operator is intentionally single-worker.
        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // Emit while under budget, or while the flow-control frontier is
            // ahead of our output (meaning retirements are already accounted).
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // Re-check the budget against the frontier we
                                // just downgraded to; if still over, stash the
                                // part and go wait for retirements.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Record the time at which this part will count as retired.
                // `results_in` returning `None` means it can never be retired
                // by the feedback loop, so don't charge it against the budget.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                // Over budget: block until the consumer makes progress.
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire (and stop charging for) parts the consumer has passed.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}
981
982#[cfg(test)]
983mod tests {
984 use timely::container::CapacityContainerBuilder;
985 use timely::dataflow::operators::{Enter, Probe};
986 use tokio::sync::mpsc::unbounded_channel;
987 use tokio::sync::oneshot;
988
989 use super::*;
990
    /// Exercises `backpressure` with a consumer outside the granular scope:
    /// progress only arrives at whole-timestamp boundaries, so retirement
    /// happens in batches when the outer timestamp advances.
    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        // Two parts overflow the 100-byte budget; both retire once time 50 closes.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        // Small parts accumulate until the 50-byte budget is hit; retirement
        // then unblocks the large trailing part.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        // Many parts at the same outer timestamp: non-granular progress retires
        // them per-timestamp, not per-part.
        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertOutputFrontier((51, Subtime(3))),
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }
1129
    /// Exercises `backpressure` with a consumer inside the granular scope:
    /// progress advances per-part (`Subtime` summary), so parts can retire
    /// individually within a single outer timestamp.
    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        // A single over-budget part blocks the second until it is processed.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((50, Subtime(1))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        // Retiring one part frees enough budget to emit the next.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }
1208
    /// The refined timestamp type used by the backpressure tests.
    type Time = (u64, Subtime);
    /// A test part whose payload is just its reported byte size.
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }
1217
    /// One step of a `backpressure_runner` scenario.
    enum Step {
        /// Step the worker until the backpressure output frontier passes this time.
        AssertOutputFrontier(Time),
        /// Step the worker until the operator reports (via its probe) that it
        /// blocked with exactly this frontier / inflight / retired state.
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
        /// Tell the consumer operator to process this many parts.
        ProcessXParts(usize),
    }
1234
    /// Drives a single-worker dataflow containing the `backpressure` operator
    /// through a scripted `steps` scenario.
    ///
    /// `input` is a list of (time, part) fed by `iterator_operator`;
    /// `non_granular_consumer` selects whether the consumer (and thus the
    /// flow-control feedback) lives outside (`true`) or inside (`false`) the
    /// granular `(u64, Subtime)` scope.
    fn backpressure_runner(
        input: Vec<(u64, Part)>,
        max_inflight_bytes: usize,
        summary: Time,
        steps: Vec<Step>,
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (
                backpressure_probe,
                consumer_tx,
                mut backpressure_status_rx,
                finalizer_tx,
                _token,
            ) =
                worker.dataflow::<u64, _, _>(|scope| {
                    // Feedback edge in the outer scope, used only when the
                    // consumer is non-granular.
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        // Flow control comes either from the outer feedback
                        // (entered into this scope) or a granular feedback
                        // edge created here.
                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // Granular consumer lives inside this scope.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                backpressured.clone(),
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        let (probe_handle, backpressured) = backpressured.probe();
                        (
                            probe_handle,
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(),
                            finalizer_tx,
                        )
                    });

                    // Non-granular consumer lives in the outer scope instead.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            scope.clone(),
                            backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        // Step until the operator reports its next status.
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Allow the iterator operator to drop its capability and shut down.
            let _ = finalizer_tx.send(());
        });
    }
1404
1405 fn iterator_operator<
1408 G: Scope<Timestamp = (u64, Subtime)>,
1409 I: Iterator<Item = (u64, Part)> + 'static,
1410 >(
1411 scope: G,
1412 mut input: I,
1413 ) -> (StreamVec<G, Part>, oneshot::Sender<()>) {
1414 let (finalizer_tx, finalizer_rx) = oneshot::channel();
1415 let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
1416 let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();
1417
1418 iterator.build(|mut caps| async move {
1419 let mut capability = Some(caps.pop().unwrap());
1420 let mut last = None;
1421 while let Some(element) = input.next() {
1422 let time = element.0.clone();
1423 let part = element.1;
1424 last = Some((time, Subtime(0)));
1425 output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
1426 }
1427 if let Some(last) = last {
1428 capability
1429 .as_mut()
1430 .unwrap()
1431 .downgrade(&(last.0 + 1, last.1));
1432 }
1433
1434 let _ = finalizer_rx.await;
1435 capability.take();
1436 });
1437
1438 (output, finalizer_tx)
1439 }
1440
    /// A consumer operator that processes one part from `input` each time the
    /// returned channel receives a `()`. Its (empty) output is connected to
    /// `feedback`, so its progress becomes the flow-control signal for
    /// `backpressure`.
    fn consumer_operator<G: Scope, O: Backpressureable + std::fmt::Debug>(
        scope: G,
        input: StreamVec<G, O>,
        feedback: timely::dataflow::operators::feedback::Handle<G, Vec<std::convert::Infallible>>,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                // Skip progress-only events; stop after consuming one data
                // event (i.e. one part) per received `()`.
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
        output.connect_loop(feedback);

        tx
    }
1465}