1use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use std::convert::Infallible;
14use std::fmt::Debug;
15use std::future::Future;
16use std::hash::Hash;
17use std::sync::Arc;
18use std::time::Instant;
19
20use differential_dataflow::lattice::Lattice;
21use futures::{StreamExt, future::Either};
22use mz_expr::{ColumnSpecs, EvalError, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
23use mz_ore::cast::CastFrom;
24use mz_ore::collections::CollectionExt;
25use mz_ore::str::redact;
26use mz_persist_client::cache::PersistClientCache;
27use mz_persist_client::cfg::{PersistConfig, RetryParameters};
28use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
29use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
30use mz_persist_client::operators::shard_source::{
31 ErrorHandler, FilterResult, SnapshotMode, shard_source,
32};
33use mz_persist_client::stats::STATS_AUDIT_PANIC;
34use mz_persist_types::Codec64;
35use mz_persist_types::codec_impls::UnitSchema;
36use mz_persist_types::columnar::{ColumnEncoder, Schema};
37use mz_repr::{
38 Datum, DatumVec, Diff, GlobalId, RelationDesc, ReprRelationType, Row, RowArena, Timestamp,
39};
40use mz_storage_types::StorageDiff;
41use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
42use mz_storage_types::errors::DataflowError;
43use mz_storage_types::sources::SourceData;
44use mz_storage_types::stats::RelationPartStats;
45use mz_timely_util::builder_async::{
46 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
47};
48use mz_timely_util::probe::ProbeNotify;
49use mz_txn_wal::operator::{TxnsContext, txns_progress};
50use serde::{Deserialize, Serialize};
51use timely::PartialOrder;
52use timely::container::CapacityContainerBuilder;
53use timely::dataflow::channels::pact::Pipeline;
54use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
55use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
56use timely::dataflow::operators::{Capability, Leave, OkErr};
57use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
58use timely::dataflow::{Scope, Stream, StreamVec};
59use timely::order::TotalOrder;
60use timely::progress::Antichain;
61use timely::progress::Timestamp as TimelyTimestamp;
62use timely::progress::timestamp::PathSummary;
63use timely::scheduling::Activator;
64use tokio::sync::mpsc::UnboundedSender;
65use tracing::{error, trace};
66
67use crate::metrics::BackpressureMetrics;
68
69#[derive(
77 Copy,
78 Clone,
79 PartialEq,
80 Default,
81 Eq,
82 PartialOrd,
83 Ord,
84 Debug,
85 Serialize,
86 Deserialize,
87 Hash
88)]
89pub struct Subtime(u64);
90
91impl PartialOrder for Subtime {
92 fn less_equal(&self, other: &Self) -> bool {
93 self.0.less_equal(&other.0)
94 }
95}
96
// Marker impl: declares to timely that the partial order on `Subtime` (which
// compares the wrapped `u64`) is in fact a total order.
impl TotalOrder for Subtime {}
98
99impl PathSummary<Subtime> for Subtime {
100 fn results_in(&self, src: &Subtime) -> Option<Subtime> {
101 self.0.results_in(&src.0).map(Subtime)
102 }
103
104 fn followed_by(&self, other: &Self) -> Option<Self> {
105 self.0.followed_by(&other.0).map(Subtime)
106 }
107}
108
109impl TimelyTimestamp for Subtime {
110 type Summary = Subtime;
111
112 fn minimum() -> Self {
113 Subtime(0)
114 }
115}
116
// Columnation support for `Subtime`. The type derives `Copy`, and the region
// selected here is `CopyRegion`.
impl columnation::Columnation for Subtime {
    type InnerRegion = columnation::CopyRegion<Subtime>;
}
120
121impl differential_dataflow::lattice::Lattice for Subtime {
122 fn join(&self, other: &Self) -> Self {
123 Subtime(std::cmp::max(self.0, other.0))
124 }
125 fn meet(&self, other: &Self) -> Self {
126 Subtime(std::cmp::min(self.0, other.0))
127 }
128}
129
130impl differential_dataflow::lattice::Maximum for Subtime {
131 fn maximum() -> Self {
132 Subtime(u64::MAX)
133 }
134}
135
136impl Subtime {
137 pub const fn least_summary() -> Self {
139 Subtime(1)
140 }
141}
142
/// Builds the dataflow fragment that reads the persist shard described by
/// `metadata` and exposes its contents as one stream of rows and one stream of
/// errors.
///
/// The shard is read inside a nested scope named `granular_backpressure(..)`
/// whose timestamps are `(Timestamp, Subtime)`; the reading, decoding and
/// filtering is delegated to [`persist_source_core`]. The resulting stream
/// leaves the nested scope, is optionally routed through `txns_progress`, and
/// is finally split into `Ok` and `Err` updates.
///
/// Parameters of note:
/// * `max_inflight_bytes`: when `Some`, flow control is enabled. A probe is
///   attached to the decoded stream and its progress is fed back to the
///   [`backpressure`] operator via a [`FlowControl`] value.
/// * `metadata.txns_shard`: when `Some`, the stream is passed through
///   `txns_progress`, and `as_of` must be `Some` and non-empty.
/// * `map_filter_project`, `read_schema`, `snapshot_mode`, `until`,
///   `start_signal`, `error_handler`: forwarded to [`persist_source_core`].
///
/// Returns the `Ok` stream, the `Err` stream, and the tokens that must be
/// held to keep the operators alive.
///
/// # Panics
///
/// Panics if `metadata.txns_shard` is `Some` and `as_of` is `None` or the
/// empty antichain, or (inside the client-opening closure) if opening the
/// persist client fails.
pub fn persist_source<'scope, E>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + Send + 'static,
    error_handler: ErrorHandler,
) -> (
    StreamVec<'scope, mz_repr::Timestamp, (Row, Timestamp, Diff)>,
    StreamVec<'scope, mz_repr::Timestamp, (E, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    let outer = scope.clone();
    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        // Only set up flow control (and the probe that drives it) when a byte
        // budget was requested.
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                // The probe handle is attached to the decoded stream below;
                // `progress_stream` is the stream built from that same handle
                // and is what the backpressure operator listens to.
                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    // Default summary for the outer timestamp, and the least
                    // non-zero summary for the `Subtime` coordinate.
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        let cfg = Arc::clone(&persist_clients.cfg().configs);
        // Retry parameters are only supplied when the collection has a txns
        // shard; they are passed to `persist_source_core` as `listen_sleep`.
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            outer,
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        // Attach the flow-control probe (if any) to the decoded stream.
        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        // Return to the caller's scope.
        stream.leave(outer)
    });

    let (stream, txns_tokens) = match metadata.txns_shard {
        // Collections with a txns shard are additionally routed through
        // `txns_progress`.
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    // Split into rows and errors, keeping only the first component of each
    // update's `(Timestamp, Subtime)` time.
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}
292
/// A scope whose timestamps pair an outer timestamp `T` with a [`Subtime`].
type RefinedScope<'scope, T> = Scope<'scope, (T, Subtime)>;
294
295#[allow(clippy::needless_borrow)]
302pub fn persist_source_core<'g, 'outer, E>(
303 outer: Scope<'outer, mz_repr::Timestamp>,
304 scope: RefinedScope<'g, mz_repr::Timestamp>,
305 source_id: GlobalId,
306 persist_clients: Arc<PersistClientCache>,
307 metadata: CollectionMetadata,
308 read_schema: Option<RelationDesc>,
309 as_of: Option<Antichain<Timestamp>>,
310 snapshot_mode: SnapshotMode,
311 until: Antichain<Timestamp>,
312 map_filter_project: Option<&mut MfpPlan>,
313 flow_control: Option<FlowControl<'g, (mz_repr::Timestamp, Subtime)>>,
314 listen_sleep: Option<impl Fn() -> RetryParameters + Send + 'static>,
316 start_signal: impl Future<Output = ()> + Send + 'static,
317 error_handler: ErrorHandler,
318) -> (
319 Stream<
320 'g,
321 (mz_repr::Timestamp, Subtime),
322 Vec<(Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff)>,
323 >,
324 Vec<PressOnDropButton>,
325)
326where
327 E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
328{
329 let cfg = persist_clients.cfg().clone();
330 let name = source_id.to_string();
331 let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());
332
333 let read_desc = match read_schema {
335 Some(desc) => desc,
336 None => metadata.relation_desc,
337 };
338
339 let desc_transformer = match flow_control {
340 Some(flow_control) => Some(move |scope, descs, chosen_worker| {
341 let (stream, token) = backpressure(
342 scope,
343 &format!("backpressure({source_id})"),
344 descs,
345 flow_control,
346 chosen_worker,
347 None,
348 );
349 (stream, vec![token])
350 }),
351 None => None,
352 };
353
354 let metrics = Arc::clone(persist_clients.metrics());
355 let filter_name = name.clone();
356 let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
360 let (fetched, token) = shard_source(
361 outer,
362 scope,
363 &name,
364 move || {
365 let (c, l) = (
366 Arc::clone(&persist_clients),
367 metadata.persist_location.clone(),
368 );
369 async move { c.open(l).await.unwrap() }
370 },
371 metadata.data_shard,
372 as_of,
373 snapshot_mode,
374 until.clone(),
375 desc_transformer,
376 Arc::new(read_desc.clone()),
377 Arc::new(UnitSchema),
378 move |stats, frontier| {
379 let Some(lower) = frontier.as_option().copied() else {
380 return FilterResult::Discard;
383 };
384
385 if lower > upper {
386 return FilterResult::Discard;
389 }
390
391 let time_range =
392 ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
393 if let Some(plan) = &filter_plan {
394 let metrics = &metrics.pushdown.part_stats;
395 let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
396 filter_result(&read_desc, time_range, stats, plan)
397 } else {
398 FilterResult::Keep
399 }
400 },
401 listen_sleep,
402 start_signal,
403 error_handler,
404 );
405 let rows = decode_and_mfp(cfg, fetched, &name, until, map_filter_project);
406 (rows, token)
407}
408
409fn filter_result(
410 relation_desc: &RelationDesc,
411 time_range: ResultSpec,
412 stats: RelationPartStats,
413 plan: &MfpPlan,
414) -> FilterResult {
415 let arena = RowArena::new();
416 let relation = ReprRelationType::from(relation_desc.typ());
417 let mut ranges = ColumnSpecs::new(&relation, &arena);
418 ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
419
420 let may_error = stats.err_count().map_or(true, |count| count > 0);
421
422 for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
425 let result_spec = stats.col_stats(idx, &arena);
426 ranges.push_column(pos, result_spec);
427 }
428 let result = ranges.mfp_plan_filter(plan).range;
429 let may_error = may_error || result.may_fail();
430 let may_keep = result.may_contain(Datum::True);
431 let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
432 if relation_desc.len() == 0 && !may_error && !may_skip {
433 let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
434 return FilterResult::Keep;
435 };
436 key.append(&SourceData(Ok(Row::default())));
437 let key = key.finish();
438 let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
439 return FilterResult::Keep;
440 };
441 val.append(&());
442 let val = val.finish();
443
444 FilterResult::ReplaceWith {
445 key: Arc::new(key),
446 val: Arc::new(val),
447 }
448 } else if may_error || may_keep {
449 FilterResult::Keep
450 } else {
451 FilterResult::Discard
452 }
453}
454
/// Builds the `persist_source::decode_and_mfp(..)` operator, which decodes
/// fetched blobs into updates and applies an optional map/filter/project plan.
///
/// * `cfg`: supplies the per-activation decode fuel and the
///   `STATS_AUDIT_PANIC` setting.
/// * `fetched`: the stream of fetched (still unparsed) blobs.
/// * `name`: used in the operator name and in audit log messages.
/// * `until`: updates at times not less than this frontier are dropped.
/// * `map_filter_project`: if `Some`, the plan is taken out of the caller's
///   `MfpPlan` via `take()` and evaluated here against every decoded row.
///
/// The operator processes a bounded amount of work per activation and
/// re-activates itself while work remains.
pub fn decode_and_mfp<'scope, E>(
    cfg: PersistConfig,
    fetched: StreamVec<
        'scope,
        (mz_repr::Timestamp, Subtime),
        FetchedBlob<SourceData, (), Timestamp, StorageDiff>,
    >,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> StreamVec<
    'scope,
    (mz_repr::Timestamp, Subtime),
    (Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff),
