1use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use std::convert::Infallible;
14use std::fmt::Debug;
15use std::future::Future;
16use std::hash::Hash;
17use std::sync::Arc;
18use std::time::Instant;
19
20use differential_dataflow::lattice::Lattice;
21use futures::{StreamExt, future::Either};
22use mz_expr::{ColumnSpecs, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
23use mz_ore::cast::CastFrom;
24use mz_ore::collections::CollectionExt;
25use mz_persist_client::cache::PersistClientCache;
26use mz_persist_client::cfg::{PersistConfig, RetryParameters};
27use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
28use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
29use mz_persist_client::operators::shard_source::{
30 ErrorHandler, FilterResult, SnapshotMode, shard_source,
31};
32use mz_persist_client::stats::STATS_AUDIT_PANIC;
33use mz_persist_types::Codec64;
34use mz_persist_types::codec_impls::UnitSchema;
35use mz_persist_types::columnar::{ColumnEncoder, Schema};
36use mz_repr::{
37 Datum, DatumVec, Diff, GlobalId, RelationDesc, ReprRelationType, Row, RowArena, Timestamp,
38};
39use mz_storage_types::StorageDiff;
40use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
41use mz_storage_types::errors::DataflowError;
42use mz_storage_types::sources::SourceData;
43use mz_storage_types::stats::RelationPartStats;
44use mz_timely_util::builder_async::{
45 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
46};
47use mz_timely_util::probe::ProbeNotify;
48use mz_txn_wal::operator::{TxnsContext, txns_progress};
49use serde::{Deserialize, Serialize};
50use timely::PartialOrder;
51use timely::container::CapacityContainerBuilder;
52use timely::dataflow::channels::pact::Pipeline;
53use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
54use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
55use timely::dataflow::operators::{Capability, Leave, OkErr};
56use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
57use timely::dataflow::{Scope, Stream, StreamVec};
58use timely::order::TotalOrder;
59use timely::progress::Antichain;
60use timely::progress::Timestamp as TimelyTimestamp;
61use timely::progress::timestamp::PathSummary;
62use timely::scheduling::Activator;
63use tokio::sync::mpsc::UnboundedSender;
64use tracing::{error, trace};
65
66use crate::metrics::BackpressureMetrics;
67
68#[derive(
76 Copy,
77 Clone,
78 PartialEq,
79 Default,
80 Eq,
81 PartialOrd,
82 Ord,
83 Debug,
84 Serialize,
85 Deserialize,
86 Hash
87)]
88pub struct Subtime(u64);
89
90impl PartialOrder for Subtime {
91 fn less_equal(&self, other: &Self) -> bool {
92 self.0.less_equal(&other.0)
93 }
94}
95
// Marker: the inner `u64` order is total, so any two `Subtime`s are comparable.
impl TotalOrder for Subtime {}
97
98impl PathSummary<Subtime> for Subtime {
99 fn results_in(&self, src: &Subtime) -> Option<Subtime> {
100 self.0.results_in(&src.0).map(Subtime)
101 }
102
103 fn followed_by(&self, other: &Self) -> Option<Self> {
104 self.0.followed_by(&other.0).map(Subtime)
105 }
106}
107
108impl TimelyTimestamp for Subtime {
109 type Summary = Subtime;
110
111 fn minimum() -> Self {
112 Subtime(0)
113 }
114}
115
116impl differential_dataflow::lattice::Lattice for Subtime {
117 fn join(&self, other: &Self) -> Self {
118 Subtime(std::cmp::max(self.0, other.0))
119 }
120 fn meet(&self, other: &Self) -> Self {
121 Subtime(std::cmp::min(self.0, other.0))
122 }
123}
124
125impl differential_dataflow::lattice::Maximum for Subtime {
126 fn maximum() -> Self {
127 Subtime(u64::MAX)
128 }
129}
130
131impl Subtime {
132 pub const fn least_summary() -> Self {
134 Subtime(1)
135 }
136}
137
/// Creates a source that reads the given persist shard and returns streams of
/// `Row` updates and `DataflowError`s, plus shutdown tokens for the operators
/// it creates.
///
/// * `as_of`/`until` bound the times read: updates are read at `as_of`, and
///   updates at or beyond `until` are suppressed downstream.
/// * `map_filter_project`, when provided, is applied while decoding (and its
///   stats-based filter is pushed into part fetching).
/// * `max_inflight_bytes`, when provided, enables granular backpressure on
///   fetched-but-undecoded parts via an inner `Subtime`-refined scope.
/// * Shards managed by txn-wal (`metadata.txns_shard`) additionally route
///   progress through `txns_progress`.
pub fn persist_source<'scope>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    StreamVec<'scope, mz_repr::Timestamp, (Row, Timestamp, Diff)>,
    StreamVec<'scope, mz_repr::Timestamp, (DataflowError, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
) {
    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    // The core source lives in an inner scope whose timestamp carries an extra
    // `Subtime` component, so backpressure can observe per-part progress.
    let outer = scope.clone();
    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                // This probe is attached to the decoded output below; its
                // progress becomes the backpressure operator's feedback input.
                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        let cfg = Arc::clone(&persist_clients.cfg().configs);
        // txn-wal managed shards use txn-specific retry parameters when the
        // data shard's subscription has nothing to return yet.
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            outer,
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        // Notify the backpressure probe (if any) of decode progress.
        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave(outer)
    });

    // For txn-wal shards, interpose the txns progress operator so the output
    // frontier advances even when the data shard itself is quiet.
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    // Split into ok/err streams, dropping the `Subtime` component from the
    // update times now that we are back in the outer scope.
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}
284
/// A scope whose timestamp refines `T` with a per-part [`Subtime`] component.
type RefinedScope<'scope, T> = Scope<'scope, (T, Subtime)>;
286
287#[allow(clippy::needless_borrow)]
294pub fn persist_source_core<'g, 'outer>(
295 outer: Scope<'outer, mz_repr::Timestamp>,
296 scope: RefinedScope<'g, mz_repr::Timestamp>,
297 source_id: GlobalId,
298 persist_clients: Arc<PersistClientCache>,
299 metadata: CollectionMetadata,
300 read_schema: Option<RelationDesc>,
301 as_of: Option<Antichain<Timestamp>>,
302 snapshot_mode: SnapshotMode,
303 until: Antichain<Timestamp>,
304 map_filter_project: Option<&mut MfpPlan>,
305 flow_control: Option<FlowControl<'g, (mz_repr::Timestamp, Subtime)>>,
306 listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
308 start_signal: impl Future<Output = ()> + 'static,
309 error_handler: ErrorHandler,
310) -> (
311 Stream<
312 'g,
313 (mz_repr::Timestamp, Subtime),
314 Vec<(
315 Result<Row, DataflowError>,
316 (mz_repr::Timestamp, Subtime),
317 Diff,
318 )>,
319 >,
320 Vec<PressOnDropButton>,
321) {
322 let cfg = persist_clients.cfg().clone();
323 let name = source_id.to_string();
324 let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());
325
326 let read_desc = match read_schema {
328 Some(desc) => desc,
329 None => metadata.relation_desc,
330 };
331
332 let desc_transformer = match flow_control {
333 Some(flow_control) => Some(move |scope, descs, chosen_worker| {
334 let (stream, token) = backpressure(
335 scope,
336 &format!("backpressure({source_id})"),
337 descs,
338 flow_control,
339 chosen_worker,
340 None,
341 );
342 (stream, vec![token])
343 }),
344 None => None,
345 };
346
347 let metrics = Arc::clone(persist_clients.metrics());
348 let filter_name = name.clone();
349 let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
353 let (fetched, token) = shard_source(
354 outer,
355 scope,
356 &name,
357 move || {
358 let (c, l) = (
359 Arc::clone(&persist_clients),
360 metadata.persist_location.clone(),
361 );
362 async move { c.open(l).await.unwrap() }
363 },
364 metadata.data_shard,
365 as_of,
366 snapshot_mode,
367 until.clone(),
368 desc_transformer,
369 Arc::new(read_desc.clone()),
370 Arc::new(UnitSchema),
371 move |stats, frontier| {
372 let Some(lower) = frontier.as_option().copied() else {
373 return FilterResult::Discard;
376 };
377
378 if lower > upper {
379 return FilterResult::Discard;
382 }
383
384 let time_range =
385 ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
386 if let Some(plan) = &filter_plan {
387 let metrics = &metrics.pushdown.part_stats;
388 let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
389 filter_result(&read_desc, time_range, stats, plan)
390 } else {
391 FilterResult::Keep
392 }
393 },
394 listen_sleep,
395 start_signal,
396 error_handler,
397 );
398 let rows = decode_and_mfp(cfg, fetched, &name, until, map_filter_project);
399 (rows, token)
400}
401
402fn filter_result(
403 relation_desc: &RelationDesc,
404 time_range: ResultSpec,
405 stats: RelationPartStats,
406 plan: &MfpPlan,
407) -> FilterResult {
408 let arena = RowArena::new();
409 let relation = ReprRelationType::from(relation_desc.typ());
410 let mut ranges = ColumnSpecs::new(&relation, &arena);
411 ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
412
413 let may_error = stats.err_count().map_or(true, |count| count > 0);
414
415 for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
418 let result_spec = stats.col_stats(idx, &arena);
419 ranges.push_column(pos, result_spec);
420 }
421 let result = ranges.mfp_plan_filter(plan).range;
422 let may_error = may_error || result.may_fail();
423 let may_keep = result.may_contain(Datum::True);
424 let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
425 if relation_desc.len() == 0 && !may_error && !may_skip {
426 let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
427 return FilterResult::Keep;
428 };
429 key.append(&SourceData(Ok(Row::default())));
430 let key = key.finish();
431 let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
432 return FilterResult::Keep;
433 };
434 val.append(&());
435 let val = val.finish();
436
437 FilterResult::ReplaceWith {
438 key: Arc::new(key),
439 val: Arc::new(val),
440 }
441 } else if may_error || may_keep {
442 FilterResult::Keep
443 } else {
444 FilterResult::Discard
445 }
446}
447
/// Builds the operator that parses fetched persist blobs and applies the MFP,
/// yielding a stream of `Row`-or-error updates at `Subtime`-refined times.
///
/// Decoding is fueled: each activation does at most
/// `storage_source_decode_fuel` units of work, then reactivates itself so
/// other operators get a chance to run.
pub fn decode_and_mfp<'scope>(
    cfg: PersistConfig,
    fetched: StreamVec<
        'scope,
        (mz_repr::Timestamp, Subtime),
        FetchedBlob<SourceData, (), Timestamp, StorageDiff>,
    >,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> StreamVec<
    'scope,
    (mz_repr::Timestamp, Subtime),
    (
        Result<Row, DataflowError>,
        (mz_repr::Timestamp, Subtime),
        Diff,
    ),
