1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::sync::Arc;
15
16use chrono::{DateTime, Utc};
17use futures::Future;
18use itertools::Itertools;
19use mz_adapter_types::connection::ConnectionId;
20use mz_catalog::memory::objects::{CatalogItem, ContinualTask, MaterializedView, View};
21use mz_compute_types::ComputeInstanceId;
22use mz_expr::CollectionPlan;
23use mz_ore::collections::CollectionExt;
24use mz_ore::instrument;
25use mz_ore::now::{EpochMillis, NowFn, to_datetime};
26use mz_ore::vec::VecExt;
27use mz_repr::{CatalogItemId, GlobalId, Timestamp};
28use mz_sql::names::{ResolvedDatabaseSpecifier, SchemaSpecifier};
29use mz_storage_types::sources::Timeline;
30use mz_timestamp_oracle::batching_oracle::BatchingTimestampOracle;
31use mz_timestamp_oracle::postgres_oracle::{
32 PostgresTimestampOracle, PostgresTimestampOracleConfig,
33};
34use mz_timestamp_oracle::{self, TimestampOracle, WriteTimestamp};
35use timely::progress::Timestamp as TimelyTimestamp;
36use tracing::{Instrument, debug, error, info};
37
38use crate::AdapterError;
39use crate::coord::Coordinator;
40use crate::coord::id_bundle::CollectionIdBundle;
41use crate::coord::read_policy::ReadHolds;
42use crate::coord::timestamp_selection::TimestampProvider;
43
/// Classifies how a collection (or a set of collections) relates to timelines
/// and timestamps.
///
/// The derived `Ord`/`PartialOrd` follow the declaration order of the variants.
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub enum TimelineContext {
    /// Tied to the contained [`Timeline`]. In this file, sources, tables and
    /// logs are classified this way.
    TimelineDependent(Timeline),
    /// Not tied to a concrete timeline, but still classified as depending on
    /// the timestamp. In this file, views whose expression contains a temporal
    /// function, materialized views and continual tasks are classified this way.
    TimestampDependent,
    /// Not tied to a concrete timeline and not classified as timestamp
    /// dependent. In this file, views without temporal functions are
    /// classified this way.
    TimestampIndependent,
}
57
58impl TimelineContext {
59 pub fn contains_timeline(&self) -> bool {
61 self.timeline().is_some()
62 }
63
64 pub fn timeline(&self) -> Option<&Timeline> {
66 match self {
67 Self::TimelineDependent(timeline) => Some(timeline),
68 Self::TimestampIndependent | Self::TimestampDependent => None,
69 }
70 }
71}
72
/// Per-timeline state kept by the coordinator: the timestamp oracle of the
/// timeline together with the read holds tracked for it.
pub(crate) struct TimelineState<T: TimelyTimestamp> {
    /// Shared handle to the timestamp oracle of this timeline (used in this
    /// file for `read_ts`, `write_ts`, `peek_write_ts` and `apply_write`).
    pub(crate) oracle: Arc<dyn TimestampOracle<T> + Send + Sync>,
    /// Read holds associated with this timeline; they are downgraded to the
    /// oracle's read timestamp when timelines are advanced.
    pub(crate) read_holds: ReadHolds<T>,
}
82
83impl<T: TimelyTimestamp> fmt::Debug for TimelineState<T> {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 f.debug_struct("TimelineState")
86 .field("read_holds", &self.read_holds)
87 .finish()
88 }
89}
90
91impl Coordinator {
92 pub(crate) fn now(&self) -> EpochMillis {
93 (self.catalog().config().now)()
94 }
95
96 pub(crate) fn now_datetime(&self) -> DateTime<Utc> {
97 to_datetime(self.now())
98 }
99
100 pub(crate) fn get_timestamp_oracle(
101 &self,
102 timeline: &Timeline,
103 ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
104 let oracle = &self
105 .global_timelines
106 .get(timeline)
107 .expect("all timelines have a timestamp oracle")
108 .oracle;
109
110 Arc::clone(oracle)
111 }
112
113 pub(crate) fn get_local_timestamp_oracle(
115 &self,
116 ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
117 self.get_timestamp_oracle(&Timeline::EpochMilliseconds)
118 }
119
120 pub(crate) async fn get_local_read_ts(&self) -> Timestamp {
124 self.get_local_timestamp_oracle().read_ts().await
125 }
126
127 #[instrument(name = "coord::get_local_write_ts")]
131 pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp {
132 self.global_timelines
133 .get_mut(&Timeline::EpochMilliseconds)
134 .expect("no realtime timeline")
135 .oracle
136 .write_ts()
137 .await
138 }
139
140 pub(crate) async fn peek_local_write_ts(&self) -> Timestamp {
143 self.get_local_timestamp_oracle().peek_write_ts().await
144 }
145
/// Builds a future that applies `timestamp` as a write to the
/// `Timeline::EpochMilliseconds` oracle.
///
/// Before the future is built, `timestamp` is compared against
/// `upper_bound(now)`; exceeding it is only logged at error level and does not
/// prevent the write from being applied.
///
/// The returned future owns its own handle to the oracle and does not borrow
/// `self`, which is what allows the `'static` bound in the return type.
pub(crate) fn apply_local_write(
    &self,
    timestamp: Timestamp,
) -> impl Future<Output = ()> + Send + 'static {
    let now = self.now().into();

    // Sanity check only: report timestamps that run further ahead of the
    // current time than the allowed bound.
    let upper_bound = upper_bound(&now);
    if timestamp > upper_bound {
        error!(
            %now,
            "Setting local read timestamp to {timestamp}, which is more than \
            the desired upper bound {upper_bound}."
        );
    }

    // Fetch the oracle handle eagerly so the async block below captures an
    // owned `Arc` instead of a reference to `self`.
    let oracle = self.get_local_timestamp_oracle();

    async move {
        oracle
            .apply_write(timestamp)
            .instrument(tracing::debug_span!("apply_local_write_static", ?timestamp))
            .await
    }
}
171
172 pub(crate) async fn get_catalog_write_ts(&mut self) -> Timestamp {
184 if self.read_only_controllers {
185 let (write_ts, upper) =
186 futures::future::join(self.peek_local_write_ts(), self.catalog().current_upper())
187 .await;
188 std::cmp::max(write_ts, upper)
189 } else {
190 self.get_local_write_ts().await.timestamp
191 }
192 }
193
194 pub(crate) async fn ensure_timeline_state<'a>(
196 &'a mut self,
197 timeline: &'a Timeline,
198 ) -> &'a mut TimelineState<Timestamp> {
199 Self::ensure_timeline_state_with_initial_time(
200 timeline,
201 Timestamp::minimum(),
202 self.catalog().config().now.clone(),
203 self.pg_timestamp_oracle_config.clone(),
204 &mut self.global_timelines,
205 self.read_only_controllers,
206 )
207 .await
208 }
209
210 #[instrument]
213 pub(crate) async fn ensure_timeline_state_with_initial_time<'a>(
214 timeline: &'a Timeline,
215 initially: Timestamp,
216 now: NowFn,
217 pg_oracle_config: Option<PostgresTimestampOracleConfig>,
218 global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>>,
219 read_only: bool,
220 ) -> &'a mut TimelineState<Timestamp> {
221 if !global_timelines.contains_key(timeline) {
222 info!(
223 "opening a new CRDB/postgres TimestampOracle for timeline {:?}",
224 timeline,
225 );
226
227 let now_fn = if timeline == &Timeline::EpochMilliseconds {
228 now
229 } else {
230 NowFn::from(|| Timestamp::minimum().into())
239 };
240
241 let pg_oracle_config = pg_oracle_config.expect(
242 "missing --timestamp-oracle-url even though the crdb-backed timestamp oracle was configured");
243
244 let batching_metrics = Arc::clone(&pg_oracle_config.metrics);
245
246 let pg_oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> = Arc::new(
247 PostgresTimestampOracle::open(
248 pg_oracle_config,
249 timeline.to_string(),
250 initially,
251 now_fn,
252 read_only,
253 )
254 .await,
255 );
256
257 let batching_oracle = BatchingTimestampOracle::new(batching_metrics, pg_oracle);
258
259 let oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> =
260 Arc::new(batching_oracle);
261
262 global_timelines.insert(
263 timeline.clone(),
264 TimelineState {
265 oracle,
266 read_holds: ReadHolds::new(),
267 },
268 );
269 }
270 global_timelines.get_mut(timeline).expect("inserted above")
271 }
272
273 pub(crate) fn build_collection_id_bundle(
275 &self,
276 storage_ids: impl IntoIterator<Item = GlobalId>,
277 compute_ids: impl IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
278 clusters: impl IntoIterator<Item = ComputeInstanceId>,
279 ) -> CollectionIdBundle {
280 let mut compute: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
281
282 for (instance_id, id) in compute_ids {
284 compute.entry(instance_id).or_default().insert(id);
285 }
286
287 let cluster_set: BTreeSet<_> = clusters.into_iter().collect();
289 for (_timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
290 let compute_ids = read_holds
291 .compute_ids()
292 .filter(|(instance_id, _id)| cluster_set.contains(instance_id));
293 for (instance_id, id) in compute_ids {
294 compute.entry(instance_id).or_default().insert(id);
295 }
296 }
297
298 CollectionIdBundle {
299 storage_ids: storage_ids.into_iter().collect(),
300 compute_ids: compute,
301 }
302 }
303
304 pub(crate) fn remove_resources_associated_with_timeline(
307 &mut self,
308 timeline: Timeline,
309 ids: CollectionIdBundle,
310 ) -> bool {
311 let TimelineState { read_holds, .. } = self
312 .global_timelines
313 .get_mut(&timeline)
314 .expect("all timeslines have a timestamp oracle");
315
316 for id in ids.storage_ids {
318 read_holds.remove_storage_collection(id);
319 }
320 for (compute_id, ids) in ids.compute_ids {
321 for id in ids {
322 read_holds.remove_compute_collection(compute_id, id);
323 }
324 }
325 let became_empty = read_holds.is_empty();
326
327 became_empty
328 }
329
330 pub(crate) fn remove_compute_ids_from_timeline<I>(&mut self, ids: I) -> Vec<Timeline>
331 where
332 I: IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
333 {
334 let mut empty_timelines = BTreeSet::new();
335 for (compute_instance, id) in ids {
336 for (timeline, TimelineState { read_holds, .. }) in &mut self.global_timelines {
337 read_holds.remove_compute_collection(compute_instance, id);
338 if read_holds.is_empty() {
339 empty_timelines.insert(timeline.clone());
340 }
341 }
342 }
343 empty_timelines.into_iter().collect()
344 }
345
346 pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle {
347 let mut id_bundle = CollectionIdBundle::default();
348 for entry in self.catalog().entries() {
349 if let TimelineContext::TimelineDependent(entry_timeline) =
350 self.get_timeline_context(entry.id())
351 {
352 if timeline == &entry_timeline {
353 match entry.item() {
354 CatalogItem::Table(table) => {
355 id_bundle.storage_ids.extend(table.global_ids());
356 }
357 CatalogItem::Source(source) => {
358 id_bundle.storage_ids.insert(source.global_id());
359 }
360 CatalogItem::MaterializedView(mv) => {
361 id_bundle.storage_ids.insert(mv.global_id());
362 }
363 CatalogItem::ContinualTask(ct) => {
364 id_bundle.storage_ids.insert(ct.global_id());
365 }
366 CatalogItem::Index(index) => {
367 id_bundle
368 .compute_ids
369 .entry(index.cluster_id)
370 .or_default()
371 .insert(index.global_id());
372 }
373 CatalogItem::View(_)
374 | CatalogItem::Sink(_)
375 | CatalogItem::Type(_)
376 | CatalogItem::Func(_)
377 | CatalogItem::Secret(_)
378 | CatalogItem::Connection(_)
379 | CatalogItem::Log(_) => {}
380 }
381 }
382 }
383 }
384 id_bundle
385 }
386
387 pub(crate) fn validate_timeline_context<I>(
393 &self,
394 ids: I,
395 ) -> Result<TimelineContext, AdapterError>
396 where
397 I: IntoIterator<Item = GlobalId>,
398 {
399 let items_ids = ids
400 .into_iter()
401 .filter_map(|gid| self.catalog().try_resolve_item_id(&gid));
402 let mut timeline_contexts: Vec<_> =
403 self.get_timeline_contexts(items_ids).into_iter().collect();
404 let timelines: Vec<_> = timeline_contexts
420 .drain_filter_swapping(|timeline_context| timeline_context.contains_timeline())
421 .collect();
422
423 if timelines.len() > 1 {
427 Err(AdapterError::Unsupported(
428 "multiple timelines within one dataflow",
429 ))
430 } else if timelines.len() == 1 {
431 Ok(timelines.into_element())
432 } else if timeline_contexts
433 .iter()
434 .contains(&TimelineContext::TimestampDependent)
435 {
436 Ok(TimelineContext::TimestampDependent)
437 } else {
438 Ok(TimelineContext::TimestampIndependent)
439 }
440 }
441
442 pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext {
444 let entry = self.catalog().get_entry(&id);
445 self.validate_timeline_context(entry.global_ids())
446 .expect("impossible for a single object to belong to incompatible timeline contexts")
447 }
448
449 pub(crate) fn get_timeline_context_for_global_id(&self, id: GlobalId) -> TimelineContext {
451 self.validate_timeline_context(vec![id])
452 .expect("impossible for a single object to belong to incompatible timeline contexts")
453 }
454
455 fn get_timeline_contexts<I>(&self, ids: I) -> BTreeSet<TimelineContext>
457 where
458 I: IntoIterator<Item = CatalogItemId>,
459 {
460 let mut seen: BTreeSet<CatalogItemId> = BTreeSet::new();
461 let mut timelines: BTreeSet<TimelineContext> = BTreeSet::new();
462
463 let mut ids: Vec<_> = ids.into_iter().collect();
466 while let Some(id) = ids.pop() {
467 if !seen.insert(id) {
470 continue;
471 }
472 if let Some(entry) = self.catalog().try_get_entry(&id) {
473 match entry.item() {
474 CatalogItem::Source(source) => {
475 timelines
476 .insert(TimelineContext::TimelineDependent(source.timeline.clone()));
477 }
478 CatalogItem::Index(index) => {
479 let on_id = self.catalog().resolve_item_id(&index.on);
480 ids.push(on_id);
481 }
482 CatalogItem::View(View { optimized_expr, .. }) => {
483 if optimized_expr.contains_temporal() {
486 timelines.insert(TimelineContext::TimestampDependent);
487 } else {
488 timelines.insert(TimelineContext::TimestampIndependent);
489 }
490 let item_ids = optimized_expr
491 .depends_on()
492 .into_iter()
493 .map(|gid| self.catalog().resolve_item_id(&gid));
494 ids.extend(item_ids);
495 }
496 CatalogItem::MaterializedView(MaterializedView { optimized_expr, .. }) => {
497 timelines.insert(TimelineContext::TimestampDependent);
504 let item_ids = optimized_expr
505 .depends_on()
506 .into_iter()
507 .map(|gid| self.catalog().resolve_item_id(&gid));
508 ids.extend(item_ids);
509 }
510 CatalogItem::ContinualTask(ContinualTask { raw_expr, .. }) => {
511 timelines.insert(TimelineContext::TimestampDependent);
513 let item_ids = raw_expr
514 .depends_on()
515 .into_iter()
516 .map(|gid| self.catalog().resolve_item_id(&gid));
517 ids.extend(item_ids);
518 }
519 CatalogItem::Table(table) => {
520 timelines.insert(TimelineContext::TimelineDependent(table.timeline()));
521 }
522 CatalogItem::Log(_) => {
523 timelines.insert(TimelineContext::TimelineDependent(
524 Timeline::EpochMilliseconds,
525 ));
526 }
527 CatalogItem::Sink(_)
528 | CatalogItem::Type(_)
529 | CatalogItem::Func(_)
530 | CatalogItem::Secret(_)
531 | CatalogItem::Connection(_) => {}
532 }
533 }
534 }
535
536 timelines
537 }
538
539 pub fn partition_ids_by_timeline_context(
542 &self,
543 id_bundle: &CollectionIdBundle,
544 ) -> impl Iterator<Item = (TimelineContext, CollectionIdBundle)> + use<> {
545 let mut res: BTreeMap<TimelineContext, CollectionIdBundle> = BTreeMap::new();
546
547 for gid in &id_bundle.storage_ids {
548 let timeline_context = self.get_timeline_context_for_global_id(*gid);
549 res.entry(timeline_context)
550 .or_default()
551 .storage_ids
552 .insert(*gid);
553 }
554
555 for (compute_instance, ids) in &id_bundle.compute_ids {
556 for gid in ids {
557 let timeline_context = self.get_timeline_context_for_global_id(*gid);
558 res.entry(timeline_context)
559 .or_default()
560 .compute_ids
561 .entry(*compute_instance)
562 .or_default()
563 .insert(*gid);
564 }
565 }
566
567 res.into_iter()
568 }
569
570 pub(crate) fn timedomain_for<'a, I>(
576 &self,
577 uses_ids: I,
578 timeline_context: &TimelineContext,
579 conn_id: &ConnectionId,
580 compute_instance: ComputeInstanceId,
581 ) -> Result<CollectionIdBundle, AdapterError>
582 where
583 I: IntoIterator<Item = &'a GlobalId>,
584 {
585 let mut schemas = BTreeSet::new();
587 for id in uses_ids {
588 let entry = self.catalog().get_entry_by_global_id(id);
589 let name = entry.name();
590 schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec));
591 }
592
593 let pg_catalog_schema = (
594 ResolvedDatabaseSpecifier::Ambient,
595 SchemaSpecifier::Id(self.catalog().get_pg_catalog_schema_id()),
596 );
597 let system_schemas: Vec<_> = self
598 .catalog()
599 .system_schema_ids()
600 .map(|id| (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)))
601 .collect();
602
603 if system_schemas.iter().any(|s| schemas.contains(s)) {
604 schemas.extend(system_schemas);
607 } else if !schemas.is_empty() {
608 schemas.insert(pg_catalog_schema);
611 }
612
613 let mut collection_ids: BTreeSet<GlobalId> = BTreeSet::new();
615 for (db, schema) in schemas {
616 let schema = self.catalog().get_schema(&db, &schema, conn_id);
617 let global_ids = schema
621 .items
622 .values()
623 .map(|item_id| self.catalog().get_entry(item_id).latest_global_id());
624 collection_ids.extend(global_ids);
625 }
626
627 let mut id_bundle: CollectionIdBundle = self
629 .index_oracle(compute_instance)
630 .sufficient_collections(collection_ids);
631
632 for ids in [
634 &mut id_bundle.storage_ids,
635 &mut id_bundle.compute_ids.entry(compute_instance).or_default(),
636 ] {
637 ids.retain(|gid| {
638 let id_timeline_context = self
639 .validate_timeline_context(vec![*gid])
640 .expect("single id should never fail");
641 match (&id_timeline_context, &timeline_context) {
642 (
644 TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
645 _,
646 ) => true,
647 (
651 TimelineContext::TimelineDependent(id_timeline),
652 TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
653 ) => id_timeline == &Timeline::EpochMilliseconds,
654 (
656 TimelineContext::TimelineDependent(id_timeline),
657 TimelineContext::TimelineDependent(source_timeline),
658 ) => id_timeline == source_timeline,
659 }
660 });
661 }
662
663 Ok(id_bundle)
664 }
665
/// Advances every timeline's oracle where applicable and downgrades each
/// timeline's read holds to the oracle's current read timestamp.
///
/// For every timeline other than `Timeline::EpochMilliseconds`, and only when
/// `read_only_controllers` is unset, a write is applied to the oracle at the
/// timestamp derived from the least valid write of the timeline's collections,
/// provided the timeline has any collections.
#[instrument(level = "debug")]
pub(crate) async fn advance_timelines(&mut self) {
    // Move the map out of `self` so that `self` can be used freely inside the
    // loop; each entry is put back at the end of its iteration. While the loop
    // runs, `self.global_timelines` therefore only holds the timelines that
    // have already been processed.
    let global_timelines = std::mem::take(&mut self.global_timelines);
    for (
        timeline,
        TimelineState {
            oracle,
            mut read_holds,
        },
    ) in global_timelines
    {
        // NOTE(review): `EpochMilliseconds` is skipped here, presumably because
        // it is advanced elsewhere — confirm.
        if timeline != Timeline::EpochMilliseconds && !self.read_only_controllers {
            let id_bundle = self.ids_in_timeline(&timeline);

            // Timelines without any collections are left where they are.
            if !id_bundle.is_empty() {
                let least_valid_write = self.least_valid_write(&id_bundle);
                let now = Self::largest_not_in_advance_of_upper(&least_valid_write);
                oracle.apply_write(now).await;
                debug!(
                    least_valid_write = ?least_valid_write,
                    oracle_read_ts = ?oracle.read_ts().await,
                    "advanced {:?} to {}",
                    timeline,
                    now,
                );
            }
        };
        // Regardless of whether the oracle was advanced above, bring the read
        // holds up to the oracle's read timestamp.
        let read_ts = oracle.read_ts().await;
        read_holds.downgrade(read_ts);
        self.global_timelines
            .insert(timeline, TimelineState { oracle, read_holds });
    }
}
707}
708
709fn upper_bound(now: &mz_repr::Timestamp) -> mz_repr::Timestamp {
712 const TIMESTAMP_INTERVAL_MS: u64 = 5000;
713 const TIMESTAMP_INTERVAL_UPPER_BOUND: u64 = 2;
714
715 now.saturating_add(TIMESTAMP_INTERVAL_MS * TIMESTAMP_INTERVAL_UPPER_BOUND)
716}