1use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use std::convert::Infallible;
14use std::fmt::Debug;
15use std::future::Future;
16use std::hash::Hash;
17use std::sync::Arc;
18use std::time::Instant;
19
20use differential_dataflow::lattice::Lattice;
21use futures::{StreamExt, future::Either};
22use mz_expr::{ColumnSpecs, EvalError, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
23use mz_ore::cast::CastFrom;
24use mz_ore::collections::CollectionExt;
25use mz_ore::str::redact;
26use mz_persist_client::cache::PersistClientCache;
27use mz_persist_client::cfg::{PersistConfig, RetryParameters};
28use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
29use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
30use mz_persist_client::operators::shard_source::{
31 ErrorHandler, FilterResult, SnapshotMode, shard_source,
32};
33use mz_persist_client::stats::STATS_AUDIT_PANIC;
34use mz_persist_types::Codec64;
35use mz_persist_types::codec_impls::UnitSchema;
36use mz_persist_types::columnar::{ColumnEncoder, Schema};
37use mz_repr::{
38 Datum, DatumVec, Diff, GlobalId, RelationDesc, ReprRelationType, Row, RowArena, Timestamp,
39};
40use mz_storage_types::StorageDiff;
41use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
42use mz_storage_types::errors::DataflowError;
43use mz_storage_types::sources::SourceData;
44use mz_storage_types::stats::RelationPartStats;
45use mz_timely_util::builder_async::{
46 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
47};
48use mz_timely_util::probe::ProbeNotify;
49use mz_txn_wal::operator::{TxnsContext, txns_progress};
50use serde::{Deserialize, Serialize};
51use timely::PartialOrder;
52use timely::container::CapacityContainerBuilder;
53use timely::dataflow::channels::pact::Pipeline;
54use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
55use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
56use timely::dataflow::operators::{Capability, Leave, OkErr};
57use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
58use timely::dataflow::{Scope, Stream, StreamVec};
59use timely::order::TotalOrder;
60use timely::progress::Antichain;
61use timely::progress::Timestamp as TimelyTimestamp;
62use timely::progress::timestamp::PathSummary;
63use timely::scheduling::Activator;
64use tokio::sync::mpsc::UnboundedSender;
65use tracing::{error, trace};
66
67use crate::metrics::BackpressureMetrics;
68
/// Secondary timestamp coordinate wrapping a single `u64`.
///
/// Elsewhere in this file it is paired with an outer timestamp as `(T, Subtime)`; the
/// backpressure operator stamps every part it emits with a `Subtime` taken from an
/// incrementing part counter.
#[derive(
    Copy,
    Clone,
    PartialEq,
    Default,
    Eq,
    PartialOrd,
    Ord,
    Debug,
    Serialize,
    Deserialize,
    Hash,
    columnar::Columnar
)]
#[columnar(derive(PartialEq, Eq, PartialOrd, Ord))]
pub struct Subtime(u64);
94
95impl PartialOrder for Subtime {
96 fn less_equal(&self, other: &Self) -> bool {
97 self.0.less_equal(&other.0)
98 }
99}
100
// Marker impl: the order on `Subtime` is declared to be total.
impl TotalOrder for Subtime {}
102
103impl PathSummary<Subtime> for Subtime {
104 fn results_in(&self, src: &Subtime) -> Option<Subtime> {
105 self.0.results_in(&src.0).map(Subtime)
106 }
107
108 fn followed_by(&self, other: &Self) -> Option<Self> {
109 self.0.followed_by(&other.0).map(Subtime)
110 }
111}
112
113impl TimelyTimestamp for Subtime {
114 type Summary = Subtime;
115
116 fn minimum() -> Self {
117 Subtime(0)
118 }
119}
120
// `Subtime` derives `Copy`, and its columnation region is declared as the copy region.
impl columnation::Columnation for Subtime {
    type InnerRegion = columnation::CopyRegion<Subtime>;
}
124
125impl differential_dataflow::lattice::Lattice for Subtime {
126 fn join(&self, other: &Self) -> Self {
127 Subtime(std::cmp::max(self.0, other.0))
128 }
129 fn meet(&self, other: &Self) -> Self {
130 Subtime(std::cmp::min(self.0, other.0))
131 }
132}
133
134impl differential_dataflow::lattice::Maximum for Subtime {
135 fn maximum() -> Self {
136 Subtime(u64::MAX)
137 }
138}
139
140impl Subtime {
141 pub const fn least_summary() -> Self {
143 Subtime(1)
144 }
145}
146
/// Builds a dataflow source that reads the persist shard described by `metadata` and
/// returns its contents split into an "ok" stream of rows and an "err" stream of errors,
/// together with the tokens of the operators that were created.
///
/// The read itself is delegated to [`persist_source_core`], which is rendered inside a
/// nested `granular_backpressure(..)` scope. When `max_inflight_bytes` is `Some`, a
/// [`FlowControl`] with that byte budget is configured, and a probe attached to the
/// decoded output drives its progress stream. When `metadata.txns_shard` is `Some`, the
/// stream leaving the nested scope is additionally passed through `txns_progress`.
///
/// # Panics
///
/// When `metadata.txns_shard` is `Some`, panics if `as_of` is `None` or is the empty
/// antichain. The client-opening future passed to `txns_progress` panics if opening the
/// persist location fails.
pub fn persist_source<'scope, E>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + Send + 'static,
    error_handler: ErrorHandler,
) -> (
    StreamVec<'scope, mz_repr::Timestamp, (Row, Timestamp, Diff)>,
    StreamVec<'scope, mz_repr::Timestamp, (E, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    // Collects the tokens of every operator created below; returned to the caller.
    let mut tokens = vec![];

    let outer = scope.clone();
    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        // Flow control is only configured when a byte budget was supplied.
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                // The same probe handle is attached to the decoded output further down, so
                // `progress_stream` is driven by that output.
                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        let cfg = Arc::clone(&persist_clients.cfg().configs);
        // Retry parameters are only supplied when the collection has a txns shard.
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            outer,
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        // Attach the flow-control probe (if any) to the decoded output.
        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave(outer)
    });

    // Collections with a txns shard are additionally routed through `txns_progress`.
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    // Split rows from errors and keep only the outer `Timestamp` component (`t.0`) of each
    // update's `(Timestamp, Subtime)` time.
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}
296
/// A scope whose timestamps pair an outer timestamp `T` with a [`Subtime`].
type RefinedScope<'scope, T> = Scope<'scope, (T, Subtime)>;
298
299#[allow(clippy::needless_borrow)]
306pub fn persist_source_core<'g, 'outer, E>(
307 outer: Scope<'outer, mz_repr::Timestamp>,
308 scope: RefinedScope<'g, mz_repr::Timestamp>,
309 source_id: GlobalId,
310 persist_clients: Arc<PersistClientCache>,
311 metadata: CollectionMetadata,
312 read_schema: Option<RelationDesc>,
313 as_of: Option<Antichain<Timestamp>>,
314 snapshot_mode: SnapshotMode,
315 until: Antichain<Timestamp>,
316 map_filter_project: Option<&mut MfpPlan>,
317 flow_control: Option<FlowControl<'g, (mz_repr::Timestamp, Subtime)>>,
318 listen_sleep: Option<impl Fn() -> RetryParameters + Send + 'static>,
320 start_signal: impl Future<Output = ()> + Send + 'static,
321 error_handler: ErrorHandler,
322) -> (
323 Stream<
324 'g,
325 (mz_repr::Timestamp, Subtime),
326 Vec<(Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff)>,
327 >,
328 Vec<PressOnDropButton>,
329)
330where
331 E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
332{
333 let cfg = persist_clients.cfg().clone();
334 let name = source_id.to_string();
335 let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());
336
337 let read_desc = match read_schema {
339 Some(desc) => desc,
340 None => metadata.relation_desc,
341 };
342
343 let desc_transformer = match flow_control {
344 Some(flow_control) => Some(move |scope, descs, chosen_worker| {
345 let (stream, token) = backpressure(
346 scope,
347 &format!("backpressure({source_id})"),
348 descs,
349 flow_control,
350 chosen_worker,
351 None,
352 );
353 (stream, vec![token])
354 }),
355 None => None,
356 };
357
358 let metrics = Arc::clone(persist_clients.metrics());
359 let filter_name = name.clone();
360 let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
364 let (fetched, token) = shard_source(
365 outer,
366 scope,
367 &name,
368 move || {
369 let (c, l) = (
370 Arc::clone(&persist_clients),
371 metadata.persist_location.clone(),
372 );
373 async move { c.open(l).await.unwrap() }
374 },
375 metadata.data_shard,
376 as_of,
377 snapshot_mode,
378 until.clone(),
379 desc_transformer,
380 Arc::new(read_desc.clone()),
381 Arc::new(UnitSchema),
382 move |stats, frontier| {
383 let Some(lower) = frontier.as_option().copied() else {
384 return FilterResult::Discard;
387 };
388
389 if lower > upper {
390 return FilterResult::Discard;
393 }
394
395 let time_range =
396 ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
397 if let Some(plan) = &filter_plan {
398 let metrics = &metrics.pushdown.part_stats;
399 let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
400 filter_result(&read_desc, time_range, stats, plan)
401 } else {
402 FilterResult::Keep
403 }
404 },
405 listen_sleep,
406 start_signal,
407 error_handler,
408 );
409 let rows = decode_and_mfp(cfg, fetched, &name, until, map_filter_project);
410 (rows, token)
411}
412
413fn filter_result(
414 relation_desc: &RelationDesc,
415 time_range: ResultSpec,
416 stats: RelationPartStats,
417 plan: &MfpPlan,
418) -> FilterResult {
419 let arena = RowArena::new();
420 let relation = ReprRelationType::from(relation_desc.typ());
421 let mut ranges = ColumnSpecs::new(&relation, &arena);
422 ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
423
424 let may_error = stats.err_count().map_or(true, |count| count > 0);
425
426 for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
429 let result_spec = stats.col_stats(idx, &arena);
430 ranges.push_column(pos, result_spec);
431 }
432 let result = ranges.mfp_plan_filter(plan).range;
433 let may_error = may_error || result.may_fail();
434 let may_keep = result.may_contain(Datum::True);
435 let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
436 if relation_desc.len() == 0 && !may_error && !may_skip {
437 let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
438 return FilterResult::Keep;
439 };
440 key.append(&SourceData(Ok(Row::default())));
441 let key = key.finish();
442 let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
443 return FilterResult::Keep;
444 };
445 val.append(&());
446 let val = val.finish();
447
448 FilterResult::ReplaceWith {
449 key: Arc::new(key),
450 val: Arc::new(val),
451 }
452 } else if may_error || may_keep {
453 FilterResult::Keep
454 } else {
455 FilterResult::Discard
456 }
457}
458
/// Builds the `persist_source::decode_and_mfp(..)` operator, which turns fetched blobs
/// into `(Result<Row, E>, time, diff)` updates, applying `map_filter_project` if given.
///
/// Incoming blobs are queued as [`PendingWork`] and processed with a fuel budget read from
/// `cfg.storage_source_decode_fuel()` on each scheduling; if work remains once the budget
/// is used up, the operator re-activates itself.
///
/// The plan behind `map_filter_project` is removed from the caller's `MfpPlan` via
/// `take()` and owned by the operator.
pub fn decode_and_mfp<'scope, E>(
    cfg: PersistConfig,
    fetched: StreamVec<
        'scope,
        (mz_repr::Timestamp, Subtime),
        FetchedBlob<SourceData, (), Timestamp, StorageDiff>,
    >,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> StreamVec<
    'scope,
    (mz_repr::Timestamp, Subtime),
    (Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff),
