1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::sync::Arc;
15
16use chrono::{DateTime, Utc};
17use futures::Future;
18use mz_adapter_types::connection::ConnectionId;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::instrument;
21use mz_ore::now::{EpochMillis, NowFn, to_datetime};
22use mz_repr::{GlobalId, Timestamp};
23use mz_sql::names::{ResolvedDatabaseSpecifier, SchemaSpecifier};
24use mz_storage_types::sources::Timeline;
25use mz_timestamp_oracle::batching_oracle::BatchingTimestampOracle;
26use mz_timestamp_oracle::postgres_oracle::{
27 PostgresTimestampOracle, PostgresTimestampOracleConfig,
28};
29use mz_timestamp_oracle::{self, TimestampOracle, WriteTimestamp};
30use timely::progress::Timestamp as TimelyTimestamp;
31use tracing::{Instrument, debug, error, info};
32
33use crate::AdapterError;
34use crate::coord::Coordinator;
35use crate::coord::id_bundle::CollectionIdBundle;
36use crate::coord::read_policy::ReadHolds;
37use crate::coord::timestamp_selection::TimestampProvider;
38
/// Describes whether a plan or collection is tied to a timeline and/or to a
/// chosen timestamp.
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub enum TimelineContext {
    /// Belongs to a specific [`Timeline`]; timestamps must be drawn from that
    /// timeline (see `timedomain_for`, which only admits collections whose
    /// timeline matches).
    TimelineDependent(Timeline),
    /// Not bound to a particular timeline, but the result depends on the
    /// chosen timestamp.
    TimestampDependent,
    /// Independent of both timeline and timestamp.
    TimestampIndependent,
}
52
53impl TimelineContext {
54 pub fn contains_timeline(&self) -> bool {
56 self.timeline().is_some()
57 }
58
59 pub fn timeline(&self) -> Option<&Timeline> {
61 match self {
62 Self::TimelineDependent(timeline) => Some(timeline),
63 Self::TimestampIndependent | Self::TimestampDependent => None,
64 }
65 }
66}
67
/// Per-timeline state kept by the coordinator: the timeline's timestamp
/// oracle plus the read holds taken on collections in that timeline.
pub(crate) struct TimelineState<T: TimelyTimestamp> {
    /// Shared handle to this timeline's timestamp oracle.
    pub(crate) oracle: Arc<dyn TimestampOracle<T> + Send + Sync>,
    /// Read holds on collections belonging to this timeline.
    pub(crate) read_holds: ReadHolds<T>,
}
77
78impl<T: TimelyTimestamp> fmt::Debug for TimelineState<T> {
79 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80 f.debug_struct("TimelineState")
81 .field("read_holds", &self.read_holds)
82 .finish()
83 }
84}
85
86impl Coordinator {
87 pub(crate) fn now(&self) -> EpochMillis {
88 (self.catalog().config().now)()
89 }
90
91 pub(crate) fn now_datetime(&self) -> DateTime<Utc> {
92 to_datetime(self.now())
93 }
94
95 pub(crate) fn get_timestamp_oracle(
96 &self,
97 timeline: &Timeline,
98 ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
99 let oracle = &self
100 .global_timelines
101 .get(timeline)
102 .expect("all timelines have a timestamp oracle")
103 .oracle;
104
105 Arc::clone(oracle)
106 }
107
108 pub(crate) fn get_local_timestamp_oracle(
110 &self,
111 ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
112 self.get_timestamp_oracle(&Timeline::EpochMilliseconds)
113 }
114
115 pub(crate) async fn get_local_read_ts(&self) -> Timestamp {
119 self.get_local_timestamp_oracle().read_ts().await
120 }
121
122 #[instrument(name = "coord::get_local_write_ts")]
126 pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp {
127 self.global_timelines
128 .get_mut(&Timeline::EpochMilliseconds)
129 .expect("no realtime timeline")
130 .oracle
131 .write_ts()
132 .await
133 }
134
135 pub(crate) async fn peek_local_write_ts(&self) -> Timestamp {
138 self.get_local_timestamp_oracle().peek_write_ts().await
139 }
140
141 pub(crate) fn apply_local_write(
143 &self,
144 timestamp: Timestamp,
145 ) -> impl Future<Output = ()> + Send + 'static {
146 let now = self.now().into();
147
148 let upper_bound = upper_bound(&now);
149 if timestamp > upper_bound {
150 error!(
151 %now,
152 "Setting local read timestamp to {timestamp}, which is more than \
153 the desired upper bound {upper_bound}."
154 );
155 }
156
157 let oracle = self.get_local_timestamp_oracle();
158
159 async move {
160 oracle
161 .apply_write(timestamp)
162 .instrument(tracing::debug_span!("apply_local_write_static", ?timestamp))
163 .await
164 }
165 }
166
167 pub(crate) async fn get_catalog_write_ts(&mut self) -> Timestamp {
179 if self.read_only_controllers {
180 let (write_ts, upper) =
181 futures::future::join(self.peek_local_write_ts(), self.catalog().current_upper())
182 .await;
183 std::cmp::max(write_ts, upper)
184 } else {
185 self.get_local_write_ts().await.timestamp
186 }
187 }
188
189 pub(crate) async fn ensure_timeline_state<'a>(
191 &'a mut self,
192 timeline: &'a Timeline,
193 ) -> &'a mut TimelineState<Timestamp> {
194 Self::ensure_timeline_state_with_initial_time(
195 timeline,
196 Timestamp::minimum(),
197 self.catalog().config().now.clone(),
198 self.pg_timestamp_oracle_config.clone(),
199 &mut self.global_timelines,
200 self.read_only_controllers,
201 )
202 .await
203 }
204
205 #[instrument]
208 pub(crate) async fn ensure_timeline_state_with_initial_time<'a>(
209 timeline: &'a Timeline,
210 initially: Timestamp,
211 now: NowFn,
212 pg_oracle_config: Option<PostgresTimestampOracleConfig>,
213 global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>>,
214 read_only: bool,
215 ) -> &'a mut TimelineState<Timestamp> {
216 if !global_timelines.contains_key(timeline) {
217 info!(
218 "opening a new CRDB/postgres TimestampOracle for timeline {:?}",
219 timeline,
220 );
221
222 let now_fn = if timeline == &Timeline::EpochMilliseconds {
223 now
224 } else {
225 NowFn::from(|| Timestamp::minimum().into())
234 };
235
236 let pg_oracle_config = pg_oracle_config.expect(
237 "missing --timestamp-oracle-url even though the crdb-backed timestamp oracle was configured");
238
239 let batching_metrics = Arc::clone(&pg_oracle_config.metrics);
240
241 let pg_oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> = Arc::new(
242 PostgresTimestampOracle::open(
243 pg_oracle_config,
244 timeline.to_string(),
245 initially,
246 now_fn,
247 read_only,
248 )
249 .await,
250 );
251
252 let batching_oracle = BatchingTimestampOracle::new(batching_metrics, pg_oracle);
253
254 let oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> =
255 Arc::new(batching_oracle);
256
257 global_timelines.insert(
258 timeline.clone(),
259 TimelineState {
260 oracle,
261 read_holds: ReadHolds::new(),
262 },
263 );
264 }
265 global_timelines.get_mut(timeline).expect("inserted above")
266 }
267
268 pub(crate) fn build_collection_id_bundle(
270 &self,
271 storage_ids: impl IntoIterator<Item = GlobalId>,
272 compute_ids: impl IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
273 clusters: impl IntoIterator<Item = ComputeInstanceId>,
274 ) -> CollectionIdBundle {
275 let mut compute: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
276
277 for (instance_id, id) in compute_ids {
279 compute.entry(instance_id).or_default().insert(id);
280 }
281
282 let cluster_set: BTreeSet<_> = clusters.into_iter().collect();
284 for (_timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
285 let compute_ids = read_holds
286 .compute_ids()
287 .filter(|(instance_id, _id)| cluster_set.contains(instance_id));
288 for (instance_id, id) in compute_ids {
289 compute.entry(instance_id).or_default().insert(id);
290 }
291 }
292
293 CollectionIdBundle {
294 storage_ids: storage_ids.into_iter().collect(),
295 compute_ids: compute,
296 }
297 }
298
299 pub(crate) fn remove_resources_associated_with_timeline(
302 &mut self,
303 timeline: Timeline,
304 ids: CollectionIdBundle,
305 ) -> bool {
306 let TimelineState { read_holds, .. } = self
307 .global_timelines
308 .get_mut(&timeline)
309 .expect("all timeslines have a timestamp oracle");
310
311 for id in ids.storage_ids {
313 read_holds.remove_storage_collection(id);
314 }
315 for (compute_id, ids) in ids.compute_ids {
316 for id in ids {
317 read_holds.remove_compute_collection(compute_id, id);
318 }
319 }
320 let became_empty = read_holds.is_empty();
321
322 became_empty
323 }
324
325 pub(crate) fn remove_compute_ids_from_timeline<I>(&mut self, ids: I) -> Vec<Timeline>
326 where
327 I: IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
328 {
329 let mut empty_timelines = BTreeSet::new();
330 for (compute_instance, id) in ids {
331 for (timeline, TimelineState { read_holds, .. }) in &mut self.global_timelines {
332 read_holds.remove_compute_collection(compute_instance, id);
333 if read_holds.is_empty() {
334 empty_timelines.insert(timeline.clone());
335 }
336 }
337 }
338 empty_timelines.into_iter().collect()
339 }
340
341 pub(crate) fn timedomain_for<'a, I>(
347 &self,
348 uses_ids: I,
349 timeline_context: &TimelineContext,
350 conn_id: &ConnectionId,
351 compute_instance: ComputeInstanceId,
352 ) -> Result<CollectionIdBundle, AdapterError>
353 where
354 I: IntoIterator<Item = &'a GlobalId>,
355 {
356 let mut schemas = BTreeSet::new();
358 for id in uses_ids {
359 let entry = self.catalog().get_entry_by_global_id(id);
360 let name = entry.name();
361 schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec));
362 }
363
364 let pg_catalog_schema = (
365 ResolvedDatabaseSpecifier::Ambient,
366 SchemaSpecifier::Id(self.catalog().get_pg_catalog_schema_id()),
367 );
368 let system_schemas: Vec<_> = self
369 .catalog()
370 .system_schema_ids()
371 .map(|id| (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)))
372 .collect();
373
374 if system_schemas.iter().any(|s| schemas.contains(s)) {
375 schemas.extend(system_schemas);
378 } else if !schemas.is_empty() {
379 schemas.insert(pg_catalog_schema);
382 }
383
384 let mut collection_ids: BTreeSet<GlobalId> = BTreeSet::new();
386 for (db, schema) in schemas {
387 let schema = self.catalog().get_schema(&db, &schema, conn_id);
388 let global_ids = schema
392 .items
393 .values()
394 .map(|item_id| self.catalog().get_entry(item_id).latest_global_id());
395 collection_ids.extend(global_ids);
396 }
397
398 let mut id_bundle: CollectionIdBundle = self
400 .index_oracle(compute_instance)
401 .sufficient_collections(collection_ids);
402
403 for ids in [
405 &mut id_bundle.storage_ids,
406 &mut id_bundle.compute_ids.entry(compute_instance).or_default(),
407 ] {
408 ids.retain(|gid| {
409 let id_timeline_context = self
410 .catalog()
411 .validate_timeline_context(vec![*gid])
412 .expect("single id should never fail");
413 match (&id_timeline_context, &timeline_context) {
414 (
416 TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
417 _,
418 ) => true,
419 (
423 TimelineContext::TimelineDependent(id_timeline),
424 TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
425 ) => id_timeline == &Timeline::EpochMilliseconds,
426 (
428 TimelineContext::TimelineDependent(id_timeline),
429 TimelineContext::TimelineDependent(source_timeline),
430 ) => id_timeline == source_timeline,
431 }
432 });
433 }
434
435 Ok(id_bundle)
436 }
437
438 #[instrument(level = "debug")]
439 pub(crate) async fn advance_timelines(&mut self) {
440 let global_timelines = std::mem::take(&mut self.global_timelines);
441 for (
442 timeline,
443 TimelineState {
444 oracle,
445 mut read_holds,
446 },
447 ) in global_timelines
448 {
449 if timeline != Timeline::EpochMilliseconds && !self.read_only_controllers {
452 let id_bundle = self.catalog().ids_in_timeline(&timeline);
456
457 if !id_bundle.is_empty() {
461 let least_valid_write = self.least_valid_write(&id_bundle);
462 let now = Self::largest_not_in_advance_of_upper(&least_valid_write);
463 oracle.apply_write(now).await;
464 debug!(
465 least_valid_write = ?least_valid_write,
466 oracle_read_ts = ?oracle.read_ts().await,
467 "advanced {:?} to {}",
468 timeline,
469 now,
470 );
471 }
472 };
473 let read_ts = oracle.read_ts().await;
474 read_holds.downgrade(read_ts);
475 self.global_timelines
476 .insert(timeline, TimelineState { oracle, read_holds });
477 }
478 }
479}
480
481fn upper_bound(now: &mz_repr::Timestamp) -> mz_repr::Timestamp {
484 const TIMESTAMP_INTERVAL_MS: u64 = 5000;
485 const TIMESTAMP_INTERVAL_UPPER_BOUND: u64 = 2;
486
487 now.saturating_add(TIMESTAMP_INTERVAL_MS * TIMESTAMP_INTERVAL_UPPER_BOUND)
488}