1use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use std::convert::Infallible;
14use std::fmt::Debug;
15use std::future::Future;
16use std::hash::Hash;
17use std::sync::Arc;
18use std::time::Instant;
19
20use differential_dataflow::lattice::Lattice;
21use futures::{StreamExt, future::Either};
22use mz_expr::{ColumnSpecs, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
23use mz_ore::cast::CastFrom;
24use mz_ore::collections::CollectionExt;
25use mz_persist_client::cache::PersistClientCache;
26use mz_persist_client::cfg::{PersistConfig, RetryParameters};
27use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
28use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
29use mz_persist_client::operators::shard_source::{
30 ErrorHandler, FilterResult, SnapshotMode, shard_source,
31};
32use mz_persist_client::stats::STATS_AUDIT_PANIC;
33use mz_persist_types::Codec64;
34use mz_persist_types::codec_impls::UnitSchema;
35use mz_persist_types::columnar::{ColumnEncoder, Schema};
36use mz_repr::{
37 Datum, DatumVec, Diff, GlobalId, RelationDesc, ReprRelationType, Row, RowArena, Timestamp,
38};
39use mz_storage_types::StorageDiff;
40use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
41use mz_storage_types::errors::DataflowError;
42use mz_storage_types::sources::SourceData;
43use mz_storage_types::stats::RelationPartStats;
44use mz_timely_util::builder_async::{
45 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
46};
47use mz_timely_util::probe::ProbeNotify;
48use mz_txn_wal::operator::{TxnsContext, txns_progress};
49use serde::{Deserialize, Serialize};
50use timely::PartialOrder;
51use timely::container::CapacityContainerBuilder;
52use timely::dataflow::ScopeParent;
53use timely::dataflow::channels::pact::Pipeline;
54use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
55use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
56use timely::dataflow::operators::{Capability, Leave, OkErr};
57use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
58use timely::dataflow::scopes::Child;
59use timely::dataflow::{Scope, Stream, StreamVec};
60use timely::order::TotalOrder;
61use timely::progress::Antichain;
62use timely::progress::Timestamp as TimelyTimestamp;
63use timely::progress::timestamp::PathSummary;
64use timely::scheduling::Activator;
65use tokio::sync::mpsc::UnboundedSender;
66use tracing::{error, trace};
67
68use crate::metrics::BackpressureMetrics;
69
/// A per-part "sub-moment" within a single outer timestamp. The `backpressure`
/// operator assigns each emitted part a distinct, increasing `Subtime` so that
/// progress can be tracked at part granularity rather than batch granularity.
#[derive(
    Copy,
    Clone,
    PartialEq,
    Default,
    Eq,
    PartialOrd,
    Ord,
    Debug,
    Serialize,
    Deserialize,
    Hash
)]
pub struct Subtime(u64);
91
92impl PartialOrder for Subtime {
93 fn less_equal(&self, other: &Self) -> bool {
94 self.0.less_equal(&other.0)
95 }
96}
97
/// The partial order on `Subtime` is in fact total, since it mirrors `u64`.
impl TotalOrder for Subtime {}
99
impl PathSummary<Subtime> for Subtime {
    // Both operations delegate to the `PathSummary` implementation for the
    // inner `u64`, re-wrapping the result in `Subtime`.
    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
        self.0.results_in(&src.0).map(Subtime)
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Subtime)
    }
}
109
impl TimelyTimestamp for Subtime {
    // A `Subtime` acts as its own path summary (an additive step).
    type Summary = Subtime;

