1#![allow(missing_docs)]
24#![allow(clippy::needless_borrow)]
25
26use std::cell::RefCell;
27use std::collections::{BTreeMap, VecDeque};
28use std::convert::Infallible;
29use std::hash::{Hash, Hasher};
30use std::rc::Rc;
31use std::sync::Arc;
32use std::time::Duration;
33
34use differential_dataflow::lattice::Lattice;
35use differential_dataflow::{AsCollection, Hashable, VecCollection};
36use futures::stream::StreamExt;
37use mz_ore::cast::CastFrom;
38use mz_ore::collections::CollectionExt;
39use mz_ore::now::NowFn;
40use mz_persist_client::cache::PersistClientCache;
41use mz_repr::{Diff, GlobalId, RelationDesc, Row};
42use mz_storage_types::configuration::StorageConfiguration;
43use mz_storage_types::controller::CollectionMetadata;
44use mz_storage_types::dyncfgs;
45use mz_storage_types::errors::DataflowError;
46use mz_storage_types::sources::{SourceConnection, SourceExport, SourceTimestamp};
47use mz_timely_util::antichain::AntichainExt;
48use mz_timely_util::builder_async::{
49 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
50};
51use mz_timely_util::capture::PusherCapture;
52use mz_timely_util::operator::ConcatenateFlatten;
53use mz_timely_util::reclock::reclock;
54use timely::PartialOrder;
55use timely::container::CapacityContainerBuilder;
56use timely::dataflow::channels::pact::Pipeline;
57use timely::dataflow::operators::capture::capture::Capture;
58use timely::dataflow::operators::capture::{Event, EventPusher};
59use timely::dataflow::operators::core::Map as _;
60use timely::dataflow::operators::generic::OutputBuilder;
61use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
62use timely::dataflow::operators::{Broadcast, CapabilitySet, Inspect, Leave};
63use timely::dataflow::scopes::Child;
64use timely::dataflow::{Scope, Stream};
65use timely::order::TotalOrder;
66use timely::progress::frontier::MutableAntichain;
67use timely::progress::{Antichain, Timestamp};
68use tokio::sync::{Semaphore, watch};
69use tokio_stream::wrappers::WatchStream;
70use tracing::trace;
71
72use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate};
73use crate::metrics::StorageMetrics;
74use crate::metrics::source::SourceMetrics;
75use crate::source::probe;
76use crate::source::reclock::ReclockOperator;
77use crate::source::types::{Probe, SourceMessage, SourceOutput, SourceRender, StackedCollection};
78use crate::statistics::SourceStatistics;
79
/// Shared configuration information for all source types, used by
/// `create_raw_source` to render a raw source dataflow.
#[derive(Clone)]
pub struct RawSourceCreationConfig {
    /// The name to attach to the underlying timely operator(s).
    pub name: String,
    /// The ID of this instantiation of the source.
    pub id: GlobalId,
    /// The data exports of this ingestion, keyed by their global id.
    pub source_exports: BTreeMap<GlobalId, SourceExport<CollectionMetadata>>,
    /// The ID of the worker on which this operator is executing.
    pub worker_id: usize,
    /// The total count of timely workers.
    pub worker_count: usize,
    /// Granularity with which timestamps should be closed (and thus remap
    /// bindings minted).
    pub timestamp_interval: Duration,
    /// Function to get the current wall-clock time.
    pub now_fn: NowFn,
    /// The registry of storage metrics.
    pub metrics: StorageMetrics,
    /// The as-of frontier of the ingestion.
    pub as_of: Antichain<mz_repr::Timestamp>,
    /// The resume upper of each export, in `mz_repr::Timestamp` time.
    pub resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
    /// The resume upper of each export in the source's native timestamp
    /// domain, encoded as rows.
    /// NOTE(review): row encoding assumed from the type — confirm against
    /// the producer of this config.
    pub source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    /// A handle to the shared persist client cache.
    pub persist_clients: Arc<PersistClientCache>,
    /// Statistics collectors, one per export of this source.
    pub statistics: BTreeMap<GlobalId, SourceStatistics>,
    /// The upper of the remap shard, shared with other operators. Workers
    /// that are not responsible for minting bindings clear this.
    pub shared_remap_upper: Rc<RefCell<Antichain<mz_repr::Timestamp>>>,
    /// The storage configuration, including dynamic configuration flags.
    pub config: StorageConfiguration,
    /// The ID of the remap collection.
    pub remap_collection_id: GlobalId,
    /// The collection metadata (e.g. persist shard) of the remap collection.
    pub remap_metadata: CollectionMetadata,
    /// Semaphore shared between source operators.
    /// NOTE(review): exact busy-signalling semantics are not visible in this
    /// file — confirm before relying on them.
    pub busy_signal: Arc<Semaphore>,
}
129
/// Configuration information for the rendering of a single source export.
#[derive(Clone)]
pub struct SourceExportCreationConfig {
    /// The ID of this source export.
    pub id: GlobalId,
    /// The ID of the worker on which this operator is executing.
    pub worker_id: usize,
    /// The registry of storage metrics.
    pub metrics: StorageMetrics,
    /// The statistics collector for this export.
    pub source_statistics: SourceStatistics,
}
143
144impl RawSourceCreationConfig {
145 pub fn responsible_worker<P: Hash>(&self, partition: P) -> usize {
147 let mut h = std::hash::DefaultHasher::default();
148 (self.id, partition).hash(&mut h);
149 let key = usize::cast_from(h.finish());
150 key % self.worker_count
151 }
152
153 pub fn responsible_for<P: Hash>(&self, partition: P) -> bool {
155 self.responsible_worker(partition) == self.worker_id
156 }
157}
158
/// Creates a raw source dataflow fragment for the given source connection.
///
/// The source is rendered in its own "source time" domain (a nested scope of
/// the parent), its outputs are reclocked into `mz_repr::Timestamp` using
/// bindings minted by the remap operator, and the committed upper of the
/// outputs is reclocked back into source time and fed to the source as its
/// resume frontier.
///
/// Returns the reclocked collection for each export, a stream of health
/// status updates, and the tokens that keep the dataflow operators alive.
pub fn create_raw_source<'g, G: Scope<Timestamp = ()>, C>(
    scope: &mut Child<'g, G, mz_repr::Timestamp>,
    storage_state: &crate::storage_state::StorageState,
    committed_upper: &Stream<Child<'g, G, mz_repr::Timestamp>, ()>,
    config: &RawSourceCreationConfig,
    source_connection: C,
    start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
    BTreeMap<
        GlobalId,
        VecCollection<
            Child<'g, G, mz_repr::Timestamp>,
            Result<SourceOutput<C::Time>, DataflowError>,
            Diff,
        >,
    >,
    Stream<G, HealthStatusMessage>,
    Vec<PressOnDropButton>,
)
where
    C: SourceConnection + SourceRender + Clone + 'static,
{
    let worker_id = config.worker_id;
    let id = config.id;

    let mut tokens = vec![];

    // Channels through which the source time domain reports its progress to
    // the remap operator: the frontier of ingested data and (optionally)
    // probes of the upstream system.
    let (ingested_upper_tx, ingested_upper_rx) =
        watch::channel(MutableAntichain::new_bottom(C::Time::minimum()));
    let (probed_upper_tx, probed_upper_rx) = watch::channel(None);

    let source_metrics = Arc::new(config.metrics.get_source_metrics(id, worker_id));

    let timestamp_desc = source_connection.timestamp_desc();

    // Mint and durably record bindings from source time to mz time. The
    // resulting collection is broadcast so every worker can reclock locally.
    let (remap_collection, remap_token) = remap_operator(
        scope,
        storage_state,
        config.clone(),
        probed_upper_rx,
        ingested_upper_rx,
        timestamp_desc,
    );
    let remap_collection = remap_collection.inner.broadcast().as_collection();
    tokens.push(remap_token);

    // Reclock the committed upper of the outputs back into source time; the
    // rendered source consumes this as its resume-upper stream.
    let committed_upper = reclock_committed_upper(
        &remap_collection,
        config.as_of.clone(),
        committed_upper,
        id,
        Arc::clone(&source_metrics),
    );

    let mut reclocked_exports = BTreeMap::new();

    let reclocked_exports2 = &mut reclocked_exports;
    let (health, source_tokens) = scope.parent.scoped("SourceTimeDomain", move |scope| {
        // Render the source itself inside its native time domain.
        let (exports, source_upper, health_stream, source_tokens) = source_render_operator(
            scope,
            config,
            source_connection,
            probed_upper_tx,
            committed_upper,
            start_signal,
        );

        for (id, export) in exports {
            // Reclock each export's output from source time into mz time.
            let (reclock_pusher, reclocked) = reclock(&remap_collection, config.as_of.clone());
            export
                .inner
                .map(move |(result, from_time, diff)| {
                    let result = match result {
                        Ok(msg) => Ok(SourceOutput {
                            key: msg.key.clone(),
                            value: msg.value.clone(),
                            metadata: msg.metadata.clone(),
                            from_time: from_time.clone(),
                        }),
                        Err(err) => Err(err.clone()),
                    };
                    (result, from_time.clone(), *diff)
                })
                .capture_into(PusherCapture(reclock_pusher));
            reclocked_exports2.insert(id, reclocked);
        }

        // Report the source's ingestion frontier to the remap operator.
        source_upper.capture_into(FrontierCapture(ingested_upper_tx));

        (health_stream.leave(), source_tokens)
    });

    tokens.extend(source_tokens);

    (reclocked_exports, health, tokens)
}
268
269pub struct FrontierCapture<T>(watch::Sender<MutableAntichain<T>>);
270
271impl<T: Timestamp> EventPusher<T, Vec<Infallible>> for FrontierCapture<T> {
272 fn push(&mut self, event: Event<T, Vec<Infallible>>) {
273 match event {
274 Event::Progress(changes) => self.0.send_modify(|frontier| {
275 frontier.update_iter(changes);
276 }),
277 Event::Messages(_, _) => unreachable!(),
278 }
279 }
280}
281
282fn source_render_operator<G, C>(
288 scope: &mut G,
289 config: &RawSourceCreationConfig,
290 source_connection: C,
291 probed_upper_tx: watch::Sender<Option<Probe<C::Time>>>,
292 resume_uppers: impl futures::Stream<Item = Antichain<C::Time>> + 'static,
293 start_signal: impl std::future::Future<Output = ()> + 'static,
294) -> (
295 BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
296 Stream<G, Infallible>,
297 Stream<G, HealthStatusMessage>,
298 Vec<PressOnDropButton>,
299)
300where
301 G: Scope<Timestamp = C::Time>,
302 C: SourceRender + 'static,
303{
304 let source_id = config.id;
305 let worker_id = config.worker_id;
306 let now_fn = config.now_fn.clone();
307 let timestamp_interval = config.timestamp_interval;
308
309 let resume_uppers = resume_uppers.inspect(move |upper| {
310 let upper = upper.pretty();
311 trace!(%upper, "timely-{worker_id} source({source_id}) received resume upper");
312 });
313
314 let (exports, progress, health, probes, tokens) =
315 source_connection.render(scope, config, resume_uppers, start_signal);
316
317 let mut export_collections = BTreeMap::new();
318
319 let source_metrics = config.metrics.get_source_metrics(config.id, worker_id);
320
321 let resume_upper = Antichain::from_iter(
323 config
324 .resume_uppers
325 .values()
326 .flat_map(|f| f.iter().cloned()),
327 );
328 source_metrics
329 .resume_upper
330 .set(mz_persist_client::metrics::encode_ts_metric(&resume_upper));
331
332 let mut health_streams = vec![];
333
334 for (id, export) in exports {
335 let name = format!("SourceGenericStats({})", id);
336 let mut builder = OperatorBuilderRc::new(name, scope.clone());
337
338 let (health_output, derived_health) = builder.new_output();
339 let mut health_output =
340 OutputBuilder::<_, CapacityContainerBuilder<_>>::from(health_output);
341 health_streams.push(derived_health);
342
343 let (output, new_export) = builder.new_output();
344 let mut output = OutputBuilder::<_, CapacityContainerBuilder<_>>::from(output);
345
346 let mut input = builder.new_input(&export.inner, Pipeline);
347 export_collections.insert(id, new_export.as_collection());
348
349 let bytes_read_counter = config.metrics.source_defs.bytes_read.clone();
350 let source_statistics = config
351 .statistics
352 .get(&id)
353 .expect("statistics initialized")
354 .clone();
355
356 builder.build(move |mut caps| {
357 let mut health_cap = Some(caps.remove(0));
358
359 move |frontiers| {
360 let mut last_status = None;
361 let mut health_output = health_output.activate();
362
363 if frontiers[0].is_empty() {
364 health_cap = None;
365 return;
366 }
367 let health_cap = health_cap.as_mut().unwrap();
368
369 while let Some((cap, data)) = input.next() {
370 for (message, _, _) in data.iter() {
371 match message {
372 Ok(message) => {
373 source_statistics.inc_messages_received_by(1);
374 let key_len = u64::cast_from(message.key.byte_len());
375 let value_len = u64::cast_from(message.value.byte_len());
376 bytes_read_counter.inc_by(key_len + value_len);
377 source_statistics.inc_bytes_received_by(key_len + value_len);
378 }
379 Err(error) => {
380 let update = HealthStatusUpdate::stalled(
384 error.to_string(),
385 Some(
386 "retracting the errored value may resume the source"
387 .to_string(),
388 ),
389 );
390 let status = HealthStatusMessage {
391 id: Some(id),
392 namespace: C::STATUS_NAMESPACE.clone(),
393 update,
394 };
395 if last_status.as_ref() != Some(&status) {
396 last_status = Some(status.clone());
397 health_output.session(&health_cap).give(status);
398 }
399 }
400 }
401 }
402 let mut output = output.activate();
403 output.session(&cap).give_container(data);
404 }
405 }
406 });
407 }
408
409 let probe_stream = match probes {
410 Some(stream) => stream,
411 None => synthesize_probes(source_id, &progress, timestamp_interval, now_fn),
412 };
413
414 probe_stream.broadcast().inspect(move |probe| {
418 let _ = probed_upper_tx.send(Some(probe.clone()));
420 });
421
422 (
423 export_collections,
424 progress,
425 health.concatenate_flatten::<_, CapacityContainerBuilder<_>>(health_streams),
426 tokens,
427 )
428}
429
430fn remap_operator<G, FromTime>(
436 scope: &G,
437 storage_state: &crate::storage_state::StorageState,
438 config: RawSourceCreationConfig,
439 mut probed_upper: watch::Receiver<Option<Probe<FromTime>>>,
440 mut ingested_upper: watch::Receiver<MutableAntichain<FromTime>>,
441 remap_relation_desc: RelationDesc,
442) -> (VecCollection<G, FromTime, Diff>, PressOnDropButton)
443where
444 G: Scope<Timestamp = mz_repr::Timestamp>,
445 FromTime: SourceTimestamp,
446{
447 let RawSourceCreationConfig {
448 name,
449 id,
450 source_exports: _,
451 worker_id,
452 worker_count,
453 timestamp_interval,
454 remap_metadata,
455 as_of,
456 resume_uppers: _,
457 source_resume_uppers: _,
458 metrics: _,
459 now_fn,
460 persist_clients,
461 statistics: _,
462 shared_remap_upper,
463 config: _,
464 remap_collection_id,
465 busy_signal: _,
466 } = config;
467
468 let read_only_rx = storage_state.read_only_rx.clone();
469 let error_handler = storage_state.error_handler("remap_operator", id);
470
471 let chosen_worker = usize::cast_from(id.hashed() % u64::cast_from(worker_count));
472 let active_worker = chosen_worker == worker_id;
473
474 let operator_name = format!("remap({})", id);
475 let mut remap_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
476 let (remap_output, remap_stream) = remap_op.new_output::<CapacityContainerBuilder<_>>();
477
478 let button = remap_op.build(move |capabilities| async move {
479 if !active_worker {
480 shared_remap_upper.borrow_mut().clear();
483 return;
484 }
485
486 let mut cap_set = CapabilitySet::from_elem(capabilities.into_element());
487
488 let remap_handle = crate::source::reclock::compat::PersistHandle::<FromTime, _>::new(
489 Arc::clone(&persist_clients),
490 read_only_rx,
491 remap_metadata.clone(),
492 as_of.clone(),
493 shared_remap_upper,
494 id,
495 "remap",
496 worker_id,
497 worker_count,
498 remap_relation_desc,
499 remap_collection_id,
500 )
501 .await;
502
503 let remap_handle = match remap_handle {
504 Ok(handle) => handle,
505 Err(e) => {
506 error_handler
507 .report_and_stop(
508 e.context(format!("Failed to create remap handle for source {name}")),
509 )
510 .await
511 }
512 };
513
514 let (mut timestamper, mut initial_batch) = ReclockOperator::new(remap_handle).await;
515
516 trace!(
519 "timely-{worker_id} remap({id}) emitting remap snapshot: trace_updates={:?}",
520 &initial_batch.updates
521 );
522
523 let cap = cap_set.delayed(cap_set.first().unwrap());
524 remap_output.give_container(&cap, &mut initial_batch.updates);
525 drop(cap);
526 cap_set.downgrade(initial_batch.upper);
527
528 let mut ticker = tokio::time::interval(timestamp_interval);
529 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
530
531 let mut prev_probe_ts: Option<mz_repr::Timestamp> = None;
532 let timestamp_interval_ms: u64 = timestamp_interval
533 .as_millis()
534 .try_into()
535 .expect("huge duration");
536
537 while !cap_set.is_empty() {
538 let reclock_to_latest =
541 dyncfgs::STORAGE_RECLOCK_TO_LATEST.get(&config.config.config_set());
542
543 let mut new_probe = None;
547 if reclock_to_latest {
548 new_probe = probed_upper
549 .wait_for(|new_probe| match (prev_probe_ts, new_probe) {
550 (None, Some(_)) => true,
551 (Some(prev_ts), Some(new)) => prev_ts < new.probe_ts,
552 _ => false,
553 })
554 .await
555 .map(|probe| (*probe).clone())
556 .unwrap_or_else(|_| {
557 Some(Probe {
558 probe_ts: now_fn().into(),
559 upstream_frontier: Antichain::new(),
560 })
561 });
562 } else {
563 while prev_probe_ts >= new_probe.as_ref().map(|p| p.probe_ts) {
564 ticker.tick().await;
565 let upstream_frontier = ingested_upper
570 .wait_for(|f| *f.frontier() != [FromTime::minimum()])
571 .await
572 .unwrap()
573 .frontier()
574 .to_owned();
575
576 let now = (now_fn)();
577 let mut probe_ts = now - now % timestamp_interval_ms;
578 if (now % timestamp_interval_ms) != 0 {
579 probe_ts += timestamp_interval_ms;
580 }
581 new_probe = Some(Probe {
582 probe_ts: probe_ts.into(),
583 upstream_frontier,
584 });
585 }
586 };
587
588 let probe = new_probe.expect("known to be Some");
589 prev_probe_ts = Some(probe.probe_ts);
590
591 let binding_ts = probe.probe_ts;
592 let cur_source_upper = probe.upstream_frontier;
593
594 let new_into_upper = Antichain::from_elem(binding_ts.step_forward());
595
596 let mut remap_trace_batch = timestamper
597 .mint(binding_ts, new_into_upper, cur_source_upper.borrow())
598 .await;
599
600 trace!(
601 "timely-{worker_id} remap({id}) minted new bindings: \
602 updates={:?} \
603 source_upper={} \
604 trace_upper={}",
605 &remap_trace_batch.updates,
606 cur_source_upper.pretty(),
607 remap_trace_batch.upper.pretty()
608 );
609
610 let cap = cap_set.delayed(cap_set.first().unwrap());
611 remap_output.give_container(&cap, &mut remap_trace_batch.updates);
612 cap_set.downgrade(remap_trace_batch.upper);
613 }
614 });
615
616 (remap_stream.as_collection(), button.press_on_drop())
617}
618
/// Reclocks `committed_upper`, the committed frontier of the reclocked output
/// (in `G::Timestamp`), back into the source's native `FromTime` domain using
/// the remap `bindings`.
///
/// The returned async stream yields a new `FromTime` frontier whenever the
/// reclocked upper changes; `create_raw_source` feeds it to the rendered
/// source as its resume-upper stream.
fn reclock_committed_upper<G, FromTime>(
    bindings: &VecCollection<G, FromTime, Diff>,
    as_of: Antichain<G::Timestamp>,
    committed_upper: &Stream<G, ()>,
    id: GlobalId,
    metrics: Arc<SourceMetrics>,
) -> impl futures::stream::Stream<Item = Antichain<FromTime>> + 'static
where
    G: Scope,
    G::Timestamp: Lattice + TotalOrder,
    FromTime: SourceTimestamp,
{
    let (tx, rx) = watch::channel(Antichain::from_elem(FromTime::minimum()));
    let scope = bindings.scope().clone();

    let name = format!("ReclockCommitUpper({id})");
    let mut builder = OperatorBuilderRc::new(name, scope);

    let mut bindings = builder.new_input(&bindings.inner, Pipeline);
    // The committed-upper input is only observed through its frontier.
    let _ = builder.new_input(committed_upper, Pipeline);

    builder.build(move |_| {
        use timely::progress::ChangeBatch;
        // Remap updates received but not yet finalized (their into-times may
        // still be beyond the bindings frontier), keyed by (into, from).
        let mut accepted_times: ChangeBatch<(G::Timestamp, FromTime)> = ChangeBatch::new();
        // The frontier of the `bindings` input.
        let mut upper = Antichain::from_elem(Timestamp::minimum());
        // Finalized bindings, in into-time order, waiting for the committed
        // upper to pass them.
        let mut ready_times = VecDeque::new();
        // The FromTime frontier accumulated from consumed ready bindings.
        let mut source_upper = MutableAntichain::new();

        move |frontiers| {
            // Ingest new bindings, advancing their into-times by the as-of.
            while let Some((_, data)) = bindings.next() {
                accepted_times.extend(data.drain(..).map(|(from, mut into, diff)| {
                    into.advance_by(as_of.borrow());
                    ((into, from), diff.into_inner())
                }));
            }
            // When the bindings frontier advances, move the now-finalized
            // bindings (those not beyond the new frontier) into
            // `ready_times`, sorted by into-time. `G::Timestamp: TotalOrder`
            // makes this sort agree with frontier comparisons.
            let new_upper = frontiers[0].frontier();
            if PartialOrder::less_than(&upper.borrow(), &new_upper) {
                upper = new_upper.to_owned();
                let mut pending_times = std::mem::take(&mut accepted_times).into_inner();
                pending_times.sort_unstable_by(|a, b| a.0.cmp(&b.0));
                for ((into, from), diff) in pending_times.drain(..) {
                    if !upper.less_equal(&into) {
                        ready_times.push_back((from, into, diff));
                    } else {
                        accepted_times.update((into, from), diff);
                    }
                }
            }

            // Only report an upper once both inputs have passed the as-of,
            // i.e. the snapshot has been fully processed.
            if as_of.iter().all(|t| !upper.less_equal(t)) {
                let committed_upper = frontiers[1].frontier();
                if as_of.iter().all(|t| !committed_upper.less_equal(t)) {
                    let reclocked_upper = match committed_upper.as_option() {
                        Some(t_next) => {
                            // Consume every ready binding whose into-time is
                            // strictly before the committed upper and fold it
                            // into the FromTime frontier.
                            let idx = ready_times.partition_point(|(_, t, _)| t < t_next);
                            let updates = ready_times
                                .drain(0..idx)
                                .map(|(from_time, _, diff)| (from_time, diff));
                            source_upper.update_iter(updates);
                            source_upper.frontier().to_owned()
                        }
                        // An empty committed upper means the output is
                        // complete; the FromTime upper is empty too.
                        None => Antichain::new(),
                    };
                    tx.send_replace(reclocked_upper);
                }
            }

            metrics
                .commit_upper_accepted_times
                .set(u64::cast_from(accepted_times.len()));
            metrics
                .commit_upper_ready_times
                .set(u64::cast_from(ready_times.len()));
        }
    });

    WatchStream::from_changes(rx)
}
751
752fn synthesize_probes<G>(
758 source_id: GlobalId,
759 progress: &Stream<G, Infallible>,
760 interval: Duration,
761 now_fn: NowFn,
762) -> Stream<G, Probe<G::Timestamp>>
763where
764 G: Scope,
765{
766 let scope = progress.scope();
767
768 let active_worker = usize::cast_from(source_id.hashed()) % scope.peers();
769 let is_active_worker = active_worker == scope.index();
770
771 let mut op = AsyncOperatorBuilder::new("synthesize_probes".into(), scope);
772 let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
773 let mut input = op.new_input_for(progress, Pipeline, &output);
774
775 op.build(|caps| async move {
776 if !is_active_worker {
777 return;
778 }
779
780 let [cap] = caps.try_into().expect("one capability per output");
781
782 let mut ticker = probe::Ticker::new(move || interval, now_fn.clone());
783
784 let minimum_frontier = Antichain::from_elem(Timestamp::minimum());
785 let mut frontier = minimum_frontier.clone();
786 loop {
787 tokio::select! {
788 event = input.next() => match event {
789 Some(AsyncEvent::Progress(progress)) => frontier = progress,
790 Some(AsyncEvent::Data(..)) => unreachable!(),
791 None => break,
792 },
793 probe_ts = ticker.tick(), if frontier != minimum_frontier => {
798 let probe = Probe {
799 probe_ts,
800 upstream_frontier: frontier.clone(),
801 };
802 output.give(&cap, probe);
803 }
804 }
805 }
806
807 let probe = Probe {
808 probe_ts: now_fn().into(),
809 upstream_frontier: Antichain::new(),
810 };
811 output.give(&cap, probe);
812 });
813
814 output_stream
815}