1#![allow(missing_docs)]
24#![allow(clippy::needless_borrow)]
25
26use std::cell::RefCell;
27use std::collections::{BTreeMap, VecDeque};
28use std::convert::Infallible;
29use std::hash::{Hash, Hasher};
30use std::rc::Rc;
31use std::sync::Arc;
32use std::time::Duration;
33
34use differential_dataflow::lattice::Lattice;
35use differential_dataflow::{AsCollection, Collection, Hashable};
36use futures::stream::StreamExt;
37use mz_ore::cast::CastFrom;
38use mz_ore::collections::CollectionExt;
39use mz_ore::now::NowFn;
40use mz_persist_client::cache::PersistClientCache;
41use mz_repr::{Diff, GlobalId, RelationDesc, Row};
42use mz_storage_types::configuration::StorageConfiguration;
43use mz_storage_types::controller::CollectionMetadata;
44use mz_storage_types::dyncfgs;
45use mz_storage_types::errors::DataflowError;
46use mz_storage_types::sources::{SourceConnection, SourceExport, SourceTimestamp};
47use mz_timely_util::antichain::AntichainExt;
48use mz_timely_util::builder_async::{
49 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
50};
51use mz_timely_util::capture::PusherCapture;
52use mz_timely_util::operator::ConcatenateFlatten;
53use mz_timely_util::reclock::reclock;
54use timely::container::CapacityContainerBuilder;
55use timely::dataflow::channels::pact::Pipeline;
56use timely::dataflow::operators::capture::capture::Capture;
57use timely::dataflow::operators::capture::{Event, EventPusher};
58use timely::dataflow::operators::core::Map as _;
59use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
60use timely::dataflow::operators::{Broadcast, CapabilitySet, Inspect, Leave};
61use timely::dataflow::scopes::Child;
62use timely::dataflow::{Scope, Stream};
63use timely::order::TotalOrder;
64use timely::progress::frontier::MutableAntichain;
65use timely::progress::{Antichain, Timestamp};
66use timely::{Container, PartialOrder};
67use tokio::sync::{Semaphore, watch};
68use tokio_stream::wrappers::WatchStream;
69use tracing::trace;
70
71use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate};
72use crate::metrics::StorageMetrics;
73use crate::metrics::source::SourceMetrics;
74use crate::source::probe;
75use crate::source::reclock::ReclockOperator;
76use crate::source::types::{Probe, SourceMessage, SourceOutput, SourceRender, StackedCollection};
77use crate::statistics::SourceStatistics;
78
/// Per-worker configuration consumed by the operators in this file when rendering a raw source.
#[derive(Clone)]
pub struct RawSourceCreationConfig {
    /// Name of the source. In this file it is only used in the error reported when the remap
    /// handle cannot be created.
    pub name: String,
    /// Identifier of the source. Used for operator names, metrics lookup, and (hashed) to pick
    /// the worker that runs the remap and probe-synthesis operators.
    pub id: GlobalId,
    /// The exports of this source, keyed by their id.
    // NOTE(review): not read by the code visible in this file section; confirm semantics with
    // the code that constructs this config.
    pub source_exports: BTreeMap<GlobalId, SourceExport<CollectionMetadata>>,
    /// Index of the worker this config was created for.
    pub worker_id: usize,
    /// Total number of workers; used as the modulus when assigning work to a worker.
    pub worker_count: usize,
    /// Interval at which the remap operator ticks and to which minted probe timestamps are
    /// rounded up; also handed to the probe synthesizer.
    pub timestamp_interval: Duration,
    /// Function returning the current time.
    pub now_fn: NowFn,
    /// Metrics registry from which per-source metrics are obtained.
    pub metrics: StorageMetrics,
    /// Collection metadata handed to the persist handle of the remap operator.
    pub storage_metadata: CollectionMetadata,
    /// The as-of frontier used when reclocking and when opening the remap handle.
    pub as_of: Antichain<mz_repr::Timestamp>,
    /// Resume uppers per id. The union of all of them is reported in the `resume_upper` metric.
    pub resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
    /// Resume uppers per id, encoded as rows.
    // NOTE(review): presumably expressed in the source's own time domain; not read in the
    // visible code, so confirm against the source implementations.
    pub source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    /// Cache of persist clients, handed to the persist handle of the remap operator.
    pub persist_clients: Arc<PersistClientCache>,
    /// Statistics handle updated with the number of messages and bytes received.
    pub source_statistics: SourceStatistics,
    /// Shared, single-threaded cell holding the remap upper. Workers that do not run the remap
    /// operator clear it; the active worker hands it to the persist handle.
    pub shared_remap_upper: Rc<RefCell<Antichain<mz_repr::Timestamp>>>,
    /// Storage configuration; its config set is consulted for dynamic configuration values.
    pub config: StorageConfiguration,
    /// Identifier of the remap collection, handed to the persist handle of the remap operator.
    pub remap_collection_id: GlobalId,
    /// A semaphore shared with the source.
    // NOTE(review): purpose not visible in this file section (the remap operator ignores it);
    // confirm with the source implementations.
    pub busy_signal: Arc<Semaphore>,
}
128
/// Configuration for a single export of a source.
// NOTE(review): this type is neither constructed nor read in the visible part of the file; the
// field descriptions below are derived from names and types only and should be confirmed.
#[derive(Clone)]
pub struct SourceExportCreationConfig {
    /// Identifier of the export.
    pub id: GlobalId,
    /// Index of the worker this config was created for.
    pub worker_id: usize,
    /// Metrics registry.
    pub metrics: StorageMetrics,
    /// Statistics handle for the export.
    pub source_statistics: SourceStatistics,
}
142
143impl RawSourceCreationConfig {
144 pub fn responsible_worker<P: Hash>(&self, partition: P) -> usize {
146 let mut h = std::hash::DefaultHasher::default();
147 (self.id, partition).hash(&mut h);
148 let key = usize::cast_from(h.finish());
149 key % self.worker_count
150 }
151
152 pub fn responsible_for<P: Hash>(&self, partition: P) -> bool {
154 self.responsible_worker(partition) == self.worker_id
155 }
156}
157
158pub fn create_raw_source<'g, G: Scope<Timestamp = ()>, C>(
171 scope: &mut Child<'g, G, mz_repr::Timestamp>,
172 storage_state: &crate::storage_state::StorageState,
173 committed_upper: &Stream<Child<'g, G, mz_repr::Timestamp>, ()>,
174 config: &RawSourceCreationConfig,
175 source_connection: C,
176 start_signal: impl std::future::Future<Output = ()> + 'static,
177) -> (
178 BTreeMap<
179 GlobalId,
180 Collection<
181 Child<'g, G, mz_repr::Timestamp>,
182 Result<SourceOutput<C::Time>, DataflowError>,
183 Diff,
184 >,
185 >,
186 Stream<G, HealthStatusMessage>,
187 Vec<PressOnDropButton>,
188)
189where
190 C: SourceConnection + SourceRender + Clone + 'static,
191{
192 let worker_id = config.worker_id;
193 let id = config.id;
194
195 let mut tokens = vec![];
196
197 let (ingested_upper_tx, ingested_upper_rx) =
198 watch::channel(MutableAntichain::new_bottom(C::Time::minimum()));
199 let (probed_upper_tx, probed_upper_rx) = watch::channel(None);
200
201 let source_metrics = Arc::new(config.metrics.get_source_metrics(id, worker_id));
202
203 let timestamp_desc = source_connection.timestamp_desc();
204
205 let (remap_collection, remap_token) = remap_operator(
206 scope,
207 storage_state,
208 config.clone(),
209 probed_upper_rx,
210 ingested_upper_rx,
211 timestamp_desc,
212 );
213 let remap_collection = remap_collection.inner.broadcast().as_collection();
215 tokens.push(remap_token);
216
217 let committed_upper = reclock_committed_upper(
218 &remap_collection,
219 config.as_of.clone(),
220 committed_upper,
221 id,
222 Arc::clone(&source_metrics),
223 );
224
225 let mut reclocked_exports = BTreeMap::new();
226
227 let reclocked_exports2 = &mut reclocked_exports;
228 let (health, source_tokens) = scope.parent.scoped("SourceTimeDomain", move |scope| {
229 let (exports, source_upper, health_stream, source_tokens) = source_render_operator(
230 scope,
231 config,
232 source_connection,
233 probed_upper_tx,
234 committed_upper,
235 start_signal,
236 );
237
238 for (id, export) in exports {
239 let (reclock_pusher, reclocked) = reclock(&remap_collection, config.as_of.clone());
240 export
241 .inner
242 .map(move |(result, from_time, diff)| {
243 let result = match result {
244 Ok(msg) => Ok(SourceOutput {
245 key: msg.key.clone(),
246 value: msg.value.clone(),
247 metadata: msg.metadata.clone(),
248 from_time: from_time.clone(),
249 }),
250 Err(err) => Err(err.clone()),
251 };
252 (result, from_time.clone(), *diff)
253 })
254 .capture_into(PusherCapture(reclock_pusher));
255 reclocked_exports2.insert(id, reclocked);
256 }
257
258 source_upper.capture_into(FrontierCapture(ingested_upper_tx));
259
260 (health_stream.leave(), source_tokens)
261 });
262
263 tokens.extend(source_tokens);
264
265 (reclocked_exports, health, tokens)
266}
267
268pub struct FrontierCapture<T>(watch::Sender<MutableAntichain<T>>);
269
270impl<T: Timestamp> EventPusher<T, Vec<Infallible>> for FrontierCapture<T> {
271 fn push(&mut self, event: Event<T, Vec<Infallible>>) {
272 match event {
273 Event::Progress(changes) => self.0.send_modify(|frontier| {
274 frontier.update_iter(changes);
275 }),
276 Event::Messages(_, _) => unreachable!(),
277 }
278 }
279}
280
/// Renders `source_connection` in `scope` and instruments each of its exports.
///
/// For every export returned by `SourceRender::render` a "SourceGenericStats" operator is
/// inserted that forwards the data unchanged while updating the received message/byte
/// statistics and deriving a health status per message. The probe stream of the source (or a
/// synthesized one if the source provides none) is broadcast and forwarded to
/// `probed_upper_tx`.
///
/// Returns the instrumented exports keyed by id, the progress stream of the source, the
/// source's health stream concatenated with the derived per-export health streams, and the
/// tokens returned by the source.
fn source_render_operator<G, C>(
    scope: &mut G,
    config: &RawSourceCreationConfig,
    source_connection: C,
    probed_upper_tx: watch::Sender<Option<Probe<C::Time>>>,
    resume_uppers: impl futures::Stream<Item = Antichain<C::Time>> + 'static,
    start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
    BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = C::Time>,
    C: SourceRender + 'static,
{
    let source_id = config.id;
    let worker_id = config.worker_id;
    let source_statistics = config.source_statistics.clone();
    let now_fn = config.now_fn.clone();
    let timestamp_interval = config.timestamp_interval;

    // Trace every resume upper before it is handed to the source.
    let resume_uppers = resume_uppers.inspect(move |upper| {
        let upper = upper.pretty();
        trace!(%upper, "timely-{worker_id} source({source_id}) received resume upper");
    });

    let (exports, progress, health, stats, probes, tokens) =
        source_connection.render(scope, config, resume_uppers, start_signal);

    crate::source::statistics::process_statistics(
        scope.clone(),
        source_id,
        worker_id,
        stats,
        source_statistics.clone(),
    );

    let mut export_collections = BTreeMap::new();

    let source_metrics = config.metrics.get_source_metrics(config.id, worker_id);

    // Report the union of all configured resume uppers as a single metric value.
    let resume_upper = Antichain::from_iter(
        config
            .resume_uppers
            .values()
            .flat_map(|f| f.iter().cloned()),
    );
    source_metrics
        .resume_upper
        .set(mz_persist_client::metrics::encode_ts_metric(&resume_upper));

    let mut health_streams = vec![];

    for (id, export) in exports {
        let name = format!("SourceGenericStats({})", id);
        let mut builder = OperatorBuilderRc::new(name, scope.clone());

        // First output: health statuses derived from the data of this export.
        let (mut health_output, derived_health) =
            builder.new_output::<CapacityContainerBuilder<_>>();
        health_streams.push(derived_health);

        // Second output: the export's data, forwarded unchanged.
        let (mut output, new_export) = builder.new_output::<CapacityContainerBuilder<_>>();

        let mut input = builder.new_input(&export.inner, Pipeline);
        export_collections.insert(id, new_export.as_collection());

        let bytes_read_counter = config.metrics.source_defs.bytes_read.clone();
        let source_statistics = source_statistics.clone();

        builder.build(move |mut caps| {
            // Keep only the first capability; presumably it belongs to the health output, which
            // was created first (TODO confirm against the operator builder's contract). The
            // remaining capabilities are dropped together with `caps`.
            let mut health_cap = Some(caps.remove(0));

            move |frontiers| {
                // Reset on every activation: duplicate statuses are only suppressed within a
                // single activation of the operator.
                let mut last_status = None;
                let mut health_output = health_output.activate();

                // Once the input frontier is empty, release the health capability and stop.
                if frontiers[0].is_empty() {
                    health_cap = None;
                    return;
                }
                let health_cap = health_cap.as_mut().unwrap();

                while let Some((cap, data)) = input.next() {
                    for (message, _, _) in data.iter() {
                        // A successful message marks the export as running, an error as stalled.
                        let status = match &message {
                            Ok(_) => HealthStatusUpdate::running(),
                            Err(error) => HealthStatusUpdate::stalled(
                                error.to_string(),
                                Some(
                                    "retracting the errored value may resume the source"
                                        .to_string(),
                                ),
                            ),
                        };

                        let status = HealthStatusMessage {
                            id: Some(id),
                            namespace: C::STATUS_NAMESPACE.clone(),
                            update: status,
                        };
                        // Only emit a status when it differs from the previously emitted one.
                        if last_status.as_ref() != Some(&status) {
                            last_status = Some(status.clone());
                            health_output.session(&health_cap).give(status);
                        }

                        // Only successful messages count towards the received statistics.
                        match message {
                            Ok(message) => {
                                source_statistics.inc_messages_received_by(1);
                                let key_len = u64::cast_from(message.key.byte_len());
                                let value_len = u64::cast_from(message.value.byte_len());
                                bytes_read_counter.inc_by(key_len + value_len);
                                source_statistics.inc_bytes_received_by(key_len + value_len);
                            }
                            Err(_) => {}
                        }
                    }
                    // Forward the whole container at the capability it arrived with.
                    let mut output = output.activate();
                    output.session(&cap).give_container(data);
                }
            }
        });
    }

    // Fall back to probes synthesized from the progress stream if the source provides none.
    let probe_stream = match probes {
        Some(stream) => stream,
        None => synthesize_probes(source_id, &progress, timestamp_interval, now_fn),
    };

    // Every worker receives every probe and forwards it to the watch channel.
    probe_stream.broadcast().inspect(move |probe| {
        // A send error (no receiver left) is deliberately ignored.
        let _ = probed_upper_tx.send(Some(probe.clone()));
    });

    (
        export_collections,
        progress,
        health.concatenate_flatten::<_, CapacityContainerBuilder<_>>(health_streams),
        tokens,
    )
}
435
/// Builds the async "remap" operator that mints the bindings between the source time domain
/// (`FromTime`) and `mz_repr::Timestamp`.
///
/// Only one worker, chosen by hashing the source id, does any work; all other workers clear the
/// shared remap upper and return immediately. The active worker first emits the initial batch
/// obtained from the `ReclockOperator` and then loops, minting a new batch of bindings for
/// every probe it obtains, either from `probed_upper` or, when reclocking to latest is
/// disabled, from `ingested_upper` on a fixed tick.
///
/// Returns the collection of bindings and a button that shuts the operator down when dropped.
fn remap_operator<G, FromTime>(
    scope: &G,
    storage_state: &crate::storage_state::StorageState,
    config: RawSourceCreationConfig,
    mut probed_upper: watch::Receiver<Option<Probe<FromTime>>>,
    mut ingested_upper: watch::Receiver<MutableAntichain<FromTime>>,
    remap_relation_desc: RelationDesc,
) -> (Collection<G, FromTime, Diff>, PressOnDropButton)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    FromTime: SourceTimestamp,
{
    // Exhaustive destructuring, so that adding a field to the config forces a decision here.
    // The `config` field is matched with `_` and therefore not moved: it stays reachable as
    // `config.config` further down.
    let RawSourceCreationConfig {
        name,
        id,
        source_exports: _,
        worker_id,
        worker_count,
        timestamp_interval,
        storage_metadata,
        as_of,
        resume_uppers: _,
        source_resume_uppers: _,
        metrics: _,
        now_fn,
        persist_clients,
        source_statistics: _,
        shared_remap_upper,
        config: _,
        remap_collection_id,
        busy_signal: _,
    } = config;

    let read_only_rx = storage_state.read_only_rx.clone();
    let error_handler = storage_state.error_handler("remap_operator", id);

    // Pick the single worker that mints bindings for this source.
    let chosen_worker = usize::cast_from(id.hashed() % u64::cast_from(worker_count));
    let active_worker = chosen_worker == worker_id;

    let operator_name = format!("remap({})", id);
    let mut remap_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
    let (remap_output, remap_stream) = remap_op.new_output::<CapacityContainerBuilder<_>>();

    let button = remap_op.build(move |capabilities| async move {
        if !active_worker {
            // Inactive workers empty the shared remap upper and do nothing else.
            shared_remap_upper.borrow_mut().clear();
            return;
        }

        let mut cap_set = CapabilitySet::from_elem(capabilities.into_element());

        let remap_handle = crate::source::reclock::compat::PersistHandle::<FromTime, _>::new(
            Arc::clone(&persist_clients),
            read_only_rx,
            storage_metadata.clone(),
            as_of.clone(),
            shared_remap_upper,
            id,
            "remap",
            worker_id,
            worker_count,
            remap_relation_desc,
            remap_collection_id,
        )
        .await;

        // Failing to open the remap handle is reported through the error handler; the `Err`
        // arm yields no handle, so `report_and_stop` evidently never returns.
        let remap_handle = match remap_handle {
            Ok(handle) => handle,
            Err(e) => {
                error_handler
                    .report_and_stop(
                        e.context(format!("Failed to create remap handle for source {name}")),
                    )
                    .await
            }
        };

        let (mut timestamper, mut initial_batch) = ReclockOperator::new(remap_handle).await;

        trace!(
            "timely-{worker_id} remap({id}) emitting remap snapshot: trace_updates={:?}",
            &initial_batch.updates
        );

        // Emit the initial batch at the current capability, then advance to the batch's upper.
        let cap = cap_set.delayed(cap_set.first().unwrap());
        remap_output.give_container(&cap, &mut initial_batch.updates);
        drop(cap);
        cap_set.downgrade(initial_batch.upper);

        let mut ticker = tokio::time::interval(timestamp_interval);
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        // Timestamp of the probe used for the most recently minted bindings.
        let mut prev_probe_ts: Option<mz_repr::Timestamp> = None;
        let timestamp_interval_ms: u64 = timestamp_interval
            .as_millis()
            .try_into()
            .expect("huge duration");

        // Keep minting until the capability set has been downgraded to the empty frontier.
        while !cap_set.is_empty() {
            // Read the setting anew in every iteration so that changes take effect without
            // rebuilding the operator.
            let reclock_to_latest =
                dyncfgs::STORAGE_RECLOCK_TO_LATEST.get(&config.config.config_set());

            let mut new_probe = None;
            if reclock_to_latest {
                // Wait for a probe that is newer than the one used previously. If waiting
                // fails (presumably because the sender is gone — confirm), fall back to a
                // probe at the current time with an empty upstream frontier.
                new_probe = probed_upper
                    .wait_for(|new_probe| match (prev_probe_ts, new_probe) {
                        (None, Some(_)) => true,
                        (Some(prev_ts), Some(new)) => prev_ts < new.probe_ts,
                        _ => false,
                    })
                    .await
                    .map(|probe| (*probe).clone())
                    .unwrap_or_else(|_| {
                        Some(Probe {
                            probe_ts: now_fn().into(),
                            upstream_frontier: Antichain::new(),
                        })
                    });
            } else {
                // Build probes from the ingested frontier on every tick until one has a
                // timestamp strictly greater than the previous probe's. `None` orders before
                // `Some`, so the loop body runs at least once.
                while prev_probe_ts >= new_probe.as_ref().map(|p| p.probe_ts) {
                    ticker.tick().await;
                    // Do not proceed while the ingested frontier is still the minimum frontier.
                    let upstream_frontier = ingested_upper
                        .wait_for(|f| *f.frontier() != [FromTime::minimum()])
                        .await
                        .unwrap()
                        .frontier()
                        .to_owned();

                    // Round the current time up to the next multiple of the timestamp interval.
                    let now = (now_fn)();
                    let mut probe_ts = now - now % timestamp_interval_ms;
                    if (now % timestamp_interval_ms) != 0 {
                        probe_ts += timestamp_interval_ms;
                    }
                    new_probe = Some(Probe {
                        probe_ts: probe_ts.into(),
                        upstream_frontier,
                    });
                }
            };

            let probe = new_probe.expect("known to be Some");
            prev_probe_ts = Some(probe.probe_ts);

            let binding_ts = probe.probe_ts;
            let cur_source_upper = probe.upstream_frontier;

            // The new bindings are minted at `binding_ts`; the upper moves just past it.
            let new_into_upper = Antichain::from_elem(binding_ts.step_forward());

            let mut remap_trace_batch = timestamper
                .mint(binding_ts, new_into_upper, cur_source_upper.borrow())
                .await;

            trace!(
                "timely-{worker_id} remap({id}) minted new bindings: \
                 updates={:?} \
                 source_upper={} \
                 trace_upper={}",
                &remap_trace_batch.updates,
                cur_source_upper.pretty(),
                remap_trace_batch.upper.pretty()
            );

            // Emit the minted updates and advance the capabilities to the batch's upper.
            let cap = cap_set.delayed(cap_set.first().unwrap());
            remap_output.give_container(&cap, &mut remap_trace_batch.updates);
            cap_set.downgrade(remap_trace_batch.upper);
        }
    });

    (remap_stream.as_collection(), button.press_on_drop())
}
624
/// Translates the frontier of `committed_upper` into the source time domain using `bindings`.
///
/// An operator named "ReclockCommitUpper(<id>)" accumulates the remap bindings and, whenever
/// both the bindings frontier and the committed frontier are beyond `as_of`, publishes the
/// source frontier that corresponds to the committed frontier on a watch channel.
///
/// Returns a stream over that watch channel, built with `WatchStream::from_changes`. The
/// channel's initial value is the minimum `FromTime` frontier.
fn reclock_committed_upper<G, FromTime>(
    bindings: &Collection<G, FromTime, Diff>,
    as_of: Antichain<G::Timestamp>,
    committed_upper: &Stream<G, ()>,
    id: GlobalId,
    metrics: Arc<SourceMetrics>,
) -> impl futures::stream::Stream<Item = Antichain<FromTime>> + 'static
where
    G: Scope,
    G::Timestamp: Lattice + TotalOrder,
    FromTime: SourceTimestamp,
{
    let (tx, rx) = watch::channel(Antichain::from_elem(FromTime::minimum()));
    let scope = bindings.scope().clone();

    let name = format!("ReclockCommitUpper({id})");
    let mut builder = OperatorBuilderRc::new(name, scope);

    // Input 0 carries the bindings. Input 1 is only registered for its frontier; its data
    // handle is discarded.
    let mut bindings = builder.new_input(&bindings.inner, Pipeline);
    let _ = builder.new_input(committed_upper, Pipeline);

    builder.build(move |_| {
        use timely::progress::ChangeBatch;
        // Bindings received so far whose time is still beyond `upper`.
        let mut accepted_times: ChangeBatch<(G::Timestamp, FromTime)> = ChangeBatch::new();
        // The bindings frontier observed at the last extraction.
        let mut upper = Antichain::from_elem(Timestamp::minimum());
        // Bindings whose time is no longer beyond `upper`, in extraction order.
        let mut ready_times = VecDeque::new();
        // Accumulation of the source times of all bindings consumed so far.
        let mut source_upper = MutableAntichain::new();

        move |frontiers| {
            // Accept new bindings, advancing their time to the as-of.
            while let Some((_, data)) = bindings.next() {
                accepted_times.extend(data.drain(..).map(|(from, mut into, diff)| {
                    into.advance_by(as_of.borrow());
                    ((into, from), diff.into_inner())
                }));
            }
            let new_upper = frontiers[0].frontier();
            if PartialOrder::less_than(&upper.borrow(), &new_upper) {
                upper = new_upper.to_owned();
                // Split the accepted bindings: those not beyond the new upper become ready,
                // the others are put back.
                let mut pending_times = std::mem::take(&mut accepted_times).into_inner();
                // Order by the `(into, from)` key; `ready_times` is later searched with
                // `partition_point` on the `into` time, which relies on this ordering.
                pending_times.sort_unstable_by(|a, b| a.0.cmp(&b.0));
                for ((into, from), diff) in pending_times.drain(..) {
                    if !upper.less_equal(&into) {
                        ready_times.push_back((from, into, diff));
                    } else {
                        accepted_times.update((into, from), diff);
                    }
                }
            }

            // Only act once the bindings frontier has passed every element of the as-of
            // (presumably because times were advanced to the as-of above and only accumulate
            // meaningfully beyond it — confirm).
            if as_of.iter().all(|t| !upper.less_equal(t)) {
                let committed_upper = frontiers[1].frontier();
                // Likewise require the committed frontier to be beyond the as-of.
                if as_of.iter().all(|t| !committed_upper.less_equal(t)) {
                    let reclocked_upper = match committed_upper.as_option() {
                        Some(t_next) => {
                            // Fold every ready binding with a time before `t_next` into
                            // `source_upper`; its frontier is the reclocked upper.
                            let idx = ready_times.partition_point(|(_, t, _)| t < t_next);
                            let updates = ready_times
                                .drain(0..idx)
                                .map(|(from_time, _, diff)| (from_time, diff));
                            source_upper.update_iter(updates);
                            source_upper.frontier().to_owned()
                        }
                        // An empty committed frontier maps to the empty source frontier.
                        None => Antichain::new(),
                    };
                    tx.send_replace(reclocked_upper);
                }
            }

            // Export the sizes of both buffers as metrics.
            metrics
                .commit_upper_accepted_times
                .set(u64::cast_from(accepted_times.len()));
            metrics
                .commit_upper_ready_times
                .set(u64::cast_from(ready_times.len()));
        }
    });

    WatchStream::from_changes(rx)
}
757
/// Synthesizes a stream of probes from the frontier of `progress`.
///
/// A single worker, chosen by hashing `source_id`, tracks the frontier of `progress` and emits
/// a probe carrying that frontier on every tick of a `probe::Ticker` configured with
/// `interval`, but only once the frontier is no longer the minimum frontier. When the input is
/// exhausted a final probe with an empty upstream frontier is emitted.
fn synthesize_probes<G>(
    source_id: GlobalId,
    progress: &Stream<G, Infallible>,
    interval: Duration,
    now_fn: NowFn,
) -> Stream<G, Probe<G::Timestamp>>
where
    G: Scope,
{
    let scope = progress.scope();

    // Pick the single worker that emits probes.
    let active_worker = usize::cast_from(source_id.hashed()) % scope.peers();
    let is_active_worker = active_worker == scope.index();

    let mut op = AsyncOperatorBuilder::new("synthesize_probes".into(), scope);
    let (output, output_stream) = op.new_output();
    let mut input = op.new_input_for(progress, Pipeline, &output);

    op.build(|caps| async move {
        if !is_active_worker {
            return;
        }

        let [cap] = caps.try_into().expect("one capability per output");

        let mut ticker = probe::Ticker::new(move || interval, now_fn.clone());

        let minimum_frontier = Antichain::from_elem(Timestamp::minimum());
        let mut frontier = minimum_frontier.clone();
        loop {
            tokio::select! {
                // Track the input frontier; the input carries `Infallible`, so data events are
                // treated as impossible.
                event = input.next() => match event {
                    Some(AsyncEvent::Progress(progress)) => frontier = progress,
                    Some(AsyncEvent::Data(..)) => unreachable!(),
                    None => break,
                },
                // Ticks are only awaited once the frontier has moved past the minimum.
                probe_ts = ticker.tick(), if frontier != minimum_frontier => {
                    let probe = Probe {
                        probe_ts,
                        upstream_frontier: frontier.clone(),
                    };
                    output.give(&cap, probe);
                }
            }
        }

        // The input is exhausted: emit a last probe with an empty upstream frontier.
        let probe = Probe {
            probe_ts: now_fn().into(),
            upstream_frontier: Antichain::new(),
        };
        output.give(&cap, probe);
    });

    output_stream
}