    /// The least `Subtime` is the zero counter.
    fn minimum() -> Self {
        Subtime(0)
    }
}
117
impl Subtime {
    /// The smallest non-trivial summary: advances the counter by exactly one.
    /// Used as the `Subtime` component of the flow-control feedback summary.
    pub const fn least_summary() -> Self {
        Subtime(1)
    }
}
124
/// Builds a source that reads from the persist shard described by `metadata`.
///
/// Internally this refines the scope with a `Subtime` component (so that
/// backpressure can operate at part granularity when `max_inflight_bytes` is
/// set), runs `persist_source_core` inside the refined scope, optionally
/// routes the result through the txn-wal `txns_progress` operator (for shards
/// with a `txns_shard`), and finally splits the output into ok/err streams at
/// the outer timestamp. Returns the two streams plus shutdown tokens for all
/// operators created along the way.
pub fn persist_source<G>(
    scope: &mut G,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    StreamVec<G, (Row, Timestamp, Diff)>,
    StreamVec<G, (DataflowError, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    // All the core work happens inside a scope refined with `Subtime`.
    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        // When a byte budget is configured, set up a probe on the decode
        // operator's output; its frontier drives the backpressure loop.
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    // Advance only the `Subtime` component per part.
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        // Shards fronted by a txns shard get retry-on-sleep parameters when
        // listening for new data.
        let cfg = Arc::clone(&persist_clients.cfg().configs);
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        // Notify the backpressure probe with decode progress, if configured.
        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave()
    });

    // Table shards are fronted by a txns shard; gate their progress through
    // the txn-wal operator. Other shards pass through untouched.
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    // Split into ok/err streams, dropping the `Subtime` component (`t.0`).
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}
272
/// The scope `persist_source_core` operates in: the parent scope's timestamp
/// refined with a per-part `Subtime` component.
type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, Subtime)>;
274
275#[allow(clippy::needless_borrow)]
282pub fn persist_source_core<'g, G>(
283 scope: &RefinedScope<'g, G>,
284 source_id: GlobalId,
285 persist_clients: Arc<PersistClientCache>,
286 metadata: CollectionMetadata,
287 read_schema: Option<RelationDesc>,
288 as_of: Option<Antichain<Timestamp>>,
289 snapshot_mode: SnapshotMode,
290 until: Antichain<Timestamp>,
291 map_filter_project: Option<&mut MfpPlan>,
292 flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
293 listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
295 start_signal: impl Future<Output = ()> + 'static,
296 error_handler: ErrorHandler,
297) -> (
298 Stream<
299 RefinedScope<'g, G>,
300 Vec<(
301 Result<Row, DataflowError>,
302 (mz_repr::Timestamp, Subtime),
303 Diff,
304 )>,
305 >,
306 Vec<PressOnDropButton>,
307)
308where
309 G: Scope<Timestamp = mz_repr::Timestamp>,
310{
311 let cfg = persist_clients.cfg().clone();
312 let name = source_id.to_string();
313 let map_filter_project = map_filter_project.filter(|p| !p.is_identity());
314 let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());
315
316 let read_desc = match read_schema {
318 Some(desc) => desc,
319 None => metadata.relation_desc,
320 };
321
322 let desc_transformer = match flow_control {
323 Some(flow_control) => Some(move |mut scope: _, descs: Stream<_, _>, chosen_worker| {
324 let (stream, token) = backpressure(
325 &mut scope,
326 &format!("backpressure({source_id})"),
327 descs,
328 flow_control,
329 chosen_worker,
330 None,
331 );
332 (stream, vec![token])
333 }),
334 None => None,
335 };
336
337 let metrics = Arc::clone(persist_clients.metrics());
338 let filter_name = name.clone();
339 let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
343 let (fetched, token) = shard_source(
344 &mut scope.clone(),
345 &name,
346 move || {
347 let (c, l) = (
348 Arc::clone(&persist_clients),
349 metadata.persist_location.clone(),
350 );
351 async move { c.open(l).await.unwrap() }
352 },
353 metadata.data_shard,
354 as_of,
355 snapshot_mode,
356 until.clone(),
357 desc_transformer,
358 Arc::new(read_desc.clone()),
359 Arc::new(UnitSchema),
360 move |stats, frontier| {
361 let Some(lower) = frontier.as_option().copied() else {
362 return FilterResult::Discard;
365 };
366
367 if lower > upper {
368 return FilterResult::Discard;
371 }
372
373 let time_range =
374 ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
375 if let Some(plan) = &filter_plan {
376 let metrics = &metrics.pushdown.part_stats;
377 let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
378 filter_result(&read_desc, time_range, stats, plan)
379 } else {
380 FilterResult::Keep
381 }
382 },
383 listen_sleep,
384 start_signal,
385 error_handler,
386 );
387 let rows = decode_and_mfp(cfg, fetched, &name, until, map_filter_project);
388 (rows, token)
389}
390
391fn filter_result(
392 relation_desc: &RelationDesc,
393 time_range: ResultSpec,
394 stats: RelationPartStats,
395 plan: &MfpPlan,
396) -> FilterResult {
397 let arena = RowArena::new();
398 let relation = ReprRelationType::from(relation_desc.typ());
399 let mut ranges = ColumnSpecs::new(&relation, &arena);
400 ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
401
402 let may_error = stats.err_count().map_or(true, |count| count > 0);
403
404 for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
407 let result_spec = stats.col_stats(idx, &arena);
408 ranges.push_column(pos, result_spec);
409 }
410 let result = ranges.mfp_plan_filter(plan).range;
411 let may_error = may_error || result.may_fail();
412 let may_keep = result.may_contain(Datum::True);
413 let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
414 if relation_desc.len() == 0 && !may_error && !may_skip {
415 let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
416 return FilterResult::Keep;
417 };
418 key.append(&SourceData(Ok(Row::default())));
419 let key = key.finish();
420 let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
421 return FilterResult::Keep;
422 };
423 val.append(&());
424 let val = val.finish();
425
426 FilterResult::ReplaceWith {
427 key: Arc::new(key),
428 val: Arc::new(val),
429 }
430 } else if may_error || may_keep {
431 FilterResult::Keep
432 } else {
433 FilterResult::Discard
434 }
435}
436
/// Decodes fetched persist blobs into rows, applying `map_filter_project`
/// (if any) and suppressing updates at or beyond `until`. Work is fueled:
/// the operator yields after `storage_source_decode_fuel` units of work and
/// reactivates itself if parts remain pending.
pub fn decode_and_mfp<G>(
    cfg: PersistConfig,
    fetched: StreamVec<G, FetchedBlob<SourceData, (), Timestamp, StorageDiff>>,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> StreamVec<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
where
    G: Scope<Timestamp = (mz_repr::Timestamp, Subtime)>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Reused allocations for datum unpacking and row building.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Take ownership of the MFP, leaving the caller's slot empty.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        // Activator so we can reschedule ourselves when we yield with work
        // still pending.
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        // Queue of not-yet-fully-decoded parts, each with its capability.
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            // Enqueue every newly arrived blob, retaining the input capability
            // (for output port 0) so we can emit at its time later.
            fetched_input.for_each(|time, data| {
                let capability = time.retain(0);
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Yield once we've spent the configured fuel (the `Instant` is
            // currently unused by the predicate).
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            // Ran out of fuel with parts remaining: reschedule ourselves.
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}
518
/// A fetched part the decode operator has accepted but not fully processed.
struct PendingWork {
    // Whether a filter-pushdown audit violation should panic (vs. only log).
    panic_on_audit_failure: bool,
    // The capability at whose time this part's updates are emitted.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    // The part itself, parsed lazily on first use.
    part: PendingPart,
}
528
/// A fetched part in one of two states: still an encoded blob, or parsed.
enum PendingPart {
    // The raw blob as fetched; parsing is deferred until first access.
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}
535
536impl PendingPart {
537 fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
544 match self {
545 PendingPart::Unparsed(x) => {
546 *self = PendingPart::Parsed { part: x.parse() };
547 self.part_mut()
549 }
550 PendingPart::Parsed { part } => &mut part.part,
551 }
552 }
553}
554
impl PendingWork {
    /// Decodes updates from this part, applies the MFP (if any), and emits
    /// results to `output`. Returns `true` when the part is exhausted, or
    /// `false` if it yielded early because `yield_fn` said the fuel budget
    /// (`work`) is spent.
    fn do_work<YFn>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        // Buffer so the `SourceData` row allocation is reused across iterations.
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            // Suppress updates at or beyond the `until` frontier.
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // This part was fetched only to audit a pushdown
                            // decision that claimed it could be skipped; any
                            // surviving update means the decision was wrong.
                            if let Some(stats) = &is_filter_pushdown_audit {
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            ?mfp,
                                            ?result,
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // Re-check `until` against the possibly
                                    // MFP-remapped time before emitting.
                                    if !until.less_equal(&time) {
                                        // Keep the capability's `Subtime`,
                                        // substituting the data's outer time.
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        drop(datums_local);
                        // Hand the row's allocation back for reuse.
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        // No MFP: emit the row as-is.
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(err), emit_time, diff.into()));
                    *work += 1;
                }
            }
            // Out of fuel: report this part as not yet done.
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}
691
/// A type whose in-flight size (in bytes) the `backpressure` operator can
/// account for.
pub trait Backpressureable: Clone + 'static {
    /// The number of bytes this object counts for against the flow-control
    /// byte budget.
    fn byte_size(&self) -> usize;
}
697
impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
    // Only the batch part's encoded size counts; the leading `usize` is not
    // included in the accounting.
    fn byte_size(&self) -> usize {
        self.1.encoded_size_bytes()
    }
}
703
/// Flow-control configuration for the `backpressure` operator.
#[derive(Debug)]
pub struct FlowControl<G: Scope> {
    /// A stream carrying no data whose frontier reports how far the
    /// downstream consumer has progressed; used to retire in-flight parts.
    pub progress_stream: StreamVec<G, Infallible>,
    /// Maximum number of outstanding (emitted but not retired) bytes before
    /// emission pauses.
    pub max_inflight_bytes: usize,
    /// Summary applied to an emitted part's time to compute the frontier at
    /// which the part counts as retired.
    pub summary: <G::Timestamp as TimelyTimestamp>::Summary,

