mz_storage_client/controller.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller that provides an interface to the storage layer.
//!
//! The storage controller curates the creation of sources, the progress of readers through these collections,
//! and their eventual dropping and resource reclamation.
//!
//! The storage controller can be viewed as a partial map from `GlobalId` to collection. It is an error to
//! use an identifier before it has been "created" with `create_source()`. Once created, the controller holds
//! a read capability for each source, which is manipulated with `update_read_capabilities()`.
//! Eventually, the source is dropped with either `drop_sources()` or by allowing compaction to the
//! empty frontier.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZeroI64;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use differential_dataflow::lattice::Lattice;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_controller_types::dyncfgs::WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL;
use mz_dyncfg::ConfigSet;
use mz_ore::soft_panic_or_log;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_types::{Codec64, Opaque, ShardId};
use mz_repr::adt::interval::Interval;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{CollectionMetadata, StorageError};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, Timeline,
};
use serde::{Deserialize, Serialize};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot};

use crate::client::{AppendOnlyUpdate, StatusUpdate, TableData};
use crate::statistics::WebhookStatistics;

#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, PartialOrd, Ord)]
pub enum IntrospectionType {
    /// We're not responsible for appending to this collection automatically, but we should
    /// automatically bump the write frontier from time to time.
    SinkStatusHistory,
    SourceStatusHistory,
    ShardMapping,

    Frontiers,
    ReplicaFrontiers,

    ReplicaStatusHistory,
    ReplicaMetricsHistory,
    WallclockLagHistory,
    WallclockLagHistogram,

    // Note that this single-shard introspection source will be changed to per-replica,
    // once we allow multiplexing multiple sources/sinks on a single cluster.
    StorageSourceStatistics,
    StorageSinkStatistics,

    // The below are for statement logging.
    StatementExecutionHistory,
    SessionHistory,
    PreparedStatementHistory,
    SqlText,
    // For statement lifecycle logging, which is closely related
    // to statement logging
    StatementLifecycleHistory,

    // Collections written by the compute controller.
    ComputeDependencies,
    ComputeOperatorHydrationStatus,
    ComputeMaterializedViewRefreshes,
    ComputeErrorCounts,
    ComputeHydrationTimes,

    // Written by the Adapter for tracking AWS PrivateLink Connection Status History
    PrivatelinkConnectionStatusHistory,
}

/// Describes how data is written to the collection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DataSource<T> {
    /// Ingest data from some external source.
    Ingestion(IngestionDescription),
    /// This source receives its data from the identified ingestion,
    /// from an external object identified using `SourceExportDetails`.
    ///
    /// The referenced ingestion must be created before all of its exports.
    IngestionExport {
        ingestion_id: GlobalId,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig,
    },
    /// Data comes from introspection sources, which the controller itself is
    /// responsible for generating.
    Introspection(IntrospectionType),
    /// Data comes from the source's remapping/reclock operator.
    Progress,
    /// Data comes from external HTTP requests pushed to Materialize.
    Webhook,
    /// The adapter layer appends timestamped data, i.e. it is a `TABLE`.
    Table {
        /// This table has had columns added or dropped, so it is now a
        /// "view" over the "primary" Table/collection. Within the
        /// `storage-controller` we track the primary as a dependency.
        primary: Option<GlobalId>,
    },
    /// This source's data does not need to be managed by the storage
    /// controller, e.g. it's a materialized view or the catalog collection.
    Other,
    /// This collection is the output collection of a sink.
    Sink { desc: ExportDescription<T> },
}

/// Describes a request to create a source.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollectionDescription<T> {
    /// The schema of this collection.
    pub desc: RelationDesc,
    /// The source of this collection's data.
    pub data_source: DataSource<T>,
    /// An optional frontier to which the collection's `since` should be advanced.
    pub since: Option<Antichain<T>>,
    /// An optional `GlobalId` to use as this collection's status collection.
    /// Used to keep track of source status/error information.
    pub status_collection_id: Option<GlobalId>,
    /// The timeline of the source. Absent for materialized views, continual tasks, etc.
    pub timeline: Option<Timeline>,
}

impl<T> CollectionDescription<T> {
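    /// Returns a [`CollectionDescription`] for a table in the
    /// `EpochMilliseconds` timeline.
    ///
    /// A small illustrative sketch (the `relation_desc` and `primary_id`
    /// values here are hypothetical):
    ///
    /// ```ignore
    /// // A brand-new table has no primary collection to point at ...
    /// let desc = CollectionDescription::for_table(relation_desc.clone(), None);
    /// // ... while a versioned table references its primary collection.
    /// let versioned = CollectionDescription::for_table(relation_desc, Some(primary_id));
    /// assert_eq!(versioned.timeline, Some(Timeline::EpochMilliseconds));
    /// ```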
    pub fn for_table(desc: RelationDesc, primary: Option<GlobalId>) -> Self {
        Self {
            desc,
            data_source: DataSource::Table { primary },
            since: None,
            status_collection_id: None,
            timeline: Some(Timeline::EpochMilliseconds),
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ExportDescription<T = mz_repr::Timestamp> {
    pub sink: StorageSinkDesc<(), T>,
    /// The ID of the instance in which to install the export.
    pub instance_id: StorageInstanceId,
}

#[derive(Debug)]
pub enum Response<T> {
    FrontierUpdates(Vec<(GlobalId, Antichain<T>)>),
}

/// Metadata that the storage controller must know to properly handle the life
/// cycle of creating and dropping collections.
///
/// This data should be kept consistent with the state modified using
/// [`StorageTxn`].
///
/// n.b. the "txn WAL shard" is also metadata that's persisted, but if we
/// included it in this struct it would never be read.
#[derive(Debug, Clone, Serialize, Default)]
pub struct StorageMetadata {
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collection_metadata: BTreeMap<GlobalId, ShardId>,
    pub unfinalized_shards: BTreeSet<ShardId>,
}

impl StorageMetadata {
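    /// Returns the shard mapped to the identified collection, or
    /// [`StorageError::IdentifierMissing`] if the collection is unknown.
    ///
    /// A minimal sketch of a lookup (illustrative only; the `GlobalId` and
    /// `ShardId` values are hypothetical):
    ///
    /// ```ignore
    /// let mut metadata = StorageMetadata::default();
    /// let id = GlobalId::User(1);
    /// let shard = ShardId::new();
    /// metadata.collection_metadata.insert(id, shard);
    ///
    /// assert_eq!(metadata.get_collection_shard::<u64>(id)?, shard);
    /// assert!(metadata.get_collection_shard::<u64>(GlobalId::User(2)).is_err());
    /// ```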
    pub fn get_collection_shard<T>(&self, id: GlobalId) -> Result<ShardId, StorageError<T>> {
        let shard_id = self
            .collection_metadata
            .get(&id)
            .ok_or(StorageError::IdentifierMissing(id))?;

        Ok(*shard_id)
    }
}

/// Provides an interface for the storage controller to read and write data that
/// is recorded elsewhere.
///
/// Data written to the implementor of this trait should make a consistent view
/// of the data available through [`StorageMetadata`].
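///
/// A sketch of one plausible call pattern during collection creation and
/// dropping (illustrative only; `txn`, `id`, and `shard` are hypothetical):
///
/// ```ignore
/// // Creation: durably record the shard backing a new collection.
/// txn.insert_collection_metadata(BTreeMap::from([(id, shard)]))?;
///
/// // Dropping: forget the mapping, but remember the shard so it can be
/// // finalized later.
/// let dropped = txn.delete_collection_metadata(BTreeSet::from([id]));
/// txn.insert_unfinalized_shards(dropped.into_iter().map(|(_, shard)| shard).collect())?;
/// ```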
#[async_trait]
pub trait StorageTxn<T> {
    /// Retrieve all of the visible storage metadata.
    ///
    /// The value of this map should be treated as opaque.
    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId>;

    /// Add new storage metadata for a collection.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must include
    /// this data.
    fn insert_collection_metadata(
        &mut self,
        s: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>>;

    /// Remove the metadata associated with the identified collections.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must not
    /// include these keys.
    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)>;

    /// Retrieve all of the shards that are no longer in use by an active
    /// collection but have yet to be finalized.
    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId>;

    /// Insert the specified values as unfinalized shards.
    fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError<T>>;

    /// Mark the specified shards as finalized, deleting them from the
    /// unfinalized shard collection.
    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>);

    /// Get the txn WAL shard for this environment if it exists.
    fn get_txn_wal_shard(&self) -> Option<ShardId>;

    /// Store the specified shard as the environment's txn WAL shard.
    ///
    /// The implementor should error if the shard is already specified.
    fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError<T>>;
}

pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

/// A predicate for a `Row` filter.
pub type RowPredicate = Box<dyn Fn(&Row) -> bool + Send + Sync>;

/// High-level write operations applicable to storage collections.
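///
/// A sketch of both variants (illustrative; the exact `Diff` constructor is an
/// assumption, and `row_a`/`row_b` stand in for real rows):
///
/// ```ignore
/// // An append can insert and retract at the same time, because
/// // multiplicities may be negative.
/// let append = StorageWriteOp::Append {
///     updates: vec![(row_a, Diff::from(1)), (row_b, Diff::from(-1))],
/// };
/// assert!(!append.is_empty_append());
///
/// // Delete every row matching a predicate.
/// let delete = StorageWriteOp::Delete {
///     filter: Box::new(|row: &Row| row.iter().next() == Some(Datum::Null)),
/// };
/// ```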
pub enum StorageWriteOp {
    /// Append a set of rows with specified multiplicities.
    ///
    /// The multiplicities may be negative, so an `Append` operation can perform
    /// both insertions and retractions.
    Append { updates: Vec<(Row, Diff)> },
    /// Delete all rows matching the given predicate.
    Delete { filter: RowPredicate },
}

impl StorageWriteOp {
    /// Returns whether this operation appends an empty set of updates.
    pub fn is_empty_append(&self) -> bool {
        match self {
            Self::Append { updates } => updates.is_empty(),
            Self::Delete { .. } => false,
        }
    }
}

#[async_trait(?Send)]
pub trait StorageController: Debug {
    type Timestamp: TimelyTimestamp;

    /// Marks the end of any initialization commands.
    ///
    /// The implementor may wait for this method to be called before acting on prior commands,
    /// and so it is important for a user to invoke this method as soon as it is comfortable.
    /// This method can be invoked immediately, at the potential expense of performance.
    fn initialization_complete(&mut self);

    /// Update storage configuration with new parameters.
    fn update_parameters(&mut self, config_params: StorageParameters);

    /// Get the current configuration, including parameters updated with `update_parameters`.
    fn config(&self) -> &StorageConfiguration;

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>;

    /// Returns `true` iff the given collection/ingestion has been hydrated.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated, but it is
    /// clearly intentional that they have no replicas.
    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>>;

    /// Returns `true` if each non-transient, non-excluded collection is
    /// hydrated on at least one of the provided replicas.
    ///
    /// If no replicas are provided, this checks for hydration on _any_ replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_ids: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>>;

    /// Returns the since/upper frontiers of the identified collection.
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<
        (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
        StorageError<Self::Timestamp>,
    >;

    /// Returns the since/upper frontiers of the identified collections.
    ///
    /// Having a method that returns both frontiers at the same time, for all
    /// requested collections, ensures that we can get a consistent "snapshot"
    /// of collection state. If we had separate methods instead, and/or would
    /// allow getting frontiers for collections one at a time, it could happen
    /// that collection state changes concurrently while information is
    /// gathered.
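    ///
    /// A usage sketch (illustrative; `storage` stands in for some
    /// `StorageController` implementation and `id_a`/`id_b` for collection IDs):
    ///
    /// ```ignore
    /// for (id, since, upper) in storage.collections_frontiers(vec![id_a, id_b])? {
    ///     // Times `t` with `since <= t` and `!(upper <= t)` are readable.
    ///     println!("{id}: since={:?} upper={:?}", since, upper);
    /// }
    /// ```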
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<
        Vec<(
            GlobalId,
            Antichain<Self::Timestamp>,
            Antichain<Self::Timestamp>,
        )>,
        StorageError<Self::Timestamp>,
    >;

    /// Acquire an iterator over [CollectionMetadata] for all active
    /// collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the IDs of ingestion exports running on the given instance. This
    /// includes the ingestion itself, if any, and running source tables (a.k.a.
    /// subsources).
    fn active_ingestion_exports(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    /// Creates a storage instance with the specified ID.
    ///
    /// A storage instance can have zero or one replicas. The instance is
    /// created with zero replicas.
    ///
    /// Panics if a storage instance with the given ID already exists.
    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>);

    /// Drops the storage instance with the given ID.
    ///
    /// If you call this method while the storage instance has a replica
    /// attached, that replica will be leaked. Call `drop_replica` first.
    ///
    /// Panics if a storage instance with the given ID does not exist.
    fn drop_instance(&mut self, id: StorageInstanceId);

    /// Updates a storage instance's workload class.
    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    );

    /// Connects the storage instance to the specified replica.
    ///
    /// If the storage instance is already attached to a replica, communication
    /// with that replica is severed in favor of the new replica.
    ///
    /// In the future, this API will be adjusted to support active replication
    /// of storage instances (i.e., multiple replicas attached to a given
    /// storage instance).
    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    );

    /// Disconnects the storage instance from the specified replica.
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId);

    /// Across versions of Materialize the nullability of columns for some objects can change based
    /// on updates to our optimizer.
    ///
    /// During bootstrap we will register these new schemas with Persist.
    ///
    /// See: <https://github.com/MaterializeInc/database-issues/issues/2488>
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Create the sources described in the individual RunIngestionCommand commands.
    ///
    /// Each command carries the source id, the source description, and any associated metadata
    /// needed to ingest the particular source.
    ///
    /// This command installs collection state for the indicated sources, and they are
    /// now valid to use in queries at times beyond the initial `since` frontiers. Each
    /// collection also acquires a read capability at this frontier, which will need to
    /// be repeatedly downgraded with `allow_compaction()` to permit compaction.
    ///
    /// This method is NOT idempotent; it can fail between processing of different
    /// collections and leave the controller in an inconsistent state. It is almost
    /// always wrong to do anything but abort the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are available for reads. (We
    /// might later give non-tables the same treatment, but hold off on that initially.) Callers
    /// must provide `Some` if any of the collections is a table. `None` may be given if none of
    /// the collections are tables (i.e. all materialized views, sources, etc.).
    async fn create_collections(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.create_collections_for_bootstrap(
            storage_metadata,
            register_ts,
            collections,
            &BTreeSet::new(),
        )
        .await
    }

    /// Like [`Self::create_collections`], except used specifically for bootstrap.
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
    /// from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Check that the ingestion associated with `id` can use the provided
    /// [`SourceDesc`].
    ///
    /// Note that this check is optimistic: a return of `Ok(())` does not
    /// guarantee that subsequent calls to `alter_ingestion_source_desc` will
    /// succeed.
    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters each identified collection to use the correlated [`GenericSourceConnection`].
    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the data config for the specified source exports of the specified ingestions.
    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Acquire an immutable reference to the export state, should it exist.
    fn export(
        &self,
        id: GlobalId,
    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Acquire a mutable reference to the export state, should it exist.
    fn export_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Create a oneshot ingestion.
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Cancel a oneshot ingestion.
    fn cancel_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alter the sink identified by the given id to match the provided `ExportDescription`.
    async fn alter_export(
        &mut self,
        id: GlobalId,
        export: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// For each identified export, alter its [`StorageSinkConnection`].
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the tables and allows their resources to be reclaimed.
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    /// controller starts/restarts it has no durable state. That means that it has no way of
    /// remembering any past commands sent. In the future we plan on persisting state for the
    /// controller so that it is aware of past commands.
    /// Therefore this method is for dropping sinks that we know to have been previously
    /// created, but have been forgotten by the controller due to a restart.
    /// Once command history becomes durable we can remove this method and use the normal
    /// `drop_sinks`.
    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    /// controller starts/restarts it has no durable state. That means that it has no way of
    /// remembering any past commands sent. In the future we plan on persisting state for the
    /// controller so that it is aware of past commands.
    /// Therefore this method is for dropping sources that we know to have been previously
    /// created, but have been forgotten by the controller due to a restart.
    /// Once command history becomes durable we can remove this method and use the normal
    /// `drop_sources`.
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Append `updates` into the local input named `id` and advance its upper to `upper`.
    ///
    /// The method returns a oneshot that can be awaited to indicate completion of the write.
    /// The method may return an error, indicating an immediately visible error, and the
    /// oneshot may also return an error if one is encountered during the write.
    ///
    /// All updates in `commands` are applied atomically.
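    ///
    /// A usage sketch (illustrative only; `storage`, `table_id`, `rows`, and
    /// `get_next_write_ts` are hypothetical, and the `TableData` constructor
    /// shown is an assumption):
    ///
    /// ```ignore
    /// let write_ts = get_next_write_ts();
    /// let advance_to = write_ts.step_forward();
    /// let data = vec![TableData::Rows(rows)];
    /// let ack = storage.append_table(write_ts, advance_to, vec![(table_id, data)])?;
    /// // The write is durable once the returned oneshot resolves with `Ok`.
    /// ack.await.expect("controller is running")?;
    /// ```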
    // TODO(petrosagg): switch upper to `Antichain<Timestamp>`
    fn append_table(
        &mut self,
        write_ts: Self::Timestamp,
        advance_to: Self::Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<
        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    >;

    /// Returns a [`MonotonicAppender`] which is a channel that can be used to monotonically
    /// append to the specified [`GlobalId`].
    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Returns a shared [`WebhookStatistics`] which can be used to report user-facing
    /// statistics for the given webhook, specified by the [`GlobalId`].
    ///
    // This is used to support a fairly special case, where a source needs to report statistics
    // from outside the ordinary controller-clusterd path. It's possible to merge this with
    // `monotonic_appender`, whose only current user is webhooks, but given that they will
    // likely be moved to clusterd, we just leave this as a special case.
    fn webhook_statistics(
        &self,
        id: GlobalId,
    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>;

    /// Waits until the controller is ready to process a response.
    ///
    /// This method may block for an arbitrarily long time.
    ///
    /// When the method returns, the owner should call
    /// [`StorageController::process`] to process the ready message.
    ///
    /// This method is cancellation safe.
    async fn ready(&mut self);

    /// Processes the work queued by [`StorageController::ready`].
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<Self::Timestamp>>, anyhow::Error>;

    /// Exposes the internal state of the data shard for debugging and QA.
    ///
    /// We'll be thoughtful about making unnecessary changes, but the **output
    /// of this method needs to be gated from users**, so that it's not subject
    /// to our backward compatibility guarantees.
    ///
    /// TODO: Ideally this would return `impl Serialize` so the caller can do
    /// with it what they like, but that doesn't work in traits yet. The
    /// workaround (an associated type) doesn't work because persist doesn't
    /// want to make the type public. In the meantime, move the `serde_json`
    /// call from the single user into this method.
    async fn inspect_persist_state(&self, id: GlobalId)
    -> Result<serde_json::Value, anyhow::Error>;

    /// Records append-only updates for the given introspection type.
    ///
    /// Rows passed in `updates` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn append_introspection_updates(&mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)>);

    /// Records append-only status updates for the given introspection type.
    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    );

    /// Updates the desired state of the given introspection type.
    ///
    /// Rows passed in `op` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp);

    /// Returns a sender for updates to the specified append-only introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with an append-only collection.
    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )>;

    /// Returns a sender for updates to the specified differential introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with a differential collection.
    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )>;

    async fn real_time_recent_timestamp(
        &self,
        source_ids: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    >;
}

impl<T> DataSource<T> {
    /// Returns true if the storage controller manages the data shard for this
    /// source using txn-wal.
    pub fn in_txns(&self) -> bool {
        match self {
            DataSource::Table { .. } => true,
            DataSource::Other
            | DataSource::Ingestion(_)
            | DataSource::IngestionExport { .. }
            | DataSource::Introspection(_)
            | DataSource::Progress
            | DataSource::Webhook => false,
            DataSource::Sink { .. } => false,
        }
    }
}

/// A wrapper struct that presents the adapter token in a format that is understandable by persist
/// and also allows us to differentiate between a token being present versus being set for the
/// first time.
#[derive(PartialEq, Clone, Debug)]
pub struct PersistEpoch(pub Option<NonZeroI64>);

impl Opaque for PersistEpoch {
    fn initial() -> Self {
        PersistEpoch(None)
    }
}

impl Codec64 for PersistEpoch {
    fn codec_name() -> String {
        "PersistEpoch".to_owned()
    }

    fn encode(&self) -> [u8; 8] {
        self.0.map(NonZeroI64::get).unwrap_or(0).to_le_bytes()
    }

    fn decode(buf: [u8; 8]) -> Self {
        Self(NonZeroI64::new(i64::from_le_bytes(buf)))
    }
}

impl From<NonZeroI64> for PersistEpoch {
    fn from(epoch: NonZeroI64) -> Self {
        Self(Some(epoch))
    }
}

/// State maintained about individual exports.
#[derive(Debug)]
pub struct ExportState<T: TimelyTimestamp> {
    /// Really only for keeping track of changes to the `derived_since`.
    pub read_capabilities: MutableAntichain<T>,

    /// The cluster this export is associated with.
    pub cluster_id: StorageInstanceId,

    /// The current since frontier, derived from `write_frontier` using
    /// `hold_policy`.
    pub derived_since: Antichain<T>,

    /// The read holds that this export has on its dependencies (its input and itself). When
    /// the upper of the export changes, we downgrade this, which in turn
    /// downgrades holds we have on our dependencies' sinces.
    pub read_holds: [ReadHold<T>; 2],

    /// The policy to use to downgrade `self.read_capability`.
    pub read_policy: ReadPolicy<T>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,
}

impl<T: Timestamp> ExportState<T> {
    pub fn new(
        cluster_id: StorageInstanceId,
        read_hold: ReadHold<T>,
        self_hold: ReadHold<T>,
        write_frontier: Antichain<T>,
        read_policy: ReadPolicy<T>,
    ) -> Self
    where
        T: Lattice,
    {
        let mut dependency_since = Antichain::from_elem(T::minimum());
        for read_hold in [&read_hold, &self_hold] {
            dependency_since.join_assign(read_hold.since());
        }
        Self {
            read_capabilities: MutableAntichain::from(dependency_since.borrow()),
            cluster_id,
            derived_since: dependency_since,
            read_holds: [read_hold, self_hold],
            read_policy,
            write_frontier,
        }
    }

    /// Returns the cluster to which the export is bound.
    pub fn cluster_id(&self) -> StorageInstanceId {
        self.cluster_id
    }

    /// Returns the read hold on the export's input.
    pub fn input_hold(&self) -> &ReadHold<T> {
        &self.read_holds[0]
    }

    /// Returns whether the export was dropped.
    pub fn is_dropped(&self) -> bool {
        self.read_holds.iter().all(|h| h.since().is_empty())
    }
}

/// A channel that allows you to append a set of updates to a pre-defined [`GlobalId`].
///
/// See `CollectionManager::monotonic_appender` to acquire a [`MonotonicAppender`].
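///
/// A usage sketch (illustrative; `storage` stands in for a `StorageController`
/// and `id` for a webhook collection's [`GlobalId`]):
///
/// ```ignore
/// let appender = storage.monotonic_appender(id)?;
/// // The controller assigns timestamps, so callers only provide the updates.
/// appender.append(updates).await?;
/// ```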
#[derive(Clone, Debug)]
pub struct MonotonicAppender<T> {
    /// Channel that sends to a [`tokio::task`] which pushes updates to Persist.
    tx: mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<T>>>,
    )>,
}

impl<T> MonotonicAppender<T> {
    pub fn new(
        tx: mpsc::UnboundedSender<(
            Vec<AppendOnlyUpdate>,
            oneshot::Sender<Result<(), StorageError<T>>>,
        )>,
    ) -> Self {
        MonotonicAppender { tx }
    }

    pub async fn append(&self, updates: Vec<AppendOnlyUpdate>) -> Result<(), StorageError<T>> {
        let (tx, rx) = oneshot::channel();

        // Send our update to the CollectionManager.
        self.tx
            .send((updates, tx))
            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;

        // Wait for a response; if we fail to receive one, the CollectionManager has gone away.
        let result = rx
            .await
            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;

        result
    }
}

/// A wallclock lag measurement.
///
/// The enum representation reflects the fact that wallclock lag is undefined for unreadable
/// collections, i.e. collections that contain no readable times.
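///
/// For example (an illustrative sketch):
///
/// ```ignore
/// let a = WallclockLag::Seconds(5);
/// let b = WallclockLag::Undefined;
/// // `Undefined` dominates in `max`, so a collection that was unreadable for
/// // part of a period never reports an artificially low lag.
/// assert_eq!(a.max(b), WallclockLag::Undefined);
/// assert_eq!(b.unwrap_seconds_or(u64::MAX), u64::MAX);
/// ```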
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum WallclockLag {
    /// Lag value in seconds, for readable collections.
    Seconds(u64),
    /// Undefined lag, for unreadable collections.
    Undefined,
}

impl WallclockLag {
    /// The smallest possible wallclock lag measurement.
    pub const MIN: Self = Self::Seconds(0);

    /// Return the maximum of two lag values.
    ///
    /// We treat `Undefined` lags as greater than `Seconds`, to ensure we never report low lag
    /// values when a collection was actually unreadable for some amount of time.
    pub fn max(self, other: Self) -> Self {
        match (self, other) {
            (Self::Seconds(a), Self::Seconds(b)) => Self::Seconds(a.max(b)),
            (Self::Undefined, _) | (_, Self::Undefined) => Self::Undefined,
        }
    }

    /// Return the wrapped seconds value, or a default if the lag is `Undefined`.
    pub fn unwrap_seconds_or(self, default: u64) -> u64 {
        match self {
            Self::Seconds(s) => s,
            Self::Undefined => default,
        }
    }

    /// Create a new `WallclockLag` by transforming the wrapped seconds value.
    pub fn map_seconds(self, f: impl FnOnce(u64) -> u64) -> Self {
        match self {
            Self::Seconds(s) => Self::Seconds(f(s)),
            Self::Undefined => Self::Undefined,
        }
    }

    /// Convert this lag value into a [`Datum::Interval`] or [`Datum::Null`].
    pub fn into_interval_datum(self) -> Datum<'static> {
        match self {
            Self::Seconds(secs) => {
                let micros = i64::try_from(secs * 1_000_000).expect("must fit");
                Datum::Interval(Interval::new(0, 0, micros))
            }
            Self::Undefined => Datum::Null,
        }
    }

    /// Convert this lag value into a [`Datum::UInt64`] or [`Datum::Null`].
    pub fn into_uint64_datum(self) -> Datum<'static> {
        match self {
            Self::Seconds(secs) => Datum::UInt64(secs),
            Self::Undefined => Datum::Null,
        }
    }
}

/// The period covered by a wallclock lag histogram, represented as a `[start, end)` range.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct WallclockLagHistogramPeriod {
    pub start: CheckedTimestamp<DateTime<Utc>>,
    pub end: CheckedTimestamp<DateTime<Utc>>,
}

impl WallclockLagHistogramPeriod {
    /// Construct a `WallclockLagHistogramPeriod` from the given epoch timestamp and dyncfg.
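    ///
    /// The period is just a fixed-size bucket of wallclock time. A worked
    /// example (with a hypothetical one-hour interval):
    ///
    /// ```ignore
    /// // interval_ms = 3_600_000 (1h), epoch_ms = 5_400_000 (1h30m past the epoch):
    /// let start_ms = 5_400_000 - (5_400_000 % 3_600_000); // 3_600_000, i.e. 1h
    /// let end_ms = start_ms + 3_600_000;                  // 7_200_000, i.e. 2h
    /// // So the timestamp falls into the [1h, 2h) bucket.
    /// ```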
    pub fn from_epoch_millis(epoch_ms: u64, dyncfg: &ConfigSet) -> Self {
        let interval = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.get(dyncfg);
        let interval_ms = u64::try_from(interval.as_millis()).unwrap_or_else(|_| {
            soft_panic_or_log!("excessive wallclock lag histogram period interval: {interval:?}");
            let default = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.default();
            u64::try_from(default.as_millis()).unwrap()
        });
        let interval_ms = std::cmp::max(interval_ms, 1);

        let start_ms = epoch_ms - (epoch_ms % interval_ms);
        let start_dt = mz_ore::now::to_datetime(start_ms);
        let start = start_dt.try_into().expect("must fit");

        let end_ms = start_ms + interval_ms;
        let end_dt = mz_ore::now::to_datetime(end_ms);
        let end = end_dt.try_into().expect("must fit");

        Self { start, end }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn lag_writes_by_zero() {
        let policy =
            ReadPolicy::lag_writes_by(mz_repr::Timestamp::default(), mz_repr::Timestamp::default());
        let write_frontier = Antichain::from_elem(mz_repr::Timestamp::from(5));
        assert_eq!(policy.frontier(write_frontier.borrow()), write_frontier);
    }
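
    // A round-trip check for the `Codec64` implementation of `PersistEpoch`:
    // an absent epoch encodes as zero bytes, and a present epoch survives
    // encode/decode unchanged.
    #[mz_ore::test]
    fn persist_epoch_codec64_roundtrip() {
        let present = PersistEpoch::from(NonZeroI64::new(42).expect("nonzero"));
        assert_eq!(PersistEpoch::decode(present.encode()), present);

        let absent = PersistEpoch(None);
        assert_eq!(absent.encode(), [0; 8]);
        assert_eq!(PersistEpoch::decode(absent.encode()), absent);
    }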
979}