1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::sync::Arc;
15
16use chrono::{DateTime, Utc};
17use futures::Future;
18use mz_adapter_types::connection::ConnectionId;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::instrument;
21use mz_ore::now::{EpochMillis, NowFn, to_datetime};
22use mz_repr::{GlobalId, Timestamp};
23use mz_sql::names::{ResolvedDatabaseSpecifier, SchemaSpecifier};
24use mz_storage_types::sources::Timeline;
25use mz_timestamp_oracle::batching_oracle::BatchingTimestampOracle;
26use mz_timestamp_oracle::postgres_oracle::{
27 PostgresTimestampOracle, PostgresTimestampOracleConfig,
28};
29use mz_timestamp_oracle::{self, TimestampOracle, WriteTimestamp};
30use timely::progress::Timestamp as TimelyTimestamp;
31use tracing::{Instrument, debug, error, info};
32
33use crate::AdapterError;
34use crate::catalog::Catalog;
35use crate::coord::Coordinator;
36use crate::coord::id_bundle::CollectionIdBundle;
37use crate::coord::read_policy::ReadHolds;
38use crate::coord::timestamp_selection::TimestampProvider;
39use crate::optimize::dataflows::DataflowBuilder;
40
/// Describes how something relates to timelines and to the timestamp it is
/// evaluated at.
///
/// NOTE(review): the per-variant meanings below are inferred from the variant
/// names and from [`TimelineContext::timeline`]; confirm against the callers.
///
/// The derived `Ord`/`PartialOrd` follow declaration order, so reordering the
/// variants changes how values compare.
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub enum TimelineContext {
    /// Tied to one specific [`Timeline`], which is carried as the payload.
    TimelineDependent(Timeline),
    /// Carries no timeline; presumably the result still depends on the chosen
    /// timestamp.
    TimestampDependent,
    /// Carries no timeline; presumably the result does not depend on the chosen
    /// timestamp.
    TimestampIndependent,
}
54
55impl TimelineContext {
56 pub fn contains_timeline(&self) -> bool {
58 self.timeline().is_some()
59 }
60
61 pub fn timeline(&self) -> Option<&Timeline> {
63 match self {
64 Self::TimelineDependent(timeline) => Some(timeline),
65 Self::TimestampIndependent | Self::TimestampDependent => None,
66 }
67 }
68}
69
/// State kept per timeline: the timestamp oracle serving the timeline together
/// with the read holds tracked for it.
pub(crate) struct TimelineState<T: TimelyTimestamp> {
    /// Shared, thread-safe handle to the timeline's timestamp oracle.
    pub(crate) oracle: Arc<dyn TimestampOracle<T> + Send + Sync>,
    /// Read holds associated with this timeline.
    pub(crate) read_holds: ReadHolds<T>,
}
79
80impl<T: TimelyTimestamp> fmt::Debug for TimelineState<T> {
81 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82 f.debug_struct("TimelineState")
83 .field("read_holds", &self.read_holds)
84 .finish()
85 }
86}
87
88impl Coordinator {
89 pub(crate) fn now(&self) -> EpochMillis {
90 (self.catalog().config().now)()
91 }
92
93 pub(crate) fn now_datetime(&self) -> DateTime<Utc> {
94 to_datetime(self.now())
95 }
96
97 pub(crate) fn get_timestamp_oracle(
98 &self,
99 timeline: &Timeline,
100 ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
101 let oracle = &self
102 .global_timelines
103 .get(timeline)
104 .expect("all timelines have a timestamp oracle")
105 .oracle;
106
107 Arc::clone(oracle)
108 }
109
110 pub(crate) fn get_local_timestamp_oracle(
112 &self,
113 ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
114 self.get_timestamp_oracle(&Timeline::EpochMilliseconds)
115 }
116
117 pub(crate) async fn get_local_read_ts(&self) -> Timestamp {
121 self.get_local_timestamp_oracle().read_ts().await
122 }
123
124 #[instrument(name = "coord::get_local_write_ts")]
128 pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp {
129 self.global_timelines
130 .get_mut(&Timeline::EpochMilliseconds)
131 .expect("no realtime timeline")
132 .oracle
133 .write_ts()
134 .await
135 }
136
137 pub(crate) async fn peek_local_write_ts(&self) -> Timestamp {
140 self.get_local_timestamp_oracle().peek_write_ts().await
141 }
142
143 pub(crate) fn apply_local_write(
145 &self,
146 timestamp: Timestamp,
147 ) -> impl Future<Output = ()> + Send + 'static {
148 let now = self.now().into();
149
150 let upper_bound = upper_bound(&now);
151 if timestamp > upper_bound {
152 error!(
153 %now,
154 "Setting local read timestamp to {timestamp}, which is more than \
155 the desired upper bound {upper_bound}."
156 );
157 }
158
159 let oracle = self.get_local_timestamp_oracle();
160
161 async move {
162 oracle
163 .apply_write(timestamp)
164 .instrument(tracing::debug_span!("apply_local_write_static", ?timestamp))
165 .await
166 }
167 }
168
169 pub(crate) async fn get_catalog_write_ts(&mut self) -> Timestamp {
181 if self.read_only_controllers {
182 let (write_ts, upper) =
183 futures::future::join(self.peek_local_write_ts(), self.catalog().current_upper())
184 .await;
185 std::cmp::max(write_ts, upper)
186 } else {
187 self.get_local_write_ts().await.timestamp
188 }
189 }
190
191 pub(crate) async fn ensure_timeline_state<'a>(
193 &'a mut self,
194 timeline: &'a Timeline,
195 ) -> &'a mut TimelineState<Timestamp> {
196 Self::ensure_timeline_state_with_initial_time(
197 timeline,
198 Timestamp::minimum(),
199 self.catalog().config().now.clone(),
200 self.pg_timestamp_oracle_config.clone(),
201 &mut self.global_timelines,
202 self.read_only_controllers,
203 )
204 .await
205 }
206
207 #[instrument]
210 pub(crate) async fn ensure_timeline_state_with_initial_time<'a>(
211 timeline: &'a Timeline,
212 initially: Timestamp,
213 now: NowFn,
214 pg_oracle_config: Option<PostgresTimestampOracleConfig>,
215 global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>>,
216 read_only: bool,
217 ) -> &'a mut TimelineState<Timestamp> {
218 if !global_timelines.contains_key(timeline) {
219 info!(
220 "opening a new CRDB/postgres TimestampOracle for timeline {:?}",
221 timeline,
222 );
223
224 let now_fn = if timeline == &Timeline::EpochMilliseconds {
225 now
226 } else {
227 NowFn::from(|| Timestamp::minimum().into())
236 };
237
238 let pg_oracle_config = pg_oracle_config.expect(
239 "missing --timestamp-oracle-url even though the crdb-backed timestamp oracle was configured");
240
241 let batching_metrics = Arc::clone(&pg_oracle_config.metrics);
242
243 let pg_oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> = Arc::new(
244 PostgresTimestampOracle::open(
245 pg_oracle_config,
246 timeline.to_string(),
247 initially,
248 now_fn,
249 read_only,
250 )
251 .await,
252 );
253
254 let batching_oracle = BatchingTimestampOracle::new(batching_metrics, pg_oracle);
255
256 let oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> =
257 Arc::new(batching_oracle);
258
259 global_timelines.insert(
260 timeline.clone(),
261 TimelineState {
262 oracle,
263 read_holds: ReadHolds::new(),
264 },
265 );
266 }
267 global_timelines.get_mut(timeline).expect("inserted above")
268 }
269
270 pub(crate) fn remove_resources_associated_with_timeline(
273 &mut self,
274 timeline: Timeline,
275 ids: CollectionIdBundle,
276 ) -> bool {
277 let TimelineState { read_holds, .. } = self
278 .global_timelines
279 .get_mut(&timeline)
280 .expect("all timeslines have a timestamp oracle");
281
282 for id in ids.storage_ids {
284 read_holds.remove_storage_collection(id);
285 }
286 for (compute_id, ids) in ids.compute_ids {
287 for id in ids {
288 read_holds.remove_compute_collection(compute_id, id);
289 }
290 }
291 let became_empty = read_holds.is_empty();
292
293 became_empty
294 }
295
296 pub(crate) fn remove_compute_ids_from_timeline<I>(&mut self, ids: I) -> Vec<Timeline>
297 where
298 I: IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
299 {
300 let mut empty_timelines = BTreeSet::new();
301 for (compute_instance, id) in ids {
302 for (timeline, TimelineState { read_holds, .. }) in &mut self.global_timelines {
303 read_holds.remove_compute_collection(compute_instance, id);
304 if read_holds.is_empty() {
305 empty_timelines.insert(timeline.clone());
306 }
307 }
308 }
309 empty_timelines.into_iter().collect()
310 }
311
312 #[instrument(level = "debug")]
313 pub(crate) async fn advance_timelines(&mut self) {
314 let global_timelines = std::mem::take(&mut self.global_timelines);
315 for (
316 timeline,
317 TimelineState {
318 oracle,
319 mut read_holds,
320 },
321 ) in global_timelines
322 {
323 if timeline != Timeline::EpochMilliseconds && !self.read_only_controllers {
326 let id_bundle = self.catalog().ids_in_timeline(&timeline);
330
331 if !id_bundle.is_empty() {
335 let least_valid_write = self.least_valid_write(&id_bundle);
336 let now = Self::largest_not_in_advance_of_upper(&least_valid_write);
337 oracle.apply_write(now).await;
338 debug!(
339 least_valid_write = ?least_valid_write,
340 oracle_read_ts = ?oracle.read_ts().await,
341 "advanced {:?} to {}",
342 timeline,
343 now,
344 );
345 }
346 };
347 let read_ts = oracle.read_ts().await;
348 read_holds.downgrade(read_ts);
349 self.global_timelines
350 .insert(timeline, TimelineState { oracle, read_holds });
351 }
352 }
353}
354
355fn upper_bound(now: &mz_repr::Timestamp) -> mz_repr::Timestamp {
358 const TIMESTAMP_INTERVAL_MS: u64 = 5000;
359 const TIMESTAMP_INTERVAL_UPPER_BOUND: u64 = 2;
360
361 now.saturating_add(TIMESTAMP_INTERVAL_MS * TIMESTAMP_INTERVAL_UPPER_BOUND)
362}
363
364pub(crate) fn timedomain_for<'a, I>(
373 catalog: &Catalog,
374 dataflow_builder: &DataflowBuilder,
375 uses_ids: I,
376 timeline_context: &TimelineContext,
377 conn_id: &ConnectionId,
378 compute_instance: ComputeInstanceId,
379) -> Result<CollectionIdBundle, AdapterError>
380where
381 I: IntoIterator<Item = &'a GlobalId>,
382{
383 let mut schemas = BTreeSet::new();
385 for id in uses_ids {
386 let entry = catalog.get_entry_by_global_id(id);
387 let name = entry.name();
388 schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec));
389 }
390
391 let pg_catalog_schema = (
392 ResolvedDatabaseSpecifier::Ambient,
393 SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
394 );
395 let system_schemas: Vec<_> = catalog
396 .system_schema_ids()
397 .map(|id| (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)))
398 .collect();
399
400 if system_schemas.iter().any(|s| schemas.contains(s)) {
401 schemas.extend(system_schemas);
404 } else if !schemas.is_empty() {
405 schemas.insert(pg_catalog_schema);
408 }
409
410 let mut collection_ids: BTreeSet<GlobalId> = BTreeSet::new();
412 for (db, schema) in schemas {
413 let schema = catalog.get_schema(&db, &schema, conn_id);
414 let global_ids = schema
418 .items
419 .values()
420 .map(|item_id| catalog.get_entry(item_id).latest_global_id());
421 collection_ids.extend(global_ids);
422 }
423
424 let mut id_bundle: CollectionIdBundle = dataflow_builder.sufficient_collections(collection_ids);
426
427 for ids in [
429 &mut id_bundle.storage_ids,
430 &mut id_bundle.compute_ids.entry(compute_instance).or_default(),
431 ] {
432 ids.retain(|gid| {
433 let id_timeline_context = catalog
434 .validate_timeline_context(vec![*gid])
435 .expect("single id should never fail");
436 match (&id_timeline_context, &timeline_context) {
437 (
439 TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
440 _,
441 ) => true,
442 (
446 TimelineContext::TimelineDependent(id_timeline),
447 TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
448 ) => *id_timeline == Timeline::EpochMilliseconds,
449 (
451 TimelineContext::TimelineDependent(id_timeline),
452 TimelineContext::TimelineDependent(source_timeline),
453 ) => id_timeline == source_timeline,
454 }
455 });
456 }
457
458 Ok(id_bundle)
459}