use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use futures::Future;
use mz_adapter_types::connection::ConnectionId;
use mz_compute_types::ComputeInstanceId;
use mz_ore::instrument;
use mz_ore::now::{EpochMillis, NowFn, to_datetime};
use mz_repr::{GlobalId, Timestamp};
use mz_sql::names::{ResolvedDatabaseSpecifier, SchemaSpecifier};
use mz_storage_types::sources::Timeline;
use mz_timestamp_oracle::batching_oracle::BatchingTimestampOracle;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_timestamp_oracle::{self, TimestampOracle, WriteTimestamp};
use timely::progress::Timestamp as TimelyTimestamp;
use tracing::{Instrument, debug, error, info};

use crate::AdapterError;
use crate::coord::Coordinator;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::read_policy::ReadHolds;
use crate::coord::timestamp_selection::TimestampProvider;

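/// Describes how a set of collections relates to timelines and timestamps.
///
/// `TimelineDependent` collections belong to the given [`Timeline`] and must
/// be read at timestamps from that timeline. `TimestampDependent` collections
/// are not tied to a particular timeline, but their contents vary with the
/// chosen timestamp. `TimestampIndependent` collections are not tied to a
/// timeline and their contents do not depend on the chosen timestamp.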
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub enum TimelineContext {
    TimelineDependent(Timeline),
    TimestampDependent,
    TimestampIndependent,
}

impl TimelineContext {
    pub fn contains_timeline(&self) -> bool {
        self.timeline().is_some()
    }

    pub fn timeline(&self) -> Option<&Timeline> {
        match self {
            Self::TimelineDependent(timeline) => Some(timeline),
            Self::TimestampIndependent | Self::TimestampDependent => None,
        }
    }
}

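/// Per-timeline state tracked by the coordinator: the timeline's timestamp
/// oracle and the read holds held on its collections.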
pub(crate) struct TimelineState<T: TimelyTimestamp> {
    pub(crate) oracle: Arc<dyn TimestampOracle<T> + Send + Sync>,
    pub(crate) read_holds: ReadHolds<T>,
}

impl<T: TimelyTimestamp> fmt::Debug for TimelineState<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TimelineState")
            .field("read_holds", &self.read_holds)
            .finish()
    }
}

impl Coordinator {
    pub(crate) fn now(&self) -> EpochMillis {
        (self.catalog().config().now)()
    }

    pub(crate) fn now_datetime(&self) -> DateTime<Utc> {
        to_datetime(self.now())
    }

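    /// Returns a shared handle to the timestamp oracle for the given timeline.
    ///
    /// Panics if no oracle has been installed for `timeline`.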
    pub(crate) fn get_timestamp_oracle(
        &self,
        timeline: &Timeline,
    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
        let oracle = &self
            .global_timelines
            .get(timeline)
            .expect("all timelines have a timestamp oracle")
            .oracle;

        Arc::clone(oracle)
    }

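    /// Returns the timestamp oracle for the local, real-time
    /// (`EpochMilliseconds`) timeline.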
    pub(crate) fn get_local_timestamp_oracle(
        &self,
    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
        self.get_timestamp_oracle(&Timeline::EpochMilliseconds)
    }

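    /// Acquires a read timestamp from the real-time timestamp oracle.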
    pub(crate) async fn get_local_read_ts(&self) -> Timestamp {
        self.get_local_timestamp_oracle().read_ts().await
    }

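    /// Acquires a new write timestamp (and the corresponding `advance_to`
    /// timestamp) from the real-time timestamp oracle.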
    #[instrument(name = "coord::get_local_write_ts")]
    pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp {
        self.global_timelines
            .get_mut(&Timeline::EpochMilliseconds)
            .expect("no realtime timeline")
            .oracle
            .write_ts()
            .await
    }

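    /// Peeks the current write timestamp of the real-time timestamp oracle
    /// without allocating a new one.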
    pub(crate) async fn peek_local_write_ts(&self) -> Timestamp {
        self.get_local_timestamp_oracle().peek_write_ts().await
    }

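    /// Reports a completed write at `timestamp` to the real-time timestamp
    /// oracle, returning a future that performs the oracle update.
    ///
    /// Logs an error if `timestamp` is further ahead of the current wall-clock
    /// time than the allowed upper bound.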
    pub(crate) fn apply_local_write(
        &self,
        timestamp: Timestamp,
    ) -> impl Future<Output = ()> + Send + 'static {
        let now = self.now().into();

        let upper_bound = upper_bound(&now);
        if timestamp > upper_bound {
            error!(
                %now,
                "Setting local read timestamp to {timestamp}, which is more than \
                the desired upper bound {upper_bound}."
            );
        }

        let oracle = self.get_local_timestamp_oracle();

        async move {
            oracle
                .apply_write(timestamp)
                .instrument(tracing::debug_span!("apply_local_write_static", ?timestamp))
                .await
        }
    }

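    /// Returns a timestamp at which the catalog can be written.
    ///
    /// In read-only mode this avoids allocating a new oracle write timestamp
    /// and instead uses the maximum of the peeked write timestamp and the
    /// catalog's current upper.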
    pub(crate) async fn get_catalog_write_ts(&mut self) -> Timestamp {
        if self.read_only_controllers {
            // In read-only mode, avoid minting a new write timestamp; take the
            // larger of the peeked oracle write timestamp and the catalog's
            // current upper.
            let (write_ts, upper) =
                futures::future::join(self.peek_local_write_ts(), self.catalog().current_upper())
                    .await;
            std::cmp::max(write_ts, upper)
        } else {
            self.get_local_write_ts().await.timestamp
        }
    }

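    /// Ensures that a [`TimelineState`] exists for `timeline`, creating it if
    /// necessary, and returns a mutable reference to it.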
    pub(crate) async fn ensure_timeline_state<'a>(
        &'a mut self,
        timeline: &'a Timeline,
    ) -> &'a mut TimelineState<Timestamp> {
        Self::ensure_timeline_state_with_initial_time(
            timeline,
            Timestamp::minimum(),
            self.catalog().config().now.clone(),
            self.pg_timestamp_oracle_config.clone(),
            &mut self.global_timelines,
            self.read_only_controllers,
        )
        .await
    }

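    /// Ensures that a [`TimelineState`] exists in `global_timelines` for
    /// `timeline`, opening a Postgres/CRDB-backed timestamp oracle (wrapped in
    /// a [`BatchingTimestampOracle`]) initialized to at least `initially` when
    /// the timeline is new, and returns a mutable reference to the state.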
    #[instrument]
    pub(crate) async fn ensure_timeline_state_with_initial_time<'a>(
        timeline: &'a Timeline,
        initially: Timestamp,
        now: NowFn,
        pg_oracle_config: Option<PostgresTimestampOracleConfig>,
        global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>>,
        read_only: bool,
    ) -> &'a mut TimelineState<Timestamp> {
        if !global_timelines.contains_key(timeline) {
            info!(
                "opening a new CRDB/postgres TimestampOracle for timeline {:?}",
                timeline,
            );

            // Only the real-time (EpochMilliseconds) timeline tracks the wall
            // clock; other timelines get a NowFn pinned to the minimum
            // timestamp.
            let now_fn = if timeline == &Timeline::EpochMilliseconds {
                now
            } else {
                NowFn::from(|| Timestamp::minimum().into())
            };

            let pg_oracle_config = pg_oracle_config.expect(
                "missing --timestamp-oracle-url even though the crdb-backed timestamp oracle was configured",
            );

            let batching_metrics = Arc::clone(&pg_oracle_config.metrics);

            let pg_oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> = Arc::new(
                PostgresTimestampOracle::open(
                    pg_oracle_config,
                    timeline.to_string(),
                    initially,
                    now_fn,
                    read_only,
                )
                .await,
            );

            // Wrap the Postgres oracle in a batching oracle so that concurrent
            // oracle operations can be batched together.
            let batching_oracle = BatchingTimestampOracle::new(batching_metrics, pg_oracle);

            let oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> =
                Arc::new(batching_oracle);

            global_timelines.insert(
                timeline.clone(),
                TimelineState {
                    oracle,
                    read_holds: ReadHolds::new(),
                },
            );
        }
        global_timelines.get_mut(timeline).expect("inserted above")
    }

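    /// Builds a [`CollectionIdBundle`] from the given storage and compute IDs,
    /// additionally including all compute collections on the given `clusters`
    /// that any timeline currently holds read holds on.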
    pub(crate) fn build_collection_id_bundle(
        &self,
        storage_ids: impl IntoIterator<Item = GlobalId>,
        compute_ids: impl IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
        clusters: impl IntoIterator<Item = ComputeInstanceId>,
    ) -> CollectionIdBundle {
        let mut compute: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();

        // Include the explicitly provided compute collections.
        for (instance_id, id) in compute_ids {
            compute.entry(instance_id).or_default().insert(id);
        }

        // Also include compute collections on the given clusters that are
        // currently covered by any timeline's read holds.
        let cluster_set: BTreeSet<_> = clusters.into_iter().collect();
        for (_timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
            let compute_ids = read_holds
                .compute_ids()
                .filter(|(instance_id, _id)| cluster_set.contains(instance_id));
            for (instance_id, id) in compute_ids {
                compute.entry(instance_id).or_default().insert(id);
            }
        }

        CollectionIdBundle {
            storage_ids: storage_ids.into_iter().collect(),
            compute_ids: compute,
        }
    }

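    /// Removes the given collections from the read holds of `timeline`.
    ///
    /// Returns `true` if the timeline's read holds became empty as a result.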
    pub(crate) fn remove_resources_associated_with_timeline(
        &mut self,
        timeline: Timeline,
        ids: CollectionIdBundle,
    ) -> bool {
        let TimelineState { read_holds, .. } = self
            .global_timelines
            .get_mut(&timeline)
            .expect("all timelines have a timestamp oracle");

        for id in ids.storage_ids {
            read_holds.remove_storage_collection(id);
        }
        for (compute_id, ids) in ids.compute_ids {
            for id in ids {
                read_holds.remove_compute_collection(compute_id, id);
            }
        }
        let became_empty = read_holds.is_empty();

        became_empty
    }

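    /// Removes the given compute collections from the read holds of every
    /// timeline, returning the timelines whose read holds are empty
    /// afterwards.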
    pub(crate) fn remove_compute_ids_from_timeline<I>(&mut self, ids: I) -> Vec<Timeline>
    where
        I: IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
    {
        let mut empty_timelines = BTreeSet::new();
        for (compute_instance, id) in ids {
            for (timeline, TimelineState { read_holds, .. }) in &mut self.global_timelines {
                read_holds.remove_compute_collection(compute_instance, id);
                if read_holds.is_empty() {
                    empty_timelines.insert(timeline.clone());
                }
            }
        }
        empty_timelines.into_iter().collect()
    }

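    /// Returns the set of collections that make up the "timedomain" of the
    /// given `uses_ids`: every collection in the same schemas as the used
    /// collections (plus `pg_catalog`, or all system schemas when one of them
    /// is referenced), expanded to sufficient collections on
    /// `compute_instance` and restricted to collections whose timeline is
    /// compatible with `timeline_context`.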
    pub(crate) fn timedomain_for<'a, I>(
        &self,
        uses_ids: I,
        timeline_context: &TimelineContext,
        conn_id: &ConnectionId,
        compute_instance: ComputeInstanceId,
    ) -> Result<CollectionIdBundle, AdapterError>
    where
        I: IntoIterator<Item = &'a GlobalId>,
    {
        // Gather the schemas of all directly referenced collections.
        let mut schemas = BTreeSet::new();
        for id in uses_ids {
            let entry = self.catalog().get_entry_by_global_id(id);
            let name = entry.name();
            schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec));
        }

        // If any system schema is referenced, include all system schemas;
        // otherwise include pg_catalog alongside the referenced schemas.
        let pg_catalog_schema = (
            ResolvedDatabaseSpecifier::Ambient,
            SchemaSpecifier::Id(self.catalog().get_pg_catalog_schema_id()),
        );
        let system_schemas: Vec<_> = self
            .catalog()
            .system_schema_ids()
            .map(|id| (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)))
            .collect();

        if system_schemas.iter().any(|s| schemas.contains(s)) {
            schemas.extend(system_schemas);
        } else if !schemas.is_empty() {
            schemas.insert(pg_catalog_schema);
        }

        // Gather the IDs of every item in the relevant schemas.
        let mut collection_ids: BTreeSet<GlobalId> = BTreeSet::new();
        for (db, schema) in schemas {
            let schema = self.catalog().get_schema(&db, &schema, conn_id);
            let global_ids = schema
                .items
                .values()
                .map(|item_id| self.catalog().get_entry(item_id).latest_global_id());
            collection_ids.extend(global_ids);
        }

        // Expand to a sufficient set of collections on the target cluster.
        let mut id_bundle: CollectionIdBundle = self
            .index_oracle(compute_instance)
            .sufficient_collections(collection_ids);

        // Filter out collections whose timeline is incompatible with the
        // query's timeline context.
        for ids in [
            &mut id_bundle.storage_ids,
            &mut id_bundle.compute_ids.entry(compute_instance).or_default(),
        ] {
            ids.retain(|gid| {
                let id_timeline_context = self
                    .catalog()
                    .validate_timeline_context(vec![*gid])
                    .expect("single id should never fail");
                match (&id_timeline_context, &timeline_context) {
                    (
                        TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
                        _,
                    ) => true,
                    (
                        TimelineContext::TimelineDependent(id_timeline),
                        TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
                    ) => id_timeline == &Timeline::EpochMilliseconds,
                    (
                        TimelineContext::TimelineDependent(id_timeline),
                        TimelineContext::TimelineDependent(source_timeline),
                    ) => id_timeline == source_timeline,
                }
            });
        }

        Ok(id_bundle)
    }

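    /// Advances the timestamp oracle of every timeline other than the
    /// real-time timeline to the largest timestamp that is not in advance of
    /// the uppers of the timeline's collections (skipped entirely in read-only
    /// mode), and downgrades each timeline's read holds to the oracle's
    /// current read timestamp.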
    #[instrument(level = "debug")]
    pub(crate) async fn advance_timelines(&mut self) {
        let global_timelines = std::mem::take(&mut self.global_timelines);
        for (
            timeline,
            TimelineState {
                oracle,
                mut read_holds,
            },
        ) in global_timelines
        {
            // The real-time (EpochMilliseconds) timeline is not advanced here,
            // and no timeline is advanced in read-only mode.
            if timeline != Timeline::EpochMilliseconds && !self.read_only_controllers {
                let id_bundle = self.catalog().ids_in_timeline(&timeline);

                if !id_bundle.is_empty() {
                    let least_valid_write = self.least_valid_write(&id_bundle);
                    let now = Self::largest_not_in_advance_of_upper(&least_valid_write);
                    oracle.apply_write(now).await;
                    debug!(
                        least_valid_write = ?least_valid_write,
                        oracle_read_ts = ?oracle.read_ts().await,
                        "advanced {:?} to {}",
                        timeline,
                        now,
                    );
                }
            };
            // Downgrade the timeline's read holds to the oracle's current read
            // timestamp before putting the state back.
            let read_ts = oracle.read_ts().await;
            read_holds.downgrade(read_ts);
            self.global_timelines
                .insert(timeline, TimelineState { oracle, read_holds });
        }
    }
}

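/// Returns the desired upper bound on local write timestamps relative to
/// `now`: `now` plus `TIMESTAMP_INTERVAL_UPPER_BOUND` intervals of
/// `TIMESTAMP_INTERVAL_MS` milliseconds.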
fn upper_bound(now: &mz_repr::Timestamp) -> mz_repr::Timestamp {
    const TIMESTAMP_INTERVAL_MS: u64 = 5000;
    const TIMESTAMP_INTERVAL_UPPER_BOUND: u64 = 2;

    now.saturating_add(TIMESTAMP_INTERVAL_MS * TIMESTAMP_INTERVAL_UPPER_BOUND)
}