1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::sync::Arc;
15
16use chrono::{DateTime, Utc};
17use futures::Future;
18use mz_adapter_types::connection::ConnectionId;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::now::{EpochMillis, NowFn, to_datetime};
21use mz_ore::{instrument, soft_assert_or_log};
22use mz_repr::{GlobalId, Timestamp};
23use mz_sql::names::{ResolvedDatabaseSpecifier, SchemaSpecifier};
24use mz_storage_types::sources::Timeline;
25use mz_timestamp_oracle::batching_oracle::BatchingTimestampOracle;
26use mz_timestamp_oracle::{self, TimestampOracle, TimestampOracleConfig, WriteTimestamp};
27use timely::progress::Timestamp as TimelyTimestamp;
28use tracing::{Instrument, debug, error, info};
29
30use crate::AdapterError;
31use crate::catalog::Catalog;
32use crate::coord::Coordinator;
33use crate::coord::id_bundle::CollectionIdBundle;
34use crate::coord::read_policy::ReadHolds;
35use crate::coord::timestamp_selection::TimestampProvider;
36use crate::optimize::dataflows::DataflowBuilder;
37
/// Describes the relationship between a statement and timelines/timestamps:
/// whether its result is tied to a specific timeline, depends only on the
/// chosen timestamp, or is independent of both.
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub enum TimelineContext {
    /// Dependent on a specific timeline.
    TimelineDependent(Timeline),
    /// Dependent on the chosen timestamp, but not on any particular timeline.
    TimestampDependent,
    /// Independent of the chosen timestamp.
    TimestampIndependent,
}
51
52impl TimelineContext {
53 pub fn contains_timeline(&self) -> bool {
55 self.timeline().is_some()
56 }
57
58 pub fn timeline(&self) -> Option<&Timeline> {
60 match self {
61 Self::TimelineDependent(timeline) => Some(timeline),
62 Self::TimestampIndependent | Self::TimestampDependent => None,
63 }
64 }
65}
66
/// Per-timeline coordinator state: the timeline's timestamp oracle together
/// with the read holds taken on collections in that timeline.
pub(crate) struct TimelineState<T: TimelyTimestamp> {
    /// Shared handle to the timestamp oracle for this timeline.
    pub(crate) oracle: Arc<dyn TimestampOracle<T> + Send + Sync>,
    /// Read holds on collections belonging to this timeline.
    pub(crate) read_holds: ReadHolds<T>,
}
76
77impl<T: TimelyTimestamp> fmt::Debug for TimelineState<T> {
78 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79 f.debug_struct("TimelineState")
80 .field("read_holds", &self.read_holds)
81 .finish()
82 }
83}
84
impl Coordinator {
    /// Returns the current time, in milliseconds since the Unix epoch, as
    /// reported by the catalog's configured `now` function.
    pub(crate) fn now(&self) -> EpochMillis {
        (self.catalog().config().now)()
    }

    /// Like [`Self::now`], but converted to a UTC [`DateTime`].
    pub(crate) fn now_datetime(&self) -> DateTime<Utc> {
        to_datetime(self.now())
    }

    /// Returns a shared handle to the timestamp oracle for `timeline`.
    ///
    /// # Panics
    ///
    /// Panics if no oracle state exists yet for `timeline` (it is created by
    /// `ensure_timeline_state`).
    pub(crate) fn get_timestamp_oracle(
        &self,
        timeline: &Timeline,
    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
        let oracle = &self
            .global_timelines
            .get(timeline)
            .expect("all timelines have a timestamp oracle")
            .oracle;

        // Cheap: bumps the refcount on the shared oracle handle.
        Arc::clone(oracle)
    }

    /// Returns the oracle for the local, real-time (`EpochMilliseconds`)
    /// timeline.
    pub(crate) fn get_local_timestamp_oracle(
        &self,
    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
        self.get_timestamp_oracle(&Timeline::EpochMilliseconds)
    }

    /// Fetches a read timestamp from the local (real-time) oracle.
    pub(crate) async fn get_local_read_ts(&self) -> Timestamp {
        self.get_local_timestamp_oracle().read_ts().await
    }

    /// Acquires a new write timestamp from the local (real-time) oracle.
    #[instrument(name = "coord::get_local_write_ts")]
    pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp {
        self.global_timelines
            .get_mut(&Timeline::EpochMilliseconds)
            .expect("no realtime timeline")
            .oracle
            .write_ts()
            .await
    }

    /// Peeks the local oracle's current write timestamp without acquiring
    /// (and thus advancing) it.
    pub(crate) async fn peek_local_write_ts(&self) -> Timestamp {
        self.get_local_timestamp_oracle().peek_write_ts().await
    }

    /// Returns a future that, when awaited, applies `timestamp` as a
    /// completed write on the local oracle.
    ///
    /// Logs an error (but still proceeds) if `timestamp` exceeds the
    /// expected upper bound derived from the wall clock (see `upper_bound`).
    pub(crate) fn apply_local_write(
        &self,
        timestamp: Timestamp,
    ) -> impl Future<Output = ()> + Send + 'static {
        let now = self.now().into();

