use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::lattice::Lattice;
use futures::{StreamExt, future::Either};
use mz_expr::{ColumnSpecs, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::{PersistConfig, RetryParameters};
use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
use mz_persist_client::operators::shard_source::{
    ErrorHandler, FilterResult, SnapshotMode, shard_source,
};
use mz_persist_client::stats::STATS_AUDIT_PANIC;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_repr::{Datum, DatumVec, Diff, GlobalId, RelationDesc, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_storage_types::stats::RelationPartStats;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::probe::ProbeNotify;
use mz_txn_wal::operator::{TxnsContext, txns_progress};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::ScopeParent;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
use timely::dataflow::operators::{Capability, Leave, OkErr};
use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use timely::order::TotalOrder;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::timestamp::PathSummary;
use timely::scheduling::Activator;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, trace};

use crate::metrics::BackpressureMetrics;

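/// A timestamp component that subdivides each `mz_repr::Timestamp` tick. It is
/// used as the second element of the refined timestamps in this module so that
/// progress (and backpressure) can be tracked per emitted part rather than per
/// `Timestamp`.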
#[derive(
    Copy, Clone, PartialEq, Default, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, Hash,
)]
pub struct Subtime(u64);

impl PartialOrder for Subtime {
    fn less_equal(&self, other: &Self) -> bool {
        self.0.less_equal(&other.0)
    }
}

impl TotalOrder for Subtime {}

impl PathSummary<Subtime> for Subtime {
    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
        self.0.results_in(&src.0).map(Subtime)
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Subtime)
    }
}

impl TimelyTimestamp for Subtime {
    type Summary = Subtime;

    fn minimum() -> Self {
        Subtime(0)
    }
}

impl Subtime {
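    /// The smallest non-zero amount by which a `Subtime` may advance.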
    pub const fn least_summary() -> Self {
        Subtime(1)
    }
}

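/// Creates a dataflow operator that reads the persist shard described by
/// `metadata` and emits its contents as a stream of `Ok` rows and a stream of
/// `DataflowError`s, along with buttons that shut the operators down when
/// dropped.
///
/// If `max_inflight_bytes` is given, reads happen inside a subscope refined
/// with a [`Subtime`] component, and the [`backpressure`] operator bounds how
/// much fetched-but-undecoded data may be in flight at once.
///
/// A sketch of typical wiring; illustrative only, and the surrounding bindings
/// (`scope`, `as_of`, `error_handler`, etc.) are assumptions:
///
/// ```ignore
/// let (oks, errs, tokens) = persist_source(
///     &mut scope,
///     source_id,
///     Arc::clone(&persist_clients),
///     &txns_ctx,
///     metadata,
///     None,                  // read with the shard's own schema
///     Some(as_of),           // start reading at this frontier
///     SnapshotMode::Include, // emit the snapshot as of `as_of`
///     Antichain::new(),      // empty `until`: read indefinitely
///     None,                  // no MFP to push down
///     Some(1 << 30),         // backpressure above 1 GiB in flight
///     async {},              // no start signal; begin immediately
///     error_handler,
/// );
/// ```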
pub fn persist_source<G>(
    scope: &mut G,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<G, (Row, Timestamp, Diff)>,
    Stream<G, (DataflowError, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let shard_metrics =
        persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

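    // Read the shard inside a subscope whose timestamps carry an extra
    // `Subtime` component, so backpressure can be tracked per part rather
    // than per `Timestamp`.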
    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        let cfg = Arc::clone(&persist_clients.cfg().configs);
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave()
    });

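    // If the shard is written through txn-wal, wrap the stream in
    // `txns_progress`, which keeps its frontier advancing even when no
    // transaction is writing to the data shard itself.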
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}

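/// The scope in which `persist_source_core` runs: the caller's scope refined
/// with a [`Subtime`] component.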
type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, Subtime)>;

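/// Creates a dataflow operator that reads the given persist shard within an
/// already-refined `(Timestamp, Subtime)` scope, optionally applying flow
/// control and pushing `map_filter_project` down into part fetching and
/// decoding.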
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, G>(
    scope: &RefinedScope<'g, G>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
    listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<
        RefinedScope<'g, G>,
        (
            Result<Row, DataflowError>,
            (mz_repr::Timestamp, Subtime),
            Diff,
        ),
    >,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |mut scope: _, descs: &Stream<_, _>, chosen_worker| {
            let (stream, token) = backpressure(
                &mut scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

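    // `until` is an upper bound on the timestamps we can ever produce, which
    // lets the stats-based filter below constrain `mz_now()` when deciding
    // whether a part can be skipped.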
    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        &mut scope.clone(),
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                return FilterResult::Discard;
            };

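            // All remaining updates are at or past `lower`, which `until`
            // excludes entirely.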
            if lower > upper {
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, &fetched, &name, until, map_filter_project);
    (rows, token)
}

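/// Uses a part's statistics to decide whether it must be fetched, can be
/// skipped entirely, or (for zero-column relations that surely match the
/// filter) can be replaced with a cheap synthesized part.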
fn filter_result(
    relation_desc: &RelationDesc,
    time_range: ResultSpec,
    stats: RelationPartStats,
    plan: &MfpPlan,
) -> FilterResult {
    let arena = RowArena::new();
    let mut ranges = ColumnSpecs::new(relation_desc.typ(), &arena);
    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);

    let may_error = stats.err_count().map_or(true, |count| count > 0);

    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
        let result_spec = stats.col_stats(idx, &arena);
        ranges.push_column(pos, result_spec);
    }
    let result = ranges.mfp_plan_filter(plan).range;
    let may_error = may_error || result.may_fail();
    let may_keep = result.may_contain(Datum::True);
    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
    if relation_desc.len() == 0 && !may_error && !may_skip {
        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
            return FilterResult::Keep;
        };
        key.append(&SourceData(Ok(Row::default())));
        let key = key.finish();
        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
            return FilterResult::Keep;
        };
        val.append(&());
        let val = val.finish();

        FilterResult::ReplaceWith {
            key: Arc::new(key),
            val: Arc::new(val),
        }
    } else if may_error || may_keep {
        FilterResult::Keep
    } else {
        FilterResult::Discard
    }
}

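/// Decodes the blobs returned by `shard_source`, applies the given
/// `map_filter_project`, and emits the results, yielding periodically so that
/// a single activation does only a bounded amount of decoding work.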
pub fn decode_and_mfp<G>(
    cfg: PersistConfig,
    fetched: &Stream<G, FetchedBlob<SourceData, (), Timestamp, StorageDiff>>,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> Stream<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
where
    G: Scope<Timestamp = (mz_repr::Timestamp, Subtime)>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            fetched_input.for_each(|time, data| {
                let capability = time.retain();
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

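            // Bound the work done per activation; if parts remain once the
            // fuel is spent, we reactivate ourselves to continue later.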
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}

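/// A fetched part, along with the state needed to decode and emit it
/// incrementally.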
struct PendingWork {
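    /// Whether to panic if a filter-pushdown audit fails for this part.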
    panic_on_audit_failure: bool,
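    /// The capability at which any decoded updates will be emitted.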
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
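    /// The part itself, parsed lazily on first use.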
    part: PendingPart,
}

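/// A fetched part, either still in its raw blob form or parsed and ready to be
/// iterated.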
enum PendingPart {
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}

impl PendingPart {
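    /// Returns the [`FetchedPart`], parsing it from the raw blob (and caching
    /// the result) on first use.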
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                *self = PendingPart::Parsed { part: x.parse() };
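                // `self` is now `Parsed`, so the recursive call returns the
                // freshly parsed part.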
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}

impl PendingWork {
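    /// Decodes rows from the part and sends them to `output`, applying the
    /// MFP if one is given, until either the part is exhausted (returning
    /// `true`) or `yield_fn` says to stop (returning `false`).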
    fn do_work<YFn>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (Ok(SourceData(Ok(row))), Ok(())) => {
                    if let Some(mfp) = map_filter_project {
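                        // Count each MFP evaluation against the fuel: even a
                        // restrictive filter that produces no output is work.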
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            if let Some(stats) = &is_filter_pushdown_audit {
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            ?mfp,
                                            ?result,
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        drop(datums_local);
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (Ok(SourceData(Err(err))), Ok(())) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(err), emit_time, diff.into()));
                    *work += 1;
                }
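                // There is no way to make progress from an undecodable key or
                // value, so fail loudly.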
                (Err(_), Ok(_)) | (Ok(_), Err(_)) | (Err(_), Err(_)) => {
                    panic!("decoding failed")
                }
            }
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}

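/// A type that can flow through [`backpressure`], measured in bytes.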
pub trait Backpressureable: Clone + 'static {
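    /// The size of the object, in bytes, counted against the in-flight budget.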
    fn byte_size(&self) -> usize;
}

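// As produced by `shard_source`'s part-description stream: a part paired with
// the index of the worker that should fetch it.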
impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
    fn byte_size(&self) -> usize {
        self.1.encoded_size_bytes()
    }
}

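/// Flow control configuration for [`backpressure`].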
#[derive(Debug)]
pub struct FlowControl<G: Scope> {
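    /// The stream of frontier progress reported by the downstream consumer of
    /// the data.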
    pub progress_stream: Stream<G, Infallible>,
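    /// The maximum number of in-flight (emitted but not yet retired) bytes.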
    pub max_inflight_bytes: usize,
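    /// The summary applied to a part's timestamp when it is emitted; the part
    /// is retired once the flow-control frontier passes the summarized time,
    /// which ensures the frontier can always make progress.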
    pub summary: <G::Timestamp as TimelyTimestamp>::Summary,

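    /// Optional metrics for the backpressure operator to keep up to date.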
    pub metrics: Option<BackpressureMetrics>,
}

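/// Applies backpressure to the `data` stream.
///
/// Each part is re-timestamped with a distinct [`Subtime`] and emitted only
/// while the bytes not yet acknowledged by `flow_control.progress_stream`
/// stay under `flow_control.max_inflight_bytes`, or while emitting is
/// required for the frontier to keep making progress.
///
/// Only `chosen_worker` participates; all other workers exit immediately. The
/// optional `probe` reports each new flow-control frontier together with the
/// in-flight and retired part counts, and exists for the tests below.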
pub fn backpressure<T, G, O>(
    scope: &mut G,
    name: &str,
    data: &Stream<G, O>,
    flow_control: FlowControl<G>,
    chosen_worker: usize,
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (Stream<G, O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    G: Scope<Timestamp = (T, Subtime)>,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(&summaried_flow, Pipeline);

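    // Helper to assign a distinct `Subtime` to a part and synthesize the
    // frontier at which it is emitted as well as the one immediately after.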
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

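    // Flatten the input into a stream of individually (re-)timestamped parts,
    // interleaved with the progress statements that separate them.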
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        let mut inflight_parts = Vec::new();
        let mut pending_parts = std::collections::VecDeque::new();

        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

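            // Emit the next part if we are under the byte budget, or if the
            // flow-control frontier has already moved past our output
            // frontier, in which case we must emit to keep making progress.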
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                flow_control_frontier.clone_from(&new_flow_control_frontier);

                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}

#[cfg(test)]
mod tests {
    use timely::container::CapacityContainerBuilder;
    use timely::dataflow::operators::{Enter, Probe};
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::sync::oneshot;

    use super::*;

    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertOutputFrontier((51, Subtime(3))),
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }

    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((50, Subtime(1))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }

    type Time = (u64, Subtime);
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }

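    /// A step driving or asserting on the `backpressure` operator under test.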
    enum Step {
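        /// Step the worker until the operator's output frontier is no longer
        /// strictly less than this time.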
        AssertOutputFrontier(Time),
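        /// Wait for the operator to report (through its probe) a new
        /// flow-control frontier, then assert it along with the in-flight and
        /// retired part counts.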
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
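        /// Have the consumer operator process this many parts, feeding the
        /// resulting progress back to the backpressure operator.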
        ProcessXParts(usize),
    }

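    /// Runs the `backpressure` operator on `input` (a list of `(time, part)`
    /// pairs) with the given byte limit and summary, driving the dataflow
    /// through the given `steps`. When `non_granular_consumer` is true, the
    /// consumer of the backpressured data (and its feedback) lives in the
    /// outer, non-granular scope.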
    fn backpressure_runner(
        input: Vec<(u64, Part)>,
        max_inflight_bytes: usize,
        summary: Time,
        steps: Vec<Step>,
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (backpressure_probe, consumer_tx, mut backpressure_status_rx, finalizer_tx, _token) =
                worker.dataflow::<u64, _, _>(|scope| {
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            &input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                &backpressured,
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        (
                            backpressured.probe(),
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(),
                            finalizer_tx,
                        )
                    });

                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            scope.clone(),
                            &backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} \
                            inflight parts and {retired_parts:?} retired"
                        );
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
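            // Signal the iterator operator to drop its capability and shut
            // down.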
            let _ = finalizer_tx.send(());
        });
    }

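    /// An operator that emits the given `Part`s at their `(time, Subtime(0))`
    /// timestamps, then holds its capability one past the last time until the
    /// returned sender fires.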
    fn iterator_operator<
        G: Scope<Timestamp = (u64, Subtime)>,
        I: Iterator<Item = (u64, Part)> + 'static,
    >(
        scope: G,
        mut input: I,
    ) -> (Stream<G, Part>, oneshot::Sender<()>) {
        let (finalizer_tx, finalizer_rx) = oneshot::channel();
        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();

        iterator.build(|mut caps| async move {
            let mut capability = Some(caps.pop().unwrap());
            let mut last = None;
            while let Some(element) = input.next() {
                let time = element.0.clone();
                let part = element.1;
                last = Some((time, Subtime(0)));
                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
            }
            if let Some(last) = last {
                capability
                    .as_mut()
                    .unwrap()
                    .downgrade(&(last.0 + 1, last.1));
            }

            let _ = finalizer_rx.await;
            capability.take();
        });

        (output, finalizer_tx)
    }

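    /// An operator that consumes one data event (one part) from `input` each
    /// time the returned sender is signalled, and whose (empty) output is fed
    /// back as the flow-control progress stream.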
    fn consumer_operator<G: Scope, O: Backpressureable + std::fmt::Debug>(
        scope: G,
        input: &Stream<G, O>,
        feedback: timely::dataflow::operators::feedback::Handle<G, Vec<std::convert::Infallible>>,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
        output.connect_loop(feedback);

        tx
    }
}