> {
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Reusable allocations for decoding and building rows.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Move the plan out of the caller's `&mut` so the operator closure can own
    // it. NOTE(review): relies on `MfpPlan::take` semantics — confirm callers
    // do not reuse the plan afterwards.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        // Activator for this operator, used to reschedule ourselves when work
        // remains after running out of fuel.
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        // Parts fetched but not yet fully decoded/emitted.
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            fetched_input.for_each(|time, data| {
                // Retain a capability for this batch's time (port 0), shared
                // by all parts in the batch.
                let capability = time.retain(0);
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Yield once this activation has performed `yield_fuel` units of
            // work (a unit is roughly one decoded/emitted update).
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            if !pending_work.is_empty() {
                // Out of fuel with work left: ask to be scheduled again.
                activator.activate();
            }
        }
    });

    updates_stream
}
538
/// A fetched-but-not-fully-decoded part, plus the capability at which its
/// decoded updates may be emitted.
struct PendingWork {
    /// Whether a filter-pushdown audit violation should panic (vs. only log).
    panic_on_audit_failure: bool,
    /// The capability at which we can emit updates decoded from this part.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// The part itself, parsed lazily on first use.
    part: PendingPart,
}
548
/// A part in some state of parsing, so the parse step can be deferred until
/// the operator actually works on the part.
enum PendingPart {
    /// A fetched blob that has not yet been parsed.
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    /// A parsed part, ready to be iterated.
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}
555
impl PendingPart {
    /// Returns the parsed [`FetchedPart`], parsing the blob first if needed.
    ///
    /// The parse is memoized in place, so repeated calls pay it only once.
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                *self = PendingPart::Parsed { part: x.parse() };
                // Recurses at most once: `self` is now always `Parsed`.
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}
574
impl PendingWork {
    /// Decodes and emits updates from this part until either the part is
    /// exhausted (returns `true`) or `yield_fn` asks us to stop (returns
    /// `false`; the caller resumes this part on a later activation).
    fn do_work<YFn>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        // Storage handed back to the fetcher so row allocations are reused
        // across iterations.
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            // Updates at or past `until` can never be observed downstream.
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        // Count the decode as work even if the MFP ends up
                        // filtering the row out.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // If this part was fetched only to audit a
                            // filter-pushdown decision, any surviving update
                            // means the pushdown would have wrongly dropped it.
                            if let Some(stats) = &is_filter_pushdown_audit {
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            ?mfp,
                                            ?result,
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // Re-check `until`: the MFP may have
                                    // produced the update at a different time.
                                    if !until.less_equal(&time) {
                                        // Emit at the capability's time, with
                                        // the primary component replaced by
                                        // the update's own timestamp.
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        drop(datums_local);
                        // Return the decoded row's allocation to the fetcher.
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        // No MFP: pass the row through unchanged.
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    // Stored errors are forwarded as-is.
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(err), emit_time, diff.into()));
                    *work += 1;
                }
            }
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}
711
/// A type that can flow through [`backpressure`], carrying a byte weight used
/// for flow-control accounting.
pub trait Backpressureable: Clone + 'static {
    /// The number of bytes this object accounts for while in flight.
    fn byte_size(&self) -> usize;
}
717
718impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
719 fn byte_size(&self) -> usize {
720 self.1.encoded_size_bytes()
721 }
722}
723
/// Flow-control configuration consumed by [`backpressure`].
#[derive(Debug)]
pub struct FlowControl<'scope, T: timely::progress::Timestamp> {
    /// Progress for the backpressured data: its frontier tells the operator
    /// which emitted parts have been retired downstream. Carries no data.
    pub progress_stream: StreamVec<'scope, T, Infallible>,
    /// The maximum number of bytes we are willing to have in flight at once.
    pub max_inflight_bytes: usize,
    /// Summary applied to a part's emission time to obtain the time at which
    /// `progress_stream` is expected to retire it.
    pub summary: T::Summary,
    /// Optional metrics for emitted/backpressured/retired byte counts.
    pub metrics: Option<BackpressureMetrics>,
}
742
/// Applies flow control to `data` according to `flow_control`.
///
/// Each part is stamped with a unique `Subtime` and counted as "in flight"
/// from emission until the (summary-delayed) flow-control frontier passes its
/// emission time. Once `max_inflight_bytes` are in flight, the operator stops
/// pulling input and awaits flow-control progress instead.
///
/// Only `chosen_worker` runs the logic; all other workers exit immediately.
/// `probe`, used by tests, reports each backpressure decision. The returned
/// button shuts the operator down.
pub fn backpressure<'scope, T, O>(
    scope: Scope<'scope, (T, Subtime)>,
    name: &str,
    data: StreamVec<'scope, (T, Subtime), O>,
    flow_control: FlowControl<'scope, (T, Subtime)>,
    chosen_worker: usize,
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (StreamVec<'scope, (T, Subtime), O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Route the flow-control progress through a feedback edge delayed by the
    // configured summary, so its frontier is directly comparable with the
    // summary-advanced emission times recorded for in-flight parts.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    // Both inputs are disconnected: this operator manages its output frontier
    // explicitly via `cap_set`.
    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(summaried_flow, Pipeline);

    // Assigns a part the next unique `Subtime` and computes the frontier to
    // hold before emitting it and the frontier to hold after.
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // Normalize the raw input into an ordered stream of parts (each paired
    // with its synthesized frontiers) and plain progress statements.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    // Input closed: flush any parts not yet released by a
                    // progress statement, under the empty frontier.
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    // Buffer parts until their time is closed by the input
                    // frontier, so parts are released in time order.
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // Frontier of our output, and of the (summary-delayed) flow-control
        // input, respectively.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // (emission time advanced by the summary, byte size) of emitted parts
        // not yet retired by the flow-control frontier.
        let mut inflight_parts = Vec::new();
        // Parts pulled from the input but stashed while over budget.
        let mut pending_parts = std::collections::VecDeque::new();

        // Only one worker performs backpressure.
        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // Emit while under budget — or when the flow-control frontier has
            // not yet caught up to our output frontier, since in that state
            // holding back could never be released.
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                // Hold the pre-emission frontier first; the
                                // part itself is emitted at `time` below.
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // Re-evaluate the budget under the new
                                // frontier: we may now need to stash the part.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Record the emission; the part retires when the flow-control
                // frontier passes `time + summary`. If the summary overflows,
                // the part is never counted.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                // Over budget: record the fact, then wait for flow-control
                // progress and retire whatever the new frontier has passed.
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire in-flight parts the frontier has moved past.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                // Tests observe each backpressure decision through this probe.
                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}