>
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Scratch buffers reused across all rows processed by this operator.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Move the plan out of the caller's `&mut MfpPlan` so the operator closure
    // can own it.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        let activations = scope.activations();
        // Used to reschedule this operator when it stops with work pending.
        let activator = Activator::new(operator_info.address, activations);
        // Parts received but not yet fully decoded, in arrival order.
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            // Queue every incoming blob together with a retained capability
            // for the time it arrived at.
            fetched_input.for_each(|time, data| {
                let capability = time.retain(0);
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Yield once `work` reaches the configured fuel; the start time
            // argument is ignored by this yield function.
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                // Only drop the part once it has been fully consumed.
                if done {
                    pending_work.pop_front();
                }
            }
            // Out of fuel with parts remaining: ask to be scheduled again.
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}
544
/// A fetched part that `decode_and_mfp` has queued for decoding.
struct PendingWork {
    /// Whether a filter-pushdown audit violation found while decoding this
    /// part should panic (in addition to being logged).
    panic_on_audit_failure: bool,
    /// The capability at which updates decoded from this part are emitted.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// The part itself, parsed on first access.
    part: PendingPart,
}
554
/// A fetched part that is either still an unparsed blob or has already been
/// parsed. [`PendingPart::part_mut`] performs the transition.
enum PendingPart {
    /// The blob as received from the fetch operator.
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    /// The result of calling `parse()` on the blob.
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}
561
562impl PendingPart {
563 fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
570 match self {
571 PendingPart::Unparsed(x) => {
572 *self = PendingPart::Parsed { part: x.parse() };
573 self.part_mut()
575 }
576 PendingPart::Parsed { part } => &mut part.part,
577 }
578 }
579}
580
impl PendingWork {
    /// Decodes updates from this part and emits them to `output`, stopping
    /// early once `yield_fn` says so.
    ///
    /// * `work`: running count of work done, shared with the caller. It is
    ///   incremented for every row evaluated against the plan and for every
    ///   update emitted.
    /// * `name`: used only in audit log and panic messages.
    /// * `start_time` / `yield_fn`: `yield_fn(start_time, *work)` is checked
    ///   after every decoded update.
    /// * `until`: updates (and plan results) at times not less than this
    ///   frontier are skipped.
    /// * `map_filter_project`: if `Some`, each `Ok` row is run through the
    ///   plan; if `None`, `Ok` rows are emitted unchanged.
    /// * `datum_vec` / `row_builder`: scratch space reused across calls.
    ///
    /// Returns `true` when the part is exhausted and `false` when the function
    /// yielded with updates remaining.
    fn do_work<YFn, E>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff)>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
        E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        // `Some(stats)` marks this part as a filter-pushdown audit part.
        // NOTE(review): presumably such a part was fetched only to audit a
        // stats-based decision to skip it — confirm against `FetchedPart`.
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        // Row storage handed back to `next_with_storage` on each iteration.
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            // Skip updates at or beyond `until`.
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        // Count the evaluation itself as work, whether or not
                        // it produces any output.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // Any result produced from an audit part is
                            // reported as a correctness violation, and panics
                            // if configured to.
                            if let Some(stats) = &is_filter_pushdown_audit {
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            mfp = ?redact(&mfp),
                                            result = ?redact(&result),
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // The plan may have changed the time, so
                                    // `until` is checked again.
                                    if !until.less_equal(&time) {
                                        // Keep the capability's `Subtime`, but
                                        // use the update's own timestamp.
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        // Release the borrow of `row` before moving it.
                        drop(datums_local);
                        // Hand the row back as storage for the next call to
                        // `next_with_storage`.
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        // No plan: emit the row unchanged.
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    // Errors stored in the shard bypass the plan and are
                    // emitted as `E`.
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(E::from(err)), emit_time, diff.into()));
                    *work += 1;
                }
            }
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}
714
/// A value that can flow through [`backpressure`], which needs to know how
/// many bytes each value counts for against the in-flight byte budget.
pub trait Backpressureable: Clone + 'static {
    /// The number of bytes this value counts for.
    fn byte_size(&self) -> usize;
}
720
721impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
722 fn byte_size(&self) -> usize {
723 self.1.encoded_size_bytes()
724 }
725}
726
/// Configuration for the [`backpressure`] operator.
#[derive(Debug)]
pub struct FlowControl<'scope, T: timely::progress::Timestamp> {
    /// A data-less stream whose progress is fed back to the backpressure
    /// operator (through a feedback edge delayed by `summary`) to tell it how
    /// far downstream consumers have got.
    pub progress_stream: StreamVec<'scope, T, Infallible>,
    /// The in-flight byte count at which the operator starts to hold parts
    /// back.
    pub max_inflight_bytes: usize,
    /// The summary applied on the feedback edge, and also applied to each
    /// emitted part's time to obtain the time at which it counts as retired.
    pub summary: T::Summary,

    /// Optional metrics updated as bytes are emitted, backpressured and
    /// retired.
    pub metrics: Option<BackpressureMetrics>,
}
745
/// Re-emits the parts in `data`, limiting how many bytes may be in flight
/// downstream at once.
///
/// Every part is re-stamped with a unique, increasing [`Subtime`]. Emitted
/// parts are tracked as in flight until the flow-control frontier (the
/// frontier of `flow_control.progress_stream`, delayed by
/// `flow_control.summary`) passes the part's time advanced by that same
/// summary. While the in-flight byte total is at or above
/// `flow_control.max_inflight_bytes`, the operator waits for flow-control
/// progress instead of emitting more parts.
///
/// * `scope`: the `(T, Subtime)` scope to build the operator in.
/// * `name`: included in the operator name.
/// * `chosen_worker`: only the worker with this index does any work; all
///   other workers return immediately.
/// * `probe`: optional channel that receives, after every flow-control
///   update, the new flow-control frontier, the number of in-flight parts
///   before retirement, and the number of parts just retired.
///
/// Returns the backpressured stream and a token that shuts the operator down
/// when dropped.
pub fn backpressure<'scope, T, O>(
    scope: Scope<'scope, (T, Subtime)>,
    name: &str,
    data: StreamVec<'scope, (T, Subtime), O>,
    flow_control: FlowControl<'scope, (T, Subtime)>,
    chosen_worker: usize,
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (StreamVec<'scope, (T, Subtime), O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Route the flow-control progress through a feedback edge that applies
    // `summary`; `summaried_flow` is what this operator reads.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    // Both inputs are disconnected from the output: the output frontier is
    // managed by hand through `cap_set` below.
    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(summaried_flow, Pipeline);

    // Assigns the next `Subtime` (taken from `part_number`) to `time` and
    // computes two frontiers from the base `frontier`: one with the re-stamped
    // time inserted (to hold while the part is emitted) and one with the time
    // at the following `Subtime` inserted (to advance to afterwards).
    // Increments `part_number`.
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // Wrap the raw input in a stream that yields either a re-stamped part with
    // its two frontiers (`Either::Right`) or a plain input frontier
    // (`Either::Left`).
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        // Parts received but not yet released by input progress.
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    // Input exhausted: release everything that is left, in
                    // time order, against an empty base frontier.
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    // Release, in time order, every buffered part whose time
                    // is not beyond the new input frontier, then report the
                    // frontier itself.
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // The frontier this operator currently holds its output at.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        // The most recently observed flow-control frontier.
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // `(retirement time, byte size)` of every part emitted but not yet
        // retired.
        let mut inflight_parts = Vec::new();
        // Parts pulled from the input but held back by backpressure.
        let mut pending_parts = std::collections::VecDeque::new();

        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // Emit when under budget, or when the flow-control frontier is
            // not less-or-equal to the output frontier.
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                // Held-back parts take priority over new input.
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                // Move the output frontier to this part's time.
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // Over budget and the output frontier is not
                                // strictly behind the flow-control frontier:
                                // hold the part back and re-evaluate.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                // Pure input progress: just follow it.
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                // Input is done; finish only once nothing is
                                // held back.
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Track the part as in flight until the flow-control frontier
                // passes its time advanced by `summary`. If the summary yields
                // no time, the part is not tracked.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                // Step past the part just emitted.
                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                // Backpressured: record the state and wait for the
                // flow-control frontier to move.
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    // A closed flow-control input is treated as the empty
                    // frontier, which retires everything below.
                    None => Antichain::new(),
                };

                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire every in-flight part whose retirement time is no
                // longer beyond the flow-control frontier.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                if let Some(probe) = probe.as_ref() {
                    // A closed probe channel is not an error; ignore it.
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}
1004
1005#[cfg(test)]
1006mod tests {
1007 use timely::container::CapacityContainerBuilder;
1008 use timely::dataflow::operators::{Enter, Probe};
1009 use tokio::sync::mpsc::unbounded_channel;
1010 use tokio::sync::oneshot;
1011
1012 use super::*;
1013
    /// Runs `backpressure` with a consumer that lives in the outer scope
    /// (`non_granular_consumer = true`), using the summary `(1, Subtime(0))`.
    ///
    /// Each `backpressure_runner` call takes, in order: the input parts as
    /// `(time, Part(bytes))`, the in-flight byte budget, the flow-control
    /// summary, the scripted steps, and the consumer placement flag.
    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        // Scenario 1: two oversized parts at time 50, one small part at 100,
        // budget of 100 bytes.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                // Both parts at time 50 are emitted before backpressure
                // reports.
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                // The last part is emitted once the consumer has caught up.
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        // Scenario 2: small parts followed by larger ones, budget of 50 bytes.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        // Scenario 3: a longer input with several parts sharing time 52,
        // budget of 100 bytes.
        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                // Consuming a single part leaves the output frontier where it
                // was.
                AssertOutputFrontier((51, Subtime(3))),
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }
1152
    /// Runs `backpressure` with a consumer that lives in the same
    /// `(u64, Subtime)` scope as the operator (`non_granular_consumer =
    /// false`), using the summary `(0, Subtime(1))`.
    ///
    /// Each `backpressure_runner` call takes, in order: the input parts as
    /// `(time, Part(bytes))`, the in-flight byte budget, the flow-control
    /// summary, the scripted steps, and the consumer placement flag.
    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        // Scenario 1: two oversized parts at the same time, budget of 100
        // bytes.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                // Only the first part is emitted before backpressure reports.
                AssertOutputFrontier((50, Subtime(1))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // The second part follows once the first was consumed.
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        // Scenario 2: mixed part sizes across three times, budget of 50 bytes.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }
1231
    /// The timestamp type used inside the test's refined scope.
    type Time = (u64, Subtime);
    /// A fake part whose only content is its claimed size in bytes.
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        /// Reports the wrapped number as the part's size.
        fn byte_size(&self) -> usize {
            self.0
        }
    }