    /// Optional metrics for observing backpressure behavior.
    pub metrics: Option<BackpressureMetrics>,
}
722
/// Applies flow control to `data`. Each part is assigned its own distinct
/// `Subtime` and emitted until `max_inflight_bytes` are outstanding; emission
/// then pauses until the `flow_control.progress_stream` frontier (routed back
/// through a feedback edge with `flow_control.summary`) shows enough progress
/// to retire in-flight parts. Only `chosen_worker` does any work; other
/// workers return immediately. The optional `probe` is sent each
/// `(flow-control frontier, inflight part count, retired part count)`
/// observation — used by the tests in this file.
pub fn backpressure<T, G, O>(
    scope: &mut G,
    name: &str,
    data: StreamVec<G, O>,
    flow_control: FlowControl<G>,
    chosen_worker: usize,
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (StreamVec<G, O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    G: Scope<Timestamp = (T, Subtime)>,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Route the downstream progress stream back to this operator through a
    // feedback edge that advances times by `flow_control.summary`.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(summaried_flow, Pipeline);

    // Give `time` a unique, increasing `Subtime` (`part_number`), and produce
    // the frontier at which it may be emitted plus the frontier after it.
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // Adapt the raw input into a stream of either progress statements
    // (`Left`) or parts stamped with per-part frontiers (`Right`). Parts are
    // buffered until the input frontier passes them, then released in time
    // order with fresh `Subtime`s.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    // Input closed: flush all buffered parts under an empty
                    // frontier, then end the stream.
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    // Release, in time order, every buffered part the new
                    // frontier has passed.
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // Frontier of what we have emitted downstream.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        // Frontier of what downstream has acknowledged (via the feedback edge).
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // (emission time advanced by summary, byte size) of outstanding parts.
        let mut inflight_parts = Vec::new();
        // Parts read from the input but deferred because we were over budget.
        let mut pending_parts = std::collections::VecDeque::new();

        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // Emit while under budget, or while the flow-control frontier is
            // ahead of our output (so waiting could not make progress).
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // Re-check the budget after downgrading: if we
                                // are now over budget and not ahead of the
                                // flow-control frontier, defer this part.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                // Record the part against the budget at the time the summary
                // maps it to; parts the summary sends "nowhere" are free.
                let byte_size = part.byte_size();
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                // Over budget: wait for downstream progress before emitting
                // anything else.
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire every in-flight part the new frontier has passed.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}
982
983#[cfg(test)]
984mod tests {
985 use timely::container::CapacityContainerBuilder;
986 use timely::dataflow::operators::{Enter, Probe};
987 use tokio::sync::mpsc::unbounded_channel;
988 use tokio::sync::oneshot;
989
990 use super::*;
991
    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        // Two parts that together exceed the 100-byte budget: the second is
        // emitted (we emit until over budget), then we backpressure until
        // both are processed.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        // Several small parts followed by a large one: the first three fit
        // (going over the 50-byte budget on the third), then processing them
        // unblocks the last part.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        // Many parts across several times; processing retires parts in
        // batches as the non-granular consumer's frontier advances.
        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertOutputFrontier((51, Subtime(3))),
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }
1130
    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        // With a granular (Subtime-stepped) summary, a single over-budget
        // part blocks the next until it is individually processed.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((50, Subtime(1))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        // Granular retirement: each processed part retires individually,
        // letting later parts through one at a time.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }
1209
    /// Timestamp type used by the backpressure tests.
    type Time = (u64, Subtime);
    /// A test "part" whose only property is its byte size.
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }
1218
    /// One scripted action/assertion in a `backpressure_runner` scenario.
    enum Step {
        /// Step the worker until the backpressure operator's output frontier
        /// is no longer less than this time.
        AssertOutputFrontier(Time),
        /// Expect the operator's probe to report this flow-control frontier
        /// along with the given in-flight and retired part counts.
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
        /// Allow the consumer operator to consume this many parts.
        ProcessXParts(usize),
    }
