mz_storage_client/
controller.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller that provides an interface to the storage layer.
//!
//! The storage controller curates the creation of sources, the progress of
//! readers through these collections, and their eventual dropping and
//! resource reclamation.
//!
//! The storage controller can be viewed as a partial map from `GlobalId` to
//! collection. It is an error to use an identifier before it has been
//! "created" with `create_collections()`. Once created, the controller holds
//! a read capability for each collection, which is manipulated with
//! `update_read_capabilities()`. Eventually, the collection is dropped with
//! either `drop_sources()` or by allowing compaction to the empty frontier.
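//!
//! A minimal sketch of that lifecycle, with a hypothetical `controller`
//! handle and identifiers (the method names are those declared in this
//! module):
//!
//! ```ignore
//! // Register the collection with the controller.
//! controller
//!     .create_collections(&storage_metadata, None, vec![(id, description)])
//!     .await?;
//! // Inspect frontiers while the collection is active.
//! let (since, upper) = controller.collection_frontiers(id)?;
//! // Eventually drop the collection, allowing resource reclamation.
//! controller.drop_sources(&storage_metadata, vec![id])?;
//! ```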

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZeroI64;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use differential_dataflow::lattice::Lattice;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_controller_types::dyncfgs::WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL;
use mz_dyncfg::ConfigSet;
use mz_ore::soft_panic_or_log;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_types::{Codec64, Opaque, ShardId};
use mz_repr::adt::interval::Interval;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{CollectionMetadata, StorageError};
use mz_storage_types::errors::CollectionMissing;
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, Timeline,
};
use serde::{Deserialize, Serialize};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot};

use crate::client::{AppendOnlyUpdate, StatusUpdate, TableData};
use crate::statistics::WebhookStatistics;

#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, PartialOrd, Ord)]
pub enum IntrospectionType {
    /// We're not responsible for appending to this collection automatically, but we should
    /// automatically bump the write frontier from time to time.
    SinkStatusHistory,
    SourceStatusHistory,
    ShardMapping,

    Frontiers,
    ReplicaFrontiers,

    ReplicaStatusHistory,
    ReplicaMetricsHistory,
    WallclockLagHistory,
    WallclockLagHistogram,

    // Note that this single-shard introspection source will be changed to per-replica,
    // once we allow multiplexing multiple sources/sinks on a single cluster.
    StorageSourceStatistics,
    StorageSinkStatistics,

    // The below are for statement logging.
    StatementExecutionHistory,
    SessionHistory,
    PreparedStatementHistory,
    SqlText,
    // For statement lifecycle logging, which is closely related
    // to statement logging.
    StatementLifecycleHistory,

    // Collections written by the compute controller.
    ComputeDependencies,
    ComputeOperatorHydrationStatus,
    ComputeMaterializedViewRefreshes,
    ComputeErrorCounts,
    ComputeHydrationTimes,

    // Written by the adapter for tracking AWS PrivateLink connection status history.
    PrivatelinkConnectionStatusHistory,
}

/// Describes how data is written to the collection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DataSource<T> {
    /// Ingest data from some external source.
    Ingestion(IngestionDescription),
    /// This source receives its data from the identified ingestion,
    /// from an external object identified using `SourceExportDetails`.
    ///
    /// The referenced ingestion must be created before all of its exports.
    IngestionExport {
        ingestion_id: GlobalId,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig,
    },
    /// Data comes from introspection sources, which the controller itself is
    /// responsible for generating.
    Introspection(IntrospectionType),
    /// Data comes from the source's remapping/reclock operator.
    Progress,
    /// Data comes from external HTTP requests pushed to Materialize.
    Webhook,
    /// The adapter layer appends timestamped data, i.e. it is a `TABLE`.
    Table,
    /// This source's data does not need to be managed by the storage
    /// controller, e.g. it's a materialized view or the catalog collection.
    Other,
    /// This collection is the output collection of a sink.
    Sink { desc: ExportDescription<T> },
}

/// Describes a request to create a source.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollectionDescription<T> {
    /// The schema of this collection.
    pub desc: RelationDesc,
    /// The source of this collection's data.
    pub data_source: DataSource<T>,
    /// An optional frontier to which the collection's `since` should be advanced.
    pub since: Option<Antichain<T>>,
    /// An optional `GlobalId` identifying the status collection for this
    /// collection, used to keep track of source status/error information.
    pub status_collection_id: Option<GlobalId>,
    /// The timeline of the source. Absent for materialized views, continual tasks, etc.
    pub timeline: Option<Timeline>,
    /// The primary of this collection.
    ///
    /// Multiple storage collections can point to the same persist shard,
    /// possibly with different schemas. In such a configuration, we select one
    /// of the involved collections as the primary, which "owns" the persist
    /// shard. All other involved collections have a dependency on the primary.
    pub primary: Option<GlobalId>,
}

impl<T> CollectionDescription<T> {
    /// Create a `CollectionDescription` for [`DataSource::Other`].
    pub fn for_other(desc: RelationDesc, since: Option<Antichain<T>>) -> Self {
        Self {
            desc,
            data_source: DataSource::Other,
            since,
            status_collection_id: None,
            timeline: None,
            primary: None,
        }
    }

    /// Create a `CollectionDescription` for a table.
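    ///
    /// A minimal sketch, assuming some `RelationDesc` value `desc` is in
    /// scope:
    ///
    /// ```ignore
    /// let table = CollectionDescription::<mz_repr::Timestamp>::for_table(desc);
    /// assert_eq!(table.timeline, Some(Timeline::EpochMilliseconds));
    /// ```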
    pub fn for_table(desc: RelationDesc) -> Self {
        Self {
            desc,
            data_source: DataSource::Table,
            since: None,
            status_collection_id: None,
            timeline: Some(Timeline::EpochMilliseconds),
            primary: None,
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ExportDescription<T = mz_repr::Timestamp> {
    pub sink: StorageSinkDesc<(), T>,
    /// The ID of the instance in which to install the export.
    pub instance_id: StorageInstanceId,
}

#[derive(Debug)]
pub enum Response<T> {
    FrontierUpdates(Vec<(GlobalId, Antichain<T>)>),
}

/// Metadata that the storage controller must know to properly handle the life
/// cycle of creating and dropping collections.
///
/// This data should be kept consistent with the state modified using
/// [`StorageTxn`].
///
/// n.b. the "txn WAL shard" is also metadata that's persisted, but if we
/// included it in this struct it would never be read.
#[derive(Debug, Clone, Serialize, Default)]
pub struct StorageMetadata {
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collection_metadata: BTreeMap<GlobalId, ShardId>,
    pub unfinalized_shards: BTreeSet<ShardId>,
}

impl StorageMetadata {
    pub fn get_collection_shard<T>(&self, id: GlobalId) -> Result<ShardId, StorageError<T>> {
        let shard_id = self
            .collection_metadata
            .get(&id)
            .ok_or(StorageError::IdentifierMissing(id))?;

        Ok(*shard_id)
    }
}

/// Provides an interface for the storage controller to read and write data
/// that is recorded elsewhere.
///
/// Data written to the implementor of this trait should make a consistent view
/// of the data available through [`StorageMetadata`].
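///
/// A sketch of the intended contract, with a hypothetical implementor `txn`:
///
/// ```ignore
/// txn.insert_collection_metadata(BTreeMap::from([(id, shard)]))?;
/// // Subsequent reads must reflect the insertion.
/// assert_eq!(txn.get_collection_metadata().get(&id), Some(&shard));
/// ```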
#[async_trait]
pub trait StorageTxn<T> {
    /// Retrieve all of the visible storage metadata.
    ///
    /// The value of this map should be treated as opaque.
    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId>;

    /// Add new storage metadata for a collection.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must include
    /// this data.
    fn insert_collection_metadata(
        &mut self,
        s: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>>;

    /// Remove the metadata associated with the identified collections.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must not
    /// include these keys.
    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)>;

    /// Retrieve all of the shards that are no longer in use by an active
    /// collection but are yet to be finalized.
    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId>;

    /// Insert the specified values as unfinalized shards.
    fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError<T>>;

    /// Mark the specified shards as finalized, deleting them from the
    /// unfinalized shard collection.
    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>);

    /// Get the txn WAL shard for this environment if it exists.
    fn get_txn_wal_shard(&self) -> Option<ShardId>;

    /// Store the specified shard as the environment's txn WAL shard.
    ///
    /// The implementor should error if the shard is already specified.
    fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError<T>>;
}

pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

/// A predicate for a `Row` filter.
pub type RowPredicate = Box<dyn Fn(&Row) -> bool + Send + Sync>;

/// High-level write operations applicable to storage collections.
pub enum StorageWriteOp {
    /// Append a set of rows with specified multiplicities.
    ///
    /// The multiplicities may be negative, so an `Append` operation can perform
    /// both insertions and retractions.
    Append { updates: Vec<(Row, Diff)> },
    /// Delete all rows matching the given predicate.
    Delete { filter: RowPredicate },
}

impl StorageWriteOp {
    /// Returns whether this operation appends an empty set of updates.
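    ///
    /// A small example (the doctest path assumes this module is
    /// `mz_storage_client::controller`):
    ///
    /// ```
    /// use mz_storage_client::controller::StorageWriteOp;
    ///
    /// let op = StorageWriteOp::Append { updates: vec![] };
    /// assert!(op.is_empty_append());
    /// ```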
    pub fn is_empty_append(&self) -> bool {
        match self {
            Self::Append { updates } => updates.is_empty(),
            Self::Delete { .. } => false,
        }
    }
}

#[async_trait(?Send)]
pub trait StorageController: Debug {
    type Timestamp: TimelyTimestamp;

    /// Marks the end of any initialization commands.
    ///
    /// The implementor may wait for this method to be called before implementing prior commands,
    /// and so it is important for a user to invoke this method as soon as it is comfortable.
    /// This method can be invoked immediately, at the potential expense of performance.
    fn initialization_complete(&mut self);

    /// Update storage configuration with new parameters.
    fn update_parameters(&mut self, config_params: StorageParameters);

    /// Get the current configuration, including parameters updated with `update_parameters`.
    fn config(&self) -> &StorageConfiguration;

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    /// Returns `true` iff the given collection/ingestion has been hydrated.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated, but it's
    /// clearly intentional that they have no replicas.
    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>>;

    /// Returns `true` if each non-transient, non-excluded collection is
    /// hydrated on at least one of the provided replicas.
    ///
    /// If no replicas are provided, this checks for hydration on _any_ replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_ids: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>>;

    /// Returns the since/upper frontiers of the identified collection.
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), CollectionMissing>;

    /// Returns the since/upper frontiers of the identified collections.
    ///
    /// Having a method that returns both frontiers at the same time, for all
    /// requested collections, ensures that we can get a consistent "snapshot"
    /// of collection state. If we had separate methods instead, and/or would
    /// allow getting frontiers for collections one at a time, it could happen
    /// that collection state changes concurrently, while information is
    /// gathered.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<
        Vec<(
            GlobalId,
            Antichain<Self::Timestamp>,
            Antichain<Self::Timestamp>,
        )>,
        CollectionMissing,
    >;

    /// Acquire an iterator over [CollectionMetadata] for all active
    /// collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the IDs of ingestion exports running on the given instance. This
    /// includes the ingestion itself, if any, and running source tables (a.k.a.
    /// subsources).
    fn active_ingestion_exports(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    /// Creates a storage instance with the specified ID.
    ///
    /// A storage instance can have zero or one replicas. The instance is
    /// created with zero replicas.
    ///
    /// Panics if a storage instance with the given ID already exists.
    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>);

    /// Drops the storage instance with the given ID.
    ///
    /// If you call this method while the storage instance has a replica
    /// attached, that replica will be leaked. Call `drop_replica` first.
    ///
    /// Panics if a storage instance with the given ID does not exist.
    fn drop_instance(&mut self, id: StorageInstanceId);

    /// Updates a storage instance's workload class.
    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    );

    /// Connects the storage instance to the specified replica.
    ///
    /// If the storage instance is already attached to a replica, communication
    /// with that replica is severed in favor of the new replica.
    ///
    /// In the future, this API will be adjusted to support active replication
    /// of storage instances (i.e., multiple replicas attached to a given
    /// storage instance).
    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    );

    /// Disconnects the storage instance from the specified replica.
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId);

    /// Across versions of Materialize the nullability of columns for some objects can change based
    /// on updates to our optimizer.
    ///
    /// During bootstrap we will register these new schemas with Persist.
    ///
    /// See: <https://github.com/MaterializeInc/database-issues/issues/2488>
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Create the sources described in the individual RunIngestionCommand commands.
    ///
    /// Each command carries the source id, the source description, and any associated metadata
    /// needed to ingest the particular source.
    ///
    /// This command installs collection state for the indicated sources, and they are
    /// now valid to use in queries at times beyond the initial `since` frontiers. Each
    /// collection also acquires a read capability at this frontier, which will need to
    /// be repeatedly downgraded with `allow_compaction()` to permit compaction.
    ///
    /// This method is NOT idempotent; it can fail between processing of different
    /// collections and leave the controller in an inconsistent state. It is almost
    /// always wrong to do anything but abort the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are available for reads. (We
    /// might later give non-tables the same treatment, but hold off on that initially.) Callers
    /// must provide a `Some` if any of the collections is a table. A `None` may be given if none
    /// of the collections is a table (i.e. all materialized views, sources, etc.).
    async fn create_collections(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.create_collections_for_bootstrap(
            storage_metadata,
            register_ts,
            collections,
            &BTreeSet::new(),
        )
        .await
    }

    /// Like [`Self::create_collections`], except used specifically for bootstrap.
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
    /// from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Check that the ingestion associated with `id` can use the provided
    /// [`SourceDesc`].
    ///
    /// Note that this check is optimistic: a return of `Ok(())` does not
    /// guarantee that subsequent calls to `alter_ingestion_source_desc` will
    /// succeed.
    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters each identified collection to use the correlated [`GenericSourceConnection`].
    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the data config for the specified source exports of the specified ingestions.
    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

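    /// Alters the identified table to use `new_desc`, registering
    /// `new_collection` as the collection for the new table version at
    /// `register_ts`. Errors if the table is not at `expected_version`.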
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Acquire an immutable reference to the export state, should it exist.
    fn export(
        &self,
        id: GlobalId,
    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Acquire a mutable reference to the export state, should it exist.
    fn export_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Create a oneshot ingestion.
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Cancel a oneshot ingestion.
    fn cancel_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alter the sink identified by the given id to match the provided `ExportDescription`.
    async fn alter_export(
        &mut self,
        id: GlobalId,
        export: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// For each identified export, alter its [`StorageSinkConnection`].
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the tables and allows their resources to be reclaimed.
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    ///     controller starts/restarts it has no durable state. That means that it has no way of
    ///     remembering any past commands sent. In the future we plan on persisting state for the
    ///     controller so that it is aware of past commands.
    ///     Therefore this method is for dropping sinks that we know to have been previously
    ///     created, but have been forgotten by the controller due to a restart.
    ///     Once command history becomes durable we can remove this method and use the normal
    ///     `drop_sinks`.
    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    ///     controller starts/restarts it has no durable state. That means that it has no way of
    ///     remembering any past commands sent. In the future we plan on persisting state for the
    ///     controller so that it is aware of past commands.
    ///     Therefore this method is for dropping sources that we know to have been previously
    ///     created, but have been forgotten by the controller due to a restart.
    ///     Once command history becomes durable we can remove this method and use the normal
    ///     `drop_sources`.
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Append `updates` into the local input named `id` and advance its upper to `upper`.
    ///
    /// The method returns a oneshot that can be awaited to indicate completion of the write.
    /// The method may return an error, indicating an immediately visible error, and also the
    /// oneshot may return an error if one is encountered during the write.
    ///
    /// All updates in `commands` are applied atomically.
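    ///
    /// A usage sketch, with hypothetical `controller`, `id`, and `rows`
    /// values:
    ///
    /// ```ignore
    /// let rx = controller.append_table(write_ts, advance_to, vec![(id, rows)])?;
    /// rx.await.expect("controller unexpectedly gone")?;
    /// ```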
    // TODO(petrosagg): switch upper to `Antichain<Timestamp>`
    fn append_table(
        &mut self,
        write_ts: Self::Timestamp,
        advance_to: Self::Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<
        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    >;

    /// Returns a [`MonotonicAppender`] which is a channel that can be used to monotonically
    /// append to the specified [`GlobalId`].
    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Returns a shared [`WebhookStatistics`] which can be used to report user-facing
    /// statistics for the given webhook, specified by the [`GlobalId`].
    ///
    // This is used to support a fairly special case, where a source needs to report statistics
    // from outside the ordinary controller-clusterd path. It's possible to merge this with
    // `monotonic_appender`, whose only current user is webhooks, but given that they will
    // likely be moved to clusterd, we just leave this a special case.
    fn webhook_statistics(
        &self,
        id: GlobalId,
    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>;

    /// Waits until the controller is ready to process a response.
    ///
    /// This method may block for an arbitrarily long time.
    ///
    /// When the method returns, the owner should call
    /// [`StorageController::process`] to process the ready message.
    ///
    /// This method is cancellation safe.
    async fn ready(&mut self);

    /// Processes the work queued by [`StorageController::ready`].
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<Self::Timestamp>>, anyhow::Error>;

    /// Exposes the internal state of the data shard for debugging and QA.
    ///
    /// We'll be thoughtful about making unnecessary changes, but the **output
    /// of this method needs to be gated from users**, so that it's not subject
    /// to our backward compatibility guarantees.
    ///
    /// TODO: Ideally this would return `impl Serialize` so the caller can do
    /// with it what they like, but that doesn't work in traits yet. The
    /// workaround (an associated type) doesn't work because persist doesn't
    /// want to make the type public. In the meantime, move the `serde_json`
    /// call from the single user into this method.
    async fn inspect_persist_state(&self, id: GlobalId)
    -> Result<serde_json::Value, anyhow::Error>;

    /// Records append-only updates for the given introspection type.
    ///
    /// Rows passed in `updates` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn append_introspection_updates(&mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)>);

    /// Records append-only status updates for the given introspection type.
    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    );

    /// Updates the desired state of the given introspection type.
    ///
    /// Rows passed in `op` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp);

    /// Returns a sender for updates to the specified append-only introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with an append-only collection.
    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )>;

    /// Returns a sender for updates to the specified differential introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with a differential collection.
    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )>;

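    /// Determines a timestamp that is recent with respect to real time for
    /// the given sources, waiting at most `timeout` for remote systems to
    /// respond. The outer `Result` reports immediately visible errors; the
    /// returned future resolves to the timestamp itself.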
    async fn real_time_recent_timestamp(
        &self,
        source_ids: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    >;
}

impl<T> DataSource<T> {
    /// Returns true if the storage controller manages the data shard for this
    /// source using txn-wal.
    pub fn in_txns(&self) -> bool {
        match self {
            DataSource::Table => true,
            DataSource::Other
            | DataSource::Ingestion(_)
            | DataSource::IngestionExport { .. }
            | DataSource::Introspection(_)
            | DataSource::Progress
            | DataSource::Webhook => false,
            DataSource::Sink { .. } => false,
        }
    }
}

/// A wrapper struct that presents the adapter token in a format that is understandable by persist
/// and also allows us to differentiate between a token being present versus being set for the
/// first time.
#[derive(PartialEq, Clone, Debug)]
pub struct PersistEpoch(pub Option<NonZeroI64>);

impl Opaque for PersistEpoch {
    fn initial() -> Self {
        PersistEpoch(None)
    }
}

impl Codec64 for PersistEpoch {
    fn codec_name() -> String {
        "PersistEpoch".to_owned()
    }

    fn encode(&self) -> [u8; 8] {
        self.0.map(NonZeroI64::get).unwrap_or(0).to_le_bytes()
    }

    fn decode(buf: [u8; 8]) -> Self {
        Self(NonZeroI64::new(i64::from_le_bytes(buf)))
    }
}

impl From<NonZeroI64> for PersistEpoch {
    fn from(epoch: NonZeroI64) -> Self {
        Self(Some(epoch))
    }
}

/// State maintained about individual exports.
#[derive(Debug)]
pub struct ExportState<T: TimelyTimestamp> {
    /// Really only for keeping track of changes to the `derived_since`.
    pub read_capabilities: MutableAntichain<T>,

    /// The cluster this export is associated with.
    pub cluster_id: StorageInstanceId,

    /// The current since frontier, derived from `write_frontier` using
    /// `read_policy`.
    pub derived_since: Antichain<T>,

    /// The read holds that this export has on its dependencies (its input and itself). When
    /// the upper of the export changes, we downgrade this, which in turn
    /// downgrades holds we have on our dependencies' sinces.
    pub read_holds: [ReadHold<T>; 2],

    /// The policy to use to downgrade `self.read_capabilities`.
    pub read_policy: ReadPolicy<T>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,
}

impl<T: Timestamp> ExportState<T> {
    pub fn new(
        cluster_id: StorageInstanceId,
        read_hold: ReadHold<T>,
        self_hold: ReadHold<T>,
        write_frontier: Antichain<T>,
        read_policy: ReadPolicy<T>,
    ) -> Self
    where
        T: Lattice,
    {
        let mut dependency_since = Antichain::from_elem(T::minimum());
        for read_hold in [&read_hold, &self_hold] {
            dependency_since.join_assign(read_hold.since());
        }
        Self {
            read_capabilities: MutableAntichain::from(dependency_since.borrow()),
            cluster_id,
            derived_since: dependency_since,
            read_holds: [read_hold, self_hold],
            read_policy,
            write_frontier,
        }
    }

    /// Returns the cluster to which the export is bound.
    pub fn cluster_id(&self) -> StorageInstanceId {
        self.cluster_id
    }

    /// Returns the hold this export has on its input.
    pub fn input_hold(&self) -> &ReadHold<T> {
        &self.read_holds[0]
    }

    /// Returns whether the export was dropped.
    pub fn is_dropped(&self) -> bool {
        self.read_holds.iter().all(|h| h.since().is_empty())
    }
}

/// A channel that allows you to append a set of updates to a pre-defined [`GlobalId`].
///
/// See `CollectionManager::monotonic_appender` to acquire a [`MonotonicAppender`].
#[derive(Clone, Debug)]
pub struct MonotonicAppender<T> {
    /// Channel that sends to a [`tokio::task`] which pushes updates to Persist.
    tx: mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<T>>>,
    )>,
}

impl<T> MonotonicAppender<T> {
    pub fn new(
        tx: mpsc::UnboundedSender<(
            Vec<AppendOnlyUpdate>,
            oneshot::Sender<Result<(), StorageError<T>>>,
        )>,
    ) -> Self {
        MonotonicAppender { tx }
    }

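    /// Appends the given updates to the managed collection, returning once the
    /// write has completed or the collection manager has shut down.
    ///
    /// A usage sketch, with hypothetical `appender` and `updates` values:
    ///
    /// ```ignore
    /// appender.append(updates).await?;
    /// ```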
    pub async fn append(&self, updates: Vec<AppendOnlyUpdate>) -> Result<(), StorageError<T>> {
        let (tx, rx) = oneshot::channel();

        // Send our update to the CollectionManager.
        self.tx
            .send((updates, tx))
            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;

        // Wait for a response; if we fail to receive, then the CollectionManager has gone away.
        let result = rx
            .await
            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;

        result
    }
}

/// A wallclock lag measurement.
///
/// The enum representation reflects the fact that wallclock lag is undefined for unreadable
/// collections, i.e. collections that contain no readable times.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum WallclockLag {
    /// Lag value in seconds, for readable collections.
    Seconds(u64),
    /// Undefined lag, for unreadable collections.
    Undefined,
}

impl WallclockLag {
    /// The smallest possible wallclock lag measurement.
    pub const MIN: Self = Self::Seconds(0);

    /// Return the maximum of two lag values.
    ///
    /// We treat `Undefined` lags as greater than `Seconds`, to ensure we never report low lag
    /// values when a collection was actually unreadable for some amount of time.
    pub fn max(self, other: Self) -> Self {
        match (self, other) {
            (Self::Seconds(a), Self::Seconds(b)) => Self::Seconds(a.max(b)),
            (Self::Undefined, _) | (_, Self::Undefined) => Self::Undefined,
        }
    }

    /// Return the wrapped seconds value, or a default if the lag is `Undefined`.
    pub fn unwrap_seconds_or(self, default: u64) -> u64 {
        match self {
            Self::Seconds(s) => s,
            Self::Undefined => default,
        }
    }

    /// Create a new `WallclockLag` by transforming the wrapped seconds value.
    pub fn map_seconds(self, f: impl FnOnce(u64) -> u64) -> Self {
        match self {
            Self::Seconds(s) => Self::Seconds(f(s)),
            Self::Undefined => Self::Undefined,
        }
    }

    /// Convert this lag value into a [`Datum::Interval`] or [`Datum::Null`].
    pub fn into_interval_datum(self) -> Datum<'static> {
        match self {
            Self::Seconds(secs) => {
                // Use a checked multiplication so an absurdly large lag fails
                // loudly rather than silently wrapping in release builds.
                let micros = secs
                    .checked_mul(1_000_000)
                    .and_then(|us| i64::try_from(us).ok())
                    .expect("must fit");
                Datum::Interval(Interval::new(0, 0, micros))
            }
            Self::Undefined => Datum::Null,
        }
    }

    /// Convert this lag value into a [`Datum::UInt64`] or [`Datum::Null`].
    pub fn into_uint64_datum(self) -> Datum<'static> {
        match self {
            Self::Seconds(secs) => Datum::UInt64(secs),
            Self::Undefined => Datum::Null,
        }
    }
}

/// The period covered by a wallclock lag histogram, represented as a `[start, end)` range.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct WallclockLagHistogramPeriod {
    pub start: CheckedTimestamp<DateTime<Utc>>,
    pub end: CheckedTimestamp<DateTime<Utc>>,
}

impl WallclockLagHistogramPeriod {
    /// Construct a `WallclockLagHistogramPeriod` from the given epoch timestamp and dyncfg.
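    ///
    /// As a small worked example: assuming a (hypothetical) period interval of
    /// 60s, an `epoch_ms` of `125_000` maps to the bucket `[120_000, 180_000)`.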
    pub fn from_epoch_millis(epoch_ms: u64, dyncfg: &ConfigSet) -> Self {
        let interval = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.get(dyncfg);
        let interval_ms = u64::try_from(interval.as_millis()).unwrap_or_else(|_| {
            soft_panic_or_log!("excessive wallclock lag histogram period interval: {interval:?}");
            let default = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.default();
            u64::try_from(default.as_millis()).unwrap()
        });
        let interval_ms = std::cmp::max(interval_ms, 1);

        let start_ms = epoch_ms - (epoch_ms % interval_ms);
        let start_dt = mz_ore::now::to_datetime(start_ms);
        let start = start_dt.try_into().expect("must fit");

        let end_ms = start_ms + interval_ms;
        let end_dt = mz_ore::now::to_datetime(end_ms);
        let end = end_dt.try_into().expect("must fit");

        Self { start, end }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn lag_writes_by_zero() {
        let policy =
            ReadPolicy::lag_writes_by(mz_repr::Timestamp::default(), mz_repr::Timestamp::default());
        let write_frontier = Antichain::from_elem(mz_repr::Timestamp::from(5));
        assert_eq!(policy.frontier(write_frontier.borrow()), write_frontier);
    }
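
    // A couple of additional sanity checks for the small pure helpers defined
    // in this module.
    #[mz_ore::test]
    fn persist_epoch_codec_roundtrip() {
        // `None` encodes as zero and decodes back to `None`.
        let initial = PersistEpoch(None);
        assert_eq!(PersistEpoch::decode(initial.encode()), initial);

        // A non-zero epoch survives the roundtrip.
        let epoch = PersistEpoch::from(NonZeroI64::new(42).expect("non-zero"));
        assert_eq!(PersistEpoch::decode(epoch.encode()), epoch);
    }

    #[mz_ore::test]
    fn wallclock_lag_max() {
        // `Undefined` dominates any measured lag, so unreadable stretches are
        // never under-reported.
        let lag = WallclockLag::Seconds(5).max(WallclockLag::Undefined);
        assert_eq!(lag, WallclockLag::Undefined);

        // Otherwise the larger measurement wins.
        let lag = WallclockLag::MIN.max(WallclockLag::Seconds(9));
        assert_eq!(lag, WallclockLag::Seconds(9));
    }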
}