use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use mz_dyncfg::ConfigSet;
use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::lattice::Lattice;
use futures::{StreamExt, future::Either};
use mz_expr::{ColumnSpecs, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::vec::VecExt;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::{PersistConfig, RetryParameters};
use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
use mz_persist_client::fetch::{SerdeLeasedBatchPart, ShardSourcePart};
use mz_persist_client::operators::shard_source::{FilterResult, SnapshotMode, shard_source};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_repr::{Datum, DatumVec, Diff, GlobalId, RelationDesc, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_storage_types::stats::RelationPartStats;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::probe::ProbeNotify;
use mz_txn_wal::operator::{TxnsContext, txns_progress};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::communication::Push;
use timely::dataflow::ScopeParent;
use timely::dataflow::channels::Message;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::OutputHandleCore;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::{Capability, Leave, OkErr};
use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use timely::order::TotalOrder;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::timestamp::PathSummary;
use timely::scheduling::Activator;
use tokio::sync::mpsc::UnboundedSender;
use tracing::trace;

use crate::metrics::BackpressureMetrics;

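/// The second component of the timestamps used inside the
/// `granular_backpressure` region: a per-part counter that refines
/// `mz_repr::Timestamp`, letting progress (and therefore flow control) be
/// tracked at the granularity of individual fetched parts.
///
/// A minimal sketch of the path-summary arithmetic (summaries add, as for
/// `u64`):
///
/// ```ignore
/// use timely::progress::timestamp::PathSummary;
///
/// assert_eq!(Subtime::least_summary(), Subtime(1));
/// assert_eq!(Subtime(1).results_in(&Subtime(2)), Some(Subtime(3)));
/// ```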
#[derive(
    Copy, Clone, PartialEq, Default, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, Hash,
)]
pub struct Subtime(u64);

impl PartialOrder for Subtime {
    fn less_equal(&self, other: &Self) -> bool {
        self.0.less_equal(&other.0)
    }
}

impl TotalOrder for Subtime {}

impl PathSummary<Subtime> for Subtime {
    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
        self.0.results_in(&src.0).map(Subtime)
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Subtime)
    }
}

impl TimelyTimestamp for Subtime {
    type Summary = Subtime;

    fn minimum() -> Self {
        Subtime(0)
    }
}

impl Subtime {
    pub const fn least_summary() -> Self {
        Subtime(1)
    }
}

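/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data across all timely workers. Rows are emitted for times at or
/// beyond the `as_of` and strictly before the `until`; when
/// `metadata.txns_shard` is set, the output is additionally routed through
/// [`txns_progress`] so that txn-wal tables report correct progress. The
/// returned tokens stop the source when dropped.
///
/// A hypothetical call site, for orientation only (names such as `scope`,
/// `clients`, `metadata`, and `as_of` are placeholders, not items from this
/// crate):
///
/// ```ignore
/// let (oks, errs, tokens) = persist_source(
///     &mut scope,
///     source_id,
///     Arc::clone(&clients),
///     &txns_ctx,
///     &worker_dyncfgs,
///     metadata,
///     None,                 // read the full relation desc
///     Some(as_of),
///     SnapshotMode::Include,
///     Antichain::new(),     // empty `until`: read forever
///     None,                 // no MFP to push down
///     Some(1 << 30),        // backpressure above 1 GiB in flight
///     async {},             // start immediately
///     |err| Box::pin(async move { panic!("{err}") }),
/// );
/// ```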
pub fn persist_source<G>(
    scope: &mut G,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    worker_dyncfgs: &ConfigSet,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: impl FnOnce(String) -> Pin<Box<dyn Future<Output = ()>>> + 'static,
) -> (
    Stream<G, (Row, Timestamp, Diff)>,
    Stream<G, (DataflowError, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let shard_metrics =
        persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        let cfg = Arc::clone(&persist_clients.cfg().configs);
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave()
    });

    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            worker_dyncfgs,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}

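/// The inner, `Subtime`-refined scope in which [`persist_source_core`] and the
/// granular [`backpressure`] operator run.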
type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, Subtime)>;

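/// Creates a new source that reads from a persist shard, within the
/// `Subtime`-refined scope produced by [`persist_source`]. Unlike
/// [`persist_source`], errors are returned inline with the rows in a single
/// output stream, and the optional `flow_control` is wired through a
/// [`backpressure`] operator over the stream of part descriptions.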
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, G>(
    scope: &RefinedScope<'g, G>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
    listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: impl FnOnce(String) -> Pin<Box<dyn Future<Output = ()>>> + 'static,
) -> (
    Stream<
        RefinedScope<'g, G>,
        (
            Result<Row, DataflowError>,
            (mz_repr::Timestamp, Subtime),
            Diff,
        ),
    >,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |mut scope: _, descs: &Stream<_, _>, chosen_worker| {
            let (stream, token) = backpressure(
                &mut scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        &mut scope.clone(),
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                return FilterResult::Discard;
            };

            if lower > upper {
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, &fetched, &name, until, map_filter_project);
    (rows, token)
}

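/// Decides, based on a part's statistics, how [`shard_source`] should treat
/// it: fetch it (`Keep`), skip it (`Discard`, when the pushed-down MFP
/// provably matches no rows and cannot error), or, for zero-column relations
/// whose filter provably passes, substitute a synthesized default row without
/// fetching the part (`ReplaceWith`).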
fn filter_result(
    relation_desc: &RelationDesc,
    time_range: ResultSpec,
    stats: RelationPartStats,
    plan: &MfpPlan,
) -> FilterResult {
    let arena = RowArena::new();
    let mut ranges = ColumnSpecs::new(relation_desc.typ(), &arena);
    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);

    let may_error = stats.err_count().map_or(true, |count| count > 0);

    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
        let result_spec = stats.col_stats(idx, &arena);
        ranges.push_column(pos, result_spec);
    }
    let result = ranges.mfp_plan_filter(plan).range;
    let may_error = may_error || result.may_fail();
    let may_keep = result.may_contain(Datum::True);
    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
    if relation_desc.len() == 0 && !may_error && !may_skip {
        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
            return FilterResult::Keep;
        };
        key.append(&SourceData(Ok(Row::default())));
        let key = key.finish();
        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
            return FilterResult::Keep;
        };
        val.append(&());
        let val = val.finish();

        FilterResult::ReplaceWith {
            key: Arc::new(key),
            val: Arc::new(val),
        }
    } else if may_error || may_keep {
        FilterResult::Keep
    } else {
        FilterResult::Discard
    }
}

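/// Decodes fetched parts and applies the given `until` and (optional)
/// `map_filter_project`, emitting consolidated `Result<Row, DataflowError>`
/// updates. Decoding is fueled: each activation does at most
/// `storage_source_decode_fuel` units of work before yielding and
/// re-activating itself, so a large backlog of parts cannot starve the worker.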
pub fn decode_and_mfp<G>(
    cfg: PersistConfig,
    fetched: &Stream<G, FetchedBlob<SourceData, (), Timestamp, StorageDiff>>,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> Stream<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
where
    G: Scope<Timestamp = (mz_repr::Timestamp, Subtime)>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (mut updates_output, updates_stream) =
        builder.new_output::<ConsolidatingContainerBuilder<_>>();

    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        let mut pending_work = std::collections::VecDeque::new();

        move |_frontier| {
            fetched_input.for_each(|time, data| {
                let capability = time.retain();
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}

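/// A fetched part, together with the capability at which its updates should
/// be emitted. Parsing is deferred (see [`PendingPart::part_mut`]) and
/// processing is resumable, so one part may be worked on across several
/// activations of `decode_and_mfp`.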
struct PendingWork {
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    part: PendingPart,
}

enum PendingPart {
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}

impl PendingPart {
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                *self = PendingPart::Parsed { part: x.parse() };
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}

impl PendingWork {
    fn do_work<P, YFn>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputHandleCore<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
            P,
        >,
    ) -> bool
    where
        P: Push<
            Message<
                (mz_repr::Timestamp, Subtime),
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
        >,
        YFn: Fn(Instant, usize) -> bool,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (Ok(SourceData(Ok(row))), Ok(())) => {
                    if let Some(mfp) = map_filter_project {
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            if let Some(_stats) = &is_filter_pushdown_audit {
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        panic!(
                                            "persist filter pushdown correctness violation! {}",
                                            name
                                        );
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        drop(datums_local);
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        session.give((Ok(row), emit_time, diff.into()));
                        *work += 1;
                    }
                }
                (Ok(SourceData(Err(err))), Ok(())) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(err), emit_time, diff.into()));
                    *work += 1;
                }
                (Err(_), Ok(_)) | (Ok(_), Err(_)) | (Err(_), Err(_)) => {
                    panic!("decoding failed")
                }
            }
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}

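/// A trait representing a type that [`backpressure`] can operate on: all the
/// operator needs from its data is a byte size to meter against
/// `max_inflight_bytes`.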
pub trait Backpressureable: Clone + 'static {
    fn byte_size(&self) -> usize;
}

impl Backpressureable for (usize, SerdeLeasedBatchPart) {
    fn byte_size(&self) -> usize {
        self.1.encoded_size_bytes()
    }
}

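/// Flow control configuration for [`backpressure`].
///
/// - `progress_stream`: a stream whose *frontier* communicates how far the
///   downstream consumer has progressed; its data is never inspected (hence
///   `Infallible`).
/// - `max_inflight_bytes`: the maximum number of emitted-but-unretired bytes
///   before the operator stops emitting and waits for progress.
/// - `summary`: the summary by which an emitted time is advanced before the
///   operator expects to see it reflected in `progress_stream`.
/// - `metrics`: optional metrics recording emitted, backpressured, and
///   retired bytes.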
#[derive(Debug)]
pub struct FlowControl<G: Scope> {
    pub progress_stream: Stream<G, Infallible>,
    pub max_inflight_bytes: usize,
    pub summary: <G::Timestamp as TimelyTimestamp>::Summary,
    pub metrics: Option<BackpressureMetrics>,
}

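/// Applies flow control to `data`, based on the given [`FlowControl`].
///
/// The operator runs only on `chosen_worker`. It assigns each part a distinct
/// `Subtime` so progress can be tracked per part, emits parts while fewer
/// than `max_inflight_bytes` are outstanding, and otherwise awaits the
/// feedback frontier (the consumer's progress advanced by
/// `flow_control.summary`) before retiring in-flight parts and continuing.
/// The optional `probe` reports each observed flow-control frontier along
/// with in-flight and retired part counts; the tests below rely on it.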
pub fn backpressure<T, G, O>(
    scope: &mut G,
    name: &str,
    data: &Stream<G, O>,
    flow_control: FlowControl<G>,
    chosen_worker: usize,
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (Stream<G, O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    G: Scope<Timestamp = (T, Subtime)>,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(&summaried_flow, Pipeline);

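    // Refine a part's time with the next part number and synthesize the
    // frontiers to hold while and after emitting it. A worked sketch: with
    // `part_number = 2`, a part at `(5, Subtime(0))` against the frontier
    // `{(6, Subtime(0))}` is emitted at `(5, Subtime(2))`, with frontier
    // `{(5, Subtime(2)), (6, Subtime(0))}` while emitting and next frontier
    // `{(5, Subtime(3)), (6, Subtime(0))}` afterwards.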
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    let mut i = 0;
                    parts.sort_by_key(|val| val.0.clone());
                    while i < parts.len() {
                        if !prog.less_equal(&parts[i].0) {
                            let (part_time, d) = parts.remove(i);
                            let (part_time, frontier, next_frontier) = synthesize_frontiers(
                                prog.clone(),
                                part_time.clone(),
                                &mut part_number,
                            );
                            yield Either::Right((part_time, d, frontier, next_frontier))
                        } else {
                            i += 1;
                        }
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        let mut inflight_parts = Vec::new();
        let mut pending_parts = std::collections::VecDeque::new();

        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                flow_control_frontier.clone_from(&new_flow_control_frontier);

                let retired_parts = inflight_parts
                    .drain_filter_swapping(|(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}

#[cfg(test)]
mod tests {
    use timely::container::CapacityContainerBuilder;
    use timely::dataflow::operators::{Enter, Probe};
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::sync::oneshot;

    use super::*;

    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertOutputFrontier((51, Subtime(3))),
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }

    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((50, Subtime(1))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }

    type Time = (u64, Subtime);

    #[derive(Clone, Debug)]
    struct Part(usize);

    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }

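    /// A step in a `backpressure_runner` scenario. `AssertOutputFrontier`
    /// steps the worker until the operator's output frontier passes the given
    /// time; `ProcessXParts` instructs the consumer to consume that many
    /// parts; `AssertBackpressured` waits for the operator to report, via its
    /// probe channel, the given flow-control frontier and in-flight/retired
    /// part counts.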
    enum Step {
        AssertOutputFrontier(Time),
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
        ProcessXParts(usize),
    }

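    /// Runs the given `steps` against a [`backpressure`] operator configured
    /// with `max_inflight_bytes` and `summary` and fed one part per entry of
    /// `input`. When `non_granular_consumer` is true, the consumer's progress
    /// feedback travels through the outer, non-`Subtime` scope, exercising
    /// the summary-based refinement of the flow-control frontier.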
    fn backpressure_runner(
        input: Vec<(u64, Part)>,
        max_inflight_bytes: usize,
        summary: Time,
        steps: Vec<Step>,
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (backpressure_probe, consumer_tx, mut backpressure_status_rx, finalizer_tx, _token) =
                worker.dataflow::<u64, _, _>(|scope| {
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            &input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                &backpressured,
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        (
                            backpressured.probe(),
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(),
                            finalizer_tx,
                        )
                    });

                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            scope.clone(),
                            &backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            let _ = finalizer_tx.send(());
        });
    }

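    /// An operator that emits each `Part` at its given time (always at
    /// `Subtime(0)`), then downgrades its capability to just past the last
    /// time and holds it there until the returned `oneshot::Sender` fires or
    /// is dropped.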
    fn iterator_operator<
        G: Scope<Timestamp = (u64, Subtime)>,
        I: Iterator<Item = (u64, Part)> + 'static,
    >(
        scope: G,
        mut input: I,
    ) -> (Stream<G, Part>, oneshot::Sender<()>) {
        let (finalizer_tx, finalizer_rx) = oneshot::channel();
        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();

        iterator.build(|mut caps| async move {
            let mut capability = Some(caps.pop().unwrap());
            let mut last = None;
            while let Some(element) = input.next() {
                let time = element.0.clone();
                let part = element.1;
                last = Some((time, Subtime(0)));
                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
            }
            if let Some(last) = last {
                capability
                    .as_mut()
                    .unwrap()
                    .downgrade(&(last.0 + 1, last.1));
            }

            let _ = finalizer_rx.await;
            capability.take();
        });

        (output, finalizer_tx)
    }

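    /// A consumer that consumes one data event from `input` for each `()` it
    /// receives on the returned channel. Its (empty) output is connected back
    /// through `feedback`, so its progress is what drives the backpressure
    /// operator's flow-control frontier.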
    fn consumer_operator<G: Scope, O: Backpressureable + std::fmt::Debug>(
        scope: G,
        input: &Stream<G, O>,
        feedback: timely::dataflow::operators::feedback::Handle<G, Vec<std::convert::Infallible>>,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
        output.connect_loop(feedback);

        tx
    }
}