1240
    /// One scripted action or assertion executed by `backpressure_runner`.
    enum Step {
        /// Step the worker until the backpressure operator's output frontier
        /// is no longer less than the given time.
        AssertOutputFrontier(Time),
        /// Wait for the next status message from the backpressure operator
        /// and assert it equals exactly these values.
        AssertBackpressured {
            /// The flow-control frontier reported by the operator.
            frontier: Time,
            /// The number of in-flight parts before retirement.
            inflight_parts: usize,
            /// The number of parts retired by this update.
            retired_parts: usize,
        },
        /// Send this many wake-up messages to the consumer operator.
        ProcessXParts(usize),
    }
1257
    /// Builds a small dataflow around `backpressure` and drives it through
    /// the scripted `steps`.
    ///
    /// * `input`: the parts to feed in, as `(time, part)`.
    /// * `max_inflight_bytes`: the byte budget given to `backpressure`.
    /// * `summary`: the flow-control summary given to `backpressure`.
    /// * `steps`: actions and assertions, executed in order.
    /// * `non_granular_consumer`: if `true`, the consumer (and the feedback
    ///   edge it drives) lives in the outer `u64` scope; if `false`, it lives
    ///   in the inner `(u64, Subtime)` scope.
    fn backpressure_runner(
        input: Vec<(u64, Part)>,
        max_inflight_bytes: usize,
        summary: Time,
        steps: Vec<Step>,
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (
                backpressure_probe,
                consumer_tx,
                mut backpressure_status_rx,
                finalizer_tx,
                _token,
            ) =
                worker.dataflow::<u64, _, _>(|outer_scope| {
                    // The outer-scope feedback edge only exists for the
                    // non-granular configuration.
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = outer_scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = outer_scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        // Flow control is driven either by the outer feedback
                        // brought into this scope, or by a feedback edge
                        // created in this scope.
                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        // Channel on which `backpressure` reports its status
                        // after every flow-control update.
                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // Granular configuration: the consumer sits inside
                        // this scope.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                backpressured.clone(),
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        let (probe_handle, backpressured) = backpressured.probe();
                        (
                            probe_handle,
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(outer_scope),
                            finalizer_tx,
                        )
                    });

                    // Non-granular configuration: the consumer sits in the
                    // outer scope, after the stream has left the inner one.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            outer_scope.clone(),
                            backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        // Step the worker until the probe has reached `time`.
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        // Each message wakes the consumer operator once.
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        // Step the worker until the operator reports a status.
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Let the input operator drop its capability so the dataflow can
            // shut down. The receiver may already be gone; ignore that.
            let _ = finalizer_tx.send(());
        });
    }
