use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use mz_dyncfg::ConfigSet;
use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::lattice::Lattice;
use futures::{StreamExt, future::Either};
use mz_expr::{ColumnSpecs, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::{PersistConfig, RetryParameters};
use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
use mz_persist_client::operators::shard_source::{
    ErrorHandler, FilterResult, SnapshotMode, shard_source,
};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_repr::{Datum, DatumVec, Diff, GlobalId, RelationDesc, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_storage_types::stats::RelationPartStats;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::probe::ProbeNotify;
use mz_txn_wal::operator::{TxnsContext, txns_progress};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::communication::Push;
use timely::dataflow::ScopeParent;
use timely::dataflow::channels::Message;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::OutputHandleCore;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::{Capability, Leave, OkErr};
use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use timely::order::TotalOrder;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::timestamp::PathSummary;
use timely::scheduling::Activator;
use tokio::sync::mpsc::UnboundedSender;
use tracing::trace;

use crate::metrics::BackpressureMetrics;

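/// The secondary, per-part component of timestamps in the refined scopes used
/// below: each part processed within a single outer `mz_repr::Timestamp` gets
/// its own `Subtime`, which lets flow control track progress part by part.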
#[derive(
    Copy, Clone, PartialEq, Default, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, Hash,
)]
pub struct Subtime(u64);

impl PartialOrder for Subtime {
    fn less_equal(&self, other: &Self) -> bool {
        self.0.less_equal(&other.0)
    }
}

impl TotalOrder for Subtime {}

impl PathSummary<Subtime> for Subtime {
    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
        self.0.results_in(&src.0).map(Subtime)
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Subtime)
    }
}

impl TimelyTimestamp for Subtime {
    type Summary = Subtime;

    fn minimum() -> Self {
        Subtime(0)
    }
}

impl Subtime {
    /// The smallest non-trivial summary: advance the part counter by one.
    pub const fn least_summary() -> Self {
        Subtime(1)
    }
}

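/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All updates at times greater or equal to `until` will be suppressed, and
/// the optional `max_inflight_bytes` bounds how much fetched-but-undecoded
/// data may be in flight at once (see [`backpressure`]).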
pub fn persist_source<G>(
    scope: &mut G,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    worker_dyncfgs: &ConfigSet,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<G, (Row, Timestamp, Diff)>,
    Stream<G, (DataflowError, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let shard_metrics =
        persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        let cfg = Arc::clone(&persist_clients.cfg().configs);
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave()
    });

    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            worker_dyncfgs,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}

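/// A scope that refines the parent timestamp `G::Timestamp` with a [`Subtime`]
/// component.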
type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, Subtime)>;

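/// Creates a persist source in a scope refined by [`Subtime`], so that the
/// optional `flow_control` can apply backpressure to individual parts.
///
/// All updates at times greater or equal to `until` will be suppressed, and
/// the `map_filter_project`, if supplied, is evaluated while decoding and also
/// used to drive filter pushdown against part statistics.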
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, G>(
    scope: &RefinedScope<'g, G>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
    listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<
        RefinedScope<'g, G>,
        (
            Result<Row, DataflowError>,
            (mz_repr::Timestamp, Subtime),
            Diff,
        ),
    >,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    // Read with the caller-provided schema if there is one; otherwise use the
    // collection's own relation description.
    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |mut scope: _, descs: &Stream<_, _>, chosen_worker| {
            let (stream, token) = backpressure(
                &mut scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    // An empty `until` means there is no upper bound; use the maximum
    // timestamp for the pushdown filter.
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        &mut scope.clone(),
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                // The empty frontier signals completion; no more parts matter.
                return FilterResult::Discard;
            };

            if lower > upper {
                // All updates in this part lie at or beyond `until` and would
                // be suppressed anyway.
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, &fetched, &name, until, map_filter_project);
    (rows, token)
}

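/// Uses per-part statistics and the pushed-down MFP to decide whether a part
/// must be fetched, can be discarded outright, or can be replaced with a
/// synthesized default row (when no columns are demanded and the filter
/// provably passes without erroring).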
fn filter_result(
    relation_desc: &RelationDesc,
    time_range: ResultSpec,
    stats: RelationPartStats,
    plan: &MfpPlan,
) -> FilterResult {
    let arena = RowArena::new();
    let mut ranges = ColumnSpecs::new(relation_desc.typ(), &arena);
    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);

    // If we can't prove the part is error-free, we must fetch it.
    let may_error = stats.err_count().map_or(true, |count| count > 0);

    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
        let result_spec = stats.col_stats(idx, &arena);
        ranges.push_column(pos, result_spec);
    }
    let result = ranges.mfp_plan_filter(plan).range;
    let may_error = may_error || result.may_fail();
    let may_keep = result.may_contain(Datum::True);
    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
    if relation_desc.len() == 0 && !may_error && !may_skip {
        // No columns are demanded and the filter provably passes, so the part
        // can be replaced by a synthesized default row instead of being
        // fetched.
        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
            return FilterResult::Keep;
        };
        key.append(&SourceData(Ok(Row::default())));
        let key = key.finish();
        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
            return FilterResult::Keep;
        };
        val.append(&());
        let val = val.finish();

        FilterResult::ReplaceWith {
            key: Arc::new(key),
            val: Arc::new(val),
        }
    } else if may_error || may_keep {
        FilterResult::Keep
    } else {
        FilterResult::Discard
    }
}

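/// Decodes fetched parts into rows, applies the `map_filter_project` if one is
/// given, and emits the consolidated results, yielding to the scheduler based
/// on the configured decode fuel.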
pub fn decode_and_mfp<G>(
    cfg: PersistConfig,
    fetched: &Stream<G, FetchedBlob<SourceData, (), Timestamp, StorageDiff>>,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> Stream<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
where
    G: Scope<Timestamp = (mz_repr::Timestamp, Subtime)>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (mut updates_output, updates_stream) =
        builder.new_output::<ConsolidatingContainerBuilder<_>>();

    // Re-usable state for decoding rows.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Take the MFP out of the `Option<&mut _>` so the operator closure owns it.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        // An activator for rescheduling ourselves while work remains.
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        // The queue of parts that still have rows to decode.
        let mut pending_work = std::collections::VecDeque::new();

        move |_frontier| {
            fetched_input.for_each(|time, data| {
                let capability = time.retain();
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Process pending work until the fuel budget is spent, then yield.
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}

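/// Pending work to read from fetched parts.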
struct PendingWork {
    /// The capability at which the part's updates may be emitted.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// The part itself, parsed lazily.
    part: PendingPart,
}

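/// A part that is either not yet parsed or has already been parsed.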
enum PendingPart {
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}

impl PendingPart {
    /// Returns the parsed part, parsing the raw blob first if necessary.
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                *self = PendingPart::Parsed { part: x.parse() };
                // Recurse at most once: `self` is now `Parsed`.
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}

impl PendingWork {
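    /// Perform work on the pending part, decoding and emitting updates until
    /// either the part is exhausted (returning `true`) or `yield_fn` indicates
    /// that we should yield (returning `false`).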
    fn do_work<P, YFn>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputHandleCore<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
            P,
        >,
    ) -> bool
    where
        P: Push<
            Message<
                (mz_repr::Timestamp, Subtime),
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
        >,
        YFn: Fn(Instant, usize) -> bool,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (Ok(SourceData(Ok(row))), Ok(())) => {
                    if let Some(mfp) = map_filter_project {
                        // Count the decoded row against the fuel budget even
                        // if the MFP ends up discarding it.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // If this part was fetched only to audit a
                            // pushed-down filter, any update that reaches this
                            // point means the pushdown was incorrect.
                            if let Some(_stats) = &is_filter_pushdown_audit {
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        panic!(
                                            "persist filter pushdown correctness violation! {}",
                                            name
                                        );
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        drop(datums_local);
                        // Return the row allocation so the next decode can
                        // reuse it.
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (Ok(SourceData(Err(err))), Ok(())) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(err), emit_time, diff.into()));
                    *work += 1;
                }
                // A failure to decode the key or value is unrecoverable here.
                (Err(_), Ok(_)) | (Ok(_), Err(_)) | (Err(_), Err(_)) => {
                    panic!("decoding failed")
                }
            }
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}

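/// A trait for objects that can be flowed through the [`backpressure`]
/// operator, which only needs to know their encoded size.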
pub trait Backpressureable: Clone + 'static {
    /// The size of the object in bytes, used for flow-control accounting.
    fn byte_size(&self) -> usize;
}

impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
    fn byte_size(&self) -> usize {
        self.1.encoded_size_bytes()
    }
}

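/// Flow control configuration for [`backpressure`].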
#[derive(Debug)]
pub struct FlowControl<G: Scope> {
    /// The stream reporting the progress of the consumer; used to decide when
    /// in-flight parts have been retired.
    pub progress_stream: Stream<G, Infallible>,
    /// The maximum number of in-flight bytes before new parts are held back.
    pub max_inflight_bytes: usize,
    /// The summary applied to emission times before comparing them with the
    /// consumer's progress frontier.
    pub summary: <G::Timestamp as TimelyTimestamp>::Summary,

    /// Optional metrics tracking emitted, backpressured, and retired bytes.
    pub metrics: Option<BackpressureMetrics>,
}

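/// An operator that limits the volume of in-flight data: it forwards `data`
/// only while the total size of emitted parts not yet retired by the consumer
/// (whose progress arrives on `flow_control.progress_stream`) stays under
/// `flow_control.max_inflight_bytes`. Only `chosen_worker` runs the operator;
/// `probe`, if given, reports internal state for tests.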
pub fn backpressure<T, G, O>(
    scope: &mut G,
    name: &str,
    data: &Stream<G, O>,
    flow_control: FlowControl<G>,
    chosen_worker: usize,
    // Only used by tests: reports the flow-control frontier plus the in-flight
    // and retired part counts whenever the operator waits for progress.
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (Stream<G, O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    G: Scope<Timestamp = (T, Subtime)>,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Loop the consumer's progress back into this scope, delayed by the
    // configured summary.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(&summaried_flow, Pipeline);

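    /// Assign the next per-part [`Subtime`] to `time` and return it along with
    /// the output frontiers that hold before and after the part is emitted.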
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // Flatten the input into individual parts, assigning each a unique
    // `Subtime` and synthesizing the frontiers before and after its emission.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    // Emit the parts that the new frontier has closed out.
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // Parts we have emitted but the consumer has not yet retired.
        let mut inflight_parts = Vec::new();
        // Parts pulled from the input but not yet emitted.
        let mut pending_parts = std::collections::VecDeque::new();

        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // Emit while we are under the in-flight limit, or while the
            // consumer's feedback has not yet caught up to our output.
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // Recheck the emission condition now that the
                                // output frontier has advanced.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Account the part against the time at which its retirement
                // will become visible on the feedback stream.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                // Wait for the consumer to make progress before emitting more.
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire the parts the consumer has finished processing.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}

#[cfg(test)]
mod tests {
    use timely::container::CapacityContainerBuilder;
    use timely::dataflow::operators::{Enter, Probe};
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::sync::oneshot;

    use super::*;

    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertOutputFrontier((51, Subtime(3))),
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }

    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((50, Subtime(1))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }

    type Time = (u64, Subtime);
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }

    enum Step {
        /// Assert that the backpressure operator's output frontier has reached
        /// (at least) this time.
        AssertOutputFrontier(Time),
        /// Assert that the operator reports a backpressured state with this
        /// flow-control frontier and these in-flight/retired part counts.
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
        /// Have the consumer process (retire) this many parts.
        ProcessXParts(usize),
    }

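    /// Builds a test dataflow around the `backpressure` operator, feeds it the
    /// given input parts, and then executes each `Step` in order.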
    fn backpressure_runner(
        // The input to the backpressure operator, as `(time, part)` pairs.
        input: Vec<(u64, Part)>,
        // The maximum number of in-flight bytes.
        max_inflight_bytes: usize,
        // The flow-control summary to use.
        summary: Time,
        // The steps to run after building the dataflow.
        steps: Vec<Step>,
        // Whether the consumer runs in the outer (non-granular) scope rather
        // than the inner granular one.
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (backpressure_probe, consumer_tx, mut backpressure_status_rx, finalizer_tx, _token) =
                worker.dataflow::<u64, _, _>(|scope| {
                    // A non-granular consumer feeds progress back in the outer scope.
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            &input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // A granular consumer must be attached inside this scope.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                &backpressured,
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        (
                            backpressured.probe(),
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(),
                            finalizer_tx,
                        )
                    });

                    // A non-granular consumer is attached in the outer scope.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            scope.clone(),
                            &backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Fire the finalizer so the input operator releases its capability
            // and the dataflow can shut down.
            let _ = finalizer_tx.send(());
        });
    }

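    /// An operator that emits the given parts at their assigned times, holding
    /// back its final capability until the returned sender fires (or drops).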
    fn iterator_operator<
        G: Scope<Timestamp = (u64, Subtime)>,
        I: Iterator<Item = (u64, Part)> + 'static,
    >(
        scope: G,
        mut input: I,
    ) -> (Stream<G, Part>, oneshot::Sender<()>) {
        let (finalizer_tx, finalizer_rx) = oneshot::channel();
        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();

        iterator.build(|mut caps| async move {
            let mut capability = Some(caps.pop().unwrap());
            let mut last = None;
            while let Some(element) = input.next() {
                let time = element.0.clone();
                let part = element.1;
                last = Some((time, Subtime(0)));
                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
            }
            if let Some(last) = last {
                capability
                    .as_mut()
                    .unwrap()
                    .downgrade(&(last.0 + 1, last.1));
            }

            // Hold the capability until the test signals completion.
            let _ = finalizer_rx.await;
            capability.take();
        });

        (output, finalizer_tx)
    }

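    /// A consumer operator that retires one part from `input` for each `()`
    /// received on the returned channel, reporting progress via `feedback`.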
    fn consumer_operator<G: Scope, O: Backpressureable + std::fmt::Debug>(
        scope: G,
        input: &Stream<G, O>,
        feedback: timely::dataflow::operators::feedback::Handle<G, Vec<std::convert::Infallible>>,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                // Drain progress updates until we pull (and drop) the next
                // data event, which retires exactly one part.
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
        output.connect_loop(feedback);

        tx
    }
}