1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::sync::Arc;
15
16use chrono::{DateTime, Utc};
17use futures::Future;
18use mz_adapter_types::connection::ConnectionId;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::now::{EpochMillis, NowFn, to_datetime};
21use mz_ore::{instrument, soft_assert_or_log};
22use mz_repr::{GlobalId, Timestamp};
23use mz_sql::names::{ResolvedDatabaseSpecifier, SchemaSpecifier};
24use mz_storage_types::sources::Timeline;
25use mz_timestamp_oracle::batching_oracle::BatchingTimestampOracle;
26use mz_timestamp_oracle::{self, TimestampOracle, TimestampOracleConfig, WriteTimestamp};
27use timely::progress::Timestamp as _;
28use tracing::{Instrument, debug, error, info};
29
30use crate::AdapterError;
31use crate::catalog::Catalog;
32use crate::coord::Coordinator;
33use crate::coord::id_bundle::CollectionIdBundle;
34use crate::coord::read_policy::ReadHolds;
35use crate::coord::timestamp_selection::TimestampProvider;
36use crate::optimize::dataflows::DataflowBuilder;
37
/// Describes how an expression or collection relates to timelines and
/// timestamps: whether it is bound to one specific timeline, merely needs
/// some timestamp to be chosen, or needs neither.
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub enum TimelineContext {
    /// Belongs to a single, specific [`Timeline`].
    TimelineDependent(Timeline),
    /// Depends on the chosen timestamp but not on any particular timeline.
    TimestampDependent,
    /// Independent of both timeline and timestamp.
    TimestampIndependent,
}
51
52impl TimelineContext {
53 pub fn contains_timeline(&self) -> bool {
55 self.timeline().is_some()
56 }
57
58 pub fn timeline(&self) -> Option<&Timeline> {
60 match self {
61 Self::TimelineDependent(timeline) => Some(timeline),
62 Self::TimestampIndependent | Self::TimestampDependent => None,
63 }
64 }
65}
66
/// Per-timeline state kept by the coordinator: the oracle that hands out
/// read/write timestamps for the timeline, and the read holds currently
/// held on collections in that timeline.
pub(crate) struct TimelineState {
    /// The timestamp oracle for this timeline.
    pub(crate) oracle: Arc<dyn TimestampOracle<Timestamp> + Send + Sync>,
    /// Read holds on collections belonging to this timeline.
    pub(crate) read_holds: ReadHolds,
}
76
77impl fmt::Debug for TimelineState {
78 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79 f.debug_struct("TimelineState")
80 .field("read_holds", &self.read_holds)
81 .finish()
82 }
83}
84
85impl Coordinator {
86 pub(crate) fn now(&self) -> EpochMillis {
87 (self.catalog().config().now)()
88 }
89
90 pub(crate) fn now_datetime(&self) -> DateTime<Utc> {
91 to_datetime(self.now())
92 }
93
94 pub(crate) fn get_timestamp_oracle(
95 &self,
96 timeline: &Timeline,
97 ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
98 let oracle = &self
99 .global_timelines
100 .get(timeline)
101 .expect("all timelines have a timestamp oracle")
102 .oracle;
103
104 Arc::clone(oracle)
105 }
106
107 pub(crate) fn get_local_timestamp_oracle(
109 &self,
110 ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
111 self.get_timestamp_oracle(&Timeline::EpochMilliseconds)
112 }
113
114 pub(crate) async fn get_local_read_ts(&self) -> Timestamp {
118 self.get_local_timestamp_oracle().read_ts().await
119 }
120
121 #[instrument(name = "coord::get_local_write_ts")]
125 pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp {
126 self.global_timelines
127 .get_mut(&Timeline::EpochMilliseconds)
128 .expect("no realtime timeline")
129 .oracle
130 .write_ts()
131 .await
132 }
133
134 pub(crate) async fn peek_local_write_ts(&self) -> Timestamp {
137 self.get_local_timestamp_oracle().peek_write_ts().await
138 }
139
140 pub(crate) fn apply_local_write(
142 &self,
143 timestamp: Timestamp,
144 ) -> impl Future<Output = ()> + Send + 'static {
145 let now = self.now().into();
146
147 let upper_bound = upper_bound(&now);
148 if timestamp > upper_bound {
149 error!(
150 %now,
151 "Setting local read timestamp to {timestamp}, which is more than \
152 the desired upper bound {upper_bound}."
153 );
154 }
155
156 let oracle = self.get_local_timestamp_oracle();
157
158 async move {
159 oracle
160 .apply_write(timestamp)
161 .instrument(tracing::debug_span!("apply_local_write_static", ?timestamp))
162 .await
163 }
164 }
165
166 pub(crate) async fn get_catalog_write_ts(&mut self) -> Timestamp {
178 if self.read_only_controllers {
179 let (write_ts, upper) =
180 futures::future::join(self.peek_local_write_ts(), self.catalog().current_upper())
181 .await;
182 std::cmp::max(write_ts, upper)
183 } else {
184 self.get_local_write_ts().await.timestamp
185 }
186 }
187
188 pub(crate) async fn ensure_timeline_state<'a>(
190 &'a mut self,
191 timeline: &'a Timeline,
192 ) -> &'a mut TimelineState {
193 Self::ensure_timeline_state_with_initial_time(
194 timeline,
195 Timestamp::minimum(),
196 self.catalog().config().now.clone(),
197 self.timestamp_oracle_config.clone(),
198 &mut self.global_timelines,
199 self.read_only_controllers,
200 )
201 .await
202 }
203
204 #[instrument]
207 pub(crate) async fn ensure_timeline_state_with_initial_time<'a>(
208 timeline: &'a Timeline,
209 initially: Timestamp,
210 now: NowFn,
211 oracle_config: Option<TimestampOracleConfig>,
212 global_timelines: &'a mut BTreeMap<Timeline, TimelineState>,
213 read_only: bool,
214 ) -> &'a mut TimelineState {
215 if !global_timelines.contains_key(timeline) {
216 info!("opening a new TimestampOracle for timeline {:?}", timeline,);
217
218 let now_fn = if timeline == &Timeline::EpochMilliseconds {
219 now
220 } else {
221 NowFn::from(|| Timestamp::minimum().into())
230 };
231
232 let oracle_config = oracle_config.expect(
233 "missing --timestamp-oracle-url even though the timestamp oracle was configured",
234 );
235
236 let oracle = oracle_config
237 .open(timeline.to_string(), initially, now_fn, read_only)
238 .await;
239
240 let batching_oracle = BatchingTimestampOracle::new(oracle_config.metrics(), oracle);
241
242 let oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> =
243 Arc::new(batching_oracle);
244
245 global_timelines.insert(
246 timeline.clone(),
247 TimelineState {
248 oracle,
249 read_holds: ReadHolds::new(),
250 },
251 );
252 }
253 global_timelines.get_mut(timeline).expect("inserted above")
254 }
255
256 pub(crate) fn remove_resources_associated_with_timeline(
259 &mut self,
260 timeline: Timeline,
261 ids: CollectionIdBundle,
262 ) -> bool {
263 let TimelineState { read_holds, .. } = self
264 .global_timelines
265 .get_mut(&timeline)
266 .expect("all timeslines have a timestamp oracle");
267
268 for id in ids.storage_ids {
270 read_holds.remove_storage_collection(id);
271 }
272 for (compute_id, ids) in ids.compute_ids {
273 for id in ids {
274 read_holds.remove_compute_collection(compute_id, id);
275 }
276 }
277 let became_empty = read_holds.is_empty();
278
279 became_empty
280 }
281
282 #[instrument(level = "debug")]
283 pub(crate) async fn advance_timelines(&mut self) {
284 let global_timelines = std::mem::take(&mut self.global_timelines);
285 for (
286 timeline,
287 TimelineState {
288 oracle,
289 mut read_holds,
290 },
291 ) in global_timelines
292 {
293 if timeline != Timeline::EpochMilliseconds && !self.read_only_controllers {
296 let id_bundle = self.catalog().ids_in_timeline(&timeline);
300
301 if !id_bundle.is_empty() {
305 let least_valid_write = self.least_valid_write(&id_bundle);
306 let now = Self::largest_not_in_advance_of_upper(&least_valid_write);
307 oracle.apply_write(now).await;
308 debug!(
309 least_valid_write = ?least_valid_write,
310 oracle_read_ts = ?oracle.read_ts().await,
311 "advanced {:?} to {}",
312 timeline,
313 now,
314 );
315 }
316 };
317 let read_ts = oracle.read_ts().await;
318 read_holds.downgrade(read_ts);
319 self.global_timelines
320 .insert(timeline, TimelineState { oracle, read_holds });
321 }
322 }
323}
324
325fn upper_bound(now: &mz_repr::Timestamp) -> mz_repr::Timestamp {
328 const TIMESTAMP_INTERVAL_MS: u64 = 5000;
329 const TIMESTAMP_INTERVAL_UPPER_BOUND: u64 = 2;
330
331 now.saturating_add(TIMESTAMP_INTERVAL_MS * TIMESTAMP_INTERVAL_UPPER_BOUND)
332}
333
/// Computes the "time domain" of the objects in `uses_ids`: all collections in
/// the schemas those objects live in (plus `pg_catalog`, or every system
/// schema when one is already involved), expanded to sufficient collections by
/// `dataflow_builder` and narrowed to ids whose timeline is compatible with
/// `timeline_context`.
///
/// Returns the resulting [`CollectionIdBundle`], with compute ids attributed
/// to `compute_instance`.
pub(crate) fn timedomain_for<'a, I>(
    catalog: &Catalog,
    dataflow_builder: &DataflowBuilder,
    uses_ids: I,
    timeline_context: &TimelineContext,
    conn_id: &ConnectionId,
    compute_instance: ComputeInstanceId,
) -> Result<CollectionIdBundle, AdapterError>
where
    I: IntoIterator<Item = &'a GlobalId>,
{
    // Keep the original ids so we can sanity-check coverage below.
    let mut orig_uses_ids = Vec::new();

    // Collect the (database, schema) pair of every used object.
    let mut schemas = BTreeSet::new();
    for id in uses_ids {
        orig_uses_ids.push(id.clone());

        let entry = catalog.get_entry_by_global_id(id);
        let name = entry.name();
        schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec));
    }

    let pg_catalog_schema = (
        ResolvedDatabaseSpecifier::Ambient,
        SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
    );
    let system_schemas: Vec<_> = catalog
        .system_schema_ids()
        .map(|id| (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)))
        .collect();

    if system_schemas.iter().any(|s| schemas.contains(s)) {
        // If any system schema is involved, include all of them.
        schemas.extend(system_schemas);
    } else if !schemas.is_empty() {
        // Otherwise include pg_catalog alongside the user schemas.
        schemas.insert(pg_catalog_schema);
    }

    // Gather every item in the involved schemas, taking the latest global id
    // of each item.
    let mut collection_ids: BTreeSet<GlobalId> = BTreeSet::new();
    for (db, schema) in schemas {
        let schema = catalog.get_schema(&db, &schema, conn_id);
        let global_ids = schema
            .items
            .values()
            .map(|item_id| catalog.get_entry(item_id).latest_global_id());
        collection_ids.extend(global_ids);
    }

    {
        // Sanity check: the gathered set should cover every originally used
        // id. Soft-assert rather than hard-fail — NOTE(review): presumably
        // `latest_global_id` can miss an older version of an object; confirm
        // against callers.
        for id in orig_uses_ids.iter() {
            soft_assert_or_log!(
                collection_ids.contains(id),
                "timedomain_for is about to miss {}",
                id
            );
        }
    }

    // Expand to whatever collections the dataflow builder deems sufficient to
    // compute the gathered ids.
    let mut id_bundle: CollectionIdBundle = dataflow_builder.sufficient_collections(collection_ids);

    // Drop ids whose timeline is incompatible with the query's timeline
    // context, from both the storage ids and this instance's compute ids.
    for ids in [
        &mut id_bundle.storage_ids,
        &mut id_bundle.compute_ids.entry(compute_instance).or_default(),
    ] {
        ids.retain(|gid| {
            let id_timeline_context = catalog
                .validate_timeline_context(vec![*gid])
                .expect("single id should never fail");
            match (&id_timeline_context, &timeline_context) {
                // Timestamp-(in)dependent ids are compatible with any context.
                (
                    TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
                    _,
                ) => true,
                // A timeline-dependent id fits a timestamp-(in)dependent
                // context only if it lives in the realtime timeline.
                (
                    TimelineContext::TimelineDependent(id_timeline),
                    TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
                ) => *id_timeline == Timeline::EpochMilliseconds,
                // Otherwise the timelines must match exactly.
                (
                    TimelineContext::TimelineDependent(id_timeline),
                    TimelineContext::TimelineDependent(source_timeline),
                ) => id_timeline == source_timeline,
            }
        });
    }

    Ok(id_bundle)
}