1235
    /// Drives the `backpressure` operator through a scripted scenario on a
    /// single-worker timely runtime. `input` is the (time, part) feed,
    /// `max_inflight_bytes` and `summary` configure flow control, `steps`
    /// is the script, and `non_granular_consumer` selects whether the
    /// consumer (and therefore the flow-control feedback) lives in the outer
    /// `u64` scope (`true`) or in the granular `(u64, Subtime)` scope
    /// (`false`).
    fn backpressure_runner(
        input: Vec<(u64, Part)>,
        max_inflight_bytes: usize,
        summary: Time,
        steps: Vec<Step>,
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (
                backpressure_probe,
                consumer_tx,
                mut backpressure_status_rx,
                finalizer_tx,
                _token,
            ) =
                worker.dataflow::<u64, _, _>(|scope| {
                    // Feedback in the outer scope, only needed when the
                    // consumer is non-granular.
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        // Wire flow control either from the outer feedback
                        // (entered into this scope) or from a granular
                        // feedback created here.
                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // Granular consumer runs inside this scope.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                backpressured.clone(),
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        let (probe_handle, backpressured) = backpressured.probe();
                        (
                            probe_handle,
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(),
                            finalizer_tx,
                        )
                    });

                    // Non-granular consumer runs in the outer scope instead.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            scope.clone(),
                            backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        // Step the worker until the probe's frontier passes
                        // the requested time.
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        // Step until the operator reports its next probe
                        // observation, then compare it to the expectation.
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Release the iterator operator's capability so the dataflow can
            // shut down.
            let _ = finalizer_tx.send(());
        });
    }