>
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Scratch buffers reused across all rows processed by this operator.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Move the plan out of the caller's `MfpPlan` so the operator closure can own it.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        let activations = scope.activations();
        // Used to reschedule this operator when it yields with work outstanding.
        let activator = Activator::new(operator_info.address, activations);
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            // Queue every newly arrived blob together with a capability for its time.
            fetched_input.for_each(|time, data| {
                let capability = time.retain(0);
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Yield once `work` reaches the configured fuel; the elapsed-time argument is
            // ignored.
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                // Only drop a part (and its capability) once it is fully processed.
                if done {
                    pending_work.pop_front();
                }
            }
            // Ask to be scheduled again if we stopped with parts still queued.
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}
548
/// A fetched part queued for decoding in [`decode_and_mfp`].
struct PendingWork {
    /// Whether a filter-pushdown audit violation found in this part should panic.
    panic_on_audit_failure: bool,
    /// Capability at which this part's updates are emitted.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// The part itself, parsed or not yet parsed.
    part: PendingPart,
}
558
/// A fetched part that is either still an unparsed blob or already parsed.
enum PendingPart {
    /// A blob as received from the fetch stage; parsed on first access via `part_mut`.
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    /// The result of calling `parse()` on the blob.
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}
565
impl PendingPart {
    /// Returns the parsed part, parsing the blob first if that has not happened yet.
    ///
    /// After this call `self` is always `PendingPart::Parsed`.
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                *self = PendingPart::Parsed { part: x.parse() };
                // `self` is `Parsed` now, so this recursion takes the other arm and stops.
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}
584
impl PendingWork {
    /// Decodes updates from this part and emits them to `output`, returning `true` once
    /// the part is exhausted and `false` if it yielded early because `yield_fn` said so.
    ///
    /// * `work` is incremented once per row handed to the MFP and once per update emitted.
    /// * Updates whose time is not before `until` are skipped.
    /// * With `map_filter_project`, each row is evaluated through the plan and its results
    ///   emitted; without it the row is emitted as-is.
    /// * Emitted times take their first component from the update and their `Subtime`
    ///   from this work item's capability.
    fn do_work<YFn, E>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff)>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
        E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        // Row storage handed back to `next_with_storage` on each iteration — presumably so
        // its allocation can be reused; confirm against `FetchedPart`.
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            // Skip updates at or beyond `until`.
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        // One unit of work for evaluating the row.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // Any result produced from a part flagged as a filter-pushdown
                            // audit is reported as a violation (and optionally panics).
                            if let Some(stats) = &is_filter_pushdown_audit {
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            mfp = ?redact(&mfp),
                                            result = ?redact(&result),
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // The MFP may have changed the time; re-check `until`.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        // Release the borrow of `row` before handing it back as storage.
                        drop(datums_local);
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(E::from(err)), emit_time, diff.into()));
                    *work += 1;
                }
            }
            // Yield between updates; the part stays queued and resumes where it left off.
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}
718
/// Data that can flow through [`backpressure`], which needs to know each item's size.
pub trait Backpressureable: Clone + 'static {
    /// The number of bytes this item counts towards the in-flight byte budget.
    fn byte_size(&self) -> usize;
}
724
725impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
726 fn byte_size(&self) -> usize {
727 self.1.encoded_size_bytes()
728 }
729}
730
/// Configuration for the [`backpressure`] operator.
#[derive(Debug)]
pub struct FlowControl<'scope, T: timely::progress::Timestamp> {
    /// Data-less stream whose progress is fed back into the operator through a feedback
    /// edge to signal how far downstream has processed.
    pub progress_stream: StreamVec<'scope, T, Infallible>,
    /// Byte budget for emitted-but-not-yet-retired parts.
    pub max_inflight_bytes: usize,
    /// Summary used for the feedback edge and, applied to a part's time, to compute the
    /// time the flow-control frontier must pass before that part is retired.
    pub summary: T::Summary,

    /// Optional metrics updated as bytes are emitted, backpressured and retired.
    pub metrics: Option<BackpressureMetrics>,
}
749
/// Applies flow control to `data`, limiting the bytes of emitted-but-unretired parts to
/// roughly `flow_control.max_inflight_bytes`.
///
/// Each part is re-stamped with a fresh, increasing [`Subtime`] and emitted in time
/// order. A part counts as in flight until the frontier of `flow_control.progress_stream`
/// (fed back through a loop with `flow_control.summary`) passes the part's time advanced
/// by that summary.
///
/// * `chosen_worker`: only the worker with this index runs the operator logic; all others
///   return immediately.
/// * `probe`: if set, receives `(flow-control frontier, in-flight part count before
///   retiring, retired part count)` after every flow-control frontier update observed
///   while backpressured.
///
/// Returns the flow-controlled stream and a token that shuts the operator down on drop.
pub fn backpressure<'scope, T, O>(
    scope: Scope<'scope, (T, Subtime)>,
    name: &str,
    data: StreamVec<'scope, (T, Subtime), O>,
    flow_control: FlowControl<'scope, (T, Subtime)>,
    chosen_worker: usize,
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (StreamVec<'scope, (T, Subtime), O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Route the progress stream through a feedback edge that applies the summary.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(summaried_flow, Pipeline);

    // Assigns `time` the next part number as its `Subtime` and returns:
    // - the re-stamped time,
    // - `frontier` with the re-stamped time inserted,
    // - `frontier` with the re-stamped time at the following part number inserted.
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // Adapts the raw input into a time-ordered stream of either re-stamped parts (`Right`)
    // or plain frontier updates (`Left`). Parts are buffered until the input frontier
    // passes their time (or the input ends), then released sorted by time.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    // Input finished: flush everything still buffered.
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    // Release the parts whose time the new frontier has passed.
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // Frontier of what this operator may still emit.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        // Most recently observed frontier of the flow-control feedback input.
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // `(retirement time, byte size)` of every emitted part not yet retired.
        let mut inflight_parts = Vec::new();
        // Parts read from the input but held back while over budget.
        let mut pending_parts = std::collections::VecDeque::new();

        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // Emit while under the byte budget, or while the flow-control frontier is not
            // less-or-equal to the output frontier; otherwise wait on flow control.
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                // Held-back parts go first, then fresh input.
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                // Advance the output frontier to this part's time even if
                                // the part itself ends up being held back.
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // Over budget and the output frontier is not strictly
                                // behind the flow-control frontier: hold the part back.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                // Input exhausted: finish once nothing is held back.
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Track the part under the time the flow-control frontier must pass to
                // retire it. If the summary cannot be applied, the part is not tracked.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                // Move the output frontier just past the emitted part.
                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                // Wait for the next flow-control frontier update; a closed input is
                // treated as the empty frontier.
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire every in-flight part whose retirement time the frontier has passed.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                if let Some(probe) = probe.as_ref() {
                    // A closed probe receiver is deliberately ignored.
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}
1008
1009#[cfg(test)]
1010mod tests {
1011 use timely::container::CapacityContainerBuilder;
1012 use timely::dataflow::operators::{Enter, Probe};
1013 use tokio::sync::mpsc::unbounded_channel;
1014 use tokio::sync::oneshot;
1015
1016 use super::*;
1017
    /// Runs three `backpressure` scenarios with the consumer placed in the outer scope
    /// (`non_granular_consumer = true`) and a flow-control summary of `(1, Subtime(0))`.
    ///
    /// Arguments to `backpressure_runner`, in order: input parts as `(time, Part(bytes))`,
    /// the in-flight byte budget, the summary, the scripted steps, the consumer placement.
    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        // Scenario 1: budget of 100 bytes.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        // Scenario 2: budget of 50 bytes.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        // Scenario 3: budget of 100 bytes, with many parts sharing time 52.
        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertOutputFrontier((51, Subtime(3))),
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }
1156
    /// Runs two `backpressure` scenarios with the consumer placed inside the refined scope
    /// (`non_granular_consumer = false`) and a flow-control summary of `(0, Subtime(1))`.
    ///
    /// Arguments to `backpressure_runner`, in order: input parts as `(time, Part(bytes))`,
    /// the in-flight byte budget, the summary, the scripted steps, the consumer placement.
    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        // Scenario 1: two parts at the same time, each larger than the 100-byte budget.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((50, Subtime(1))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        // Scenario 2: budget of 50 bytes across several times.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }
1235
    /// Timestamp type used inside the refined scope of these tests.
    type Time = (u64, Subtime);
    /// Test stand-in for a batch part; the wrapped value is its reported byte size.
    #[derive(Clone, Debug)]
    struct Part(usize);
1239 impl Backpressureable for Part {
1240 fn byte_size(&self) -> usize {
1241 self.0
1242 }
1243 }
1244
    /// One scripted action or assertion executed by `backpressure_runner`.
    enum Step {
        /// Step the worker until the backpressure output frontier is no longer less than
        /// the given time.
        AssertOutputFrontier(Time),
        /// Step the worker until the operator reports a backpressure status, then assert
        /// it equals these values exactly.
        AssertBackpressured {
            /// Expected flow-control frontier (a single-element antichain).
            frontier: Time,
            /// Expected number of in-flight parts before retiring.
            inflight_parts: usize,
            /// Expected number of parts retired by this update.
            retired_parts: usize,
        },
        /// Send this many "process one part" messages to the consumer.
        ProcessXParts(usize),
    }
1261
    /// Builds a single-worker dataflow around `backpressure` and drives it through `steps`.
    ///
    /// * `input`: parts to feed, as `(time, part)`.
    /// * `max_inflight_bytes`, `summary`: forwarded into the `FlowControl`.
    /// * `non_granular_consumer`: when `true` the consumer (and its feedback edge) lives
    ///   in the outer `u64` scope; when `false` it lives in the `(u64, Subtime)` scope.
    fn backpressure_runner(
        input: Vec<(u64, Part)>,
        max_inflight_bytes: usize,
        summary: Time,
        steps: Vec<Step>,
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (
                backpressure_probe,
                consumer_tx,
                mut backpressure_status_rx,
                finalizer_tx,
                _token,
            ) =
                worker.dataflow::<u64, _, _>(|outer_scope| {
                    // The outer feedback edge only exists for the non-granular consumer.
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = outer_scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = outer_scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        // Pick the progress stream: the outer feedback entered into this
                        // scope, or a feedback edge created inside this scope.
                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        // Channel on which `backpressure` reports its status.
                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // Granular case: the consumer reads inside the refined scope.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                backpressured.clone(),
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        let (probe_handle, backpressured) = backpressured.probe();
                        (
                            probe_handle,
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(outer_scope),
                            finalizer_tx,
                        )
                    });

                    // Non-granular case: the consumer reads the stream after it left the
                    // refined scope.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            outer_scope.clone(),
                            backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        // Step until the output frontier is no longer less than `time`.
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        // Step until the operator reports its next status.
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Let the input operator drop its capability so the dataflow can finish.
            let _ = finalizer_tx.send(());
        });
    }
