1#![allow(missing_docs)]
24#![allow(clippy::needless_borrow)]
25
26use std::cell::RefCell;
27use std::collections::{BTreeMap, VecDeque};
28use std::hash::{Hash, Hasher};
29use std::rc::Rc;
30use std::sync::Arc;
31use std::time::Duration;
32
33use differential_dataflow::lattice::Lattice;
34use differential_dataflow::{AsCollection, Hashable, VecCollection};
35use futures::stream::StreamExt;
36use mz_ore::cast::CastFrom;
37use mz_ore::collections::CollectionExt;
38use mz_ore::now::NowFn;
39use mz_persist_client::cache::PersistClientCache;
40use mz_repr::{Diff, GlobalId, RelationDesc, Row};
41use mz_storage_types::configuration::StorageConfiguration;
42use mz_storage_types::controller::CollectionMetadata;
43use mz_storage_types::errors::DataflowError;
44use mz_storage_types::sources::{SourceConnection, SourceExport, SourceTimestamp};
45use mz_timely_util::antichain::AntichainExt;
46use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
47use mz_timely_util::capture::PusherCapture;
48use mz_timely_util::operator::ConcatenateFlatten;
49use mz_timely_util::reclock::reclock;
50use timely::PartialOrder;
51use timely::container::CapacityContainerBuilder;
52use timely::dataflow::channels::pact::Pipeline;
53use timely::dataflow::operators::capture::capture::Capture;
54use timely::dataflow::operators::core::Map as _;
55use timely::dataflow::operators::generic::OutputBuilder;
56use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
57use timely::dataflow::operators::vec::Broadcast;
58use timely::dataflow::operators::{CapabilitySet, Inspect, Leave};
59use timely::dataflow::{Scope, StreamVec};
60use timely::order::TotalOrder;
61use timely::progress::frontier::MutableAntichain;
62use timely::progress::{Antichain, Timestamp};
63use tokio::sync::{Semaphore, watch};
64use tokio_stream::wrappers::WatchStream;
65use tracing::trace;
66
67use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate};
68use crate::metrics::StorageMetrics;
69use crate::metrics::source::SourceMetrics;
70use crate::source::reclock::ReclockOperator;
71use crate::source::types::{Probe, SourceMessage, SourceOutput, SourceRender, StackedCollection};
72use crate::statistics::SourceStatistics;
73
/// Shared configuration information for all source types. This is used by the
/// `create_raw_source` entry point and threaded through the operators it
/// builds (`remap_operator`, `source_render_operator`, ...).
#[derive(Clone)]
pub struct RawSourceCreationConfig {
    /// The name to attach to the underlying timely operator(s).
    pub name: String,
    /// The ID of this instantiation of the source.
    pub id: GlobalId,
    /// The export collections produced by this source, keyed by their
    /// `GlobalId`.
    pub source_exports: BTreeMap<GlobalId, SourceExport<CollectionMetadata>>,
    /// The ID of the worker on which this operator is executing.
    pub worker_id: usize,
    /// The total count of workers.
    pub worker_count: usize,
    /// Granularity with which timestamps should be closed (not read directly
    /// in this file — presumably consumed by connection-specific rendering;
    /// TODO confirm).
    pub timestamp_interval: Duration,
    /// Function to obtain the current wall-clock time; used by
    /// `remap_operator` to fabricate a probe timestamp when the probe channel
    /// closes.
    pub now_fn: NowFn,
    /// The metrics and registry that each source instantiates.
    pub metrics: StorageMetrics,
    /// The as-of frontier of the source.
    pub as_of: Antichain<mz_repr::Timestamp>,
    /// The frontiers, per export, up to which data has already been durably
    /// committed; flattened into the `resume_upper` metric.
    pub resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
    /// Per-export resume frontiers encoded in the source's native timestamp
    /// representation (one `Row` per frontier element — TODO confirm against
    /// connection-specific rendering).
    pub source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    /// A handle to the persist client cache, used to open the remap shard.
    pub persist_clients: Arc<PersistClientCache>,
    /// Per-export statistics collectors; `source_render_operator` expects an
    /// entry for every rendered export.
    pub statistics: BTreeMap<GlobalId, SourceStatistics>,
    /// The upper of the remap shard, shared with other operators on this
    /// worker; cleared by `remap_operator` on non-active workers.
    pub shared_remap_upper: Rc<RefCell<Antichain<mz_repr::Timestamp>>>,
    /// Storage configuration parameters.
    pub config: StorageConfiguration,
    /// The ID of the collection backing the remap bindings.
    pub remap_collection_id: GlobalId,
    /// The metadata of the collection backing the remap bindings.
    pub remap_metadata: CollectionMetadata,
    /// A semaphore used to signal/limit source busyness (not read directly in
    /// this file — TODO confirm usage in connection-specific rendering).
    pub busy_signal: Arc<Semaphore>,
}
123
/// Configuration for creating a single source export. A per-export subset of
/// the information in [`RawSourceCreationConfig`].
#[derive(Clone)]
pub struct SourceExportCreationConfig {
    /// The ID of this export.
    pub id: GlobalId,
    /// The ID of the worker on which this operator is executing.
    pub worker_id: usize,
    /// The metrics and registry that each source instantiates.
    pub metrics: StorageMetrics,
    /// The statistics collector for this export.
    pub source_statistics: SourceStatistics,
}
137
138impl RawSourceCreationConfig {
139 pub fn responsible_worker<P: Hash>(&self, partition: P) -> usize {
141 let mut h = std::hash::DefaultHasher::default();
142 (self.id, partition).hash(&mut h);
143 let key = usize::cast_from(h.finish());
144 key % self.worker_count
145 }
146
147 pub fn responsible_for<P: Hash>(&self, partition: P) -> bool {
149 self.responsible_worker(partition) == self.worker_id
150 }
151}
152
/// Creates a raw-source dataflow for the given source connection.
///
/// The connection-specific rendering happens in its own, non-timestamped
/// scope (`"SourceTimeDomain"`), nested inside `root_scope`. Its per-export
/// outputs are reclocked into the `mz_repr::Timestamp` domain using the
/// bindings produced by `remap_operator`, which are broadcast to all workers.
///
/// Returns, per export `GlobalId`, the reclocked collection of
/// `Result<SourceOutput, DataflowError>` updates, together with a stream of
/// health status messages in the root scope and the tokens that keep the
/// dataflow alive.
pub fn create_raw_source<'scope, 'root, C>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    root_scope: Scope<'root, ()>,
    storage_state: &crate::storage_state::StorageState,
    committed_upper: StreamVec<'scope, mz_repr::Timestamp, ()>,
    config: &RawSourceCreationConfig,
    source_connection: C,
    start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
    BTreeMap<
        GlobalId,
        VecCollection<
            'scope,
            mz_repr::Timestamp,
            Result<SourceOutput<C::Time>, DataflowError>,
            Diff,
        >,
    >,
    StreamVec<'root, (), HealthStatusMessage>,
    Vec<PressOnDropButton>,
)
where
    C: SourceConnection + SourceRender + Clone + 'static,
{
    let worker_id = config.worker_id;
    let id = config.id;

    let mut tokens = vec![];

    // Channel through which the source-time scope reports probed upstream
    // frontiers to the remap operator.
    let (probed_upper_tx, probed_upper_rx) = watch::channel(None);

    let source_metrics = Arc::new(config.metrics.get_source_metrics(id, worker_id));

    let timestamp_desc = source_connection.timestamp_desc();

    let (remap_collection, remap_token) = remap_operator(
        scope,
        storage_state,
        config.clone(),
        probed_upper_rx,
        timestamp_desc,
    );
    // Only one worker mints bindings; broadcast them so every worker can
    // reclock its share of the data.
    let remap_collection = remap_collection.inner.broadcast().as_collection();
    tokens.push(remap_token);

    // Translate the committed upper of the output collections back into the
    // source's native time domain, to drive resumption/compaction upstream.
    let committed_upper = reclock_committed_upper(
        remap_collection.clone(),
        config.as_of.clone(),
        committed_upper,
        id,
        Arc::clone(&source_metrics),
    );

    let mut reclocked_exports = BTreeMap::new();

    // Borrowed mutably by the closure below so the reclocked collections can
    // escape the inner scope.
    let reclocked_exports2 = &mut reclocked_exports;
    let (health, source_tokens) = root_scope.scoped("SourceTimeDomain", move |scope| {
        let (exports, health_stream, source_tokens) = source_render_operator(
            scope,
            config,
            source_connection,
            probed_upper_tx,
            committed_upper,
            start_signal,
        );

        for (id, export) in exports {
            let (reclock_pusher, reclocked) =
                reclock(remap_collection.clone(), config.as_of.clone());
            export
                .inner
                .map(move |(result, from_time, diff)| {
                    // Convert raw messages into `SourceOutput`s, carrying the
                    // source-native `from_time` so reclocking can translate it.
                    let result = match result {
                        Ok(msg) => Ok(SourceOutput {
                            key: msg.key.clone(),
                            value: msg.value.clone(),
                            metadata: msg.metadata.clone(),
                            from_time: from_time.clone(),
                        }),
                        Err(err) => Err(err.clone()),
                    };
                    (result, from_time.clone(), *diff)
                })
                .capture_into(PusherCapture(reclock_pusher));
            reclocked_exports2.insert(id, reclocked);
        }

        (health_stream.leave(root_scope), source_tokens)
    });

    tokens.extend(source_tokens);

    (reclocked_exports, health, tokens)
}
260
/// Renders the connection-specific source pipeline inside the source-time
/// scope and wraps each export in a `SourceGenericStats` operator that
/// records message/byte counters and converts per-record errors into health
/// status updates.
///
/// Probes emitted by the rendered source are broadcast and forwarded into
/// `probed_upper_tx` for consumption by `remap_operator`.
fn source_render_operator<'scope, C>(
    scope: Scope<'scope, C::Time>,
    config: &RawSourceCreationConfig,
    source_connection: C,
    probed_upper_tx: watch::Sender<Option<Probe<C::Time>>>,
    resume_uppers: impl futures::Stream<Item = Antichain<C::Time>> + 'static,
    start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
    BTreeMap<GlobalId, StackedCollection<'scope, C::Time, Result<SourceMessage, DataflowError>>>,
    StreamVec<'scope, C::Time, HealthStatusMessage>,
    Vec<PressOnDropButton>,
)
where
    C: SourceRender + 'static,
{
    let source_id = config.id;
    let worker_id = config.worker_id;

    // Log resume uppers as they flow by, purely for debugging.
    let resume_uppers = resume_uppers.inspect(move |upper| {
        let upper = upper.pretty();
        trace!(%upper, "timely-{worker_id} source({source_id}) received resume upper");
    });

    let (exports, health, probe_stream, tokens) =
        source_connection.render(scope, config, resume_uppers, start_signal);

    let mut export_collections = BTreeMap::new();

    let source_metrics = config.metrics.get_source_metrics(config.id, worker_id);

    // Report the overall resume upper (the union of all per-export resume
    // frontiers) as a metric.
    let resume_upper = Antichain::from_iter(
        config
            .resume_uppers
            .values()
            .flat_map(|f| f.iter().cloned()),
    );
    source_metrics
        .resume_upper
        .set(mz_persist_client::metrics::encode_ts_metric(&resume_upper));

    let mut health_streams = vec![];

    for (id, export) in exports {
        let name = format!("SourceGenericStats({})", id);
        let mut builder = OperatorBuilderRc::new(name, scope.clone());

        // Output 0: derived health updates for records that carry errors.
        let (health_output, derived_health) = builder.new_output();
        let mut health_output =
            OutputBuilder::<_, CapacityContainerBuilder<_>>::from(health_output);
        health_streams.push(derived_health);

        // Output 1: the data itself, passed through unchanged.
        let (output, new_export) = builder.new_output();
        let mut output = OutputBuilder::<_, CapacityContainerBuilder<_>>::from(output);

        let mut input = builder.new_input(export.inner, Pipeline);
        export_collections.insert(id, new_export.as_collection());

        let bytes_read_counter = config.metrics.source_defs.bytes_read.clone();
        let source_statistics = config
            .statistics
            .get(&id)
            .expect("statistics initialized")
            .clone();

        builder.build(move |mut caps| {
            // Held to allow emitting health updates at any time; dropped once
            // the input frontier is empty so the operator can shut down.
            let mut health_cap = Some(caps.remove(0));

            move |frontiers| {
                // NOTE(review): `last_status` lives inside the per-invocation
                // closure, so the "suppress repeated identical statuses" check
                // below only deduplicates within a single operator activation,
                // not across activations — confirm this is intended.
                let mut last_status = None;
                let mut health_output = health_output.activate();

                if frontiers[0].is_empty() {
                    health_cap = None;
                    return;
                }
                let health_cap = health_cap.as_mut().unwrap();

                input.for_each(|cap, data| {
                    for (message, _, _) in data.iter() {
                        match message {
                            Ok(message) => {
                                // Count successful records and their key/value
                                // byte volume.
                                source_statistics.inc_messages_received_by(1);
                                let key_len = u64::cast_from(message.key.byte_len());
                                let value_len = u64::cast_from(message.value.byte_len());
                                bytes_read_counter.inc_by(key_len + value_len);
                                source_statistics.inc_bytes_received_by(key_len + value_len);
                            }
                            Err(error) => {
                                // Surface per-record errors as a "stalled"
                                // health status.
                                let update = HealthStatusUpdate::stalled(
                                    error.to_string(),
                                    Some(
                                        "retracting the errored value may resume the source"
                                            .to_string(),
                                    ),
                                );
                                let status = HealthStatusMessage {
                                    id: Some(id),
                                    namespace: C::STATUS_NAMESPACE.clone(),
                                    update,
                                };
                                if last_status.as_ref() != Some(&status) {
                                    last_status = Some(status.clone());
                                    health_output.session(&health_cap).give(status);
                                }
                            }
                        }
                    }
                    // Forward the data unchanged.
                    let mut output = output.activate();
                    output.session(&cap).give_container(data);
                });
            }
        });
    }

    // Forward probed upstream frontiers to the remap operator; send errors
    // (receiver dropped) are deliberately ignored.
    probe_stream.broadcast().inspect(move |probe| {
        let _ = probed_upper_tx.send(Some(probe.clone()));
    });

    (
        export_collections,
        health.concatenate_flatten::<_, CapacityContainerBuilder<_>>(health_streams),
        tokens,
    )
}
395
/// Mints new contents for the remap shard based on probed upstream frontiers
/// and returns them (both the initial snapshot and subsequent bindings) as a
/// collection of `FromTime` bindings in the `mz_repr::Timestamp` domain.
///
/// Only a single worker — chosen by hashing the source id — is active; all
/// other workers clear their `shared_remap_upper` and return immediately.
fn remap_operator<'scope, FromTime>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    storage_state: &crate::storage_state::StorageState,
    config: RawSourceCreationConfig,
    mut probed_upper: watch::Receiver<Option<Probe<FromTime>>>,
    remap_relation_desc: RelationDesc,
) -> (
    VecCollection<'scope, mz_repr::Timestamp, FromTime, Diff>,
    PressOnDropButton,
)
where
    FromTime: SourceTimestamp,
{
    // Exhaustive destructuring so that adding a config field forces this
    // operator to consider it.
    let RawSourceCreationConfig {
        name,
        id,
        source_exports: _,
        worker_id,
        worker_count,
        timestamp_interval: _,
        remap_metadata,
        as_of,
        resume_uppers: _,
        source_resume_uppers: _,
        metrics: _,
        now_fn,
        persist_clients,
        statistics: _,
        shared_remap_upper,
        config: _,
        remap_collection_id,
        busy_signal: _,
    } = config;

    let read_only_rx = storage_state.read_only_rx.clone();
    let error_handler = storage_state.error_handler("remap_operator", id);

    // Elect a single worker to mint bindings.
    let chosen_worker = usize::cast_from(id.hashed() % u64::cast_from(worker_count));
    let active_worker = chosen_worker == worker_id;

    let operator_name = format!("remap({})", id);
    let mut remap_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
    let (remap_output, remap_stream) = remap_op.new_output::<CapacityContainerBuilder<_>>();

    let button = remap_op.build(move |capabilities| async move {
        if !active_worker {
            // Inactive workers hold no part of the remap shard's upper.
            shared_remap_upper.borrow_mut().clear();
            return;
        }

        let mut cap_set = CapabilitySet::from_elem(capabilities.into_element());

        let remap_handle = crate::source::reclock::compat::PersistHandle::<FromTime, _>::new(
            Arc::clone(&persist_clients),
            read_only_rx,
            remap_metadata.clone(),
            as_of.clone(),
            shared_remap_upper,
            id,
            "remap",
            worker_id,
            worker_count,
            remap_relation_desc,
            remap_collection_id,
        )
        .await;

        let remap_handle = match remap_handle {
            Ok(handle) => handle,
            Err(e) => {
                // `report_and_stop` does not return; the operator halts here.
                error_handler
                    .report_and_stop(
                        e.context(format!("Failed to create remap handle for source {name}")),
                    )
                    .await
            }
        };

        let (mut timestamper, mut initial_batch) = ReclockOperator::new(remap_handle).await;

        trace!(
            "timely-{worker_id} remap({id}) emitting remap snapshot: trace_updates={:?}",
            &initial_batch.updates
        );

        // Emit the existing bindings at the snapshot time, then downgrade to
        // the shard's current upper.
        let cap = cap_set.delayed(cap_set.first().unwrap());
        remap_output.give_container(&cap, &mut initial_batch.updates);
        drop(cap);
        cap_set.downgrade(initial_batch.upper);

        let mut prev_probe_ts: Option<mz_repr::Timestamp> = None;

        // Loop until the capability set is drained, i.e. the shard upper has
        // advanced to the empty frontier.
        while !cap_set.is_empty() {
            let new_probe = probed_upper
                // Wait for a probe strictly newer than the last one we minted
                // bindings for.
                .wait_for(|new_probe| match (prev_probe_ts, new_probe) {
                    (None, Some(_)) => true,
                    (Some(prev_ts), Some(new)) => prev_ts < new.probe_ts,
                    _ => false,
                })
                .await
                .map(|probe| (*probe).clone())
                .unwrap_or_else(|_| {
                    // The sender was dropped (source torn down): fabricate a
                    // final probe with an empty upstream frontier so the loop
                    // can mint closing bindings and terminate.
                    Some(Probe {
                        probe_ts: now_fn().into(),
                        upstream_frontier: Antichain::new(),
                    })
                });

            let probe = new_probe.expect("known to be Some");
            prev_probe_ts = Some(probe.probe_ts);

            let binding_ts = probe.probe_ts;
            let cur_source_upper = probe.upstream_frontier;

            let new_into_upper = Antichain::from_elem(binding_ts.step_forward());

            let mut remap_trace_batch = timestamper
                .mint(binding_ts, new_into_upper, cur_source_upper.borrow())
                .await;

            trace!(
                "timely-{worker_id} remap({id}) minted new bindings: \
                updates={:?} \
                source_upper={} \
                trace_upper={}",
                &remap_trace_batch.updates,
                cur_source_upper.pretty(),
                remap_trace_batch.upper.pretty()
            );

            let cap = cap_set.delayed(cap_set.first().unwrap());
            remap_output.give_container(&cap, &mut remap_trace_batch.updates);
            cap_set.downgrade(remap_trace_batch.upper);
        }
    });

    (remap_stream.as_collection(), button.press_on_drop())
}
544
/// Translates the committed upper of the reclocked (output) collections back
/// into the source's native `FromTime` domain, using the remap `bindings`.
///
/// The second input (`committed_upper`) is consumed only for its frontier —
/// its data is ignored. Results are published through a watch channel and
/// returned as an async stream of `FromTime` frontiers.
fn reclock_committed_upper<'scope, T, FromTime>(
    bindings: VecCollection<'scope, T, FromTime, Diff>,
    as_of: Antichain<T>,
    committed_upper: StreamVec<'scope, T, ()>,
    id: GlobalId,
    metrics: Arc<SourceMetrics>,
) -> impl futures::stream::Stream<Item = Antichain<FromTime>> + 'static
where
    T: Timestamp + Lattice + TotalOrder,
    FromTime: SourceTimestamp,
{
    let (tx, rx) = watch::channel(Antichain::from_elem(FromTime::minimum()));
    let scope = bindings.scope().clone();

    let name = format!("ReclockCommitUpper({id})");
    let mut builder = OperatorBuilderRc::new(name, scope);

    let mut bindings = builder.new_input(bindings.inner.clone(), Pipeline);
    // Frontier-only input; updates themselves are never read.
    let _ = builder.new_input(committed_upper.clone(), Pipeline);

    builder.build(move |_| {
        use timely::progress::ChangeBatch;
        // Bindings received but not yet beneath the bindings frontier, keyed
        // as (into-time, from-time) so consolidation cancels retractions.
        let mut accepted_times: ChangeBatch<(T, FromTime)> = ChangeBatch::new();
        // Frontier of the bindings input observed so far.
        let mut upper = Antichain::from_elem(Timestamp::minimum());
        // Finalized bindings, kept sorted by into-time (see sort below), not
        // yet applied to `source_upper`.
        let mut ready_times = VecDeque::new();
        // The reclocked committed upper, maintained incrementally from the
        // ready bindings as the committed frontier advances.
        let mut source_upper = MutableAntichain::new();

        move |frontiers| {
            bindings.for_each(|_, data| {
                accepted_times.extend(data.drain(..).map(|(from, mut into, diff)| {
                    // Times before the as-of are advanced to it.
                    into.advance_by(as_of.borrow());
                    ((into, from), diff.into_inner())
                }));
            });
            let new_upper = frontiers[0].frontier();
            if PartialOrder::less_than(&upper.borrow(), &new_upper) {
                upper = new_upper.to_owned();
                // Drain accepted times; those now settled (not beyond the new
                // upper) become ready, the rest are put back. Sorting by
                // into-time keeps `ready_times` sorted, which the
                // `partition_point` below relies on.
                let mut pending_times = std::mem::take(&mut accepted_times).into_inner();
                pending_times.sort_unstable_by(|a, b| a.0.cmp(&b.0));
                for ((into, from), diff) in pending_times.drain(..) {
                    if !upper.less_equal(&into) {
                        ready_times.push_back((from, into, diff));
                    } else {
                        accepted_times.update((into, from), diff);
                    }
                }
            }

            // Only report once both the bindings and the committed frontier
            // have passed the as-of, so the initial snapshot is fully
            // accounted for.
            if as_of.iter().all(|t| !upper.less_equal(t)) {
                let committed_upper = frontiers[1].frontier();
                if as_of.iter().all(|t| !committed_upper.less_equal(t)) {
                    let reclocked_upper = match committed_upper.as_option() {
                        Some(t_next) => {
                            // Apply all bindings strictly below the committed
                            // time and read off the resulting frontier.
                            let idx = ready_times.partition_point(|(_, t, _)| t < t_next);
                            let updates = ready_times
                                .drain(0..idx)
                                .map(|(from_time, _, diff)| (from_time, diff));
                            source_upper.update_iter(updates);
                            source_upper.frontier().to_owned()
                        }
                        // Empty committed frontier: everything is committed.
                        None => Antichain::new(),
                    };
                    tx.send_replace(reclocked_upper);
                }
            }

            metrics
                .commit_upper_accepted_times
                .set(u64::cast_from(accepted_times.len()));
            metrics
                .commit_upper_ready_times
                .set(u64::cast_from(ready_times.len()));
        }
    });

    WatchStream::from_changes(rx)
}