1001
1002#[cfg(test)]
1003mod tests {
1004 use timely::container::CapacityContainerBuilder;
1005 use timely::dataflow::operators::{Enter, Probe};
1006 use tokio::sync::mpsc::unbounded_channel;
1007 use tokio::sync::oneshot;
1008
1009 use super::*;
1010
    /// Exercises `backpressure` with a non-granular summary `(1, Subtime(0))`:
    /// parts retire only when the primary timestamp advances.
    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        // Two over-budget parts at time 50, then a small one at 100.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                // Consuming both time-50 parts retires them together once the
                // frontier passes 50.
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        // Increasingly large parts against a small budget.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        // Many parts sharing timestamp 52, consumed one (then five) at a time.
        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                // One part consumed at time 50 does not advance the
                // non-granular frontier past 50 yet.
                AssertOutputFrontier((51, Subtime(3))),
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                // Consume the remaining time-52 parts and the time-100 part.
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }
1149
    /// Exercises `backpressure` with a granular summary `(0, Subtime(1))`:
    /// parts retire as individual `Subtime`s advance, without the primary
    /// timestamp moving.
    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        // Two over-budget parts at the same primary time: the second is held
        // until the first is retired at sub-timestamp granularity.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((50, Subtime(1))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        // A mix of sizes against a 50-byte budget: retiring a single part
        // unblocks emission of later, larger parts.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }
1228
    /// The refined timestamp type used throughout the backpressure tests.
    type Time = (u64, Subtime);
    /// A test "part" whose payload is simply its byte size.
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }
1237
    /// A scripted step executed by `backpressure_runner` against a running
    /// backpressure dataflow.
    enum Step {
        /// Step the worker until the backpressure output frontier is not less
        /// than the given time.
        AssertOutputFrontier(Time),
        /// Step the worker until the operator's probe reports a backpressure
        /// event, then assert its frontier and part counts.
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
        /// Instruct the consumer operator to process this many parts.
        ProcessXParts(usize),
    }
1254
    /// Builds a dataflow that pushes `input` through a `backpressure` operator
    /// configured with `max_inflight_bytes` and `summary`, then executes the
    /// scripted `steps` against it.
    ///
    /// With `non_granular_consumer`, the consumer (and the flow-control
    /// feedback edge) live in the outer, non-`Subtime` scope; otherwise both
    /// live in the inner refined scope.
    fn backpressure_runner(
        input: Vec<(u64, Part)>,
        max_inflight_bytes: usize,
        summary: Time,
        steps: Vec<Step>,
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (
                backpressure_probe,
                consumer_tx,
                mut backpressure_status_rx,
                finalizer_tx,
                _token,
            ) =
                worker.dataflow::<u64, _, _>(|outer_scope| {
                    // Outer-scope feedback, used only in the non-granular case.
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = outer_scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = outer_scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        // Wire the flow-control progress stream from whichever
                        // scope hosts the consumer.
                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        // Worker 0 is the chosen worker (single-threaded test).
                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // Granular case: the consumer drives the inner-scope
                        // feedback edge directly.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                backpressured.clone(),
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        let (probe_handle, backpressured) = backpressured.probe();
                        (
                            probe_handle,
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(outer_scope),
                            finalizer_tx,
                        )
                    });

                    // Non-granular case: attach the consumer in the outer
                    // scope, after the stream has left the refined scope.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            outer_scope.clone(),
                            backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        // Step the worker until the probe reaches `time`.
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        // Step the worker until the operator reports a
                        // backpressure event on its probe channel.
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Release the iterator operator's capability so the dataflow can
            // shut down cleanly.
            let _ = finalizer_tx.send(());
        });
    }