1405
1406 fn iterator_operator<
1409 G: Scope<Timestamp = (u64, Subtime)>,
1410 I: Iterator<Item = (u64, Part)> + 'static,
1411 >(
1412 scope: G,
1413 mut input: I,
1414 ) -> (StreamVec<G, Part>, oneshot::Sender<()>) {
1415 let (finalizer_tx, finalizer_rx) = oneshot::channel();
1416 let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
1417 let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();
1418
1419 iterator.build(|mut caps| async move {
1420 let mut capability = Some(caps.pop().unwrap());
1421 let mut last = None;
1422 while let Some(element) = input.next() {
1423 let time = element.0.clone();
1424 let part = element.1;
1425 last = Some((time, Subtime(0)));
1426 output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
1427 }
1428 if let Some(last) = last {
1429 capability
1430 .as_mut()
1431 .unwrap()
1432 .downgrade(&(last.0 + 1, last.1));
1433 }
1434
1435 let _ = finalizer_rx.await;
1436 capability.take();
1437 });
1438
1439 (output, finalizer_tx)
1440 }
1441
    /// An operator that consumes one chunk of its input each time the
    /// returned sender receives a `()`. Its (data-less) output is connected
    /// back through `feedback`, so its progress drives the flow-control
    /// frontier seen by the `backpressure` operator.
    fn consumer_operator<G: Scope, O: Backpressureable + std::fmt::Debug>(
        scope: G,
        input: StreamVec<G, O>,
        feedback: timely::dataflow::operators::feedback::Handle<G, Vec<std::convert::Infallible>>,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                // Skip over progress events; stop after consuming the next
                // non-progress (data) event.
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
        output.connect_loop(feedback);

        tx
    }
1466}