use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::lattice::Lattice;
use futures::{StreamExt, future::Either};
use mz_expr::{ColumnSpecs, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::{PersistConfig, RetryParameters};
use mz_persist_client::fetch::{ExchangeableBatchPart, FetchedBlob, FetchedPart, ShardSourcePart};
use mz_persist_client::operators::shard_source::{
    ErrorHandler, FilterResult, SnapshotMode, shard_source,
};
use mz_persist_client::stats::STATS_AUDIT_PANIC;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_repr::{Datum, DatumVec, Diff, GlobalId, RelationDesc, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_storage_types::stats::RelationPartStats;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::probe::ProbeNotify;
use mz_txn_wal::operator::{TxnsContext, txns_progress};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::ScopeParent;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
use timely::dataflow::operators::{Capability, CapabilitySet, ConnectLoop, Feedback, Leave, OkErr};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use timely::order::TotalOrder;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::timestamp::PathSummary;
use timely::scheduling::Activator;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, trace};

use crate::metrics::BackpressureMetrics;

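/// A totally ordered sub-timestamp. Within the refined backpressure scope,
/// timestamps are pairs `(T, Subtime)`: the outer `T` is the data timestamp and
/// the `Subtime` counts individual batch parts, so progress can be tracked (and
/// backpressure applied) at per-part granularity.
///
/// Summaries add part counts, delegating to the integer `PathSummary`. A
/// minimal sketch (the enclosing crate path is assumed, hence `ignore`):
///
/// ```ignore
/// use timely::progress::timestamp::PathSummary;
/// // Advancing by one part from part 2 lands on part 3.
/// assert_eq!(Subtime::least_summary().results_in(&Subtime(2)), Some(Subtime(3)));
/// ```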
#[derive(
    Copy, Clone, PartialEq, Default, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, Hash,
)]
pub struct Subtime(u64);

impl PartialOrder for Subtime {
    fn less_equal(&self, other: &Self) -> bool {
        self.0.less_equal(&other.0)
    }
}

impl TotalOrder for Subtime {}

impl PathSummary<Subtime> for Subtime {
    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
        self.0.results_in(&src.0).map(Subtime)
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Subtime)
    }
}

impl TimelyTimestamp for Subtime {
    type Summary = Subtime;

    fn minimum() -> Self {
        Subtime(0)
    }
}

impl Subtime {
    /// The smallest non-trivial summary: advances the part counter by one.
    pub const fn least_summary() -> Self {
        Subtime(1)
    }
}

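/// Creates a new source that reads from the persist shard described by
/// `metadata`, returning a stream of decoded rows, a stream of decode errors,
/// and tokens that shut the source down when dropped.
///
/// When `max_inflight_bytes` is `Some`, reading happens inside a child scope
/// refined by [`Subtime`], and a probe downstream of decoding feeds back to
/// apply granular backpressure to the persist fetches. When
/// `metadata.txns_shard` is `Some`, progress is additionally routed through
/// txn-wal's `txns_progress` operator.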
pub fn persist_source<G>(
    scope: &mut G,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<G, (Row, Timestamp, Diff)>,
    Stream<G, (DataflowError, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    // Read the shard in a child scope whose timestamps are refined with a
    // per-part `Subtime`, so backpressure can be applied at part granularity.
    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        // If the collection is managed by txn-wal, use the txn-wal-specific
        // retry parameters when subscribing to the data shard.
        let cfg = Arc::clone(&persist_clients.cfg().configs);
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        // The probe downstream of decoding drives the backpressure loop.
        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave()
    });

    // If the collection is managed by txn-wal, route progress through
    // `txns_progress` so the output frontier reflects the logical collection
    // frontier rather than just the physical data shard's.
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}

/// A scope whose timestamp refines the parent scope's with a per-part
/// [`Subtime`].
type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, Subtime)>;

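/// Creates a raw persist source, reading and decoding a shard within the
/// refined `(Timestamp, Subtime)` scope. This is [`persist_source`] without
/// the txn-wal handling and without leaving the backpressure scope.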
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, G>(
    scope: &RefinedScope<'g, G>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
    listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<
        RefinedScope<'g, G>,
        (
            Result<Row, DataflowError>,
            (mz_repr::Timestamp, Subtime),
            Diff,
        ),
    >,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    // If a read schema is provided, decode with it; otherwise fall back to the
    // shard's schema from the collection metadata.
    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |mut scope: _, descs: &Stream<_, _>, chosen_worker| {
            let (stream, token) = backpressure(
                &mut scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    // Data is only retained while the time is in `[as_of, until)`, so parts
    // whose stats prove they fall entirely outside that range can be filtered
    // out (filter pushdown) without fetching them.
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        &mut scope.clone(),
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                // The empty frontier: no further data can ever be emitted.
                return FilterResult::Discard;
            };

            if lower > upper {
                // Everything in the part is at or past the `until`.
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, &fetched, &name, until, map_filter_project);
    (rows, token)
}

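/// Uses a part's statistics to decide whether it must be fetched
/// ([`FilterResult::Keep`]), can be skipped entirely ([`FilterResult::Discard`]),
/// or, for zero-column relations, can be replaced with a locally synthesized
/// part ([`FilterResult::ReplaceWith`]), by interpreting the MFP's filter over
/// the per-column value ranges, with `mz_now()` bounded to the given
/// `time_range`.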
fn filter_result(
    relation_desc: &RelationDesc,
    time_range: ResultSpec,
    stats: RelationPartStats,
    plan: &MfpPlan,
) -> FilterResult {
    let arena = RowArena::new();
    let mut ranges = ColumnSpecs::new(relation_desc.typ(), &arena);
    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);

    // If we can't prove the part is error-free, we must fetch it.
    let may_error = stats.err_count().map_or(true, |count| count > 0);

    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
        let result_spec = stats.col_stats(idx, &arena);
        ranges.push_column(pos, result_spec);
    }
    let result = ranges.mfp_plan_filter(plan).range;
    let may_error = may_error || result.may_fail();
    let may_keep = result.may_contain(Datum::True);
    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
    if relation_desc.len() == 0 && !may_error && !may_skip {
        // A zero-column relation whose filter provably keeps every update:
        // synthesize a replacement part holding the single possible row, so
        // the original part need not be fetched at all.
        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
            return FilterResult::Keep;
        };
        key.append(&SourceData(Ok(Row::default())));
        let key = key.finish();
        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
            return FilterResult::Keep;
        };
        val.append(&());
        let val = val.finish();

        FilterResult::ReplaceWith {
            key: Arc::new(key),
            val: Arc::new(val),
        }
    } else if may_error || may_keep {
        FilterResult::Keep
    } else {
        FilterResult::Discard
    }
}

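/// Decodes fetched blobs of shard data into rows, applying the `until` bound
/// and the optional `MfpPlan`, and emits consolidated
/// `Result<Row, DataflowError>` updates. Decoding is fueled: the operator does
/// at most `storage_source_decode_fuel` units of work per activation, then
/// reactivates itself if work remains.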
pub fn decode_and_mfp<G>(
    cfg: PersistConfig,
    fetched: &Stream<G, FetchedBlob<SourceData, (), Timestamp, StorageDiff>>,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> Stream<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
where
    G: Scope<Timestamp = (mz_repr::Timestamp, Subtime)>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Re-used state for decoding and MFP evaluation, to avoid reallocations.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Take the MFP out of the `&mut` option; this operator is its sole user.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        // An activator to reschedule the operator while it has pending work.
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        // Blobs awaiting decode, in arrival order.
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            fetched_input.for_each(|time, data| {
                let capability = time.retain();
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Process pending work until we run out of fuel, then yield and
            // reschedule ourselves.
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}

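/// A part of fetched data that still needs decoding, along with the capability
/// at which its updates must be emitted.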
struct PendingWork {
    /// Whether a filter-pushdown audit failure should panic, or only log.
    panic_on_audit_failure: bool,
    /// The capability of the data.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// The data itself.
    part: PendingPart,
}

enum PendingPart {
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}

impl PendingPart {
    /// Returns the parsed part, parsing it first if necessary. Parsing is
    /// deferred until a part is actually worked on.
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                *self = PendingPart::Parsed { part: x.parse() };
                // Won't recurse again; we just parsed it.
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}

impl PendingWork {
    /// Performs work: reads from the fetched part, decodes, applies the MFP
    /// and the `until` bound, and sends updates to `output`. Returns `true` if
    /// the part is exhausted, or `false` if `yield_fn` asked us to stop early.
    fn do_work<YFn>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        // Count the decode as work even if the MFP drops the
                        // update, so that heavy filtering still burns fuel.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // If this part was kept only to audit filter
                            // pushdown, any surviving update means pushdown
                            // would have wrongly discarded it: report it (and
                            // optionally panic).
                            if let Some(stats) = &is_filter_pushdown_audit {
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            ?mfp,
                                            ?result,
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // Additional `until` filtering, for times
                                    // rewritten by temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    // Additional `until` filtering, for times
                                    // rewritten by temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        drop(datums_local);
                        // Stash the row in `row_buf`, so the next decode can
                        // reuse its allocations.
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(err), emit_time, diff.into()));
                    *work += 1;
                }
            }
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}

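/// A trait for types that can flow through `backpressure`: anything with a
/// byte size can be accounted against the inflight budget. A minimal sketch,
/// mirroring the `Part` type defined in the tests at the bottom of this file:
///
/// ```ignore
/// #[derive(Clone, Debug)]
/// struct Part(usize);
///
/// impl Backpressureable for Part {
///     fn byte_size(&self) -> usize {
///         self.0
///     }
/// }
/// ```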
pub trait Backpressureable: Clone + 'static {
    /// The size of the object in bytes, used to account for inflight data.
    fn byte_size(&self) -> usize;
}

impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
    fn byte_size(&self) -> usize {
        self.1.encoded_size_bytes()
    }
}

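/// Flow control configuration for the [`backpressure`] operator.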
#[derive(Debug)]
pub struct FlowControl<G: Scope> {
    /// Stream providing in-flight frontier updates. As implied by its type,
    /// this stream never emits data, only progress updates.
    pub progress_stream: Stream<G, Infallible>,
    /// Maximum number of in-flight bytes before backpressure is applied.
    pub max_inflight_bytes: usize,
    /// The timestamp summary by which a part's emission time is advanced to
    /// determine when downstream progress retires it.
    pub summary: <G::Timestamp as TimelyTimestamp>::Summary,
    /// Optional metrics for the `backpressure` operator to keep up-to-date.
    pub metrics: Option<BackpressureMetrics>,
}

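/// Applies flow control to the `data` input, based on the given [`FlowControl`].
///
/// The operator keeps emitting parts until at least `max_inflight_bytes` worth
/// of emitted parts remain unretired, at which point it waits for the
/// flow-control frontier to advance. A part emitted at time `t` counts as
/// retired once the frontier of `flow_control.progress_stream` passes
/// `summary.results_in(&t)`. Each part is assigned its own [`Subtime`], so
/// retirement is tracked per part.
///
/// Only `chosen_worker` runs the operator; all other workers return
/// immediately. `probe` is a test hook reporting, at each backpressure wakeup,
/// the new flow-control frontier plus the inflight and retired part counts.
///
/// A minimal wiring sketch, patterned on the tests at the bottom of this file
/// (names here are illustrative, not a fixed API):
///
/// ```ignore
/// let (feedback_handle, feedback) = scope.feedback(Default::default());
/// let flow_control = FlowControl {
///     progress_stream: feedback,
///     max_inflight_bytes: 1024,
///     summary: (Default::default(), Subtime::least_summary()),
///     metrics: None,
/// };
/// let (backpressured, token) = backpressure(scope, "demo", &parts, flow_control, 0, None);
/// // A consumer of `backpressured` must feed its progress (an empty stream
/// // of `Infallible` suffices) back via `connect_loop(feedback_handle)`.
/// ```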
pub fn backpressure<T, G, O>(
    scope: &mut G,
    name: &str,
    data: &Stream<G, O>,
    flow_control: FlowControl<G>,
    chosen_worker: usize,
    // A probe used only in tests, to inspect the operator's state.
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (Stream<G, O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    G: Scope<Timestamp = (T, Subtime)>,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Pass the progress stream through a feedback edge with the configured
    // summary, so the flow-control frontier we compare against is already
    // advanced by that summary.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(&summaried_flow, Pipeline);

    // Helper to synthesize current and next frontiers for ordered parts: each
    // part is assigned its own `Subtime`, so it can be tracked (and retired)
    // individually.
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

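    // Wrap the input in a stream that assigns each part a distinct
    // `(time, Subtime)`, interleaving data (`Either::Right`) with progress
    // statements (`Either::Left`), and releasing parts in timestamp order as
    // the input frontier advances.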
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // The output frontier of this operator.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        // The (summaried) frontier of the downstream consumer.
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // Parts we have emitted whose retirement we have not yet observed.
        let mut inflight_parts = Vec::new();
        // Parts we have obtained from the input but not yet emitted.
        let mut pending_parts = std::collections::VecDeque::new();

        // Only one worker runs the operator; the rest drop out immediately.
        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            // Total size of all emitted-but-unretired parts.
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // Emit if we are under the inflight budget, or if the flow-control
            // frontier has already passed our output frontier (in which case
            // there is no further downstream progress to wait for).
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                // Downgrade to the part's synthesized frontier
                                // before deciding whether to emit it, so the
                                // flow-control loop can observe our progress.
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // Re-check the budget: if it is exhausted and
                                // the flow-control frontier is not strictly
                                // ahead of our new frontier, stash the part
                                // and take the waiting branch instead.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Account for the part: it is retired once the flow-control
                // frontier passes its emission time advanced by the summary.
                // If the summary cannot advance the time (overflow), the part
                // could never be retired, so we don't track it.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                // The budget is exhausted: wait for the flow-control frontier
                // to advance before emitting more parts.
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                // Update the `flow_control_frontier` if it has advanced.
                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire every part whose emission time is now behind the
                // flow-control frontier.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                // Inform the test probe, if any.
                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}

#[cfg(test)]
mod tests {
    use timely::container::CapacityContainerBuilder;
    use timely::dataflow::operators::{Enter, Probe};
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::sync::oneshot;

    use super::*;

    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertOutputFrontier((51, Subtime(3))),
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }

    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((50, Subtime(1))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }

    type Time = (u64, Subtime);
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }

    /// A step in a backpressure test scenario.
    enum Step {
        /// Assert that the output frontier of the `backpressure` operator has
        /// advanced (at least) to this time.
        AssertOutputFrontier(Time),
        /// Assert that the `backpressure` operator has entered its waiting
        /// branch, with the given frontier and inflight/retired part counts.
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
        /// Instruct the consumer to process X parts.
        ProcessXParts(usize),
    }

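    /// Runs the `backpressure` operator over a synthetic input and drives it
    /// through the given `steps`. When `non_granular_consumer` is set, the
    /// consumer (and its progress feedback) lives outside the granular
    /// `(u64, Subtime)` scope; otherwise it runs inside it.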
    fn backpressure_runner(
        // The input data: (time, part) pairs.
        input: Vec<(u64, Part)>,
        // The maximum inflight bytes.
        max_inflight_bytes: usize,
        // The backpressure summary.
        summary: Time,
        // The steps to run.
        steps: Vec<Step>,
        // Whether the consumer lives outside the `(u64, Subtime)` scope.
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (backpressure_probe, consumer_tx, mut backpressure_status_rx, finalizer_tx, _token) =
                worker.dataflow::<u64, _, _>(|scope| {
                    // When the consumer is non-granular, its feedback loop
                    // lives in the outer (u64) scope.
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            &input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // If the consumer is granular, build it here, inside
                        // the refined scope.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                &backpressured,
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        (
                            backpressured.probe(),
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(),
                            finalizer_tx,
                        )
                    });

                    // If the consumer is non-granular, build it here, in the
                    // outer scope.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            scope.clone(),
                            &backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Drop the capability in the iterator operator, allowing the
            // dataflow to shut down.
            let _ = finalizer_tx.send(());
        });
    }

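    /// An operator that emits `Part`s at the given times, holding a capability
    /// (and thereby keeping the dataflow live) until the returned finalizer
    /// channel fires.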
    fn iterator_operator<
        G: Scope<Timestamp = (u64, Subtime)>,
        I: Iterator<Item = (u64, Part)> + 'static,
    >(
        scope: G,
        mut input: I,
    ) -> (Stream<G, Part>, oneshot::Sender<()>) {
        let (finalizer_tx, finalizer_rx) = oneshot::channel();
        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();

        iterator.build(|mut caps| async move {
            let mut capability = Some(caps.pop().unwrap());
            let mut last = None;
            while let Some(element) = input.next() {
                let time = element.0.clone();
                let part = element.1;
                last = Some((time, Subtime(0)));
                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
            }
            if let Some(last) = last {
                capability
                    .as_mut()
                    .unwrap()
                    .downgrade(&(last.0 + 1, last.1));
            }

            // Hold the capability until the finalizer fires, then drop it to
            // let the dataflow wind down.
            let _ = finalizer_rx.await;
            capability.take();
        });

        (output, finalizer_tx)
    }

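    /// A consumer that "processes" one part per `()` received on the returned
    /// channel, and feeds its progress (an empty `Infallible` stream) back
    /// into the `backpressure` operator's flow-control loop.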
    fn consumer_operator<G: Scope, O: Backpressureable + std::fmt::Debug>(
        scope: G,
        input: &Stream<G, O>,
        feedback: timely::dataflow::operators::feedback::Handle<G, Vec<std::convert::Infallible>>,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                // Consume exactly one data event per request, skipping over
                // any interleaved progress events.
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
        output.connect_loop(feedback);

        tx
    }
}