1424
    /// An operator that emits each `(time, Part)` from `input` (at `Subtime`
    /// 0), downgrades its capability just past the last emitted time, and then
    /// holds the dataflow open until the returned sender fires.
    fn iterator_operator<'scope, I: Iterator<Item = (u64, Part)> + 'static>(
        scope: Scope<'scope, (u64, Subtime)>,
        mut input: I,
    ) -> (StreamVec<'scope, (u64, Subtime), Part>, oneshot::Sender<()>) {
        let (finalizer_tx, finalizer_rx) = oneshot::channel();
        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();

        iterator.build(|mut caps| async move {
            let mut capability = Some(caps.pop().unwrap());
            let mut last = None;
            while let Some(element) = input.next() {
                let time = element.0.clone();
                let part = element.1;
                last = Some((time, Subtime(0)));
                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
            }
            // Advance past the last emitted time so downstream frontiers can
            // move, while still holding a capability.
            if let Some(last) = last {
                capability
                    .as_mut()
                    .unwrap()
                    .downgrade(&(last.0 + 1, last.1));
            }

            // Keep the dataflow alive until the test signals completion, then
            // drop the capability to let it wind down.
            let _ = finalizer_rx.await;
            capability.take();
        });

        (output, finalizer_tx)
    }
1457
    /// A consumer operator that processes exactly one data event from `input`
    /// for each `()` received on the returned channel. Its (data-less) output
    /// is looped into `feedback`, so its progress drives the flow control.
    fn consumer_operator<
        'scope,
        T: timely::progress::Timestamp,
        O: Backpressureable + std::fmt::Debug,
    >(
        scope: Scope<'scope, T>,
        input: StreamVec<'scope, T, O>,
        feedback: timely::dataflow::operators::feedback::Handle<
            'scope,
            T,
            Vec<std::convert::Infallible>,
        >,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        // Connect the input to the output so input progress flows through.
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                // Skip progress statements; stop after one data event.
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
        output.connect_loop(feedback);

        tx
    }
1490}