        let upper_bound = upper_bound(&now);
        if timestamp > upper_bound {
            error!(
                %now,
                "Setting local read timestamp to {timestamp}, which is more than \
                the desired upper bound {upper_bound}."
            );
        }

        // Clone the oracle handle up front so the returned future is
        // `'static` and does not borrow `self`.
        let oracle = self.get_local_timestamp_oracle();

        async move {
            oracle
                .apply_write(timestamp)
                .instrument(tracing::debug_span!("apply_local_write_static", ?timestamp))
                .await
        }
    }

    /// Determines a timestamp for writing to the catalog.
    ///
    /// When controllers are read-only, no write timestamp is acquired from
    /// the oracle; instead the result is the max of the peeked local write
    /// timestamp and the catalog's current upper (fetched concurrently).
    /// Otherwise a fresh local write timestamp is used.
    pub(crate) async fn get_catalog_write_ts(&mut self) -> Timestamp {
        if self.read_only_controllers {
            let (write_ts, upper) =
                futures::future::join(self.peek_local_write_ts(), self.catalog().current_upper())
                    .await;
            std::cmp::max(write_ts, upper)
        } else {
            self.get_local_write_ts().await.timestamp
        }
    }

    /// Ensures oracle/read-hold state exists for `timeline`, creating it
    /// with an initial time of `Timestamp::minimum()` if needed, and returns
    /// a mutable reference to it.
    pub(crate) async fn ensure_timeline_state<'a>(
        &'a mut self,
        timeline: &'a Timeline,
    ) -> &'a mut TimelineState<Timestamp> {
        Self::ensure_timeline_state_with_initial_time(
            timeline,
            Timestamp::minimum(),
            self.catalog().config().now.clone(),
            self.timestamp_oracle_config.clone(),
            &mut self.global_timelines,
            self.read_only_controllers,
        )
        .await
    }

    /// Ensures state for `timeline` exists in `global_timelines`, opening a
    /// new timestamp oracle initialized at `initially` if necessary, and
    /// returns a mutable reference to the (possibly fresh) state.
    ///
    /// Only the `EpochMilliseconds` timeline tracks the provided `now`
    /// function; every other timeline gets a constant "now" of
    /// `Timestamp::minimum()`. New oracles are wrapped in a
    /// [`BatchingTimestampOracle`].
    ///
    /// # Panics
    ///
    /// Panics if a new oracle must be opened but `oracle_config` is `None`.
    #[instrument]
    pub(crate) async fn ensure_timeline_state_with_initial_time<'a>(
        timeline: &'a Timeline,
        initially: Timestamp,
        now: NowFn,
        oracle_config: Option<TimestampOracleConfig>,
        global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>>,
        read_only: bool,
    ) -> &'a mut TimelineState<Timestamp> {
        if !global_timelines.contains_key(timeline) {
            info!("opening a new TimestampOracle for timeline {:?}", timeline,);

            let now_fn = if timeline == &Timeline::EpochMilliseconds {
                now
            } else {
                // Non-realtime timelines do not follow the wall clock.
                NowFn::from(|| Timestamp::minimum().into())
            };

            let oracle_config = oracle_config.expect(
                "missing --timestamp-oracle-url even though the timestamp oracle was configured",
            );

            let oracle = oracle_config
                .open(timeline.to_string(), initially, now_fn, read_only)
                .await;

            let batching_oracle = BatchingTimestampOracle::new(oracle_config.metrics(), oracle);

            let oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> =
                Arc::new(batching_oracle);

            global_timelines.insert(
                timeline.clone(),
                TimelineState {
                    oracle,
                    read_holds: ReadHolds::new(),
                },
            );
        }
        global_timelines.get_mut(timeline).expect("inserted above")
    }

    /// Removes read holds for the given collections from `timeline`'s state.
    ///
    /// Returns `true` if the timeline's read holds are empty afterwards.
    ///
    /// # Panics
    ///
    /// Panics if no state exists for `timeline`.
    pub(crate) fn remove_resources_associated_with_timeline(
        &mut self,
        timeline: Timeline,
        ids: CollectionIdBundle,
    ) -> bool {
        let TimelineState { read_holds, .. } = self
            .global_timelines
            .get_mut(&timeline)
            .expect("all timeslines have a timestamp oracle");

        for id in ids.storage_ids {
            read_holds.remove_storage_collection(id);
        }
        for (compute_id, ids) in ids.compute_ids {
            for id in ids {
                read_holds.remove_compute_collection(compute_id, id);
            }
        }
        let became_empty = read_holds.is_empty();

        became_empty
    }

    /// Removes read holds for the given compute collections from every
    /// timeline, returning the timelines whose read holds became empty as a
    /// result.
    pub(crate) fn remove_compute_ids_from_timeline<I>(&mut self, ids: I) -> Vec<Timeline>
    where
        I: IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
    {
        let mut empty_timelines = BTreeSet::new();
        for (compute_instance, id) in ids {
            for (timeline, TimelineState { read_holds, .. }) in &mut self.global_timelines {
                read_holds.remove_compute_collection(compute_instance, id);
                if read_holds.is_empty() {
                    empty_timelines.insert(timeline.clone());
                }
            }
        }
        empty_timelines.into_iter().collect()
    }

    /// Advances all timelines: for each non-realtime timeline (and only when
    /// controllers are not read-only) the oracle is advanced to just below
    /// the least valid write of the timeline's collections; afterwards every
    /// timeline's read holds are downgraded to the oracle's read timestamp.
    #[instrument(level = "debug")]
    pub(crate) async fn advance_timelines(&mut self) {
        // Take ownership of the map so we can `.await` on the oracles while
        // holding each timeline's state by value; entries are re-inserted at
        // the end of every loop iteration.
        let global_timelines = std::mem::take(&mut self.global_timelines);
        for (
            timeline,
            TimelineState {
                oracle,
                mut read_holds,
            },
        ) in global_timelines
        {
            // Skip the realtime timeline here, and never write to oracles in
            // read-only mode.
            if timeline != Timeline::EpochMilliseconds && !self.read_only_controllers {
                let id_bundle = self.catalog().ids_in_timeline(&timeline);

                if !id_bundle.is_empty() {
                    let least_valid_write = self.least_valid_write(&id_bundle);
                    let now = Self::largest_not_in_advance_of_upper(&least_valid_write);
                    oracle.apply_write(now).await;
                    debug!(
                        least_valid_write = ?least_valid_write,
                        oracle_read_ts = ?oracle.read_ts().await,
                        "advanced {:?} to {}",
                        timeline,
                        now,
                    );
                }
            };
            let read_ts = oracle.read_ts().await;
            read_holds.downgrade(read_ts);
            self.global_timelines
                .insert(timeline, TimelineState { oracle, read_holds });
        }
    }
}
340
341fn upper_bound(now: &mz_repr::Timestamp) -> mz_repr::Timestamp {
344 const TIMESTAMP_INTERVAL_MS: u64 = 5000;
345 const TIMESTAMP_INTERVAL_UPPER_BOUND: u64 = 2;
346
347 now.saturating_add(TIMESTAMP_INTERVAL_MS * TIMESTAMP_INTERVAL_UPPER_BOUND)
348}
349
350pub(crate) fn timedomain_for<'a, I>(
364 catalog: &Catalog,
365 dataflow_builder: &DataflowBuilder,
366 uses_ids: I,
367 timeline_context: &TimelineContext,
368 conn_id: &ConnectionId,
369 compute_instance: ComputeInstanceId,
370) -> Result<CollectionIdBundle, AdapterError>
371where
372 I: IntoIterator<Item = &'a GlobalId>,
373{
374 let mut orig_uses_ids = Vec::new();
376
377 let mut schemas = BTreeSet::new();
379 for id in uses_ids {
380 orig_uses_ids.push(id.clone());
381
382 let entry = catalog.get_entry_by_global_id(id);
383 let name = entry.name();
384 schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec));
385 }
386
387 let pg_catalog_schema = (
388 ResolvedDatabaseSpecifier::Ambient,
389 SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
390 );
391 let system_schemas: Vec<_> = catalog
392 .system_schema_ids()
393 .map(|id| (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)))
394 .collect();
395
396 if system_schemas.iter().any(|s| schemas.contains(s)) {
397 schemas.extend(system_schemas);
400 } else if !schemas.is_empty() {
401 schemas.insert(pg_catalog_schema);
404 }
405
406 let mut collection_ids: BTreeSet<GlobalId> = BTreeSet::new();
408 for (db, schema) in schemas {
409 let schema = catalog.get_schema(&db, &schema, conn_id);
410 let global_ids = schema
414 .items
415 .values()
416 .map(|item_id| catalog.get_entry(item_id).latest_global_id());
417 collection_ids.extend(global_ids);
418 }
419
420 {
421 for id in orig_uses_ids.iter() {
425 soft_assert_or_log!(
426 collection_ids.contains(id),
427 "timedomain_for is about to miss {}",
428 id
429 );
430 }
431 }
432
433 let mut id_bundle: CollectionIdBundle = dataflow_builder.sufficient_collections(collection_ids);
435
436 for ids in [
438 &mut id_bundle.storage_ids,
439 &mut id_bundle.compute_ids.entry(compute_instance).or_default(),
440 ] {
441 ids.retain(|gid| {
442 let id_timeline_context = catalog
443 .validate_timeline_context(vec![*gid])
444 .expect("single id should never fail");
445 match (&id_timeline_context, &timeline_context) {
446 (
448 TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
449 _,
450 ) => true,
451 (
455 TimelineContext::TimelineDependent(id_timeline),
456 TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
457 ) => *id_timeline == Timeline::EpochMilliseconds,
458 (
460 TimelineContext::TimelineDependent(id_timeline),
461 TimelineContext::TimelineDependent(source_timeline),
462 ) => id_timeline == source_timeline,
463 }
464 });
465 }
466
467 Ok(id_bundle)
468}