1427
1428 fn iterator_operator<'scope, I: Iterator<Item = (u64, Part)> + 'static>(
1431 scope: Scope<'scope, (u64, Subtime)>,
1432 mut input: I,
1433 ) -> (StreamVec<'scope, (u64, Subtime), Part>, oneshot::Sender<()>) {
1434 let (finalizer_tx, finalizer_rx) = oneshot::channel();
1435 let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
1436 let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();
1437
1438 iterator.build(|mut caps| async move {
1439 let mut capability = Some(caps.pop().unwrap());
1440 let mut last = None;
1441 while let Some(element) = input.next() {
1442 let time = element.0.clone();
1443 let part = element.1;
1444 last = Some((time, Subtime(0)));
1445 output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
1446 }
1447 if let Some(last) = last {
1448 capability
1449 .as_mut()
1450 .unwrap()
1451 .downgrade(&(last.0 + 1, last.1));
1452 }
1453
1454 let _ = finalizer_rx.await;
1455 capability.take();
1456 });
1457
1458 (output, finalizer_tx)
1459 }
1460
    /// Builds an operator that consumes `input` only when told to, and
    /// connects its (data-less) output to `feedback`.
    ///
    /// For every `()` received on the returned sender, the operator reads
    /// input events for as long as they are progress events, stopping at the
    /// first event that is not (a data event, or the end of the input).
    fn consumer_operator<
        'scope,
        T: timely::progress::Timestamp,
        O: Backpressureable + std::fmt::Debug,
    >(
        scope: Scope<'scope, T>,
        input: StreamVec<'scope, T, O>,
        feedback: timely::dataflow::operators::feedback::Handle<
            'scope,
            T,
            Vec<std::convert::Infallible>,
        >,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        // The output never carries data (`Infallible`); only its frontier is
        // of interest to the feedback edge.
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                // Skip over progress events; the first non-progress event
                // ends this round.
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
        output.connect_loop(feedback);

        tx
    }
1493}