1431
1432 fn iterator_operator<'scope, I: Iterator<Item = (u64, Part)> + 'static>(
1435 scope: Scope<'scope, (u64, Subtime)>,
1436 mut input: I,
1437 ) -> (StreamVec<'scope, (u64, Subtime), Part>, oneshot::Sender<()>) {
1438 let (finalizer_tx, finalizer_rx) = oneshot::channel();
1439 let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
1440 let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();
1441
1442 iterator.build(|mut caps| async move {
1443 let mut capability = Some(caps.pop().unwrap());
1444 let mut last = None;
1445 while let Some(element) = input.next() {
1446 let time = element.0.clone();
1447 let part = element.1;
1448 last = Some((time, Subtime(0)));
1449 output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
1450 }
1451 if let Some(last) = last {
1452 capability
1453 .as_mut()
1454 .unwrap()
1455 .downgrade(&(last.0 + 1, last.1));
1456 }
1457
1458 let _ = finalizer_rx.await;
1459 capability.take();
1460 });
1461
1462 (output, finalizer_tx)
1463 }
1464
    /// Test consumer that reads `input` only when told to and wires its (data-less) output
    /// into `feedback`.
    ///
    /// For every `()` received on the returned sender, the operator reads events from
    /// `input` until it sees the first event that is not a `Progress` event, or the input
    /// ends.
    fn consumer_operator<
        'scope,
        T: timely::progress::Timestamp,
        O: Backpressureable + std::fmt::Debug,
    >(
        scope: Scope<'scope, T>,
        input: StreamVec<'scope, T, O>,
        feedback: timely::dataflow::operators::feedback::Handle<
            'scope,
            T,
            Vec<std::convert::Infallible>,
        >,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        // NOTE(review): `new_input_for` presumably ties this input's progress to the
        // output — confirm against `AsyncOperatorBuilder`.
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                // Skip progress events; stop at the first non-progress event or end of input.
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
        output.connect_loop(feedback);

        tx
    }
1497}